From fa56618a58a7c11a9edc8bcf5917b3b2a9cb3ecb Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 18 Sep 2024 20:47:24 +0200 Subject: [PATCH] Small fixes --- .../iceberg/flink/maintenance/api/ExpireSnapshots.java | 2 +- .../flink/maintenance/api/TableMaintenance.java | 3 +-- .../maintenance/operator/DeleteFilesProcessor.java | 7 ++++--- .../iceberg/flink/maintenance/operator/TaskResult.java | 3 ++- .../flink/maintenance/api/TestExpireSnapshots.java | 3 +-- .../flink/maintenance/operator/OperatorTestBase.java | 4 ++++ .../maintenance/operator/TestDeleteFilesProcessor.java | 10 ++++------ .../operator/TestExpireSnapshotsProcessor.java | 3 +-- 8 files changed, 18 insertions(+), 17 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index 9109a9f1a97f..a640e6f2188d 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -33,7 +33,7 @@ /** Deletes expired snapshots and the corresponding files. */ public class ExpireSnapshots { - private static final int DELETE_BATCH_SIZE_DEFAULT = 10; + private static final int DELETE_BATCH_SIZE_DEFAULT = 1000; private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot"; @VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file"; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 6a0909c85040..59433ca10308 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -53,7 +53,6 @@ /** Creates the table maintenance graph. */ public class TableMaintenance { - private static final String TASK_NAME_FORMAT = "%s [%d]"; static final String SOURCE_OPERATOR_NAME = "Monitor source"; static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; @@ -305,7 +304,7 @@ private DataStream changeStream() { } private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskId) { - return String.format(TASK_NAME_FORMAT, streamBuilder.getClass().getSimpleName(), taskId); + return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId); } @Internal diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java index 6f01f8ac46be..f2705c3727c7 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -21,7 +21,6 @@ import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; -import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -32,10 +31,11 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Delete the files using the {@link FileIO}. */ +/** Delete the files using the {@link FileIO} which implements {@link SupportsBulkOperations}. */ @Internal public class DeleteFilesProcessor extends AbstractStreamOperator implements OneInputStreamOperator { @@ -116,7 +116,8 @@ private void deleteFiles() { "Deleted only {} of {} files from table {} using bulk deletes", deletedFilesCount, filesToDelete.size(), - tableName); + tableName, + e); succeededCounter.inc(deletedFilesCount); failedCounter.inc(e.numberFailedObjects()); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java index 06f10f1c1d68..1568b6965e1c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java @@ -18,13 +18,14 @@ */ package org.apache.iceberg.flink.maintenance.operator; +import java.io.Serializable; import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** The result of a single Maintenance Task. */ @Internal -public class TaskResult { +public class TaskResult implements Serializable { private final int taskIndex; private final long startEpoch; private final boolean success; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java index 67970862a416..5fe027a3e73d 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -32,7 +32,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; import org.apache.iceberg.flink.maintenance.operator.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -126,7 +125,7 @@ void testFailure() throws Exception { assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue(); // Drop the table, so it will cause an exception - CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + dropTable(); // Failed run infra.source().sendRecord(Trigger.create(time + 1, serializableTable, 1), time + 1); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index e251625dd2c1..067bd3962fe2 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -128,6 +128,10 @@ protected void insert(Table table, Integer id, String data) throws IOException { table.refresh(); } + protected void dropTable() { + CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + protected TableLoader tableLoader() { return CATALOG_EXTENSION.tableLoader(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java index 9d6ea438ad49..d4ff8ea6048b 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java @@ -46,12 +46,10 @@ class TestDeleteFilesProcessor extends OperatorTestBase { "metadata/.v1.metadata.json.crc"); private Table table; - private TableLoader tableLoader; @BeforeEach - void before() throws IOException { + void before() { this.table = createTable(); - this.tableLoader = tableLoader(); } @Test @@ -66,7 +64,7 @@ void testDelete() throws Exception { .contains(DUMMY_FILE_NAME) .hasSize(TABLE_FILES.size() + 1); - deleteFile(tableLoader, dummyFile.toString()); + deleteFile(tableLoader(), dummyFile.toString()); assertThat(listFiles(table)).isEqualTo(TABLE_FILES); } @@ -76,14 +74,14 @@ void testDeleteMissingFile() throws Exception { Path dummyFile = FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME); - deleteFile(tableLoader, dummyFile.toString()); + deleteFile(tableLoader(), dummyFile.toString()); assertThat(listFiles(table)).isEqualTo(TABLE_FILES); } @Test void testInvalidURIScheme() throws Exception { - deleteFile(tableLoader, "wrong://"); + deleteFile(tableLoader(), "wrong://"); assertThat(listFiles(table)).isEqualTo(TABLE_FILES); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java index f357924f5aa2..4167a4010de9 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java @@ -29,7 +29,6 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -52,7 +51,7 @@ void testExpire(boolean success) throws Exception { if (!success) { // Cause an exception - CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + dropTable(); } testHarness.processElement(