From 3a121bc33f06d42ab49e3573f15d3083add8d290 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Thu, 21 Sep 2023 08:56:44 -0700 Subject: [PATCH] refresh before creating task writer --- .../iceberg/flink/sink/CachingTableSupplier.java | 13 +++++++------ .../flink/sink/RowDataTaskWriterFactory.java | 10 ++++++++++ .../flink/sink/TestCachingTableSupplier.java | 2 ++ 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java index 3b5fec104469..e9f9786f9190 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java @@ -56,16 +56,19 @@ class CachingTableSupplier implements SerializableSupplier { this.lastLoadTimeMillis = System.currentTimeMillis(); } - public Table initialTable() { - return initialTable; - } - @Override public Table get() { if (table == null) { this.table = initialTable; } + return table; + } + Table initialTable() { + return initialTable; + } + + void refreshTable() { if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) { try { if (!tableLoader.isOpen()) { @@ -84,7 +87,5 @@ public Table get() { LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e); } } - - return table; } } diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java index 10b2d66a3888..67422a1afeb1 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java @@ -142,6 +142,8 @@ public void initialize(int taskId, int attemptId) { table = tableSupplier.get(); } + refreshTable(); + this.outputFileFactory = OutputFileFactory.builderFor(table, taskId, attemptId) .format(format) @@ -155,6 +157,8 @@ public TaskWriter create() { outputFileFactory, "The outputFileFactory shouldn't be null if we have invoked the initialize()."); + refreshTable(); + if (equalityFieldIds == null || equalityFieldIds.isEmpty()) { // Initialize a task writer to write INSERT only. if (spec.isUnpartitioned()) { @@ -206,6 +210,12 @@ public TaskWriter create() { } } + void refreshTable() { + if (tableSupplier instanceof CachingTableSupplier) { + ((CachingTableSupplier) tableSupplier).refreshTable(); + } + } + private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter { private final PartitionKey partitionKey; diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java index 794e7b721032..360db658cd2f 100644 --- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java +++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java @@ -66,6 +66,7 @@ public void testTableReload() { new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100)); // refresh shouldn't do anything as the min reload interval hasn't passed + cachingTableSupplier.refreshTable(); assertThat(cachingTableSupplier.get()).isEqualTo(initialTable); // refresh after waiting past the min reload interval @@ -73,6 +74,7 @@ public void testTableReload() { .atLeast(100, TimeUnit.MILLISECONDS) .untilAsserted( () -> { + cachingTableSupplier.refreshTable(); assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable); }); }