Skip to content

Commit

Permalink
Further review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Dec 19, 2023
1 parent 83044f9 commit 52d9051
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
wakeUp.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while blocked on reading.", e);
throw new RuntimeException("Interrupted while paused due to watermark alignment.", e);
}
}
}
Expand Down Expand Up @@ -149,6 +149,12 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
metrics.incrementAssignedBytes(calculateBytes(splitsChange));
}

/**
* WakeUp is called by Flink to unblock a reader blocked in {@link #fetch()} before calling {@link
* #handleSplitsChanges(SplitsChange)} or {@link #pauseOrResumeSplits(Collection, Collection)} to
* update the splits or splits state. After {@link #wakeUp()} the specific method will be called
* before calling the {@link #fetch()} again.
*/
@Override
public void wakeUp() {
LOG.info("WakeUp reader {}", indexOfSubtask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ protected IcebergSource<RowData> source() {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
.monitorInterval(Duration.ofMillis(100))
.monitorInterval(Duration.ofMillis(10))
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
// Read in single row batches, to have more batches for testing
.flinkConfig(new Configuration().set(SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public void testWakeup() throws Exception {

// Wakeup the reading of the split, and check the result size
reader.wakeUp();
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> gotEmptyResult.get());
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(2, TimeUnit.SECONDS)
.until(() -> gotEmptyResult.get());

// Unpause and read the data in th next fetch
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
Expand Down

0 comments on commit 52d9051

Please sign in to comment.