Skip to content

Commit

Permalink
[FLINK-35379][Checkpoint] Fix incorrect checkpoint notification handl…
Browse files Browse the repository at this point in the history
…ing in file merging
  • Loading branch information
Zakelly committed May 20, 2024
1 parent f5a8edb commit d934f89
Show file tree
Hide file tree
Showing 10 changed files with 243 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ void initFileSystem(
*/
void registerSubtaskForSharedStates(SubtaskKey subtaskKey);

/**
 * Unregister a subtask from this manager.
 *
 * <p>NOTE(review): presumably the counterpart of {@link
 * #registerSubtaskForSharedStates(SubtaskKey)}; which per-subtask state is released is up to
 * the implementation - confirm against the implementing class.
 *
 * @param subtaskKey the subtask key identifying the subtask to unregister.
 */
void unregisterSubtask(SubtaskKey subtaskKey);

/**
* Create a new {@link FileMergingCheckpointStateOutputStream}. According to the file merging
* strategy, the streams returned by multiple calls to this function may share the same
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
Expand All @@ -50,6 +51,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand All @@ -62,6 +64,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps

private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);

/** The number of recent checkpoints whose IDs are remembered. */
private static final int NUM_GHOST_CHECKPOINT_IDS = 16;

/** The identifier of this manager. */
private final String id;

Expand Down Expand Up @@ -109,6 +114,14 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps

protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;

/**
 * Monitor held while reading or updating the release-notification bookkeeping below ({@link
 * #notifiedSubtaskCheckpoint} and {@link #notifiedCheckpoint}).
 */
private final Object notifyLock = new Object();

/**
 * For each checkpoint id, the subtasks from which an abort/subsume notification has been
 * received so far. Compared against the registered subtasks to decide when a checkpoint can be
 * discarded.
 */
@GuardedBy("notifyLock")
private final Map<Long, Set<SubtaskKey>> notifiedSubtaskCheckpoint = new HashMap<>();

/**
 * Ids of checkpoints for which the discard has already been triggered. Kept sorted so that the
 * smallest id can be evicted once more than {@link #NUM_GHOST_CHECKPOINT_IDS} ids are
 * remembered.
 */
@GuardedBy("notifyLock")
private final TreeSet<Long> notifiedCheckpoint = new TreeSet<>();

/**
* Currently the shared state files are merged within each subtask, files are split by different
* directories.
Expand Down Expand Up @@ -198,6 +211,14 @@ public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
}
}

/**
 * Removes the shared-state directory bookkeeping kept for the given subtask.
 *
 * <p>Does nothing when {@code managedSharedStateDir} has no entry for the subtask.
 *
 * @param subtaskKey the subtask key identifying the subtask to unregister.
 */
@Override
public void unregisterSubtask(SubtaskKey subtaskKey) {
    // No directory tracked for this subtask: nothing to clean up.
    if (!managedSharedStateDir.containsKey(subtaskKey)) {
        return;
    }
    managedSharedStateDir.remove(subtaskKey);
    managedSharedStateDirHandles.remove(subtaskKey);
}

// ------------------------------------------------------------------------
// logical & physical file
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -470,6 +491,7 @@ public void notifyCheckpointAborted(SubtaskKey subtaskKey, long checkpointId) th
uploadedStates.remove(checkpointId);
}
}
notifyReleaseCheckpoint(subtaskKey, checkpointId);
}

@Override
Expand All @@ -485,6 +507,36 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
}
}
}
notifyReleaseCheckpoint(subtaskKey, checkpointId);
}

/**
 * Records that the given subtask has released (aborted or subsumed) a checkpoint, and triggers
 * the discard of that checkpoint once every subtask currently present in {@code
 * managedSharedStateDir} has reported the release.
 *
 * <p>The per-checkpoint set of notified subtasks is dropped as soon as the checkpoint is handed
 * over for discarding (or is found to be discarded already), so that {@code
 * notifiedSubtaskCheckpoint} does not accumulate one entry per checkpoint forever.
 *
 * @param subtaskKey the subtask that reported the release.
 * @param checkpointId the id of the released checkpoint.
 * @throws IOException if discarding the checkpoint fails.
 */
