Skip to content

Commit

Permalink
Simplify logging
Browse files Browse the repository at this point in the history
  • Loading branch information
Peter Vary committed Dec 20, 2023
1 parent 92f58fb commit ecf6a05
Showing 1 changed file with 35 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
private final int indexOfSubtask;
private final Queue<IcebergSourceSplit> splits;
private final Set<String> pausedSplits;
// The wakeUp flag which used to indicate that a wakeUp call has arrived, and we still need to
// handle it in the next
// fetch loop. Should be updated only when the synchronization lock is held.
private final AtomicBoolean wakeUp = new AtomicBoolean(false);

private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
Expand All @@ -71,6 +74,25 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
this.pausedSplits = Sets.newHashSet();
}

/**
* The method reads a batch of records from the assigned splits. If all the records from the
* current split are returned then it will emit a {@link ArrayBatchRecords#finishedSplit(String)}
* batch to signal this event. In the next fetch loop the reader will continue with the next split
* (if any).
*
* <p>FLIP-217 defines the API for enabling watermark alignment with limited buffering on Flink
* side, by allowing the {@link #fetch()} call to be blocked on a runaway reader. Since the
* Iceberg reader is single threaded, the reading is fully blocked if {@link
* #pauseOrResumeSplits(Collection, Collection)} pauses the current split. The {@link #wakeUp()}
* method is called by Flink to unblock a blocked reader. In this case after the {@link #fetch()}
* call returns, Flink will call {@link #handleSplitsChanges(SplitsChange)} or {@link
* #pauseOrResumeSplits(Collection, Collection)} to update the splits or splits state. After the
* state update Flink will call {@link #fetch()} again, which might be unblocked at this stage and
* continue reading.
*
* @return The fetched records
* @throws IOException If there is an error during reading
*/
@Override
public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
metrics.incrementSplitReaderFetchCalls(1);
Expand All @@ -88,31 +110,23 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
}

if (currentReader.hasNext()) {
if (pausedSplits.contains(currentSplitId)) {
while (pausedSplits.contains(currentSplitId)) {
// Needed for the implementation of FLIP-217
// Wait until the reader is unblocked. Wake every second to catch any missed signal.
// Return empty records if wakeUp is called, so pauseOrResumeSplits could be processed.
boolean first = true;

while (pausedSplits.contains(currentSplitId)) {
if (first) {
LOG.info("Reader {} paused reading split {}", indexOfSubtask, currentSplitId);
first = false;
} else {
LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId);
LOG.trace("Reader {} is still paused reading split {}", indexOfSubtask, currentSplitId);

synchronized (wakeUp) {
if (wakeUp.get()) {
wakeUp.set(false);
return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
}

synchronized (wakeUp) {
if (wakeUp.get()) {
wakeUp.set(false);
return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet());
}

try {
wakeUp.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while paused due to watermark alignment.", e);
}
try {
wakeUp.wait(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while paused due to watermark alignment.", e);
}
}
}
Expand Down

0 comments on commit ecf6a05

Please sign in to comment.