diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index da464f2215d8..c42195f9d6e8 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -112,7 +112,7 @@ DataStream append(DataStream trigger) { .transform( operatorName(DELETE_FILES_OPERATOR_NAME), TypeInformation.of(Void.class), - new DeleteFilesProcessor(name(), tableLoader(), deleteBatchSize)) + new DeleteFilesProcessor(name(), tableLoader().loadTable(), deleteBatchSize)) .uid(DELETE_FILES_OPERATOR_NAME + uidSuffix()) .slotSharingGroup(slotSharingGroup()) .setParallelism(parallelism()); diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java index ac4ba7642b5d..9c20067e9c59 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -218,8 +218,6 @@ DataStream append( slotSharingGroup = mainSlotSharingGroup; } - tableLoader.open(); - return append(sourceStream); } } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 25d4754783b9..fab7325d168b 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink.maintenance.api; +import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.UUID; @@ -57,9 +58,7 @@ public class TableMaintenance { static final String FILTER_OPERATOR_NAME_PREFIX = "Filter "; static final String LOCK_REMOVER_OPERATOR_NAME = "Lock remover"; - private TableMaintenance() { - // Do not instantiate directly - } + private TableMaintenance() {} /** * Use when the change stream is already provided, like in the {@link @@ -214,7 +213,7 @@ public Builder add(MaintenanceTaskBuilder task) { } /** Builds the task graph for the maintenance tasks. */ - public void append() { + public void append() throws IOException { Preconditions.checkArgument(!taskBuilders.isEmpty(), "Provide at least one task"); Preconditions.checkNotNull(uidSuffix, "Uid suffix should no be null"); @@ -225,73 +224,67 @@ public void append() { evaluators.add(taskBuilders.get(i).evaluator()); } - DataStream triggers = - DataStreamUtils.reinterpretAsKeyedStream(changeStream(), unused -> true) - .process( - new TriggerManager( - tableLoader, - lockFactory, - taskNames, - evaluators, - rateLimit.toMillis(), - lockCheckDelay.toMillis())) - .name(TRIGGER_MANAGER_OPERATOR_NAME) - .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix) - .slotSharingGroup(slotSharingGroup) - .forceNonParallel() - .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) - .name(WATERMARK_ASSIGNER_OPERATOR_NAME) - .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix) - .slotSharingGroup(slotSharingGroup) - .forceNonParallel(); - - // Add the specific tasks - DataStream unioned = null; - for (int i = 0; i < taskBuilders.size(); ++i) { - int finalIndex = i; - DataStream filtered = - triggers - .filter(t -> t.taskId() != null && t.taskId() == finalIndex) - .name(FILTER_OPERATOR_NAME_PREFIX + i) + try (TableLoader loader = tableLoader.clone()) { + DataStream triggers = + DataStreamUtils.reinterpretAsKeyedStream(changeStream(loader), unused -> true) + .process( + new TriggerManager( + loader, + lockFactory, + taskNames, + evaluators, + rateLimit.toMillis(), + lockCheckDelay.toMillis())) + .name(TRIGGER_MANAGER_OPERATOR_NAME) + .uid(TRIGGER_MANAGER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) .forceNonParallel() - .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix) - .slotSharingGroup(slotSharingGroup); - MaintenanceTaskBuilder builder = taskBuilders.get(i); - DataStream result = - builder.append( - filtered, - i, - taskNames.get(i), - tableLoader, - uidSuffix, - slotSharingGroup, - parallelism); - if (unioned == null) { - unioned = result; - } else { - unioned = unioned.union(result); + .assignTimestampsAndWatermarks(new PunctuatedWatermarkStrategy()) + .name(WATERMARK_ASSIGNER_OPERATOR_NAME) + .uid(WATERMARK_ASSIGNER_OPERATOR_NAME + uidSuffix) + .slotSharingGroup(slotSharingGroup) + .forceNonParallel(); + + // Add the specific tasks + DataStream unioned = null; + for (int i = 0; i < taskBuilders.size(); ++i) { + int finalIndex = i; + DataStream filtered = + triggers + .filter(t -> t.taskId() != null && t.taskId() == finalIndex) + .name(FILTER_OPERATOR_NAME_PREFIX + i) + .forceNonParallel() + .uid(FILTER_OPERATOR_NAME_PREFIX + i + "-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + MaintenanceTaskBuilder builder = taskBuilders.get(i); + DataStream result = + builder.append( + filtered, i, taskNames.get(i), loader, uidSuffix, slotSharingGroup, parallelism); + if (unioned == null) { + unioned = result; + } else { + unioned = unioned.union(result); + } } - } - // Add the LockRemover to the end - unioned - .transform( - LOCK_REMOVER_OPERATOR_NAME, - TypeInformation.of(Void.class), - new LockRemover(lockFactory, taskNames)) - .forceNonParallel() - .uid("lock-remover-" + uidSuffix) - .slotSharingGroup(slotSharingGroup); + // Add the LockRemover to the end + unioned + .transform( + LOCK_REMOVER_OPERATOR_NAME, + TypeInformation.of(Void.class), + new LockRemover(lockFactory, taskNames)) + .forceNonParallel() + .uid("lock-remover-" + uidSuffix) + .slotSharingGroup(slotSharingGroup); + } } - private DataStream changeStream() { + private DataStream changeStream(TableLoader loader) { if (inputStream == null) { // Create a monitor source to provide the TableChange stream MonitorSource source = new MonitorSource( - tableLoader, - RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), - maxReadBack); + loader, RateLimiterStrategy.perSecond(1.0 / rateLimit.getSeconds()), maxReadBack); return env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_OPERATOR_NAME) .uid(SOURCE_OPERATOR_NAME + uidSuffix) .slotSharingGroup(slotSharingGroup) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java index 8cf290708481..c3ef059e9c46 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.iceberg.Table; -import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.io.BulkDeletionFailureException; import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; @@ -50,12 +49,10 @@ public class DeleteFilesProcessor extends AbstractStreamOperator private transient Counter failedCounter; private transient Counter succeededCounter; - public DeleteFilesProcessor(String name, TableLoader tableLoader, int batchSize) { + public DeleteFilesProcessor(String name, Table table, int batchSize) { Preconditions.checkNotNull(name, "Name should no be null"); - Preconditions.checkNotNull(tableLoader, "Table loader should no be null"); + Preconditions.checkNotNull(table, "Table should no be null"); - tableLoader.open(); - Table table = tableLoader.loadTable(); FileIO fileIO = table.io(); Preconditions.checkArgument( fileIO instanceof SupportsBulkOperations, diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java index 58581e2a7e6c..a09d0244e95d 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/ExpireSnapshotsProcessor.java @@ -121,6 +121,7 @@ public void processElement(Trigger trigger, Context ctx, Collector o public void close() throws Exception { super.close(); + tableLoader.close(); if (plannerPoolSize != null) { plannerPool.shutdown(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java index 31273f9d7d0d..bc41bc9f7e06 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -28,7 +28,6 @@ import java.util.Set; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.graph.StreamGraphGenerator; -import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.flink.SimpleDataUtil; @@ -47,6 +46,7 @@ class TestExpireSnapshots extends MaintenanceTaskTestBase { void before() { MetricsReporterFactoryForTests.reset(); this.table = createTable(); + tableLoader().open(); } @Test @@ -95,8 +95,6 @@ void testFailure() throws Exception { insert(table, 1, "a"); insert(table, 2, "b"); - SerializableTable serializableTable = (SerializableTable) SerializableTable.copyOf(table); - ExpireSnapshots.builder() .append( infra.triggerStream(), @@ -130,7 +128,8 @@ void testFailure() throws Exception { closeJobClient(jobClient); } - // Check the metrics + // Check the metrics. There are no expired snapshots or data files because ExpireSnapshots has + // no max age of number of snapshots set, so no files are removed MetricsReporterFactoryForTests.assertCounters( new ImmutableMap.Builder() .put( diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java index fee0b65a3754..f4c1f8380e89 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestTableMaintenance.java @@ -224,7 +224,7 @@ void testMetrics() throws Exception { } @Test - void testUidAndSlotSharingGroup() { + void testUidAndSlotSharingGroup() throws IOException { TableMaintenance.forChangeStream( new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), tableLoader(), @@ -243,7 +243,7 @@ void testUidAndSlotSharingGroup() { } @Test - void testUidAndSlotSharingGroupUnset() { + void testUidAndSlotSharingGroupUnset() throws IOException { TableMaintenance.forChangeStream( new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), tableLoader(), @@ -256,7 +256,7 @@ void testUidAndSlotSharingGroupUnset() { } @Test - void testUidAndSlotSharingGroupInherit() { + void testUidAndSlotSharingGroupInherit() throws IOException { TableMaintenance.forChangeStream( new ManualSource<>(env, TypeInformation.of(TableChange.class)).dataStream(), tableLoader(), @@ -271,7 +271,7 @@ void testUidAndSlotSharingGroupInherit() { } @Test - void testUidAndSlotSharingGroupOverWrite() { + void testUidAndSlotSharingGroupOverWrite() throws IOException { String anotherUid = "Another-UID"; String anotherSlotSharingGroup = "Another-SlotSharingGroup"; TableMaintenance.forChangeStream( @@ -312,7 +312,7 @@ void testUidAndSlotSharingGroupOverWrite() { } @Test - void testUidAndSlotSharingGroupForMonitorSource() { + void testUidAndSlotSharingGroupForMonitorSource() throws IOException { TableMaintenance.forTable(env, tableLoader(), LOCK_FACTORY) .uidSuffix(UID_SUFFIX) .slotSharingGroup(SLOT_SHARING_GROUP) diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java index 3f0cccf08718..1160f6bff601 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java @@ -87,9 +87,11 @@ void testInvalidURIScheme() throws Exception { } private void deleteFile(TableLoader tableLoader, String fileName) throws Exception { + tableLoader().open(); try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness<>( - new DeleteFilesProcessor(DUMMY_NAME, tableLoader, 10), StringSerializer.INSTANCE)) { + new DeleteFilesProcessor(DUMMY_NAME, tableLoader.loadTable(), 10), + StringSerializer.INSTANCE)) { testHarness.open(); testHarness.processElement(fileName, System.currentTimeMillis()); testHarness.processWatermark(EVENT_TIME);