From 4fe66e0697471105e0f0a3f8519bb0c0ac559709 Mon Sep 17 00:00:00 2001 From: fredia Date: Thu, 11 Apr 2024 15:40:40 +0800 Subject: [PATCH] [FLINK-32084][checkpoint] Migrate current file merging of channel state snapshot into the unify file merging framework --- ...hannelStateWriteRequestDispatcherImpl.java | 20 ++++----- ...annelStateWriteRequestExecutorFactory.java | 24 ++++++++--- .../channel/ChannelStateWriterImpl.java | 13 +++--- .../filesystem/FsCheckpointStorageAccess.java | 1 - .../FsMergingCheckpointStorageAccess.java | 6 ++- ...elStateWriteRequestDispatcherImplTest.java | 12 ++---- ...hannelStateWriteRequestDispatcherTest.java | 3 +- ...lStateWriteRequestExecutorFactoryTest.java | 11 +++-- ...nnelStateWriteRequestExecutorImplTest.java | 3 +- .../channel/ChannelStateWriterImplTest.java | 12 ++++-- .../operators/testutils/DummyEnvironment.java | 14 ++++++ .../state/ChannelPersistenceITCase.java | 4 +- .../ExecutionCheckpointingOptions.java | 9 ++++ .../streaming/runtime/tasks/StreamTask.java | 43 ++++++++++++++++--- .../SubtaskCheckpointCoordinatorImpl.java | 43 +++---------------- ...ckSubtaskCheckpointCoordinatorBuilder.java | 27 +++++++----- .../runtime/tasks/StreamMockEnvironment.java | 11 +++++ .../StreamTaskMailboxTestHarnessBuilder.java | 4 ++ .../SubtaskCheckpointCoordinatorTest.java | 10 ++++- 19 files changed, 166 insertions(+), 104 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java index 5151d9be70124..2fb0c0426c110 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImpl.java @@ -17,11 +17,10 @@ package org.apache.flink.runtime.checkpoint.channel; -import 
org.apache.flink.api.common.JobID; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,16 +43,14 @@ final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteR private static final Logger LOG = LoggerFactory.getLogger(ChannelStateWriteRequestDispatcherImpl.class); - private final CheckpointStorage checkpointStorage; - - private final JobID jobID; + private final SupplierWithException + checkpointStorageWorkerViewSupplier; + private CheckpointStorageWorkerView streamFactoryResolver; private final ChannelStateSerializer serializer; private final Set registeredSubtasks; - private CheckpointStorageWorkerView streamFactoryResolver; - /** * It is the checkpointId corresponding to writer. And It should be always update with {@link * #writer}. 
@@ -78,9 +75,10 @@ final class ChannelStateWriteRequestDispatcherImpl implements ChannelStateWriteR private ChannelStateCheckpointWriter writer; ChannelStateWriteRequestDispatcherImpl( - CheckpointStorage checkpointStorage, JobID jobID, ChannelStateSerializer serializer) { - this.checkpointStorage = checkNotNull(checkpointStorage); - this.jobID = jobID; + SupplierWithException + checkpointStorageWorkerViewSupplier, + ChannelStateSerializer serializer) { + this.checkpointStorageWorkerViewSupplier = checkpointStorageWorkerViewSupplier; this.serializer = checkNotNull(serializer); this.registeredSubtasks = new HashSet<>(); this.ongoingCheckpointId = -1; @@ -247,7 +245,7 @@ public void fail(Throwable cause) { CheckpointStorageWorkerView getStreamFactoryResolver() throws IOException { if (streamFactoryResolver == null) { - streamFactoryResolver = checkpointStorage.createCheckpointStorage(jobID); + streamFactoryResolver = checkpointStorageWorkerViewSupplier.get(); } return streamFactoryResolver; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java index a05e47a191665..7f7e99fbef33b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactory.java @@ -19,10 +19,13 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; +import org.apache.flink.util.function.SupplierWithException; import javax.annotation.concurrent.GuardedBy; +import java.io.IOException; + import static org.apache.flink.util.Preconditions.checkState; /** The 
factory of {@link ChannelStateWriteRequestExecutor}. */ @@ -42,10 +45,15 @@ public ChannelStateWriteRequestExecutorFactory(JobID jobID) { public ChannelStateWriteRequestExecutor getOrCreateExecutor( JobVertexID jobVertexID, int subtaskIndex, - CheckpointStorage checkpointStorage, + SupplierWithException + checkpointStorageWorkerViewSupplier, int maxSubtasksPerChannelStateFile) { return getOrCreateExecutor( - jobVertexID, subtaskIndex, checkpointStorage, maxSubtasksPerChannelStateFile, true); + jobVertexID, + subtaskIndex, + checkpointStorageWorkerViewSupplier, + maxSubtasksPerChannelStateFile, + true); } /** @@ -55,15 +63,19 @@ public ChannelStateWriteRequestExecutor getOrCreateExecutor( ChannelStateWriteRequestExecutor getOrCreateExecutor( JobVertexID jobVertexID, int subtaskIndex, - CheckpointStorage checkpointStorage, + SupplierWithException + checkpointStorageWorkerViewSupplier, int maxSubtasksPerChannelStateFile, boolean startExecutor) { synchronized (lock) { if (executor == null) { + ChannelStateWriteRequestDispatcher dispatcher = + new ChannelStateWriteRequestDispatcherImpl( + checkpointStorageWorkerViewSupplier, + new ChannelStateSerializerImpl()); executor = new ChannelStateWriteRequestExecutorImpl( - new ChannelStateWriteRequestDispatcherImpl( - checkpointStorage, jobID, new ChannelStateSerializerImpl()), + dispatcher, maxSubtasksPerChannelStateFile, executor -> { assert Thread.holdsLock(lock); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 9b80813cf816b..40d7ddffd1e18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -23,9 +23,10 @@ import org.apache.flink.runtime.io.network.buffer.Buffer; import 
org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.CheckpointStateOutputStream; -import org.apache.flink.runtime.state.CheckpointStorage; +import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,14 +88,15 @@ public ChannelStateWriterImpl( JobVertexID jobVertexID, String taskName, int subtaskIndex, - CheckpointStorage checkpointStorage, + SupplierWithException + checkpointStorageWorkerViewSupplier, ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory, int maxSubtasksPerChannelStateFile) { this( jobVertexID, taskName, subtaskIndex, - checkpointStorage, + checkpointStorageWorkerViewSupplier, DEFAULT_MAX_CHECKPOINTS, channelStateExecutorFactory, maxSubtasksPerChannelStateFile); @@ -113,7 +115,8 @@ public ChannelStateWriterImpl( JobVertexID jobVertexID, String taskName, int subtaskIndex, - CheckpointStorage checkpointStorage, + SupplierWithException + checkpointStorageWorkerViewSupplier, int maxCheckpoints, ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory, int maxSubtasksPerChannelStateFile) { @@ -125,7 +128,7 @@ public ChannelStateWriterImpl( channelStateExecutorFactory.getOrCreateExecutor( jobVertexID, subtaskIndex, - checkpointStorage, + checkpointStorageWorkerViewSupplier, maxSubtasksPerChannelStateFile), maxCheckpoints); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java index 1c18398475fa7..c32ff6c3892c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageAccess.java @@ -232,7 +232,6 @@ public FsMergingCheckpointStorageAccess toFileMergingStorage( FileMergingSnapshotManager mergingSnapshotManager, Environment environment) throws IOException { return new FsMergingCheckpointStorageAccess( - fileSystem, checkpointsDirectory, getDefaultSavepointDirectory(), environment.getJobID(), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java index a2f7560d653dc..f41a375178587 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java @@ -41,7 +41,6 @@ public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess private final FileMergingSnapshotManager.SubtaskKey subtaskKey; public FsMergingCheckpointStorageAccess( - FileSystem fs, Path checkpointBaseDirectory, @Nullable Path defaultSavepointDirectory, JobID jobId, @@ -51,7 +50,10 @@ public FsMergingCheckpointStorageAccess( Environment environment) throws IOException { super( - fs, + // Multiple subtask/threads would share one output stream, + // SafetyNetWrapperFileSystem cannot be used to prevent different threads from + // interfering with each other when exiting. 
+ FileSystem.getUnguardedFileSystem(checkpointBaseDirectory.toUri()), checkpointBaseDirectory, defaultSavepointDirectory, false, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java index 5ece5423a39ae..08c88eff0cd29 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java @@ -76,8 +76,7 @@ private void testBuffersRecycled( Function requestBuilder) throws Exception { ChannelStateWriteRequestDispatcher dispatcher = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), - JOB_ID, + () -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), new ChannelStateSerializerImpl()); ChannelStateWriteResult result = new ChannelStateWriteResult(); dispatcher.dispatch(ChannelStateWriteRequest.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX)); @@ -113,8 +112,7 @@ private void testStartNewCheckpointAndCheckOldCheckpointResult(boolean isDiffere throws Exception { ChannelStateWriteRequestDispatcher processor = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), - JOB_ID, + () -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), new ChannelStateSerializerImpl()); ChannelStateWriteResult result = new ChannelStateWriteResult(); processor.dispatch(ChannelStateWriteRequest.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX)); @@ -157,8 +155,7 @@ private void testStartOldCheckpointAfterNewCheckpointAborted(boolean isDifferent throws Exception { ChannelStateWriteRequestDispatcher processor = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), - JOB_ID, + () -> new 
JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), new ChannelStateSerializerImpl()); processor.dispatch(ChannelStateWriteRequest.registerSubtask(JOB_VERTEX_ID, SUBTASK_INDEX)); JobVertexID newJobVertex = JOB_VERTEX_ID; @@ -194,8 +191,7 @@ void testAbortCheckpointAndCheckAllException() throws Exception { private void testAbortCheckpointAndCheckAllException(int numberOfSubtask) throws Exception { ChannelStateWriteRequestDispatcher processor = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), - JOB_ID, + () -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), new ChannelStateSerializerImpl()); List results = new ArrayList<>(numberOfSubtask); for (int i = 0; i < numberOfSubtask; i++) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java index 36de469bbfa65..63684f663b315 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherTest.java @@ -169,8 +169,7 @@ private static CheckpointStartRequest start() { void doRun() { ChannelStateWriteRequestDispatcher processor = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), - JOB_ID, + () -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), new ChannelStateSerializerImpl()); try { processor.dispatch(register()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactoryTest.java index d25b41deaf24c..8e6a3ee12280d 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorFactoryTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; +import java.io.IOException; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -37,7 +38,7 @@ public class ChannelStateWriteRequestExecutorFactoryTest { private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage(); @Test - void testReuseExecutorForSameJobId() { + void testReuseExecutorForSameJobId() throws IOException { assertReuseExecutor(1); assertReuseExecutor(2); assertReuseExecutor(3); @@ -45,7 +46,7 @@ void testReuseExecutorForSameJobId() { assertReuseExecutor(10); } - private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) { + private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) throws IOException { JobID JOB_ID = new JobID(); Random RANDOM = new Random(); ChannelStateWriteRequestExecutorFactory executorFactory = @@ -58,7 +59,7 @@ private void assertReuseExecutor(int maxSubtasksPerChannelStateFile) { executorFactory.getOrCreateExecutor( new JobVertexID(), RANDOM.nextInt(numberOfTasks), - CHECKPOINT_STORAGE, + () -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID), maxSubtasksPerChannelStateFile); if (i % maxSubtasksPerChannelStateFile == 0) { assertThat(newExecutor) @@ -94,7 +95,9 @@ void testSomeSubtasksCloseDuringOtherSubtasksStarting() throws Exception { executorFactory.getOrCreateExecutor( jobVertexID, i, - CHECKPOINT_STORAGE, + () -> + CHECKPOINT_STORAGE + .createCheckpointStorage(jobID), maxSubtasksPerChannelStateFile, false); assertThat(executor).isNotNull(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java index 8f9f073154c70..2b91f3d875900 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestExecutorImplTest.java @@ -175,8 +175,7 @@ void testCanBeClosed() throws Exception { long checkpointId = 1L; ChannelStateWriteRequestDispatcher processor = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), - JOB_ID, + () -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), new ChannelStateSerializerImpl()); Object registerLock = new Object(); ChannelStateWriteRequestExecutorImpl worker = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java index 0b2cbe3cc6c51..fb931946bbb24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.util.function.BiConsumerWithException; @@ -48,6 +49,8 @@ class ChannelStateWriterImplTest { private static final JobVertexID JOB_VERTEX_ID = new JobVertexID(); private static final int SUBTASK_INDEX = 0; + private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage(); + @Test void 
testAddEventBuffer() throws Exception { @@ -241,7 +244,7 @@ void testLimit() throws IOException { JOB_VERTEX_ID, TASK_NAME, SUBTASK_INDEX, - new JobManagerCheckpointStorage(), + () -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID), maxCheckpoints, new ChannelStateWriteRequestExecutorFactory(JOB_ID), 5)) { @@ -338,12 +341,12 @@ private void executeCallbackWithSyncWorker( } } - private ChannelStateWriterImpl openWriter() { + private ChannelStateWriterImpl openWriter() throws IOException { return new ChannelStateWriterImpl( JOB_VERTEX_ID, TASK_NAME, SUBTASK_INDEX, - new JobManagerCheckpointStorage(), + () -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID), new ChannelStateWriteRequestExecutorFactory(JOB_ID), 5); } @@ -381,7 +384,8 @@ class SyncChannelStateWriteRequestExecutor implements ChannelStateWriteRequestEx deque = new ArrayDeque<>(); requestProcessor = new ChannelStateWriteRequestDispatcherImpl( - new JobManagerCheckpointStorage(), jobID, new ChannelStateSerializerImpl()); + () -> new JobManagerCheckpointStorage().createCheckpointStorage(jobID), + new ChannelStateSerializerImpl()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java index da757208b5fc9..edb85cf19c590 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java @@ -48,6 +48,7 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.TestTaskStateManager; import 
org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; @@ -64,6 +65,7 @@ import java.util.concurrent.Future; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; +import static org.apache.flink.util.Preconditions.checkNotNull; /** The {@link DummyEnvironment} is used for test purpose. */ public class DummyEnvironment implements Environment { @@ -82,6 +84,8 @@ public class DummyEnvironment implements Environment { private final ChannelStateWriteRequestExecutorFactory channelStateExecutorFactory = new ChannelStateWriteRequestExecutorFactory(jobInfo.getJobId()); + private CheckpointStorageAccess checkpointStorageAccess; + public DummyEnvironment() { this("Test Job", 1, 0, 1); } @@ -291,4 +295,14 @@ public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() public JobInfo getJobInfo() { return jobInfo; } + + @Override + public void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess) { + this.checkpointStorageAccess = checkpointStorageAccess; + } + + @Override + public CheckpointStorageAccess getCheckpointStorageAccess() { + return checkNotNull(checkpointStorageAccess); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java index e85c17bf42900..cd1cc1ee66a02 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java @@ -81,6 +81,8 @@ class ChannelPersistenceITCase { private static final JobVertexID JOB_VERTEX_ID = new JobVertexID(); private static final int SUBTASK_INDEX = 0; + private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage(); + @Test void testUpstreamBlocksAfterRecoveringState() throws Exception { 
upstreamBlocksAfterRecoveringState(ResultPartitionType.PIPELINED); @@ -264,7 +266,7 @@ private ChannelStateWriteResult write( JOB_VERTEX_ID, "test", SUBTASK_INDEX, - new JobManagerCheckpointStorage(maxStateSize), + () -> CHECKPOINT_STORAGE.createCheckpointStorage(JOB_ID), new ChannelStateWriteRequestExecutorFactory(JOB_ID), 5)) { writer.start( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 65d31ee595946..74d07f471eaaf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -327,6 +327,15 @@ public class ExecutionCheckpointingOptions { .defaultValue(false) .withDescription("Flag to enable approximate local recovery."); + // TODO: deprecate this option. + // Currently, the two file merging mechanisms can work simultaneously: + // 1. If UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE=1 and + // state.checkpoints.file-merging.enabled: true, only the unified file merging mechanism takes + // effect. + // 2. If UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE>1 and + // state.checkpoints.file-merging.enabled: false, only the current mechanism takes effect. + // 3. If UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE>1 and + // state.checkpoints.file-merging.enabled: true, both mechanisms take effect. 
public static final ConfigOption UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE = key("execution.checkpointing.unaligned.max-subtasks-per-channel-state-file") .intType() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 988c19a06e4da..e0ffe9b926706 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -72,6 +72,7 @@ import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StateBackendLoader; +import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageAccess; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; import org.apache.flink.runtime.taskmanager.AsynchronousException; @@ -148,6 +149,7 @@ import static org.apache.flink.runtime.metrics.MetricNames.INITIALIZE_STATE_DURATION; import static org.apache.flink.runtime.metrics.MetricNames.MAILBOX_START_DURATION; import static org.apache.flink.runtime.metrics.MetricNames.READ_OUTPUT_DATA_DURATION; +import static org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.openChannelStateWriter; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.concurrent.FutureUtils.assertNoException; @@ -483,27 +485,54 @@ protected StreamTask( } this.systemTimerService = createTimerService("System Time Trigger for " + getName()); - + final CheckpointStorageAccess finalCheckpointStorageAccess = checkpointStorageAccess; + + ChannelStateWriter channelStateWriter = + configuration.isUnalignedCheckpointsEnabled() + ? 
openChannelStateWriter( + getName(), + // Note: don't pass checkpointStorageAccess directly to channel + // state writer. + // The fileSystem of checkpointStorageAccess may be an instance + // of SafetyNetWrapperFileSystem, which closes all held streams + // when the thread exits. Channel state writers are invoked in + // other threads instead of the task thread, therefore channel + // state writers cannot share file streams directly, otherwise + // conflicts will occur on job exit. + () -> { + if (finalCheckpointStorageAccess + instanceof FsMergingCheckpointStorageAccess) { + // FsMergingCheckpointStorageAccess uses an + // unguarded fileSystem, which can be shared. + return finalCheckpointStorageAccess; + } else { + // Other checkpoint storage access should be lazily + // initialized to avoid sharing. + return checkpointStorage.createCheckpointStorage( + getEnvironment().getJobID()); + } + }, + environment, + configuration.getMaxSubtasksPerChannelStateFile()) + : ChannelStateWriter.NO_OP; this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl( - checkpointStorage, checkpointStorageAccess, getName(), actionExecutor, getAsyncOperationsThreadPool(), environment, this, - configuration.isUnalignedCheckpointsEnabled(), + this::prepareInputSnapshot, + configuration.getMaxConcurrentCheckpoints(), + channelStateWriter, configuration .getConfiguration() .get( ExecutionCheckpointingOptions .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH), - this::prepareInputSnapshot, - configuration.getMaxConcurrentCheckpoints(), BarrierAlignmentUtil.createRegisterTimerCallback( - mainMailboxExecutor, systemTimerService), - configuration.getMaxSubtasksPerChannelStateFile()); + mainMailboxExecutor, systemTimerService)); resourceCloser.registerCloseable(subtaskCheckpointCoordinator::close); // Register to stop all timers and threads. Should be closed first. 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index 79af3c4875a26..b97328dc902d7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStateToolset; -import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStorageWorkerView; import org.apache.flink.runtime.state.CheckpointStreamFactory; @@ -50,6 +49,7 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; import org.apache.flink.util.function.BiFunctionWithException; +import org.apache.flink.util.function.SupplierWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,40 +131,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { @Nullable private final FileMergingSnapshotManager fileMergingSnapshotManager; - SubtaskCheckpointCoordinatorImpl( - CheckpointStorage checkpointStorage, - CheckpointStorageWorkerView checkpointStorageView, - String taskName, - StreamTaskActionExecutor actionExecutor, - ExecutorService asyncOperationsThreadPool, - Environment env, - AsyncExceptionHandler asyncExceptionHandler, - boolean unalignedCheckpointEnabled, - boolean enableCheckpointAfterTasksFinished, - BiFunctionWithException< - ChannelStateWriter, Long, CompletableFuture, CheckpointException> - prepareInputSnapshot, - int maxRecordAbortedCheckpoints, - 
DelayableTimer registerTimer, - int maxSubtasksPerChannelStateFile) { - this( - checkpointStorageView, - taskName, - actionExecutor, - asyncOperationsThreadPool, - env, - asyncExceptionHandler, - prepareInputSnapshot, - maxRecordAbortedCheckpoints, - unalignedCheckpointEnabled - ? openChannelStateWriter( - taskName, checkpointStorage, env, maxSubtasksPerChannelStateFile) - : ChannelStateWriter.NO_OP, - enableCheckpointAfterTasksFinished, - registerTimer, - env.getTaskStateManager().getFileMergingSnapshotManager()); - } - @VisibleForTesting SubtaskCheckpointCoordinatorImpl( CheckpointStorageWorkerView checkpointStorage, @@ -232,16 +198,17 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { this.fileMergingSnapshotManager = fileMergingSnapshotManager; } - private static ChannelStateWriter openChannelStateWriter( + public static ChannelStateWriter openChannelStateWriter( String taskName, - CheckpointStorage checkpointStorage, + SupplierWithException + checkpointStorageWorkerView, Environment env, int maxSubtasksPerChannelStateFile) { return new ChannelStateWriterImpl( env.getJobVertexId(), taskName, env.getTaskInfo().getIndexOfThisSubtask(), - checkpointStorage, + checkpointStorageWorkerView, env.getChannelStateExecutorFactory(), maxSubtasksPerChannelStateFile); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java index 38b3e7dce02b2..f315c7efc09d3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MockSubtaskCheckpointCoordinatorBuilder.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import static 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor.IMMEDIATE; +import static org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.openChannelStateWriter; /** A mock builder to build {@link SubtaskCheckpointCoordinator}. */ public class MockSubtaskCheckpointCoordinatorBuilder { @@ -87,37 +88,41 @@ public MockSubtaskCheckpointCoordinatorBuilder setEnableCheckpointAfterTasksFini return this; } - public MockSubtaskCheckpointCoordinatorBuilder setMaxSubtasksPerChannelStateFile( - int maxSubtasksPerChannelStateFile) { - this.maxSubtasksPerChannelStateFile = maxSubtasksPerChannelStateFile; - return this; - } - SubtaskCheckpointCoordinator build() throws IOException { if (environment == null) { this.environment = MockEnvironment.builder().build(); } if (checkpointStorage == null) { this.checkpointStorage = new JobManagerCheckpointStorage(); + this.environment.setCheckpointStorageAccess( + checkpointStorage.createCheckpointStorage(environment.getJobID())); } if (asyncExceptionHandler == null) { this.asyncExceptionHandler = new NonHandleAsyncException(); } + ChannelStateWriter channelStateWriter = + unalignedCheckpointEnabled + ? 
openChannelStateWriter( + taskName, + () -> + checkpointStorage.createCheckpointStorage( + environment.getJobID()), + environment, + maxSubtasksPerChannelStateFile) + : ChannelStateWriter.NO_OP; return new SubtaskCheckpointCoordinatorImpl( - checkpointStorage, checkpointStorage.createCheckpointStorage(environment.getJobID()), taskName, actionExecutor, executorService, environment, asyncExceptionHandler, - unalignedCheckpointEnabled, - enableCheckpointAfterTasksFinished, prepareInputSnapshot, maxRecordAbortedCheckpoints, - (callable, duration) -> () -> {}, - maxSubtasksPerChannelStateFile); + channelStateWriter, + enableCheckpointAfterTasksFinished, + (callable, duration) -> () -> {}); } private static class NonHandleAsyncException implements AsyncExceptionHandler { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java index bd31e19631be5..7e9f04be81c84 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; import org.apache.flink.runtime.query.KvStateRegistry; import org.apache.flink.runtime.query.TaskKvStateRegistry; +import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager; import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager; @@ -139,6 +140,8 @@ public class StreamMockEnvironment implements Environment { private CheckpointResponder checkpointResponder = NoOpCheckpointResponder.INSTANCE; + private CheckpointStorageAccess checkpointStorageAccess; + public StreamMockEnvironment( Configuration 
jobConfig, Configuration taskConfig, @@ -438,4 +441,12 @@ public ChannelStateWriteRequestExecutorFactory getChannelStateExecutorFactory() public JobInfo getJobInfo() { return jobInfo; } + + public void setCheckpointStorageAccess(CheckpointStorageAccess checkpointStorageAccess) { + this.checkpointStorageAccess = checkpointStorageAccess; + } + + public CheckpointStorageAccess getCheckpointStorageAccess() { + return checkpointStorageAccess; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java index 80f2a9eca8170..e16b2f73f4990 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.state.TestTaskStateManagerBuilder; +import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; @@ -252,6 +253,9 @@ public StreamTaskMailboxTestHarness buildUnrestored() throws Exception { Queue outputList = new ArrayDeque<>(); streamMockEnvironment.addOutput(outputList, outputStreamRecordSerializer); streamMockEnvironment.setTaskMetricGroup(taskMetricGroup); + streamMockEnvironment.setCheckpointStorageAccess( + new JobManagerCheckpointStorage() + .createCheckpointStorage(streamMockEnvironment.getJobID())); for (ResultPartitionWriter writer : additionalOutputs) { streamMockEnvironment.addOutput(writer); diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java index a752c515222e3..8d60bb27cc3fa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.DoneFuture; @@ -92,6 +93,7 @@ /** Tests for {@link SubtaskCheckpointCoordinator}. 
*/ class SubtaskCheckpointCoordinatorTest { + private static final CheckpointStorage CHECKPOINT_STORAGE = new JobManagerCheckpointStorage(); @Test void testInitCheckpoint() throws IOException, CheckpointException { @@ -194,6 +196,8 @@ public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) void testForceAlignedCheckpointResultingInPriorityEvents() throws Exception { final long checkpointId = 42L; MockEnvironment mockEnvironment = MockEnvironment.builder().build(); + mockEnvironment.setCheckpointStorageAccess( + CHECKPOINT_STORAGE.createCheckpointStorage(mockEnvironment.getJobID())); try (SubtaskCheckpointCoordinator coordinator = new MockSubtaskCheckpointCoordinatorBuilder() @@ -586,12 +590,13 @@ public void snapshotState( void testChannelStateWriteResultLeakAndNotFailAfterCheckpointAborted() throws Exception { String taskName = "test"; DummyEnvironment env = new DummyEnvironment(); + env.setCheckpointStorageAccess(CHECKPOINT_STORAGE.createCheckpointStorage(env.getJobID())); ChannelStateWriterImpl writer = new ChannelStateWriterImpl( env.getJobVertexId(), taskName, 0, - new JobManagerCheckpointStorage(), + () -> env.getCheckpointStorageAccess(), env.getChannelStateExecutorFactory(), 5); try (MockEnvironment mockEnvironment = MockEnvironment.builder().build(); @@ -646,12 +651,13 @@ void testAbortOldAndStartNewCheckpoint() throws Exception { CheckpointOptions.unaligned( CHECKPOINT, CheckpointStorageLocationReference.getDefault()); DummyEnvironment env = new DummyEnvironment(); + env.setCheckpointStorageAccess(CHECKPOINT_STORAGE.createCheckpointStorage(env.getJobID())); ChannelStateWriterImpl writer = new ChannelStateWriterImpl( env.getJobVertexId(), taskName, 0, - new JobManagerCheckpointStorage(), + () -> env.getCheckpointStorageAccess(), env.getChannelStateExecutorFactory(), 5); try (MockEnvironment mockEnvironment = MockEnvironment.builder().build();