From afe4c79efa15902369d41ef5a6e73d79a2e7d525 Mon Sep 17 00:00:00 2001 From: Jinzhong Li Date: Wed, 13 Mar 2024 18:54:02 +0800 Subject: [PATCH] [FLINK-32080][FLIP-306][checkpoint] Restoration of FileMergingSnapshotManager This closes #24640. --- .../runtime/SavepointTaskStateManager.java | 7 + .../FileMergingSnapshotManager.java | 35 ++++- .../FileMergingSnapshotManagerBase.java | 73 ++++++++++ .../checkpoint/filemerging/PhysicalFile.java | 4 + ...askFileMergingManagerRestoreOperation.java | 116 +++++++++++++++ .../IncrementalRemoteKeyedStateHandle.java | 2 +- .../flink/runtime/state/TaskStateManager.java | 10 ++ .../runtime/state/TaskStateManagerImpl.java | 11 ++ .../FileMergingSnapshotManagerTestBase.java | 136 +++++++++++++++++- .../state/OperatorStateBackendTest.java | 2 +- .../state/SharedStateRegistryTest.java | 2 +- .../TaskExecutorFileMergingManagerTest.java | 4 +- .../runtime/state/TestTaskStateManager.java | 15 ++ ...sMergingCheckpointStorageLocationTest.java | 2 +- .../StreamTaskStateInitializerImpl.java | 31 +++- 15 files changed, 435 insertions(+), 15 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java index 1fb84df416dc3..a9cf8c875a001 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; @@ -86,6 +87,12 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera return prioritizedOperatorSubtaskState; } + @Override + public Optional getSubtaskJobManagerRestoredState(OperatorID operatorID) { + throw new UnsupportedOperationException( + "Unsupported method for SavepointTaskStateManager."); + } + @Nonnull @Override public LocalRecoveryConfig createLocalRecoveryConfig() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index 2aa32ba65c072..add8806369cb8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.checkpoint.filemerging; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; @@ -27,11 +28,13 @@ import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; import java.io.Closeable; import java.util.Collection; +import java.util.stream.Stream; /** * FileMergingSnapshotManager provides an interface to manage files and meta information for @@ -169,11 +172,22 @@ DirectoryStreamStateHandle getManagedDirStateHandle( void reusePreviousStateHandle( long checkpointId, Collection stateHandles); + /** + * Restore and re-register the SegmentFileStateHandles into FileMergingSnapshotManager. + * + * @param checkpointId the restored checkpoint id. + * @param subtaskKey the subtask key identifying the subtask. + * @param stateHandles the restored segment file handles. + */ + void restoreStateHandles( + long checkpointId, SubtaskKey subtaskKey, Stream stateHandles); + /** * A key identifies a subtask. A subtask can be identified by the operator id, subtask index and * the parallelism. Note that this key should be consistent across job attempts. */ final class SubtaskKey { + final String jobIDString; final String operatorIDString; final int subtaskIndex; final int parallelism; @@ -184,19 +198,23 @@ final class SubtaskKey { */ final int hashCode; - public SubtaskKey(OperatorID operatorID, TaskInfo taskInfo) { + public SubtaskKey(JobID jobID, OperatorID operatorID, TaskInfo taskInfo) { this( + jobID.toHexString(), operatorID.toHexString(), taskInfo.getIndexOfThisSubtask(), taskInfo.getNumberOfParallelSubtasks()); } @VisibleForTesting - public SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) { + public SubtaskKey( + String jobIDString, String operatorIDString, int subtaskIndex, int parallelism) { + this.jobIDString = jobIDString; this.operatorIDString = operatorIDString; this.subtaskIndex = subtaskIndex; this.parallelism = parallelism; - int hash = operatorIDString.hashCode(); + int hash = jobIDString.hashCode(); + hash = 31 * hash + operatorIDString.hashCode(); hash = 31 * hash + subtaskIndex; hash = 31 * hash + parallelism; this.hashCode = hash; @@ -204,6 +222,7 @@ public SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) { public static SubtaskKey of(Environment environment) { return new SubtaskKey( + environment.getJobID(), OperatorID.fromJobVertexID(environment.getJobVertexId()), environment.getTaskInfo()); } @@ -214,7 +233,9 @@ public static SubtaskKey of(Environment environment) { * @return the managed directory name. */ public String getManagedDirName() { - return String.format("%s_%d_%d_", operatorIDString, subtaskIndex, parallelism) + return String.format( + "%s_%s_%d_%d_", + jobIDString, operatorIDString, subtaskIndex, parallelism) .replaceAll("[^a-zA-Z0-9\\-]", "_"); } @@ -232,7 +253,8 @@ public boolean equals(Object o) { return hashCode == that.hashCode && subtaskIndex == that.subtaskIndex && parallelism == that.parallelism - && operatorIDString.equals(that.operatorIDString); + && operatorIDString.equals(that.operatorIDString) + && jobIDString.equals(that.jobIDString); } @Override @@ -242,7 +264,8 @@ public int hashCode() { @Override public String toString() { - return String.format("%s(%d/%d)", operatorIDString, subtaskIndex, parallelism); + return String.format( + "%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index db90d654d611d..1e1abdcaf6430 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -44,6 +44,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; @@ -52,6 +53,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; +import java.util.stream.Stream; import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter; @@ -575,8 +577,79 @@ private void createManagedDirectory(Path managedPath) { @Override public void close() throws IOException {} + // ------------------------------------------------------------------------ + // restore + // ------------------------------------------------------------------------ + + @Override + public void restoreStateHandles( + long checkpointId, SubtaskKey subtaskKey, Stream stateHandles) { + + synchronized (lock) { + Set restoredLogicalFiles = + uploadedStates.computeIfAbsent(checkpointId, id -> new HashSet<>()); + + Map knownPhysicalFiles = new HashMap<>(); + knownLogicalFiles.values().stream() + .map(LogicalFile::getPhysicalFile) + .forEach(file -> knownPhysicalFiles.putIfAbsent(file.getFilePath(), file)); + + stateHandles.forEach( + fileHandle -> { + PhysicalFile physicalFile = + knownPhysicalFiles.computeIfAbsent( + fileHandle.getFilePath(), + path -> { + PhysicalFileDeleter fileDeleter = + (isManagedByFileMergingManager( + path, + subtaskKey, + fileHandle.getScope())) + ? physicalFileDeleter + : null; + return new PhysicalFile( + null, path, fileDeleter, fileHandle.getScope()); + }); + + LogicalFileId logicalFileId = fileHandle.getLogicalFileId(); + LogicalFile logicalFile = + new LogicalFile( + logicalFileId, + physicalFile, + fileHandle.getStartPos(), + fileHandle.getStateSize(), + subtaskKey); + knownLogicalFiles.put(logicalFileId, logicalFile); + logicalFile.advanceLastCheckpointId(checkpointId); + restoredLogicalFiles.add(logicalFile); + }); + } + } + + /** + * Distinguish whether the given filePath is managed by the FileMergingSnapshotManager. If the + * filePath is located under managedDir (managedSharedStateDir or managedExclusiveStateDir) as a + * subFile, it should be managed by the FileMergingSnapshotManager. + */ + private boolean isManagedByFileMergingManager( + Path filePath, SubtaskKey subtaskKey, CheckpointedStateScope scope) { + if (scope == CheckpointedStateScope.SHARED) { + Path managedDir = managedSharedStateDir.get(subtaskKey); + return filePath.toString().startsWith(managedDir.toString()); + } + if (scope == CheckpointedStateScope.EXCLUSIVE) { + return filePath.toString().startsWith(managedExclusiveStateDir.toString()); + } + throw new UnsupportedOperationException("Unsupported CheckpointStateScope " + scope); + } + @VisibleForTesting public LogicalFile getLogicalFile(LogicalFileId fileId) { return knownLogicalFiles.get(fileId); } + + @VisibleForTesting + TreeMap> getUploadedStates() { + return uploadedStates; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java index 6c672ced26f92..8c99b97ea0537 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java @@ -143,6 +143,10 @@ public void deleteIfNecessary() throws IOException { } if (deleter != null) { deleter.perform(filePath); + } else { + LOG.debug( + "Skip deleting this file {} because it is not owned by FileMergingManager.", + filePath); } this.deleted = true; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java new file mode 100644 index 0000000000000..2be6086849c91 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SubtaskFileMergingManagerRestoreOperation.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateObject; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; +import org.apache.flink.util.Preconditions; + +import java.util.function.Function; +import java.util.stream.Stream; + +/** + * Restore operation that restores file-merging information belonging to one subtask for {@link + * FileMergingSnapshotManager}. + */ +public class SubtaskFileMergingManagerRestoreOperation { + + /** The restored checkpoint id. */ + private final long checkpointId; + + /** The restored job id. */ + private final JobID jobID; + + /** The restored Task info. */ + private final TaskInfo taskInfo; + + /** The id of the operator to which the subtask belongs. */ + private final OperatorID operatorID; + + private final FileMergingSnapshotManager fileMergingSnapshotManager; + + /** The state which belongs to the restored subtask. */ + private final OperatorSubtaskState subtaskState; + + public SubtaskFileMergingManagerRestoreOperation( + long checkpointId, + FileMergingSnapshotManager fileMergingSnapshotManager, + JobID jobID, + TaskInfo taskInfo, + OperatorID operatorID, + OperatorSubtaskState subtaskState) { + this.checkpointId = checkpointId; + this.fileMergingSnapshotManager = fileMergingSnapshotManager; + this.jobID = jobID; + this.taskInfo = Preconditions.checkNotNull(taskInfo); + this.operatorID = Preconditions.checkNotNull(operatorID); + this.subtaskState = Preconditions.checkNotNull(subtaskState); + } + + public void restore() { + FileMergingSnapshotManager.SubtaskKey subtaskKey = + new FileMergingSnapshotManager.SubtaskKey(jobID, operatorID, taskInfo); + + Stream keyedStateHandles = + Stream.concat( + subtaskState.getManagedKeyedState().stream(), + subtaskState.getRawKeyedState().stream()) + .flatMap(this::getChildrenStreamHandles); + + Stream operatorStateHandles = + Stream.concat( + subtaskState.getManagedOperatorState().stream(), + subtaskState.getRawOperatorState().stream()) + .flatMap(this::getChildrenStreamHandles); + + // TODO support channel state restore for unaligned checkpoint. + + Stream segmentStateHandles = + Stream.of(keyedStateHandles, operatorStateHandles) + .flatMap(Function.identity()) + .filter(handle -> handle instanceof SegmentFileStateHandle) + .map(handle -> (SegmentFileStateHandle) handle); + fileMergingSnapshotManager.restoreStateHandles( + checkpointId, subtaskKey, segmentStateHandles); + } + + private Stream getChildrenStreamHandles(KeyedStateHandle parentHandle) { + if (parentHandle instanceof IncrementalRemoteKeyedStateHandle) { + return ((IncrementalRemoteKeyedStateHandle) parentHandle).streamSubHandles(); + } + if (parentHandle instanceof KeyGroupsStateHandle) { + return Stream.of(((KeyGroupsStateHandle) parentHandle).getDelegateStateHandle()); + } + // TODO support changelog keyed state handle + return Stream.of(parentHandle); + } + + private Stream getChildrenStreamHandles(OperatorStateHandle parentHandle) { + return Stream.of(parentHandle.getDelegateStateHandle()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java index 2f5335b88532a..c1f041052efe2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java @@ -246,7 +246,7 @@ public void collectSizeStats(StateObjectSizeStatsCollector collector) { streamSubHandles().forEach(handle -> handle.collectSizeStats(collector)); } - private Stream streamSubHandles() { + public Stream streamSubHandles() { return Stream.of( Stream.of(metaStateHandle), sharedState.stream().map(HandleAndLocalPath::getHandle), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java index 3fdb05cebdeb9..683fdf3a33a59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; @@ -96,6 +97,15 @@ void reportIncompleteTaskStateSnapshots( @Nonnull PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID); + /** + * Get the restored state from jobManager which belongs to an operator running in the owning + * task. + * + * @param operatorID the id of the operator for which we request state. + * @return the subtask restored state from jobManager. + */ + Optional getSubtaskJobManagerRestoredState(OperatorID operatorID); + /** * Returns the configuration for local recovery, i.e. the base directories for all file-based * local state of the owning subtask and the general mode for local recovery. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java index 3143a87a11e60..4d4967204db63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManagerImpl.java @@ -253,6 +253,17 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera return builder.build(); } + public Optional getSubtaskJobManagerRestoredState(OperatorID operatorID) { + if (jobManagerTaskRestore == null) { + return Optional.empty(); + } + OperatorSubtaskState state = + jobManagerTaskRestore + .getTaskStateSnapshot() + .getSubtaskStateByOperatorID(operatorID); + return (state == null) ? Optional.empty() : Optional.of(state); + } + @Nonnull @Override public LocalRecoveryConfig createLocalRecoveryConfig() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index af044b9a24db2..f8677e491db15 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -17,15 +17,26 @@ package org.apache.flink.runtime.checkpoint.filemerging; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.TaskInfoImpl; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeOffsets; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; @@ -36,11 +47,17 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -49,6 +66,8 @@ public abstract class FileMergingSnapshotManagerTestBase { final String tmId = "Testing"; + final JobID jobID = new JobID(); + final OperatorID operatorID = new OperatorID(289347923L, 75893479L); SubtaskKey subtaskKey1; @@ -64,8 +83,10 @@ public abstract class FileMergingSnapshotManagerTestBase { public void setup(@TempDir java.nio.file.Path tempFolder) { // use simplified job ids for the tests long jobId = 1; - subtaskKey1 = new SubtaskKey(operatorID, new TaskInfoImpl("TestingTask", 128, 0, 128, 3)); - subtaskKey2 = new SubtaskKey(operatorID, new TaskInfoImpl("TestingTask", 128, 1, 128, 3)); + subtaskKey1 = + new SubtaskKey(jobID, operatorID, new TaskInfoImpl("TestingTask", 128, 0, 128, 3)); + subtaskKey2 = + new SubtaskKey(jobID, operatorID, new TaskInfoImpl("TestingTask", 128, 1, 128, 3)); checkpointBaseDir = new Path(tempFolder.toString(), String.valueOf(jobId)); writeBufferSize = 4096; } @@ -381,6 +402,117 @@ public void testReuseCallbackAndAdvanceWatermark() throws Exception { } } + @Test + public void testRestore() throws Exception { + TaskStateSnapshot taskStateSnapshot; + long checkpointId = 222; + + // Step1: build TaskStateSnapshot using FileMergingSnapshotManagerBase; + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + Map subtaskStatesByOperatorID = new HashMap<>(); + subtaskStatesByOperatorID.put( + operatorID, buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry)); + taskStateSnapshot = new TaskStateSnapshot(subtaskStatesByOperatorID); + } + assertThat(taskStateSnapshot).isNotNull(); + + // Step 2: restore FileMergingSnapshotManagerBase from the TaskStateSnapshot. + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + TaskInfo taskInfo = + new TaskInfoImpl( + "test restore", + 128, + subtaskKey1.subtaskIndex, + subtaskKey1.parallelism, + 0); + for (Map.Entry entry : + taskStateSnapshot.getSubtaskStateMappings()) { + SubtaskFileMergingManagerRestoreOperation restoreOperation = + new SubtaskFileMergingManagerRestoreOperation( + checkpointId, + fmsm, + jobID, + taskInfo, + entry.getKey(), + entry.getValue()); + restoreOperation.restore(); + } + TreeMap> stateFiles = fmsm.getUploadedStates(); + assertThat(stateFiles.size()).isEqualTo(1); + Set restoreFileSet = stateFiles.get(checkpointId); + assertThat(restoreFileSet).isNotNull(); + assertThat(restoreFileSet.size()).isEqualTo(4); + for (LogicalFile file : restoreFileSet) { + assertThat(fmsm.getLogicalFile(file.getFileId())).isEqualTo(file); + } + Set physicalFileSet = + restoreFileSet.stream() + .map(LogicalFile::getPhysicalFile) + .map(PhysicalFile::getFilePath) + .collect(Collectors.toSet()); + fmsm.notifyCheckpointSubsumed(subtaskKey1, checkpointId); + for (Path path : physicalFileSet) { + assertThat(path.getFileSystem().exists(path)).isFalse(); + } + } + } + + private OperatorSubtaskState buildOperatorSubtaskState( + long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) + throws Exception { + IncrementalRemoteKeyedStateHandle keyedStateHandle1 = + new IncrementalRemoteKeyedStateHandle( + UUID.randomUUID(), + new KeyGroupRange(0, 8), + checkpointId, + Collections.singletonList( + IncrementalKeyedStateHandle.HandleAndLocalPath.of( + buildOneSegmentFileHandle( + checkpointId, fmsm, closeableRegistry), + "localPath")), + Collections.emptyList(), + null); + + KeyGroupsStateHandle keyedStateHandle2 = + new KeyGroupsStateHandle( + new KeyGroupRangeOffsets(0, 8), + buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry)); + + OperatorStateHandle operatorStateHandle1 = + new FileMergingOperatorStreamStateHandle( + null, + null, + Collections.emptyMap(), + buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry)); + + OperatorStateHandle operatorStateHandle2 = + new FileMergingOperatorStreamStateHandle( + null, + null, + Collections.emptyMap(), + buildOneSegmentFileHandle(checkpointId, fmsm, closeableRegistry)); + + return OperatorSubtaskState.builder() + .setManagedKeyedState(keyedStateHandle1) + .setRawKeyedState(keyedStateHandle2) + .setManagedOperatorState(operatorStateHandle1) + .setRawOperatorState(operatorStateHandle2) + .build(); + } + + private SegmentFileStateHandle buildOneSegmentFileHandle( + long checkpointId, FileMergingSnapshotManager fmsm, CloseableRegistry closeableRegistry) + throws Exception { + FileMergingCheckpointStateOutputStream outputStream = + writeCheckpointAndGetStream(checkpointId, fmsm, closeableRegistry); + return outputStream.closeAndGetHandle(); + } + FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException { return createFileMergingSnapshotManager( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 2122303545b22..ace6bffa9c044 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -452,7 +452,7 @@ void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception { AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR); final FileMergingSnapshotManager.SubtaskKey subtaskKey = - new FileMergingSnapshotManager.SubtaskKey("opId", 1, 1); + new FileMergingSnapshotManager.SubtaskKey("jobId", "opId", 1, 1); LocalFileSystem fs = getSharedInstance(); CheckpointStorageLocationReference cslReference = AbstractFsCheckpointStorageAccess.encodePathAsReference( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java index a26830c18be46..7e6a2e1476305 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java @@ -280,7 +280,7 @@ void testFireMergingOperatorStateRegister(@TempDir File tmpFolder) throws IOExce checkpointBaseDir, AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR); final FileMergingSnapshotManager.SubtaskKey subtaskKey = - new FileMergingSnapshotManager.SubtaskKey("opId", 1, 2); + new FileMergingSnapshotManager.SubtaskKey("jobId", "opId", 1, 2); FileMergingSnapshotManager snapshotManager = createFileMergingSnapshotManager( checkpointBaseDir, sharedStateDir, taskOwnedStateDir); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java index 5d77e9fe87562..79268002cc2e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java @@ -39,8 +39,8 @@ public void testCheckpointScope(@TempDir java.nio.file.Path testBaseDir) throws new TaskExecutorFileMergingManager(); JobID job1 = new JobID(1234L, 4321L); JobID job2 = new JobID(1234L, 5678L); - SubtaskKey key1 = new SubtaskKey("test-op1", 0, 128); - SubtaskKey key2 = new SubtaskKey("test-op2", 1, 128); + SubtaskKey key1 = new SubtaskKey("test-jobId", "test-op1", 0, 128); + SubtaskKey key2 = new SubtaskKey("test-jobId", "test-op2", 1, 128); Path checkpointDir1 = new Path(testBaseDir.toString(), "job1"); Path checkpointDir2 = new Path(testBaseDir.toString(), "job2"); int writeBufferSize = 4096; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java index d534e6e769e39..75817257ea194 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java @@ -218,6 +218,21 @@ public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID opera } } + @Override + public Optional getSubtaskJobManagerRestoredState(OperatorID operatorID) { + TaskStateSnapshot taskStateSnapshot = + jobManagerTaskStateSnapshotsByCheckpointId.get(reportedCheckpointId); + if (taskStateSnapshot == null) { + return Optional.empty(); + } + OperatorSubtaskState subtaskState = + taskStateSnapshot.getSubtaskStateByOperatorID(operatorID); + if (subtaskState == null) { + return Optional.empty(); + } + return Optional.of(subtaskState); + } + @Nonnull @Override public LocalRecoveryConfig createLocalRecoveryConfig() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java index 1eb0be9e2abd3..858788cd2acd5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java @@ -65,7 +65,7 @@ public class FsMergingCheckpointStorageLocationTest { private static final int WRITE_BUFFER_SIZE = 1024; private static final FileMergingSnapshotManager.SubtaskKey SUBTASK_KEY = - new FileMergingSnapshotManager.SubtaskKey("opId", 1, 1); + new FileMergingSnapshotManager.SubtaskKey("jobId", "opId", 1, 1); @Before public void prepareDirectories() { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 50b3c4e0867d9..3e3a745c12a5e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -18,15 +18,19 @@ package org.apache.flink.streaming.api.operators; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.metrics.MetricNames; @@ -68,6 +72,7 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; +import java.util.Optional; import java.util.OptionalLong; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -152,6 +157,8 @@ public StreamOperatorStateContext streamOperatorStateContext( throws Exception { TaskInfo taskInfo = environment.getTaskInfo(); + registerRestoredStateToFileMergingManager(environment.getJobID(), taskInfo, operatorID); + OperatorSubtaskDescriptionText operatorSubtaskDescription = new OperatorSubtaskDescriptionText( operatorID, @@ -160,7 +167,6 @@ public StreamOperatorStateContext streamOperatorStateContext( taskInfo.getNumberOfParallelSubtasks()); final String operatorIdentifierText = operatorSubtaskDescription.toString(); - final PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates = taskStateManager.prioritizedOperatorState(operatorID); @@ -296,6 +302,29 @@ public StreamOperatorStateContext streamOperatorStateContext( } } + private void registerRestoredStateToFileMergingManager( + JobID jobID, TaskInfo taskInfo, OperatorID operatorID) { + FileMergingSnapshotManager fileMergingSnapshotManager = + taskStateManager.getFileMergingSnapshotManager(); + Optional restoredCheckpointId = taskStateManager.getRestoreCheckpointId(); + if (fileMergingSnapshotManager == null || !restoredCheckpointId.isPresent()) { + return; + } + Optional subtaskState = + taskStateManager.getSubtaskJobManagerRestoredState(operatorID); + if (subtaskState.isPresent()) { + SubtaskFileMergingManagerRestoreOperation restoreOperation = + new SubtaskFileMergingManagerRestoreOperation( + restoredCheckpointId.get(), + fileMergingSnapshotManager, + jobID, + taskInfo, + operatorID, + subtaskState.get()); + restoreOperation.restore(); + } + } + protected OperatorStateBackend operatorStateBackend( String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates,