diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index ff252f05e7880..cc54854a7a33d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.TaskStateManager;
@@ -34,8 +35,6 @@
* checkpoint files with merging checkpoint files enabled. It manages the files for ONE single task
* in TM, including all subtasks of this single task that is running in this TM. There is one
* FileMergingSnapshotManager for each job per task manager.
- *
- *
TODO (FLINK-32075): leverage checkpoint notification to delete logical files.
*/
public interface FileMergingSnapshotManager extends Closeable {
@@ -118,6 +117,34 @@ FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(
*/
Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
+ /**
+ * Notifies the manager that the checkpoint with the given {@code checkpointId} completed and
+ * was committed.
+ *
+ * @param subtaskKey the subtask key identifying the subtask.
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ * @throws Exception thrown if anything goes wrong with the listener.
+ */
+ void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId) throws Exception;
+
+ /**
+ * This method is called as a notification once a distributed checkpoint has been aborted.
+ *
+ * @param subtaskKey the subtask key identifying the subtask.
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ * @throws Exception thrown if anything goes wrong with the listener.
+ */
+ void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception;
+
+ /**
+ * This method is called as a notification once a distributed checkpoint has been subsumed.
+ *
+ * @param subtaskKey the subtask key identifying the subtask.
+ * @param checkpointId The ID of the checkpoint that has been completed.
+ * @throws Exception thrown if anything goes wrong with the listener.
+ */
+ void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception;
+
/**
* A key identifies a subtask. A subtask can be identified by the operator id, subtask index and
* the parallelism. Note that this key should be consistent across job attempts.
@@ -151,6 +178,12 @@ public SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) {
this.hashCode = hash;
}
+ public static SubtaskKey of(Environment environment) {
+ return new SubtaskKey(
+ OperatorID.fromJobVertexID(environment.getJobVertexId()),
+ environment.getTaskInfo());
+ }
+
/**
* Generate an unique managed directory name for one subtask.
*
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 19414292aecd4..9ad4ff4d6644e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -17,6 +17,7 @@
package org.apache.flink.runtime.checkpoint.filemerging;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.EntropyInjector;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -34,10 +35,15 @@
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@@ -55,6 +61,12 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
/** The executor for I/O operations in this manager. */
protected final Executor ioExecutor;
+ /** Guard for uploadedStates. */
+ protected final Object lock = new Object();
+
+ @GuardedBy("lock")
+ protected TreeMap> uploadedStates = new TreeMap<>();
+
/** The {@link FileSystem} that this manager works on. */
protected FileSystem fs;
@@ -245,6 +257,13 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
physicalFile, startPos, stateSize, subtaskKey);
logicalFile.advanceLastCheckpointId(checkpointId);
+ // track the logical file
+ synchronized (lock) {
+ uploadedStates
+ .computeIfAbsent(checkpointId, key -> new HashSet<>())
+ .add(logicalFile);
+ }
+
// deal with physicalFile file
physicalFile.incSize(stateSize);
returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);
@@ -289,6 +308,13 @@ protected Path generatePhysicalFilePath(Path dirPath) {
return new Path(dirPath, fileName);
}
+ @VisibleForTesting
+ boolean isResponsibleForFile(Path filePath) {
+ Path parent = filePath.getParent();
+ return parent.equals(managedExclusiveStateDir)
+ || managedSharedStateDir.containsValue(parent);
+ }
+
/**
* Delete a physical file by given file path. Use the io executor to do the deletion.
*
@@ -345,6 +371,72 @@ protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
protected abstract void returnPhysicalFileForNextReuse(
SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException;
+ /**
+ * The callback which will be triggered when all subtasks discarded (aborted or subsumed).
+ *
+ * @param checkpointId the discarded checkpoint id.
+ * @throws IOException if anything goes wrong with file system.
+ */
+ protected abstract void discardCheckpoint(long checkpointId) throws IOException;
+
+ // ------------------------------------------------------------------------
+ // Checkpoint Listener
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
+ throws Exception {
+ // does nothing
+ }
+
+ @Override
+ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
+ synchronized (lock) {
+ Set logicalFilesForCurrentCp = uploadedStates.get(checkpointId);
+ if (logicalFilesForCurrentCp == null) {
+ return;
+ }
+ if (discardLogicalFiles(subtaskKey, checkpointId, logicalFilesForCurrentCp)) {
+ uploadedStates.remove(checkpointId);
+ }
+ }
+ }
+
+ @Override
+ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
+ throws Exception {
+ synchronized (lock) {
+ Iterator>> uploadedStatesIterator =
+ uploadedStates.headMap(checkpointId, true).entrySet().iterator();
+ while (uploadedStatesIterator.hasNext()) {
+ Map.Entry> entry = uploadedStatesIterator.next();
+ if (discardLogicalFiles(subtaskKey, entry.getKey(), entry.getValue())) {
+ uploadedStatesIterator.remove();
+ }
+ }
+ }
+ }
+
+ private boolean discardLogicalFiles(
+ SubtaskKey subtaskKey, long checkpointId, Set logicalFiles)
+ throws Exception {
+ Iterator logicalFileIterator = logicalFiles.iterator();
+ while (logicalFileIterator.hasNext()) {
+ LogicalFile logicalFile = logicalFileIterator.next();
+ if (logicalFile.getSubtaskKey().equals(subtaskKey)
+ && logicalFile.getLastUsedCheckpointID() <= checkpointId) {
+ logicalFile.discardWithCheckpointId(checkpointId);
+ logicalFileIterator.remove();
+ }
+ }
+
+ if (logicalFiles.isEmpty()) {
+ discardCheckpoint(checkpointId);
+ return true;
+ }
+ return false;
+ }
+
// ------------------------------------------------------------------------
// file system
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
index 3a02427547fe4..c8b3573a13fcb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
@@ -50,6 +50,47 @@ public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor
writablePhysicalFilePool = new HashMap<>();
}
+ // ------------------------------------------------------------------------
+ // CheckpointListener
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyCheckpointComplete(SubtaskKey subtaskKey, long checkpointId)
+ throws Exception {
+ super.notifyCheckpointComplete(subtaskKey, checkpointId);
+ removeAndCloseFiles(subtaskKey, checkpointId);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) throws Exception {
+ super.notifyCheckpointAborted(subtaskKey, checkpointId);
+ removeAndCloseFiles(subtaskKey, checkpointId);
+ }
+
+ /**
+ * Remove files that belongs to specific subtask and checkpoint from the reuse pool. And close
+ * these files. TODO: Refactor this in FLINK-32076.
+ */
+ private void removeAndCloseFiles(SubtaskKey subtaskKey, long checkpointId) throws Exception {
+ Tuple3 fileKey =
+ Tuple3.of(checkpointId, subtaskKey, CheckpointedStateScope.SHARED);
+ PhysicalFile file;
+ synchronized (writablePhysicalFilePool) {
+ file = writablePhysicalFilePool.remove(fileKey);
+ }
+ if (file != null) {
+ file.close();
+ }
+
+ fileKey = Tuple3.of(checkpointId, DUMMY_SUBTASK_KEY, CheckpointedStateScope.EXCLUSIVE);
+ synchronized (writablePhysicalFilePool) {
+ file = writablePhysicalFilePool.remove(fileKey);
+ }
+ if (file != null) {
+ file.close();
+ }
+ }
+
@Override
@Nonnull
protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
@@ -98,4 +139,9 @@ protected void returnPhysicalFileForNextReuse(
physicalFile.close();
}
}
+
+ @Override
+ protected void discardCheckpoint(long checkpointId) throws IOException {
+ // TODO: Discard the whole file pool for checkpoint id (When there is one after FLINK-32076)
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
index b3c91791228fc..3fdb05cebdeb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
@@ -116,5 +116,6 @@ void reportIncompleteTaskStateSnapshots(
StateChangelogStorageView> getStateChangelogStorageView(
Configuration configuration, ChangelogStateHandle changelogStateHandle);
+ @Nullable
FileMergingSnapshotManager getFileMergingSnapshotManager();
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
index ccfe5b3bbd308..a2f7560d653dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java
@@ -24,7 +24,6 @@
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -60,10 +59,7 @@ public FsMergingCheckpointStorageAccess(
fileSizeThreshold,
writeBufferSize);
this.fileMergingSnapshotManager = fileMergingSnapshotManager;
- this.subtaskKey =
- new SubtaskKey(
- OperatorID.fromJobVertexID(environment.getJobVertexId()),
- environment.getTaskInfo());
+ this.subtaskKey = SubtaskKey.of(environment);
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
index a6e1fbbbc87d8..d947a884dbe2a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
@@ -358,6 +358,38 @@ public void testConcurrentWriting() throws Exception {
}
}
+ @Test
+ public void testCheckpointNotification() throws Exception {
+ try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
+ CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+ FileMergingCheckpointStateOutputStream cp1Stream =
+ writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
+ fmsm.notifyCheckpointComplete(subtaskKey1, 1);
+ assertFileInManagedDir(fmsm, cp1StateHandle);
+
+ // complete checkpoint-2
+ FileMergingCheckpointStateOutputStream cp2Stream =
+ writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
+ fmsm.notifyCheckpointComplete(subtaskKey1, 2);
+ assertFileInManagedDir(fmsm, cp2StateHandle);
+
+ // subsume checkpoint-1
+ assertThat(fileExists(cp1StateHandle)).isTrue();
+ fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
+ assertThat(fileExists(cp1StateHandle)).isFalse();
+
+ // abort checkpoint-3
+ FileMergingCheckpointStateOutputStream cp3Stream =
+ writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
+ SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
+ assertFileInManagedDir(fmsm, cp3StateHandle);
+ fmsm.notifyCheckpointAborted(subtaskKey1, 3);
+ assertThat(fileExists(cp3StateHandle)).isFalse();
+ }
+ }
+
private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir)
throws IOException {
FileSystem fs = LocalFileSystem.getSharedInstance();
@@ -384,4 +416,42 @@ private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpo
assertThat(fmsm).isNotNull();
return fmsm;
}
+
+ private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+ long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry)
+ throws IOException {
+ return writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry, 32);
+ }
+
+ private FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
+ long checkpointId,
+ FileMergingSnapshotManager fmsm,
+ CloseableRegistry closeableRegistry,
+ int numBytes)
+ throws IOException {
+ FileMergingCheckpointStateOutputStream stream =
+ fmsm.createCheckpointStateOutputStream(
+ subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
+ closeableRegistry.registerCloseable(stream);
+ for (int i = 0; i < numBytes; i++) {
+ stream.write(i);
+ }
+ return stream;
+ }
+
+ private void assertFileInManagedDir(
+ FileMergingSnapshotManager fmsm, SegmentFileStateHandle stateHandle) {
+ assertThat(fmsm instanceof FileMergingSnapshotManagerBase).isTrue();
+ assertThat(stateHandle).isNotNull();
+ Path filePath = stateHandle.getFilePath();
+ assertThat(filePath).isNotNull();
+ assertThat(((FileMergingSnapshotManagerBase) fmsm).isResponsibleForFile(filePath)).isTrue();
+ }
+
+ private boolean fileExists(SegmentFileStateHandle stateHandle) throws IOException {
+ assertThat(stateHandle).isNotNull();
+ Path filePath = stateHandle.getFilePath();
+ assertThat(filePath).isNotNull();
+ return filePath.getFileSystem().exists(filePath);
+ }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 8161634308535..d9603c8993d11 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -25,6 +25,7 @@
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -53,6 +54,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
@@ -127,6 +129,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
*/
private long alignmentCheckpointId;
+ @Nullable private final FileMergingSnapshotManager fileMergingSnapshotManager;
+
SubtaskCheckpointCoordinatorImpl(
CheckpointStorage checkpointStorage,
CheckpointStorageWorkerView checkpointStorageView,
@@ -157,7 +161,8 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
taskName, checkpointStorage, env, maxSubtasksPerChannelStateFile)
: ChannelStateWriter.NO_OP,
enableCheckpointAfterTasksFinished,
- registerTimer);
+ registerTimer,
+ env.getTaskStateManager().getFileMergingSnapshotManager());
}
@VisibleForTesting
@@ -175,6 +180,36 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
ChannelStateWriter channelStateWriter,
boolean enableCheckpointAfterTasksFinished,
DelayableTimer registerTimer) {
+ this(
+ checkpointStorage,
+ taskName,
+ actionExecutor,
+ asyncOperationsThreadPool,
+ env,
+ asyncExceptionHandler,
+ prepareInputSnapshot,
+ maxRecordAbortedCheckpoints,
+ channelStateWriter,
+ enableCheckpointAfterTasksFinished,
+ registerTimer,
+ null);
+ }
+
+ SubtaskCheckpointCoordinatorImpl(
+ CheckpointStorageWorkerView checkpointStorage,
+ String taskName,
+ StreamTaskActionExecutor actionExecutor,
+ ExecutorService asyncOperationsThreadPool,
+ Environment env,
+ AsyncExceptionHandler asyncExceptionHandler,
+ BiFunctionWithException<
+ ChannelStateWriter, Long, CompletableFuture, CheckpointException>
+ prepareInputSnapshot,
+ int maxRecordAbortedCheckpoints,
+ ChannelStateWriter channelStateWriter,
+ boolean enableCheckpointAfterTasksFinished,
+ DelayableTimer registerTimer,
+ FileMergingSnapshotManager fileMergingSnapshotManager) {
this.checkpointStorage =
new CachingCheckpointStorageWorkerView(checkNotNull(checkpointStorage));
this.taskName = checkNotNull(taskName);
@@ -194,6 +229,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
this.enableCheckpointAfterTasksFinished = enableCheckpointAfterTasksFinished;
this.registerTimer = registerTimer;
this.clock = SystemClock.getInstance();
+ this.fileMergingSnapshotManager = fileMergingSnapshotManager;
}
private static ChannelStateWriter openChannelStateWriter(
@@ -484,6 +520,7 @@ private void notifyCheckpoint(
case COMPLETE:
env.getTaskStateManager().notifyCheckpointComplete(checkpointId);
}
+ notifyFileMergingSnapshotManagerCheckpoint(checkpointId, notifyCheckpointOperation);
} catch (Exception e) {
previousException = ExceptionUtils.firstOrSuppressed(e, previousException);
}
@@ -492,6 +529,27 @@ private void notifyCheckpoint(
ExceptionUtils.tryRethrowException(previousException);
}
+ private void notifyFileMergingSnapshotManagerCheckpoint(
+ long checkpointId, Task.NotifyCheckpointOperation notifyCheckpointOperation)
+ throws Exception {
+ if (fileMergingSnapshotManager != null) {
+ switch (notifyCheckpointOperation) {
+ case ABORT:
+ fileMergingSnapshotManager.notifyCheckpointAborted(
+ FileMergingSnapshotManager.SubtaskKey.of(env), checkpointId);
+ break;
+ case COMPLETE:
+ fileMergingSnapshotManager.notifyCheckpointComplete(
+ FileMergingSnapshotManager.SubtaskKey.of(env), checkpointId);
+ break;
+ case SUBSUME:
+ fileMergingSnapshotManager.notifyCheckpointSubsumed(
+ FileMergingSnapshotManager.SubtaskKey.of(env), checkpointId);
+ break;
+ }
+ }
+ }
+
@Override
public void initInputsCheckpoint(long id, CheckpointOptions checkpointOptions)
throws CheckpointException {