private void notifyReleaseCheckpoint(SubtaskKey subtaskKey, long checkpointId)
        throws IOException {
    synchronized (notifyLock) {
        if (notifiedCheckpoint.contains(checkpointId)) {
            // Already released: the per-subtask bookkeeping is of no further use, drop it.
            notifiedSubtaskCheckpoint.remove(checkpointId);
            return;
        }
        Set<SubtaskKey> knownSubtask =
                notifiedSubtaskCheckpoint.computeIfAbsent(checkpointId, (e) -> new HashSet<>());
        knownSubtask.add(subtaskKey);
        if (knownSubtask.containsAll(managedSharedStateDir.keySet())) {
            // All known subtasks have been notified. Forget the bookkeeping first so that it
            // is released even if the discard below throws.
            notifiedSubtaskCheckpoint.remove(checkpointId);
            tryDiscardCheckpoint(checkpointId);
        }
    }
}

/**
 * Discards the given checkpoint unless its id is already remembered as discarded.
 *
 * <p>The id is remembered before the discard is attempted. Afterwards, if more than {@code
 * NUM_GHOST_CHECKPOINT_IDS} ids are remembered, the smallest one is evicted.
 *
 * @param checkpointId the id of the checkpoint to discard.
 * @throws IOException if discarding the checkpoint fails.
 */
private void tryDiscardCheckpoint(long checkpointId) throws IOException {
    synchronized (notifyLock) {
        // add() reports whether the id was newly inserted, i.e. not handled before.
        final boolean newlyRemembered = notifiedCheckpoint.add(checkpointId);
        if (!newlyRemembered) {
            return;
        }
        discardCheckpoint(checkpointId);
        final boolean overCapacity = notifiedCheckpoint.size() > NUM_GHOST_CHECKPOINT_IDS;
        if (overCapacity) {
            notifiedCheckpoint.pollFirst();
        }
    }
}

@Override
Expand All @@ -498,6 +550,14 @@ public void reusePreviousStateHandle(
if (file != null) {
file.advanceLastCheckpointId(checkpointId);
}
} else if (stateHandle instanceof PlaceholderStreamStateHandle) {
LogicalFile file =
knownLogicalFiles.get(
new LogicalFileId(
stateHandle.getStreamStateHandleID().getKeyString()));
if (file != null) {
file.advanceLastCheckpointId(checkpointId);
}
}
}
}
Expand Down Expand Up @@ -525,7 +585,7 @@ private boolean discardLogicalFiles(
}

