From b95e86d691a10450e83d19e2d736d094215ec5b1 Mon Sep 17 00:00:00 2001 From: Bryan Keller Date: Tue, 12 Sep 2023 18:16:56 -0700 Subject: [PATCH] open table loader if needed --- .../java/org/apache/iceberg/flink/TableLoader.java | 13 +++++++++++++ .../iceberg/flink/sink/ReloadingTableSupplier.java | 3 +++ 2 files changed, 16 insertions(+) diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java index f18c5ccda1f6..e544ba8a77cd 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java @@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable { void open(); + boolean isOpen(); + Table loadTable(); /** Clone a TableLoader */ @@ -75,6 +77,11 @@ public void open() { tables = new HadoopTables(hadoopConf.get()); } + @Override + public boolean isOpen() { + return tables != null; + } + @Override public Table loadTable() { FlinkEnvironmentContext.init(); @@ -115,6 +122,11 @@ public void open() { catalog = catalogLoader.loadCatalog(); } + @Override + public boolean isOpen() { + return catalog != null; + } + @Override public Table loadTable() { FlinkEnvironmentContext.init(); @@ -126,6 +138,7 @@ public void close() throws IOException { if (catalog instanceof Closeable) { ((Closeable) catalog).close(); } + catalog = null; } @Override diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/ReloadingTableSupplier.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/ReloadingTableSupplier.java index d27150beed17..a30332c9e739 100644 --- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/ReloadingTableSupplier.java +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/ReloadingTableSupplier.java @@ -65,6 +65,9 @@ public Table get() { } if (System.currentTimeMillis() > nextReloadTimeMs) { + if (!tableLoader.isOpen()) { + tableLoader.open(); + } table = tableLoader.loadTable(); nextReloadTimeMs = calcNextReloadTimeMs(System.currentTimeMillis()); LOG.info("Table {} reloaded, next load time is at {}", table.name(), nextReloadTimeMs);