From 967edcb977c4f0c83b3e9a50843727e68087fa49 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Wed, 20 Sep 2023 21:03:59 -0700 Subject: [PATCH] refresh table on get in supplier --- .../flink/sink/CachingTableSupplier.java | 9 ++++++--- .../apache/iceberg/flink/sink/FlinkSink.java | 2 +- .../flink/sink/IcebergStreamWriter.java | 18 +----------------- .../flink/sink/RowDataTaskWriterFactory.java | 11 +++++++++-- .../flink/sink/TestCachingTableSupplier.java | 2 -- 5 files changed, 17 insertions(+), 25 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java index 6ae204ce6eb9..3b5fec104469 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java @@ -56,15 +56,16 @@ class CachingTableSupplier implements SerializableSupplier { this.lastLoadTimeMillis = System.currentTimeMillis(); } + public Table initialTable() { + return initialTable; + } + @Override public Table get() { if (table == null) { this.table = initialTable; } - return table; - } - public void refresh() { if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) { try { if (!tableLoader.isOpen()) { @@ -83,5 +84,7 @@ public void refresh() { LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e); } } + + return table; } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index 0e486df27981..58828799255d 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -614,7 +614,7 @@ static IcebergStreamWriter createStreamWriter( equalityFieldIds, flinkWriteConf.upsertMode()); - return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory, tableSupplier); + return new IcebergStreamWriter<>(initTable.name(), taskWriterFactory); } /** diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java index fa36f2dfd3d9..e4f51ec7d0d7 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java @@ -20,13 +20,11 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.iceberg.Table; import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; @@ -38,18 +36,15 @@ class IcebergStreamWriter extends AbstractStreamOperator private final String fullTableName; private final TaskWriterFactory taskWriterFactory; - private final Supplier
tableSupplier; private transient TaskWriter writer; private transient int subTaskId; private transient int attemptId; private transient IcebergStreamWriterMetrics writerMetrics; - IcebergStreamWriter( - String fullTableName, TaskWriterFactory taskWriterFactory, Supplier
tableSupplier) { + IcebergStreamWriter(String fullTableName, TaskWriterFactory taskWriterFactory) { this.fullTableName = fullTableName; this.taskWriterFactory = taskWriterFactory; - this.tableSupplier = tableSupplier; setChainingStrategy(ChainingStrategy.ALWAYS); } @@ -63,11 +58,6 @@ public void open() { // schema and partition spec are used. this.taskWriterFactory.initialize(subTaskId, attemptId); - // Refresh the table if needed. - if (tableSupplier instanceof CachingTableSupplier) { - ((CachingTableSupplier) tableSupplier).refresh(); - } - // Initialize the task writer. this.writer = taskWriterFactory.create(); } @@ -75,12 +65,6 @@ public void open() { @Override public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { flush(); - - // Refresh the table if needed. - if (tableSupplier instanceof CachingTableSupplier) { - ((CachingTableSupplier) tableSupplier).refresh(); - } - this.writer = taskWriterFactory.create(); } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index a9764fd94181..c6742aac99c0 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -81,8 +81,15 @@ public RowDataTaskWriterFactory( List equalityFieldIds, boolean upsert) { this.tableSupplier = tableSupplier; - // rely on the initial table metadata for schema, etc., until schema evolution is supported - Table table = tableSupplier.get(); + + Table table; + if (tableSupplier instanceof CachingTableSupplier) { + // rely on the initial table metadata for schema, etc., until schema evolution is supported + table = ((CachingTableSupplier) tableSupplier).initialTable(); + } else { + table = tableSupplier.get(); + } + this.schema = table.schema(); this.flinkSchema = flinkSchema; this.spec = table.spec(); diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java index 78d3ae2df3b6..794e7b721032 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java @@ -66,7 +66,6 @@ public void testTableReload() { new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100)); // refresh shouldn't do anything as the min reload interval hasn't passed - cachingTableSupplier.refresh(); assertThat(cachingTableSupplier.get()).isEqualTo(initialTable); // refresh after waiting past the min reload interval @@ -74,7 +73,6 @@ public void testTableReload() { .atLeast(100, TimeUnit.MILLISECONDS) .untilAsserted( () -> { - cachingTableSupplier.refresh(); assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable); }); }