diff --git a/docs/flink-configuration.md b/docs/flink-configuration.md
index 1877de2a6dd1..afb92a72765c 100644
--- a/docs/flink-configuration.md
+++ b/docs/flink-configuration.md
@@ -133,7 +133,8 @@ env.getConfig()
 | max-planning-snapshot-count | connector.iceberg.max-planning-snapshot-count | N/A | Integer.MAX_VALUE | Max number of snapshots limited per split enumeration. Applicable only to streaming read. |
 | limit | connector.iceberg.limit | N/A | -1 | Limited output number of rows. |
 | max-allowed-planning-failures | connector.iceberg.max-allowed-planning-failures | N/A | 3 | Max allowed consecutive failures for scan planning before failing the job. Set to -1 for never failing the job for scan planing failure. |
-
+| watermark-column | connector.iceberg.watermark-column | N/A | null | Specifies the watermark column to use for watermark generation. |
+| watermark-time-unit | connector.iceberg.watermark-time-unit | N/A | TimeUnit.MICROSECONDS | Specifies the watermark time unit to use for watermark generation. The possible values are DAYS, HOURS, MINUTES, SECONDS, MILLISECONDS, MICROSECONDS and NANOSECONDS. |
 
 ### Write options
 
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
index 7c7afd24ed8e..7c2ee69f806d 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java
@@ -94,7 +94,7 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
       ConfigOptions.key("table.exec.iceberg.split-assigner-type")
           .enumType(SplitAssignerType.class)
-          .defaultValue(SplitAssignerType.SIMPLE)
+          .defaultValue(null)
           .withDescription(
               Description.builder()
                   .text("Split assigner type that determine how splits are assigned to readers.")
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
index 0e04c9affb19..292b9bb63079 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadConf.java
@@ -20,6 +20,7 @@
 
 import java.time.Duration;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Table;
@@ -190,4 +191,22 @@ public int maxAllowedPlanningFailures() {
         .defaultValue(FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue())
         .parse();
   }
+
+  public String watermarkColumn() {
+    return confParser
+        .stringConf()
+        .option(FlinkReadOptions.WATERMARK_COLUMN)
+        .flinkConfig(FlinkReadOptions.WATERMARK_COLUMN_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue())
+        .parseOptional();
+  }
+
+  public TimeUnit watermarkTimeUnit() {
+    return confParser
+        .enumConfParser(TimeUnit.class)
+        .option(FlinkReadOptions.WATERMARK_TIME_UNIT)
+        .flinkConfig(FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION)
+        .defaultValue(FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION.defaultValue())
+        .parse();
+  }
 }
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
index 55c5aca3b677..f837dddd039a 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/FlinkReadOptions.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.flink;
 
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.iceberg.TableProperties;
@@ -109,4 +110,14 @@ private FlinkReadOptions() {}
   public static final String MAX_ALLOWED_PLANNING_FAILURES = "max-allowed-planning-failures";
   public static final ConfigOption<Integer> MAX_ALLOWED_PLANNING_FAILURES_OPTION =
       ConfigOptions.key(PREFIX + MAX_ALLOWED_PLANNING_FAILURES).intType().defaultValue(3);
+
+  public static final String WATERMARK_COLUMN = "watermark-column";
+  public static final ConfigOption<String> WATERMARK_COLUMN_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_COLUMN).stringType().defaultValue(null);
+
+  public static final String WATERMARK_TIME_UNIT = "watermark-time-unit";
+  public static final ConfigOption<TimeUnit> WATERMARK_TIME_UNIT_OPTION =
+      ConfigOptions.key(PREFIX + WATERMARK_TIME_UNIT)
+          .enumType(TimeUnit.class)
+          .defaultValue(TimeUnit.MICROSECONDS);
 }
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
index 179253cb3a18..45d7205095fa 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java
@@ -72,7 +72,6 @@
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -226,8 +225,6 @@ public static class Builder<T> {
     private Table table;
     private SplitAssignerFactory splitAssignerFactory;
     private SerializableComparator<IcebergSourceSplit> splitComparator;
-    private String watermarkColumn;
-    private TimeUnit watermarkTimeUnit = TimeUnit.MICROSECONDS;
     private ReaderFunction<T> readerFunction;
     private ReadableConfig flinkConfig = new Configuration();
     private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -249,9 +246,6 @@ public Builder<T> table(Table newTable) {
     }
 
     public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
-      Preconditions.checkArgument(
-          watermarkColumn == null,
-          "Watermark column and SplitAssigner should not be set in the same source");
       this.splitAssignerFactory = assignerFactory;
       return this;
     }
@@ -457,7 +451,7 @@ public Builder<T> watermarkColumn(String columnName) {
       Preconditions.checkArgument(
           splitAssignerFactory == null,
           "Watermark column and SplitAssigner should not be set in the same source");
-      this.watermarkColumn = columnName;
+      readOptions.put(FlinkReadOptions.WATERMARK_COLUMN, columnName);
       return this;
     }
 
@@ -467,7 +461,7 @@ public Builder<T> watermarkColumn(String columnName) {
      * value. The default value is {@link TimeUnit#MICROSECONDS}.
      */
     public Builder<T> watermarkTimeUnit(TimeUnit timeUnit) {
-      this.watermarkTimeUnit = timeUnit;
+      readOptions.put(FlinkReadOptions.WATERMARK_TIME_UNIT, timeUnit.name());
       return this;
     }
 
@@ -489,17 +483,26 @@ public IcebergSource<T> build() {
       }
 
       contextBuilder.resolveConfig(table, readOptions, flinkConfig);
-
       Schema icebergSchema = table.schema();
       if (projectedFlinkSchema != null) {
         contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
       }
 
       SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
+
+      ScanContext context = contextBuilder.build();
+      String watermarkColumn = context.watermarkColumn();
+      TimeUnit watermarkTimeUnit = context.watermarkTimeUnit();
+
+      if (splitAssignerFactory != null) {
+        Preconditions.checkArgument(
+            watermarkColumn == null,
+            "Watermark column and SplitAssigner should not be set in the same source");
+      }
+
       if (watermarkColumn != null) {
         // Column statistics is needed for watermark generation
-        contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));
-
+        context = context.copyWithColumnStat(watermarkColumn);
         SplitWatermarkExtractor watermarkExtractor =
             new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
         emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
@@ -507,7 +510,6 @@ public IcebergSource<T> build() {
             new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
       }
 
-      ScanContext context = contextBuilder.build();
       if (readerFunction == null) {
         if (table instanceof BaseMetadataTable) {
           MetaDataReaderFunction rowDataReaderFunction =
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
index 610657e8d47b..4769fe47c145 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java
@@ -131,16 +131,21 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
   private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
+    // assignerType may be null now that TABLE_EXEC_SPLIT_ASSIGNER_TYPE has no default value
+    IcebergSource.Builder<RowData> builder =
         IcebergSource.forRowData()
             .tableLoader(loader)
-            .assignerFactory(assignerType.factory())
             .properties(properties)
             .project(getProjectedSchema())
             .limit(limit)
             .filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
+            .flinkConfig(readableConfig);
+
+    if (assignerType != null) {
+      builder.assignerFactory(assignerType.factory());
+    }
+
+    IcebergSource<RowData> source = builder.build();
     DataStreamSource<RowData> stream =
         env.fromSource(
             source,
diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
index 4357b1f57df6..2c2bf5b6dde0 100644
--- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
+++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java
@@ -23,6 +23,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.util.Preconditions;
@@ -33,6 +34,7 @@
 import org.apache.iceberg.flink.FlinkConfigOptions;
 import org.apache.iceberg.flink.FlinkReadConf;
 import org.apache.iceberg.flink.FlinkReadOptions;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 
 /** Context object with optional arguments for a Flink Scan. */
 @Internal
@@ -67,6 +69,8 @@ public class ScanContext implements Serializable {
   private final Integer planParallelism;
   private final int maxPlanningSnapshotCount;
   private final int maxAllowedPlanningFailures;
+  private final String watermarkColumn;
+  private final TimeUnit watermarkTimeUnit;
 
   private ScanContext(
       boolean caseSensitive,
@@ -91,6 +95,8 @@ private ScanContext(
       Integer planParallelism,
       int maxPlanningSnapshotCount,
       int maxAllowedPlanningFailures,
+      String watermarkColumn,
+      TimeUnit watermarkTimeUnit,
       String branch,
       String tag,
       String startTag,
@@ -122,7 +128,8 @@ private ScanContext(
     this.planParallelism = planParallelism;
     this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
     this.maxAllowedPlanningFailures = maxAllowedPlanningFailures;
-
+    this.watermarkColumn = watermarkColumn;
+    this.watermarkTimeUnit = watermarkTimeUnit;
     validate();
   }
 
@@ -272,6 +279,14 @@ public int maxAllowedPlanningFailures() {
     return maxAllowedPlanningFailures;
   }
 
+  public String watermarkColumn() {
+    return watermarkColumn;
+  }
+
+  public TimeUnit watermarkTimeUnit() {
+    return watermarkTimeUnit;
+  }
+
   public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSnapshotId) {
     return ScanContext.builder()
         .caseSensitive(caseSensitive)
@@ -298,6 +313,40 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
         .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkTimeUnit(watermarkTimeUnit)
+        .build();
+  }
+
+  public ScanContext copyWithColumnStat(String columnStat) {
+    return ScanContext.builder()
+        .includeColumnStats(Sets.newHashSet(columnStat))
+        .caseSensitive(caseSensitive)
+        .useSnapshotId(snapshotId)
+        .useBranch(branch)
+        .useTag(tag)
+        .startSnapshotId(startSnapshotId)
+        .endSnapshotId(endSnapshotId)
+        .startTag(startTag)
+        .endTag(endTag)
+        .asOfTimestamp(asOfTimestamp)
+        .splitSize(splitSize)
+        .splitLookback(splitLookback)
+        .splitOpenFileCost(splitOpenFileCost)
+        .streaming(isStreaming)
+        .monitorInterval(monitorInterval)
+        .nameMapping(nameMapping)
+        .project(schema)
+        .filters(filters)
+        .limit(limit)
+        .includeColumnStats(includeColumnStats)
+        .exposeLocality(exposeLocality)
+        .planParallelism(planParallelism)
+        .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
+        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkTimeUnit(watermarkTimeUnit)
+        .startingStrategy(startingStrategy)
         .build();
   }
 
@@ -327,6 +376,8 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
         .planParallelism(planParallelism)
         .maxPlanningSnapshotCount(maxPlanningSnapshotCount)
        .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+        .watermarkColumn(watermarkColumn)
+        .watermarkTimeUnit(watermarkTimeUnit)
         .build();
   }
 
@@ -367,6 +418,8 @@ public static class Builder {
         FlinkReadOptions.MAX_PLANNING_SNAPSHOT_COUNT_OPTION.defaultValue();
     private int maxAllowedPlanningFailures =
         FlinkReadOptions.MAX_ALLOWED_PLANNING_FAILURES_OPTION.defaultValue();
+    private String watermarkColumn = FlinkReadOptions.WATERMARK_COLUMN_OPTION.defaultValue();
+    private TimeUnit watermarkTimeUnit =
+        FlinkReadOptions.WATERMARK_TIME_UNIT_OPTION.defaultValue();
 
     private Builder() {}
 
@@ -500,6 +553,16 @@ public Builder maxAllowedPlanningFailures(int newMaxAllowedPlanningFailures) {
       return this;
     }
 
+    public Builder watermarkColumn(String newWatermarkColumn) {
+      this.watermarkColumn = newWatermarkColumn;
+      return this;
+    }
+
+    public Builder watermarkTimeUnit(TimeUnit newWatermarkTimeUnit) {
+      this.watermarkTimeUnit = newWatermarkTimeUnit;
+      return this;
+    }
+
     public Builder resolveConfig(
         Table table, Map<String, String> readOptions, ReadableConfig readableConfig) {
       FlinkReadConf flinkReadConf = new FlinkReadConf(table, readOptions, readableConfig);
@@ -525,7 +588,9 @@ public Builder resolveConfig(
           .planParallelism(flinkReadConf.workerPoolSize())
           .includeColumnStats(flinkReadConf.includeColumnStats())
           .maxPlanningSnapshotCount(flinkReadConf.maxPlanningSnapshotCount())
-          .maxAllowedPlanningFailures(maxAllowedPlanningFailures);
+          .maxAllowedPlanningFailures(maxAllowedPlanningFailures)
+          .watermarkColumn(flinkReadConf.watermarkColumn())
+          .watermarkTimeUnit(flinkReadConf.watermarkTimeUnit());
     }
 
     public ScanContext build() {
@@ -552,6 +617,8 @@ public ScanContext build() {
           planParallelism,
           maxPlanningSnapshotCount,
           maxAllowedPlanningFailures,
+          watermarkColumn,
+          watermarkTimeUnit,
           branch,
           tag,
           startTag,
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
index 7d3777935ca8..6a388bb3fcdb 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/TestHelpers.java
@@ -126,6 +126,19 @@ public static List<Row> convertRowDataToRow(List<RowData> rowDataList, RowType r
         .collect(Collectors.toList());
   }
 
+  public static void assertRecordsWithOrder(
+      List<Row> results, List<Record> expectedRecords, Schema schema) {
+    List<Row> expected = Lists.newArrayList();
+    @SuppressWarnings("unchecked")
+    DataStructureConverter<RowData, Row> converter =
+        (DataStructureConverter<RowData, Row>)
+            DataStructureConverters.getConverter(
+                TypeConversions.fromLogicalToDataType(FlinkSchemaUtil.convert(schema)));
+    expectedRecords.forEach(
+        r -> expected.add(converter.toExternal(RowDataConverter.convert(schema, r))));
+    assertRowsWithOrder(results, expected);
+  }
+
   public static void assertRecords(List<Row> results, List<Record> expectedRecords, Schema schema) {
     List<Row> expected = Lists.newArrayList();
     @SuppressWarnings("unchecked")
@@ -146,6 +159,10 @@ public static void assertRows(List<Row> results, List<Row> expected) {
     Assertions.assertThat(results).containsExactlyInAnyOrderElementsOf(expected);
   }
 
+  public static void assertRowsWithOrder(List<Row> results, List<Row> expected) {
+    Assertions.assertThat(results).isEqualTo(expected);
+  }
+
   public static void assertRowData(Schema schema, StructLike expected, RowData actual) {
     assertRowData(schema.asStruct(), FlinkSchemaUtil.convert(schema), expected, actual);
   }
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
index 3652e0bb56df..c8c10a7bd953 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java
@@ -18,7 +18,11 @@
  */
 package org.apache.iceberg.flink.source;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
 import java.util.List;
 import java.util.Map;
 import org.apache.flink.configuration.Configuration;
@@ -26,14 +30,33 @@
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.types.Row;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Test;
 
 public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
   private volatile TableEnvironment tEnv;
 
+  private static final Schema SCHEMA_TS =
+      new Schema(
+          required(1, "t1", Types.TimestampType.withoutZone()),
+          required(2, "t2", Types.LongType.get()));
+
   public TestIcebergSourceBoundedSql(String fileFormat) {
     super(fileFormat);
   }
@@ -77,4 +100,92 @@ protected List<Row> run(
     String optionStr = SqlHelpers.sqlOptionsToString(options);
     return SqlHelpers.sql(getTableEnv(), "select %s from t %s %s", select, optionStr, sqlFilter);
   }
+
+  protected Record generateRecord(Instant t1, long t2) {
+    Record record = GenericRecord.create(SCHEMA_TS);
+    record.setField("t1", t1.atZone(ZoneId.systemDefault()).toLocalDateTime());
+    record.setField("t2", t2);
+    return record;
+  }
+
+  @Test
+  public void testWatermarkOptions() throws Exception {
+    // Skip AVRO since we don't collect column metrics for it, so a watermark column has no stats
+    // to read from. See https://github.com/apache/iceberg/pull/1963
+    Assume.assumeTrue("Temporarily skip AVRO", FileFormat.AVRO != fileFormat);
+
+    Table table = catalogResource.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, SCHEMA_TS);
+    long baseTime = 1702382109000L;
+
+    GenericAppenderHelper helper = new GenericAppenderHelper(table, fileFormat, TEMPORARY_FOLDER);
+
+    // File 1 - early timestamps, larger longs
+    Record early1 =
+        generateRecord(Instant.ofEpochMilli(baseTime), baseTime + (1000 * 60 * 60 * 24 * 30L));
+    Record early2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime - 10 * 1000L), baseTime + (1000 * 60 * 60 * 24 * 35L));
+
+    List<Record> recordsDataFile1 = Lists.newArrayList();
+    recordsDataFile1.add(early1);
+    recordsDataFile1.add(early2);
+    DataFile dataFile1 = helper.writeFile(recordsDataFile1);
+
+    // File 2 - late timestamps, smaller longs
+    Record late1 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 14 * 1000L), baseTime - (1000 * 60 * 60 * 24 * 30L));
+    Record late2 =
+        generateRecord(
+            Instant.ofEpochMilli(baseTime + 12 * 1000L), baseTime - (1000 * 60 * 61 * 24 * 35L));
+
+    List<Record> recordsDataFile2 = Lists.newArrayList();
+    recordsDataFile2.add(late1);
+    recordsDataFile2.add(late2);
+    DataFile dataFile2 = helper.writeFile(recordsDataFile2);
+
+    helper.appendToTable(dataFile1, dataFile2);
+
+    // first assertion: early and then late
+    List<Record> expected = Lists.newArrayList();
+    expected.addAll(recordsDataFile1);
+    expected.addAll(recordsDataFile2);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            SCHEMA_TS,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                "watermark-column",
+                "t1",
+                "split-file-open-cost",
+                "128000000",
+                "write-parallelism",
+                "1"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+
+    // second assertion: late and then early
+    expected.clear();
+    expected.addAll(recordsDataFile2);
+    expected.addAll(recordsDataFile1);
+    TestHelpers.assertRecordsWithOrder(
+        run(
+            SCHEMA_TS,
+            ImmutableList.of(),
+            ImmutableMap.of(
+                "watermark-column",
+                "t2",
+                "watermark-time-unit",
+                "MILLISECONDS",
+                "split-file-open-cost",
+                "128000000",
+                "write-parallelism",
+                "1"),
+            "",
+            "*"),
+        expected,
+        SCHEMA_TS);
+  }
 }
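
For context, not part of the patch: a minimal sketch of how the new options are exercised from the DataStream API. The table location passed to `TableLoader.fromHadoopTable` and the column name `t1` are placeholders; the two builder calls mirror the `watermark-column` and `watermark-time-unit` read options added above.

```java
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class WatermarkReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder table location; any TableLoader works here.
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/t");

    IcebergSource<RowData> source =
        IcebergSource.forRowData()
            .tableLoader(loader)
            // Same effect as the 'watermark-column' / 'watermark-time-unit' read options:
            // splits are ordered and watermarks emitted from the column's min statistic.
            .watermarkColumn("t1")
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build();

    // The source emits watermarks itself, so no extra strategy is applied downstream.
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "iceberg-watermark-source").print();
    env.execute("iceberg-watermark-read");
  }
}
```

The same options can also be passed per query from SQL, which is the path the new `testWatermarkOptions` test takes by handing its options map to the `run(...)` helper.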