Skip to content

Commit

Permalink
[FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Apr 16, 2024
1 parent 68cc61a commit 78f40ae
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;

import java.io.Closeable;
import java.util.Collection;

/**
* FileMergingSnapshotManager provides an interface to manage files and meta information for
Expand Down Expand Up @@ -157,6 +159,16 @@ DirectoryStreamStateHandle getManagedDirStateHandle(
*/
void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception;

/**
 * Callback invoked when state handles created by an earlier checkpoint are reused by a later
 * checkpoint, so that the manager can take the reuse into account when tracking the files
 * behind those handles.
 *
 * @param checkpointId the id of the checkpoint that reuses the handles.
 * @param stateHandles the previously created handles that are being reused.
 */
void reusePreviousStateHandle(
long checkpointId, Collection<? extends StreamStateHandle> stateHandles);

/**
* A key identifies a subtask. A subtask can be identified by the operator id, subtask index and
* the parallelism. Note that this key should be consistent across job attempts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
Expand All @@ -42,6 +43,7 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -70,6 +72,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
@GuardedBy("lock")
protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();

/** The map that holds all the known live logical files. */
private final Map<LogicalFileId, LogicalFile> knownLogicalFiles = new ConcurrentHashMap<>();

/** The {@link FileSystem} that this manager works on. */
protected FileSystem fs;

Expand Down Expand Up @@ -206,7 +211,9 @@ protected LogicalFile createLogicalFile(
long length,
@Nonnull SubtaskKey subtaskKey) {
LogicalFileId fileID = LogicalFileId.generateRandomId();
return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
knownLogicalFiles.put(fileID, file);
return file;
}

/**
Expand Down Expand Up @@ -300,7 +307,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);

return new SegmentFileStateHandle(
physicalFile.getFilePath(), startPos, stateSize, scope);
physicalFile.getFilePath(),
startPos,
stateSize,
scope,
logicalFile.getFileId());
}
}

Expand Down Expand Up @@ -459,13 +470,28 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
uploadedStates.headMap(checkpointId, true).entrySet().iterator();
while (uploadedStatesIterator.hasNext()) {
Map.Entry<Long, Set<LogicalFile>> entry = uploadedStatesIterator.next();
if (discardLogicalFiles(subtaskKey, entry.getKey(), entry.getValue())) {
if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) {
uploadedStatesIterator.remove();
}
}
}
}

/**
 * Records that the given handles are reused by {@code checkpointId}.
 *
 * <p>Only {@link SegmentFileStateHandle}s are considered. For each of them the logical file is
 * looked up in {@code knownLogicalFiles} by its logical file id; when a file is found, its last
 * checkpoint id is advanced to {@code checkpointId}. Handles of other types, and handles whose
 * logical file is not present in the map, are skipped.
 *
 * @param checkpointId the checkpoint that reuses the handles.
 * @param stateHandles the handles to be reused.
 */
@Override
public void reusePreviousStateHandle(
        long checkpointId, Collection<? extends StreamStateHandle> stateHandles) {
    for (StreamStateHandle handle : stateHandles) {
        if (!(handle instanceof SegmentFileStateHandle)) {
            // Not produced by file merging; nothing to look up.
            continue;
        }

        LogicalFileId reusedId = ((SegmentFileStateHandle) handle).getLogicalFileId();
        LogicalFile reusedFile = knownLogicalFiles.get(reusedId);
        if (reusedFile == null) {
            // No logical file registered under this id.
            continue;
        }

        reusedFile.advanceLastCheckpointId(checkpointId);
    }
}

private boolean discardLogicalFiles(
SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles)
throws Exception {
Expand All @@ -476,6 +502,7 @@ private boolean discardLogicalFiles(
&& logicalFile.getLastUsedCheckpointID() <= checkpointId) {
logicalFile.discardWithCheckpointId(checkpointId);
logicalFileIterator.remove();
knownLogicalFiles.remove(logicalFile.getFileId());
}
}

Expand Down Expand Up @@ -547,4 +574,9 @@ private void createManagedDirectory(Path managedPath) {

/** Closes this manager. This implementation is a no-op: the method body is empty. */
@Override
public void close() throws IOException {}

/**
 * Looks up a logical file in {@code knownLogicalFiles} by its id. Exposed for tests only.
 *
 * @param fileId the id of the logical file to look up.
 * @return the logical file stored under {@code fileId}, or {@code null} if the map holds no
 *     entry for it.
 */
@VisibleForTesting
public LogicalFile getLogicalFile(LogicalFileId fileId) {
return knownLogicalFiles.get(fileId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
Expand Down Expand Up @@ -745,6 +746,7 @@ static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutput
dos.writeLong(segmentFileStateHandle.getStateSize());
dos.writeInt(segmentFileStateHandle.getScope().ordinal());
dos.writeUTF(segmentFileStateHandle.getFilePath().toString());
dos.writeUTF(segmentFileStateHandle.getLogicalFileId().getKeyString());
}
} else if (stateHandle instanceof FileStateHandle) {
dos.writeByte(FILE_STREAM_STATE_HANDLE);
Expand Down Expand Up @@ -819,7 +821,9 @@ static StreamStateHandle deserializeStreamStateHandle(
long stateSize = dis.readLong();
CheckpointedStateScope scope = CheckpointedStateScope.values()[dis.readInt()];
Path physicalFilePath = new Path(dis.readUTF());
return new SegmentFileStateHandle(physicalFilePath, startPos, stateSize, scope);
LogicalFile.LogicalFileId logicalFileId = new LogicalFile.LogicalFileId(dis.readUTF());
return new SegmentFileStateHandle(
physicalFilePath, startPos, stateSize, scope, logicalFileId);
} else if (EMPTY_SEGMENT_FILE_HANDLE == type) {
return EmptySegmentFileStateHandle.INSTANCE;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

/**
Expand Down Expand Up @@ -67,4 +68,15 @@ boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope s
*/
List<StreamStateHandle> duplicate(
List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException;

/**
 * Callback invoked when handles from a previous checkpoint are reused. It is needed by the file
 * merging mechanism (FLIP-306), which manages the life cycle of the underlying files based on
 * file-reusing information.
 *
 * <p>The default implementation does nothing.
 *
 * @param previousHandle the previous handles that will be reused.
 */
default void reusePreviousStateHandle(Collection<? extends StreamStateHandle> previousHandle) {
// Intentionally empty: a normal stream factory does nothing on reuse.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;

import java.io.IOException;
Expand All @@ -31,11 +32,19 @@ public class EmptySegmentFileStateHandle extends SegmentFileStateHandle {

public static final EmptySegmentFileStateHandle INSTANCE =
new EmptySegmentFileStateHandle(
new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE);
new Path("empty"),
0,
0,
CheckpointedStateScope.EXCLUSIVE,
new LogicalFile.LogicalFileId("DUMMY"));

private EmptySegmentFileStateHandle(
Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
super(filePath, startPos, stateSize, scope);
Path filePath,
long startPos,
long stateSize,
CheckpointedStateScope scope,
LogicalFile.LogicalFileId fileId) {
super(filePath, startPos, stateSize, scope, fileId);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,29 @@ public class SegmentFileStateHandle implements StreamStateHandle {
/** The scope of the state. */
private final CheckpointedStateScope scope;

/** The id for corresponding logical file. Used to retrieve LogicalFile in TM. */
private final LogicalFile.LogicalFileId logicalFileId;

/**
* Creates a new segment file state for the given file path.
*
* @param filePath The path to the file that stores the state.
* @param startPos Start position of the segment in the physical file.
* @param stateSize Size of the segment.
* @param scope The state's scope, whether it is exclusive or shared.
* @param fileId The corresponding logical file id.
*/
public SegmentFileStateHandle(
Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
Path filePath,
long startPos,
long stateSize,
CheckpointedStateScope scope,
LogicalFile.LogicalFileId fileId) {
this.filePath = filePath;
this.stateSize = stateSize;
this.startPos = startPos;
this.scope = scope;
this.logicalFileId = fileId;
}

/**
Expand Down Expand Up @@ -118,6 +127,10 @@ public CheckpointedStateScope getScope() {
return scope;
}

/**
 * Gets the id of the logical file that corresponds to this handle.
 *
 * @return the logical file id stored in this handle.
 */
public LogicalFile.LogicalFileId getLogicalFileId() {
return logicalFileId;
}

/**
* Gets the file system that stores the file state.
*
Expand All @@ -142,15 +155,17 @@ public boolean equals(Object o) {

SegmentFileStateHandle that = (SegmentFileStateHandle) o;

return filePath.equals(that.filePath)
return logicalFileId.equals(that.logicalFileId)
&& filePath.equals(that.filePath)
&& startPos == that.startPos
&& stateSize == that.stateSize
&& scope.equals(that.scope);
}

@Override
public int hashCode() {
int result = getFilePath().hashCode();
int result = logicalFileId.hashCode();
result = 31 * result + Objects.hashCode(getFilePath());
result = 31 * result + Objects.hashCode(startPos);
result = 31 * result + Objects.hashCode(stateSize);
result = 31 * result + Objects.hashCode(scope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -113,4 +114,9 @@ public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(
return fileMergingSnapshotManager.createCheckpointStateOutputStream(
subtaskKey, checkpointId, scope);
}

/**
 * Forwards the reused handles to {@code fileMergingSnapshotManager}, passing {@code
 * checkpointId} as the checkpoint that reuses them.
 *
 * @param previousHandle the previous handles that will be reused.
 */
@Override
public void reusePreviousStateHandle(Collection<? extends StreamStateHandle> previousHandle) {
fileMergingSnapshotManager.reusePreviousStateHandle(checkpointId, previousHandle);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -319,6 +320,67 @@ public void testConcurrentFileReusingWithBlockingPool() throws Exception {
}
}

/**
 * Verifies that {@code reusePreviousStateHandle} advances the last-used checkpoint id of the
 * reused logical files, and that logical files which are no longer reused are dropped from the
 * manager once the previous checkpoint is subsumed.
 */
@Test
public void testReuseCallbackAndAdvanceWatermark() throws Exception {
    long checkpointId = 1;
    long lastCheckpointId = 10;
    int streamNum = 20;
    int perStreamWriteNum = 128;

    // Random payload for the streams; only the resulting handles matter here, the content is
    // never read back.
    byte[] bytes = new byte[streamNum * perStreamWriteNum];
    Random rd = new Random();
    rd.nextBytes(bytes);
    int byteIndex = 0;

    SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum];
    try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
            CloseableRegistry closeableRegistry = new CloseableRegistry()) {
        // getLogicalFile(...) is only available on the base class.
        FileMergingSnapshotManagerBase fmsmBase = (FileMergingSnapshotManagerBase) fmsm;
        fmsm.registerSubtaskForSharedStates(subtaskKey1);

        // repeatedly get-write-close streams, all belonging to the first checkpoint
        for (int i = 0; i < streamNum; i++) {
            FileMergingCheckpointStateOutputStream stream =
                    fmsm.createCheckpointStateOutputStream(
                            subtaskKey1, checkpointId, CheckpointedStateScope.SHARED);
            closeableRegistry.registerCloseable(stream);
            for (int j = 0; j < perStreamWriteNum; j++) {
                stream.write(bytes[byteIndex++]);
            }
            handles[i] = stream.closeAndGetHandle();
        }

        // each following checkpoint reuses one handle fewer than the previous one
        for (long cp = checkpointId + 1; cp <= lastCheckpointId; cp++) {
            int reuseCount = (int) (lastCheckpointId - cp) + 1;
            ArrayList<SegmentFileStateHandle> reuse = new ArrayList<>(reuseCount);
            for (int j = 0; j < reuseCount; j++) {
                reuse.add(handles[j]);
            }
            fmsm.reusePreviousStateHandle(cp, reuse);

            // assert the reusing affects the watermark
            for (SegmentFileStateHandle handle : reuse) {
                assertThat(
                                fmsmBase.getLogicalFile(handle.getLogicalFileId())
                                        .getLastUsedCheckpointID())
                        .isEqualTo(cp);
            }

            // subsume the previous checkpoint
            fmsm.notifyCheckpointSubsumed(subtaskKey1, cp - 1);

            // assert that all files not reused by this checkpoint are discarded
            for (int j = reuseCount; j < streamNum; j++) {
                assertThat(fmsmBase.getLogicalFile(handles[j].getLogicalFileId())).isNull();
            }
        }
    }
}

FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir)
throws IOException {
return createFileMergingSnapshotManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DiscardRecordedStateObject;
Expand Down Expand Up @@ -363,7 +364,12 @@ private static class TestingSegmentFileStateHandle extends SegmentFileStateHandl

public TestingSegmentFileStateHandle(
Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
super(filePath, startPos, stateSize, scope);
super(
filePath,
startPos,
stateSize,
scope,
LogicalFile.LogicalFileId.generateRandomId());
}

@Override
Expand Down
Loading

0 comments on commit 78f40ae

Please sign in to comment.