From b6dae52c1c0632af08447aecc227d823590e07b1 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Thu, 26 Sep 2024 14:32:00 +0200 Subject: [PATCH] Steven's comments --- .../maintenance/api/ExpireSnapshots.java | 28 ++++------------ .../api/MaintenanceTaskBuilder.java | 33 +++++++++---------- .../operator/DeleteFilesProcessor.java | 4 +-- .../operator/ExpireSnapshotsProcessor.java | 21 ++++++++++-- .../maintenance/api/TestExpireSnapshots.java | 23 ++++++++----- 5 files changed, 57 insertions(+), 52 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index e69fa00616a5..da464f2215d8 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SystemConfigs; import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor; import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -35,9 +34,7 @@ public class ExpireSnapshots { private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot"; @VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file"; - private ExpireSnapshots() { - // Do not instantiate directly - } + private ExpireSnapshots() {} /** Creates the builder for creating a stream which expires snapshots for the table. */ public static Builder builder() { @@ -47,9 +44,8 @@ public static Builder builder() { public static class Builder extends MaintenanceTaskBuilder { private Duration maxSnapshotAge = null; private Integer numSnapshots = null; - private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value(); + private Integer planningWorkerPoolSize; private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT; - private int deleteParallelism = 1; /** * The snapshots older than this age will be removed. @@ -73,7 +69,8 @@ public Builder retainLast(int newNumSnapshots) { } /** - * The worker pool size used to calculate the files to delete. + * The worker pool size used to calculate the files to delete. If not set, the shared worker + * pool is used. * * @param newPlanningWorkerPoolSize for planning files to delete */ @@ -92,16 +89,6 @@ public Builder deleteBatchSize(int newDeleteBatchSize) { return this; } - /** - * The number of subtasks which are doing the deletes. - * - * @param newDeleteParallelism used for deleting - */ - public Builder deleteParallelism(int newDeleteParallelism) { - this.deleteParallelism = newDeleteParallelism; - return this; - } - @Override DataStream append(DataStream trigger) { Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null"); @@ -114,7 +101,7 @@ DataStream append(DataStream trigger) { maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(), numSnapshots, planningWorkerPoolSize)) - .name(EXECUTOR_OPERATOR_NAME) + .name(operatorName(EXECUTOR_OPERATOR_NAME)) .uid(EXECUTOR_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) .forceNonParallel(); @@ -123,13 +110,12 @@ DataStream append(DataStream trigger) { .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM) .rebalance() .transform( - DELETE_FILES_OPERATOR_NAME, + operatorName(DELETE_FILES_OPERATOR_NAME), TypeInformation.of(Void.class), new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize)) - .name(DELETE_FILES_OPERATOR_NAME) .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) - .setParallelism(deleteParallelism); + .setParallelism(parallelism()); // Ignore the file deletion result and return the DataStream directly return result; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java index b2383fc93c74..bfee1a2adfd9 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -29,14 +29,14 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @PublicEvolving -abstract class MaintenanceTaskBuilder { +public abstract class MaintenanceTaskBuilder> { private int index; private String name; private TableLoader tableLoader; private String uidSuffix = null; private String slotSharingGroup = null; private Integer parallelism = null; - private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); + private final TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder(); abstract DataStream append(DataStream sourceStream); @@ -156,28 +156,23 @@ public T parallelism(int newParallelism) { return (T) this; } - @Internal - int index() { + protected int index() { return index; } - @Internal - String name() { + protected String name() { return name; } - @Internal - TableLoader tableLoader() { + protected TableLoader tableLoader() { return tableLoader; } - @Internal - String uidSuffix() { + protected String uidSuffix() { return uidSuffix; } - @Internal - String slotSharingGroup() { + protected String slotSharingGroup() { return slotSharingGroup; } @@ -185,6 +180,10 @@ protected Integer parallelism() { return parallelism; } + protected String operatorName(String operatorNameBase) { + return operatorNameBase + "[" + index() + "]"; + } + @Internal TriggerEvaluator evaluator() { return triggerEvaluator.build(); @@ -193,17 +192,17 @@ TriggerEvaluator evaluator() { @Internal DataStream append( DataStream sourceStream, - int maintenanceTaskIndex, - String maintenanceTaskName, + int defaultTaskIndex, + String defaultTaskName, TableLoader newTableLoader, String mainUidSuffix, String mainSlotSharingGroup, int mainParallelism) { - Preconditions.checkNotNull(maintenanceTaskName, "Name should not be null"); + Preconditions.checkNotNull(defaultTaskName, "Name should not be null"); Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null"); - this.index = maintenanceTaskIndex; - this.name = maintenanceTaskName; + this.index = defaultTaskIndex; + this.name = defaultTaskName; this.tableLoader = newTableLoader; if (uidSuffix == null) { diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java index f2705c3727c7..8cf290708481 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -59,8 +59,8 @@ public DeleteFilesProcessor(String name, TableLoader tableLoader, int batchSize) FileIO fileIO = table.io(); Preconditions.checkArgument( fileIO instanceof SupportsBulkOperations, - "Unsupported FileIO. %s should support bulk delete", - fileIO); + "%s doesn't support bulk delete", + fileIO.getClass().getSimpleName()); this.name = name; this.io = (SupportsBulkOperations) fileIO; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java index 50bea6a89e93..58581e2a7e6c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -51,12 +51,15 @@ public class ExpireSnapshotsProcessor extends ProcessFunction o new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e))); } } + + @Override + public void close() throws Exception { + super.close(); + + if (plannerPoolSize != null) { + plannerPool.shutdown(); + } + } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java index 27cfc9d421fd..b5f83d890890 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -63,7 +63,6 @@ void testExpireSnapshots() throws Exception { .parallelism(1) .planningWorkerPoolSize(2) .deleteBatchSize(3) - .deleteParallelism(1) .maxSnapshotAge(Duration.ZERO) .retainLast(1) .uidSuffix(UID_SUFFIX) @@ -116,9 +115,7 @@ void testFailure() throws Exception { // Do a single task run long time = System.currentTimeMillis(); - infra - .source() - .sendRecord(Trigger.create(time, serializableTable, 1), System.currentTimeMillis()); + infra.source().sendRecord(Trigger.create(time, serializableTable, 1), time); // First successful run (ensure that the operators are loaded/opened etc.) assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue(); @@ -138,10 +135,14 @@ void testFailure() throws Exception { MetricsReporterFactoryForTests.assertCounters( new ImmutableMap.Builder() .put( - DELETE_FILES_OPERATOR_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER, + DELETE_FILES_OPERATOR_NAME + "[0]." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER, 0L) .put( - DELETE_FILES_OPERATOR_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER, + DELETE_FILES_OPERATOR_NAME + + "[0]." + + DUMMY_NAME + + "." + + DELETE_FILE_SUCCEEDED_COUNTER, 0L) .build()); } @@ -212,14 +213,14 @@ void testMetrics() throws Exception { new ImmutableMap.Builder() .put( DELETE_FILES_OPERATOR_NAME - + "." + + "[0]." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER, 0L) .put( DELETE_FILES_OPERATOR_NAME - + "." + + "[0]." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER, @@ -230,6 +231,10 @@ void testMetrics() throws Exception { private static boolean checkDeleteFinished(Long expectedDeleteNum) { return expectedDeleteNum.equals( MetricsReporterFactoryForTests.counter( - DELETE_FILES_OPERATOR_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER)); + DELETE_FILES_OPERATOR_NAME + + "[0]." + + DUMMY_NAME + + "." + + DELETE_FILE_SUCCEEDED_COUNTER)); } }