From ecf6a05b4094ec17b87a1b4827f4cfb6a734f4cd Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Wed, 20 Dec 2023 11:52:04 +0100 Subject: [PATCH] Simplify logging --- .../reader/IcebergSourceSplitReader.java | 56 ++++++++++++------- 1 file changed, 35 insertions(+), 21 deletions(-) diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java index a4b1f50fe767..2c9c92640028 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -52,6 +52,9 @@ class IcebergSourceSplitReader implements SplitReader, I private final int indexOfSubtask; private final Queue splits; private final Set pausedSplits; + // The wakeUp flag which used to indicate that a wakeUp call has arrived, and we still need to + // handle it in the next + // fetch loop. Should be updated only when the synchronization lock is held. private final AtomicBoolean wakeUp = new AtomicBoolean(false); private CloseableIterator>> currentReader; @@ -71,6 +74,25 @@ class IcebergSourceSplitReader implements SplitReader, I this.pausedSplits = Sets.newHashSet(); } + /** + * The method reads a batch of records from the assigned splits. If all the records from the + * current split are returned then it will emit a {@link ArrayBatchRecords#finishedSplit(String)} + * batch to signal this event. In the next fetch loop the reader will continue with the next split + * (if any). + * + *

FLIP-217 defines the API for enabling watermark alignment with limited buffering on Flink + * side, by allowing the {@link #fetch()} call to be blocked on a runaway reader. Since the + * Iceberg reader is single threaded, the reading is fully blocked if {@link + * #pauseOrResumeSplits(Collection, Collection)} pauses the current split. The {@link #wakeUp()} + * method is called by Flink to unblock a blocked reader. In this case after the {@link #fetch()} + * call returns, Flink will call {@link #handleSplitsChanges(SplitsChange)} or {@link + * #pauseOrResumeSplits(Collection, Collection)} to update the splits or splits state. After the + * state update Flink will call {@link #fetch()} again, which might be unblocked at this stage and + * continue reading. + * + * @return The fetched records + * @throws IOException If there is an error during reading + */ @Override public RecordsWithSplitIds> fetch() throws IOException { metrics.incrementSplitReaderFetchCalls(1); @@ -88,31 +110,23 @@ public RecordsWithSplitIds> fetch() throws IOException { } if (currentReader.hasNext()) { - if (pausedSplits.contains(currentSplitId)) { + while (pausedSplits.contains(currentSplitId)) { + // Needed for the implementation of FLIP-217 // Wait until the reader is unblocked. Wake every second to catch any missed signal. // Return empty records if wakeUp is called, so pauseOrResumeSplits could be processed. - boolean first = true; - - while (pausedSplits.contains(currentSplitId)) { - if (first) { - LOG.info("Reader {} paused reading split {}", indexOfSubtask, currentSplitId); - first = false; - } else { - LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId); + LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId); + + synchronized (wakeUp) { + if (wakeUp.get()) { + wakeUp.set(false); + return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet()); } - synchronized (wakeUp) { - if (wakeUp.get()) { - wakeUp.set(false); - return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet()); - } - - try { - wakeUp.wait(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while paused due to watermark alignment.", e); - } + try { + wakeUp.wait(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted while paused due to watermark alignment.", e); } } }