From f7585932a6d89b04c0d45b5f9dfe6f45483efd0b Mon Sep 17 00:00:00 2001
From: Steven Zhen Wu
Date: Mon, 29 Jul 2024 15:35:18 -0700
Subject: [PATCH] Flink: support limit pushdown in FLIP-27 source (#10748)

---
 .../iceberg/flink/source/IcebergSource.java   |  3 +-
 .../source/reader/LimitableDataIterator.java  | 56 +++++++++++++
 .../flink/source/reader/RecordLimiter.java    | 45 ++++++++++
 .../source/reader/RowDataReaderFunction.java  | 40 ++++++++-
 .../flink/source/TestFlinkSourceConfig.java   |  7 +-
 .../reader/TestLimitableDataIterator.java     | 84 +++++++++++++++++++
 6 files changed, 228 insertions(+), 7 deletions(-)
 create mode 100644 flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/LimitableDataIterator.java
 create mode 100644 flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordLimiter.java
 create mode 100644 flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestLimitableDataIterator.java

diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 48201ea09359..ccbd0d9997ed 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -521,7 +521,8 @@ public IcebergSource<T> build() {
                 context.caseSensitive(),
                 table.io(),
                 table.encryption(),
-                context.filters());
+                context.filters(),
+                context.limit());
         this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
       }
     }
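Note on the change above: the FLIP-27 builder previously dropped ScanContext's limit; it is now forwarded into RowDataReaderFunction. A minimal sketch of how a user-facing limit reaches this code path, assuming an Iceberg catalog with a table my_catalog.db.t is already registered (the table name is a placeholder; FlinkReadOptions.LIMIT_OPTION is the same 'limit' read option exercised in the tests later in this patch):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.iceberg.flink.FlinkReadOptions;

    public class LimitPushdownSketch {
      public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Session-wide read option: picked up as context.limit() and now
        // forwarded to the FLIP-27 RowDataReaderFunction by the builder above.
        env.getConfig().set(FlinkReadOptions.LIMIT_OPTION, 3L);
        env.executeSql("SELECT * FROM my_catalog.db.t").print();
      }
    }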
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/LimitableDataIterator.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/LimitableDataIterator.java
new file mode 100644
index 000000000000..020e87646d05
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/LimitableDataIterator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.source.DataIterator;
+import org.apache.iceberg.flink.source.FileScanTaskReader;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+class LimitableDataIterator<T> extends DataIterator<T> {
+  private final RecordLimiter limiter;
+
+  LimitableDataIterator(
+      FileScanTaskReader<T> fileScanTaskReader,
+      CombinedScanTask task,
+      FileIO io,
+      EncryptionManager encryption,
+      RecordLimiter limiter) {
+    super(fileScanTaskReader, task, io, encryption);
+    Preconditions.checkArgument(limiter != null, "Invalid record limiter: null");
+    this.limiter = limiter;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (limiter.reachedLimit()) {
+      return false;
+    }
+
+    return super.hasNext();
+  }
+
+  @Override
+  public T next() {
+    limiter.increment();
+    return super.next();
+  }
+}
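The ordering in LimitableDataIterator matters: hasNext() consults the limiter before delegating, so no extra record is fetched once the limit is hit, and next() counts every record it hands out. The same guard pattern in isolation, with hypothetical names (CappedIterator is an illustration, not part of this patch):

    import java.util.Iterator;
    import java.util.concurrent.atomic.AtomicLong;

    class CappedIterator<T> implements Iterator<T> {
      private final Iterator<T> delegate;
      private final AtomicLong counter; // shared, so several iterators can split one budget
      private final long cap; // non-positive means "no cap"

      CappedIterator(Iterator<T> delegate, AtomicLong counter, long cap) {
        this.delegate = delegate;
        this.counter = counter;
        this.cap = cap;
      }

      @Override
      public boolean hasNext() {
        // Check the cap first so the delegate is never advanced past the budget.
        return (cap <= 0 || counter.get() < cap) && delegate.hasNext();
      }

      @Override
      public T next() {
        counter.incrementAndGet();
        return delegate.next();
      }
    }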
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordLimiter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordLimiter.java
new file mode 100644
index 000000000000..f260a53089ff
--- /dev/null
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RecordLimiter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.flink.annotation.Internal;
+
+@Internal
+class RecordLimiter {
+  private final long limit;
+  private final AtomicLong counter;
+
+  static RecordLimiter create(long limit) {
+    return new RecordLimiter(limit);
+  }
+
+  private RecordLimiter(long limit) {
+    this.limit = limit;
+    this.counter = new AtomicLong(0);
+  }
+
+  public boolean reachedLimit() {
+    return limit > 0 && counter.get() >= limit;
+  }
+
+  public void increment() {
+    counter.incrementAndGet();
+  }
+}
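RecordLimiter keeps its count in an AtomicLong because the same instance is shared by every LimitableDataIterator a reader function creates (see lazyLimiter() in the next file), and those iterators may be advanced outside the task's main thread, so a thread-safe counter is the conservative choice. The contract in miniature, mirrored in plain Java since the class is package-private (names are illustrative):

    import java.util.concurrent.atomic.AtomicLong;

    public class RecordLimiterContract {
      public static void main(String[] args) {
        long limit = 2L;
        AtomicLong counter = new AtomicLong();
        for (int i = 0; i < 4; i++) {
          // Same predicate as RecordLimiter#reachedLimit(): limit <= 0 never trips.
          boolean reached = limit > 0 && counter.get() >= limit;
          System.out.printf("before record %d: reachedLimit=%b%n", i, reached);
          counter.incrementAndGet();
        }
        // Prints false, false, true, true: reads stop after two records.
      }
    }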
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
index 5d0a00954e7a..c9208a0e1834 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/source/reader/RowDataReaderFunction.java
@@ -39,6 +39,9 @@ public class RowDataReaderFunction extends DataIteratorReaderFunction<RowData> {
   private final FileIO io;
   private final EncryptionManager encryption;
   private final List<Expression> filters;
+  private final long limit;
+
+  private transient RecordLimiter recordLimiter = null;
 
   public RowDataReaderFunction(
       ReadableConfig config,
@@ -49,6 +52,28 @@ public RowDataReaderFunction(
       FileIO io,
       EncryptionManager encryption,
       List<Expression> filters) {
+    this(
+        config,
+        tableSchema,
+        projectedSchema,
+        nameMapping,
+        caseSensitive,
+        io,
+        encryption,
+        filters,
+        -1L);
+  }
+
+  public RowDataReaderFunction(
+      ReadableConfig config,
+      Schema tableSchema,
+      Schema projectedSchema,
+      String nameMapping,
+      boolean caseSensitive,
+      FileIO io,
+      EncryptionManager encryption,
+      List<Expression> filters,
+      long limit) {
     super(
         new ArrayPoolDataIteratorBatcher<>(
             config,
@@ -61,19 +86,30 @@ public RowDataReaderFunction(
     this.io = io;
     this.encryption = encryption;
     this.filters = filters;
+    this.limit = limit;
   }
 
   @Override
   public DataIterator<RowData> createDataIterator(IcebergSourceSplit split) {
-    return new DataIterator<>(
+    return new LimitableDataIterator<>(
         new RowDataFileScanTaskReader(tableSchema, readSchema, nameMapping, caseSensitive, filters),
         split.task(),
         io,
-        encryption);
+        encryption,
+        lazyLimiter());
   }
 
   private static Schema readSchema(Schema tableSchema, Schema projectedSchema) {
     Preconditions.checkNotNull(tableSchema, "Table schema can't be null");
     return projectedSchema == null ? tableSchema : projectedSchema;
   }
+
+  /** Lazily create RecordLimiter to avoid the need to make it serializable */
+  private RecordLimiter lazyLimiter() {
+    if (recordLimiter == null) {
+      this.recordLimiter = RecordLimiter.create(limit);
+    }
+
+    return recordLimiter;
+  }
 }
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
index 8131bd7ab0d3..14131d9e96d5 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkSourceConfig.java
@@ -20,7 +20,6 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.List;
 import org.apache.flink.types.Row;
@@ -49,11 +48,11 @@ public void testFlinkHintConfig() {
 
   @TestTemplate
   public void testReadOptionHierarchy() {
-    // TODO: FLIP-27 source doesn't implement limit pushdown yet
-    assumeThat(useFlip27Source).isFalse();
-
     getTableEnv().getConfig().set(FlinkReadOptions.LIMIT_OPTION, 1L);
     List<Row> result = sql("SELECT * FROM %s", TABLE);
+    // Note that this query does not have a LIMIT clause in the SQL.
+    // The assertion works because the limit is pushed down to the reader
+    // and the reader parallelism is 1.
     assertThat(result).hasSize(1);
 
     result = sql("SELECT * FROM %s /*+ OPTIONS('limit'='3')*/", TABLE);
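The new comment in testReadOptionHierarchy encodes an assumption worth spelling out: the limit is enforced per reader, so with parallelism N a query without a LIMIT clause could return up to N * limit rows. The assertion holds because the test environment runs a single reader. A sketch of pinning that down explicitly in user code, reusing the placeholder table from earlier (the config key is Flink's standard table.exec.resource.default-parallelism):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SingleReaderLimit {
      public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // One reader means one RecordLimiter, so 'limit'='3' bounds the whole result.
        env.getConfig().set("table.exec.resource.default-parallelism", "1");
        // Per-query read option via SQL hint, exactly as in the test above.
        env.executeSql("SELECT * FROM my_catalog.db.t /*+ OPTIONS('limit'='3') */").print();
      }
    }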
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestLimitableDataIterator.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestLimitableDataIterator.java
new file mode 100644
index 000000000000..36749d3ec2dc
--- /dev/null
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestLimitableDataIterator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class TestLimitableDataIterator {
+  @TempDir private static Path temporaryFolder;
+
+  private final RowDataFileScanTaskReader reader =
+      new RowDataFileScanTaskReader(
+          TestFixtures.SCHEMA, TestFixtures.SCHEMA, null, true, Collections.emptyList());
+  private final HadoopFileIO fileIO = new HadoopFileIO(new org.apache.hadoop.conf.Configuration());
+  private final EncryptionManager encryptionManager = PlaintextEncryptionManager.instance();
+
+  private static CombinedScanTask combinedScanTask;
+  private static int totalRecords;
+
+  @BeforeAll
+  public static void beforeClass() throws Exception {
+    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(TestFixtures.SCHEMA);
+    List<List<Record>> recordBatchList =
+        ReaderUtil.createRecordBatchList(TestFixtures.SCHEMA, 3, 2);
+    combinedScanTask =
+        ReaderUtil.createCombinedScanTask(
+            recordBatchList, temporaryFolder, FileFormat.PARQUET, appenderFactory);
+    totalRecords = 3 * 2;
+  }
+
+  @ParameterizedTest
+  @ValueSource(longs = {-1L, 0L, 1L, 6L, 7L})
+  public void testUnlimited(long limit) {
+    LimitableDataIterator<RowData> dataIterator =
+        new LimitableDataIterator<>(
+            reader, combinedScanTask, fileIO, encryptionManager, RecordLimiter.create(limit));
+
+    List<RowData> result = Lists.newArrayList();
+    while (dataIterator.hasNext()) {
+      result.add(dataIterator.next());
+    }
+
+    if (limit <= 0 || limit > totalRecords) {
+      // read all records
+      assertThat(result).hasSize(totalRecords);
+    } else {
+      assertThat(result).hasSize((int) limit);
+    }
+  }
+}
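The parameterized values cover both sides of the boundary: 6 equals the record count (3 batches of 2) and 7 overshoots it, while -1 and 0 disable the limit. The expected size asserted above condenses to one expression; a sketch, not part of the patch:

    // Expected record count for a given limit over totalRecords rows:
    // non-positive limits disable the cap, oversized limits are capped by the data.
    static int expectedCount(long limit, int totalRecords) {
      return (limit <= 0 || limit > totalRecords) ? totalRecords : (int) limit;
    }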