From 4d34398cfd32465222f55df522fcd5a2db59c92c Mon Sep 17 00:00:00 2001 From: Rodrigo Date: Tue, 9 Jan 2024 09:22:29 -0800 Subject: [PATCH] Flink: Watermark read options (#9346) --- docs/flink-configuration.md | 4 +- .../apache/iceberg/flink/FlinkReadConf.java | 19 +++ .../iceberg/flink/FlinkReadOptions.java | 11 ++ .../iceberg/flink/source/IcebergSource.java | 19 ++- .../iceberg/flink/source/ScanContext.java | 38 ++++- .../org/apache/iceberg/flink/TestHelpers.java | 17 ++- .../flink/source/TestIcebergSourceSql.java | 130 +++++++++++++++++- 7 files changed, 217 insertions(+), 21 deletions(-) diff --git a/docs/flink-configuration.md b/docs/flink-configuration.md index 1877de2a6dd1..8cb1f799580b 100644 --- a/docs/flink-configuration.md +++ b/docs/flink-configuration.md @@ -75,7 +75,6 @@ The following properties can be set if using the REST catalog: | credential | | | A credential to exchange for a token in the OAuth2 client credentials flow. | | token | | | A token which will be used to interact with the server. | - ## Runtime configuration ### Read options @@ -133,7 +132,8 @@ env.getConfig() | max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. | | limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. | | max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planing failure. | - +| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. If this option is present, the `splitAssignerFactory` will be overridden with `OrderedSplitAssignerFactory`. | +| watermark-column-time-unit | connector.iceberg.watermark-column-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS, NANOSECONDS. 
| ### Write options diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java index 0e04c9affb19..d53ea73f9342 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.util.TimeUtils; import org.apache.iceberg.Table; @@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() { .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue()) .parse(); } + + public String watermarkColumn() { + return confParser + .stringConf() + .option(FlinkReadOptions.WATERMARK_COLUMN) + .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION) + .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue()) + .parseOptional(); + } + + public TimeUnit watermarkColumnTimeUnit() { + return confParser + .enumConfParser(TimeUnit.class) + .option(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT) + .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION) + .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue()) + .parse(); + } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java index 55c5aca3b677..1bbd88146c8f 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java @@ -18,6 +18,7 @@ */ package org.apache.iceberg.flink; +import java.util.concurrent.TimeUnit; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.iceberg.TableProperties; @@ -109,4 +110,14 @@ private FlinkReadOptions() {} public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures"; public static final ConfigOption MAX_ALLOWED_PLANNING_FAILURES_OPTION = ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3); + + public static final String WATERMARK_COLUMN = "watermark-column"; + public static final ConfigOption WATERMARK_COLUMN_OPTION = + ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().noDefaultValue(); + + public static final String WATERMARK_COLUMN_TIME_UNIT = "watermark-column-time-unit"; + public static final ConfigOption WATERMARK_COLUMN_TIME_UNIT_OPTION = + ConfigOptions.key(PREFIX + WATERMARK_COLUMN_TIME_UNIT) + .enumType(TimeUnit.class) + .defaultValue(TimeUnit.MICROSECONDS); } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index a7ce2db61ffb..0655cf87a996 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -46,6 +46,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.FlinkReadConf; import org.apache.iceberg.flink.FlinkReadOptions; import org.apache.iceberg.flink.FlinkSchemaUtil; import 
org.apache.iceberg.flink.TableLoader; @@ -219,8 +220,6 @@ public static class Builder { private Table table; private SplitAssignerFactory splitAssignerFactory; private SerializableComparator splitComparator; - private String watermarkColumn; - private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS; private ReaderFunction readerFunction; private ReadableConfig flinkConfig = new Configuration(); private final ScanContext.Builder contextBuilder = ScanContext.builder(); @@ -242,9 +241,6 @@ public Builder table(Table newTable) { } public Builder assignerFactory(SplitAssignerFactory assignerFactory) { - Preconditions.checkArgument( - watermarkColumn == null, - "Watermark column and SplitAssigner should not be set in the same source"); this.splitAssignerFactory = assignerFactory; return this; } @@ -441,7 +437,7 @@ public Builder setAll(Map properties) { * Emits watermarks once per split based on the min value of column statistics from files * metadata in the given split. The generated watermarks are also used for ordering the splits * for read. Accepted column types are timestamp/timestamptz/long. For long columns consider - * setting {@link #watermarkTimeUnit(TimeUnit)}. + * setting {@link #watermarkColumnTimeUnit(TimeUnit)}. * *
<p>
Consider setting `read.split.open-file-cost` to prevent combining small files to a single * split when the watermark is used for watermark alignment. @@ -450,7 +446,7 @@ public Builder watermarkColumn(String columnName) { Preconditions.checkArgument( splitAssignerFactory == null, "Watermark column and SplitAssigner should not be set in the same source"); - this.watermarkColumn = columnName; + readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName); return this; } @@ -459,8 +455,8 @@ public Builder watermarkColumn(String columnName) { * org.apache.iceberg.types.Types.LongType}, then sets the {@link TimeUnit} to convert the * value. The default value is {@link TimeUnit#MICROSECONDS}. */ - public Builder watermarkTimeUnit(TimeUnit timeUnit) { - this.watermarkTimeUnit = timeUnit; + public Builder watermarkColumnTimeUnit(TimeUnit timeUnit) { + readOptions.put(FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT, timeUnit.name()); return this; } @@ -482,13 +478,16 @@ public IcebergSource build() { } contextBuilder.resolveConfig(table, readOptions, flinkConfig); - Schema icebergSchema = table.schema(); if (projectedFlinkSchema != null) { contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema)); } SerializableRecordEmitter emitter = SerializableRecordEmitter.defaultEmitter(); + FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, flinkConfig); + String watermarkColumn = flinkReadConf.watermarkColumn(); + TimeUnit watermarkTimeUnit = flinkReadConf.watermarkColumnTimeUnit(); + if (watermarkColumn != null) { // Column statistics is needed for watermark generation contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn)); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index 4357b1f57df6..3dce5dd5901e 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.util.Preconditions; @@ -67,6 +68,8 @@ public class ScanContext implements Serializable { private final Integer planParallelism; private final int maxPlanningSnapshotCount; private final int maxAllowedPlanningFailures; + private final String watermarkColumn; + private final TimeUnit watermarkColumnTimeUnit; private ScanContext( boolean caseSensitive, @@ -91,6 +94,8 @@ private ScanContext( Integer planParallelism, int maxPlanningSnapshotCount, int maxAllowedPlanningFailures, + String watermarkColumn, + TimeUnit watermarkColumnTimeUnit, String branch, String tag, String startTag, @@ -122,6 +127,8 @@ private ScanContext( this.planParallelism = planParallelism; this.maxPlanningSnapshotCount = maxPlanningSnapshotCount; this.maxAllowedPlanningFailures = maxAllowedPlanningFailures; + this.watermarkColumn = watermarkColumn; + this.watermarkColumnTimeUnit = watermarkColumnTimeUnit; validate(); } @@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() { return maxAllowedPlanningFailures; } + public String watermarkColumn() { + return watermarkColumn; + } + + public TimeUnit watermarkColumnTimeUnit() { + return watermarkColumnTimeUnit; + } + public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, 
long newEndSnapshotId) { return ScanContext.builder() .caseSensitive(caseSensitive) @@ -298,6 +313,8 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(watermarkColumn) + .watermarkColumnTimeUnit(watermarkColumnTimeUnit) .build(); } @@ -327,6 +344,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) { .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(watermarkColumn) + .watermarkColumnTimeUnit(watermarkColumnTimeUnit) .build(); } @@ -367,6 +386,9 @@ public static class Builder { FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue(); private int maxAllowedPlanningFailures = FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue(); + private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue(); + private TimeUnit watermarkColumnTimeUnit = + FlinkReadOptions.WATERMARK_COLUMN_TIME_UNIT_OPTION.defaultValue(); private Builder() {} @@ -500,6 +522,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) { return this; } + public Builder watermarkColumn(String newWatermarkColumn) { + this.watermarkColumn = newWatermarkColumn; + return this; + } + + public Builder watermarkColumnTimeUnit(TimeUnit newWatermarkTimeUnit) { + this.watermarkColumnTimeUnit = newWatermarkTimeUnit; + return this; + } + public Builder resolveConfig( Table table, Map readOptions, ReadableConfig readableConfig) { FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig); @@ -525,7 +557,9 @@ public Builder resolveConfig( .planParallelism(flinkReadConf.workerPoolSize()) .includeColumnStats(flinkReadConf.includeColumnStats()) .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount()) - .maxAllowedPlanningFailures(maxAllowedPlanningFailures); + .maxAllowedPlanningFailures(maxAllowedPlanningFailures) + .watermarkColumn(flinkReadConf.watermarkColumn()) + .watermarkColumnTimeUnit(flinkReadConf.watermarkColumnTimeUnit()); } public ScanContext build() { @@ -552,6 +586,8 @@ public ScanContext build() { planParallelism, maxPlanningSnapshotCount, maxAllowedPlanningFailures, + watermarkColumn, + watermarkColumnTimeUnit, branch, tag, startTag, diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java index 7d3777935ca8..38b0eb0b406e 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java @@ -126,7 +126,7 @@ public static List convertRowDataToRow(List rowDataList, RowType r .collect(Collectors.toList()); } - public static void assertRecords(List results, List expectedRecords, Schema schema) { + private static List convertRecordToRow(List expectedRecords, Schema schema) { List expected = Lists.newArrayList(); @SuppressWarnings("unchecked") DataStructureConverter converter = @@ -135,6 +135,17 @@ public static void assertRecords(List results, List expectedRecords TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema))); expectedRecords.forEach( r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r)))); + return expected; + } + + public static void assertRecordsWithOrder( + List 
results, List expectedRecords, Schema schema) { + List expected = convertRecordToRow(expectedRecords, schema); + assertRowsWithOrder(results, expected); + } + + public static void assertRecords(List results, List expectedRecords, Schema schema) { + List expected = convertRecordToRow(expectedRecords, schema); assertRows(results, expected); } @@ -146,6 +157,10 @@ public static void assertRows(List results, List expected) { Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected); } + public static void assertRowsWithOrder(List results, List expected) { + Assertions.assertThat(results).containsExactlyElementsOf(expected); + } + public static void assertRowData(Schema schema, StructLike expected, RowData actual) { assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual); } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java index e66ae79c28f8..4250460d278d 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceSql.java @@ -18,25 +18,141 @@ */ package org.apache.iceberg.flink.source; +import static org.apache.iceberg.types.Types.NestedField.required; + import java.io.IOException; +import java.time.Instant; +import java.time.ZoneId; +import java.util.List; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.TableConfigOptions; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderHelper; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.FlinkConfigOptions; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.flink.TestHelpers; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.Test; /** Use the IcebergSource (FLIP-27) */ public class TestIcebergSourceSql extends TestSqlBase { + private static final Schema SCHEMA_TS = + new Schema( + required(1, "t1", Types.TimestampType.withoutZone()), + required(2, "t2", Types.LongType.get())); + @Override public void before() throws IOException { - Configuration tableConf = getTableEnv().getConfig().getConfiguration(); + TableEnvironment tableEnvironment = getTableEnv(); + Configuration tableConf = tableEnvironment.getConfig().getConfiguration(); tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true); + + tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1"); SqlHelpers.sql( - getTableEnv(), + tableEnvironment, "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')", catalogResource.warehouse()); - SqlHelpers.sql(getTableEnv(), "use catalog iceberg_catalog"); - getTableEnv() - .getConfig() - .getConfiguration() - .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); + SqlHelpers.sql(tableEnvironment, "use catalog iceberg_catalog"); + + tableConf.set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true); + } + + private Record generateRecord(Instant 
t1, long t2) { + Record record = GenericRecord.create(SCHEMA_TS); + record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime()); + record.setField("t2", t2); + return record; + } + + /** Generates the records in the expected order, with respect to their datafile */ + private List generateExpectedRecords(boolean ascending) throws Exception { + Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS); + long baseTime = 1702382109000L; + + GenericAppenderHelper helper = + new GenericAppenderHelper(table, FileFormat.PARQUET, TEMPORARY_FOLDER); + + Record file1Record1 = + generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L)); + Record file1Record2 = + generateRecord( + Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L)); + + List recordsDataFile1 = Lists.newArrayList(); + recordsDataFile1.add(file1Record1); + recordsDataFile1.add(file1Record2); + DataFile dataFile1 = helper.writeFile(recordsDataFile1); + + Record file2Record1 = + generateRecord( + Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L)); + Record file2Record2 = + generateRecord( + Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L)); + + List recordsDataFile2 = Lists.newArrayList(); + recordsDataFile2.add(file2Record1); + recordsDataFile2.add(file2Record2); + + DataFile dataFile2 = helper.writeFile(recordsDataFile2); + helper.appendToTable(dataFile1, dataFile2); + + // Expected records if the splits are ordered + // - ascending (watermark from t1) - records from the split with early timestamps, then + // records from the split with late timestamps + // - descending (watermark from t2) - records from the split with old longs, then records + // from the split with new longs + List expected = Lists.newArrayList(); + if (ascending) { + expected.addAll(recordsDataFile1); + expected.addAll(recordsDataFile2); + } else { + expected.addAll(recordsDataFile2); + expected.addAll(recordsDataFile1); + } + return expected; + } + + /** Tests the order of splits returned when setting the watermark-column options */ + @Test + public void testWatermarkOptionsAscending() throws Exception { + List expected = generateExpectedRecords(true); + TestHelpers.assertRecordsWithOrder( + run( + ImmutableMap.of("watermark-column", "t1", "split-file-open-cost", "128000000"), + "", + "*"), + expected, + SCHEMA_TS); + } + + /** + * Tests the order of splits returned when setting the watermark-column and + * watermark-column-time-unit" options + */ + @Test + public void testWatermarkOptionsDescending() throws Exception { + List expected = generateExpectedRecords(false); + TestHelpers.assertRecordsWithOrder( + run( + ImmutableMap.of( + "watermark-column", + "t2", + "watermark-column-time-unit", + "MILLISECONDS", + "split-file-open-cost", + "128000000"), + "", + "*"), + expected, + SCHEMA_TS); } }
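
Usage note (not part of the patch): a minimal sketch of how the new `watermark-column` and `watermark-column-time-unit` read options can be passed as SQL dynamic table hints, mirroring TestIcebergSourceSql above. The catalog setup, table name `db.events`, and long column `event_ts` are assumptions for illustration only.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class WatermarkOptionsSqlExample {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Dynamic table options must be enabled for /*+ OPTIONS(...) */ hints to take effect.
    tEnv.getConfig()
        .getConfiguration()
        .set(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED, true);

    // Assumes an Iceberg catalog and table `db.events` are already registered.
    // Splits are ordered by the min value of `event_ts`; the long values are interpreted
    // as milliseconds because of 'watermark-column-time-unit'.
    tEnv.executeSql(
            "SELECT * FROM db.events "
                + "/*+ OPTIONS('watermark-column'='event_ts', "
                + "'watermark-column-time-unit'='MILLISECONDS') */")
        .print();
  }
}
```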
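The same options are available programmatically through the FLIP-27 builder methods touched by this patch (`watermarkColumn` / `watermarkColumnTimeUnit`). Below is an illustrative sketch, not a definitive implementation: the table location, column name, and job wiring are assumptions.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkReadOptionsExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Assumed table location for illustration.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");

    // Equivalent to setting the 'watermark-column' / 'watermark-column-time-unit' read options;
    // the source then uses OrderedSplitAssignerFactory and emits one watermark per split.
    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .streaming(true)
            .monitorInterval(Duration.ofSeconds(30))
            .watermarkColumn("event_ts")                    // timestamp/timestamptz/long column
            .watermarkColumnTimeUnit(TimeUnit.MILLISECONDS) // only relevant for long columns
            .build();

    DataStream<RowData> stream =
        env.fromSource(
            source,
            // Watermarks are produced by the source itself, so nothing is extracted here.
            WatermarkStrategy.<RowData>noWatermarks(),
            "iceberg-watermarked-source",
            TypeInformation.of(RowData.class));

    stream.print();
    env.execute("watermark-read-options-example");
  }
}
```

As noted in the builder javadoc, consider also raising `read.split.open-file-cost` so small files are not combined into a single split when the watermark is used for watermark alignment.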