Skip to content

Commit

Permalink
[FLINK-32074][checkpoint] Merge file across checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
Zakelly committed Mar 14, 2024
1 parent 3b9623e commit 54d355d
Show file tree
Hide file tree
Showing 8 changed files with 461 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.runtime.state.CheckpointedStateScope;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.util.concurrent.Executor;

/**
 * A file-merging snapshot manager that does not tie a physical file to one checkpoint: files that
 * are handed back after use are offered to a {@link PhysicalFilePool} and may be handed out again
 * for a later request. The checkpoint id passed to the methods below is not consulted.
 */
public class AcrossCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase {

    /** Pool holding the physical files that are currently available for reuse. */
    private final PhysicalFilePool filePool;

    /**
     * Creates the manager and the file pool backing it. All arguments are forwarded unchanged to
     * the base class constructor.
     */
    public AcrossCheckpointFileMergingSnapshotManager(
            String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
        super(id, maxFileSize, filePoolType, ioExecutor);
        this.filePool = createPhysicalPool();
    }

    /**
     * Takes a physical file from the pool for the given subtask and scope, or creates a new one
     * when the pool has none to offer.
     *
     * @param subtaskKey the subtask requesting the file.
     * @param checkpointID unused by this implementation.
     * @param scope the state scope the file is requested for.
     * @return a pooled or newly created physical file, never {@code null}.
     * @throws IOException if polling the pool or creating the file fails.
     */
    @Override
    @Nonnull
    protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
            SubtaskKey subtaskKey, long checkpointID, CheckpointedStateScope scope)
            throws IOException {
        final PhysicalFile pooledFile = filePool.pollFile(subtaskKey, scope);
        if (pooledFile != null) {
            return pooledFile;
        }

        // Nothing reusable right now. Files only re-enter the pool once they are returned
        // (after closeAndGetHandle()), so fall back to a fresh file.
        return createPhysicalFile(subtaskKey, scope);
    }

    /** Intentionally a no-op in this implementation. */
    @Override
    protected void discardCheckpoint(long checkpointId) {}

    /**
     * Hands a physical file back so that it can serve a later request. If the pool does not accept
     * the file, the file is closed instead.
     *
     * @param subtaskKey the subtask returning the file.
     * @param checkpointId unused by this implementation.
     * @param physicalFile the file being returned.
     * @throws IOException if syncing, pooling or closing the file fails.
     */
    @Override
    protected void returnPhysicalFileForNextReuse(
            SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile)
            throws IOException {

        if (shouldSyncAfterClosingLogicalFile) {
            final FSDataOutputStream stream = physicalFile.getOutputStream();
            // The file may have no open stream; only sync when there is one.
            if (stream != null) {
                stream.sync();
            }
        }

        final boolean acceptedByPool = filePool.tryPutFile(subtaskKey, physicalFile);
        if (acceptedByPool) {
            return;
        }
        physicalFile.close();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class FileMergingSnapshotManagerBuilder {
/** The id for identifying a {@link FileMergingSnapshotManager}. */
private final String id;

/**
 * The file merging type used to pick the manager implementation. Starts out as {@link
 * FileMergingType#NO_MERGE} until changed through {@code setType}.
 */
private FileMergingType fileMergingType = FileMergingType.NO_MERGE;

/** Max size for a file. TODO: Make it configurable. */
private long maxFileSize = 32 * 1024 * 1024;

Expand All @@ -46,6 +48,12 @@ public FileMergingSnapshotManagerBuilder(String id) {
this.id = id;
}

/**
 * Set the type of file merging that the built manager should perform.
 *
 * @param fileMergingType the merging type; must not be {@code null}.
 * @return this builder, for call chaining.
 * @throws NullPointerException if {@code fileMergingType} is {@code null}.
 */
public FileMergingSnapshotManagerBuilder setType(FileMergingType fileMergingType) {
    // Fail fast on null instead of letting it surface later as an NPE in build()'s switch.
    this.fileMergingType = Preconditions.checkNotNull(fileMergingType);
    return this;
}

/** Set the max file size. */
public FileMergingSnapshotManagerBuilder setMaxFileSize(long maxFileSize) {
Preconditions.checkArgument(maxFileSize > 0);
Expand All @@ -71,13 +79,27 @@ public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExec
/**
 * Create file-merging snapshot manager based on configuration.
 *
 * <p>The implementation is selected by the configured {@link FileMergingType}. When no IO
 * executor has been set, {@code Runnable::run} is used, which runs each task directly in the
 * thread that submits it.
 *
 * @return the created manager.
 * @throws UnsupportedOperationException if the configured type is neither {@link
 *     FileMergingType#MERGE_WITHIN_CHECKPOINT} nor {@link
 *     FileMergingType#MERGE_ACROSS_CHECKPOINT}.
 */
public FileMergingSnapshotManager build() {
    // Resolve the executor fallback once instead of repeating it in every branch.
    final Executor effectiveExecutor = ioExecutor == null ? Runnable::run : ioExecutor;

    switch (fileMergingType) {
        case MERGE_WITHIN_CHECKPOINT:
            return new WithinCheckpointFileMergingSnapshotManager(
                    id, maxFileSize, filePoolType, effectiveExecutor);
        case MERGE_ACROSS_CHECKPOINT:
            return new AcrossCheckpointFileMergingSnapshotManager(
                    id, maxFileSize, filePoolType, effectiveExecutor);
        default:
            throw new UnsupportedOperationException(
                    String.format(
                            "Unsupported type %s when creating file merging manager",
                            fileMergingType));
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.flink.runtime.checkpoint.filemerging;

/** How the checkpoint files can be segmented. */
public enum FileMergingType {
/**
 * Do not write logical checkpoint files into file segments, and thus do not merge checkpoint
 * files.
 */
NO_MERGE,
/** Merge checkpoint files within checkpoint boundaries. */
MERGE_WITHIN_CHECKPOINT,
/** Merge checkpoint files across checkpoint boundaries. */
MERGE_ACROSS_CHECKPOINT
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
import org.apache.flink.util.ShutdownHookUtil;

import org.slf4j.Logger;
Expand Down Expand Up @@ -78,7 +79,9 @@ public TaskExecutorFileMergingManager() {
if (fileMergingSnapshotManager == null) {
// TODO FLINK-32440: choose different FileMergingSnapshotManager by configuration
fileMergingSnapshotManager =
new FileMergingSnapshotManagerBuilder(jobId.toString()).build();
new FileMergingSnapshotManagerBuilder(jobId.toString())
.setType(FileMergingType.MERGE_WITHIN_CHECKPOINT)
.build();
fileMergingSnapshotManagerByJobId.put(jobId, fileMergingSnapshotManager);
LOG.info("Registered new file merging snapshot manager for job {}.", jobId);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;

import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link AcrossCheckpointFileMergingSnapshotManager}. */
public class AcrossCheckpointFileMergingSnapshotManagerTest
extends FileMergingSnapshotManagerTestBase {
/**
 * Returns the across-checkpoint merging type; presumably consumed by the test base when it
 * builds the manager under test (confirm against FileMergingSnapshotManagerTestBase).
 */
@Override
FileMergingType getFileMergingType() {
return FileMergingType.MERGE_ACROSS_CHECKPOINT;
}

/**
 * Verifies how physical files are created and reused for both state scopes: a file that is
 * still handed out is never given to a second request, a returned file is reused by a later
 * checkpoint, and a file that has reached the maximum physical file size is not reused.
 */
@Test
void testCreateAndReuseFiles() throws IOException {
    try (FileMergingSnapshotManagerBase fmsm =
            (FileMergingSnapshotManagerBase)
                    createFileMergingSnapshotManager(checkpointBaseDir)) {
        fmsm.registerSubtaskForSharedStates(subtaskKey1);
        fmsm.registerSubtaskForSharedStates(subtaskKey2);
        // firstly, we try shared state.
        PhysicalFile file1 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 0, CheckpointedStateScope.SHARED);
        assertThat(file1.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
        // allocate another
        PhysicalFile file2 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 0, CheckpointedStateScope.SHARED);
        assertThat(file2.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
        assertThat(file2).isNotEqualTo(file1);

        // return for reuse
        fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);

        // allocate for another subtask
        PhysicalFile file3 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey2, 0, CheckpointedStateScope.SHARED);
        assertThat(file3.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED));
        assertThat(file3).isNotEqualTo(file1);

        // allocate for another checkpoint
        PhysicalFile file4 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 1, CheckpointedStateScope.SHARED);
        assertThat(file4.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
        assertThat(file4).isEqualTo(file1);

        // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
        file4.incSize(fmsm.maxPhysicalFileSize);
        fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 1, file4);
        PhysicalFile file5 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 1, CheckpointedStateScope.SHARED);
        assertThat(file5.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
        assertThat(file5).isNotEqualTo(file4);

        // Secondly, we try private state
        PhysicalFile file6 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
        assertThat(file6.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));

        // allocate another
        PhysicalFile file7 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
        assertThat(file7.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
        // file6 is still handed out, so the second exclusive file must be a different one
        assertThat(file7).isNotEqualTo(file6);

        // return for reuse
        fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6);

        // allocate for another checkpoint
        PhysicalFile file8 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 2, CheckpointedStateScope.EXCLUSIVE);
        assertThat(file8.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
        assertThat(file8).isEqualTo(file6);

        // return for reuse
        fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file8);

        // allocate for this checkpoint but another subtask
        PhysicalFile file9 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey2, 2, CheckpointedStateScope.EXCLUSIVE);
        assertThat(file9.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
        assertThat(file9).isEqualTo(file6);

        // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused;
        // request the same (exclusive) scope again so that file9 would be a reuse candidate
        file9.incSize(fmsm.maxPhysicalFileSize);
        fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 2, file9);
        PhysicalFile file10 =
                fmsm.getOrCreatePhysicalFileForCheckpoint(
                        subtaskKey1, 2, CheckpointedStateScope.EXCLUSIVE);
        assertThat(file10.getFilePath().getParent())
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
        assertThat(file10).isNotEqualTo(file9);

        // both subtasks resolve to the same managed directory for exclusive state
        assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE))
                .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
    }
}

