diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java index e0c7b2e4dcc1..416dae03a886 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Queue; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.connector.base.source.reader.RecordsBySplits; @@ -51,7 +52,7 @@ class IcebergSourceSplitReader implements SplitReader, I private final int indexOfSubtask; private final Queue splits; private final Set pausedSplits; - private final Object signal = new Object(); + private final AtomicBoolean wakeUp = new AtomicBoolean(false); private CloseableIterator>> currentReader; private IcebergSourceSplit currentSplit; @@ -87,22 +88,32 @@ public RecordsWithSplitIds> fetch() throws IOException { } if (currentReader.hasNext()) { - // Wait until the reader is blocked. Wake every second in case this missed a signal - boolean first = true; - while (pausedSplits.contains(currentSplitId)) { - if (first) { - LOG.info("Paused reading {}", currentSplitId); - } else { - LOG.trace("Still paused {}", currentSplitId); - } + if (pausedSplits.contains(currentSplitId)) { + // Wait until the reader is blocked. Wake every second to catch any missed signal. + // Return empty records if wakeUp is called, so pauseOrResumeSplits could be processed. 
+ boolean first = true; + + while (pausedSplits.contains(currentSplitId)) { + if (first) { + LOG.info("Reader {} paused reading split {}", indexOfSubtask, currentSplitId); + first = false; + } else { + LOG.trace("Reader {} is Still paused reading split {}", indexOfSubtask, currentSplitId); + } - try { - synchronized (signal) { - signal.wait(1000); + synchronized (wakeUp) { + if (wakeUp.get()) { + wakeUp.set(false); + return new RecordsBySplits(Collections.emptyMap(), Collections.emptySet()); + } + + try { + wakeUp.wait(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while blocked on reading.", e); + } } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Interrupted while blocked on reading.", e); } } @@ -140,10 +151,10 @@ public void handleSplitsChanges(SplitsChange splitsChange) { @Override public void wakeUp() { - LOG.info("WakeUp called"); - pausedSplits.clear(); - synchronized (signal) { - signal.notify(); + LOG.info("WakeUp reader {}", indexOfSubtask); + synchronized (wakeUp) { + wakeUp.set(true); + wakeUp.notifyAll(); } } @@ -158,14 +169,15 @@ public void close() throws Exception { @Override public void pauseOrResumeSplits( Collection splitsToPause, Collection splitsToResume) { - LOG.info("Pause splits: {} and resume splits: {}", splitsToPause, splitsToResume); + LOG.info( + "For reader {}, pause splits {} and resume splits {}", + indexOfSubtask, + splitsToPause, + splitsToResume); pausedSplits.addAll( splitsToPause.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet())); pausedSplits.removeAll( splitsToResume.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet())); - synchronized (signal) { - signal.notify(); - } } private long calculateBytes(IcebergSourceSplit split) { diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java index 7428d30f4261..74f3defd9029 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -19,6 +19,7 @@ package org.apache.iceberg.flink.source; import static org.apache.flink.connector.testframe.utils.ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT; +import static org.apache.iceberg.flink.FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import java.io.Serializable; @@ -382,6 +383,8 @@ protected IcebergSource source() { .streaming(true) .monitorInterval(Duration.ofMillis(2)) .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + // Read in single row batches, to have more batches for testing + .flinkConfig(new Configuration().set(SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 1)) .build(); } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java index bd6e44ccac62..9f1e0a8048e9 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java @@ -22,9 +22,14 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.atomic.AtomicBoolean; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition; import org.apache.flink.connector.testutils.source.reader.TestingReaderContext; @@ -39,6 +44,7 @@ import org.apache.iceberg.flink.source.split.SplitComparators; import org.apache.iceberg.hadoop.HadoopFileIO; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.types.Types; import org.awaitility.Awaitility; import org.awaitility.core.ConditionTimeoutException; @@ -106,14 +112,34 @@ public void testWakeup() throws Exception { // Pause the reading of the split, and check that the read is blocked when trying to fetch new // records reader.pauseOrResumeSplits(ImmutableList.of(split), ImmutableList.of()); + assertThatThrownBy( () -> Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> reader.fetch() != null)) .isInstanceOf(ConditionTimeoutException.class) .hasMessageContaining("was not fulfilled within"); + // Run the new fetch and wakeUp concurrently. Expect empty result from the blocked fetch. + ExecutorService executorService = + MoreExecutors.getExitingExecutorService( + (ThreadPoolExecutor) Executors.newFixedThreadPool(1)); + + AtomicBoolean emptyResult = new AtomicBoolean(false); + executorService.execute( + () -> { + try { + emptyResult.set(reader.fetch().nextSplit() == null); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + // Wakeup the reading of the split, and check the result size reader.wakeUp(); + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> emptyResult.get()); + + // Unpause and read the data in the next fetch + reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split)); result = (ArrayBatchRecords) reader.fetch(); assertThat(result.numberOfRecords()).isEqualTo(3); }