Skip to content

Commit

Permalink
[FLINK-35379][Checkpoint] Fix incorrect checkpoint notification handl…
Browse files Browse the repository at this point in the history
…ing in file merging
  • Loading branch information
Zakelly committed May 17, 2024
1 parent cc21eec commit 4a6305f
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ void initFileSystem(
*/
void registerSubtaskForSharedStates(SubtaskKey subtaskKey);

/**
* Unregister a subtask.
*
* @param subtaskKey the subtask key identifying a subtask.
*/
void unregisterSubtask(SubtaskKey subtaskKey);

/**
* Create a new {@link FileMergingCheckpointStateOutputStream}. According to the file merging
* strategy, the streams returned by multiple calls to this function may share the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand All @@ -62,6 +63,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps

private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);

/** The number of recent checkpoints whose IDs are remembered. */
private static final int NUM_GHOST_CHECKPOINT_IDS = 16;

/** The identifier of this manager. */
private final String id;

Expand Down Expand Up @@ -109,6 +113,14 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps

protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;

private final Object notifyLock = new Object();

@GuardedBy("notifyLock")
private final Map<Long, Set<SubtaskKey>> notifiedSubtaskCheckpoint = new HashMap<>();

@GuardedBy("notifyLock")
private final TreeSet<Long> notifiedCheckpoint = new TreeSet<>();

/**
* Currently the shared state files are merged within each subtask, files are split by different
* directories.
Expand Down Expand Up @@ -198,6 +210,14 @@ public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
}
}

@Override
public void unregisterSubtask(SubtaskKey subtaskKey) {
if (managedSharedStateDir.containsKey(subtaskKey)) {
managedSharedStateDir.remove(subtaskKey);
managedSharedStateDirHandles.remove(subtaskKey);
}
}

// ------------------------------------------------------------------------
// logical & physical file
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -470,6 +490,7 @@ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) th
uploadedStates.remove(checkpointId);
}
}
notifyReleaseCheckpoint(subtaskKey, checkpointId);
}

@Override
Expand All @@ -485,6 +506,36 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
}
}
}
notifyReleaseCheckpoint(subtaskKey, checkpointId);
}

private void notifyReleaseCheckpoint(SubtaskKey subtaskKey, long checkpointId)
throws IOException {
synchronized (notifyLock) {
if (notifiedCheckpoint.contains(checkpointId)) {
// already release, skip
return;
}
Set<SubtaskKey> knownSubtask =
notifiedSubtaskCheckpoint.computeIfAbsent(checkpointId, (e) -> new HashSet<>());
knownSubtask.add(subtaskKey);
if (knownSubtask.containsAll(managedSharedStateDir.keySet())) {
// all known subtask has been notified.
tryDiscardCheckpoint(checkpointId);
}
}
}

private void tryDiscardCheckpoint(long checkpointId) throws IOException {
synchronized (notifyLock) {
if (!notifiedCheckpoint.contains(checkpointId)) {
notifiedCheckpoint.add(checkpointId);
discardCheckpoint(checkpointId);
if (notifiedCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS) {
notifiedCheckpoint.pollFirst();
}
}
}
}

@Override
Expand Down Expand Up @@ -525,7 +576,7 @@ private boolean discardLogicalFiles(
}

if (logicalFiles.isEmpty()) {
discardCheckpoint(checkpointId);
tryDiscardCheckpoint(checkpointId);
return true;
}
return false;
Expand Down Expand Up @@ -677,4 +728,9 @@ public LogicalFile getLogicalFile(LogicalFileId fileId) {
TreeMap<Long, Set<LogicalFile>> getUploadedStates() {
return uploadedStates;
}

@VisibleForTesting
boolean isCheckpointDiscard(long checkpointId) {
return notifiedCheckpoint.contains(checkpointId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;

/** An implementation of file merging checkpoint storage to file systems. */
public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess {
public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess
implements Closeable {

/** FileMergingSnapshotManager manages files and meta information for checkpoints. */
private final FileMergingSnapshotManager fileMergingSnapshotManager;
Expand Down Expand Up @@ -112,4 +114,10 @@ public CheckpointStreamFactory resolveCheckpointStorageLocation(
checkpointId);
}
}

/** This will be registered to resource closer of {@code StreamTask}. */
@Override
public void close() {
fileMergingSnapshotManager.unregisterSubtask(subtaskKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.function.BiFunctionWithException;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -159,40 +159,76 @@ public void testCheckpointNotification() throws Exception {
(FileMergingSnapshotManagerBase)
createFileMergingSnapshotManager(checkpointBaseDir);
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
FileMergingCheckpointStateOutputStream cp1Stream =
writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
fmsm.registerSubtaskForSharedStates(subtaskKey1);
fmsm.registerSubtaskForSharedStates(subtaskKey2);
BiFunctionWithException<
FileMergingSnapshotManager.SubtaskKey,
Long,
SegmentFileStateHandle,
Exception>
writer =
((subtaskKey, checkpointId) -> {
return writeCheckpointAndGetStream(
subtaskKey,
checkpointId,
CheckpointedStateScope.SHARED,
fmsm,
closeableRegistry)
.closeAndGetHandle();
});

SegmentFileStateHandle cp1StateHandle1 = writer.apply(subtaskKey1, 1L);
SegmentFileStateHandle cp1StateHandle2 = writer.apply(subtaskKey2, 1L);
fmsm.notifyCheckpointComplete(subtaskKey1, 1);
assertFileInManagedDir(fmsm, cp1StateHandle);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
assertFileInManagedDir(fmsm, cp1StateHandle1);
assertFileInManagedDir(fmsm, cp1StateHandle2);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);

// complete checkpoint-2
FileMergingCheckpointStateOutputStream cp2Stream =
writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
SegmentFileStateHandle cp2StateHandle1 = writer.apply(subtaskKey1, 2L);
SegmentFileStateHandle cp2StateHandle2 = writer.apply(subtaskKey2, 2L);
fmsm.notifyCheckpointComplete(subtaskKey1, 2);
assertFileInManagedDir(fmsm, cp2StateHandle);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
fmsm.notifyCheckpointComplete(subtaskKey2, 2);
assertFileInManagedDir(fmsm, cp2StateHandle1);
assertFileInManagedDir(fmsm, cp2StateHandle2);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4);

assertThat(fmsm.isCheckpointDiscard(1)).isFalse();

// subsume checkpoint-1
assertThat(fileExists(cp1StateHandle)).isTrue();
assertThat(fileExists(cp1StateHandle1)).isTrue();
assertThat(fileExists(cp1StateHandle2)).isTrue();
fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
assertThat(fileExists(cp1StateHandle)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
assertThat(fileExists(cp1StateHandle1)).isTrue();
assertThat(fileExists(cp1StateHandle2)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3);

// abort checkpoint-3
FileMergingCheckpointStateOutputStream cp3Stream =
writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.isCheckpointDiscard(1)).isFalse();
fmsm.notifyCheckpointSubsumed(subtaskKey2, 1);
assertThat(fmsm.isCheckpointDiscard(1)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
assertFileInManagedDir(fmsm, cp3StateHandle);

// abort checkpoint-3
SegmentFileStateHandle cp3StateHandle1 = writer.apply(subtaskKey1, 3L);
SegmentFileStateHandle cp3StateHandle2 = writer.apply(subtaskKey2, 3L);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4);
assertFileInManagedDir(fmsm, cp3StateHandle1);
assertFileInManagedDir(fmsm, cp3StateHandle2);
fmsm.notifyCheckpointAborted(subtaskKey1, 3);
assertThat(fileExists(cp3StateHandle)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
assertThat(fileExists(cp3StateHandle1)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3);

assertThat(fmsm.isCheckpointDiscard(3)).isFalse();
fmsm.notifyCheckpointAborted(subtaskKey2, 3);
assertThat(fmsm.isCheckpointDiscard(3)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -601,18 +601,36 @@ FileMergingSnapshotManager createFileMergingSnapshotManager(
FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry)
throws IOException {
return writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry, 32);
return writeCheckpointAndGetStream(
subtaskKey1,
checkpointId,
CheckpointedStateScope.EXCLUSIVE,
fmsm,
closeableRegistry,
32);
}

FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
SubtaskKey subtaskKey,
long checkpointId,
CheckpointedStateScope scope,
FileMergingSnapshotManager fmsm,
CloseableRegistry closeableRegistry)
throws IOException {
return writeCheckpointAndGetStream(
subtaskKey, checkpointId, scope, fmsm, closeableRegistry, 32);
}

FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
SubtaskKey subtaskKey,
long checkpointId,
CheckpointedStateScope scope,
FileMergingSnapshotManager fmsm,
CloseableRegistry closeableRegistry,
int numBytes)
throws IOException {
FileMergingCheckpointStateOutputStream stream =
fmsm.createCheckpointStateOutputStream(
subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
fmsm.createCheckpointStateOutputStream(subtaskKey, checkpointId, scope);
closeableRegistry.registerCloseable(stream);
for (int i = 0; i < numBytes; i++) {
stream.write(i);
Expand Down
Loading

0 comments on commit 4a6305f

Please sign in to comment.