Skip to content

Commit

Permalink
refresh before creating task writer
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck committed Sep 21, 2023
1 parent d87a246 commit 3a121bc
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,19 @@ class CachingTableSupplier implements SerializableSupplier<Table> {
this.lastLoadTimeMillis = System.currentTimeMillis();
}

public Table initialTable() {
return initialTable;
}

@Override
public Table get() {
if (table == null) {
this.table = initialTable;
}
return table;
}

Table initialTable() {
return initialTable;
}

void refreshTable() {
if (System.currentTimeMillis() > lastLoadTimeMillis + tableRefreshInterval.toMillis()) {
try {
if (!tableLoader.isOpen()) {
Expand All @@ -84,7 +87,5 @@ public Table get() {
LOG.warn("An error occurred reloading table {}, table was not reloaded", table.name(), e);
}
}

return table;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ public void initialize(int taskId, int attemptId) {
table = tableSupplier.get();
}

refreshTable();

this.outputFileFactory =
OutputFileFactory.builderFor(table, taskId, attemptId)
.format(format)
Expand All @@ -155,6 +157,8 @@ public TaskWriter<RowData> create() {
outputFileFactory,
"The outputFileFactory shouldn't be null if we have invoked the initialize().");

refreshTable();

if (equalityFieldIds == null || equalityFieldIds.isEmpty()) {
// Initialize a task writer to write INSERT only.
if (spec.isUnpartitioned()) {
Expand Down Expand Up @@ -206,6 +210,12 @@ public TaskWriter<RowData> create() {
}
}

void refreshTable() {
if (tableSupplier instanceof CachingTableSupplier) {
((CachingTableSupplier) tableSupplier).refreshTable();
}
}

private static class RowDataPartitionedFanoutWriter extends PartitionedFanoutWriter<RowData> {

private final PartitionKey partitionKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ public void testTableReload() {
new CachingTableSupplier(initialTable, tableLoader, Duration.ofMillis(100));

// refresh shouldn't do anything as the min reload interval hasn't passed
cachingTableSupplier.refreshTable();
assertThat(cachingTableSupplier.get()).isEqualTo(initialTable);

// refresh after waiting past the min reload interval
Awaitility.await()
.atLeast(100, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
cachingTableSupplier.refreshTable();
assertThat(cachingTableSupplier.get()).isEqualTo(loadedTable);
});
}
Expand Down

0 comments on commit 3a121bc

Please sign in to comment.