Skip to content

Commit

Permalink
Rod's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Sep 30, 2024
1 parent e3dd6f4 commit 3237c37
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@PublicEvolving
@Experimental
@SuppressWarnings("unchecked")
public abstract class MaintenanceTaskBuilder<T extends MaintenanceTaskBuilder<?>> {
private int index;
Expand Down Expand Up @@ -199,7 +199,7 @@ DataStream<TaskResult> append(
String mainUidSuffix,
String mainSlotSharingGroup,
int mainParallelism) {
Preconditions.checkNotNull(defaultTaskName, "Name should not be null");
Preconditions.checkNotNull(defaultTaskName, "Task name should not be null");
Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null");

this.index = defaultTaskIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public static Builder forChangeStream(
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");

return new Builder(changeStream, tableLoader, lockFactory);
return new Builder(null, changeStream, tableLoader, lockFactory);
}

/**
Expand All @@ -96,7 +96,7 @@ public static Builder forTable(
Preconditions.checkNotNull(tableLoader, "TableLoader should not be null");
Preconditions.checkNotNull(lockFactory, "LockFactory should not be null");

return new Builder(env, tableLoader, lockFactory);
return new Builder(env, null, tableLoader, lockFactory);
}

public static class Builder {
Expand All @@ -114,19 +114,11 @@ public static class Builder {
private int maxReadBack = 100;

private Builder(
StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) {
this.env = env;
this.inputStream = null;
this.tableLoader = tableLoader;
this.lockFactory = lockFactory;
this.taskBuilders = Lists.newArrayListWithCapacity(4);
}

private Builder(
StreamExecutionEnvironment env,
DataStream<TableChange> inputStream,
TableLoader tableLoader,
TriggerLockFactory lockFactory) {
this.env = null;
this.env = env;
this.inputStream = inputStream;
this.tableLoader = tableLoader;
this.lockFactory = lockFactory;
Expand Down

0 comments on commit 3237c37

Please sign in to comment.