diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java index 4e270dfa3d13..9c20494fdbcd 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java @@ -20,7 +20,7 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.ArrayDeque; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Queue; @@ -35,6 +35,7 @@ import org.apache.iceberg.flink.source.split.SerializableComparator; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Queues; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,9 +61,18 @@ class IcebergSourceSplitReader implements SplitReader, I this.openSplitFunction = openSplitFunction; this.splitComparator = splitComparator; this.indexOfSubtask = context.getIndexOfSubtask(); - this.splits = new ArrayDeque<>(); + this.splits = Queues.newArrayDeque(); } + /** + * The method reads a batch of records from the assigned splits. If all the records from the + * current split are returned then it will emit a {@link ArrayBatchRecords#finishedSplit(String)} + * batch to signal this event. In the next fetch loop the reader will continue with the next split + * (if any). + * + * @return The fetched records + * @throws IOException If there is an error during reading + */ @Override public RecordsWithSplitIds> fetch() throws IOException { metrics.incrementSplitReaderFetchCalls(1); @@ -123,6 +133,16 @@ public void close() throws Exception { } } + @Override + public void pauseOrResumeSplits( + Collection splitsToPause, Collection splitsToResume) { + // IcebergSourceSplitReader only reads splits sequentially. When waiting for watermark alignment + // the SourceOperator will stop processing and recycling the fetched batches. This exhausts the + // {@link ArrayPoolDataIteratorBatcher#pool} and the `currentReader.next()` call will be + // blocked even without split-level watermark alignment. Based on this the + // `pauseOrResumeSplits` and the `wakeUp` are left empty. + } + private long calculateBytes(IcebergSourceSplit split) { return split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum); } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java index c4070212df92..2ef8f79aa372 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java @@ -40,7 +40,6 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.PipelineOptions; import org.apache.flink.core.execution.JobClient; import org.apache.flink.metrics.Gauge; import org.apache.flink.runtime.metrics.MetricNames; @@ -95,9 +94,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable { .setNumberTaskManagers(1) .setNumberSlotsPerTaskManager(PARALLELISM) .setRpcServiceSharing(RpcServiceSharing.DEDICATED) - .setConfiguration( - reporter.addToConfiguration( - new Configuration().set(PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS, true))) + .setConfiguration(reporter.addToConfiguration(new Configuration())) .withHaLeadershipControl() .build()); @@ -383,7 +380,7 @@ protected IcebergSource source() { .project(TestFixtures.TS_SCHEMA) .splitSize(100L) .streaming(true) - .monitorInterval(Duration.ofMillis(2)) + .monitorInterval(Duration.ofMillis(10)) .streamingStartingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) .build(); }