Skip to content

Commit

Permalink
[FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapsho…
Browse files Browse the repository at this point in the history
…tManager

This closes apache#24640.
  • Loading branch information
ljz2051 authored and Zakelly committed May 7, 2024
1 parent ca441b8 commit afe4c79
Show file tree
Hide file tree
Showing 15 changed files with 435 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
Expand Down Expand Up @@ -86,6 +87,12 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera
return prioritizedOperatorSubtaskState;
}

/**
 * This operation is not supported by SavepointTaskStateManager.
 *
 * @param operatorID ignored.
 * @throws UnsupportedOperationException always.
 */
@Override
public Optional<OperatorSubtaskState> getSubtaskJobManagerRestoredState(OperatorID operatorID) {
    final String message = "Unsupported method for SavepointTaskStateManager.";
    throw new UnsupportedOperationException(message);
}

@Nonnull
@Override
public LocalRecoveryConfig createLocalRecoveryConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand All @@ -27,11 +28,13 @@
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;

import java.io.Closeable;
import java.util.Collection;
import java.util.stream.Stream;

/**
* FileMergingSnapshotManager provides an interface to manage files and meta information for
Expand Down Expand Up @@ -169,11 +172,22 @@ DirectoryStreamStateHandle getManagedDirStateHandle(
void reusePreviousStateHandle(
long checkpointId, Collection<? extends StreamStateHandle> stateHandles);

/**
* Restores the given {@link SegmentFileStateHandle}s and re-registers them with this
* FileMergingSnapshotManager for the restored checkpoint.
*
* @param checkpointId the id of the checkpoint being restored.
* @param subtaskKey the subtask key identifying the subtask the handles belong to.
* @param stateHandles the restored segment file handles, supplied as a stream.
*/
void restoreStateHandles(
long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles);

