Flink: Support watermark alignment of source splits
Peter Vary committed Dec 15, 2023
1 parent d56dd63 commit 1d82823
Showing 6 changed files with 201 additions and 47 deletions.
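
For context, this commit implements Flink's split-level watermark alignment hook (SplitReader#pauseOrResumeSplits) in the Iceberg source reader, so a split whose watermark runs too far ahead of its alignment group can be paused while the other splits keep reading. Below is a minimal sketch of a pipeline that would exercise the feature; the table location, column name, and alignment settings are placeholders, and the watermarkColumn(...) builder option comes from the related watermark extraction support rather than from this commit.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkAlignmentExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder table location and column name; watermarkColumn(...) is assumed from the
    // watermark extraction support that this split-level alignment builds on.
    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(TableLoader.fromHadoopTable("file:///tmp/warehouse/db/events"))
            .streaming(true)
            .watermarkColumn("event_ts")
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source,
            // Splits whose watermarks drift too far ahead of the "iceberg" alignment group are
            // paused via the pauseOrResumeSplits() hook added in this commit.
            WatermarkStrategy.<RowData>noWatermarks()
                .withWatermarkAlignment("iceberg", Duration.ofSeconds(20), Duration.ofSeconds(1)),
            "IcebergSource",
            TypeInformation.of(RowData.class));

    stream.print();
    env.execute("Watermark alignment example");
  }
}
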
3 changes: 3 additions & 0 deletions flink/v1.18/build.gradle
@@ -63,6 +63,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
// These artifacts are shaded and included in the orc-core fat jar
exclude group: 'com.google.protobuf', module: 'protobuf-java'
exclude group: 'org.apache.hive', module: 'hive-storage-api'
exclude group: 'org.slf4j'
}

testImplementation libs.flink118.connector.test.utils
@@ -111,6 +112,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

testImplementation libs.awaitility
@@ -202,6 +204,7 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
exclude group: 'com.tdunning', module: 'json'
exclude group: 'javax.transaction', module: 'transaction-api'
exclude group: 'com.zaxxer', module: 'HikariCP'
exclude group: 'org.slf4j'
}

integrationImplementation("${libs.hive2.exec.get().module}:${libs.hive2.exec.get().getVersion()}:core") {
IcebergSourceSplitReader.java
@@ -21,9 +21,12 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -35,6 +38,7 @@
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,6 +50,8 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit>
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final int indexOfSubtask;
private final Queue<IcebergSourceSplit> splits;
private final Set<String> pausedSplits;
private final Object signal = new Object();

private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
private IcebergSourceSplit currentSplit;
@@ -61,6 +67,7 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit>
this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();
this.pausedSplits = Sets.newHashSet();
}

@Override
@@ -80,6 +87,18 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
}

if (currentReader.hasNext()) {
// Block while the current split is paused. Wake up every second in case a signal was missed
while (pausedSplits.contains(currentSplitId)) {
try {
synchronized (signal) {
signal.wait(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while blocked on reading.", e);
}
}

// Because Iterator#next() doesn't support checked exception,
// we need to wrap and unwrap the checked IOException with UncheckedIOException
try {
@@ -123,6 +142,18 @@ public void close() throws Exception {
}
}

@Override
public void pauseOrResumeSplits(
Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
pausedSplits.addAll(
splitsToPause.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
pausedSplits.removeAll(
splitsToResume.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
synchronized (signal) {
signal.notify();
}
}

private long calculateBytes(IcebergSourceSplit split) {
return split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum);
}
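
For orientation: the reader above only honours pause and resume requests; deciding which splits have drifted too far is done by Flink's watermark alignment machinery, which then calls pauseOrResumeSplits with the results. A rough, simplified sketch of that decision follows (illustrative only, not actual Flink code; the identifiers are made up):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Rough illustration of the bookkeeping Flink's runtime performs before calling
// SplitReader#pauseOrResumeSplits with the two resulting collections.
// maxDesiredWatermark is roughly the group-minimum watermark plus the configured max drift.
final class AlignmentDecisionSketch {

  // Splits whose last emitted watermark is ahead of the allowed maximum get paused.
  static List<String> splitsToPause(Map<String, Long> lastEmittedWatermarks, long maxDesiredWatermark) {
    return lastEmittedWatermarks.entrySet().stream()
        .filter(entry -> entry.getValue() > maxDesiredWatermark)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }

  // Everything else keeps reading, or is resumed if it was paused before.
  static List<String> splitsToResume(Map<String, Long> lastEmittedWatermarks, long maxDesiredWatermark) {
    return lastEmittedWatermarks.entrySet().stream()
        .filter(entry -> entry.getValue() <= maxDesiredWatermark)
        .map(Map.Entry::getKey)
        .collect(Collectors.toList());
  }
}
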
TestIcebergSourceWithWatermarkExtractor.java
@@ -40,11 +40,11 @@
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -78,25 +78,23 @@
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceWithWatermarkExtractor implements Serializable {
private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
private static final int PARALLELISM = 4;
private static final String SOURCE_NAME = "IcebergSource";
private static final int RECORD_NUM_FOR_2_SPLITS = 200;
private static final ConcurrentMap<Long, Integer> windows = Maps.newConcurrentMap();

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
.setRpcServiceSharing(RpcServiceSharing.DEDICATED)
.setConfiguration(
reporter.addToConfiguration(
// disable classloader check as Avro may cache class in the serializers.
new Configuration().set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)))
.setConfiguration(reporter.addToConfiguration(new Configuration()))
.withHaLeadershipControl()
.build());

@@ -273,23 +271,40 @@ public void apply(
public void testThrottling() throws Exception {
GenericAppenderHelper dataAppender = appender();

// Generate records with the following pattern:
// - File 1 - Later records (Watermark 6000000)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
// - File 2 - First records (Watermark 0)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch =
// Generate records in advance

// File 1 - Later records (Watermark 6.000.000 - 100 min)
// - Split 1 - 2 records (100, "file_1-recordTs_100"), (103, "file_1-recordTs_103")
List<Record> batch1 =
ImmutableList.of(
generateRecord(100, "file_1-recordTs_100"), generateRecord(103, "file_1-recordTs_103"));
dataAppender.appendToTable(batch);

batch = Lists.newArrayListWithCapacity(100);
// File 2 - First records (Watermark 0 - 0 min)
// - Split 1 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
// - Split 2 - 100 records (0, "file_2-recordTs_0"), (1, "file_2-recordTs_1"),...
List<Record> batch2 = Lists.newArrayListWithCapacity(100);
for (int i = 0; i < RECORD_NUM_FOR_2_SPLITS; ++i) {
batch.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
batch2.add(generateRecord(4 - i % 5, "file_2-recordTs_" + i));
}

dataAppender.appendToTable(batch);
// File 3 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch3 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));

// File 4 - Some records will be blocked (Watermark 900.000 - 15 min)
List<Record> batch4 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));

// File 5 - Records which will remove the block (Watermark 5.400.000 - 90 min)
List<Record> batch5 =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
@@ -304,6 +319,11 @@ public void testThrottling() throws Exception {

try (CloseableIterator<RowData> resultIterator = stream.collectAsync()) {
JobClient jobClient = env.executeAsync("Iceberg Source Throttling Test");
CommonTestUtils.waitForAllTaskRunning(
miniClusterResource.getMiniCluster(), jobClient.getJobID(), false);

// Insert the first data into the table
dataAppender.appendToTable(dataAppender.writeFile(batch1), dataAppender.writeFile(batch2));

// Check that we read the non-blocked data
// The first RECORD_NUM_FOR_2_SPLITS should be read
@@ -325,18 +345,7 @@

// Add some old records in 2 splits, so even if the blocked reader gets one split, the
// non-blocked reader gets one as well
List<Record> newBatch1 =
ImmutableList.of(
generateRecord(15, "file_3-recordTs_15"),
generateRecord(16, "file_3-recordTs_16"),
generateRecord(17, "file_3-recordTs_17"));
List<Record> newBatch2 =
ImmutableList.of(
generateRecord(15, "file_4-recordTs_15"),
generateRecord(16, "file_4-recordTs_16"),
generateRecord(17, "file_4-recordTs_17"));
dataAppender.appendToTable(
dataAppender.writeFile(newBatch1), dataAppender.writeFile(newBatch2));
dataAppender.appendToTable(dataAppender.writeFile(batch3), dataAppender.writeFile(batch4));
// The records received will highly depend on scheduling
// We minimally get 3 records from the non-blocked reader
// We might get 1 record from the blocked reader (as part of the previous batch -
@@ -352,10 +361,7 @@
.until(() -> drift.getValue() == TimeUnit.MINUTES.toMillis(65));

// Add some new records which should unblock the throttled reader
batch =
ImmutableList.of(
generateRecord(90, "file_5-recordTs_90"), generateRecord(91, "file_5-recordTs_91"));
dataAppender.appendToTable(batch);
dataAppender.appendToTable(batch5);
// We should get all the records at this point
waitForRecords(resultIterator, 6);

@@ -20,7 +20,6 @@

import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@@ -118,29 +117,26 @@ protected List<IcebergSourceSplit> createSplits(
return IntStream.range(0, fileCount / filesPerSplit)
.mapToObj(
splitNum ->
splitFromRecords(
ReaderUtil.createSplit(
IntStream.range(0, filesPerSplit)
.mapToObj(
fileNum ->
RandomGenericData.generate(
SCHEMA, 2, splitNum * filesPerSplit + fileNum))
.collect(Collectors.toList())))
.collect(Collectors.toList()),
TEMPORARY_FOLDER,
FileFormat.PARQUET,
APPENDER_FACTORY))
.collect(Collectors.toList());
}

private IcebergSourceSplit splitFromInstant(Instant instant) {
Record record = GenericRecord.create(SCHEMA);
record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
}

private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
try {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
} catch (IOException e) {
throw new RuntimeException("Split creation exception", e);
}
return ReaderUtil.createSplit(
ImmutableList.of(ImmutableList.of(record)),
TEMPORARY_FOLDER,
FileFormat.PARQUET,
APPENDER_FACTORY);
}
}
ReaderUtil.java
@@ -44,6 +44,7 @@
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
@@ -122,4 +123,17 @@ public static CombinedScanTask createCombinedScanTask(

return new BaseCombinedScanTask(fileTasks);
}

public static IcebergSourceSplit createSplit(
List<List<Record>> records,
TemporaryFolder temporaryFolder,
FileFormat fileFormat,
GenericAppenderFactory appenderFactory) {
try {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(records, temporaryFolder, fileFormat, appenderFactory));
} catch (IOException e) {
throw new RuntimeException("Split creation exception", e);
}
}
}
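
A short sketch of how a test might use the new ReaderUtil.createSplit helper; the schema, appender factory, and package placement below are illustrative assumptions modelled on the existing reader tests, not part of this commit.

// Assumes placement alongside ReaderUtil; the fixtures below are illustrative stand-ins.
package org.apache.iceberg.flink.source.reader;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ReaderUtilCreateSplitExample {
  private static final Schema SCHEMA =
      new Schema(Types.NestedField.required(1, "data", Types.StringType.get()));
  private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);

  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

  @Test
  public void createSingleFileSplit() {
    List<Record> records = RandomGenericData.generate(SCHEMA, 2, 0L);
    // One data file is written per inner list; the split wraps the resulting CombinedScanTask.
    IcebergSourceSplit split =
        ReaderUtil.createSplit(
            ImmutableList.of(records), TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY);
    assertThat(split.task().files()).hasSize(1);
  }
}
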