The "Emitting watermarks" feature can't be used in flink sql? #10219

Open
yeezychao opened this issue Apr 25, 2024 · 13 comments
Labels
question Further information is requested

Comments

@yeezychao

Query engine

flink 1.18.0

Question

Hi @stevenzwu. In the latest version, watermarks still cannot be defined when using Flink SQL. This blocks our company from using Flink SQL window aggregation to process ODS data. Are there plans to support this?

@yeezychao yeezychao added the question Further information is requested label Apr 25, 2024
@pvary
Contributor

pvary commented Apr 25, 2024

@yeezychao: The feature should be available in Iceberg 1.5.0. Here is the PR, and there is an example there: #9346.

Please let us know if it is not working.

@pvary
Contributor

pvary commented Apr 25, 2024

Also, here is the documentation which describes what this feature can and can't do: https://iceberg.apache.org/docs/nightly/flink-queries/#emitting-watermarks

@yeezychao
Author

@pvary Thank you for your reply. I understand what you mean, and I have read the doc. However, we want to implement something similar to this demo, which needs Flink SQL for TUMBLE window aggregation. When I run a window calculation with the watermark column specified through options, I get an exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
This happens because the column is not defined by WATERMARK FOR rowtime_column_name AS watermark_strategy_expression.
Therefore, I think this implementation can only be used with the Flink DataStream API; the Table and SQL APIs are not supported yet.

@pvary
Contributor

pvary commented Apr 25, 2024

@yeezychao: Do you happen to know what is needed from the connector side to make this work?

@yeezychao
Author

@pvary I'm confused about why computed columns and watermark specs are not supported in the FlinkCatalog.java code.

@pvary
Contributor

pvary commented Apr 26, 2024

What would be needed to support them?
I am guessing that this would be a Flink specific conversion between the Iceberg table and the Flink table. Am I right here?

@yeezychao
Author

@pvary You're right!

@yeezychao
Author

IcebergTableSource should implement the SupportsWatermarkPushDown interface, as the Kafka connector does, to support watermark pushdown. Can you give some suggestions? @hililiwei @stevenzwu

@pvary
Contributor

pvary commented May 2, 2024

Maybe we could just implement the interface in IcebergTableSource. We would either reject any watermark strategy other than noWatermarks (the cleaner approach, but it might not work if the watermark strategy is reused somewhere else to change some behavior), or create a new watermark strategy in createFLIP27Stream in which the watermark generation part is noWatermarks and only the timestamp assignment part is reused. If we pass this watermark strategy to fromSource, we might be able to make things work.

Sadly I don't have the bandwidth to test this out, but if you want to take a stab at it, I can review your PR.

Thanks, Peter
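
The second option in the comment above (keep the timestamp part of the pushed-down strategy, drop its watermark generation) can be illustrated with a minimal sketch. This is not Iceberg or Flink code: the interfaces `TimestampAssigner`, `WatermarkGenerator`, and `Strategy`, and the helpers `timestampsOnly` and `examplePushedDown`, are hypothetical stand-ins written in plain Java so the idea can be run without any dependencies. It only assumes that a strategy is made of two separable parts, as the comment describes.

```java
import java.util.ArrayList;
import java.util.List;

final class TimestampOnlyStrategySketch {

    /** Hypothetical stand-in: extracts an event timestamp (epoch millis) from a record. */
    public interface TimestampAssigner<T> {
        long extractTimestamp(T element);
    }

    /** Hypothetical stand-in: may add watermarks to the sink as records arrive. */
    public interface WatermarkGenerator<T> {
        void onEvent(T element, long timestamp, List<Long> watermarkSink);
    }

    /** Hypothetical stand-in for a strategy that bundles both parts. */
    public interface Strategy<T> {
        TimestampAssigner<T> timestampAssigner();

        WatermarkGenerator<T> watermarkGenerator();
    }

    /** Reuses the timestamp part of the pushed-down strategy and drops its watermark generation. */
    public static <T> Strategy<T> timestampsOnly(final Strategy<T> pushedDown) {
        return new Strategy<T>() {
            @Override
            public TimestampAssigner<T> timestampAssigner() {
                return pushedDown.timestampAssigner();
            }

            @Override
            public WatermarkGenerator<T> watermarkGenerator() {
                // No-op generator: the source is assumed to emit watermarks itself.
                return (element, timestamp, watermarkSink) -> { };
            }
        };
    }

    /** Example pushed-down strategy over Long records: timestamp = value, watermark = timestamp - 1. */
    public static Strategy<Long> examplePushedDown() {
        return new Strategy<Long>() {
            @Override
            public TimestampAssigner<Long> timestampAssigner() {
                return element -> element;
            }

            @Override
            public WatermarkGenerator<Long> watermarkGenerator() {
                return (element, timestamp, watermarkSink) -> watermarkSink.add(timestamp - 1);
            }
        };
    }

    public static void main(String[] args) {
        Strategy<Long> wrapped = timestampsOnly(examplePushedDown());
        List<Long> emitted = new ArrayList<>();
        long ts = wrapped.timestampAssigner().extractTimestamp(1_000L);
        wrapped.watermarkGenerator().onEvent(1_000L, ts, emitted);
        System.out.println("timestamp=" + ts + ", watermarks from strategy=" + emitted.size());
    }
}
```

In this sketch the wrapped strategy still assigns timestamps, but its generator never adds a watermark, which would leave watermark emission to the source. Whether the real source and planner accept such a strategy is exactly the part the comment says is untested.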

@yeezychao yeezychao changed the title from: Is the "Emitting watermarks" new feature can't be used in flink sql? to: The "Emitting watermarks" feature can't be used in flink sql? (May 8, 2024)
@Deanozk

Deanozk commented Jul 24, 2024

Is this not on the roadmap yet?

@Deanozk

Deanozk commented Jul 24, 2024

Any updates?

@yeezychao
Author

yeezychao commented Aug 2, 2024

Hi @pvary, following 4625, I implemented computed columns and the watermark strategy declaration. I plan to align with the logic of the watermark-column option you implemented in the IcebergSource code, based on the watermark field stored in the table attributes. But if the watermark column is a computed column (not a physical column), how should the SplitWatermarkExtractor be created? Can you give me some advice? Thank you.

@pvary
Contributor

pvary commented Aug 2, 2024

how to create SplitWatermarkExtractor next? Can you give me some advice?

I'm not sure how Flink normally calculates these watermarks. The IcebergSourceSplit could be used to get the min value for the timestamp column, but I am not sure how Flink would calculate the watermark for a computed column.
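
The idea of taking the minimum value of the timestamp column from a split can be sketched as follows. This is a stand-in in plain Java, not the Iceberg implementation: each `Map` is assumed to hold one data file's column lower bounds in epoch milliseconds, and the helper `splitWatermark` and the column names `event_ts` and `computed_ts` are hypothetical. It also hints at why a computed column is harder: presumably no per-file statistics exist for a column that is not physically stored, so there is nothing to take a minimum over.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

final class SplitWatermarkSketch {

    /**
     * Hypothetical helper: each map holds one data file's lower bounds, keyed by column name,
     * as epoch milliseconds. Returns the smallest lower bound across all files, or empty if
     * the split has no files or any file lacks a bound for the column.
     */
    public static OptionalLong splitWatermark(List<Map<String, Long>> fileLowerBounds, String column) {
        long min = Long.MAX_VALUE;
        for (Map<String, Long> bounds : fileLowerBounds) {
            Long lower = bounds.get(column);
            if (lower == null) {
                // Without statistics for this file, no safe bound can be derived for the split.
                return OptionalLong.empty();
            }
            min = Math.min(min, lower);
        }
        return fileLowerBounds.isEmpty() ? OptionalLong.empty() : OptionalLong.of(min);
    }

    public static void main(String[] args) {
        Map<String, Long> fileA = new HashMap<>();
        fileA.put("event_ts", 1_700_000_060_000L);
        Map<String, Long> fileB = new HashMap<>();
        fileB.put("event_ts", 1_700_000_000_000L);
        List<Map<String, Long>> split = Arrays.asList(fileA, fileB);

        // Physical column: the smaller of the two lower bounds is used.
        System.out.println(splitWatermark(split, "event_ts"));
        // Column without statistics (for example a computed one): no watermark can be derived.
        System.out.println(splitWatermark(split, "computed_ts"));
    }
}
```

One possible direction, stated only as an assumption, would be to derive the watermark from the physical column that the computed column depends on, which would require the computed expression to be monotonic in that column.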