/**
 * Drives one subtask through completed, subsumed and aborted checkpoints and asserts that the
 * files behind the returned state handles are in the managed directory and still exist after
 * the subsume and abort notifications.
 */
@Test
public void testCheckpointNotification() throws Exception {
try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
// complete checkpoint-1
FileMergingCheckpointStateOutputStream cp1Stream =
writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
fmsm.notifyCheckpointComplete(subtaskKey1, 1);
assertFileInManagedDir(fmsm, cp1StateHandle);

// complete checkpoint-2
FileMergingCheckpointStateOutputStream cp2Stream =
writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
fmsm.notifyCheckpointComplete(subtaskKey1, 2);
assertFileInManagedDir(fmsm, cp2StateHandle);

// subsume checkpoint-1: its file is expected to exist both before and after the notification
assertThat(fileExists(cp1StateHandle)).isTrue();
fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
assertThat(fileExists(cp1StateHandle)).isTrue();

// abort checkpoint-3: the file written for it is expected to survive the abort as well
FileMergingCheckpointStateOutputStream cp3Stream =
writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
assertFileInManagedDir(fmsm, cp3StateHandle);
fmsm.notifyCheckpointAborted(subtaskKey1, 3);
assertThat(fileExists(cp3StateHandle)).isTrue();
}
}
}
Loading

0 comments on commit 54d355d

Please sign in to comment.