Flink: Watermark read options #9346
Conversation
@@ -94,7 +94,7 @@ private FlinkConfigOptions() {}
   public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
       ConfigOptions.key("table.exec.iceberg.split-assigner-type")
           .enumType(SplitAssignerType.class)
-          .defaultValue(SplitAssignerType.SIMPLE)
+          .defaultValue(null)
Is this intended? This would be a breaking change if a user did not provide a split assigner type before.
Good catch. I changed that to null because we previously had this assertion:

if (splitAssignerFactory != null) {
  Preconditions.checkArgument(
      watermarkColumn == null,
      "Watermark column and SplitAssigner should not be set in the same source");
}

However, after my work on this PR it is safe to remove that assertion, since we override the factory with an OrderedSplitAssignerFactory whenever watermarkColumn is specified.
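The override precedence described above can be sketched roughly as follows. This is an illustrative, self-contained toy, not Iceberg code: the strings stand in for the real factory classes (SimpleSplitAssignerFactory, OrderedSplitAssignerFactory), and the method name is made up.

```java
// Illustrative sketch of the precedence: a watermark column always forces
// the ordered assigner; otherwise fall back to the configured (or simple)
// assigner. Not the actual Iceberg implementation.
public class AssignerPrecedenceSketch {

  static String resolveAssigner(String configuredAssigner, String watermarkColumn) {
    if (watermarkColumn != null) {
      // Watermark generation needs ordered splits, so any configured
      // assigner is overridden instead of being rejected by an assertion.
      return "ordered";
    }
    // Without a watermark column, use the configured assigner,
    // or the simple one when nothing was set.
    return configuredAssigner != null ? configuredAssigner : "simple";
  }

  public static void main(String[] args) {
    System.out.println(resolveAssigner(null, "event_ts")); // ordered
    System.out.println(resolveAssigner(null, null)); // simple
    System.out.println(resolveAssigner("simple", "event_ts")); // ordered
  }
}
```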
Makes sense. It would be good to update the docs to explain how the default is determined.
If users have not opted into the new feature, I would expect their existing SQL queries to keep working. Can you add tests to ensure that?

After some thought, I'm not sure about the default change. Wouldn't it be possible to use the watermark column without the ordered split assigner factory?
Not according to the code:

if (watermarkColumn != null) {
  // Column statistics is needed for watermark generation
  context = context.copyWithColumnStat(watermarkColumn);
  SplitWatermarkExtractor watermarkExtractor =
      new ColumnStatsWatermarkExtractor(icebergSchema, watermarkColumn, watermarkTimeUnit);
  emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
  splitAssignerFactory =
      new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}
cc: @pvary
If the splits are not ordered, then we will have fluctuating watermarks. We do not emit the ones that are out of order, but that defeats the purpose of the whole watermark generation feature.

Imagine a situation where we are reading time series data and read the latest file first. Every other file will then contain late data, and its records might be dropped.

So while it is technically possible, I would rather not let users shoot themselves in the foot.
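The late-data scenario above can be demonstrated with a toy simulation in plain Java (no Flink). The method name, the timestamps, and the monotonic-watermark model are all made up for illustration; Flink's actual watermark machinery is considerably more involved.

```java
import java.util.Arrays;
import java.util.List;

// Toy model: the watermark only ever moves forward (to the max timestamp
// seen so far). If the split with the latest timestamps is read first, the
// watermark jumps ahead and every record in later splits arrives "late".
public class LateDataSketch {

  // Each inner list is one file/split of event timestamps (millis).
  static long countLateRecords(List<List<Long>> splitsInReadOrder) {
    long watermark = Long.MIN_VALUE;
    long late = 0;
    for (List<Long> split : splitsInReadOrder) {
      for (long ts : split) {
        if (ts < watermark) {
          late++; // would be treated as late data downstream
        }
      }
      // Advance the watermark to the max timestamp of this split.
      long splitMax = split.stream().mapToLong(Long::longValue).max().orElse(Long.MIN_VALUE);
      watermark = Math.max(watermark, splitMax);
    }
    return late;
  }

  public static void main(String[] args) {
    List<List<Long>> ordered = Arrays.asList(Arrays.asList(1L, 2L), Arrays.asList(10L, 11L));
    List<List<Long>> reversed = Arrays.asList(Arrays.asList(10L, 11L), Arrays.asList(1L, 2L));
    System.out.println(countLateRecords(ordered)); // 0
    System.out.println(countLateRecords(reversed)); // 2
  }
}
```

Reading the splits in timestamp order produces no late records, while reading the latest split first makes every remaining record late, which is exactly why the ordered split assigner is forced when a watermark column is set.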
> If users have not opted into the new feature, I would expect their SQL queries to still work. Can you add tests to ensure that?

Hi! I didn't get your suggestion about the unit test. Could you please rephrase? Thanks @mas-chen
recordsDataFile1.add(file1Record1);
recordsDataFile1.add(file1Record2);
DataFile dataFile1 = helper.writeFile(recordsDataFile1);
// File 2 - old timestamps, old longs
This comment is not correct:

// File 2 - late timestamps, old longs

recordsDataFile2.add(file2Record1);
recordsDataFile2.add(file2Record2);
// early1 - early2 -- late1 late 2
I do not get this comment. Maybe something like this - feel free to reword:

// Expected records if the splits are ordered:
// - ascending (watermark from t1): records from the split with early timestamps, then records from the split with late timestamps
// - descending (watermark from t2): records from the split with old longs, then records from the split with new longs
Left some minor comments for comments 😄
Otherwise +1 LGTM
Thanks @rodmeneses for the PR, and @stevenzwu and @mas-chen for the review!
Now it is possible to pass the above parameters in the SQL statement:
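For example, a hinted query might look like the sketch below. The option keys ('watermark-column', 'watermark-column-time-unit') are assumptions based on this PR's subject and should be verified against the released Iceberg Flink documentation; the helper here only assembles the SQL string and does not execute it.

```java
// Builds a Flink SQL query that passes read options via the /*+ OPTIONS(...) */
// hint. The option key names are assumptions from this PR, not verified API.
public class WatermarkHintSketch {

  static String hintedQuery(String table, String column, String timeUnit) {
    return String.format(
        "SELECT * FROM %s /*+ OPTIONS('watermark-column'='%s', "
            + "'watermark-column-time-unit'='%s') */",
        table, column, timeUnit);
  }

  public static void main(String[] args) {
    System.out.println(hintedQuery("iceberg_table", "event_ts", "MILLISECONDS"));
  }
}
```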