if (logicalFiles.isEmpty()) {
discardCheckpoint(checkpointId);
tryDiscardCheckpoint(checkpointId);
return true;
}
return false;
Expand Down Expand Up @@ -677,4 +737,9 @@ public LogicalFile getLogicalFile(LogicalFileId fileId) {
TreeMap<Long, Set<LogicalFile>> getUploadedStates() {
return uploadedStates;
}

/**
 * Tells whether the given checkpoint id is currently remembered as discarded.
 *
 * <p>Only a bounded number of ids is remembered, so {@code false} does not prove that the
 * checkpoint was never discarded.
 *
 * @param checkpointId the checkpoint id to look up.
 * @return {@code true} if the id is present in {@code notifiedCheckpoint}.
 */
@VisibleForTesting
boolean isCheckpointDiscard(long checkpointId) {
    // notifiedCheckpoint is guarded by notifyLock; take the lock for reads as well.
    synchronized (notifyLock) {
        return notifiedCheckpoint.contains(checkpointId);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringBasedID;

import javax.annotation.Nonnull;
Expand All @@ -42,10 +41,6 @@ public LogicalFileId(String keyString) {
super(keyString);
}

/**
 * Builds a {@link Path} from the key string of this id.
 *
 * @return a new path created from {@link #getKeyString()}.
 */
public Path getFilePath() {
    final String key = getKeyString();
    return new Path(key);
}

/**
 * Creates an id whose key string is a freshly generated random UUID.
 *
 * @return a new {@code LogicalFileId} backed by {@link UUID#randomUUID()}.
 */
public static LogicalFileId generateRandomId() {
    final String randomKey = UUID.randomUUID().toString();
    return new LogicalFileId(randomKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.EmptySegmentFileStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -93,8 +94,12 @@ public void restore() {
Stream<SegmentFileStateHandle> segmentStateHandles =
Stream.of(keyedStateHandles, operatorStateHandles)
.flatMap(Function.identity())
.filter(handle -> handle instanceof SegmentFileStateHandle)
.filter(
handle ->
(handle instanceof SegmentFileStateHandle)
&& !(handle instanceof EmptySegmentFileStateHandle))
.map(handle -> (SegmentFileStateHandle) handle);
System.out.println("Restoring from checkpoint " + checkpointId);
fileMergingSnapshotManager.restoreStateHandles(
checkpointId, subtaskKey, segmentStateHandles);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,7 @@ public Optional<byte[]> asBytesIfInMemory() {

/**
 * Returns the physical state handle id of this handle, derived from the key string of its
 * logical file id.
 *
 * @return a new {@code PhysicalStateHandleID} wrapping {@code logicalFileId.getKeyString()}.
 */
@Override
public PhysicalStateHandleID getStreamStateHandleID() {
    return new PhysicalStateHandleID(logicalFileId.getKeyString());
}

public long getStartPos() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;

/** An implementation of file merging checkpoint storage to file systems. */
public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess {
public class FsMergingCheckpointStorageAccess extends FsCheckpointStorageAccess
implements Closeable {

/** FileMergingSnapshotManager manages files and meta information for checkpoints. */
private final FileMergingSnapshotManager fileMergingSnapshotManager;
Expand Down Expand Up @@ -112,4 +114,10 @@ public CheckpointStreamFactory resolveCheckpointStorageLocation(
checkpointId);
}
}

/**
 * Unregisters this storage access' subtask from the file merging snapshot manager.
 *
 * <p>This will be registered to resource closer of {@code StreamTask}.
 */
@Override
public void close() {
    this.fileMergingSnapshotManager.unregisterSubtask(this.subtaskKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.util.function.BiFunctionWithException;

import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -159,40 +159,76 @@ public void testCheckpointNotification() throws Exception {
(FileMergingSnapshotManagerBase)
createFileMergingSnapshotManager(checkpointBaseDir);
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
FileMergingCheckpointStateOutputStream cp1Stream =
writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
fmsm.registerSubtaskForSharedStates(subtaskKey1);
fmsm.registerSubtaskForSharedStates(subtaskKey2);
BiFunctionWithException<
FileMergingSnapshotManager.SubtaskKey,
Long,
SegmentFileStateHandle,
Exception>
writer =
((subtaskKey, checkpointId) -> {
return writeCheckpointAndGetStream(
subtaskKey,
checkpointId,
CheckpointedStateScope.SHARED,
fmsm,
closeableRegistry)
.closeAndGetHandle();
});

SegmentFileStateHandle cp1StateHandle1 = writer.apply(subtaskKey1, 1L);
SegmentFileStateHandle cp1StateHandle2 = writer.apply(subtaskKey2, 1L);
fmsm.notifyCheckpointComplete(subtaskKey1, 1);
assertFileInManagedDir(fmsm, cp1StateHandle);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
assertFileInManagedDir(fmsm, cp1StateHandle1);
assertFileInManagedDir(fmsm, cp1StateHandle2);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);

// complete checkpoint-2
FileMergingCheckpointStateOutputStream cp2Stream =
writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
SegmentFileStateHandle cp2StateHandle1 = writer.apply(subtaskKey1, 2L);
SegmentFileStateHandle cp2StateHandle2 = writer.apply(subtaskKey2, 2L);
fmsm.notifyCheckpointComplete(subtaskKey1, 2);
assertFileInManagedDir(fmsm, cp2StateHandle);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
fmsm.notifyCheckpointComplete(subtaskKey2, 2);
assertFileInManagedDir(fmsm, cp2StateHandle1);
assertFileInManagedDir(fmsm, cp2StateHandle2);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4);

assertThat(fmsm.isCheckpointDiscard(1)).isFalse();

// subsume checkpoint-1
assertThat(fileExists(cp1StateHandle)).isTrue();
assertThat(fileExists(cp1StateHandle1)).isTrue();
assertThat(fileExists(cp1StateHandle2)).isTrue();
fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
assertThat(fileExists(cp1StateHandle)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
assertThat(fileExists(cp1StateHandle1)).isTrue();
assertThat(fileExists(cp1StateHandle2)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3);

// abort checkpoint-3
FileMergingCheckpointStateOutputStream cp3Stream =
writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.isCheckpointDiscard(1)).isFalse();
fmsm.notifyCheckpointSubsumed(subtaskKey2, 1);
assertThat(fmsm.isCheckpointDiscard(1)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
assertFileInManagedDir(fmsm, cp3StateHandle);

// abort checkpoint-3
SegmentFileStateHandle cp3StateHandle1 = writer.apply(subtaskKey1, 3L);
SegmentFileStateHandle cp3StateHandle2 = writer.apply(subtaskKey2, 3L);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(4);
assertFileInManagedDir(fmsm, cp3StateHandle1);
assertFileInManagedDir(fmsm, cp3StateHandle2);
fmsm.notifyCheckpointAborted(subtaskKey1, 3);
assertThat(fileExists(cp3StateHandle)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
assertThat(fileExists(cp3StateHandle1)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(3);

assertThat(fmsm.isCheckpointDiscard(3)).isFalse();
fmsm.notifyCheckpointAborted(subtaskKey2, 3);
assertThat(fmsm.isCheckpointDiscard(3)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -601,18 +601,36 @@ FileMergingSnapshotManager createFileMergingSnapshotManager(
/**
 * Convenience overload: opens a checkpoint output stream for {@code subtaskKey1} with {@link
 * CheckpointedStateScope#EXCLUSIVE} scope and writes 32 bytes to it.
 *
 * @param checkpointId the checkpoint the stream belongs to.
 * @param fmsm the file merging snapshot manager that creates the stream.
 * @param closeableRegistry the registry passed on to the full overload.
 * @return the stream returned by the full overload.
 * @throws IOException if the full overload fails.
 */
FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
        long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry)
        throws IOException {
    return writeCheckpointAndGetStream(
            subtaskKey1,
            checkpointId,
            CheckpointedStateScope.EXCLUSIVE,
            fmsm,
            closeableRegistry,
            32);
}

/**
 * Convenience overload that writes a fixed payload of 32 bytes for the given subtask and
 * scope.
 *
 * @param subtaskKey the subtask the stream is created for.
 * @param checkpointId the checkpoint the stream belongs to.
 * @param scope the state scope of the stream.
 * @param fmsm the file merging snapshot manager that creates the stream.
 * @param closeableRegistry the registry passed on to the full overload.
 * @return the stream returned by the full overload.
 * @throws IOException if the full overload fails.
 */
FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
        SubtaskKey subtaskKey,
        long checkpointId,
        CheckpointedStateScope scope,
        FileMergingSnapshotManager fmsm,
        CloseableRegistry closeableRegistry)
        throws IOException {
    final int defaultNumBytes = 32;
    return writeCheckpointAndGetStream(
            subtaskKey, checkpointId, scope, fmsm, closeableRegistry, defaultNumBytes);
}

FileMergingCheckpointStateOutputStream writeCheckpointAndGetStream(
SubtaskKey subtaskKey,
long checkpointId,
CheckpointedStateScope scope,
FileMergingSnapshotManager fmsm,
CloseableRegistry closeableRegistry,
int numBytes)
throws IOException {
FileMergingCheckpointStateOutputStream stream =
fmsm.createCheckpointStateOutputStream(
subtaskKey1, checkpointId, CheckpointedStateScope.EXCLUSIVE);
fmsm.createCheckpointStateOutputStream(subtaskKey, checkpointId, scope);
closeableRegistry.registerCloseable(stream);
for (int i = 0; i < numBytes; i++) {
stream.write(i);
Expand Down
Loading

0 comments on commit d934f89

Please sign in to comment.