From 1d828237b4bdc1dd67260d265609d152003b55c0 Mon Sep 17 00:00:00 2001
From: Peter Vary
Date: Fri, 15 Dec 2023 15:21:08 +0100
Subject: [PATCH] Flink: Support watermark alignment of source splits

---
 flink/v1.18/build.gradle                      |   3 +
 .../reader/IcebergSourceSplitReader.java      |  31 ++++++
 ...stIcebergSourceWithWatermarkExtractor.java |  72 ++++++------
 .../TestWatermarkBasedSplitAssigner.java      |  24 ++--
 .../flink/source/reader/ReaderUtil.java       |  14 +++
 .../reader/TestIcebergSourceSplitReader.java  | 104 ++++++++++++++++++
 6 files changed, 201 insertions(+), 47 deletions(-)
 create mode 100644 flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java

diff --git a/flink/v1.18/build.gradle b/flink/v1.18/build.gradle
index 55578d3b117d..c08ae5d8cc1f 100644
--- a/flink/v1.18/build.gradle
+++ b/flink/v1.18/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       // These artifacts are shaded and included in the orc-core fat jar
       exclude group: 'com.google.protobuf', module: 'protobuf-java'
       exclude group: 'org.apache.hive', module: 'hive-storage-api'
+      exclude group: 'org.slf4j'
     }
 
     testImplementation libs.flink118.connector.test.utils
@@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
       exclude group: 'com.tdunning', module: 'json'
       exclude group: 'javax.transaction', module: 'transaction-api'
       exclude group: 'com.zaxxer', module: 'HikariCP'
+      exclude group: 'org.slf4j'
     }
 
     testImplementation libs.awaitility
@@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
       exclude group: 'com.tdunning', module: 'json'
       exclude group: 'javax.transaction', module: 'transaction-api'
       exclude group: 'com.zaxxer', module: 'HikariCP'
+      exclude group: 'org.slf4j'
     }
 
     integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
index 4e270dfa3d13..20becab38e05 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/reader/IcebergSourceSplitReader.java
@@ -21,9 +21,12 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
+import java.util.Set;
+import java.util.stream.Collectors;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -35,6 +38,7 @@
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +50,8 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
   private final SerializableComparator<IcebergSourceSplit> splitComparator;
   private final int indexOfSubtask;
   private final Queue<IcebergSourceSplit> splits;
+  private final Set<String> pausedSplits;
+  private final Object signal = new Object();
 
   private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
   private IcebergSourceSplit currentSplit;
@@ -61,6 +67,7 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
     this.splitComparator = splitComparator;
     this.indexOfSubtask = context.getIndexOfSubtask();
     this.splits = new ArrayDeque<>();
+    this.pausedSplits = Sets.newHashSet();
   }
 
   @Override
@@ -80,6 +87,18 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
     }
 
     if (currentReader.hasNext()) {
+      // Block reading while the current split is paused. Wake up every second in case a signal was missed.
+      while (pausedSplits.contains(currentSplitId)) {
+        try {
+          synchronized (signal) {
+            signal.wait(1000);
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while blocked on reading.", e);
+        }
+      }
+
       // Because Iterator#next() doesn't support checked exception,
       // we need to wrap and unwrap the checked IOException with UncheckedIOException
       try {
@@ -123,6 +142,18 @@ public void close() throws Exception {
     }
   }
 
+  @Override
+  public void pauseOrResumeSplits(
+      Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
+    pausedSplits.addAll(
+        splitsToPause.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
+    pausedSplits.removeAll(
+        splitsToResume.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
+    synchronized (signal) {
+      signal.notify();
+    }
+  }
+
   private long calculateBytes(IcebergSourceSplit split) {
     return split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum);
   }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
index aa4b9cd79e55..7428d30f4261 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceWithWatermarkExtractor.java
@@ -40,11 +40,11 @@
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -78,7 +78,6 @@
 import org.junit.rules.TemporaryFolder;
 
 public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
-  private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
   private static final int PARALLELISM = 4;
   private static final String SOURCE_NAME = "IcebergSource";
   private static final int RECORD_NUM_FOR_2_SPLITS = 200;
@@ -86,6 +85,8 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
+  private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
+
   @Rule
   public final MiniClusterWithClientResource miniClusterResource =
       new MiniClusterWithClientResource(
@@ -93,10 +94,7 @@ public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
               .setNumberTaskManagers(1)
               .setNumberSlotsPerTaskManager(PARALLELISM)
               .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
-              .setConfiguration(
-                  reporter.addToConfiguration(
-                      // disable classloader check as Avro may cache class in the serializers.
-                      new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)))
+              .setConfiguration(reporter.addToConfiguration(new Configuration()))
               .withHaLeadershipControl()
               .build());
 
@@ -273,23 +271,40 @@ public void apply(
   public void testThrottling() throws Exception {
     GenericAppenderHelper dataAppender = appender();
 
-    // Generate records with the following pattern:
-    // - File 1 - Later records (Watermark 6000000)
-    //   - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
-    // - File 2 - First records (Watermark 0)
-    //   - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
-    //   - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
-    List<Record> batch =
+    // Generate records in advance
+
+    // File 1 - Later records (Watermark 6.000.000 - 100 min)
+    //  - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
+    List<Record> batch1 =
         ImmutableList.of(
             generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
-    dataAppender.appendToTable(batch);
-    batch = Lists.newArrayListWithCapacity(100);
+
+    // File 2 - First records (Watermark 0 - 0 min)
+    //  - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    //  - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
+    List<Record> batch2 = Lists.newArrayListWithCapacity(100);
     for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
-      batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
+      batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
     }
-    dataAppender.appendToTable(batch);
+
+    // File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
+    List<Record> batch3 =
+        ImmutableList.of(
+            generateRecord(15, "file_3-recordTs_15"),
+            generateRecord(16, "file_3-recordTs_16"),
+            generateRecord(17, "file_3-recordTs_17"));
+
+    // File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
+    List<Record> batch4 =
+        ImmutableList.of(
+            generateRecord(15, "file_4-recordTs_15"),
+            generateRecord(16, "file_4-recordTs_16"),
+            generateRecord(17, "file_4-recordTs_17"));
+
+    // File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
+    List<Record> batch5 =
+        ImmutableList.of(
+            generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
 
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setParallelism(2);
@@ -304,6 +319,11 @@ public void testThrottling() throws Exception {
     try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
       JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
+      CommonTestUtils.waitForAllTaskRunning(
+          miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);
+
+      // Insert the first data into the table
+      dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));
 
       // Check that we read the non-blocked data
       // The first RECORD_NUM_FOR_2_SPLITS should be read
@@ -325,18 +345,7 @@
 
       // Add some old records with 2 splits, so even if the blocked reader gets one split, the
       // other reader gets one as well
-      List<Record> newBatch1 =
-          ImmutableList.of(
-              generateRecord(15, "file_3-recordTs_15"),
-              generateRecord(16, "file_3-recordTs_16"),
"file_3-recordTs_17")); - List newBatch2 = - ImmutableList.of( - generateRecord(15, "file_4-recordTs_15"), - generateRecord(16, "file_4-recordTs_16"), - generateRecord(17, "file_4-recordTs_17")); - dataAppender.appendToTable( - dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2)); + dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4)); // The records received will highly depend on scheduling // We minimally get 3 records from the non-blocked reader // We might get 1 record from the blocked reader (as part of the previous batch - @@ -352,10 +361,7 @@ public void testThrottling() throws Exception { .until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65)); // Add some new records which should unblock the throttled reader - batch = - ImmutableList.of( - generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91")); - dataAppender.appendToTable(batch); + dataAppender.appendToTable(batch5); // We should get all the records at this point waitForRecords(resultIterator, 6); diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java index e1fc63fda918..45680e762109 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/assigner/TestWatermarkBasedSplitAssigner.java @@ -20,7 +20,6 @@ import static org.apache.iceberg.types.Types.NestedField.required; -import java.io.IOException; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; @@ -118,29 +117,26 @@ protected List createSplits( return IntStream.range(0, fileCount / filesPerSplit) .mapToObj( splitNum -> - splitFromRecords( + ReaderUtil.createSplit( IntStream.range(0, filesPerSplit) .mapToObj( fileNum -> RandomGenericData.generate( SCHEMA, 2, splitNum * filesPerSplit + fileNum)) - .collect(Collectors.toList()))) + .collect(Collectors.toList()), + TEMPORARY_FOLDER, + FileFormat.PARQUET, + APPENDER_FACTORY)) .collect(Collectors.toList()); } private IcebergSourceSplit splitFromInstant(Instant instant) { Record record = GenericRecord.create(SCHEMA); record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC)); - return splitFromRecords(ImmutableList.of(ImmutableList.of(record))); - } - - private IcebergSourceSplit splitFromRecords(List> records) { - try { - return IcebergSourceSplit.fromCombinedScanTask( - ReaderUtil.createCombinedScanTask( - records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY)); - } catch (IOException e) { - throw new RuntimeException("Split creation exception", e); - } + return ReaderUtil.createSplit( + ImmutableList.of(ImmutableList.of(record)), + TEMPORARY_FOLDER, + FileFormat.PARQUET, + APPENDER_FACTORY); } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java index 2a2503ef2478..bbf328da577e 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/ReaderUtil.java @@ -44,6 +44,7 @@ import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.source.DataIterator; import org.apache.iceberg.flink.source.RowDataFileScanTaskReader; 
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileAppenderFactory;
@@ -122,4 +123,17 @@ public static CombinedScanTask createCombinedScanTask(
     return new BaseCombinedScanTask(fileTasks);
   }
+
+  public static IcebergSourceSplit createSplit(
+      List<List<Record>> records,
+      TemporaryFolder temporaryFolder,
+      FileFormat fileFormat,
+      GenericAppenderFactory appenderFactory) {
+    try {
+      return IcebergSourceSplit.fromCombinedScanTask(
+          ReaderUtil.createCombinedScanTask(records, temporaryFolder, fileFormat, appenderFactory));
+    } catch (IOException e) {
+      throw new RuntimeException("Split creation exception", e);
+    }
+  }
 }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java
new file mode 100644
index 000000000000..19e5dacf220a
--- /dev/null
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestIcebergSourceSplitReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source.reader;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.encryption.PlaintextEncryptionManager;
+import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.types.Types;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestIcebergSourceSplitReader {
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  public static final Schema SCHEMA =
+      new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
+  private static final GenericAppenderFactory APPENDER_FACTORY =
+      new GenericAppenderFactory(SCHEMA);
+
+  @Test
+  public void testPause() throws Exception {
+    TestingMetricGroup metricGroup = new TestingMetricGroup();
+    TestingReaderContext readerContext =
+        new TestingReaderContext(new Configuration(), metricGroup);
+    RowDataReaderFunction readerFunction =
+        new RowDataReaderFunction(
+            new Configuration(),
+            SCHEMA,
+            SCHEMA,
+            null,
+            true,
+            new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
+            new PlaintextEncryptionManager(),
+            Collections.emptyList());
+
+    List<List<Record>> records =
+        ImmutableList.of(
+            RandomGenericData.generate(SCHEMA, 2, 0L), RandomGenericData.generate(SCHEMA, 3, 1L));
+    IcebergSourceSplitReader<RowData> reader =
+        new IcebergSourceSplitReader<>(
+            new IcebergSourceReaderMetrics(metricGroup, "dummy"),
+            readerFunction,
+            SplitComparators.fileSequenceNumber(),
+            readerContext);
+    IcebergSourceSplit split =
+        ReaderUtil.createSplit(records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY);
+
+    // Add the new split to the reader
+    reader.handleSplitsChanges(new SplitsAddition<>(ImmutableList.of(split)));
+
+    // Fetch the first batch, and check the result size
+    ArrayBatchRecords<RowData> result = (ArrayBatchRecords<RowData>) reader.fetch();
+    assertThat(result.numberOfRecords()).isEqualTo(2);
+
+    // Pause the reading of the split, and check that the read is blocked when trying to fetch new
+    // records
+    reader.pauseOrResumeSplits(ImmutableList.of(split), ImmutableList.of());
+    assertThatThrownBy(
+            () ->
+                Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> reader.fetch() != null))
+        .isInstanceOf(ConditionTimeoutException.class)
+        .hasMessageContaining("was not fulfilled within");
+
+    // Unpause the reading of the split, and check the result size
+    reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
+    result = (ArrayBatchRecords<RowData>) reader.fetch();
+    assertThat(result.numberOfRecords()).isEqualTo(3);
+  }
+}
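The pauseOrResumeSplits() implementation added above is the hook Flink's watermark alignment uses to
throttle a reader whose splits have run ahead of the allowed drift. The sketch below shows how a job
might enable alignment on an IcebergSource built the same way as in the tests in this patch; the
table loader, column name, alignment group, and drift/update intervals are illustrative assumptions,
not values taken from the patch.

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.IcebergSource;

    public class WatermarkAlignmentSketch {
      public static DataStream<RowData> alignedStream(
          StreamExecutionEnvironment env, TableLoader tableLoader) {
        // Derive event-time watermarks from a timestamp column of the table
        // (the column name here is an example, mirroring the test schema above).
        IcebergSource<RowData> source =
            IcebergSource.forRowData()
                .tableLoader(tableLoader)
                .streaming(true)
                .watermarkColumn("timestamp_column")
                .build();

        // Keep all sources in the "iceberg-sources" group within 20 minutes of each other;
        // readers that run ahead are paused through SplitReader#pauseOrResumeSplits.
        return env.fromSource(
            source,
            WatermarkStrategy.<RowData>noWatermarks()
                .withWatermarkAlignment(
                    "iceberg-sources", Duration.ofMinutes(20), Duration.ofSeconds(1)),
            "IcebergSource");
      }
    }

With split-level pause/resume supported by the reader, alignment no longer has to rely on the
pipeline.watermark-alignment.allow-unaligned-source-splits escape hatch when a single reader owns
more than one split.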