From d934f89d0f32ba3828a364512ed071cc9f210590 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Fri, 17 May 2024 00:26:19 +0800 Subject: [PATCH] [FLINK-35379][Checkpoint] Fix incorrect checkpoint notification handling in file merging --- .../FileMergingSnapshotManager.java | 7 ++ .../FileMergingSnapshotManagerBase.java | 67 +++++++++++++- .../checkpoint/filemerging/LogicalFile.java | 5 -- ...askFileMergingManagerRestoreOperation.java | 7 +- .../filemerging/SegmentFileStateHandle.java | 3 +- .../FsMergingCheckpointStorageAccess.java | 10 ++- ...ckpointFileMergingSnapshotManagerTest.java | 88 ++++++++++++------ .../FileMergingSnapshotManagerTestBase.java | 24 ++++- ...ckpointFileMergingSnapshotManagerTest.java | 90 +++++++++++++------ .../streaming/runtime/tasks/StreamTask.java | 9 +- 10 files changed, 243 insertions(+), 67 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index f3523c4430f62..f5f88c050805b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -101,6 +101,13 @@ void initFileSystem( */ void registerSubtaskForSharedStates(SubtaskKey subtaskKey); + /** + * Unregister a subtask. + * + * @param subtaskKey the subtask key identifying a subtask. + */ + void unregisterSubtask(SubtaskKey subtaskKey); + /** * Create a new {@link FileMergingCheckpointStateOutputStream}. According to the file merging * strategy, the streams returned by multiple calls to this function may share the same diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index 0149ff90e0086..63ef4dd9c4cf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; @@ -50,6 +51,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -62,6 +64,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class); + /** The number of recent checkpoints whose IDs are remembered. */ + private static final int NUM_GHOST_CHECKPOINT_IDS = 16; + /** The identifier of this manager. */ private final String id; @@ -109,6 +114,14 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; + private final Object notifyLock = new Object(); + + @GuardedBy("notifyLock") + private final Map> notifiedSubtaskCheckpoint = new HashMap<>(); + + @GuardedBy("notifyLock") + private final TreeSet notifiedCheckpoint = new TreeSet<>(); + /** * Currently the shared state files are merged within each subtask, files are split by different * directories. @@ -198,6 +211,14 @@ public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) { } } + @Override + public void unregisterSubtask(SubtaskKey subtaskKey) { + if (managedSharedStateDir.containsKey(subtaskKey)) { + managedSharedStateDir.remove(subtaskKey); + managedSharedStateDirHandles.remove(subtaskKey); + } + } + // ------------------------------------------------------------------------ // logical & physical file // ------------------------------------------------------------------------ @@ -470,6 +491,7 @@ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) th uploadedStates.remove(checkpointId); } } + notifyReleaseCheckpoint(subtaskKey, checkpointId); } @Override @@ -485,6 +507,36 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) } } } + notifyReleaseCheckpoint(subtaskKey, checkpointId); + } + + private void notifyReleaseCheckpoint(SubtaskKey subtaskKey, long checkpointId) + throws IOException { + synchronized (notifyLock) { + if (notifiedCheckpoint.contains(checkpointId)) { + // already release, skip + return; + } + Set knownSubtask = + notifiedSubtaskCheckpoint.computeIfAbsent(checkpointId, (e) -> new HashSet<>()); + knownSubtask.add(subtaskKey); + if (knownSubtask.containsAll(managedSharedStateDir.keySet())) { + // all known subtask has been notified. + tryDiscardCheckpoint(checkpointId); + } + } + } + + private void tryDiscardCheckpoint(long checkpointId) throws IOException { + synchronized (notifyLock) { + if (!notifiedCheckpoint.contains(checkpointId)) { + notifiedCheckpoint.add(checkpointId); + discardCheckpoint(checkpointId); + if (notifiedCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS) { + notifiedCheckpoint.pollFirst(); + } + } + } } @Override @@ -498,6 +550,14 @@ public void reusePreviousStateHandle( if (file != null) { file.advanceLastCheckpointId(checkpointId); } + } else if (stateHandle instanceof PlaceholderStreamStateHandle) { + LogicalFile file = + knownLogicalFiles.get( + new LogicalFileId( + stateHandle.getStreamStateHandleID().getKeyString())); + if (file != null) { + file.advanceLastCheckpointId(checkpointId); + } } } } @@ -525,7 +585,7 @@ private boolean discardLogicalFiles( } if (logicalFiles.isEmpty()) { - discardCheckpoint(checkpointId); + tryDiscardCheckpoint(checkpointId); return true; } return false; @@ -677,4 +737,9 @@ public LogicalFile getLogicalFile(LogicalFileId fileId) { TreeMap> getUploadedStates() { return uploadedStates; } + + @VisibleForTesting + boolean isCheckpointDiscard(long checkpointId) { + return notifiedCheckpoint.contains(checkpointId); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java index c53a1116a36f1..fa8fc8cc80a67 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.checkpoint.filemerging; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.core.fs.Path; import org.apache.flink.util.StringBasedID; import javax.annotation.Nonnull; @@ -42,10 +41,6 @@ public LogicalFileId(String keyString) { super(keyString); } - public Path getFilePath() { - return new Path(getKeyString()); - } - public static LogicalFileId generateRandomId() { return new LogicalFileId(UUID.randomUUID().toString()); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java index 2be6086849c91..7e9ba081e55a0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.util.Preconditions; @@ -93,8 +94,12 @@ public void restore() { Stream segmentStateHandles = Stream.of(keyedStateHandles, operatorStateHandles) .flatMap(Function.identity()) - .filter(handle -> handle instanceof SegmentFileStateHandle) + .filter( + handle -> + (handle instanceof SegmentFileStateHandle) + && !(handle instanceof EmptySegmentFileStateHandle)) .map(handle -> (SegmentFileStateHandle) handle); + System.out.println("Restoring from checkpoint " + checkpointId); fileMergingSnapshotManager.restoreStateHandles( checkpointId, subtaskKey, segmentStateHandles); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java index a4b31cc310ef7..9e925841d7657 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java @@ -106,8 +106,7 @@ public Optional asBytesIfInMemory() { @Override public PhysicalStateHandleID getStreamStateHandleID() { - return new PhysicalStateHandleID( - String.format("%s-%d-%d", filePath.toUri(), startPos, stateSize)); + return new PhysicalStateHandleID(logicalFileId.getKeyString()); } public long getStartPos() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java index f41a375178587..21a639ae808b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageAccess.java @@ -29,10 +29,12 @@ import javax.annotation.Nullable; +import java.io.Closeable; import java.io.IOException; /** An implementation of file merging checkpoint storage to file systems. */ -public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess { +public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess + implements Closeable { /** FileMergingSnapshotManager manages files and meta information for checkpoints. */ private final FileMergingSnapshotManager fileMergingSnapshotManager; @@ -112,4 +114,10 @@ public CheckpointStreamFactory resolveCheckpointStorageLocation( checkpointId); } } + + /** This will be registered to resource closer of {@code StreamTask}. */ + @Override + public void close() { + fileMergingSnapshotManager.unregisterSubtask(subtaskKey); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java index d4bec90b33537..2f040ab2513f2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java @@ -20,7 +20,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; -import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; +import org.apache.flink.util.function.BiFunctionWithException; import org.junit.jupiter.api.Test; @@ -159,40 +159,76 @@ public void testCheckpointNotification() throws Exception { (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir); CloseableRegistry closeableRegistry = new CloseableRegistry()) { - FileMergingCheckpointStateOutputStream cp1Stream = - writeCheckpointAndGetStream(1, fmsm, closeableRegistry); - SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + BiFunctionWithException< + FileMergingSnapshotManager.SubtaskKey, + Long, + SegmentFileStateHandle, + Exception> + writer = + ((subtaskKey, checkpointId) -> { + return writeCheckpointAndGetStream( + subtaskKey, + checkpointId, + CheckpointedStateScope.SHARED, + fmsm, + closeableRegistry) + .closeAndGetHandle(); + }); + + SegmentFileStateHandle cp1StateHandle1 = writer.apply(subtaskKey1, 1L); + SegmentFileStateHandle cp1StateHandle2 = writer.apply(subtaskKey2, 1L); fmsm.notifyCheckpointComplete(subtaskKey1, 1); - assertFileInManagedDir(fmsm, cp1StateHandle); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertFileInManagedDir(fmsm, cp1StateHandle1); + assertFileInManagedDir(fmsm, cp1StateHandle2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); + // complete checkpoint-2 - FileMergingCheckpointStateOutputStream cp2Stream = - writeCheckpointAndGetStream(2, fmsm, closeableRegistry); - SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); + SegmentFileStateHandle cp2StateHandle1 = writer.apply(subtaskKey1, 2L); + SegmentFileStateHandle cp2StateHandle2 = writer.apply(subtaskKey2, 2L); fmsm.notifyCheckpointComplete(subtaskKey1, 2); - assertFileInManagedDir(fmsm, cp2StateHandle); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); + fmsm.notifyCheckpointComplete(subtaskKey2, 2); + assertFileInManagedDir(fmsm, cp2StateHandle1); + assertFileInManagedDir(fmsm, cp2StateHandle2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4); + + assertThat(fmsm.isCheckpointDiscard(1)).isFalse(); // subsume checkpoint-1 - assertThat(fileExists(cp1StateHandle)).isTrue(); + assertThat(fileExists(cp1StateHandle1)).isTrue(); + assertThat(fileExists(cp1StateHandle2)).isTrue(); fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); - assertThat(fileExists(cp1StateHandle)).isTrue(); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertThat(fileExists(cp1StateHandle1)).isTrue(); + assertThat(fileExists(cp1StateHandle2)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); - // abort checkpoint-3 - FileMergingCheckpointStateOutputStream cp3Stream = - writeCheckpointAndGetStream(3, fmsm, closeableRegistry); - SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.isCheckpointDiscard(1)).isFalse(); + fmsm.notifyCheckpointSubsumed(subtaskKey2, 1); + assertThat(fmsm.isCheckpointDiscard(1)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); - assertFileInManagedDir(fmsm, cp3StateHandle); + + // abort checkpoint-3 + SegmentFileStateHandle cp3StateHandle1 = writer.apply(subtaskKey1, 3L); + SegmentFileStateHandle cp3StateHandle2 = writer.apply(subtaskKey2, 3L); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4); + assertFileInManagedDir(fmsm, cp3StateHandle1); + assertFileInManagedDir(fmsm, cp3StateHandle2); fmsm.notifyCheckpointAborted(subtaskKey1, 3); - assertThat(fileExists(cp3StateHandle)).isTrue(); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertThat(fileExists(cp3StateHandle1)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); + + assertThat(fmsm.isCheckpointDiscard(3)).isFalse(); + fmsm.notifyCheckpointAborted(subtaskKey2, 3); + assertThat(fmsm.isCheckpointDiscard(3)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index 5dcd9b8626666..94c32482d0f55 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -601,18 +601,36 @@ FileMergingSnapshotManager createFileMergingSnapshotManager( FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) throws IOException { - return writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry, 32); + return writeCheckpointAndGetStream( + subtaskKey1, + checkpointId, + CheckpointedStateScope.EXCLUSIVE, + fmsm, + closeableRegistry, + 32); } FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( + SubtaskKey subtaskKey, long checkpointId, + CheckpointedStateScope scope, + FileMergingSnapshotManager fmsm, + CloseableRegistry closeableRegistry) + throws IOException { + return writeCheckpointAndGetStream( + subtaskKey, checkpointId, scope, fmsm, closeableRegistry, 32); + } + + FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream( + SubtaskKey subtaskKey, + long checkpointId, + CheckpointedStateScope scope, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry, int numBytes) throws IOException { FileMergingCheckpointStateOutputStream stream = - fmsm.createCheckpointStateOutputStream( - subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE); + fmsm.createCheckpointStateOutputStream(subtaskKey, checkpointId, scope); closeableRegistry.registerCloseable(stream); for (int i = 0; i < numBytes; i++) { stream.write(i); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java index 2401fc1430370..ea6df6fe9da6e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java @@ -20,11 +20,12 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; -import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; +import org.apache.flink.util.function.BiFunctionWithException; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.Collections; import static org.assertj.core.api.Assertions.assertThat; @@ -164,42 +165,77 @@ public void testCheckpointNotification() throws Exception { (FileMergingSnapshotManagerBase) createFileMergingSnapshotManager(checkpointBaseDir); CloseableRegistry closeableRegistry = new CloseableRegistry()) { - FileMergingCheckpointStateOutputStream cp1Stream = - writeCheckpointAndGetStream(1, fmsm, closeableRegistry); - SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + BiFunctionWithException< + FileMergingSnapshotManager.SubtaskKey, + Long, + SegmentFileStateHandle, + Exception> + writer = + ((subtaskKey, checkpointId) -> { + return writeCheckpointAndGetStream( + subtaskKey, + checkpointId, + CheckpointedStateScope.SHARED, + fmsm, + closeableRegistry) + .closeAndGetHandle(); + }); + + SegmentFileStateHandle cp1StateHandle1 = writer.apply(subtaskKey1, 1L); + SegmentFileStateHandle cp1StateHandle2 = writer.apply(subtaskKey2, 1L); fmsm.notifyCheckpointComplete(subtaskKey1, 1); - assertFileInManagedDir(fmsm, cp1StateHandle); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertFileInManagedDir(fmsm, cp1StateHandle1); + assertFileInManagedDir(fmsm, cp1StateHandle2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); // complete checkpoint-2 - FileMergingCheckpointStateOutputStream cp2Stream = - writeCheckpointAndGetStream(2, fmsm, closeableRegistry); - SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); + SegmentFileStateHandle cp2StateHandle1 = writer.apply(subtaskKey1, 2L); + SegmentFileStateHandle cp2StateHandle2 = writer.apply(subtaskKey2, 2L); + fmsm.reusePreviousStateHandle(2L, Collections.singleton(cp1StateHandle2)); fmsm.notifyCheckpointComplete(subtaskKey1, 2); - assertFileInManagedDir(fmsm, cp2StateHandle); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); + fmsm.notifyCheckpointComplete(subtaskKey2, 2); + assertFileInManagedDir(fmsm, cp2StateHandle1); + assertFileInManagedDir(fmsm, cp2StateHandle2); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4); + + assertThat(fmsm.isCheckpointDiscard(1)).isFalse(); // subsume checkpoint-1 - assertThat(fileExists(cp1StateHandle)).isTrue(); + assertThat(fileExists(cp1StateHandle1)).isTrue(); + assertThat(fileExists(cp1StateHandle2)).isTrue(); fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); - assertThat(fileExists(cp1StateHandle)).isFalse(); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertThat(fileExists(cp1StateHandle1)).isFalse(); + assertThat(fileExists(cp1StateHandle2)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); - // abort checkpoint-3 - FileMergingCheckpointStateOutputStream cp3Stream = - writeCheckpointAndGetStream(3, fmsm, closeableRegistry); - SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.isCheckpointDiscard(1)).isFalse(); + fmsm.notifyCheckpointSubsumed(subtaskKey2, 1); + assertThat(fmsm.isCheckpointDiscard(1)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); - assertFileInManagedDir(fmsm, cp3StateHandle); + // abort checkpoint-3 + SegmentFileStateHandle cp3StateHandle1 = writer.apply(subtaskKey1, 3L); + SegmentFileStateHandle cp3StateHandle2 = writer.apply(subtaskKey2, 3L); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(5); + assertFileInManagedDir(fmsm, cp3StateHandle1); + assertFileInManagedDir(fmsm, cp3StateHandle2); fmsm.notifyCheckpointAborted(subtaskKey1, 3); - assertThat(fileExists(cp3StateHandle)).isFalse(); - assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); - assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertThat(fileExists(cp3StateHandle1)).isFalse(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4); + + assertThat(fmsm.isCheckpointDiscard(3)).isFalse(); + fmsm.notifyCheckpointAborted(subtaskKey2, 3); + assertThat(fmsm.isCheckpointDiscard(3)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 433edf63f255f..e764ca8586003 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -532,7 +532,8 @@ protected StreamTask( ExecutionCheckpointingOptions .ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH), BarrierAlignmentUtil.createRegisterTimerCallback( - mainMailboxExecutor, systemTimerService)); + mainMailboxExecutor, systemTimerService), + environment.getTaskStateManager().getFileMergingSnapshotManager()); resourceCloser.registerCloseable(subtaskCheckpointCoordinator::close); // Register to stop all timers and threads. Should be closed first. @@ -567,6 +568,12 @@ private CheckpointStorageAccess tryApplyFileMergingCheckpoint( checkpointStorageAccess.toFileMergingStorage( fileMergingSnapshotManager, environment); mergingCheckpointStorageAccess.initializeBaseLocationsForCheckpoint(); + if (mergingCheckpointStorageAccess instanceof FsMergingCheckpointStorageAccess) { + resourceCloser.registerCloseable( + () -> + ((FsMergingCheckpointStorageAccess) mergingCheckpointStorageAccess) + .close()); + } return mergingCheckpointStorageAccess; } catch (IOException e) { LOG.warn(