Skip to content

Commit

Permalink
[FLINK-32092][tests] Integrate snapshot file-merging with existing IT…
Browse files Browse the repository at this point in the history
… cases (apache#24789)
  • Loading branch information
fredia authored May 16, 2024
1 parent 73a7e1c commit b87ead7
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -66,7 +67,8 @@ public Void restore() throws Exception {

for (OperatorStateHandle stateHandle : stateHandles) {

if (stateHandle == null) {
if (stateHandle == null
|| stateHandle instanceof EmptyFileMergingOperatorStreamStateHandle) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public Optional<byte[]> asBytesIfInMemory() {

@Override
public PhysicalStateHandleID getStreamStateHandleID() {
return new PhysicalStateHandleID(filePath.toUri().toString());
return new PhysicalStateHandleID(
String.format("%s-%d-%d", filePath.toUri(), startPos, stateSize));
}

public long getStartPos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,12 @@ private CheckpointStorageAccess tryApplyFileMergingCheckpoint(
return checkpointStorageAccess;
}
try {
CheckpointStorageWorkerView mergingCheckpointStorageAccess =
checkpointStorageAccess.toFileMergingStorage(
fileMergingSnapshotManager, environment);
return (CheckpointStorageAccess) mergingCheckpointStorageAccess;
CheckpointStorageAccess mergingCheckpointStorageAccess =
(CheckpointStorageAccess)
checkpointStorageAccess.toFileMergingStorage(
fileMergingSnapshotManager, environment);
mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint();
return mergingCheckpointStorageAccess;
} catch (IOException e) {
LOG.warn(
"Initiating FsMergingCheckpointStorageAccess failed "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio
Duration.ofSeconds(2));
randomize(conf, CheckpointingOptions.CLEANER_PARALLEL_MODE, true, false);
randomize(conf, ExecutionOptions.SNAPSHOT_COMPRESSION, true, false);
if (!conf.contains(CheckpointingOptions.FILE_MERGING_ENABLED)) {
randomize(conf, CheckpointingOptions.FILE_MERGING_ENABLED, true);
}
}

randomize(
Expand All @@ -134,7 +137,9 @@ private static void randomizeConfiguration(MiniCluster miniCluster, Configuratio
false);

// randomize ITTests for enabling state change log
if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)) {
// TODO: remove the file merging check after FLINK-32085
if (!conf.contains(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG)
&& !conf.get(CheckpointingOptions.FILE_MERGING_ENABLED)) {
if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_ON)) {
conf.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
} else if (STATE_CHANGE_LOG_CONFIG.equalsIgnoreCase(STATE_CHANGE_LOG_CONFIG_RAND)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
import static org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;
import static org.apache.flink.shaded.guava31.com.google.common.collect.Iterables.get;
import static org.apache.flink.test.util.TestUtils.loadCheckpointMetadata;
Expand Down Expand Up @@ -154,7 +155,10 @@ protected StreamExecutionEnvironment getEnv(
int restartAttempts,
long materializationInterval,
int materializationMaxFailure) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
conf.set(
FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.enableCheckpointing(checkpointInterval).enableChangelogStateBackend(true);
env.getCheckpointConfig().enableUnalignedCheckpoints(false);
env.setStateBackend(stateBackend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.JobClient;
Expand Down Expand Up @@ -166,7 +167,10 @@ private long calculateExpectedResultBeforeSavepoint() {

@NotNull
private String runOriginalJob() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
// TODO: remove this after FLINK-32081
conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.getCheckpointConfig()
.setExternalizedCheckpointCleanup(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Expand Down Expand Up @@ -215,6 +219,7 @@ private void runUpgradedJob(String snapshotPath) throws Exception {
StreamExecutionEnvironment env;
Configuration conf = new Configuration();
conf.set(StateRecoveryOptions.SAVEPOINT_PATH, snapshotPath);
conf.set(CheckpointingOptions.FILE_MERGING_ENABLED, false);
env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(PARALLELISM);
env.addSource(new StringSource(allDataEmittedLatch))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Optional;

import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRECTORY;
import static org.apache.flink.runtime.jobgraph.SavepointRestoreSettings.forPath;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
Expand Down Expand Up @@ -135,6 +136,9 @@ private Optional<String> runAndStoreIfAllowed() throws Exception {
}

private StreamExecutionEnvironment initEnvironment() {
Configuration conf = new Configuration();
conf.set(
FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableChangelogStateBackend(testCase.startWithChangelog);
if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
import static org.apache.flink.configuration.RestartStrategyOptions.RESTART_STRATEGY;
import static org.apache.flink.configuration.StateBackendOptions.STATE_BACKEND;
Expand Down Expand Up @@ -187,6 +188,8 @@ private Configuration configureJob(File cpDir) {
1); // prevent file is opened multiple times
conf.set(BUFFER_DEBLOAT_ENABLED, false); // prevent randomization
conf.set(RESTART_STRATEGY, "none"); // not expecting any failures
conf.set(
FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085

return conf;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import static org.apache.flink.changelog.fs.FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD;
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINTS_DIRECTORY;
import static org.apache.flink.configuration.CheckpointingOptions.CHECKPOINT_STORAGE;
import static org.apache.flink.configuration.CheckpointingOptions.FILE_MERGING_ENABLED;
import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
import static org.apache.flink.configuration.CoreOptions.DEFAULT_PARALLELISM;
import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
Expand Down Expand Up @@ -232,6 +233,8 @@ private Configuration configureJob(int parallelism, File cpDir) {
conf.set(ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO); // prevent randomization
conf.set(BUFFER_DEBLOAT_ENABLED, false); // prevent randomization
conf.set(RESTART_STRATEGY, "none"); // not expecting any failures
conf.set(
FILE_MERGING_ENABLED, false); // TODO: remove file merging setting after FLINK-32085

return conf;
}
Expand Down

0 comments on commit b87ead7

Please sign in to comment.