diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java index 29b18bd41bf6..2b6ce2c1470e 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -111,7 +111,7 @@ public RecordsWithSplitIds> fetch() throws IOException { wakeUp.wait(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while blocked on reading.", e); + throw new RuntimeException("Interrupted while paused due to watermark alignment.", e); } } } @@ -149,6 +149,12 @@ public void handleSplitsChanges(SplitsChange splitsChange) { metrics.incrementAssignedBytes(calculateBytes(splitsChange)); } + /** + * WakeUp is called by Flink to unblock a reader blocked in {@link #fetch()} before calling {@link + * #handleSplitsChanges(SplitsChange)} or {@link #pauseOrResumeSplits(Collection, Collection)} to + * update the splits or splits state. After {@link #wakeUp()} the specific method will be called + * before calling the {@link #fetch()} again. + */ @Override public void wakeUp() { LOG.info("WakeUp reader {}", indexOfSubtask); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java index 09dda67345d8..7aae59ca9d60 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -381,7 +381,7 @@ protected IcebergSource source() { .project(TestFixtures.TS_SCHEMA) .splitSize(100L) .streaming(true) - .monitorInterval(Duration.ofMillis(100)) + .monitorInterval(Duration.ofMillis(10)) .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) // Read in single row batches, to have more batches for testing .flinkConfig(new Configuration().set(SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1)) diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java index 1d05661159e1..9e0f60da3aaa 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java @@ -136,7 +136,10 @@ public void testWakeup() throws Exception { // Wakeup the reading of the split, and check the result size reader.wakeUp(); - Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> gotEmptyResult.get()); + Awaitility.await() + .pollInterval(10, TimeUnit.MILLISECONDS) + .atMost(2, TimeUnit.SECONDS) + .until(() -> gotEmptyResult.get()); // Unpause and read the data in th next fetch reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));