Skip to content

Commit

Permalink
Small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Sep 18, 2024
1 parent ea3366e commit 3952792
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

/** Deletes expired snapshots and the corresponding files. */
public class ExpireSnapshots {
private static final int DELETE_BATCH_SIZE_DEFAULT = 10;
private static final int DELETE_BATCH_SIZE_DEFAULT = 1000;
private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot";
@VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.time.Duration;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.iceberg.flink.TableLoader;
Expand Down Expand Up @@ -152,7 +153,7 @@ public T slotSharingGroup(String newSlotSharingGroup) {
* @param newParallelism the required parallelism
*/
public T parallelism(int newParallelism) {
Preconditions.checkArgument(newParallelism > 0, "Parallelism should be greater than 0");
OperatorValidationUtils.validateParallelism(newParallelism);
this.parallelism = newParallelism;
return (T) this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.dag.Transformation;
Expand All @@ -53,7 +54,6 @@

/** Creates the table maintenance graph. */
public class TableMaintenance {
private static final String TASK_NAME_FORMAT = "%s [%d]";
static final String SOURCE_OPERATOR_NAME = "Monitor source";
static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager";
static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner";
Expand Down Expand Up @@ -113,7 +113,7 @@ public static class Builder {
private String slotSharingGroup = StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP;
private Duration rateLimit = Duration.ofMinutes(1);
private Duration lockCheckDelay = Duration.ofSeconds(30);
private Integer parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
private int maxReadBack = 100;

private Builder(
Expand Down Expand Up @@ -185,7 +185,7 @@ public Builder lockCheckDelay(Duration newLockCheckDelay) {
* @param newParallelism task parallelism
*/
public Builder parallelism(int newParallelism) {
Preconditions.checkArgument(newParallelism > 0, "Parallelism should be greater than 0");
OperatorValidationUtils.validateParallelism(newParallelism);
this.parallelism = newParallelism;
return this;
}
Expand Down Expand Up @@ -305,7 +305,7 @@ private DataStream<TableChange> changeStream() {
}

private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) {
return String.format(TASK_NAME_FORMAT, streamBuilder.getClass().getSimpleName(), taskId);
return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId);
}

@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand All @@ -32,10 +31,11 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Delete the files using the {@link FileIO}. */
/** Delete the files using the {@link FileIO} which implements {@link SupportsBulkOperations}. */
@Internal
public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<String, Void> {
Expand Down Expand Up @@ -116,7 +116,8 @@ private void deleteFiles() {
"Deleted only {} of {} files from table {} using bulk deletes",
deletedFilesCount,
filesToDelete.size(),
tableName);
tableName,
e);
succeededCounter.inc(deletedFilesCount);
failedCounter.inc(e.numberFailedObjects());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
*/
package org.apache.iceberg.flink.maintenance.operator;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/** The result of a single Maintenance Task. */
@Internal
public class TaskResult {
public class TaskResult implements Serializable {
private final int taskIndex;
private final long startEpoch;
private final boolean success;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests;
import org.apache.iceberg.flink.maintenance.operator.Trigger;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -126,7 +125,7 @@ void testFailure() throws Exception {
assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue();

// Drop the table, so it will cause an exception
CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
dropTable();

// Failed run
infra.source().sendRecord(Trigger.create(time + 1, serializableTable, 1), time + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ protected void insert(Table table, Integer id, String data) throws IOException {
table.refresh();
}

/**
 * Drops the table identified by {@code TestFixtures.TABLE_IDENTIFIER} from the catalog loaded
 * through {@code CATALOG_EXTENSION}.
 *
 * <p>Tests call this to make a subsequent operation on the table fail with an exception.
 */
protected void dropTable() {
CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
}

/**
 * Returns the {@code TableLoader} supplied by {@code CATALOG_EXTENSION}.
 *
 * @return the table loader obtained from the catalog extension
 */
protected TableLoader tableLoader() {
return CATALOG_EXTENSION.tableLoader();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
Expand All @@ -33,6 +32,7 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -46,12 +46,10 @@ class TestDeleteFilesProcessor extends OperatorTestBase {
"metadata/.v1.metadata.json.crc");

private Table table;
private TableLoader tableLoader;

@BeforeEach
void before() throws IOException {
void before() {
this.table = createTable();
this.tableLoader = tableLoader();
}

@Test
Expand All @@ -66,7 +64,7 @@ void testDelete() throws Exception {
.contains(DUMMY_FILE_NAME)
.hasSize(TABLE_FILES.size() + 1);

deleteFile(tableLoader, dummyFile.toString());
deleteFile(tableLoader(), dummyFile.toString());

assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}
Expand All @@ -76,14 +74,14 @@ void testDeleteMissingFile() throws Exception {
Path dummyFile =
FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME);

deleteFile(tableLoader, dummyFile.toString());
deleteFile(tableLoader(), dummyFile.toString());

assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}

@Test
void testInvalidURIScheme() throws Exception {
deleteFile(tableLoader, "wrong://");
deleteFile(tableLoader(), "wrong://");

assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -52,7 +51,7 @@ void testExpire(boolean success) throws Exception {

if (!success) {
// Cause an exception
CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
dropTable();
}

testHarness.processElement(
Expand Down

0 comments on commit 3952792

Please sign in to comment.