Skip to content

Commit

Permalink
[flink] create tags for the previous savepoint after a successful checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
liming30 committed Sep 22, 2023
1 parent 733eae7 commit 3062cb0
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
Expand All @@ -45,10 +46,10 @@
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.TreeSet;

/**
* Commit {@link Committable} for each snapshot using the {@link CommitterOperator}. At the same
Expand All @@ -68,22 +69,28 @@ public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>

private final SerializableSupplier<TagManager> tagManagerFactory;

private final Set<Long> identifiersForTags;
private final SerializableSupplier<TagDeletion> tagDeletionFactory;

private final NavigableSet<Long> identifiersForTags;

protected SnapshotManager snapshotManager;

protected TagManager tagManager;

protected TagDeletion tagDeletion;

private transient ListState<Long> identifiersForTagsState;

public AutoTagForSavepointCommitterOperator(
CommitterOperator<CommitT, GlobalCommitT> commitOperator,
SerializableSupplier<SnapshotManager> snapshotManagerFactory,
SerializableSupplier<TagManager> tagManagerFactory) {
SerializableSupplier<TagManager> tagManagerFactory,
SerializableSupplier<TagDeletion> tagDeletionFactory) {
this.commitOperator = commitOperator;
this.tagManagerFactory = tagManagerFactory;
this.snapshotManagerFactory = snapshotManagerFactory;
this.identifiersForTags = new HashSet<>();
this.tagDeletionFactory = tagDeletionFactory;
this.identifiersForTags = new TreeSet<>();
}

@Override
Expand All @@ -94,6 +101,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
} finally {
snapshotManager = snapshotManagerFactory.get();
tagManager = tagManagerFactory.get();
tagDeletion = tagDeletionFactory.get();

identifiersForTagsState =
commitOperator
Expand Down Expand Up @@ -127,15 +135,21 @@ public OperatorSnapshotFutures snapshotState(
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
commitOperator.notifyCheckpointComplete(checkpointId);
if (identifiersForTags.remove(checkpointId)) {
createTagForIdentifiers(Collections.singletonList(checkpointId));
Set<Long> headSet = identifiersForTags.headSet(checkpointId, true);
if (!headSet.isEmpty()) {
createTagForIdentifiers(new ArrayList<>(headSet));
headSet.clear();
}
}

/**
 * Handles an aborted checkpoint: forwards the notification to the wrapped commit operator, drops
 * the checkpoint id from the pending tag identifiers, and deletes the tag named
 * {@code SAVEPOINT_TAG_PREFIX + checkpointId} when such a tag exists.
 *
 * @param checkpointId id of the checkpoint that was aborted
 * @throws Exception if the wrapped operator's abort handling fails
 */
@Override
public void notifyCheckpointAborted(long checkpointId) throws Exception {
    // Let the wrapped operator react first, then forget the id locally.
    commitOperator.notifyCheckpointAborted(checkpointId);
    identifiersForTags.remove(checkpointId);

    final String abortedTag = SAVEPOINT_TAG_PREFIX + checkpointId;
    if (!tagManager.tagExists(abortedTag)) {
        // No tag exists under this name, so there is nothing to clean up.
        return;
    }
    tagManager.deleteTag(abortedTag, tagDeletion, snapshotManager);
}

private void createTagForIdentifiers(List<Long> identifiers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ protected DataStreamSink<?> doCommit(DataStream<Committable> written, String com
new AutoTagForSavepointCommitterOperator<>(
(CommitterOperator<Committable, ManifestCommittable>) committerOperator,
table::snapshotManager,
table::tagManager);
table::tagManager,
() -> table.store().newTagDeletion());
}
SingleOutputStreamOperator<?> committed =
written.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,17 +74,23 @@ public void testAutoTagForSavepoint() throws Exception {
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);
assertThat(table.tagManager().tagCount()).isEqualTo(0);

// notify savepoint success
// trigger next checkpoint
processCommittable(testHarness, write, ++checkpointId, ++timestamp, GenericRow.of(3, 20L));
testHarness.snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1);

// notify checkpoint success and tag for savepoint-2
testHarness.notifyOfCompletedCheckpoint(checkpointId);
Snapshot snapshot = table.snapshotManager().latestSnapshot();
assertThat(snapshot).isNotNull();
assertThat(snapshot.id()).isEqualTo(checkpointId);
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(3);
assertThat(table.tagManager().tagCount()).isEqualTo(1);

Snapshot snapshot = table.snapshotManager().snapshot(2);
assertThat(snapshot).isNotNull();
assertThat(snapshot.id()).isEqualTo(2);
Map<Snapshot, String> tags = table.tagManager().tags();
assertThat(tags).containsOnlyKeys(snapshot);
assertThat(tags.get(snapshot))
.isEqualTo(
AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
.isEqualTo(AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + 2);
}

@Test
Expand Down Expand Up @@ -136,6 +142,38 @@ public void testRestore() throws Exception {
AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId);
}

/**
 * Checks the abort path for savepoint tags: after a savepoint is snapshotted and a later
 * checkpoint is reported complete, one tag is expected; once the savepoint is reported as
 * aborted, the tag count is expected to drop back to zero while the snapshot count stays the
 * same.
 */
@Test
public void testAbortSavepointAndCleanTag() throws Exception {
FileStoreTable table = createFileStoreTable();

OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
createRecoverableTestHarness(table);
testHarness.open();
StreamTableWrite write =
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();

long checkpointId = 1L, timestamp = 1L;
processCommittable(testHarness, write, checkpointId, timestamp, GenericRow.of(1, 10L));

// Take savepoint 1 but do not notify its completion: no snapshot and no tag are expected yet.
testHarness.snapshotWithLocalState(
checkpointId, timestamp, SavepointType.savepoint(SavepointFormatType.CANONICAL));
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(0);
assertThat(table.tagManager().tagCount()).isEqualTo(0);

// Take regular checkpoint 2 and notify its completion: two snapshots and one tag are
// expected afterwards.
processCommittable(testHarness, write, ++checkpointId, timestamp, GenericRow.of(1, 10L));
testHarness.snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT);
testHarness.notifyOfCompletedCheckpoint(checkpointId);
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2);
assertThat(table.tagManager().tagCount()).isEqualTo(1);

// Report savepoint 1 as aborted: the tag must be gone, the snapshots must remain.
testHarness.getOneInputOperator().notifyCheckpointAborted(1);
assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2);
assertThat(table.tagManager().tagCount()).isEqualTo(0);
}

private void processCommittable(
OneInputStreamOperatorTestHarness<Committable, Committable> testHarness,
StreamTableWrite write,
Expand All @@ -161,7 +199,8 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
(CommitterOperator<Committable, ManifestCommittable>)
super.createCommitterOperator(table, commitUser, committableStateManager),
table::snapshotManager,
table::tagManager);
table::tagManager,
() -> table.store().newTagDeletion());
}

@Override
Expand All @@ -175,6 +214,7 @@ protected OneInputStreamOperator<Committable, Committable> createCommitterOperat
super.createCommitterOperator(
table, commitUser, committableStateManager, initializeFunction),
table::snapshotManager,
table::tagManager);
table::tagManager,
() -> table.store().newTagDeletion());
}
}

0 comments on commit 3062cb0

Please sign in to comment.