Skip to content

Commit

Permalink
[FLINK-34511][Runtime/State] Remove deprecated state related APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Sep 24, 2024
1 parent 4642038 commit f699af3
Show file tree
Hide file tree
Showing 17 changed files with 14 additions and 185 deletions.
2 changes: 1 addition & 1 deletion docs/content.zh/docs/dev/python/table/table_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ table_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().set("execution.checkpointing.interval", "3min")

# 设置 statebackend 类型为 "rocksdb",其他可选项有 "filesystem" 和 "jobmanager"
# 设置 statebackend 类型为 "rocksdb",其他可选项有 "hashmap"
# 你也可以将这个属性设置为 StateBackendFactory 的完整类名
# e.g. org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
table_env.get_config().set("state.backend.type", "rocksdb")
Expand Down
2 changes: 1 addition & 1 deletion docs/content/docs/dev/python/table/table_environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ table_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
table_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
table_env.get_config().set("execution.checkpointing.interval", "3min")

# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# set the statebackend type to "rocksdb", other available options are "hashmap"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackendFactory
table_env.get_config().set("state.backend.type", "rocksdb")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.util.Disposable;

import java.util.stream.Stream;
Expand Down Expand Up @@ -140,11 +139,6 @@ <N, S extends State> S getPartitionedState(
*/
boolean deregisterKeySelectionListener(KeySelectionListener<K> listener);

@Deprecated
default boolean isStateImmutableInStateBackend(CheckpointType checkpointOptions) {
return false;
}

/**
* Whether it's safe to reuse key-values from the state-backend, e.g for the purpose of
* optimization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ public class StateBackendLoader {
*/
@Deprecated public static final String MEMORY_STATE_BACKEND_NAME = "jobmanager";

/** The shortcut configuration name for the FileSystem State backend. */
@Deprecated public static final String FS_STATE_BACKEND_NAME = "filesystem";

/** The shortcut configuration name for the RocksDB State Backend. */
public static final String ROCKSDB_STATE_BACKEND_NAME = "rocksdb";

Expand All @@ -94,9 +91,8 @@ public class StateBackendLoader {
* StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)} method is called.
*
* <p>Recognized shortcut names are '{@value StateBackendLoader#HASHMAP_STATE_BACKEND_NAME}',
* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}' '{@value
* StateBackendLoader#MEMORY_STATE_BACKEND_NAME}' (Deprecated), and '{@value
* StateBackendLoader#FS_STATE_BACKEND_NAME}' (Deprecated).
* '{@value StateBackendLoader#ROCKSDB_STATE_BACKEND_NAME}' and '{@value
* StateBackendLoader#MEMORY_STATE_BACKEND_NAME}' (Deprecated)
*
* @param config The configuration to load the state backend from
* @param classLoader The class loader that should be used to load the state backend
Expand Down Expand Up @@ -137,15 +133,6 @@ public static StateBackend loadStateBackendFromConfig(
}

return backend;
case FS_STATE_BACKEND_NAME:
if (logger != null) {
logger.warn(
"{} state backend has been deprecated. Please use 'hashmap' state "
+ "backend instead.",
backendName.toLowerCase());
}
// fall through and use the HashMapStateBackend instead which
// utilizes the same HeapKeyedStateBackend runtime implementation.
case HASHMAP_STATE_BACKEND_NAME:
HashMapStateBackend hashMapStateBackend =
new HashMapStateBackendFactory().createFromConfig(config, classLoader);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.windowing.windows.Window;

Expand Down Expand Up @@ -182,44 +180,6 @@ public interface TriggerContext {
* the function (function is not part os a KeyedStream).
*/
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

/**
* Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant
* state that is scoped to the window and key of the current trigger invocation.
*
* @param name The name of the key/value state.
* @param stateType The class of the type that is stored in the state. Used to generate
* serializers for managed memory and checkpointing.
* @param defaultState The default state value, returned when the state is accessed and no
* value has yet been set for the key. May be null.
* @param <S> The type of the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for
* the function (function is not part os a KeyedStream).
* @deprecated Use {@link #getPartitionedState(StateDescriptor)}.
*/
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(
String name, Class<S> stateType, S defaultState);

/**
* Retrieves a {@link ValueState} object that can be used to interact with fault-tolerant
* state that is scoped to the window and key of the current trigger invocation.
*
* @param name The name of the key/value state.
* @param stateType The type information for the type that is stored in the state. Used to
* create serializers for managed memory and checkpoints.
* @param defaultState The default state value, returned when the state is accessed and no
* value has yet been set for the key. May be null.
* @param <S> The type of the state.
* @return The partitioned state object.
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for
* the function (function is not part os a KeyedStream).
* @deprecated Use {@link #getPartitionedState(StateDescriptor)}.
*/
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(
String name, TypeInformation<S> stateType, S defaultState);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -71,7 +70,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;

import java.io.Serializable;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -838,41 +836,6 @@ public long getCurrentWatermark() {
return internalTimerService.currentWatermark();
}

@Override
public <S extends Serializable> ValueState<S> getKeyValueState(
String name, Class<S> stateType, S defaultState) {
checkNotNull(stateType, "The state type class must not be null");

TypeInformation<S> typeInfo;
try {
typeInfo = TypeExtractor.getForClass(stateType);
} catch (Exception e) {
throw new RuntimeException(
"Cannot analyze type '"
+ stateType.getName()
+ "' from the class alone, due to generic type parameters. "
+ "Please specify the TypeInformation directly.",
e);
}

return getKeyValueState(name, typeInfo, defaultState);
}

@Override
public <S extends Serializable> ValueState<S> getKeyValueState(
String name, TypeInformation<S> stateType, S defaultState) {

checkNotNull(name, "The name of the state must not be null");
checkNotNull(stateType, "The state type information must not be null");

ValueStateDescriptor<S> stateDesc =
new ValueStateDescriptor<>(
name,
stateType.createSerializer(getExecutionConfig().getSerializerConfig()),
defaultState);
return getPartitionedState(stateDesc);
}

@SuppressWarnings("unchecked")
public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackendFactory;
Expand Down Expand Up @@ -167,7 +166,7 @@ void testConfigureMemoryStateBackend() throws Exception {
final MemoryStateBackend backend = new MemoryStateBackend(maxSize);

final Configuration config = new Configuration();
config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
config.setString(backendKey, "hashmap"); // check that this is not accidentally picked up
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);

Expand Down Expand Up @@ -209,7 +208,7 @@ void testConfigureMemoryStateBackendMixed() throws Exception {
final MemoryStateBackend backend = new MemoryStateBackend(appCheckpointDir, null);

final Configuration config = new Configuration();
config.setString(backendKey, "filesystem"); // check that this is not accidentally picked up
config.setString(backendKey, "hashmap"); // check that this is not accidentally picked up
config.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
checkpointDir); // this parameter should not be picked up
Expand Down Expand Up @@ -240,52 +239,6 @@ void testConfigureMemoryStateBackendMixed() throws Exception {
// File System State Backend
// ------------------------------------------------------------------------

/**
* Validates loading a file system state backend with additional parameters from the cluster
* configuration.
*/
@Test
void testLoadFileSystemStateBackend() throws Exception {
final String checkpointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
final String savepointDir = new Path(TempDirUtils.newFolder(tmp).toURI()).toString();
final Path expectedCheckpointsPath = new Path(checkpointDir);
final Path expectedSavepointsPath = new Path(savepointDir);
final MemorySize threshold = MemorySize.parse("900kb");
final int minWriteBufferSize = 1024;

// we configure with the explicit string (rather than
// AbstractStateBackend#X_STATE_BACKEND_NAME)
// to guard against config-breaking changes of the name
final Configuration config1 = new Configuration();
config1.setString(backendKey, "filesystem");
config1.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config1.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);

final Configuration config2 = new Configuration();
config2.setString(backendKey, FsStateBackendFactory.class.getName());
config2.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config2.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.set(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);

StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
StateBackend backend2 = StateBackendLoader.loadStateBackendFromConfig(config2, cl, null);

assertThat(backend1).isInstanceOf(HashMapStateBackend.class);
assertThat(backend2).isInstanceOf(FsStateBackend.class);

HashMapStateBackend fs1 = (HashMapStateBackend) backend1;
FsStateBackend fs2 = (FsStateBackend) backend2;

assertThat(fs2.getCheckpointPath()).isEqualTo(expectedCheckpointsPath);
assertThat(fs2.getSavepointPath()).isEqualTo(expectedSavepointsPath);
assertThat(fs2.getMinFileSizeThreshold()).isEqualTo(threshold.getBytes());
assertThat(fs2.getWriteBufferSize())
.isEqualTo(Math.max(threshold.getBytes(), minWriteBufferSize));
}

/**
* Validates taking the application-defined file system state backend and adding with additional
* parameters from configuration, but giving precedence to application-defined parameters over
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public static Configuration configureZooKeeperHA(
HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, Duration.ofMillis(connTimeout));

// File system state backend
config.set(StateBackendOptions.STATE_BACKEND, "FILESYSTEM");
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, fsStateHandlePath + "/checkpoints");
config.set(HighAvailabilityOptions.HA_STORAGE_PATH, fsStateHandlePath + "/recovery");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
Expand All @@ -50,7 +47,6 @@
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -360,18 +356,6 @@ public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescri
throw new RuntimeException("Error getting state", e);
}
}

@Override
public <S extends Serializable> ValueState<S> getKeyValueState(
String name, Class<S> stateType, S defaultState) {
return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState));
}

@Override
public <S extends Serializable> ValueState<S> getKeyValueState(
String name, TypeInformation<S> stateType, S defaultState) {
return getPartitionedState(new ValueStateDescriptor<>(name, stateType, defaultState));
}
}

private static class TestOnMergeContext<K, W extends Window> extends TestTriggerContext<K, W>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,6 @@ public void testLoadingMemoryStateBackend() throws Exception {
"jobmanager", MemoryStateBackend.class, MemoryStateBackend.class, false);
}

@Test
public void testLoadingFsStateBackendFromConfig() throws Exception {
testLoadingStateBackend(
"filesystem", HashMapStateBackend.class, JobManagerCheckpointStorage.class, true);
}

@Test
public void testLoadingFsStateBackend() throws Exception {
testLoadingStateBackend(
"filesystem", HashMapStateBackend.class, JobManagerCheckpointStorage.class, false);
}

@Test
public void testLoadingHashMapStateBackendFromConfig() throws Exception {
testLoadingStateBackend(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static Collection<Object[]> data() {
new Object[][] {
{"rocksdb", false},
{"rocksdb", true},
{"filesystem", false}
{"hashmap", false}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public class RescalingITCase extends TestLogger {

@Parameterized.Parameters(name = "backend = {0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {{"filesystem"}, {"rocksdb"}});
return Arrays.asList(new Object[][] {{"hashmap"}, {"rocksdb"}});
}

public RescalingITCase(String backend) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public Configuration getConfiguration() {

@Override
protected String getConfigName() {
return "filesystem";
return "hashmap";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ MiniClusterWithClientResource get() {

private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
final Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "filesystem");
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ public Configuration getConfiguration(File checkpointDir) {

conf.set(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.9f);
conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
conf.set(StateBackendOptions.STATE_BACKEND, "filesystem");
conf.set(StateBackendOptions.STATE_BACKEND, "hashmap");
conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
if (restoreCheckpoint != null) {
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public static void setUp() throws Exception {

// we need to use the "filesystem" state backend to ensure FLINK-2543 is not happening
// again.
config.set(StateBackendOptions.STATE_BACKEND, "filesystem");
config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
config.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
FOLDER.newFolder().getAbsoluteFile().toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
public static Collection<String> parameters() {
return Arrays.asList(
StateBackendLoader.MEMORY_STATE_BACKEND_NAME,
StateBackendLoader.FS_STATE_BACKEND_NAME,
StateBackendLoader.HASHMAP_STATE_BACKEND_NAME,
StateBackendLoader.ROCKSDB_STATE_BACKEND_NAME);
}

Expand Down

0 comments on commit f699af3

Please sign in to comment.