diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java
index 3b5fec104469..e9f9786f9190 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/CachingTableSupplier.java
@@ -56,16 +56,19 @@ class CachingTableSupplier implements SerializableSupplier
{
this.lastLoadTimeMillis = System.currentTimeMillis();
}
- public Table initialTable() {
- return initialTable;
- }
-
@Override
public Table get() {
if (table == null) {
this.table = initialTable;
}
+ return table;
+ }
+ Table initialTable() {
+ return initialTable;
+ }
+
+ void refreshTable() {
if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) {
try {
if (!tableLoader.isOpen()) {
@@ -84,7 +87,5 @@ public Table get() {
LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e);
}
}
-
- return table;
}
}
diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index 10b2d66a3888..67422a1afeb1 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -142,6 +142,8 @@ public void initialize(int taskId, int attemptId) {
table = tableSupplier.get();
}
+ refreshTable();
+
this.outputFileFactory =
OutputFileFactory.builderFor(table, taskId, attemptId)
.format(format)
@@ -155,6 +157,8 @@ public TaskWriter create() {
outputFileFactory,
"The outputFileFactory shouldn't be null if we have invoked the initialize().");
+ refreshTable();
+
if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
// Initialize a task writer to write INSERT only.
if (spec.isUnpartitioned()) {
@@ -206,6 +210,12 @@ public TaskWriter create() {
}
}
+ void refreshTable() {
+ if (tableSupplier instanceof CachingTableSupplier) {
+ ((CachingTableSupplier) tableSupplier).refreshTable();
+ }
+ }
+
private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter {
private final PartitionKey partitionKey;
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java
index 794e7b721032..360db658cd2f 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/TestCachingTableSupplier.java
@@ -66,6 +66,7 @@ public void testTableReload() {
new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100));
// refresh shouldn't do anything as the min reload interval hasn't passed
+ cachingTableSupplier.refreshTable();
assertThat(cachingTableSupplier.get()).isEqualTo(initialTable);
// refresh after waiting past the min reload interval
@@ -73,6 +74,7 @@ public void testTableReload() {
.atLeast(100, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
+ cachingTableSupplier.refreshTable();
assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable);
});
}