
Small fixes
Peter Vary committed Sep 18, 2024
1 parent ea3366e commit fa56618
Showing 8 changed files with 18 additions and 17 deletions.
ExpireSnapshots.java
@@ -33,7 +33,7 @@

/** Deletes expired snapshots and the corresponding files. */
public class ExpireSnapshots {
- private static final int DELETE_BATCH_SIZE_DEFAULT = 10;
+ private static final int DELETE_BATCH_SIZE_DEFAULT = 1000;
private static final String EXECUTOR_OPERATOR_NAME = "Expire Snapshot";
@VisibleForTesting static final String DELETE_FILES_OPERATOR_NAME = "Delete file";

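Raising the default from 10 to 1000 presumably cuts the number of bulk-delete round trips when a snapshot expiration releases many files at once. A minimal sketch of what a delete batch size typically governs (not the Iceberg implementation; BatchingSketch, deleteInBatches, and bulkDelete are hypothetical names):

import java.util.List;
import java.util.function.Consumer;

class BatchingSketch {
  private static final int DELETE_BATCH_SIZE_DEFAULT = 1000;

  // Group file paths into fixed-size batches so a bulk-capable backend
  // receives one request per batch instead of one request per file.
  static void deleteInBatches(List<String> paths, Consumer<List<String>> bulkDelete) {
    for (int start = 0; start < paths.size(); start += DELETE_BATCH_SIZE_DEFAULT) {
      int end = Math.min(start + DELETE_BATCH_SIZE_DEFAULT, paths.size());
      bulkDelete.accept(paths.subList(start, end));
    }
  }
}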
TableMaintenance.java
@@ -53,7 +53,6 @@

/** Creates the table maintenance graph. */
public class TableMaintenance {
- private static final String TASK_NAME_FORMAT = "%s [%d]";
static final String SOURCE_OPERATOR_NAME = "Monitor source";
static final String TRIGGER_MANAGER_OPERATOR_NAME = "Trigger manager";
static final String WATERMARK_ASSIGNER_OPERATOR_NAME = "Watermark Assigner";
@@ -305,7 +304,7 @@ private DataStream<TableChange> changeStream() {
}

private static String nameFor(MaintenanceTaskBuilder<?> streamBuilder, int taskId) {
- return String.format(TASK_NAME_FORMAT, streamBuilder.getClass().getSimpleName(), taskId);
+ return String.format("%s [%d]", streamBuilder.getClass().getSimpleName(), taskId);
}

@Internal
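Inlining the single-use TASK_NAME_FORMAT constant leaves the generated operator names unchanged. A quick check (the class name "ExpireSnapshots" is just an illustrative stand-in for a builder's simple name):

class NameForExample {
  public static void main(String[] args) {
    // Same output the TASK_NAME_FORMAT constant produced before the change.
    System.out.println(String.format("%s [%d]", "ExpireSnapshots", 0)); // prints: ExpireSnapshots [0]
  }
}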
DeleteFilesProcessor.java
@@ -21,7 +21,6 @@
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
- import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
@@ -32,10 +31,11 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- /** Delete the files using the {@link FileIO}. */
+ /** Delete the files using the {@link FileIO} which implements {@link SupportsBulkOperations}. */
@Internal
public class DeleteFilesProcessor extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<String, Void> {
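The updated javadoc makes the contract explicit: the operator needs a FileIO that also implements SupportsBulkOperations. A sketch of that capability check (not the operator's actual body; BulkDeleteSketch is a hypothetical name):

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;

class BulkDeleteSketch {
  // Delete many paths in one call when the FileIO implementation supports it.
  static void bulkDelete(FileIO io, Iterable<String> paths) {
    if (io instanceof SupportsBulkOperations) {
      ((SupportsBulkOperations) io).deleteFiles(paths);
    } else {
      throw new UnsupportedOperationException("FileIO does not implement SupportsBulkOperations");
    }
  }
}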
@@ -116,7 +116,8 @@ private void deleteFiles() {
"Deleted only {} of {} files from table {} using bulk deletes",
deletedFilesCount,
filesToDelete.size(),
- tableName);
+ tableName,
+ e);
succeededCounter.inc(deletedFilesCount);
failedCounter.inc(e.numberFailedObjects());
}
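The logging fix passes the exception as a trailing argument: when there are more arguments than {} placeholders, SLF4J treats a final Throwable as the attached exception and logs its stack trace, which the original call silently dropped. A small standalone illustration (LoggingSketch is a hypothetical name):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void report(int deleted, int requested, String tableName, Exception e) {
    // Three placeholders, four arguments: the trailing exception is logged
    // with its stack trace rather than being ignored.
    LOG.warn("Deleted only {} of {} files from table {} using bulk deletes",
        deleted, requested, tableName, e);
  }
}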
TaskResult.java
@@ -18,13 +18,14 @@
*/
package org.apache.iceberg.flink.maintenance.operator;

+ import java.io.Serializable;
import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

/** The result of a single Maintenance Task. */
@Internal
- public class TaskResult {
+ public class TaskResult implements Serializable {
private final int taskIndex;
private final long startEpoch;
private final boolean success;
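TaskResult instances presumably travel through the Flink job graph, for example inside function closures or as records handed to a sink, which is what makes java.io.Serializable necessary. A minimal round-trip check of the capability the change adds (a sketch; SerializationCheck is not part of the codebase):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationCheck {
  // Write an object with Java serialization and read it back.
  static <T extends Serializable> T roundTrip(T value) throws Exception {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }
}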
Test file (name not shown in this view)
@@ -32,7 +32,6 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.SimpleDataUtil;
- import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.maintenance.operator.MetricsReporterFactoryForTests;
import org.apache.iceberg.flink.maintenance.operator.Trigger;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -126,7 +125,7 @@ void testFailure() throws Exception {
assertThat(infra.sink().poll(Duration.ofSeconds(5)).success()).isTrue();

// Drop the table, so it will cause an exception
- CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ dropTable();

// Failed run
infra.source().sendRecord(Trigger.create(time + 1, serializableTable, 1), time + 1);
OperatorTestBase.java
@@ -128,6 +128,10 @@ protected void insert(Table table, Integer id, String data) throws IOException {
table.refresh();
}

+ protected void dropTable() {
+   CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ }
+
protected TableLoader tableLoader() {
return CATALOG_EXTENSION.tableLoader();
}
TestDeleteFilesProcessor.java
@@ -46,12 +46,10 @@ class TestDeleteFilesProcessor extends OperatorTestBase {
"metadata/.v1.metadata.json.crc");

private Table table;
- private TableLoader tableLoader;

@BeforeEach
- void before() throws IOException {
+ void before() {
this.table = createTable();
- this.tableLoader = tableLoader();
}

@Test
@@ -66,7 +64,7 @@ void testDelete() throws Exception {
.contains(DUMMY_FILE_NAME)
.hasSize(TABLE_FILES.size() + 1);

- deleteFile(tableLoader, dummyFile.toString());
+ deleteFile(tableLoader(), dummyFile.toString());

assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}
@@ -76,14 +74,14 @@ void testDeleteMissingFile() throws Exception {
Path dummyFile =
FileSystems.getDefault().getPath(table.location().substring(5), DUMMY_FILE_NAME);

- deleteFile(tableLoader, dummyFile.toString());
+ deleteFile(tableLoader(), dummyFile.toString());

assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}

@Test
void testInvalidURIScheme() throws Exception {
- deleteFile(tableLoader, "wrong://");
+ deleteFile(tableLoader(), "wrong://");

assertThat(listFiles(table)).isEqualTo(TABLE_FILES);
}
Test file (name not shown in this view)
@@ -29,7 +29,6 @@
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
- import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -52,7 +51,7 @@ void testExpire(boolean success) throws Exception {

if (!success) {
// Cause an exception
- CATALOG_EXTENSION.catalogLoader().loadCatalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+ dropTable();
}

testHarness.processElement(