/**
* A key that identifies a subtask. A subtask is identified by the job id, the operator id, the
* subtask index and the parallelism. Note that this key should be consistent across job attempts.
*/
final class SubtaskKey {
final String jobIDString;
final String operatorIDString;
final int subtaskIndex;
final int parallelism;
Expand All @@ -184,26 +198,31 @@ final class SubtaskKey {
*/
final int hashCode;

public SubtaskKey(OperatorID operatorID, TaskInfo taskInfo) {
public SubtaskKey(JobID jobID, OperatorID operatorID, TaskInfo taskInfo) {
this(
jobID.toHexString(),
operatorID.toHexString(),
taskInfo.getIndexOfThisSubtask(),
taskInfo.getNumberOfParallelSubtasks());
}

@VisibleForTesting
public SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) {
public SubtaskKey(
String jobIDString, String operatorIDString, int subtaskIndex, int parallelism) {
this.jobIDString = jobIDString;
this.operatorIDString = operatorIDString;
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
int hash = operatorIDString.hashCode();
int hash = jobIDString.hashCode();
hash = 31 * hash + operatorIDString.hashCode();
hash = 31 * hash + subtaskIndex;
hash = 31 * hash + parallelism;
this.hashCode = hash;
}

/**
 * Creates the key for the subtask described by the given environment. The operator id part of
 * the key is derived from the environment's job vertex id.
 *
 * @param environment the environment providing job id, job vertex id and task info.
 * @return the subtask key built from these values.
 */
public static SubtaskKey of(Environment environment) {
    final JobID jobID = environment.getJobID();
    final OperatorID operatorID = OperatorID.fromJobVertexID(environment.getJobVertexId());
    final TaskInfo taskInfo = environment.getTaskInfo();
    return new SubtaskKey(jobID, operatorID, taskInfo);
}
Expand All @@ -214,7 +233,9 @@ public static SubtaskKey of(Environment environment) {
* @return the managed directory name.
*/
public String getManagedDirName() {
return String.format("%s_%d_%d_", operatorIDString, subtaskIndex, parallelism)
return String.format(
"%s_%s_%d_%d_",
jobIDString, operatorIDString, subtaskIndex, parallelism)
.replaceAll("[^a-zA-Z0-9\\-]", "_");
}

Expand All @@ -232,7 +253,8 @@ public boolean equals(Object o) {
return hashCode == that.hashCode
&& subtaskIndex == that.subtaskIndex
&& parallelism == that.parallelism
&& operatorIDString.equals(that.operatorIDString);
&& operatorIDString.equals(that.operatorIDString)
&& jobIDString.equals(that.jobIDString);
}

@Override
Expand All @@ -242,7 +264,8 @@ public int hashCode() {

@Override
public String toString() {
return String.format("%s(%d/%d)", operatorIDString, subtaskIndex, parallelism);
return String.format(
"%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
Expand All @@ -52,6 +53,7 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Stream;

import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter;

Expand Down Expand Up @@ -575,8 +577,79 @@ private void createManagedDirectory(Path managedPath) {
/** Does nothing in this implementation; the body is empty. */
@Override
public void close() throws IOException {}

// ------------------------------------------------------------------------
// restore
// ------------------------------------------------------------------------

/**
 * Re-registers the restored segment file handles of one subtask for the given checkpoint.
 *
 * <p>For every handle a new {@link LogicalFile} is created, stored in {@code knownLogicalFiles}
 * under its logical file id, advanced to {@code checkpointId} and added to the set kept in
 * {@code uploadedStates} for that checkpoint. Handles that point to the same path share a single
 * {@link PhysicalFile}; physical files already referenced by a known logical file are reused.
 * A newly created physical file only receives the deleter when its path is managed by this
 * manager, otherwise it is created with a {@code null} deleter.
 *
 * @param checkpointId the restored checkpoint id.
 * @param subtaskKey the subtask key identifying the subtask.
 * @param stateHandles the restored segment file handles.
 */
@Override
public void restoreStateHandles(
        long checkpointId, SubtaskKey subtaskKey, Stream<SegmentFileStateHandle> stateHandles) {

    synchronized (lock) {
        Set<LogicalFile> filesOfCheckpoint =
                uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>());

        // Index the physical files that are already tracked, keyed by their path. The first
        // physical file seen for a path wins.
        Map<Path, PhysicalFile> physicalFilesByPath = new HashMap<>();
        for (LogicalFile tracked : knownLogicalFiles.values()) {
            PhysicalFile trackedPhysicalFile = tracked.getPhysicalFile();
            physicalFilesByPath.putIfAbsent(
                    trackedPhysicalFile.getFilePath(), trackedPhysicalFile);
        }

        stateHandles.forEach(
                handle -> {
                    Path handlePath = handle.getFilePath();
                    PhysicalFile physicalFile = physicalFilesByPath.get(handlePath);
                    if (physicalFile == null) {
                        // Unknown path: create the physical file, attaching the deleter only
                        // if the file lives in a directory owned by this manager.
                        PhysicalFileDeleter deleterOrNull = null;
                        if (isManagedByFileMergingManager(
                                handlePath, subtaskKey, handle.getScope())) {
                            deleterOrNull = physicalFileDeleter;
                        }
                        physicalFile =
                                new PhysicalFile(
                                        null, handlePath, deleterOrNull, handle.getScope());
                        physicalFilesByPath.put(handlePath, physicalFile);
                    }

                    LogicalFileId restoredId = handle.getLogicalFileId();
                    LogicalFile restoredFile =
                            new LogicalFile(
                                    restoredId,
                                    physicalFile,
                                    handle.getStartPos(),
                                    handle.getStateSize(),
                                    subtaskKey);
                    knownLogicalFiles.put(restoredId, restoredFile);
                    restoredFile.advanceLastCheckpointId(checkpointId);
                    filesOfCheckpoint.add(restoredFile);
                });
    }
}

/**
 * Distinguish whether the given filePath is managed by the FileMergingSnapshotManager. If the
 * filePath is located under managedDir (managedSharedStateDir or managedExclusiveStateDir) as a
 * subFile, it should be managed by the FileMergingSnapshotManager.
 *
 * @param filePath the path of the file to check.
 * @param subtaskKey the subtask key used to look up the managed shared state directory.
 * @param scope the scope of the file, selecting which managed directory is compared.
 * @return true if the file path equals or lies below the managed directory of that scope.
 * @throws IllegalStateException if the scope is SHARED and no managed shared state directory is
 *     known for the given subtask key.
 * @throws UnsupportedOperationException if the scope is neither SHARED nor EXCLUSIVE.
 */
private boolean isManagedByFileMergingManager(
        Path filePath, SubtaskKey subtaskKey, CheckpointedStateScope scope) {
    if (scope == CheckpointedStateScope.SHARED) {
        Path managedDir = managedSharedStateDir.get(subtaskKey);
        if (managedDir == null) {
            // Fail with a descriptive message instead of an anonymous NullPointerException.
            throw new IllegalStateException(
                    "No managed shared state directory is known for subtask "
                            + subtaskKey
                            + "; it must exist before state handles are restored.");
        }
        return isLocatedUnder(filePath.toString(), managedDir.toString());
    }
    if (scope == CheckpointedStateScope.EXCLUSIVE) {
        return isLocatedUnder(filePath.toString(), managedExclusiveStateDir.toString());
    }
    throw new UnsupportedOperationException("Unsupported CheckpointStateScope " + scope);
}

/**
 * Checks that {@code file} equals {@code directory} or lies below it. Unlike a plain prefix
 * test, the character following the directory prefix must be a path separator, so a sibling
 * such as {@code /a/bc/f} is not treated as being located under {@code /a/b}.
 */
private static boolean isLocatedUnder(String file, String directory) {
    if (!file.startsWith(directory)) {
        return false;
    }
    if (file.length() == directory.length() || directory.endsWith("/")) {
        return true;
    }
    return file.charAt(directory.length()) == '/';
}

/** Test accessor: looks up the entry stored in {@code knownLogicalFiles} for the given id. */
@VisibleForTesting
public LogicalFile getLogicalFile(LogicalFileId fileId) {
return knownLogicalFiles.get(fileId);
}

/** Test accessor: returns the internal {@code uploadedStates} map itself, not a copy. */
@VisibleForTesting
TreeMap<Long, Set<LogicalFile>> getUploadedStates() {
return uploadedStates;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ public void deleteIfNecessary() throws IOException {
}
if (deleter != null) {
deleter.perform(filePath);
} else {
LOG.debug(
"Skip deleting this file {} because it is not owned by FileMergingManager.",
filePath);
}
this.deleted = true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.util.Preconditions;

import java.util.function.Function;
import java.util.stream.Stream;

/**
 * Restore operation that restores file-merging information belonging to one subtask for {@link
 * FileMergingSnapshotManager}.
 *
 * <p>The operation collects the {@link SegmentFileStateHandle}s contained in the managed and raw
 * keyed state and in the managed and raw operator state of the subtask, and passes them to {@link
 * FileMergingSnapshotManager#restoreStateHandles}.
 */
public class SubtaskFileMergingManagerRestoreOperation {

    /** The restored checkpoint id. */
    private final long checkpointId;

    /** The restored job id. */
    private final JobID jobID;

    /** The restored Task info. */
    private final TaskInfo taskInfo;

    /** The id of the operator to which the subtask belongs. */
    private final OperatorID operatorID;

    /** The manager into which the restored segment file handles are registered. */
    private final FileMergingSnapshotManager fileMergingSnapshotManager;

    /** The state which belongs to the restored subtask. */
    private final OperatorSubtaskState subtaskState;

    /**
     * Creates the restore operation. All reference arguments must be non-null; they are
     * validated here so that a missing argument fails at construction rather than later inside
     * {@link #restore()}.
     *
     * @param checkpointId the restored checkpoint id.
     * @param fileMergingSnapshotManager the manager that receives the restored handles.
     * @param jobID the restored job id.
     * @param taskInfo the restored task info.
     * @param operatorID the id of the operator to which the subtask belongs.
     * @param subtaskState the state which belongs to the restored subtask.
     */
    public SubtaskFileMergingManagerRestoreOperation(
            long checkpointId,
            FileMergingSnapshotManager fileMergingSnapshotManager,
            JobID jobID,
            TaskInfo taskInfo,
            OperatorID operatorID,
            OperatorSubtaskState subtaskState) {
        this.checkpointId = checkpointId;
        this.fileMergingSnapshotManager = Preconditions.checkNotNull(fileMergingSnapshotManager);
        this.jobID = Preconditions.checkNotNull(jobID);
        this.taskInfo = Preconditions.checkNotNull(taskInfo);
        this.operatorID = Preconditions.checkNotNull(operatorID);
        this.subtaskState = Preconditions.checkNotNull(subtaskState);
    }

    /**
     * Extracts all segment file handles from the subtask state and registers them with the
     * file-merging manager under the restored checkpoint id.
     */
    public void restore() {
        // NOTE(review): presumably this key has to match the key under which the subtask
        // registered its managed directories with the manager - confirm against the callers.
        FileMergingSnapshotManager.SubtaskKey subtaskKey =
                new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, taskInfo);

        Stream<? extends StateObject> keyedStateHandles =
                Stream.concat(
                                subtaskState.getManagedKeyedState().stream(),
                                subtaskState.getRawKeyedState().stream())
                        .flatMap(this::getChildrenStreamHandles);

        Stream<? extends StateObject> operatorStateHandles =
                Stream.concat(
                                subtaskState.getManagedOperatorState().stream(),
                                subtaskState.getRawOperatorState().stream())
                        .flatMap(this::getChildrenStreamHandles);

        // TODO support channel state restore for unaligned checkpoint.

        // Only handles that are SegmentFileStateHandles are relevant for file merging.
        Stream<SegmentFileStateHandle> segmentStateHandles =
                Stream.of(keyedStateHandles, operatorStateHandles)
                        .flatMap(Function.identity())
                        .filter(handle -> handle instanceof SegmentFileStateHandle)
                        .map(handle -> (SegmentFileStateHandle) handle);
        fileMergingSnapshotManager.restoreStateHandles(
                checkpointId, subtaskKey, segmentStateHandles);
    }

    /**
     * Unwraps a keyed state handle into the handles it contains: the sub handles of an {@link
     * IncrementalRemoteKeyedStateHandle}, the delegate of a {@link KeyGroupsStateHandle}, or the
     * handle itself for any other type.
     */
    private Stream<? extends StateObject> getChildrenStreamHandles(KeyedStateHandle parentHandle) {
        if (parentHandle instanceof IncrementalRemoteKeyedStateHandle) {
            return ((IncrementalRemoteKeyedStateHandle) parentHandle).streamSubHandles();
        }
        if (parentHandle instanceof KeyGroupsStateHandle) {
            return Stream.of(((KeyGroupsStateHandle) parentHandle).getDelegateStateHandle());
        }
        // TODO support changelog keyed state handle
        return Stream.of(parentHandle);
    }

    /** Unwraps an operator state handle into its delegate stream state handle. */
    private Stream<StreamStateHandle> getChildrenStreamHandles(OperatorStateHandle parentHandle) {
        return Stream.of(parentHandle.getDelegateStateHandle());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public void collectSizeStats(StateObjectSizeStatsCollector collector) {
streamSubHandles().forEach(handle -> handle.collectSizeStats(collector));
}

private Stream<StreamStateHandle> streamSubHandles() {
public Stream<StreamStateHandle> streamSubHandles() {
return Stream.of(
Stream.of(metaStateHandle),
sharedState.stream().map(HandleAndLocalPath::getHandle),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
Expand Down Expand Up @@ -96,6 +97,15 @@ void reportIncompleteTaskStateSnapshots(
@Nonnull
PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID);

/**
* Returns the state restored from the JobManager that belongs to the given operator running in
* the owning task.
*
* @param operatorID the id of the operator for which we request state.
* @return the subtask state restored from the JobManager, wrapped in an {@link Optional}.
*/
Optional<OperatorSubtaskState> getSubtaskJobManagerRestoredState(OperatorID operatorID);

/**
* Returns the configuration for local recovery, i.e. the base directories for all file-based
* local state of the owning subtask and the general mode for local recovery.
Expand Down
Loading

0 comments on commit afe4c79

Please sign in to comment.