Steven's comments
Peter Vary committed Sep 26, 2024
1 parent fa03a71 commit b6dae52
Showing 5 changed files with 57 additions and 52 deletions.
ExpireSnapshots.java

@@ -23,7 +23,6 @@
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor;
 import org.apache.iceberg.flink.maintenance.operator.ExpireSnapshotsProcessor;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -35,9 +34,7 @@ public class ExpireSnapshots {
   private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot";
   @VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file";

-  private ExpireSnapshots() {
-    // Do not instantiate directly
-  }
+  private ExpireSnapshots() {}

   /** Creates the builder for creating a stream which expires snapshots for the table. */
   public static Builder builder() {
@@ -47,9 +44,8 @@ public static Builder builder() {
   public static class Builder extends MaintenanceTaskBuilder<ExpireSnapshots.Builder> {
     private Duration maxSnapshotAge = null;
     private Integer numSnapshots = null;
-    private int planningWorkerPoolSize = SystemConfigs.WORKER_THREAD_POOL_SIZE.value();
+    private Integer planningWorkerPoolSize;
     private int deleteBatchSize = DELETE_BATCH_SIZE_DEFAULT;
-    private int deleteParallelism = 1;

     /**
      * The snapshots older than this age will be removed.
@@ -73,7 +69,8 @@ public Builder retainLast(int newNumSnapshots) {
     }

     /**
-     * The worker pool size used to calculate the files to delete.
+     * The worker pool size used to calculate the files to delete. If not set, the shared worker
+     * pool is used.
      *
      * @param newPlanningWorkerPoolSize for planning files to delete
      */
@@ -92,16 +89,6 @@ public Builder deleteBatchSize(int newDeleteBatchSize) {
       return this;
     }

-    /**
-     * The number of subtasks which are doing the deletes.
-     *
-     * @param newDeleteParallelism used for deleting
-     */
-    public Builder deleteParallelism(int newDeleteParallelism) {
-      this.deleteParallelism = newDeleteParallelism;
-      return this;
-    }
-
     @Override
     DataStream<TaskResult> append(DataStream<Trigger> trigger) {
       Preconditions.checkNotNull(tableLoader(), "TableLoader should not be null");
@@ -114,7 +101,7 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
                   maxSnapshotAge == null ? null : maxSnapshotAge.toMillis(),
                   numSnapshots,
                   planningWorkerPoolSize))
-              .name(EXECUTOR_OPERATOR_NAME)
+              .name(operatorName(EXECUTOR_OPERATOR_NAME))
              .uid(EXECUTOR_OPERATOR_NAME + uidSuffix())
              .slotSharingGroup(slotSharingGroup())
              .forceNonParallel();
@@ -123,13 +110,12 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
              .getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM)
              .rebalance()
              .transform(
-                  DELETE_FILES_OPERATOR_NAME,
+                  operatorName(DELETE_FILES_OPERATOR_NAME),
                  TypeInformation.of(Void.class),
                  new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize))
-              .name(DELETE_FILES_OPERATOR_NAME)
              .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
              .slotSharingGroup(slotSharingGroup())
-              .setParallelism(deleteParallelism);
+              .setParallelism(parallelism());

       // Ignore the file deletion result and return the DataStream<TaskResult> directly
       return result;
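
With deleteParallelism removed, the delete operator's subtask count now comes from the inherited parallelism() setting, and operator names are disambiguated per task via operatorName(). A minimal, hypothetical usage sketch of the resulting builder API (the trigger stream and table loader wiring are assumed to exist elsewhere; all values are illustrative):

    ExpireSnapshots.Builder builder =
        ExpireSnapshots.builder()
            .maxSnapshotAge(Duration.ofDays(7)) // drop snapshots older than a week...
            .retainLast(3) // ...but always keep the last three
            .planningWorkerPoolSize(4) // optional; the shared worker pool is used if unset
            .deleteBatchSize(100) // files per bulk delete request
            .parallelism(2); // now also drives the "Delete file" operator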
MaintenanceTaskBuilder.java

@@ -29,14 +29,14 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

 @PublicEvolving
-abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder> {
+public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> {
   private int index;
   private String name;
   private TableLoader tableLoader;
   private String uidSuffix = null;
   private String slotSharingGroup = null;
   private Integer parallelism = null;
-  private TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();
+  private final TriggerEvaluator.Builder triggerEvaluator = new TriggerEvaluator.Builder();

   abstract DataStream<TaskResult> append(DataStream<Trigger> sourceStream);

@@ -156,35 +156,34 @@ public T parallelism(int newParallelism) {
     return (T) this;
   }

-  @Internal
-  int index() {
+  protected int index() {
     return index;
   }

-  @Internal
-  String name() {
+  protected String name() {
     return name;
   }

-  @Internal
-  TableLoader tableLoader() {
+  protected TableLoader tableLoader() {
     return tableLoader;
   }

-  @Internal
-  String uidSuffix() {
+  protected String uidSuffix() {
     return uidSuffix;
   }

-  @Internal
-  String slotSharingGroup() {
+  protected String slotSharingGroup() {
     return slotSharingGroup;
   }

+  protected Integer parallelism() {
+    return parallelism;
+  }
+
+  protected String operatorName(String operatorNameBase) {
+    return operatorNameBase + "[" + index() + "]";
+  }
+
   @Internal
   TriggerEvaluator evaluator() {
     return triggerEvaluator.build();
@@ -193,17 +192,17 @@ TriggerEvaluator evaluator() {
   @Internal
   DataStream<TaskResult> append(
       DataStream<Trigger> sourceStream,
-      int maintenanceTaskIndex,
-      String maintenanceTaskName,
+      int defaultTaskIndex,
+      String defaultTaskName,
       TableLoader newTableLoader,
       String mainUidSuffix,
       String mainSlotSharingGroup,
       int mainParallelism) {
-    Preconditions.checkNotNull(maintenanceTaskName, "Name should not be null");
+    Preconditions.checkNotNull(defaultTaskName, "Name should not be null");
     Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");

-    this.index = maintenanceTaskIndex;
-    this.name = maintenanceTaskName;
+    this.index = defaultTaskIndex;
+    this.name = defaultTaskName;
     this.tableLoader = newTableLoader;

     if (uidSuffix == null) {
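
The accessors that subclasses need (index(), name(), tableLoader(), uidSuffix(), slotSharingGroup(), parallelism()) are now protected instead of @Internal package-private, and the new operatorName() helper stamps each operator with the task's index so two tasks of the same type stay distinguishable in operator names and metric keys. A brief illustrative sketch, with hypothetical index values (indices are assigned by the internal append(...) call):

    // For a task appended with defaultTaskIndex = 0:
    //   operatorName("Delete file")  ->  "Delete file[0]"
    // For a second task of the same type, appended with defaultTaskIndex = 1:
    //   operatorName("Delete file")  ->  "Delete file[1]"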
DeleteFilesProcessor.java

@@ -59,8 +59,8 @@ public DeleteFilesProcessor(String name, TableLoader tableLoader, int batchSize)
     FileIO fileIO = table.io();
     Preconditions.checkArgument(
         fileIO instanceof SupportsBulkOperations,
-        "Unsupported FileIO. %s should support bulk delete",
-        fileIO);
+        "%s doesn't support bulk delete",
+        fileIO.getClass().getSimpleName());

     this.name = name;
     this.io = (SupportsBulkOperations) fileIO;
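
The new message names the concrete FileIO class rather than printing the object itself. For context, the processor relies on Iceberg's SupportsBulkOperations interface; a minimal sketch of the capability check and the bulk call it unlocks (the table and the path list are assumed to exist, and this is not the processor's full logic):

    FileIO fileIO = table.io();
    if (fileIO instanceof SupportsBulkOperations) {
      // One bulk request instead of one delete call per file
      ((SupportsBulkOperations) fileIO).deleteFiles(pathsToDelete);
    }
    // FileIO implementations without bulk delete are rejected up front by the precondition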
ExpireSnapshotsProcessor.java

@@ -51,12 +51,15 @@ public class ExpireSnapshotsProcessor extends ProcessFunction<Trigger, TaskResul
   private final TableLoader tableLoader;
   private final Long maxSnapshotAgeMs;
   private final Integer numSnapshots;
-  private final int plannerPoolSize;
+  private final Integer plannerPoolSize;
   private transient ExecutorService plannerPool;
   private transient Table table;

   public ExpireSnapshotsProcessor(
-      TableLoader tableLoader, Long maxSnapshotAgeMs, Integer numSnapshots, int plannerPoolSize) {
+      TableLoader tableLoader,
+      Long maxSnapshotAgeMs,
+      Integer numSnapshots,
+      Integer plannerPoolSize) {
     Preconditions.checkNotNull(tableLoader, "Table loader should not be null");

     this.tableLoader = tableLoader;
@@ -69,7 +72,10 @@ public ExpireSnapshotsProcessor(
   public void open(Configuration parameters) throws Exception {
     tableLoader.open();
     this.table = tableLoader.loadTable();
-    this.plannerPool = ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize);
+    this.plannerPool =
+        plannerPoolSize != null
+            ? ThreadPools.newWorkerPool(table.name() + "-table--planner", plannerPoolSize)
+            : ThreadPools.getWorkerPool();
   }

   @Override
@@ -110,4 +116,13 @@ public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> o
           new TaskResult(trigger.taskId(), trigger.timestamp(), false, Lists.newArrayList(e)));
     }
   }
+
+  @Override
+  public void close() throws Exception {
+    super.close();
+
+    if (plannerPoolSize != null) {
+      plannerPool.shutdown();
+    }
+  }
 }
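
Lifecycle-wise, the processor now follows a simple ownership rule: a dedicated planner pool is created and shut down by the operator itself, while the shared worker pool is only borrowed and must stay alive. A condensed sketch of that rule using Iceberg's ThreadPools utility (pool name and surrounding code are illustrative, not the operator's actual body):

    ExecutorService pool =
        poolSize != null
            ? ThreadPools.newWorkerPool("planner", poolSize) // dedicated: owned by the caller
            : ThreadPools.getWorkerPool(); // shared, process-wide: never shut this down
    try {
      // ... submit planning work to the pool ...
    } finally {
      if (poolSize != null) {
        pool.shutdown(); // only ever shut down a pool we created ourselves
      }
    }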
TestExpireSnapshots.java

@@ -63,7 +63,6 @@ void testExpireSnapshots() throws Exception {
         .parallelism(1)
         .planningWorkerPoolSize(2)
         .deleteBatchSize(3)
-        .deleteParallelism(1)
         .maxSnapshotAge(Duration.ZERO)
         .retainLast(1)
         .uidSuffix(UID_SUFFIX)
@@ -116,9 +115,7 @@ void testFailure() throws Exception {

     // Do a single task run
     long time = System.currentTimeMillis();
-    infra
-        .source()
-        .sendRecord(Trigger.create(time, serializableTable, 1), System.currentTimeMillis());
+    infra.source().sendRecord(Trigger.create(time, serializableTable, 1), time);

     // First successful run (ensure that the operators are loaded/opened etc.)
     assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue();
@@ -138,10 +135,14 @@ void testFailure() throws Exception {
     MetricsReporterFactoryForTests.assertCounters(
         new ImmutableMap.Builder<String, Long>()
             .put(
-                DELETE_FILES_OPERATOR_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER,
+                DELETE_FILES_OPERATOR_NAME + "[0]." + DUMMY_NAME + "." + DELETE_FILE_FAILED_COUNTER,
                 0L)
             .put(
-                DELETE_FILES_OPERATOR_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER,
+                DELETE_FILES_OPERATOR_NAME
+                    + "[0]."
+                    + DUMMY_NAME
+                    + "."
+                    + DELETE_FILE_SUCCEEDED_COUNTER,
                 0L)
             .build());
   }
@@ -212,14 +213,14 @@ void testMetrics() throws Exception {
         new ImmutableMap.Builder<String, Long>()
             .put(
                 DELETE_FILES_OPERATOR_NAME
-                    + "."
+                    + "[0]."
                    + DUMMY_NAME
                    + "."
                    + DELETE_FILE_FAILED_COUNTER,
                0L)
            .put(
                DELETE_FILES_OPERATOR_NAME
-                    + "."
+                    + "[0]."
                    + DUMMY_NAME
                    + "."
                    + DELETE_FILE_SUCCEEDED_COUNTER,
@@ -230,6 +231,10 @@
   private static boolean checkDeleteFinished(Long expectedDeleteNum) {
     return expectedDeleteNum.equals(
         MetricsReporterFactoryForTests.counter(
-            DELETE_FILES_OPERATOR_NAME + "." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER));
+            DELETE_FILES_OPERATOR_NAME
+                + "[0]."
+                + DUMMY_NAME
+                + "."
+                + DELETE_FILE_SUCCEEDED_COUNTER));
   }
 }
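
Every assertion above encodes the same metric key layout, <operator name>[<task index>].<task name>.<counter>, where the bracketed index is what this commit introduces. As a hypothetical rendered key (DELETE_FILES_OPERATOR_NAME is "Delete file" per ExpireSnapshots.java; the task name and counter text are placeholders):

    // Renders as something like "Delete file[0].<task name>.<succeeded counter>"
    String expectedKey =
        DELETE_FILES_OPERATOR_NAME + "[0]." + DUMMY_NAME + "." + DELETE_FILE_SUCCEEDED_COUNTER;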
