From 3062cb01e30f90bc594bac0cf211c726abfc0d3f Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Fri, 22 Sep 2023 19:24:18 +0800 Subject: [PATCH] [flink] create tags for the previous savepoint after a successful checkpoint. --- .../AutoTagForSavepointCommitterOperator.java | 28 +++++++--- .../apache/paimon/flink/sink/FlinkSink.java | 3 +- ...oTagForSavepointCommitterOperatorTest.java | 56 ++++++++++++++++--- 3 files changed, 71 insertions(+), 16 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 318c36aba2f4..02c5b960669f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.utils.SerializableSupplier; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -45,10 +46,10 @@ import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; import java.util.List; +import java.util.NavigableSet; import java.util.Set; +import java.util.TreeSet; /** * Commit {@link Committable} for each snapshot using the {@link CommitterOperator}. At the same @@ -68,22 +69,28 @@ public class AutoTagForSavepointCommitterOperator private final SerializableSupplier tagManagerFactory; - private final Set identifiersForTags; + private final SerializableSupplier tagDeletionFactory; + + private final NavigableSet identifiersForTags; protected SnapshotManager snapshotManager; protected TagManager tagManager; + protected TagDeletion tagDeletion; + private transient ListState identifiersForTagsState; public AutoTagForSavepointCommitterOperator( CommitterOperator commitOperator, SerializableSupplier snapshotManagerFactory, - SerializableSupplier tagManagerFactory) { + SerializableSupplier tagManagerFactory, + SerializableSupplier tagDeletionFactory) { this.commitOperator = commitOperator; this.tagManagerFactory = tagManagerFactory; this.snapshotManagerFactory = snapshotManagerFactory; - this.identifiersForTags = new HashSet<>(); + this.tagDeletionFactory = tagDeletionFactory; + this.identifiersForTags = new TreeSet<>(); } @Override @@ -94,6 +101,7 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager) } finally { snapshotManager = snapshotManagerFactory.get(); tagManager = tagManagerFactory.get(); + tagDeletion = tagDeletionFactory.get(); identifiersForTagsState = commitOperator @@ -127,8 +135,10 @@ public OperatorSnapshotFutures snapshotState( @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { commitOperator.notifyCheckpointComplete(checkpointId); - if (identifiersForTags.remove(checkpointId)) { - createTagForIdentifiers(Collections.singletonList(checkpointId)); + Set headSet = identifiersForTags.headSet(checkpointId, true); + if (!headSet.isEmpty()) { + createTagForIdentifiers(new ArrayList<>(headSet)); + headSet.clear(); } } @@ -136,6 +146,10 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { public void notifyCheckpointAborted(long checkpointId) throws Exception { commitOperator.notifyCheckpointAborted(checkpointId); identifiersForTags.remove(checkpointId); + String tagName = SAVEPOINT_TAG_PREFIX + checkpointId; + if (tagManager.tagExists(tagName)) { + tagManager.deleteTag(tagName, tagDeletion, snapshotManager); + } } private void createTagForIdentifiers(List identifiers) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 4701e439162e..70dcbf5a32bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -200,7 +200,8 @@ protected DataStreamSink doCommit(DataStream written, String com new AutoTagForSavepointCommitterOperator<>( (CommitterOperator) committerOperator, table::snapshotManager, - table::tagManager); + table::tagManager, + () -> table.store().newTagDeletion()); } SingleOutputStreamOperator committed = written.transform( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java index 7b44a1930362..f68c00b11d98 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperatorTest.java @@ -74,17 +74,23 @@ public void testAutoTagForSavepoint() throws Exception { assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1); assertThat(table.tagManager().tagCount()).isEqualTo(0); - // notify savepoint success + // trigger next checkpoint + processCommittable(testHarness, write, ++checkpointId, ++timestamp, GenericRow.of(3, 20L)); + testHarness.snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1); + + // notify checkpoint success and tag for savepoint-2 testHarness.notifyOfCompletedCheckpoint(checkpointId); - Snapshot snapshot = table.snapshotManager().latestSnapshot(); - assertThat(snapshot).isNotNull(); - assertThat(snapshot.id()).isEqualTo(checkpointId); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(3); + assertThat(table.tagManager().tagCount()).isEqualTo(1); + Snapshot snapshot = table.snapshotManager().snapshot(2); + assertThat(snapshot).isNotNull(); + assertThat(snapshot.id()).isEqualTo(2); Map tags = table.tagManager().tags(); assertThat(tags).containsOnlyKeys(snapshot); assertThat(tags.get(snapshot)) - .isEqualTo( - AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId); + .isEqualTo(AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + 2); } @Test @@ -136,6 +142,38 @@ public void testRestore() throws Exception { AutoTagForSavepointCommitterOperator.SAVEPOINT_TAG_PREFIX + checkpointId); } + @Test + public void testAbortSavepointAndCleanTag() throws Exception { + FileStoreTable table = createFileStoreTable(); + + OneInputStreamOperatorTestHarness testHarness = + createRecoverableTestHarness(table); + testHarness.open(); + StreamTableWrite write = + table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite(); + + long checkpointId = 1L, timestamp = 1L; + processCommittable(testHarness, write, checkpointId, timestamp, GenericRow.of(1, 10L)); + + // trigger savepoint but not notified + testHarness.snapshotWithLocalState( + checkpointId, timestamp, SavepointType.savepoint(SavepointFormatType.CANONICAL)); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(0); + assertThat(table.tagManager().tagCount()).isEqualTo(0); + + // trigger checkpoint and notify complete + processCommittable(testHarness, write, ++checkpointId, timestamp, GenericRow.of(1, 10L)); + testHarness.snapshotWithLocalState(checkpointId, timestamp, CheckpointType.CHECKPOINT); + testHarness.notifyOfCompletedCheckpoint(checkpointId); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2); + assertThat(table.tagManager().tagCount()).isEqualTo(1); + + // abort savepoint 1 + testHarness.getOneInputOperator().notifyCheckpointAborted(1); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(2); + assertThat(table.tagManager().tagCount()).isEqualTo(0); + } + private void processCommittable( OneInputStreamOperatorTestHarness testHarness, StreamTableWrite write, @@ -161,7 +199,8 @@ protected OneInputStreamOperator createCommitterOperat (CommitterOperator) super.createCommitterOperator(table, commitUser, committableStateManager), table::snapshotManager, - table::tagManager); + table::tagManager, + () -> table.store().newTagDeletion()); } @Override @@ -175,6 +214,7 @@ protected OneInputStreamOperator createCommitterOperat super.createCommitterOperator( table, commitUser, committableStateManager, initializeFunction), table::snapshotManager, - table::tagManager); + table::tagManager, + () -> table.store().newTagDeletion()); } }