From 3237c371b798c91e8ccba95a2726e2343c07d457 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Mon, 30 Sep 2024 11:19:10 +0200 Subject: [PATCH] Rod's comments --- .../maintenance/api/MaintenanceTaskBuilder.java | 6 +++--- .../flink/maintenance/api/TableMaintenance.java | 16 ++++------------ 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java index 9c20067e9c59..63076876e74d 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -19,8 +19,8 @@ package org.apache.iceberg.flink.maintenance.api; import java.time.Duration; +import org.apache.flink.annotation.Experimental; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -28,7 +28,7 @@ import org.apache.iceberg.flink.maintenance.operator.TriggerEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -@PublicEvolving +@Experimental @SuppressWarnings("unchecked") public abstract class MaintenanceTaskBuilder> { private int index; @@ -199,7 +199,7 @@ DataStream append( String mainUidSuffix, String mainSlotSharingGroup, int mainParallelism) { - Preconditions.checkNotNull(defaultTaskName, "Name should not be null"); + Preconditions.checkNotNull(defaultTaskName, "Task name should not be null"); Preconditions.checkNotNull(newTableLoader, "TableLoader should not be null"); this.index = defaultTaskIndex; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index fab7325d168b..5d675d82dc75 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -78,7 +78,7 @@ public static Builder forChangeStream( Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); - return new Builder(changeStream, tableLoader, lockFactory); + return new Builder(null, changeStream, tableLoader, lockFactory); } /** @@ -96,7 +96,7 @@ public static Builder forTable( Preconditions.checkNotNull(tableLoader, "TableLoader should not be null"); Preconditions.checkNotNull(lockFactory, "LockFactory should not be null"); - return new Builder(env, tableLoader, lockFactory); + return new Builder(env, null, tableLoader, lockFactory); } public static class Builder { @@ -114,19 +114,11 @@ public static class Builder { private int maxReadBack = 100; private Builder( - StreamExecutionEnvironment env, TableLoader tableLoader, TriggerLockFactory lockFactory) { - this.env = env; - this.inputStream = null; - this.tableLoader = tableLoader; - this.lockFactory = lockFactory; - this.taskBuilders = Lists.newArrayListWithCapacity(4); - } - - private Builder( + StreamExecutionEnvironment env, DataStream inputStream, TableLoader tableLoader, TriggerLockFactory lockFactory) { - this.env = null; + this.env = env; this.inputStream = inputStream; this.tableLoader = tableLoader; this.lockFactory = lockFactory;