Skip to content

Commit

Permalink
Steven's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Oct 24, 2024
1 parent 1b6daac commit 5a5c8ee
Show file tree
Hide file tree
Showing 14 changed files with 345 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
.transform(
operatorName(DELETE_FILES_OPERATOR_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(name(), tableLoader().loadTable(), deleteBatchSize))
new DeleteFilesProcessor(
index(), taskName(), tableLoader().loadTable(), deleteBatchSize))
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
@SuppressWarnings("unchecked")
public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> {
private int index;
private String name;
private String taskName;
private String tableName;
private TableLoader tableLoader;
private String uidSuffix = null;
private String slotSharingGroup = null;
Expand Down Expand Up @@ -160,8 +161,12 @@ protected int index() {
return index;
}

protected String name() {
return name;
protected String taskName() {
return taskName;
}

/**
 * Returns the table name held by this builder.
 *
 * @return the value of the {@code tableName} field
 */
protected String tableName() {
  return this.tableName;
}

protected TableLoader tableLoader() {
Expand Down Expand Up @@ -190,29 +195,31 @@ TriggerEvaluator evaluator() {

DataStream<TaskResult> append(
DataStream<Trigger> sourceStream,
int defaultTaskIndex,
String defaultTaskName,
int taskIndex,
String taskName,
String tableName,
TableLoader newTableLoader,
String mainUidSuffix,
String mainSlotSharingGroup,
String defaultUidSuffix,
String defaultSlotSharingGroup,
int mainParallelism) {
Preconditions.checkNotNull(defaultTaskName, "Task name should not be null");
Preconditions.checkNotNull(taskName, "Task name should not be null");
Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");

this.index = defaultTaskIndex;
this.name = defaultTaskName;
this.index = taskIndex;
this.taskName = taskName;
this.tableName = tableName;
this.tableLoader = newTableLoader;

if (uidSuffix == null) {
uidSuffix = name + "_" + index + "_" + mainUidSuffix;
uidSuffix = this.taskName + "_" + index + "_" + defaultUidSuffix;
}

if (parallelism == null) {
parallelism = mainParallelism;
}

if (slotSharingGroup == null) {
slotSharingGroup = mainSlotSharingGroup;
slotSharingGroup = defaultSlotSharingGroup;
}

return append(sourceStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@

/** Creates the table maintenance graph. */
public class TableMaintenance {
static final String SOURCE_OPERATOR_NAME = "Monitor source";
static final String SOURCE_OPERATOR_NAME_PREFIX = "Monitor source for ";
static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager";
static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner";
static final String FILTER_OPERATOR_NAME_PREFIX = "Filter ";
Expand Down Expand Up @@ -217,8 +217,11 @@ public void append() throws IOException {
}

try (TableLoader loader = tableLoader.clone()) {
loader.open();
String tableName = loader.loadTable().name();
DataStream<Trigger> triggers =
DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true)
DataStreamUtils.reinterpretAsKeyedStream(
changeStream(tableName, loader), unused -> true)
.process(
new TriggerManager(
loader,
Expand All @@ -240,18 +243,25 @@ public void append() throws IOException {
// Add the specific tasks
DataStream<TaskResult> unioned = null;
for (int i = 0; i < taskBuilders.size(); ++i) {
int finalIndex = i;
int taskIndex = i;
DataStream<Trigger> filtered =
triggers
.filter(t -> t.taskId() != null && t.taskId() == finalIndex)
.name(FILTER_OPERATOR_NAME_PREFIX + i)
.filter(t -> t.taskId() != null && t.taskId() == taskIndex)
.name(FILTER_OPERATOR_NAME_PREFIX + taskIndex)
.forceNonParallel()
.uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
.uid(FILTER_OPERATOR_NAME_PREFIX + taskIndex + "-" + uidSuffix)
.slotSharingGroup(slotSharingGroup);
MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
MaintenanceTaskBuilder<?> builder = taskBuilders.get(taskIndex);
DataStream<TaskResult> result =
builder.append(
filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism);
filtered,
taskIndex,
taskNames.get(taskIndex),
tableName,
loader,
uidSuffix,
slotSharingGroup,
parallelism);
if (unioned == null) {
unioned = result;
} else {
Expand All @@ -264,31 +274,33 @@ public void append() throws IOException {
.transform(
LOCK_REMOVER_OPERATOR_NAME,
TypeInformation.of(Void.class),
new LockRemover(lockFactory, taskNames))
new LockRemover(tableName, lockFactory, taskNames))
.forceNonParallel()
.uid("lock-remover-" + uidSuffix)
.slotSharingGroup(slotSharingGroup);
}
}

private DataStream<TableChange> changeStream(TableLoader loader) {
private DataStream<TableChange> changeStream(String tableName, TableLoader loader) {
if (inputStream == null) {
// Create a monitor source to provide the TableChange stream
MonitorSource source =
new MonitorSource(
loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack);
return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME)
.uid(SOURCE_OPERATOR_NAME + uidSuffix)
return env.fromSource(
source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME_PREFIX + tableName)
.uid(SOURCE_OPERATOR_NAME_PREFIX + uidSuffix)
.slotSharingGroup(slotSharingGroup)
.forceNonParallel();
} else {
return inputStream.global();
}
}
}

private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) {
return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId);
private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskIndex) {
return String.format(
"%s [%s]", streamBuilder.getClass().getSimpleName(), String.valueOf(taskIndex));
}
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<String, Void> {
private static final Logger LOG = LoggerFactory.getLogger(DeleteFilesProcessor.class);

private final String name;
private final String taskIndex;
private final String taskName;
private final SupportsBulkOperations io;
private final String tableName;
private final Set<String> filesToDelete = Sets.newHashSet();
Expand All @@ -49,8 +50,8 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
private transient Counter failedCounter;
private transient Counter succeededCounter;

public DeleteFilesProcessor(String name, Table table, int batchSize) {
Preconditions.checkNotNull(name, "Name should no be null");
public DeleteFilesProcessor(int taskIndex, String taskName, Table table, int batchSize) {
Preconditions.checkNotNull(taskName, "Task name should no be null");
Preconditions.checkNotNull(table, "Table should no be null");

FileIO fileIO = table.io();
Expand All @@ -59,7 +60,8 @@ public DeleteFilesProcessor(String name, Table table, int batchSize) {
"%s doesn't support bulk delete",
fileIO.getClass().getSimpleName());

this.name = name;
this.taskIndex = String.valueOf(taskIndex);
this.taskName = taskName;
this.io = (SupportsBulkOperations) fileIO;
this.tableName = table.name();
this.batchSize = batchSize;
Expand All @@ -70,12 +72,18 @@ public void open() throws Exception {
this.failedCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName)
.addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex)
.counter(TableMaintenanceMetrics.DELETE_FILE_FAILED_COUNTER);
this.succeededCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, taskName)
.addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, taskIndex)
.counter(TableMaintenanceMetrics.DELETE_FILE_SUCCEEDED_COUNTER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class LockRemover extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<TaskResult, Void> {
private static final Logger LOG = LoggerFactory.getLogger(LockRemover.class);

private final String tableName;
private final TriggerLockFactory lockFactory;
private final List<String> maintenanceTaskNames;

Expand All @@ -77,12 +78,14 @@ public class LockRemover extends AbstractStreamOperator<Void>
private transient TriggerLockFactory.Lock recoveryLock;
private transient long lastProcessedTaskStartEpoch = 0L;

public LockRemover(TriggerLockFactory lockFactory, List<String> maintenanceTaskNames) {
public LockRemover(
String tableName, TriggerLockFactory lockFactory, List<String> maintenanceTaskNames) {
Preconditions.checkNotNull(lockFactory, "Lock factory should no be null");
Preconditions.checkArgument(
maintenanceTaskNames != null && !maintenanceTaskNames.isEmpty(),
"Invalid maintenance task names: null or empty");

this.tableName = tableName;
this.lockFactory = lockFactory;
this.maintenanceTaskNames = maintenanceTaskNames;
}
Expand All @@ -94,22 +97,31 @@ public void open() throws Exception {
Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
this.failedTaskResultCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
this.taskLastRunDurationMs = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
for (String name : maintenanceTaskNames) {
for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) {
succeededTaskResultCounters.add(
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
.addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
.counter(TableMaintenanceMetrics.SUCCEEDED_TASK_COUNTER));
failedTaskResultCounters.add(
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
.addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
.counter(TableMaintenanceMetrics.FAILED_TASK_COUNTER));
AtomicLong duration = new AtomicLong(0);
taskLastRunDurationMs.add(duration);
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
.addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
.gauge(TableMaintenanceMetrics.LAST_RUN_DURATION_MS, duration::get);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.iceberg.flink.maintenance.operator;

public class TableMaintenanceMetrics {
public static final String GROUP_KEY = "maintenanceTask";
public static final String GROUP_VALUE_DEFAULT = "maintenanceTask";
public static final String GROUP_KEY = "maintenance";
public static final String TASK_NAME_KEY = "taskName";
public static final String TASK_INDEX_KEY = "taskIndex";
public static final String TABLE_NAME_KEY = "tableName";

// TriggerManager metrics
public static final String RATE_LIMITER_TRIGGERED = "rateLimiterTriggered";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
Expand Down Expand Up @@ -125,30 +124,32 @@ public void open(Configuration parameters) throws Exception {
this.rateLimiterTriggeredCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(
TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.counter(TableMaintenanceMetrics.RATE_LIMITER_TRIGGERED);
this.concurrentRunThrottledCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(
TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.counter(TableMaintenanceMetrics.CONCURRENT_RUN_THROTTLED);
this.nothingToTriggerCounter =
getRuntimeContext()
.getMetricGroup()
.addGroup(
TableMaintenanceMetrics.GROUP_KEY, TableMaintenanceMetrics.GROUP_VALUE_DEFAULT)
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.counter(TableMaintenanceMetrics.NOTHING_TO_TRIGGER);
this.triggerCounters =
maintenanceTaskNames.stream()
.map(
name ->
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY, name)
.counter(TableMaintenanceMetrics.TRIGGERED))
.collect(Collectors.toList());
this.triggerCounters = Lists.newArrayListWithExpectedSize(maintenanceTaskNames.size());
for (int taskIndex = 0; taskIndex < maintenanceTaskNames.size(); ++taskIndex) {
triggerCounters.add(
getRuntimeContext()
.getMetricGroup()
.addGroup(TableMaintenanceMetrics.GROUP_KEY)
.addGroup(TableMaintenanceMetrics.TABLE_NAME_KEY, tableName)
.addGroup(TableMaintenanceMetrics.TASK_NAME_KEY, maintenanceTaskNames.get(taskIndex))
.addGroup(TableMaintenanceMetrics.TASK_INDEX_KEY, String.valueOf(taskIndex))
.counter(TableMaintenanceMetrics.TRIGGERED));
}

this.nextEvaluationTimeState =
getRuntimeContext()
Expand Down
Loading

0 comments on commit 5a5c8ee

Please sign in to comment.