diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java index b1c0d9d3c6ae0..e8ca0f7ca0519 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/ContinuousFileSplitEnumerator.java @@ -71,7 +71,7 @@ public class ContinuousFileSplitEnumerator protected final SplitAssigner splitAssigner; - private final ConsumerProgressCalculator consumerProgressCalculator; + protected final ConsumerProgressCalculator consumerProgressCalculator; @Nullable protected Long nextSnapshotId; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java index 7b997dae226a7..0174070695c61 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/assigners/DynamicPartitionPruningAssigner.java @@ -34,6 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; /** Assigner to perform dynamic partition pruning by given {@link DynamicFilteringData}. */ @@ -99,10 +100,15 @@ public static SplitAssigner createDynamicPartitionPruningAssignerIfNeeded( dynamicFilteringData.isFiltering()); return dynamicFilteringData.isFiltering() ? new DynamicPartitionPruningAssigner( - oriAssigner, partitionRowProjection, dynamicFilteringData) + oriAssigner, partitionRowProjection, dynamicFilteringData) : oriAssigner; } + @Override + public Optional getNextSnapshotId(int subtask) { + return innerAssigner.getNextSnapshotId(subtask); + } + private boolean filter(FileStoreSourceSplit sourceSplit) { DataSplit dataSplit = (DataSplit) sourceSplit.split(); BinaryRow partition = dataSplit.partition();