Steven's new comments
Peter Vary committed Sep 26, 2024
1 parent 415dc90 commit c0097a6
Showing 8 changed files with 70 additions and 80 deletions.
@@ -112,7 +112,7 @@ DataStream<TaskResult> append(DataStream<Trigger> trigger) {
.transform(
operatorName(DELETE_FILES_OPERATOR_NAME),
TypeInformation.of(Void.class),
new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize))
new DeleteFilesProcessor(name(), tableLoader().loadTable(), deleteBatchSize))
.uid(DELETE_FILES_OPERATOR_NAME + uidSuffix())
.slotSharingGroup(slotSharingGroup())
.setParallelism(parallelism());
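The functional change in this hunk is that the delete-files operator now receives an already loaded Table instead of the TableLoader. A minimal sketch of the equivalent standalone wiring, assuming a Hadoop-path-backed table; the location, operator name, and batch size are placeholders, and the operator's package is assumed:

import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor; // package assumed

class DeleteFilesWiringSketch {
  static DeleteFilesProcessor wire() {
    // Placeholder location; substitute your own catalog or table path.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");
    tableLoader.open();                    // the loader must be opened before loadTable()
    Table table = tableLoader.loadTable(); // load the table once, up front
    // The operator now only holds the loaded Table (and its FileIO), not the loader.
    return new DeleteFilesProcessor("ExpireSnapshots", table, 1000);
  }
}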
@@ -218,8 +218,6 @@ DataStream<TaskResult> append(
slotSharingGroup = mainSlotSharingGroup;
}

tableLoader.open();

return append(sourceStream);
}
}
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.maintenance.api;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
@@ -57,9 +58,7 @@ public class TableMaintenance {
static final String FILTER_OPERATOR_NAME_PREFIX = "Filter ";
static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover";

private TableMaintenance() {
// Do not instantiate directly
}
private TableMaintenance() {}

/**
* Use when the change stream is already provided, like in the {@link
@@ -214,7 +213,7 @@ public Builder add(MaintenanceTaskBuilder<?> task) {
}

/** Builds the task graph for the maintenance tasks. */
public void append() {
public void append() throws IOException {
Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task");
Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null");

@@ -225,73 +224,67 @@ public void append() {
evaluators.add(taskBuilders.get(i).evaluator());
}

-      DataStream<Trigger> triggers =
-          DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true)
-              .process(
-                  new TriggerManager(
-                      tableLoader,
-                      lockFactory,
-                      taskNames,
-                      evaluators,
-                      rateLimit.toMillis(),
-                      lockCheckDelay.toMillis()))
-              .name(TRIGGER_MANAGER_OPERATOR_NAME)
-              .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
-              .slotSharingGroup(slotSharingGroup)
-              .forceNonParallel()
-              .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
-              .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
-              .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
-              .slotSharingGroup(slotSharingGroup)
-              .forceNonParallel();
-
-      // Add the specific tasks
-      DataStream<TaskResult> unioned = null;
-      for (int i = 0; i < taskBuilders.size(); ++i) {
-        int finalIndex = i;
-        DataStream<Trigger> filtered =
-            triggers
-                .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
-                .name(FILTER_OPERATOR_NAME_PREFIX + i)
-                .forceNonParallel()
-                .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
-                .slotSharingGroup(slotSharingGroup);
-        MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
-        DataStream<TaskResult> result =
-            builder.append(
-                filtered,
-                i,
-                taskNames.get(i),
-                tableLoader,
-                uidSuffix,
-                slotSharingGroup,
-                parallelism);
-        if (unioned == null) {
-          unioned = result;
-        } else {
-          unioned = unioned.union(result);
-        }
-      }
-
-      // Add the LockRemover to the end
-      unioned
-          .transform(
-              LOCK_REMOVER_OPERATOR_NAME,
-              TypeInformation.of(Void.class),
-              new LockRemover(lockFactory, taskNames))
-          .forceNonParallel()
-          .uid("lock-remover-" + uidSuffix)
-          .slotSharingGroup(slotSharingGroup);
+      try (TableLoader loader = tableLoader.clone()) {
+        DataStream<Trigger> triggers =
+            DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true)
+                .process(
+                    new TriggerManager(
+                        loader,
+                        lockFactory,
+                        taskNames,
+                        evaluators,
+                        rateLimit.toMillis(),
+                        lockCheckDelay.toMillis()))
+                .name(TRIGGER_MANAGER_OPERATOR_NAME)
+                .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix)
+                .slotSharingGroup(slotSharingGroup)
+                .forceNonParallel()
+                .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy())
+                .name(WATERMARK_ASSIGNER_OPERATOR_NAME)
+                .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix)
+                .slotSharingGroup(slotSharingGroup)
+                .forceNonParallel();
+
+        // Add the specific tasks
+        DataStream<TaskResult> unioned = null;
+        for (int i = 0; i < taskBuilders.size(); ++i) {
+          int finalIndex = i;
+          DataStream<Trigger> filtered =
+              triggers
+                  .filter(t -> t.taskId() != null && t.taskId() == finalIndex)
+                  .name(FILTER_OPERATOR_NAME_PREFIX + i)
+                  .forceNonParallel()
+                  .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix)
+                  .slotSharingGroup(slotSharingGroup);
+          MaintenanceTaskBuilder<?> builder = taskBuilders.get(i);
+          DataStream<TaskResult> result =
+              builder.append(
+                  filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism);
+          if (unioned == null) {
+            unioned = result;
+          } else {
+            unioned = unioned.union(result);
+          }
+        }
+
+        // Add the LockRemover to the end
+        unioned
+            .transform(
+                LOCK_REMOVER_OPERATOR_NAME,
+                TypeInformation.of(Void.class),
+                new LockRemover(lockFactory, taskNames))
+            .forceNonParallel()
+            .uid("lock-remover-" + uidSuffix)
+            .slotSharingGroup(slotSharingGroup);
+      }
       }

private DataStream<TableChange> changeStream() {
private DataStream<TableChange> changeStream(TableLoader loader) {
if (inputStream == null) {
// Create a monitor source to provide the TableChange stream
MonitorSource source =
new MonitorSource(
tableLoader,
RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()),
maxReadBack);
loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack);
return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME)
.uid(SOURCE_OPERATOR_NAME + uidSuffix)
.slotSharingGroup(slotSharingGroup)
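The recurring change in this file is resource handling: append() now borrows a cloned TableLoader for the duration of the job-graph wiring and closes it when done, which is why the method newly declares IOException. A self-contained sketch of the underlying pattern; the class, method, and the open()/loadTable() calls below are illustrative stand-ins, not the actual body of append():

import java.io.IOException;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

class LoaderScopeSketch {
  // clone + try-with-resources: the caller's loader stays untouched,
  // the clone is used for the wiring work and closed afterwards.
  static void withClonedLoader(TableLoader original) throws IOException {
    try (TableLoader loader = original.clone()) {
      loader.open();                    // open the clone for the duration of the work
      Table table = loader.loadTable(); // stand-in for the graph building done in append()
      System.out.println("Wiring maintenance tasks for " + table.name());
    } // TableLoader is Closeable; close() may throw IOException, hence the new signature
  }
}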
@@ -26,7 +26,6 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
@@ -50,12 +49,10 @@ public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
private transient Counter failedCounter;
private transient Counter succeededCounter;

public DeleteFilesProcessor(String name, TableLoader tableLoader, int batchSize) {
public DeleteFilesProcessor(String name, Table table, int batchSize) {
Preconditions.checkNotNull(name, "Name should no be null");
Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
Preconditions.checkNotNull(table, "Table should no be null");

tableLoader.open();
Table table = tableLoader.loadTable();
FileIO fileIO = table.io();
Preconditions.checkArgument(
fileIO instanceof SupportsBulkOperations,
@@ -121,6 +121,7 @@ public void processElement(Trigger trigger, Context ctx, Collector<TaskResult> o
public void close() throws Exception {
super.close();

tableLoader.close();
if (plannerPoolSize != null) {
plannerPool.shutdown();
}
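The single added line pairs the loader's lifetime with the operator's lifecycle: whatever open() acquires, close() releases. A minimal sketch of that Flink pattern; the class and field below are illustrative, not the operator being changed in this hunk:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.iceberg.flink.TableLoader;

// Illustrative skeleton showing the open()/close() pairing the added line completes.
class LoaderOwningFunction extends ProcessFunction<String, String> {
  private final TableLoader tableLoader;

  LoaderOwningFunction(TableLoader tableLoader) {
    this.tableLoader = tableLoader;
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    tableLoader.open(); // acquire the catalog connection on the task manager
  }

  @Override
  public void processElement(String value, Context ctx, Collector<String> out) {
    out.collect(value); // the real operator plans and executes table work here
  }

  @Override
  public void close() throws Exception {
    tableLoader.close(); // release exactly what open() acquired
  }
}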
@@ -28,7 +28,6 @@
import java.util.Set;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -47,6 +46,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase {
void before() {
MetricsReporterFactoryForTests.reset();
this.table = createTable();
tableLoader().open();
}

@Test
@@ -95,8 +95,6 @@ void testFailure() throws Exception {
insert(table, 1, "a");
insert(table, 2, "b");

SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);

ExpireSnapshots.builder()
.append(
infra.triggerStream(),
@@ -130,7 +128,8 @@ void testFailure() throws Exception {
closeJobClient(jobClient);
}

// Check the metrics
// Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has
// no max age or number of snapshots set, so no files are removed
MetricsReporterFactoryForTests.assertCounters(
new ImmutableMap.Builder<String, Long>()
.put(
@@ -224,7 +224,7 @@ void testMetrics() throws Exception {
}

@Test
void testUidAndSlotSharingGroup() {
void testUidAndSlotSharingGroup() throws IOException {
TableMaintenance.forChangeStream(
new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
tableLoader(),
@@ -243,7 +243,7 @@ void testUidAndSlotSharingGroup() {
}

@Test
void testUidAndSlotSharingGroupUnset() {
void testUidAndSlotSharingGroupUnset() throws IOException {
TableMaintenance.forChangeStream(
new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
tableLoader(),
@@ -256,7 +256,7 @@ void testUidAndSlotSharingGroupUnset() {
}

@Test
void testUidAndSlotSharingGroupInherit() {
void testUidAndSlotSharingGroupInherit() throws IOException {
TableMaintenance.forChangeStream(
new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(),
tableLoader(),
@@ -271,7 +271,7 @@ void testUidAndSlotSharingGroupInherit() {
}

@Test
void testUidAndSlotSharingGroupOverWrite() {
void testUidAndSlotSharingGroupOverWrite() throws IOException {
String anotherUid = "Another-UID";
String anotherSlotSharingGroup = "Another-SlotSharingGroup";
TableMaintenance.forChangeStream(
@@ -312,7 +312,7 @@ void testUidAndSlotSharingGroupOverWrite() {
}

@Test
void testUidAndSlotSharingGroupForMonitorSource() {
void testUidAndSlotSharingGroupForMonitorSource() throws IOException {
TableMaintenance.forTable(env, tableLoader(), LOCK_FACTORY)
.uidSuffix(UID_SUFFIX)
.slotSharingGroup(SLOT_SHARING_GROUP)
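These tests exercise the public builder API end to end. A hedged sketch of a complete job definition that uses only calls visible in this commit; the lock factory type's package, the environment setup, and the table location are assumptions of the example:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.ExpireSnapshots;
import org.apache.iceberg.flink.maintenance.api.TableMaintenance;
import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory; // package assumed

class MaintenanceJobSketch {
  static void run(TriggerLockFactory lockFactory) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Placeholder location; substitute your own table.
    TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/tbl");

    TableMaintenance.forTable(env, tableLoader, lockFactory)
        .uidSuffix("my-maintenance")     // becomes part of every operator uid
        .slotSharingGroup("maintenance") // keeps maintenance operators in their own slot group
        .add(ExpireSnapshots.builder())  // default retention; may expire nothing, as the test comment notes
        .append();                       // now declares IOException: the cloned TableLoader is closed here

    env.execute("Table maintenance");
  }
}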
@@ -87,9 +87,11 @@ void testInvalidURIScheme() throws Exception {
}

private void deleteFile(TableLoader tableLoader, String fileName) throws Exception {
tableLoader().open();
try (OneInputStreamOperatorTestHarness<String, Void> testHarness =
new OneInputStreamOperatorTestHarness<>(
new DeleteFilesProcessor(DUMMY_NAME, tableLoader, 10), StringSerializer.INSTANCE)) {
new DeleteFilesProcessor(DUMMY_NAME, tableLoader.loadTable(), 10),
StringSerializer.INSTANCE)) {
testHarness.open();
testHarness.processElement(fileName, System.currentTimeMillis());
testHarness.processWatermark(EVENT_TIME);
