Skip to content

Commit

Permalink
open table loader if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Sep 13, 2023
1 parent d5f884d commit b95e86d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {

void open();

boolean isOpen();

Table loadTable();

/** Clone a TableLoader */
Expand Down Expand Up @@ -75,6 +77,11 @@ public void open() {
tables = new HadoopTables(hadoopConf.get());
}

@Override
public boolean isOpen() {
return tables != null;
}

@Override
public Table loadTable() {
FlinkEnvironmentContext.init();
Expand Down Expand Up @@ -115,6 +122,11 @@ public void open() {
catalog = catalogLoader.loadCatalog();
}

@Override
public boolean isOpen() {
return catalog != null;
}

@Override
public Table loadTable() {
FlinkEnvironmentContext.init();
Expand All @@ -126,6 +138,7 @@ public void close() throws IOException {
if (catalog instanceof Closeable) {
((Closeable) catalog).close();
}
catalog = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public Table get() {
}

if (System.currentTimeMillis() > nextReloadTimeMs) {
if (!tableLoader.isOpen()) {
tableLoader.open();
}
table = tableLoader.loadTable();
nextReloadTimeMs = calcNextReloadTimeMs(System.currentTimeMillis());
LOG.info("Table {} reloaded, next load time is at {}", table.name(), nextReloadTimeMs);
Expand Down

0 comments on commit b95e86d

Please sign in to comment.