Flink: add read options configuration for watermark
rodmeneses committed Dec 20, 2023
1 parent b79a8ff commit e0cf6ef
Showing 9 changed files with 253 additions and 20 deletions.
3 changes: 2 additions & 1 deletion docs/flink-configuration.md
@@ -133,7 +133,8 @@ env.getConfig()
| max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
| limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. |
| max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 to never fail the job on scan planning failures. |
| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. |
| watermark-time-unit | connector.iceberg.watermark-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, and NANOSECONDS. |

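Both options can be supplied job-wide through the Flink configuration (the `connector.iceberg.`-prefixed keys above) or per source as read options (the short keys); per-source read options take precedence. A minimal sketch, assuming a table with a `long` column `event_ts` holding epoch milliseconds (the column name is illustrative):

```java
import java.util.Map;
import org.apache.flink.configuration.Configuration;

public class WatermarkReadOptionsExample {
  public static void main(String[] args) {
    // Job-wide: prefixed keys in the Flink configuration.
    Configuration flinkConf = new Configuration();
    flinkConf.setString("connector.iceberg.watermark-column", "event_ts");
    flinkConf.setString("connector.iceberg.watermark-time-unit", "MILLISECONDS");

    // Per-source: short keys in the read-options map handed to the source
    // builder, resolved later by ScanContext.Builder#resolveConfig.
    Map<String, String> readOptions =
        Map.of(
            "watermark-column", "event_ts",
            "watermark-time-unit", "MILLISECONDS");
  }
}
```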
### Write options

FlinkConfigOptions.java
@@ -94,7 +94,7 @@ private FlinkConfigOptions() {}
public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
ConfigOptions.key("table.exec.iceberg.split-assigner-type")
.enumType(SplitAssignerType.class)
-        .defaultValue(SplitAssignerType.SIMPLE)
+        .defaultValue(null)
.withDescription(
Description.builder()
.text("Split assigner type that determines how splits are assigned to readers.")
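The split-assigner default changes from `SplitAssignerType.SIMPLE` to `null` so callers can distinguish "not configured" from an explicit choice; the `IcebergTableSource` hunk below relies on this. A minimal sketch of the pattern with a hypothetical option, assuming Flink's `ConfigOptions` accepts a null default as the hunk implies:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;

public class NullDefaultSketch {
  enum AssignerType { SIMPLE, ORDERED }

  // Hypothetical option mirroring table.exec.iceberg.split-assigner-type.
  static final ConfigOption<AssignerType> ASSIGNER_TYPE =
      ConfigOptions.key("demo.split-assigner-type")
          .enumType(AssignerType.class)
          .defaultValue(null);

  public static void main(String[] args) {
    AssignerType type = new Configuration().get(ASSIGNER_TYPE);
    if (type == null) {
      // Unset: the source may pick an ordered assigner when a watermark
      // column is configured, instead of forcing SIMPLE.
      System.out.println("assigner type not configured");
    } else {
      System.out.println("explicit assigner: " + type);
    }
  }
}
```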
FlinkReadConf.java
@@ -20,6 +20,7 @@

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.TimeUtils;
import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
.defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
.parse();
}

public String watermarkColumn() {
return confParser
.stringConf()
.option(FlinkReadOptions.WATERMARK_COLUMN)
.flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
.parseOptional();
}

public TimeUnit watermarkTimeUnit() {
return confParser
.enumConfParser(TimeUnit.class)
.option(FlinkReadOptions.WATERMARK_TIME_UNIT)
.flinkConfig(FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION)
.defaultValue(FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION.defaultValue())
.parse();
}
}
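The new getters follow the existing `ConfParser` precedence: per-source read option first, then the Flink configuration, then the default. A sketch of the resolution, assuming a `Table` loaded elsewhere (variable names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkReadConf;

public class ReadConfSketch {
  static void resolve(Table table) {
    Map<String, String> readOptions = Map.of("watermark-column", "event_ts");
    Configuration flinkConf = new Configuration();
    flinkConf.setString("connector.iceberg.watermark-time-unit", "SECONDS");

    FlinkReadConf conf = new FlinkReadConf(table, readOptions, flinkConf);
    String column = conf.watermarkColumn(); // "event_ts", from the read option
    TimeUnit unit = conf.watermarkTimeUnit(); // SECONDS, from the Flink config
  }
}
```

Note that `watermarkColumn()` ends with `parseOptional()`, so it returns null when neither source sets the option, which is exactly the sentinel that `IcebergSource#build()` checks for.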
FlinkReadOptions.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink;

import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ private FlinkReadOptions() {}
public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);

public static final String WATERMARK_COLUMN = "watermark-column";
public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().defaultValue(null);

public static final String WATERMARK_TIME_UNIT = "watermark-time-unit";
public static final ConfigOption<TimeUnit> WATERMARK_TIME_UNIT_OPTION =
ConfigOptions.key(PREFIX + WATERMARK_TIME_UNIT)
.enumType(TimeUnit.class)
.defaultValue(TimeUnit.MICROSECONDS);
}
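Each option contributes two spellings: the short key for per-source read options and the `PREFIX`-qualified key (`connector.iceberg.`, as used by the other options in this class) for the Flink configuration. A small sketch showing the pairing:

```java
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.flink.FlinkReadOptions;

public class OptionKeySketch {
  public static void main(String[] args) {
    // Short key for per-source read options.
    System.out.println(FlinkReadOptions.WATERMARK_COLUMN); // watermark-column

    // Prefixed key for the Flink configuration.
    System.out.println(
        FlinkReadOptions.WATERMARK_COLUMN_OPTION.key()); // connector.iceberg.watermark-column

    // Default when nothing is configured.
    TimeUnit unit = FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION.defaultValue();
    System.out.println(unit); // MICROSECONDS
  }
}
```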
IcebergSource.java
@@ -72,7 +72,6 @@
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,8 +225,6 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
-    private String watermarkColumn;
-    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -249,9 +246,6 @@ public Builder<T> table(Table newTable) {
}

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
-      Preconditions.checkArgument(
-          watermarkColumn == null,
-          "Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -457,7 +451,7 @@ public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"Watermark column and SplitAssigner should not be set in the same source");
-      this.watermarkColumn = columnName;
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
return this;
}

@@ -467,7 +461,7 @@ public Builder<T> watermarkColumn(String columnName) {
* value. The default value is {@link TimeUnit#MICROSECONDS}.
*/
public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
-      this.watermarkTimeUnit = timeUnit;
+      readOptions.put(FlinkReadOptions.WATERMARK_TIME_UNIT, timeUnit.name());
return this;
}

@@ -489,25 +483,33 @@ public IcebergSource<T> build() {
}

contextBuilder.resolveConfig(table, readOptions, flinkConfig);

Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();

+      ScanContext context = contextBuilder.build();
+      String watermarkColumn = context.watermarkColumn();
+      TimeUnit watermarkTimeUnit = context.watermarkTimeUnit();

+      if (splitAssignerFactory != null) {
+        Preconditions.checkArgument(
+            watermarkColumn == null,
+            "Watermark column and SplitAssigner should not be set in the same source");
+      }

if (watermarkColumn != null) {
-        // Column statistics is needed for watermark generation
-        contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
+        context = context.copyWithColumnStat(watermarkColumn);
SplitWatermarkExtractor watermarkExtractor =
new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
splitAssignerFactory =
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}

-      ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
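With the watermark settings now carried by `ScanContext`, a watermark-driven source can be assembled with the builder methods alone; `build()` resolves the context, re-checks the assigner conflict, and falls back to an ordered assigner. A minimal sketch (the table loader and column are assumed):

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkSourceSketch {
  static IcebergSource<RowData> build(TableLoader loader) {
    // build() wires a ColumnStatsWatermarkExtractor for the column and an
    // OrderedSplitAssignerFactory; setting an explicit assigner factory
    // together with a watermark column fails the precondition above.
    return IcebergSource.forRowData()
        .tableLoader(loader)
        .watermarkColumn("event_ts") // stored as read option "watermark-column"
        .watermarkTimeUnit(TimeUnit.MILLISECONDS)
        .streaming(true) // assumption: a streaming read
        .build();
  }
}
```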
IcebergTableSource.java
@@ -131,16 +131,21 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
+    // assignerType may be null now that the split-assigner-type option has no default
+    IcebergSource.Builder builder =
IcebergSource.forRowData()
.tableLoader(loader)
-            .assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
+            .flinkConfig(readableConfig);

+    if (assignerType != null) {
+      builder.assignerFactory(assignerType.factory());
+    }
+
+    IcebergSource<RowData> source = builder.build();
DataStreamSource stream =
env.fromSource(
source,
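Since `TABLE_EXEC_SPLIT_ASSIGNER_TYPE` no longer has a default, the table source forwards an assigner factory only when one was configured. A job that wants the previous behavior can set the option explicitly; a sketch, assuming a `TableEnvironment`:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AssignerTypeSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Restore the old default by choosing the simple assigner explicitly
    // (enum values are matched case-insensitively by Flink).
    tEnv.getConfig()
        .getConfiguration()
        .setString("table.exec.iceberg.split-assigner-type", "simple");
  }
}
```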
ScanContext.java
@@ -23,6 +23,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.util.Preconditions;
@@ -33,6 +34,7 @@
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
import org.apache.iceberg.flink.FlinkReadOptions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

/** Context object with optional arguments for a Flink Scan. */
@Internal
@@ -67,6 +69,8 @@ public class ScanContext implements Serializable {
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
private final String watermarkColumn;
private final TimeUnit watermarkTimeUnit;

private ScanContext(
boolean caseSensitive,
@@ -91,6 +95,8 @@ private ScanContext(
Integer planParallelism,
int maxPlanningSnapshotCount,
int maxAllowedPlanningFailures,
String watermarkColumn,
TimeUnit watermarkTimeUnit,
String branch,
String tag,
String startTag,
@@ -122,7 +128,8 @@ private ScanContext(
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;

this.watermarkColumn = watermarkColumn;
this.watermarkTimeUnit = watermarkTimeUnit;
validate();
}

@@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
return maxAllowedPlanningFailures;
}

public String watermarkColumn() {
return watermarkColumn;
}

public TimeUnit watermarkTimeUnit() {
return watermarkTimeUnit;
}

public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
return ScanContext.builder()
.caseSensitive(caseSensitive)
@@ -298,6 +313,40 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkTimeUnit(watermarkTimeUnit)
.build();
}

public ScanContext copyWithColumnStat(String columnStat) {
return ScanContext.builder()
.includeColumnStats(Sets.newHashSet(columnStat))
.caseSensitive(caseSensitive)
.useSnapshotId(snapshotId)
.useBranch(branch)
.useTag(tag)
.startSnapshotId(startSnapshotId)
.endSnapshotId(endSnapshotId)
.startTag(startTag)
.endTag(endTag)
.asOfTimestamp(asOfTimestamp)
.splitSize(splitSize)
.splitLookback(splitLookback)
.splitOpenFileCost(splitOpenFileCost)
.streaming(isStreaming)
.monitorInterval(monitorInterval)
.nameMapping(nameMapping)
.project(schema)
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkTimeUnit(watermarkTimeUnit)
.startingStrategy(startingStrategy)
.build();
}

@@ -327,6 +376,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
.maxAllowedPlanningFailures(maxAllowedPlanningFailures)
.watermarkColumn(watermarkColumn)
.watermarkTimeUnit(watermarkTimeUnit)
.build();
}

@@ -367,6 +418,8 @@ public static class Builder {
FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
private int maxAllowedPlanningFailures =
FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
private TimeUnit watermarkTimeUnit = FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION.defaultValue();

private Builder() {}

@@ -500,6 +553,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
return this;
}

public Builder watermarkColumn(String newWatermarkColumn) {
this.watermarkColumn = newWatermarkColumn;
return this;
}

public Builder watermarkTimeUnit(TimeUnit newWatermarkTimeUnit) {
this.watermarkTimeUnit = newWatermarkTimeUnit;
return this;
}

public Builder resolveConfig(
Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +588,9 @@ public Builder resolveConfig(
.planParallelism(flinkReadConf.workerPoolSize())
.includeColumnStats(flinkReadConf.includeColumnStats())
.maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
-          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+          .watermarkColumn(flinkReadConf.watermarkColumn())
+          .watermarkTimeUnit(flinkReadConf.watermarkTimeUnit());
}

public ScanContext build() {
@@ -552,6 +617,8 @@ public ScanContext build() {
planParallelism,
maxPlanningSnapshotCount,
maxAllowedPlanningFailures,
watermarkColumn,
watermarkTimeUnit,
branch,
tag,
startTag,
TestHelpers.java
@@ -126,6 +126,19 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
.collect(Collectors.toList());
}

public static void assertRecordsWithOrder(
List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
DataStructureConverter<RowData, Row> converter =
(DataStructureConverter)
DataStructureConverters.getConverter(
TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
expectedRecords.forEach(
r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
assertRowsWithOrder(results, expected);
}

public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
List<Row> expected = Lists.newArrayList();
@SuppressWarnings("unchecked")
@@ -146,6 +159,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
}

public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
Assertions.assertThat(results).isEqualTo(expected);
}

public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
}
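The ordered variants complement the existing order-insensitive assertions and are useful once watermark-aligned reads make output order deterministic. A usage sketch, with records and schema coming from the surrounding test fixtures:

```java
import java.util.List;
import org.apache.flink.types.Row;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.TestHelpers;

public class OrderedAssertionSketch {
  static void check(List<Row> results, List<Record> expectedRecords, Schema schema) {
    // Fails unless results equal the expected records in exactly this order,
    // unlike assertRecords, which ignores ordering.
    TestHelpers.assertRecordsWithOrder(results, expectedRecords, schema);
  }
}
```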