From 5a1516c09f0ced07177419a16ad29d0fa71c4aff Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Tue, 17 Sep 2024 10:56:03 +0200
Subject: [PATCH] Steven's comments

---
 .../operator/ExpireSnapshotsProcessor.java    |  26 ++--
 .../maintenance/stream/ExpireSnapshots.java   |  23 ++--
 .../stream/MaintenanceTaskBuilder.java        |  50 ++++----
 .../maintenance/stream/TableMaintenance.java  | 114 ++++++++----
 .../operator/OperatorTestBase.java            |   4 +-
 .../stream/ScheduledInfraExtension.java       |   2 +-
 .../stream/TestExpireSnapshots.java           |  28 ++---
 .../stream/TestMaintenanceE2E.java            |   8 +-
 .../stream/TestTableMaintenance.java          |  58 ++++-----
 9 files changed, 152 insertions(+), 161 deletions(-)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
index bbd6e9775557..54ffa0009f46 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java
@@ -20,6 +20,7 @@
 
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
@@ -41,21 +42,21 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResult> {
   private static final Logger LOG = LoggerFactory.getLogger(ExpireSnapshotsProcessor.class);
   public static final OutputTag<String> DELETE_STREAM =
-      new OutputTag<>("delete-stream", Types.STRING);
+      new OutputTag<>("expire-snapshots-file-deletes-stream", Types.STRING);
 
   private final TableLoader tableLoader;
-  private final Long minAgeMs;
+  private final Long maxSnapshotAgeMs;
   private final Integer retainLast;
   private final int plannerPoolSize;
   private transient ExecutorService plannerPool;
   private transient Table table;
 
   public ExpireSnapshotsProcessor(
-      TableLoader tableLoader, Long minAgeMs, Integer retainLast, int plannerPoolSize) {
+      TableLoader tableLoader, Long maxSnapshotAgeMs, Integer retainLast, int plannerPoolSize) {
     Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
 
     this.tableLoader = tableLoader;
-    this.minAgeMs = minAgeMs;
+    this.maxSnapshotAgeMs = maxSnapshotAgeMs;
     this.retainLast = retainLast;
     this.plannerPoolSize = plannerPoolSize;
   }
@@ -73,21 +74,30 @@ public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> out)
     try {
       table.refresh();
       ExpireSnapshots expireSnapshots = table.expireSnapshots();
-      if (minAgeMs != null) {
-        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - minAgeMs);
+      if (maxSnapshotAgeMs != null) {
+        expireSnapshots = expireSnapshots.expireOlderThan(ctx.timestamp() - maxSnapshotAgeMs);
       }
 
       if (retainLast != null) {
         expireSnapshots = expireSnapshots.retainLast(retainLast);
       }
 
+      AtomicLong deleteFileCounter = new AtomicLong(0L);
       expireSnapshots
           .planWith(plannerPool)
-          .deleteWith(file -> ctx.output(DELETE_STREAM, file))
+          .deleteWith(
+              file -> {
+                ctx.output(DELETE_STREAM, file);
+                deleteFileCounter.incrementAndGet();
+              })
          .cleanExpiredFiles(true)
          .commit();
-      LOG.info("Successfully finished expiring snapshots for {} at {}", table, ctx.timestamp());
+      LOG.info(
+          "Successfully finished expiring snapshots for {} at {}. Scheduled {} files for delete.",
+          table,
+          ctx.timestamp(),
+          deleteFileCounter.get());
       out.collect(
           new TaskResult(trigger.taskId(), trigger.timestamp(), true, Collections.emptyList()));
     } catch (Exception e) {
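
Note on the change above: every file that expireSnapshots() schedules for removal is now both emitted on DELETE_STREAM and tallied in the AtomicLong, so the success log can report how many deletes were scheduled. A minimal sketch of how a caller picks up that side output — `trigger`, `loader`, `maxAgeMs`, `retainCount`, and `poolSize` are illustrative placeholders, not names from this patch:

    // Sketch only: run the processor, then fetch the scheduled-delete paths.
    SingleOutputStreamOperator<TaskResult> main =
        trigger.process(new ExpireSnapshotsProcessor(loader, maxAgeMs, retainCount, poolSize));
    // Side outputs are retrieved from the operator that produced them.
    DataStream<String> filesToDelete = main.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM);
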
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java
index 056d8e7d0b7f..f3765c76d4c8 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/ExpireSnapshots.java
@@ -25,6 +25,7 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
 import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
+import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.flink.maintenance.operator.AsyncDeleteFiles;
 import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
 import org.apache.iceberg.flink.maintenance.operator.TaskResult;
@@ -37,9 +38,7 @@ public class ExpireSnapshots {
   private static final long DELETE_MAX_RETRY_DELAY_MS = 1000L;
   private static final double DELETE_BACKOFF_MULTIPLIER = 1.5;
   private static final long DELETE_TIMEOUT_MS = 10000L;
-  private static final int DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT = 10;
   private static final int DELETE_ATTEMPT_NUM = 10;
-  private static final int DELETE_WORKER_POOL_SIZE_DEFAULT = 10;
   private static final String EXECUTOR_TASK_NAME = "ES Executor";
   @VisibleForTesting static final String DELETE_FILES_TASK_NAME = "Delete file";
 
@@ -53,20 +52,20 @@ public static Builder builder() {
   }
 
   public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
-    private Duration minAge = null;
+    private Duration maxSnapshotAge = null;
     private Integer retainLast = null;
-    private int planningWorkerPoolSize = DELETE_PLANNING_WORKER_POOL_SIZE_DEFAULT;
+    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
     private int deleteAttemptNum = DELETE_ATTEMPT_NUM;
-    private int deleteWorkerPoolSize = DELETE_WORKER_POOL_SIZE_DEFAULT;
+    private int deleteWorkerPoolSize = SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value();
 
     /**
      * The snapshots newer than this age will not be removed.
      *
-     * @param newMinAge of the files to be removed
+     * @param newMaxSnapshotAge of the snapshots to be removed
      * @return for chained calls
      */
-    public Builder minAge(Duration newMinAge) {
-      this.minAge = newMinAge;
+    public Builder maxSnapshotAge(Duration newMaxSnapshotAge) {
+      this.maxSnapshotAge = newMaxSnapshotAge;
       return this;
     }
 
@@ -116,7 +115,7 @@ public Builder deleteWorkerPoolSize(int newDeleteWorkerPoolSize) {
     }
 
     @Override
-    DataStream<TaskResult> buildInternal(DataStream<Trigger> trigger) {
+    DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
 
       SingleOutputStreamOperator<TaskResult> result =
@@ -124,11 +123,11 @@ DataStream<TaskResult> buildInternal(DataStream<Trigger> trigger) {
               .process(
                   new ExpireSnapshotsProcessor(
                       tableLoader(),
-                      minAge == null ? null : minAge.toMillis(),
+                      maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
                       retainLast,
                       planningWorkerPoolSize))
              .name(EXECUTOR_TASK_NAME)
-              .uid(uidPrefix() + "-expire-snapshots")
+              .uid("expire-snapshots-" + uidSuffix())
              .slotSharingGroup(slotSharingGroup())
              .forceNonParallel();
 
@@ -149,7 +148,7 @@ DataStream<TaskResult> buildInternal(DataStream<Trigger> trigger) {
               deleteWorkerPoolSize,
               retryStrategy)
          .name(DELETE_FILES_TASK_NAME)
-          .uid(uidPrefix() + "-delete-expired-files")
+          .uid("delete-expired-files-" + uidSuffix())
          .slotSharingGroup(slotSharingGroup())
          .setParallelism(parallelism());
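
In use, the renamed builder reads like this — a sketch that assumes a surrounding TableMaintenance builder as in the tests further below; the snapshot-age and pool-size values are arbitrary examples:

    // Sketch only: minAge(...) is now maxSnapshotAge(...); pool sizes default to SystemConfigs.
    ExpireSnapshots.Builder expire =
        ExpireSnapshots.builder()
            .maxSnapshotAge(Duration.ofDays(7))
            .retainLast(3)
            .deleteWorkerPoolSize(5) // default: SystemConfigs.DELETE_WORKER_THREAD_POOL_SIZE.value()
            .scheduleOnCommitCount(10);
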
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java
index d02ba7662b38..5ca23f8cea74 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/MaintenanceTaskBuilder.java
@@ -29,15 +29,15 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
 public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> {
-  private int id;
+  private int index;
   private String name;
   private TableLoader tableLoader;
-  private String uidPrefix = null;
+  private String uidSuffix = null;
   private String slotSharingGroup = null;
   private Integer parallelism = null;
   private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();
 
-  abstract DataStream<TaskResult> buildInternal(DataStream<Trigger> sourceStream);
+  abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream);
 
   /**
    * After a given number of Iceberg table commits since the last run, starts the downstream job.
@@ -79,7 +79,7 @@ public T scheduleOnDataFileSize(long dataFileSizeInBytes) {
    * @param posDeleteFileCount after the downstream job should be started
    * @return for chained calls
    */
-  public T schedulerOnPosDeleteFileCount(int posDeleteFileCount) {
+  public T scheduleOnPosDeleteFileCount(int posDeleteFileCount) {
     triggerEvaluator.posDeleteFileCount(posDeleteFileCount);
     return (T) this;
   }
@@ -91,7 +91,7 @@ public T schedulerOnPosDeleteFileCount(int posDeleteFileCount) {
    * @param posDeleteRecordCount after the downstream job should be started
    * @return for chained calls
    */
-  public T schedulerOnPosDeleteRecordCount(long posDeleteRecordCount) {
+  public T scheduleOnPosDeleteRecordCount(long posDeleteRecordCount) {
     triggerEvaluator.posDeleteRecordCount(posDeleteRecordCount);
     return (T) this;
   }
@@ -123,22 +123,22 @@ public T scheduleOnEqDeleteRecordCount(long eqDeleteRecordCount) {
   /**
    * After a given time since the last run, starts the downstream job.
    *
-   * @param time after the downstream job should be started
+   * @param interval after the downstream job should be started
    * @return for chained calls
    */
-  public T scheduleOnTime(Duration time) {
-    triggerEvaluator.timeout(time);
+  public T scheduleOnInterval(Duration interval) {
+    triggerEvaluator.timeout(interval);
     return (T) this;
   }
 
   /**
-   * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
+   * The suffix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
    *
-   * @param newUidPrefix for the transformations
+   * @param newUidSuffix for the transformations
    * @return for chained calls
    */
-  public T uidPrefix(String newUidPrefix) {
-    this.uidPrefix = newUidPrefix;
+  public T uidSuffix(String newUidSuffix) {
+    this.uidSuffix = newUidSuffix;
     return (T) this;
   }
 
@@ -167,7 +167,7 @@ public T parallelism(int newParallelism) {
 
   @Internal
   int id() {
-    return id;
+    return index;
   }
 
   @Internal
@@ -181,8 +181,8 @@ TableLoader tableLoader() {
   }
 
   @Internal
-  String uidPrefix() {
-    return uidPrefix;
+  String uidSuffix() {
+    return uidSuffix;
   }
 
   @Internal
@@ -201,26 +201,26 @@ TriggerEvaluator evaluator() {
   }
 
   @Internal
-  DataStream<TaskResult> build(
+  DataStream<TaskResult> append(
       DataStream<Trigger> sourceStream,
-      int newId,
-      String newName,
+      int maintenanceTaskIndex,
+      String maintenanceTaskName,
       TableLoader newTableLoader,
-      String mainUidPrefix,
+      String mainUidSuffix,
       String mainSlotSharingGroup,
       int mainParallelism) {
     Preconditions.checkArgument(
         parallelism == null || parallelism == -1 || parallelism > 0,
         "Parallelism should be left to default (-1/null) or greater than 0");
-    Preconditions.checkNotNull(newName, "Name should not be null");
+    Preconditions.checkNotNull(maintenanceTaskName, "Name should not be null");
     Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");
 
-    this.id = newId;
-    this.name = newName;
+    this.index = maintenanceTaskIndex;
+    this.name = maintenanceTaskName;
     this.tableLoader = newTableLoader;
 
-    if (uidPrefix == null) {
-      uidPrefix = mainUidPrefix + "_" + name + "_" + id;
+    if (uidSuffix == null) {
+      uidSuffix = name + "_" + index + "_" + mainUidSuffix;
     }
 
     if (parallelism == null) {
@@ -233,6 +233,6 @@ DataStream<TaskResult> append(
 
     tableLoader.open();
 
-    return buildInternal(sourceStream);
+    return append(sourceStream);
   }
 }
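
The scheduler setters renamed above (schedulerOnPosDelete* to scheduleOnPosDelete*, scheduleOnTime to scheduleOnInterval) chain on any task builder, and when no uidSuffix is given one is derived as name_index_mainUidSuffix. A sketch of the corrected names; the counts and interval are arbitrary examples:

    // Sketch only: the task fires on whichever trigger condition is met first.
    ExpireSnapshots.builder()
        .scheduleOnCommitCount(10)
        .scheduleOnPosDeleteFileCount(6)
        .scheduleOnPosDeleteRecordCount(7L)
        .scheduleOnInterval(Duration.ofHours(1))
        .uidSuffix("expire-snapshots"); // optional; defaults to name_index_mainUidSuffix
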
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java
index 9ba6a9f67eff..3add7adbabb1 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/stream/TableMaintenance.java
@@ -30,7 +30,6 @@
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkOutput;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -38,7 +37,6 @@
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
-import org.apache.flink.util.Collector;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.operator.LockRemover;
 import org.apache.iceberg.flink.maintenance.operator.MonitorSource;
@@ -71,7 +69,7 @@ private TableMaintenance() {
    * @param lockFactory used for preventing concurrent task runs
    * @return builder for the maintenance stream
    */
-  public static Builder builder(
+  public static Builder forChangeStream(
      DataStream<TableChange> changeStream,
      TableLoader tableLoader,
      TriggerLockFactory lockFactory) {
@@ -91,7 +89,7 @@ public static Builder builder(
    * @param lockFactory used for preventing concurrent task runs
    * @return builder for the maintenance stream
    */
-  public static Builder builder(
+  public static Builder forTable(
       StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
     Preconditions.checkNotNull(env, "StreamExecutionEnvironment should not be null");
     Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
@@ -102,33 +100,33 @@ public static Builder builder(
 
   public static class Builder {
     private final StreamExecutionEnvironment env;
-    private final DataStream<TableChange> changeStream;
+    private final DataStream<TableChange> inputStream;
     private final TableLoader tableLoader;
     private final List<MaintenanceTaskBuilder<?>> taskBuilders;
     private final TriggerLockFactory lockFactory;
 
-    private String uidPrefix = "TableMaintenance-" + UUID.randomUUID();
+    private String uidSuffix = "TableMaintenance-" + UUID.randomUUID();
     private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
     private Duration rateLimit = Duration.ofMillis(1);
-    private Duration concurrentCheckDelay = Duration.ofSeconds(30);
+    private Duration lockCheckDelay = Duration.ofSeconds(30);
     private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
     private int maxReadBack = 100;
 
     private Builder(
         StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
       this.env = env;
-      this.changeStream = null;
+      this.inputStream = null;
       this.tableLoader = tableLoader;
       this.lockFactory = lockFactory;
       this.taskBuilders = Lists.newArrayListWithCapacity(4);
     }
 
     private Builder(
-        DataStream<TableChange> changeStream,
+        DataStream<TableChange> inputStream,
         TableLoader tableLoader,
         TriggerLockFactory lockFactory) {
       this.env = null;
-      this.changeStream = changeStream;
+      this.inputStream = inputStream;
       this.tableLoader = tableLoader;
       this.lockFactory = lockFactory;
       this.taskBuilders = Lists.newArrayListWithCapacity(4);
@@ -137,11 +135,11 @@ private Builder(
     /**
      * The prefix used for the generated {@link org.apache.flink.api.dag.Transformation}'s uid.
      *
-     * @param newUidPrefix for the transformations
+     * @param newUidSuffix for the transformations
      * @return for chained calls
      */
-    public Builder uidPrefix(String newUidPrefix) {
-      this.uidPrefix = newUidPrefix;
+    public Builder uidSuffix(String newUidSuffix) {
+      this.uidSuffix = newUidSuffix;
       return this;
     }
 
@@ -172,11 +170,11 @@ public Builder rateLimit(Duration newRateLimit) {
     /**
      * Sets the delay for checking lock availability when a concurrent run is detected.
      *
-     * @param newConcurrentCheckDelay firing frequency
+     * @param newLockCheckDelay lock checking frequency
      * @return for chained calls
      */
-    public Builder concurrentCheckDelay(Duration newConcurrentCheckDelay) {
-      this.concurrentCheckDelay = newConcurrentCheckDelay;
+    public Builder lockCheckDelay(Duration newLockCheckDelay) {
+      this.lockCheckDelay = newLockCheckDelay;
       return this;
     }
 
@@ -194,14 +192,16 @@ public Builder parallelism(int newParallelism) {
     /**
      * Maximum number of snapshots checked when started with an embedded {@link MonitorSource} at
-     * the first time. Only available when the {@link MonitorSource} is generated by the builder.
+     * the first time. Only available when the {@link
+     * TableMaintenance#forTable(StreamExecutionEnvironment, TableLoader, TriggerLockFactory)} is
+     * used.
      *
      * @param newMaxReadBack snapshots to consider when initializing
      * @return for chained calls
      */
     public Builder maxReadBack(int newMaxReadBack) {
       Preconditions.checkArgument(
-          changeStream == null, "Can't set maxReadBack when change stream is provided");
+          inputStream == null, "Can't set maxReadBack when change stream is provided");
       this.maxReadBack = newMaxReadBack;
       return this;
     }
@@ -220,26 +220,8 @@ public Builder add(MaintenanceTaskBuilder<?> task) {
     /** Builds the task graph for the maintenance tasks. */
     public void append() {
       Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
-      Preconditions.checkNotNull(uidPrefix, "Uid prefix should no be null");
+      Preconditions.checkNotNull(uidSuffix, "Uid suffix should not be null");
 
-      DataStream<TableChange> sourceStream;
-      if (changeStream == null) {
-        // Create a monitor source to provide the TableChange stream
-        MonitorSource source =
-            new MonitorSource(
-                tableLoader,
-                RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()),
-                maxReadBack);
-        sourceStream =
-            env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_NAME)
-                .uid(uidPrefix + "-monitor-source")
-                .slotSharingGroup(slotSharingGroup)
-                .forceNonParallel();
-      } else {
-        sourceStream = changeStream.global();
-      }
-
-      // Chain the TriggerManager
       List<String> taskNames = Lists.newArrayListWithCapacity(taskBuilders.size());
       List<TriggerEvaluator> evaluators = Lists.newArrayListWithCapacity(taskBuilders.size());
       for (int i = 0; i < taskBuilders.size(); ++i) {
@@ -248,8 +230,7 @@ public void append() {
       }
 
       DataStream<Trigger> triggers =
-          // Add TriggerManager to schedule the tasks
-          DataStreamUtils.reinterpretAsKeyedStream(sourceStream, unused -> true)
+          DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true)
              .process(
                  new TriggerManager(
                      tableLoader,
@@ -257,36 +238,36 @@ public void append() {
                      taskNames,
                      evaluators,
                      rateLimit.toMillis(),
-                      concurrentCheckDelay.toMillis()))
+                      lockCheckDelay.toMillis()))
              .name(TRIGGER_MANAGER_TASK_NAME)
-              .uid(uidPrefix + "-trigger-manager")
+              .uid("trigger-manager-" + uidSuffix)
              .slotSharingGroup(slotSharingGroup)
              .forceNonParallel()
-              // Add a watermark after every trigger
-              .assignTimestampsAndWatermarks(new WindowClosingWatermarkStrategy())
+              .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
              .name("Watermark Assigner")
-              .uid(uidPrefix + "-watermark-assigner")
+              .uid("watermark-assigner-" + uidSuffix)
              .slotSharingGroup(slotSharingGroup)
              .forceNonParallel();
 
       // Add the specific tasks
       DataStream<TaskResult> unioned = null;
       for (int i = 0; i < taskBuilders.size(); ++i) {
+        int finalIndex = i;
         DataStream<Trigger> filtered =
             triggers
-                .flatMap(new TaskFilter(i))
+                .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
                .name("Filter " + i)
                .forceNonParallel()
-                .uid(uidPrefix + "-filter-" + i)
+                .uid("filter-" + i + "-" + uidSuffix)
                .slotSharingGroup(slotSharingGroup);
         MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
         DataStream<TaskResult> result =
-            builder.build(
+            builder.append(
                 filtered,
                 i,
                 taskNames.get(i),
                 tableLoader,
-                uidPrefix,
+                uidSuffix,
                 slotSharingGroup,
                 parallelism);
         if (unioned == null) {
@@ -304,9 +285,26 @@ public void append() {
               TypeInformation.of(Void.class),
               new LockRemover(lockFactory, taskNames))
          .forceNonParallel()
-          .uid(uidPrefix + "-lock-remover")
+          .uid("lock-remover-" + uidSuffix)
          .slotSharingGroup(slotSharingGroup);
     }
+
+    private DataStream<TableChange> changeStream() {
+      if (inputStream == null) {
+        // Create a monitor source to provide the TableChange stream
+        MonitorSource source =
+            new MonitorSource(
+                tableLoader,
+                RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()),
+                maxReadBack);
+        return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_NAME)
+            .uid("monitor-source-" + uidSuffix)
+            .slotSharingGroup(slotSharingGroup)
+            .forceNonParallel();
+      } else {
+        return inputStream.global();
+      }
+    }
   }
 
   private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) {
@@ -314,7 +312,7 @@ private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) {
   }
 
   @Internal
-  public static class WindowClosingWatermarkStrategy implements WatermarkStrategy<Trigger> {
+  public static class PunctuatedWatermarkStrategy implements WatermarkStrategy<Trigger> {
     @Override
     public WatermarkGenerator<Trigger> createWatermarkGenerator(
         WatermarkGeneratorSupplier.Context context) {
@@ -337,20 +335,4 @@ public TimestampAssigner<Trigger> createTimestampAssigner(
       return (element, unused) -> element.timestamp();
     }
   }
-
-  @Internal
-  private static class TaskFilter implements FlatMapFunction<Trigger, Trigger> {
-    private final int taskId;
-
-    private TaskFilter(int taskId) {
-      this.taskId = taskId;
-    }
-
-    @Override
-    public void flatMap(Trigger trigger, Collector<Trigger> out) {
-      if (trigger.taskId() != null && trigger.taskId() == taskId) {
-        out.collect(trigger);
-      }
-    }
-  }
 }
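
With builder() split into forTable(...) and forChangeStream(...), the wiring reads like this — a sketch in which `env`, `tableLoader`, `lockFactory`, and `changes` stand for the caller's own objects:

    // Sketch only: monitor the table from the embedded MonitorSource...
    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .uidSuffix("maintenance")                // was uidPrefix(...)
        .rateLimit(Duration.ofMinutes(10))
        .lockCheckDelay(Duration.ofSeconds(10))  // was concurrentCheckDelay(...)
        .maxReadBack(100)                        // rejected when a change stream is supplied
        .add(ExpireSnapshots.builder().scheduleOnCommitCount(10))
        .append();

    // ...or drive the same tasks from an existing TableChange stream.
    TableMaintenance.forChangeStream(changes, tableLoader, lockFactory)
        .add(ExpireSnapshots.builder().scheduleOnCommitCount(10))
        .append();
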
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
index 63ba6f587462..1cae679b2587 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java
@@ -44,7 +44,7 @@ public class OperatorTestBase {
   private static final int NUMBER_TASK_MANAGERS = 1;
   private static final int SLOTS_PER_TASK_MANAGER = 8;
-  protected static final String UID_PREFIX = "UID-Dummy";
+  protected static final String UID_SUFFIX = "UID-Dummy";
   protected static final String SLOT_SHARING_GROUP = "SlotSharingGroup";
   protected static final String TABLE_NAME = "test_table";
   protected static final TriggerLockFactory LOCK_FACTORY = new MemoryLockFactory();
 
@@ -131,7 +131,7 @@ protected static void checkUidsAreSet(StreamExecutionEnvironment env, String uidPrefix) {
         transformation -> {
           assertThat(transformation.getUid()).isNotNull();
           if (uidPrefix != null) {
-            assertThat(transformation.getUid()).contains(UID_PREFIX);
+            assertThat(transformation.getUid()).contains(UID_SUFFIX);
           }
         });
   }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java
index 210c003f183a..55ea6161517c 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/ScheduledInfraExtension.java
@@ -56,7 +56,7 @@ public void beforeEach(ExtensionContext context) {
     triggerStream =
         source
             .dataStream()
-            .assignTimestampsAndWatermarks(new TableMaintenance.WindowClosingWatermarkStrategy())
+            .assignTimestampsAndWatermarks(new TableMaintenance.PunctuatedWatermarkStrategy())
            .name(IGNORED_OPERATOR_NAME)
            .forceNonParallel();
     sink = new CollectingSink<>();
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java
index 8d3c8864ebe3..dff176d6c7fa 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestExpireSnapshots.java
@@ -67,10 +67,10 @@ void testExpireSnapshots() throws Exception {
         .planningWorkerPoolSize(2)
         .deleteAttemptNum(2)
         .deleteWorkerPoolSize(5)
-        .minAge(Duration.ZERO)
+        .maxSnapshotAge(Duration.ZERO)
         .retainLast(1)
-        .uidPrefix(UID_PREFIX)
-        .build(
+        .uidSuffix(UID_SUFFIX)
+        .append(
            infra.triggerStream(),
            0,
            DUMMY_NAME,
@@ -106,12 +106,12 @@ void testFailure() throws Exception {
     SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);
 
     ExpireSnapshots.builder()
-        .build(
+        .append(
            infra.triggerStream(),
            0,
            DUMMY_NAME,
            tableLoader,
-            UID_PREFIX,
+            UID_SUFFIX,
            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
            1)
        .sinkTo(infra.sink());
@@ -156,18 +156,18 @@ void testUidAndSlotSharingGroup() {
 
     ExpireSnapshots.builder()
        .slotSharingGroup(SLOT_SHARING_GROUP)
-        .uidPrefix(UID_PREFIX)
-        .build(
+        .uidSuffix(UID_SUFFIX)
+        .append(
            infra.triggerStream(),
            0,
            DUMMY_NAME,
            tableLoader,
-            UID_PREFIX,
+            UID_SUFFIX,
            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
            1)
        .sinkTo(infra.sink());
 
-    checkUidsAreSet(infra.env(), UID_PREFIX);
+    checkUidsAreSet(infra.env(), UID_SUFFIX);
     checkSlotSharingGroupsAreSet(infra.env(), SLOT_SHARING_GROUP);
   }
 
@@ -177,12 +177,12 @@ void testUidAndSlotSharingGroupUnset() {
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
 
     ExpireSnapshots.builder()
-        .build(
+        .append(
            infra.triggerStream(),
            0,
            DUMMY_NAME,
            tableLoader,
-            UID_PREFIX,
+            UID_SUFFIX,
            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
            1)
        .sinkTo(infra.sink());
@@ -201,15 +201,15 @@ void testMetrics() throws Exception {
     Table table = tableLoader.loadTable();
 
     ExpireSnapshots.builder()
-        .minAge(Duration.ZERO)
+        .maxSnapshotAge(Duration.ZERO)
        .retainLast(1)
        .parallelism(1)
-        .build(
+        .append(
            infra.triggerStream(),
            0,
            DUMMY_NAME,
            tableLoader,
-            UID_PREFIX,
+            UID_SUFFIX,
            StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP,
            1)
        .sinkTo(infra.sink());
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java
index fd9b136af21d..3a6a7909eca2 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestMaintenanceE2E.java
@@ -43,14 +43,14 @@ void testE2e() throws Exception {
 
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
 
-    TableMaintenance.builder(env, tableLoader, LOCK_FACTORY)
-        .uidPrefix("E2eTestUID")
+    TableMaintenance.forTable(env, tableLoader, LOCK_FACTORY)
+        .uidSuffix("E2eTestUID")
        .rateLimit(Duration.ofMinutes(10))
-        .concurrentCheckDelay(Duration.ofSeconds(10))
+        .lockCheckDelay(Duration.ofSeconds(10))
        .add(
            ExpireSnapshots.builder()
                .scheduleOnCommitCount(10)
-                .minAge(Duration.ofMinutes(10))
+                .maxSnapshotAge(Duration.ofMinutes(10))
                .retainLast(5)
                .deleteWorkerPoolSize(5)
                .parallelism(8))
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java
index c7fa51675886..181fa1e6dc87 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/stream/TestTableMaintenance.java
@@ -104,9 +104,9 @@ void testFromStream() throws Exception {
         new ManualSource<>(env, TypeInformation.of(TableChange.class));
 
     TableMaintenance.Builder streamBuilder =
-        TableMaintenance.builder(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY)
+        TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY)
            .rateLimit(Duration.ofMillis(2))
-            .concurrentCheckDelay(Duration.ofSeconds(3))
+            .lockCheckDelay(Duration.ofSeconds(3))
            .add(
                new MaintenanceTaskBuilderForTest(true)
                    .scheduleOnCommitCount(1)
@@ -114,9 +114,9 @@ void testFromStream() throws Exception {
                    .scheduleOnDataFileSize(3L)
                    .scheduleOnEqDeleteFileCount(4)
                    .scheduleOnEqDeleteRecordCount(5L)
-                    .schedulerOnPosDeleteFileCount(6)
-                    .schedulerOnPosDeleteRecordCount(7L)
-                    .scheduleOnTime(Duration.ofHours(1)));
+                    .scheduleOnPosDeleteFileCount(6)
+                    .scheduleOnPosDeleteRecordCount(7L)
+                    .scheduleOnInterval(Duration.ofHours(1)));
 
     sendEvents(schedulerSource, streamBuilder, ImmutableList.of(Tuple2.of(DUMMY_CHANGE, 1)));
   }
@@ -135,7 +135,7 @@ void testFromEnv() throws Exception {
 
     env.enableCheckpointing(10);
 
-    TableMaintenance.builder(env, tableLoader, LOCK_FACTORY)
+    TableMaintenance.forTable(env, tableLoader, LOCK_FACTORY)
        .rateLimit(Duration.ofMillis(2))
        .maxReadBack(2)
        .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(2))
@@ -146,7 +146,7 @@ void testFromEnv() throws Exception {
         new ManualSource<>(env, InternalTypeInfo.of(FlinkSchemaUtil.convert(table.schema())));
     FlinkSink.forRowData(insertSource.dataStream())
        .tableLoader(tableLoader)
-        .uidPrefix(UID_PREFIX + "-iceberg-sink")
+        .uidPrefix(UID_SUFFIX + "-iceberg-sink")
        .append();
 
     JobClient jobClient = null;
@@ -175,7 +175,7 @@ void testLocking() throws Exception {
         new ManualSource<>(env, TypeInformation.of(TableChange.class));
 
     TableMaintenance.Builder streamBuilder =
-        TableMaintenance.builder(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY)
+        TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY)
            .rateLimit(Duration.ofMillis(2))
            .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1));
 
@@ -197,9 +197,9 @@ void testMetrics() throws Exception {
         new ManualSource<>(env, TypeInformation.of(TableChange.class));
 
     TableMaintenance.Builder streamBuilder =
-        TableMaintenance.builder(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY)
+        TableMaintenance.forChangeStream(schedulerSource.dataStream(), tableLoader, LOCK_FACTORY)
            .rateLimit(Duration.ofMillis(2))
-            .concurrentCheckDelay(Duration.ofMillis(2))
+            .lockCheckDelay(Duration.ofMillis(2))
            .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1))
            .add(new MaintenanceTaskBuilderForTest(false).scheduleOnCommitCount(2));
 
@@ -251,20 +251,20 @@ void testUidAndSlotSharingGroup() {
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
     tableLoader.open();
 
-    TableMaintenance.builder(
+    TableMaintenance.forChangeStream(
            new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
            tableLoader,
            LOCK_FACTORY)
-        .uidPrefix(UID_PREFIX)
+        .uidSuffix(UID_SUFFIX)
        .slotSharingGroup(SLOT_SHARING_GROUP)
        .add(
            new MaintenanceTaskBuilderForTest(true)
                .scheduleOnCommitCount(1)
-                .uidPrefix(UID_PREFIX)
+                .uidSuffix(UID_SUFFIX)
                .slotSharingGroup(SLOT_SHARING_GROUP))
        .append();
 
-    checkUidsAreSet(env, UID_PREFIX);
+    checkUidsAreSet(env, UID_SUFFIX);
     checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
   }
 
@@ -276,7 +276,7 @@ void testUidAndSlotSharingGroupUnset() {
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
     tableLoader.open();
 
-    TableMaintenance.builder(
+    TableMaintenance.forChangeStream(
            new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
            tableLoader,
            LOCK_FACTORY)
@@ -295,16 +295,16 @@ void testUidAndSlotSharingGroupInherit() {
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
     tableLoader.open();
 
-    TableMaintenance.builder(
+    TableMaintenance.forChangeStream(
            new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
            tableLoader,
            LOCK_FACTORY)
-        .uidPrefix(UID_PREFIX)
+        .uidSuffix(UID_SUFFIX)
        .slotSharingGroup(SLOT_SHARING_GROUP)
        .add(new MaintenanceTaskBuilderForTest(true).scheduleOnCommitCount(1))
        .append();
 
-    checkUidsAreSet(env, UID_PREFIX);
+    checkUidsAreSet(env, UID_SUFFIX);
     checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
   }
 
@@ -318,16 +318,16 @@ void testUidAndSlotSharingGroupOverWrite() {
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
     tableLoader.open();
 
-    TableMaintenance.builder(
+    TableMaintenance.forChangeStream(
            new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
            tableLoader,
            LOCK_FACTORY)
-        .uidPrefix(UID_PREFIX)
+        .uidSuffix(UID_SUFFIX)
        .slotSharingGroup(SLOT_SHARING_GROUP)
        .add(
            new MaintenanceTaskBuilderForTest(true)
                .scheduleOnCommitCount(1)
-                .uidPrefix(anotherUid)
+                .uidSuffix(anotherUid)
                .slotSharingGroup(anotherSlotSharingGroup))
        .append();
 
@@ -337,7 +337,7 @@ void testUidAndSlotSharingGroupOverWrite() {
        .filter(t -> t.getName().equals("Trigger manager"))
        .findFirst()
        .orElseThrow();
-    assertThat(schedulerTransformation.getUid()).contains(UID_PREFIX);
+    assertThat(schedulerTransformation.getUid()).contains(UID_SUFFIX);
     assertThat(schedulerTransformation.getSlotSharingGroup()).isPresent();
     assertThat(schedulerTransformation.getSlotSharingGroup().get().getName())
        .isEqualTo(SLOT_SHARING_GROUP);
@@ -363,23 +363,23 @@ void testUidAndSlotSharingGroupForMonitor() {
     TableLoader tableLoader = sql.tableLoader(TABLE_NAME);
     tableLoader.open();
 
-    TableMaintenance.builder(env, tableLoader, LOCK_FACTORY)
-        .uidPrefix(UID_PREFIX)
+    TableMaintenance.forTable(env, tableLoader, LOCK_FACTORY)
+        .uidSuffix(UID_SUFFIX)
        .slotSharingGroup(SLOT_SHARING_GROUP)
        .add(
            new MaintenanceTaskBuilderForTest(true)
                .scheduleOnCommitCount(1)
-                .uidPrefix(UID_PREFIX)
+                .uidSuffix(UID_SUFFIX)
                .slotSharingGroup(SLOT_SHARING_GROUP))
        .append();
 
     Transformation<?> source = monitorSource();
     assertThat(source).isNotNull();
-    assertThat(source.getUid()).contains(UID_PREFIX);
+    assertThat(source.getUid()).contains(UID_SUFFIX);
     assertThat(source.getSlotSharingGroup()).isPresent();
     assertThat(source.getSlotSharingGroup().get().getName()).isEqualTo(SLOT_SHARING_GROUP);
 
-    checkUidsAreSet(env, UID_PREFIX);
+    checkUidsAreSet(env, UID_SUFFIX);
     checkSlotSharingGroupsAreSet(env, SLOT_SHARING_GROUP);
   }
 
@@ -429,12 +429,12 @@ private static class MaintenanceTaskBuilderForTest
   }
 
   @Override
-  DataStream<TaskResult> buildInternal(DataStream<Trigger> trigger) {
+  DataStream<TaskResult> append(DataStream<Trigger> trigger) {
     String name = TASKS[id];
     return trigger
        .map(new DummyMaintenanceTask(success))
        .name(name)
-        .uid(uidPrefix() + "-test-mapper-" + name)
+        .uid(uidSuffix() + "-test-mapper-" + name)
        .slotSharingGroup(slotSharingGroup())
        .forceNonParallel();
  }