Remove Table from the Trigger
Peter Vary committed Sep 26, 2024
1 parent b6dae52 commit 415dc90
Showing 7 changed files with 19 additions and 73 deletions.
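In short: a Trigger no longer carries a SerializableTable payload. It is reduced to a timestamp, a task id, and a recovery flag, and TriggerManager caches the table name once at construction time, using it only for logging. A minimal before/after sketch of the changed call site (variable names are illustrative, condensed from the diff below):

    // Before: every fired trigger shipped a serializable copy of the whole table
    SerializableTable payload = (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
    out.collect(Trigger.create(timestamp, payload, taskId));

    // After: the trigger is a lightweight event; downstream operators no longer
    // receive the table from the trigger
    out.collect(Trigger.create(timestamp, taskId));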
MaintenanceTaskBuilder.java

@@ -29,6 +29,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

 @PublicEvolving
+@SuppressWarnings("unchecked")
 public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> {
   private int index;
   private String name;

Trigger.java

@@ -19,40 +19,33 @@
 package org.apache.iceberg.flink.maintenance.api;

 import org.apache.flink.annotation.Internal;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

 public class Trigger {
   private final long timestamp;
-  private final SerializableTable table;
   private final Integer taskId;
   private final boolean isRecovery;

-  private Trigger(long timestamp, SerializableTable table, Integer taskId, boolean isRecovery) {
+  private Trigger(long timestamp, Integer taskId, boolean isRecovery) {
     this.timestamp = timestamp;
-    this.table = table;
     this.taskId = taskId;
     this.isRecovery = isRecovery;
   }

   @Internal
-  public static Trigger create(long timestamp, SerializableTable table, int taskId) {
-    return new Trigger(timestamp, table, taskId, false);
+  public static Trigger create(long timestamp, int taskId) {
+    return new Trigger(timestamp, taskId, false);
   }

   @Internal
   public static Trigger recovery(long timestamp) {
-    return new Trigger(timestamp, null, null, true);
+    return new Trigger(timestamp, null, true);
   }

   public long timestamp() {
     return timestamp;
   }

-  public SerializableTable table() {
-    return table;
-  }
-
   public Integer taskId() {
     return taskId;
   }
@@ -65,7 +58,6 @@ public boolean isRecovery() {
   public String toString() {
     return MoreObjects.toStringHelper(this)
         .add("timestamp", timestamp)
-        .add("table", table == null ? null : table.name())
         .add("taskId", taskId)
         .add("isRecovery", isRecovery)
         .toString();
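The resulting Trigger surface, as a usage sketch reconstructed from the hunks above (timestamps are illustrative):

    long now = System.currentTimeMillis();
    Trigger task = Trigger.create(now, 0); // regular task trigger: taskId = 0, isRecovery = false
    Trigger marker = Trigger.recovery(now); // recovery marker: taskId == null, isRecovery == true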

TriggerManager.java

@@ -36,7 +36,6 @@
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.util.Collector;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
@@ -63,7 +62,7 @@ public class TriggerManager extends KeyedProcessFunction<Boolean, TableChange, Trigger>
     implements CheckpointedFunction {
   private static final Logger LOG = LoggerFactory.getLogger(TriggerManager.class);

-  private final TableLoader tableLoader;
+  private final String tableName;
   private final TriggerLockFactory lockFactory;
   private final List<String> maintenanceTaskNames;
   private final List<TriggerEvaluator> evaluators;
@@ -112,7 +111,8 @@ public TriggerManager(
     Preconditions.checkArgument(
         lockCheckDelayMs > 0, "Minimum lock delay rate should be at least 1 ms.");

-    this.tableLoader = tableLoader;
+    tableLoader.open();
+    this.tableName = tableLoader.loadTable().name();
     this.lockFactory = lockFactory;
     this.maintenanceTaskNames = maintenanceTaskNames;
     this.evaluators = evaluators;
@@ -161,8 +161,6 @@ public void open(Configuration parameters) throws Exception {
     this.lastTriggerTimesState =
         getRuntimeContext()
             .getListState(new ListStateDescriptor<>("triggerManagerLastTriggerTime", Types.LONG));
-
-    tableLoader.open();
   }

   @Override
@@ -222,7 +220,6 @@ public void onTimer(long timestamp, OnTimerContext ctx, Collector<Trigger> out)

   @Override
   public void close() throws IOException {
-    tableLoader.close();
     lockFactory.close();
   }

@@ -258,10 +255,8 @@ private void checkAndFire(long current, TimerService timerService, Collector<Trigger> out) {

     if (lock.tryLock()) {
       TableChange change = accumulatedChanges.get(taskToStart);
-      SerializableTable table =
-          (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable());
-      out.collect(Trigger.create(current, table, taskToStart));
-      LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, table.name());
+      out.collect(Trigger.create(current, taskToStart));
+      LOG.debug("Fired event with time: {}, collected: {} for {}", current, change, tableName);
       triggerCounters.get(taskToStart).inc();
       accumulatedChanges.set(taskToStart, TableChange.empty());
       lastTriggerTimes.set(taskToStart, current);
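TriggerManager now resolves the table name eagerly in its constructor instead of opening the TableLoader in open() and copying the table on every firing. A small sketch of that pattern, assuming a Hadoop-table loader (the location is illustrative):

    TableLoader tableLoader = TableLoader.fromHadoopTable("/tmp/warehouse/db/tbl");
    tableLoader.open(); // open the underlying catalog/FileIO
    String tableName = tableLoader.loadTable().name(); // only the name string is kept, for log messages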

MaintenanceTaskTestBase.java

@@ -24,8 +24,6 @@
 import java.util.function.Supplier;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.operator.CollectingSink;
 import org.apache.iceberg.flink.maintenance.operator.ManualSource;
 import org.apache.iceberg.flink.maintenance.operator.OperatorTestBase;
@@ -38,46 +36,29 @@ class MaintenanceTaskTestBase extends OperatorTestBase {

   @RegisterExtension MaintenanceTaskInfraExtension infra = new MaintenanceTaskInfraExtension();

-  /**
-   * Triggers a maintenance tasks and waits for the successful result. The {@link Table} is
-   * refreshed for convenience reasons.
-   *
-   * @param env used for testing
-   * @param triggerSource used for manually emitting the trigger
-   * @param collectingSink used for collecting the result
-   * @param waitForCondition used to wait until target condition is reached before stopping the job
-   * @param table used for generating the payload
-   * @throws Exception if any
-   */
   void runAndWaitForSuccess(
       StreamExecutionEnvironment env,
       ManualSource<Trigger> triggerSource,
       CollectingSink<TaskResult> collectingSink,
-      Supplier<Boolean> waitForCondition,
-      Table table)
+      Supplier<Boolean> waitForCondition)
       throws Exception {
-    table.refresh();
-    SerializableTable payload = (SerializableTable) SerializableTable.copyOf(table);
-
     JobClient jobClient = null;
     try {
       jobClient = env.executeAsync();

       // Do a single task run
       long time = System.currentTimeMillis();
-      triggerSource.sendRecord(Trigger.create(time, payload, TESTING_TASK_ID), time);
+      triggerSource.sendRecord(Trigger.create(time, TESTING_TASK_ID), time);

       TaskResult result = collectingSink.poll(POLL_DURATION);

       assertThat(result.startEpoch()).isEqualTo(time);
       assertThat(result.success()).isTrue();
       assertThat(result.taskIndex()).isEqualTo(TESTING_TASK_ID);

-      Awaitility.await().until(() -> waitForCondition.get());
+      Awaitility.await().until(waitForCondition::get);
     } finally {
       closeJobClient(jobClient);
     }
-
-    table.refresh();
   }
 }

TestExpireSnapshots.java

@@ -76,8 +76,7 @@ void testExpireSnapshots() throws Exception {
             1)
         .sinkTo(infra.sink());

-    runAndWaitForSuccess(
-        infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(3L), table);
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(3L));

     table.refresh();
     assertThat(Sets.newHashSet(table.snapshots())).hasSize(1);
@@ -115,7 +114,7 @@ void testFailure() throws Exception {

       // Do a single task run
       long time = System.currentTimeMillis();
-      infra.source().sendRecord(Trigger.create(time, serializableTable, 1), time);
+      infra.source().sendRecord(Trigger.create(time, 1), time);

       // First successful run (ensure that the operators are loaded/opened etc.)
       assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue();
@@ -124,7 +123,7 @@
       dropTable();

       // Failed run
-      infra.source().sendRecord(Trigger.create(time + 1, serializableTable, 1), time + 1);
+      infra.source().sendRecord(Trigger.create(time + 1, 1), time + 1);

       assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isFalse();
     } finally {
@@ -202,8 +201,7 @@ void testMetrics() throws Exception {
             1)
         .sinkTo(infra.sink());

-    runAndWaitForSuccess(
-        infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(1L), table);
+    runAndWaitForSuccess(infra.env(), infra.source(), infra.sink(), () -> checkDeleteFinished(1L));

     // Check the metrics
     Awaitility.await()

TestExpireSnapshotsProcessor.java

@@ -26,7 +26,6 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
-import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.flink.maintenance.api.TaskResult;
@@ -42,7 +41,6 @@ void testExpire(boolean success) throws Exception {
     Table table = createTable();
     insert(table, 1, "a");
     insert(table, 2, "b");
-    SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table);

     List<TaskResult> actual;
     Queue<StreamRecord<String>> deletes;
@@ -56,8 +54,7 @@
       dropTable();
     }

-    testHarness.processElement(
-        Trigger.create(10, serializableTable, 11), System.currentTimeMillis());
+    testHarness.processElement(Trigger.create(10, 11), System.currentTimeMillis());
     deletes = testHarness.getSideOutput(ExpireSnapshotsProcessor.DELETE_STREAM);
     actual = testHarness.extractOutputValues();
   }

TestTriggerManager.java

@@ -26,7 +26,6 @@
 import static org.assertj.core.api.Assertions.assertThat;

 import java.time.Duration;
-import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Stream;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -37,8 +36,6 @@
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.iceberg.SerializableTable;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.flink.TableLoader;
 import org.apache.iceberg.flink.maintenance.api.Trigger;
 import org.apache.iceberg.flink.maintenance.api.TriggerLockFactory;
@@ -396,11 +393,7 @@ void testRecovery(boolean locked, boolean runningTask) throws Exception {
       ++processingTime;
       testHarness.setProcessingTime(processingTime);
       // Releasing lock will create a new snapshot, and we receive this in the trigger
-      expected.add(
-          Trigger.create(
-              processingTime,
-              (SerializableTable) SerializableTable.copyOf(tableLoader.loadTable()),
-              0));
+      expected.add(Trigger.create(processingTime, 0));
       assertTriggers(testHarness.extractOutputValues(), expected);
     }
   }
@@ -648,17 +641,6 @@ private static void assertTriggers(List<Trigger> expected, List<Trigger> actual)
       assertThat(actualTrigger.timestamp()).isEqualTo(expectedTrigger.timestamp());
       assertThat(actualTrigger.taskId()).isEqualTo(expectedTrigger.taskId());
       assertThat(actualTrigger.isRecovery()).isEqualTo(expectedTrigger.isRecovery());
-      if (expectedTrigger.table() == null) {
-        assertThat(actualTrigger.table()).isNull();
-      } else {
-        Iterator<Snapshot> expectedSnapshots = expectedTrigger.table().snapshots().iterator();
-        Iterator<Snapshot> actualSnapshots = actualTrigger.table().snapshots().iterator();
-        while (expectedSnapshots.hasNext()) {
-          assertThat(actualSnapshots.hasNext()).isTrue();
-          assertThat(expectedSnapshots.next().snapshotId())
-              .isEqualTo(actualSnapshots.next().snapshotId());
-        }
-      }
     }
   }
 }
