diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
index 33ec3d92ab840..e7ba144d5b775 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateRestoreOperation.java
@@ -24,6 +24,7 @@
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
 
@@ -66,7 +67,8 @@ public Void restore() throws Exception {
 
         for (OperatorStateHandle stateHandle : stateHandles) {
 
-            if (stateHandle == null) {
+            if (stateHandle == null
+                    || stateHandle instanceof EmptyFileMergingOperatorStreamStateHandle) {
                 continue;
             }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
index 11d68e7b394e3..a4b31cc310ef7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
@@ -106,7 +106,8 @@ public Optional asBytesIfInMemory() {
 
     @Override
     public PhysicalStateHandleID getStreamStateHandleID() {
-        return new PhysicalStateHandleID(filePath.toUri().toString());
+        return new PhysicalStateHandleID(
+                String.format("%s-%d-%d", filePath.toUri(), startPos, stateSize));
     }
 
     public long getStartPos() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e0ffe9b926706..433edf63f255f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -562,10 +562,12 @@ private CheckpointStorageAccess tryApplyFileMergingCheckpoint(
             return checkpointStorageAccess;
         }
         try {
-            CheckpointStorageWorkerView mergingCheckpointStorageAccess =
-                    checkpointStorageAccess.toFileMergingStorage(
-                            fileMergingSnapshotManager, environment);
-            return (CheckpointStorageAccess) mergingCheckpointStorageAccess;
+            CheckpointStorageAccess mergingCheckpointStorageAccess =
+                    (CheckpointStorageAccess)
+                            checkpointStorageAccess.toFileMergingStorage(
+                                    fileMergingSnapshotManager, environment);
+            mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint();
+            return mergingCheckpointStorageAccess;
         } catch (IOException e) {
             LOG.warn(
                     "Initiating FsMergingCheckpointStorageAccess failed "
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index cd64384eaaf0d..0f319c8b42878 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -122,6 +122,9 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio
                     Duration.ofSeconds(2));
             randomize(conf, CheckpointingOptions.CLEANER_PARALLEL_MODE, true, false);
             randomize(conf, ExecutionOptions.SNAPSHOT_COMPRESSION, true, false);
+            if (!conf.contains(CheckpointingOptions.FILE_MERGING_ENABLED)) {
+                randomize(conf, CheckpointingOptions.FILE_MERGING_ENABLED, true);
+            }
         }
 
         randomize(
@@ -134,7 +137,9 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio
                 false);
 
         // randomize ITTests for enabling state change log
-        if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
+        // TODO: remove the file merging check after FLINK-32085
+        if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
+                && !conf.get(CheckpointingOptions.FILE_MERGING_ENABLED)) {
             if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
                 conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
             } else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
index b5bc898954a9e..170cb83ec2c86 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ChangelogRecoveryITCaseBase.java
@@ -93,6 +93,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;
 import static org.apache.flink.shaded.guava31.com.google.common.collect.Iterables.get;
 import static org.apache.flink.test.util.TestUtils.loadCheckpointMetadata;
@@ -154,7 +155,10 @@ protected StreamExecutionEnvironment getEnv(
             int restartAttempts,
             long materializationInterval,
             int materializationMaxFailure) {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration conf = new Configuration();
+        conf.set(
+                FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.enableCheckpointing(checkpointInterval).enableChangelogStateBackend(true);
         env.getCheckpointConfig().enableUnalignedCheckpoints(false);
         env.setStateBackend(stateBackend)
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java
index 8899047ad9bcf..38daeb661edb8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RestoreUpgradedJobITCase.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.execution.JobClient;
@@ -166,7 +167,10 @@ private long calculateExpectedResultBeforeSavepoint() {
 
     @NotNull
     private String runOriginalJob() throws Exception {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration conf = new Configuration();
+        // TODO: remove this after FLINK-32081
+        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.getCheckpointConfig()
                 .setExternalizedCheckpointCleanup(
                         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
@@ -215,6 +219,8 @@ private void runUpgradedJob(String snapshotPath) throws Exception {
         StreamExecutionEnvironment env;
         Configuration conf = new Configuration();
         conf.set(StateRecoveryOptions.SAVEPOINT_PATH, snapshotPath);
+        // TODO: remove this after FLINK-32081
+        conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
         env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.setParallelism(PARALLELISM);
         env.addSource(new StringSource(allDataEmittedLatch))
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index 8b74d203148cd..85eb4a5863d2b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -45,6 +45,7 @@
 import java.util.Optional;
 
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
+import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
 import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
 import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
@@ -135,6 +136,9 @@ private Optional runAndStoreIfAllowed() throws Exception {
     }
 
     private StreamExecutionEnvironment initEnvironment() {
-        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration conf = new Configuration();
+        conf.set(
+                FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
         env.enableChangelogStateBackend(testCase.startWithChangelog);
         if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
index 543512643ac64..a4770f525df97 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRecoveryCachingITCase.java
@@ -62,6 +62,7 @@
 import static org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
+import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
 import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
 import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
@@ -187,6 +188,8 @@ private Configuration configureJob(File cpDir) {
                 1); // prevent file is opened multiple times
         conf.set(BUFFER_DEBLOAT_ENABLED, false); // prevent randomization
         conf.set(RESTART_STRATEGY, "none"); // not expecting any failures
+        conf.set(
+                FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085
         return conf;
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index b1bcdb7d18ca9..7a5a267096937 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -70,6 +70,7 @@
 import static org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
 import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
+import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
 import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
 import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
 import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
@@ -232,6 +233,8 @@ private Configuration configureJob(int parallelism, File cpDir) {
         conf.set(ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO); // prevent randomization
         conf.set(BUFFER_DEBLOAT_ENABLED, false); // prevent randomization
         conf.set(RESTART_STRATEGY, "none"); // not expecting any failures
+        conf.set(
+                FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085
         return conf;
     }
 