refresh table on get in supplier
bryanck committed Sep 21, 2023
1 parent fe46157 commit 967edcb
Showing 5 changed files with 17 additions and 25 deletions.
@@ -56,15 +56,16 @@ class CachingTableSupplier implements SerializableSupplier<Table> {
     this.lastLoadTimeMillis = System.currentTimeMillis();
   }
 
+  public Table initialTable() {
+    return initialTable;
+  }
+
   @Override
   public Table get() {
     if (table == null) {
       this.table = initialTable;
     }
-    return table;
-  }
 
-  public void refresh() {
     if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) {
       try {
         if (!tableLoader.isOpen()) {
@@ -83,5 +84,7 @@ public void refresh() {
         LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e);
       }
     }
+
+    return table;
   }
 }
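Read together, the two hunks move the interval-gated reload into get(): any caller that resolves the table through the supplier now picks up a refresh once the interval has elapsed, and a failed reload only logs a warning while the previously loaded table keeps being served. A minimal, self-contained sketch of that pattern is below; the names (ReloadingSupplier, loader) are illustrative stand-ins, not Iceberg API.

import java.time.Duration;
import java.util.function.Supplier;

// Illustrative sketch of the "refresh on get" pattern adopted above; not the Iceberg class.
class ReloadingSupplier<T> implements Supplier<T> {
  private final T initialValue;
  private final Supplier<T> loader;          // stands in for TableLoader.loadTable()
  private final Duration refreshInterval;
  private long lastLoadTimeMillis;
  private T value;

  ReloadingSupplier(T initialValue, Supplier<T> loader, Duration refreshInterval) {
    this.initialValue = initialValue;
    this.loader = loader;
    this.refreshInterval = refreshInterval;
    this.lastLoadTimeMillis = System.currentTimeMillis();
  }

  public T initialValue() {
    return initialValue;
  }

  @Override
  public T get() {
    if (value == null) {
      this.value = initialValue;
    }

    // Reload at most once per interval; on failure keep serving the last good value.
    if (System.currentTimeMillis() > lastLoadTimeMillis + refreshInterval.toMillis()) {
      try {
        this.value = loader.get();
        this.lastLoadTimeMillis = System.currentTimeMillis();
      } catch (RuntimeException e) {
        // swallow and fall through, mirroring the warn-and-continue handling above
      }
    }

    return value;
  }
}

The trade-off mirrors the diff: callers never have to schedule an explicit refresh() call, but a failed reload surfaces only as a log line while stale metadata keeps being returned.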
@@ -614,7 +614,7 @@ static IcebergStreamWriter<RowData> createStreamWriter(
             equalityFieldIds,
             flinkWriteConf.upsertMode());
 
-    return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory, tableSupplier);
+    return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory);
   }
 
   /**
@@ -20,13 +20,11 @@
 
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.iceberg.Table;
 import org.apache.iceberg.io.TaskWriter;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
@@ -38,18 +36,15 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>
 
   private final String fullTableName;
   private final TaskWriterFactory<T> taskWriterFactory;
-  private final Supplier<Table> tableSupplier;
 
   private transient TaskWriter<T> writer;
   private transient int subTaskId;
   private transient int attemptId;
   private transient IcebergStreamWriterMetrics writerMetrics;
 
-  IcebergStreamWriter(
-      String fullTableName, TaskWriterFactory<T> taskWriterFactory, Supplier<Table> tableSupplier) {
+  IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
     this.fullTableName = fullTableName;
     this.taskWriterFactory = taskWriterFactory;
-    this.tableSupplier = tableSupplier;
     setChainingStrategy(ChainingStrategy.ALWAYS);
   }
 
@@ -63,24 +58,13 @@ public void open() {
     // schema and partition spec are used.
     this.taskWriterFactory.initialize(subTaskId, attemptId);
 
-    // Refresh the table if needed.
-    if (tableSupplier instanceof CachingTableSupplier) {
-      ((CachingTableSupplier) tableSupplier).refresh();
-    }
-
     // Initialize the task writer.
     this.writer = taskWriterFactory.create();
   }
 
   @Override
   public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
     flush();
-
-    // Refresh the table if needed.
-    if (tableSupplier instanceof CachingTableSupplier) {
-      ((CachingTableSupplier) tableSupplier).refresh();
-    }
-
     this.writer = taskWriterFactory.create();
   }
 
@@ -81,8 +81,15 @@ public RowDataTaskWriterFactory(
       List<Integer> equalityFieldIds,
       boolean upsert) {
     this.tableSupplier = tableSupplier;
-    // rely on the initial table metadata for schema, etc., until schema evolution is supported
-    Table table = tableSupplier.get();
+
+    Table table;
+    if (tableSupplier instanceof CachingTableSupplier) {
+      // rely on the initial table metadata for schema, etc., until schema evolution is supported
+      table = ((CachingTableSupplier) tableSupplier).initialTable();
+    } else {
+      table = tableSupplier.get();
+    }
+
     this.schema = table.schema();
     this.flinkSchema = flinkSchema;
     this.spec = table.spec();
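The factory still pins its schema and partition spec to the initial table, since schema evolution is not supported yet, while anything resolved through get() sees refreshed table state; the stream writer above no longer has to refresh anything itself. A rough sketch of that split is below, assuming a hypothetical PinnedSchemaFactory in the same package as CachingTableSupplier; it is not the real RowDataTaskWriterFactory.

import java.util.function.Supplier;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

// Hypothetical stand-in for the writer factory: schema/spec are fixed up front,
// while the supplier is consulted whenever current table state is needed, so it
// benefits from the refresh-on-get behavior introduced above.
class PinnedSchemaFactory {
  private final Supplier<Table> tableSupplier;
  private final Schema schema;
  private final PartitionSpec spec;

  PinnedSchemaFactory(Supplier<Table> tableSupplier) {
    this.tableSupplier = tableSupplier;
    Table table =
        tableSupplier instanceof CachingTableSupplier
            ? ((CachingTableSupplier) tableSupplier).initialTable()
            : tableSupplier.get();
    this.schema = table.schema(); // pinned: writers keep the initial schema
    this.spec = table.spec();     // pinned: writers keep the initial partition spec
  }

  Table currentTable() {
    return tableSupplier.get();   // may trigger an interval-gated reload
  }
}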
@@ -66,15 +66,13 @@ public void testTableReload() {
         new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100));
 
     // refresh shouldn't do anything as the min reload interval hasn't passed
-    cachingTableSupplier.refresh();
     assertThat(cachingTableSupplier.get()).isEqualTo(initialTable);
 
     // refresh after waiting past the min reload interval
     Awaitility.await()
         .atLeast(100, TimeUnit.MILLISECONDS)
         .untilAsserted(
             () -> {
-              cachingTableSupplier.refresh();
               assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable);
             });
   }
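For completeness, a self-contained version of what the updated test exercises could look roughly like the sketch below. The fixture (Mockito mocks for the table and the TableLoader) and the JUnit 4 annotation sit outside the hunk above, so they are assumptions rather than a copy of the real test class, and the sketch assumes it lives next to CachingTableSupplier.

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.awaitility.Awaitility;
import org.junit.Test;

public class CachingTableSupplierGetSketch {

  @Test
  public void getReloadsAfterInterval() {
    // Assumed fixture: the loader hands back a different Table instance on reload.
    SerializableTable initialTable = mock(SerializableTable.class);
    Table loadedTable = mock(Table.class);
    TableLoader tableLoader = mock(TableLoader.class);
    when(tableLoader.isOpen()).thenReturn(true);
    when(tableLoader.loadTable()).thenReturn(loadedTable);

    CachingTableSupplier supplier =
        new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100));

    // Within the 100 ms interval, get() serves the cached initial table.
    assertThat(supplier.get()).isEqualTo(initialTable);

    // Once the interval has elapsed, a plain get() is enough to trigger the reload.
    Awaitility.await()
        .atLeast(100, TimeUnit.MILLISECONDS)
        .untilAsserted(() -> assertThat(supplier.get()).isEqualTo(loadedTable));
  }
}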
