Flink: Support watermark alignment of source splits
Peter Vary committed Dec 16, 2023
1 parent 5d0d76f commit 7534f5a
Showing 4 changed files with 213 additions and 17 deletions.
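The core of the change is a pause/resume gate in the Iceberg Flink split reader: split IDs to pause are collected in a set, and the fetch loop blocks on a shared monitor with a one-second timed wait until the split is resumed or the reader is woken up. Below is a minimal, self-contained sketch of that wait/notify pattern, not code from the commit; the PausableFetcher class and its method names are hypothetical, and it uses a concurrent set where the reader itself uses a plain HashSet.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: a minimal pause/resume gate in the style of the change below.
// Class and method names are hypothetical; the real reader keeps this state inside
// IcebergSourceSplitReader and checks it at the top of fetch().
public class PausableFetcher {
  private final Set<String> pausedSplits = ConcurrentHashMap.newKeySet();
  private final Object signal = new Object();

  // Coordinator side: mark a split as paused.
  public void pause(String splitId) {
    pausedSplits.add(splitId);
  }

  // Coordinator side: resume a split and wake the fetcher thread.
  public void resume(String splitId) {
    pausedSplits.remove(splitId);
    synchronized (signal) {
      signal.notifyAll();
    }
  }

  // Fetcher thread: block while the split is paused; the timed wait bounds the stall if a
  // notification arrives between the contains() check and the wait() call.
  public void awaitResumed(String splitId) throws InterruptedException {
    while (pausedSplits.contains(splitId)) {
      synchronized (signal) {
        signal.wait(1000);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    PausableFetcher gate = new PausableFetcher();
    gate.pause("split-1");

    Thread fetcher =
        new Thread(
            () -> {
              try {
                gate.awaitResumed("split-1");
                System.out.println("split-1 resumed, reading records");
              } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
              }
            });
    fetcher.start();

    Thread.sleep(1500); // the fetcher thread stays blocked in awaitResumed() here
    gate.resume("split-1");
    fetcher.join();
  }
}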
@@ -20,10 +20,12 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
@@ -35,6 +37,8 @@
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Queues;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -46,6 +50,8 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit>
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final int indexOfSubtask;
private final Queue<IcebergSourceSplit> splits;
private final Set<String> pausedSplits;
private final Object signal = new Object();

private CloseableIterator<RecordsWithSplitIds<RecordAndPosition<T>>> currentReader;
private IcebergSourceSplit currentSplit;
@@ -60,7 +66,8 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, IcebergSourceSplit>
this.openSplitFunction = openSplitFunction;
this.splitComparator = splitComparator;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();
this.splits = Queues.newArrayDeque();
this.pausedSplits = Sets.newHashSet();
}

@Override
@@ -80,6 +87,25 @@ public RecordsWithSplitIds<RecordAndPosition<T>> fetch() throws IOException {
}

if (currentReader.hasNext()) {
// Block reading while the current split is paused. Wake up every second in case a resume
// signal was missed
boolean first = true;
while (pausedSplits.contains(currentSplitId)) {
if (first) {
LOG.info("Paused reading {}", currentSplitId);
first = false;
} else {
LOG.trace("Still paused {}", currentSplitId);
}

try {
synchronized (signal) {
signal.wait(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while blocked on reading.", e);
}
}

// Because Iterator#next() doesn't support checked exception,
// we need to wrap and unwrap the checked IOException with UncheckedIOException
try {
@@ -113,7 +139,13 @@ public void handleSplitsChanges(SplitsChange<IcebergSourceSplit> splitsChange) {
}

@Override
public void wakeUp() {}
public void wakeUp() {
LOG.info("WakeUp called");
pausedSplits.clear();
synchronized (signal) {
signal.notify();
}
}

@Override
public void close() throws Exception {
@@ -123,6 +155,19 @@ public void close() throws Exception {
}
}

@Override
public void pauseOrResumeSplits(
Collection<IcebergSourceSplit> splitsToPause, Collection<IcebergSourceSplit> splitsToResume) {
LOG.info("Pause splits: {} and resume splits: {}", splitsToPause, splitsToResume);
pausedSplits.addAll(
splitsToPause.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
pausedSplits.removeAll(
splitsToResume.stream().map(IcebergSourceSplit::splitId).collect(Collectors.toSet()));
synchronized (signal) {
signal.notify();
}
}

private long calculateBytes(IcebergSourceSplit split) {
return split.task().files().stream().map(FileScanTask::length).reduce(0L, Long::sum);
}
@@ -20,7 +20,6 @@

import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
@@ -118,29 +117,26 @@ protected List<IcebergSourceSplit> createSplits(
return IntStream.range(0, fileCount / filesPerSplit)
.mapToObj(
splitNum ->
splitFromRecords(
ReaderUtil.createSplit(
IntStream.range(0, filesPerSplit)
.mapToObj(
fileNum ->
RandomGenericData.generate(
SCHEMA, 2, splitNum * filesPerSplit + fileNum))
.collect(Collectors.toList())))
.collect(Collectors.toList()),
TEMPORARY_FOLDER,
FileFormat.PARQUET,
APPENDER_FACTORY))
.collect(Collectors.toList());
}

private IcebergSourceSplit splitFromInstant(Instant instant) {
Record record = GenericRecord.create(SCHEMA);
record.set(0, LocalDateTime.ofInstant(instant, ZoneOffset.UTC));
return splitFromRecords(ImmutableList.of(ImmutableList.of(record)));
}

private IcebergSourceSplit splitFromRecords(List<List<Record>> records) {
try {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(
records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY));
} catch (IOException e) {
throw new RuntimeException("Split creation exception", e);
}
return ReaderUtil.createSplit(
ImmutableList.of(ImmutableList.of(record)),
TEMPORARY_FOLDER,
FileFormat.PARQUET,
APPENDER_FACTORY);
}
}
@@ -44,6 +44,7 @@
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.DataIterator;
import org.apache.iceberg.flink.source.RowDataFileScanTaskReader;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
@@ -122,4 +123,17 @@ public static CombinedScanTask createCombinedScanTask(

return new BaseCombinedScanTask(fileTasks);
}

public static IcebergSourceSplit createSplit(
List<List<Record>> records,
TemporaryFolder temporaryFolder,
FileFormat fileFormat,
GenericAppenderFactory appenderFactory) {
try {
return IcebergSourceSplit.fromCombinedScanTask(
ReaderUtil.createCombinedScanTask(records, temporaryFolder, fileFormat, appenderFactory));
} catch (IOException e) {
throw new RuntimeException("Split creation exception", e);
}
}
}
@@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.source.reader;

import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.PlaintextEncryptionManager;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestIcebergSourceSplitReader {
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

private static final Schema SCHEMA =
new Schema(required(1, "timestamp_column", Types.TimestampType.withoutZone()));
private static final GenericAppenderFactory APPENDER_FACTORY = new GenericAppenderFactory(SCHEMA);

@Test
public void testPause() throws Exception {
IcebergSourceSplitReader<RowData> reader = reader();

List<List<Record>> records =
ImmutableList.of(
RandomGenericData.generate(SCHEMA, 2, 0L), RandomGenericData.generate(SCHEMA, 3, 1L));

IcebergSourceSplit split =
ReaderUtil.createSplit(records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY);

// Add the new split to the reader
reader.handleSplitsChanges(new SplitsAddition<>(ImmutableList.of(split)));

// Fetch the first batch, and check the result size
ArrayBatchRecords<RecordAndPosition<RowData>> result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(2);

// Pause the reading of the split, and check that the read is blocked when trying to fetch new
// records
reader.pauseOrResumeSplits(ImmutableList.of(split), ImmutableList.of());
assertThatThrownBy(
() ->
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> reader.fetch() != null))
.isInstanceOf(ConditionTimeoutException.class)
.hasMessageContaining("was not fulfilled within");

// Unpause the reading of the split, and check the result size
reader.pauseOrResumeSplits(ImmutableList.of(), ImmutableList.of(split));
result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(3);
}

@Test
public void testWakeup() throws Exception {
IcebergSourceSplitReader<RowData> reader = reader();

List<List<Record>> records =
ImmutableList.of(
RandomGenericData.generate(SCHEMA, 2, 0L), RandomGenericData.generate(SCHEMA, 3, 1L));
IcebergSourceSplit split =
ReaderUtil.createSplit(records, TEMPORARY_FOLDER, FileFormat.PARQUET, APPENDER_FACTORY);

// Add the new split to the reader
reader.handleSplitsChanges(new SplitsAddition<>(ImmutableList.of(split)));

// Fetch the first batch, and check the result size
ArrayBatchRecords<RecordAndPosition<RowData>> result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(2);

// Pause the reading of the split, and check that the read is blocked when trying to fetch new
// records
reader.pauseOrResumeSplits(ImmutableList.of(split), ImmutableList.of());
assertThatThrownBy(
() ->
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> reader.fetch() != null))
.isInstanceOf(ConditionTimeoutException.class)
.hasMessageContaining("was not fulfilled within");

// Wake up the reader, which clears all paused splits, and check the result size
reader.wakeUp();
result = (ArrayBatchRecords) reader.fetch();
assertThat(result.numberOfRecords()).isEqualTo(3);
}

private static IcebergSourceSplitReader reader() {
TestingMetricGroup metricGroup = new TestingMetricGroup();
TestingReaderContext readerContext = new TestingReaderContext(new Configuration(), metricGroup);
RowDataReaderFunction readerFunction =
new RowDataReaderFunction(
new Configuration(),
SCHEMA,
SCHEMA,
null,
true,
new HadoopFileIO(new org.apache.hadoop.conf.Configuration()),
new PlaintextEncryptionManager(),
Collections.emptyList());

return new IcebergSourceSplitReader<>(
new IcebergSourceReaderMetrics(metricGroup, "dummy"),
readerFunction,
SplitComparators.fileSequenceNumber(),
readerContext);
}
}
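
The blocking assertions above rely on Awaitility: the test polls until fetch() returns a batch, and a ConditionTimeoutException proves the call stayed blocked for the whole timeout. A minimal, Iceberg-independent sketch of that assertion pattern follows; the class and field names here are illustrative.

import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Test;

public class TestBlockedCallAssertion {
  // Hypothetical stand-in for "the paused reader produced a batch".
  private final AtomicBoolean produced = new AtomicBoolean(false);

  @Test
  public void testTimeoutProvesBlocking() {
    // Awaitility polls the condition until it holds or the timeout expires; because the
    // condition never becomes true, the ConditionTimeoutException shows the call stayed blocked.
    assertThatThrownBy(
            () -> Awaitility.await().atMost(2, TimeUnit.SECONDS).until(produced::get))
        .isInstanceOf(ConditionTimeoutException.class)
        .hasMessageContaining("was not fulfilled within");
  }
}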
