Skip to content

Commit

Permalink
Further review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Dec 19, 2023
1 parent 83044f9 commit a3e78ba
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
metrics.incrementAssignedBytes(calculateBytes(splitsChange));
}

/**
* WakeUp is called by Flink to unblock a reader blocked in {@link #fetch()} before calling {@link
* #handleSplitsChanges(SplitsChange)} or {@link #pauseOrResumeSplits(Collection, Collection)} to
* update the splits or splits state. After {@link #wakeUp()} the specific method will be called
* before calling the {@link #fetch()} again.
*/
@Override
public void wakeUp() {
LOG.info("WakeUp reader {}", indexOfSubtask);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ protected IcebergSource<RowData> source() {
.project(TestFixtures.TS_SCHEMA)
.splitSize(100L)
.streaming(true)
.monitorInterval(Duration.ofMillis(100))
.monitorInterval(Duration.ofMillis(10))
.streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
// Read in single row batches, to have more batches for testing
.flinkConfig(new Configuration().set(SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ public void testWakeup() throws Exception {

// Wakeup the reading of the split, and check the result size
reader.wakeUp();
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> gotEmptyResult.get());
Awaitility.await()
.pollInterval(10, TimeUnit.MILLISECONDS)
.atMost(2, TimeUnit.SECONDS)
.until(() -> gotEmptyResult.get());

// Unpause and read the data in th next fetch
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
Expand Down

0 comments on commit a3e78ba

Please sign in to comment.