From 3952792feff2d8e7e0f25e30211facd14a2a4be4 Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 18 Sep 2024 20:47:24 +0200 Subject: [PATCH] Small fixes --- .../flink/maintenance/api/ExpireSnapshots.java | 2 +- .../maintenance/api/MaintenanceTaskBuilder.java | 3 ++- .../flink/maintenance/api/TableMaintenance.java | 8 ++++---- .../maintenance/operator/DeleteFilesProcessor.java | 7 ++++--- .../flink/maintenance/operator/TaskResult.java | 3 ++- .../flink/maintenance/api/TestExpireSnapshots.java | 3 +-- .../flink/maintenance/operator/OperatorTestBase.java | 4 ++++ .../operator/TestDeleteFilesProcessor.java | 12 +++++------- .../operator/TestExpireSnapshotsProcessor.java | 3 +-- 9 files changed, 24 insertions(+), 21 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java index 9109a9f1a97f..a640e6f2188d 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/ExpireSnapshots.java @@ -33,7 +33,7 @@ /** Deletes expired snapshots and the corresponding files. */ public class ExpireSnapshots { - private static final int DELETE_BATCH_SIZE_DEFAULT = 10; + private static final int DELETE_BATCH_SIZE_DEFAULT = 1000; private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot"; @VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file"; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java index f286ef4bd898..96d5c97625f6 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/MaintenanceTaskBuilder.java @@ -21,6 +21,7 @@ import java.time.Duration; import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.iceberg.flink.TableLoader; @@ -152,7 +153,7 @@ public T slotSharingGroup(String newSlotSharingGroup) { * @param newParallelism the required parallelism */ public T parallelism(int newParallelism) { - Preconditions.checkArgument(newParallelism > 0, "Parallelism should be greater than 0"); + OperatorValidationUtils.validateParallelism(newParallelism); this.parallelism = newParallelism; return (T) this; } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java index 6a0909c85040..e8b698d0cbba 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/TableMaintenance.java @@ -30,6 +30,7 @@ import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; import org.apache.flink.api.common.eventtime.WatermarkOutput; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy; import org.apache.flink.api.dag.Transformation; @@ -53,7 +54,6 @@ /** Creates the table maintenance graph. */ public class TableMaintenance { - private static final String TASK_NAME_FORMAT = "%s [%d]"; static final String SOURCE_OPERATOR_NAME = "Monitor source"; static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager"; static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner"; @@ -113,7 +113,7 @@ public static class Builder { private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP; private Duration rateLimit = Duration.ofMinutes(1); private Duration lockCheckDelay = Duration.ofSeconds(30); - private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT; + private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; private int maxReadBack = 100; private Builder( @@ -185,7 +185,7 @@ public Builder lockCheckDelay(Duration newLockCheckDelay) { * @param newParallelism task parallelism */ public Builder parallelism(int newParallelism) { - Preconditions.checkArgument(newParallelism > 0, "Parallelism should be greater than 0"); + OperatorValidationUtils.validateParallelism(newParallelism); this.parallelism = newParallelism; return this; } @@ -305,7 +305,7 @@ private DataStream changeStream() { } private static String nameFor(MaintenanceTaskBuilder streamBuilder, int taskId) { - return String.format(TASK_NAME_FORMAT, streamBuilder.getClass().getSimpleName(), taskId); + return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId); } @Internal diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java index 6f01f8ac46be..f2705c3727c7 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/DeleteFilesProcessor.java @@ -21,7 +21,6 @@ import java.util.Set; import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; -import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; @@ -32,10 +31,11 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.SupportsBulkOperations; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** Delete the files using the {@link FileIO}. */ +/** Delete the files using the {@link FileIO} which implements {@link SupportsBulkOperations}. */ @Internal public class DeleteFilesProcessor extends AbstractStreamOperator implements OneInputStreamOperator { @@ -116,7 +116,8 @@ private void deleteFiles() { "Deleted only {} of {} files from table {} using bulk deletes", deletedFilesCount, filesToDelete.size(), - tableName); + tableName, + e); succeededCounter.inc(deletedFilesCount); failedCounter.inc(e.numberFailedObjects()); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java index 06f10f1c1d68..1568b6965e1c 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/operator/TaskResult.java @@ -18,13 +18,14 @@ */ package org.apache.iceberg.flink.maintenance.operator; +import java.io.Serializable; import java.util.List; import org.apache.flink.annotation.Internal; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; /** The result of a single Maintenance Task. */ @Internal -public class TaskResult { +public class TaskResult implements Serializable { private final int taskIndex; private final long startEpoch; private final boolean success; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java index 67970862a416..5fe027a3e73d 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/api/TestExpireSnapshots.java @@ -32,7 +32,6 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.flink.SimpleDataUtil; -import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests; import org.apache.iceberg.flink.maintenance.operator.Trigger; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -126,7 +125,7 @@ void testFailure() throws Exception { assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue(); // Drop the table, so it will cause an exception - CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + dropTable(); // Failed run infra.source().sendRecord(Trigger.create(time + 1, serializableTable, 1), time + 1); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java index e251625dd2c1..067bd3962fe2 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/OperatorTestBase.java @@ -128,6 +128,10 @@ protected void insert(Table table, Integer id, String data) throws IOException { table.refresh(); } + protected void dropTable() { + CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + } + protected TableLoader tableLoader() { return CATALOG_EXTENSION.tableLoader(); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java index 9d6ea438ad49..3f0cccf08718 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestDeleteFilesProcessor.java @@ -20,7 +20,6 @@ import static org.assertj.core.api.Assertions.assertThat; -import com.google.common.collect.ImmutableSet; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.FileSystems; @@ -33,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.flink.TableLoader; import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -46,12 +46,10 @@ class TestDeleteFilesProcessor extends OperatorTestBase { "metadata/.v1.metadata.json.crc"); private Table table; - private TableLoader tableLoader; @BeforeEach - void before() throws IOException { + void before() { this.table = createTable(); - this.tableLoader = tableLoader(); } @Test @@ -66,7 +64,7 @@ void testDelete() throws Exception { .contains(DUMMY_FILE_NAME) .hasSize(TABLE_FILES.size() + 1); - deleteFile(tableLoader, dummyFile.toString()); + deleteFile(tableLoader(), dummyFile.toString()); assertThat(listFiles(table)).isEqualTo(TABLE_FILES); } @@ -76,14 +74,14 @@ void testDeleteMissingFile() throws Exception { Path dummyFile = FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME); - deleteFile(tableLoader, dummyFile.toString()); + deleteFile(tableLoader(), dummyFile.toString()); assertThat(listFiles(table)).isEqualTo(TABLE_FILES); } @Test void testInvalidURIScheme() throws Exception { - deleteFile(tableLoader, "wrong://"); + deleteFile(tableLoader(), "wrong://"); assertThat(listFiles(table)).isEqualTo(TABLE_FILES); } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java index f357924f5aa2..4167a4010de9 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/maintenance/operator/TestExpireSnapshotsProcessor.java @@ -29,7 +29,6 @@ import org.apache.iceberg.SerializableTable; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; -import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -52,7 +51,7 @@ void testExpire(boolean success) throws Exception { if (!success) { // Cause an exception - CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER); + dropTable(); } testHarness.processElement(