diff --git a/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java new file mode 100644 index 000000000000..77fd3a5e5289 --- /dev/null +++ b/paimon-flink/paimon-flink-1.14/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.flink.source.assigners.SplitAssigner; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; + +import javax.annotation.Nullable; + +/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ +public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase { + + public StaticFileStoreSplitEnumerator( + SplitEnumeratorContext context, + @Nullable Snapshot snapshot, + SplitAssigner splitAssigner) { + this(context, snapshot, splitAssigner, null); + } + + public StaticFileStoreSplitEnumerator( + SplitEnumeratorContext context, + @Nullable Snapshot snapshot, + SplitAssigner splitAssigner, + @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { + super(context, snapshot, splitAssigner); + } +} diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java new file mode 100644 index 000000000000..77fd3a5e5289 --- /dev/null +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.flink.source.assigners.SplitAssigner; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; + +import javax.annotation.Nullable; + +/** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ +public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase { + + public StaticFileStoreSplitEnumerator( + SplitEnumeratorContext context, + @Nullable Snapshot snapshot, + SplitAssigner splitAssigner) { + this(context, snapshot, splitAssigner, null); + } + + public StaticFileStoreSplitEnumerator( + SplitEnumeratorContext context, + @Nullable Snapshot snapshot, + SplitAssigner splitAssigner, + @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { + super(context, snapshot, splitAssigner); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java index b6a0e6988da2..e68338b24541 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumerator.java @@ -19,37 +19,21 @@ package org.apache.paimon.flink.source; import org.apache.paimon.Snapshot; -import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.flink.source.assigners.DynamicPartitionPruningAssigner; import org.apache.paimon.flink.source.assigners.SplitAssigner; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.table.connector.source.DynamicFilteringData; import org.apache.flink.table.connector.source.DynamicFilteringEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.util.Collections; -import java.util.List; - import static org.apache.paimon.utils.Preconditions.checkNotNull; /** A {@link SplitEnumerator} implementation for {@link StaticFileStoreSource} input. */ -public class StaticFileStoreSplitEnumerator - implements SplitEnumerator { - - private static final Logger LOG = LoggerFactory.getLogger(StaticFileStoreSplitEnumerator.class); - - private final SplitEnumeratorContext context; - - @Nullable private final Snapshot snapshot; - - private SplitAssigner splitAssigner; +public class StaticFileStoreSplitEnumerator extends StaticFileStoreSplitEnumeratorBase { @Nullable private final DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo; @@ -65,59 +49,10 @@ public StaticFileStoreSplitEnumerator( @Nullable Snapshot snapshot, SplitAssigner splitAssigner, @Nullable DynamicPartitionFilteringInfo dynamicPartitionFilteringInfo) { - this.context = context; - this.snapshot = snapshot; - this.splitAssigner = splitAssigner; + super(context, snapshot, splitAssigner); this.dynamicPartitionFilteringInfo = dynamicPartitionFilteringInfo; } - @Override - public void start() { - // no resources to start - } - - @Override - public void handleSplitRequest(int subtask, @Nullable String hostname) { - if (!context.registeredReaders().containsKey(subtask)) { - // reader failed between sending the request and now. skip this request. - return; - } - - List assignment = splitAssigner.getNext(subtask, hostname); - if (assignment.size() > 0) { - context.assignSplits( - new SplitsAssignment<>(Collections.singletonMap(subtask, assignment))); - } else { - context.signalNoMoreSplits(subtask); - } - } - - @Override - public void addSplitsBack(List backSplits, int subtaskId) { - splitAssigner.addSplitsBack(subtaskId, backSplits); - } - - @Override - public void addReader(int subtaskId) { - // this source is purely lazy-pull-based, nothing to do upon registration - } - - @Override - public PendingSplitsCheckpoint snapshotState(long checkpointId) { - return new PendingSplitsCheckpoint( - splitAssigner.remainingSplits(), snapshot == null ? null : snapshot.id()); - } - - @Override - public void close() { - // no resources to close - } - - @Nullable - public Snapshot snapshot() { - return snapshot; - } - @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { if (sourceEvent instanceof DynamicFilteringEvent) { @@ -140,12 +75,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { dynamicFilteringData); } } else { - LOG.error("Received unrecognized event: {}", sourceEvent); + super.handleSourceEvent(subtaskId, sourceEvent); } } - - @VisibleForTesting - public SplitAssigner getSplitAssigner() { - return splitAssigner; - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorBase.java new file mode 100644 index 000000000000..9b1b534bdf55 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/StaticFileStoreSplitEnumeratorBase.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source; + +import org.apache.paimon.Snapshot; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.flink.source.assigners.SplitAssigner; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; + +/** A {@link SplitEnumerator} base implementation for {@link StaticFileStoreSource} input. */ +public abstract class StaticFileStoreSplitEnumeratorBase + implements SplitEnumerator { + + protected static final Logger LOG = + LoggerFactory.getLogger(StaticFileStoreSplitEnumeratorBase.class); + + protected final SplitEnumeratorContext context; + + @Nullable protected final Snapshot snapshot; + + protected SplitAssigner splitAssigner; + + public StaticFileStoreSplitEnumeratorBase( + SplitEnumeratorContext context, + @Nullable Snapshot snapshot, + SplitAssigner splitAssigner) { + this.context = context; + this.snapshot = snapshot; + this.splitAssigner = splitAssigner; + } + + @Override + public void start() { + // no resources to start + } + + @Override + public void handleSplitRequest(int subtask, @Nullable String hostname) { + if (!context.registeredReaders().containsKey(subtask)) { + // reader failed between sending the request and now. skip this request. + return; + } + + List assignment = splitAssigner.getNext(subtask, hostname); + if (assignment.size() > 0) { + context.assignSplits( + new SplitsAssignment<>(Collections.singletonMap(subtask, assignment))); + } else { + context.signalNoMoreSplits(subtask); + } + } + + @Override + public void addSplitsBack(List backSplits, int subtaskId) { + splitAssigner.addSplitsBack(subtaskId, backSplits); + } + + @Override + public void addReader(int subtaskId) { + // this source is purely lazy-pull-based, nothing to do upon registration + } + + @Override + public PendingSplitsCheckpoint snapshotState(long checkpointId) { + return new PendingSplitsCheckpoint( + splitAssigner.remainingSplits(), snapshot == null ? null : snapshot.id()); + } + + @Override + public void close() { + // no resources to close + } + + @Nullable + public Snapshot snapshot() { + return snapshot; + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + LOG.error("Received unrecognized event: {}", sourceEvent); + } + + @VisibleForTesting + public SplitAssigner getSplitAssigner() { + return splitAssigner; + } +}