diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index 9096f2814825..72b3742b3fb3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -192,7 +192,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception { // context.callAsync will invoke this. This method runs in workerExecutorThreadPool in // parallelism. protected synchronized Optional scanNextSnapshot() { - if (splitAssigner.remainingSplits().size() >= splitMaxNum) { + if (splitAssigner.numberOfRemainingSplits() >= splitMaxNum) { return Optional.empty(); } TableScan.Plan plan = scan.plan(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java index eba25a1afaf7..648758f83846 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/AlignedSplitAssigner.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; /** * Splits are allocated at the granularity of snapshots. When the splits of the current snapshot are @@ -43,8 +44,11 @@ public class AlignedSplitAssigner implements SplitAssigner { private final Deque pendingSplitAssignment; + private final AtomicInteger numberOfPendingSplits; + public AlignedSplitAssigner() { this.pendingSplitAssignment = new LinkedList<>(); + this.numberOfPendingSplits = new AtomicInteger(0); } @Override @@ -52,10 +56,12 @@ public List getNext(int subtask, @Nullable String hostname PendingSnapshot head = pendingSplitAssignment.peek(); if (head != null && !head.isPlaceHolder) { List subtaskSplits = head.remove(subtask); - return subtaskSplits != null ? subtaskSplits : Collections.emptyList(); - } else { - return Collections.emptyList(); + if (subtaskSplits != null) { + numberOfPendingSplits.getAndAdd(-subtaskSplits.size()); + return subtaskSplits; + } } + return Collections.emptyList(); } @Override @@ -70,6 +76,7 @@ public void addSplit(int subtask, FileStoreSourceSplit splits) { } else { last.add(subtask, splits); } + numberOfPendingSplits.incrementAndGet(); } @Override @@ -88,6 +95,7 @@ public void addSplitsBack(int suggestedTask, List splits) } else { head.addAll(suggestedTask, splits); } + numberOfPendingSplits.getAndAdd(splits.size()); } @Override @@ -105,6 +113,11 @@ public Optional getNextSnapshotId(int subtask) { return Optional.ofNullable(head != null ? head.snapshotId : null); } + @Override + public int numberOfRemainingSplits() { + return numberOfPendingSplits.get(); + } + public boolean isAligned() { PendingSnapshot head = pendingSplitAssignment.peek(); return head != null && head.empty(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java index d43bc1b02d34..9221f1f27eb6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java @@ -109,6 +109,11 @@ public Optional getNextSnapshotId(int subtask) { return innerAssigner.getNextSnapshotId(subtask); } + @Override + public int numberOfRemainingSplits() { + return innerAssigner.numberOfRemainingSplits(); + } + private boolean filter(FileStoreSourceSplit sourceSplit) { DataSplit dataSplit = (DataSplit) sourceSplit.split(); BinaryRow partition = dataSplit.partition(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java index 7af226ab015d..a2f0b983cdd8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/FIFOSplitAssigner.java @@ -75,4 +75,9 @@ public Optional getNextSnapshotId(int subtask) { ? Optional.empty() : getSnapshotId(pendingSplitAssignment.peekFirst()); } + + @Override + public int numberOfRemainingSplits() { + return pendingSplitAssignment.size(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java index 118ed109eec9..400a2e5c54eb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/PreAssignSplitAssigner.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Optional; import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.paimon.flink.utils.TableScanUtils.getSnapshotId; @@ -48,6 +49,8 @@ public class PreAssignSplitAssigner implements SplitAssigner { private final Map> pendingSplitAssignment; + private final AtomicInteger numberOfPendingSplits; + public PreAssignSplitAssigner( int splitBatchSize, SplitEnumeratorContext context, @@ -55,6 +58,7 @@ public PreAssignSplitAssigner( this.splitBatchSize = splitBatchSize; this.pendingSplitAssignment = createBatchFairSplitAssignment(splits, context.currentParallelism()); + this.numberOfPendingSplits = new AtomicInteger(splits.size()); } @Override @@ -67,12 +71,14 @@ public List getNext(int subtask, @Nullable String hostname while (taskSplits != null && !taskSplits.isEmpty() && assignment.size() < splitBatchSize) { assignment.add(taskSplits.poll()); } + numberOfPendingSplits.getAndAdd(-assignment.size()); return assignment; } @Override public void addSplit(int suggestedTask, FileStoreSourceSplit split) { pendingSplitAssignment.computeIfAbsent(suggestedTask, k -> new LinkedList<>()).add(split); + numberOfPendingSplits.incrementAndGet(); } @Override @@ -83,6 +89,7 @@ public void addSplitsBack(int subtask, List splits) { while (iterator.hasPrevious()) { remainingSplits.addFirst(iterator.previous()); } + numberOfPendingSplits.getAndAdd(splits.size()); } @Override @@ -115,4 +122,9 @@ public Optional getNextSnapshotId(int subtask) { ? Optional.empty() : getSnapshotId(pendingSplits.peekFirst()); } + + @Override + public int numberOfRemainingSplits() { + return numberOfPendingSplits.get(); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java index fecd59c8872a..2ce3af5d765f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/SplitAssigner.java @@ -54,4 +54,10 @@ public interface SplitAssigner { /** Gets the snapshot id of the next split. */ Optional getNextSnapshotId(int subtask); + + /** + * Gets the current number of remaining splits. This method should be guaranteed to be + * thread-safe. + */ + int numberOfRemainingSplits(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java index 489a1d4fc2cf..86620423c6f2 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumeratorTest.java @@ -791,6 +791,13 @@ public void testEnumeratorSplitMax() throws Exception { context.triggerAllActions(); Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(16 * 2); + Assertions.assertThat(enumerator.splitAssigner.numberOfRemainingSplits()).isEqualTo(16 * 2); + + enumerator.handleSplitRequest(0, "test"); + enumerator.handleSplitRequest(1, "test"); + + Assertions.assertThat(enumerator.splitAssigner.remainingSplits().size()).isEqualTo(15 * 2); + Assertions.assertThat(enumerator.splitAssigner.numberOfRemainingSplits()).isEqualTo(15 * 2); } private void triggerCheckpointAndComplete(