
Introduces the new IcebergSink based on the new V2 Flink Sink Abstraction #10179

Merged
merged 25 commits into apache:main on Aug 26, 2024

Conversation

rodmeneses
Contributor

@rodmeneses rodmeneses commented Apr 18, 2024

Co-authored-by: Liwei Li [email protected]
Co-authored-by: Kyle Bendickson [email protected]
Co-authored-by: Peter Vary [email protected]

Summary

The Flink community created a new Sink specification in FLIP-143 with the explicit goal of unifying the handling of bounded and unbounded data streams. It was later enhanced in FLIP-191 so that there is a well-defined place to execute small-file compaction. Based on a discussion on the dev mailing list, the deprecation of the old SinkFunction is postponed until around Flink 2.0, so the migration is not extremely urgent; however, being able to use the PostCommitTopology to compact small files could provide immediate benefits for users of the Iceberg-Flink integration.

Previous work

  1. There is an existing Iceberg PR, Flink: new sink base on the unified sink API #4904, for the Sink migration by Liwei Li (https://github.com/hililiwei) and Kyle Bendickson (https://github.com/kbendick), with related documentation authored by the same team. The discussion there has stalled, and the PR has been out of date for almost a year. The current proposal builds heavily on their work and keeps them as co-authors of the proposed change.

  2. @pvary opened Flink: new sink base on the unified sink API - WIP #8653, which this code is based on. In his PR, a lot of code was refactored so that the V1 sink (FlinkSink) and the new V2 sink (IcebergSink) can reuse and share many code paths and components.

New Implementation

This PR introduces a brand new IcebergSink; it does not change the existing FlinkSink.

With Flink 1.19 released, we now have access to the new hooks SupportsPreWriteTopology, SupportsCommitter, SupportsPreCommitTopology, and SupportsPostCommitTopology. The code in this PR takes advantage of these interfaces to fully implement a brand new IcebergSink using the new V2 sink model.

The new IcebergSink implemented here is needed for the Flink table maintenance effort led by @pvary, described in: https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?pli=1#heading=h.jl0dmuup12gz

In particular, the new IcebergSink implements

public void addPostCommitTopology(DataStream<CommittableMessage<SinkCommittable>> committables) {

which can be used to perform cleaning/compaction as described in the doc above.
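To make the post-commit hook concrete, here is a minimal, self-contained sketch of the pattern. All types below (Committable, SupportsPostCommit, MiniIcebergSink) are hypothetical stand-ins: the real Flink interfaces (SupportsPostCommitTopology, CommittableMessage, DataStream) live in the Flink runtime and have different shapes; this only illustrates the idea of a hook that runs after commit and selects small files for compaction.

```java
import java.util.ArrayList;
import java.util.List;

class PostCommitSketch {

  // Stand-in for an Iceberg commit result: a committed data file.
  record Committable(String path, long sizeBytes) {}

  // Stand-in for Flink's SupportsPostCommitTopology: a hook invoked after commit.
  interface SupportsPostCommit {
    void addPostCommitTopology(List<Committable> committables);
  }

  // A sink whose post-commit hook selects small files, mirroring the
  // compaction use case described in the PR.
  static class MiniIcebergSink implements SupportsPostCommit {
    final List<String> compacted = new ArrayList<>();

    @Override
    public void addPostCommitTopology(List<Committable> committables) {
      long smallFileThreshold = 1024; // hypothetical threshold, in bytes
      for (Committable c : committables) {
        if (c.sizeBytes() < smallFileThreshold) {
          compacted.add(c.path()); // a real sink would schedule a rewrite here
        }
      }
    }
  }

  public static void main(String[] args) {
    MiniIcebergSink sink = new MiniIcebergSink();
    sink.addPostCommitTopology(List.of(
        new Committable("data/f1.parquet", 200),
        new Committable("data/f2.parquet", 4096)));
    System.out.println(sink.compacted); // only the small file is selected
  }
}
```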

@github-actions github-actions bot added the flink label Apr 18, 2024
@rodmeneses rodmeneses changed the title Iceberg sink a A new implementation of an Iceberg Sink [WIP] thta will be used with upcoming Flink Compaction jobs Apr 18, 2024
@rodmeneses rodmeneses marked this pull request as draft April 18, 2024 20:30
@rodmeneses rodmeneses changed the title A new implementation of an Iceberg Sink [WIP] thta will be used with upcoming Flink Compaction jobs A new implementation of an Iceberg Sink [WIP] that will be used with upcoming Flink Compaction jobs Apr 18, 2024
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.junit.Assert;
Contributor
Please update these to JUnit5 tests that use AssertJ assertions. You can use TestRewriteDataFilesAction as a reference.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
Contributor

Please update this (and the other tests in this PR) to JUnit5 + AssertJ; we are in the process of migrating from JUnit4 to JUnit5, and new tests should be written using JUnit5 + AssertJ.

Contributor

Should we do that in a different PR? Either before, or after this?
One of the important things about this PR is that the original behaviour does not change. That is hard to reason about if we change the test files in the same PR.

Contributor Author

Hi, @nastra:
I will have all new unit tests using JUnit 5.
All the others we can update in another PR, once this is merged.

What do you think?

@pvary
Contributor

pvary commented Apr 19, 2024

Please update the description of the PR, and link the previous versions, docs, and other relevant material, so it is easier to find them in the future.

@stevenzwu stevenzwu merged commit bea364c into apache:main Aug 26, 2024
20 checks passed
@stevenzwu
Contributor

thanks @rodmeneses for the contribution and @pvary for the review

@rodmeneses
Contributor Author

rodmeneses commented Aug 26, 2024

Thanks a lot. This was a great team collaboration effort. Thanks for all the reviews and the time spent on this PR: @stevenzwu @pvary @nastra
Special thanks to the original authors:

Liwei Li
Kyle Bendickson
Peter Vary

@mamineturki

Hi, when will this change be released please?
Thanks,

@pvary
Contributor

pvary commented Aug 30, 2024

It will be released in Iceberg 1.7.0

@arkadius
Contributor

Hi, is there a plan to replace the previous implementation (FlinkSink) with the new one (IcebergSink) for dynamic tables as well (in FlinkDynamicTableFactory)? When will it happen?

@rodmeneses
Contributor Author

> Hi, is there a plan to replace the previous implementation (FlinkSink) with the new one (IcebergSink) for dynamic tables as well (in FlinkDynamicTableFactory)? When will it happen?

Hi @arkadius
The new IcebergSink will be released with 1.7.0.
Both sink implementations will coexist for a while.

@arkadius
Contributor

Thank you for the quick response. By coexistence, do you mean that it will be possible to pick the new implementation for dynamic tables, for example via some property in the catalog configuration? Currently, the old one is hardcoded. For a test, I changed the implementation to the new one here: https://github.com/arkadius/iceberg/pull/1/files#diff-af1fa2eee823938a1ceaf6f212ebeb12226c7d853a2f47109c284a59c44f39a3R80 and so far I see that everything works. Are there any known limitations of the new implementation?

@rodmeneses
Contributor Author

No code has been implemented for this yet. I see at least two options:

  1. Implement a brand new IcebergTableSink that uses the new IcebergSink.
  2. Control which underlying sink to use with some config.

Thoughts? @pvary

@pvary
Contributor

pvary commented Sep 25, 2024

@rodmeneses: I would follow the steps taken by the IcebergTableSource, but instead of createFLIP27Stream and table.exec.iceberg.use-flip27-source we should find a better name; I remember we discussed that the choice of parameters was not the best.
Maybe even V2 could be better in this case.
WDYT @stevenzwu?

@rodmeneses
Contributor Author

> @rodmeneses: I would follow the steps taken by the IcebergTableSource, but instead of createFLIP27Stream and table.exec.iceberg.use-flip27-source we should find a better name; I remember we discussed that the choice of parameters was not the best. Maybe even V2 could be better in this case. WDYT @stevenzwu?

Thanks @pvary . That sounds reasonable.
Hi @arkadius. When you said that "everything works", what did you mean? Did you mean that all tests passed? I might be missing something, but I cannot find unit tests covering IcebergTableSink. So maybe now it's a good opportunity to also write unit test to cover all these code...

@rodmeneses
Contributor Author

rodmeneses commented Sep 25, 2024

Hi @arkadius, are you interested in taking on this work? We would also need to add unit tests to get coverage in the Flink dynamic tables codebase.
Let me know if you cannot take it. Thanks!

@arkadius
Contributor

Hi @rodmeneses, by "everything works" I meant that I did some manual tests and the results were the same as with the old implementation. "Everything" was probably an overstatement ;-) Yes, I can take this. I'll let you know on Slack if I need any help with this task.

@rodmeneses
Contributor Author

> Hi @rodmeneses, by "everything works" I meant that I did some manual tests and the results were the same as with the old implementation. "Everything" was probably an overstatement ;-) Yes, I can take this. I'll let you know on Slack if I need any help with this task.

Amazing! Thanks!! Definitely tag me for reviews!

@stevenzwu
Contributor

Yes, we should have a config to determine which sink implementation is used for the Table API/SQL. The default should be the old FlinkSink; when the new V2 sink implementation becomes stable and battle-tested, we can switch the default to the new V2 sink.
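The selection logic being discussed could look roughly like the sketch below. Note the property name "table.exec.iceberg.use-v2-sink" is purely hypothetical (the thread explicitly says a name has not been chosen yet; only the precedent table.exec.iceberg.use-flip27-source exists); the enum and method are illustrative stand-ins, not Iceberg APIs.

```java
import java.util.Map;

class SinkSelectionSketch {

  // Hypothetical marker for the two implementations discussed in the thread.
  enum SinkImpl { FLINK_SINK_V1, ICEBERG_SINK_V2 }

  // Hypothetical property name; the real name is still to be decided.
  static final String USE_V2_SINK = "table.exec.iceberg.use-v2-sink";

  static SinkImpl chooseSink(Map<String, String> tableProps) {
    // Default to the battle-tested v1 FlinkSink, as suggested above.
    boolean useV2 = Boolean.parseBoolean(tableProps.getOrDefault(USE_V2_SINK, "false"));
    return useV2 ? SinkImpl.ICEBERG_SINK_V2 : SinkImpl.FLINK_SINK_V1;
  }

  public static void main(String[] args) {
    // Without the flag, the old sink stays the default.
    System.out.println(chooseSink(Map.of()));
    // Opting in selects the new v2 sink.
    System.out.println(chooseSink(Map.of(USE_V2_SINK, "true")));
  }
}
```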

public void close() throws Exception {
  if (writer != null) {
    writer.close();
  }
@tedyu Oct 5, 2024

the writer should be set to null after the close call.
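The suggestion above makes close() idempotent. A minimal, self-contained sketch of the pattern, using a stand-in Writer class (the real writer type in the PR is a Flink/Iceberg class, not shown here):

```java
class CloseSketch {

  // Stand-in for the sink's writer; counts close() calls for illustration.
  static class Writer implements AutoCloseable {
    int closeCalls = 0;

    @Override
    public void close() {
      closeCalls++;
    }
  }

  // Holder demonstrating the reviewed close() with the suggested fix applied.
  static class SinkWriterHolder implements AutoCloseable {
    Writer writer = new Writer();

    @Override
    public void close() throws Exception {
      if (writer != null) {
        writer.close();
        writer = null; // prevents double-close if close() is invoked again
      }
    }
  }

  public static void main(String[] args) throws Exception {
    SinkWriterHolder holder = new SinkWriterHolder();
    Writer w = holder.writer;
    holder.close();
    holder.close(); // second call is now a no-op
    System.out.println(w.closeCalls); // prints 1
  }
}
```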

try {
return InstantiationUtil.deserializeObject(
resultBuf, IcebergCommittableSerializer.class.getClassLoader());
} catch (ClassNotFoundException cnc) {

it seems cnf or cnfe is a better variable name.
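For illustration, here is the conventional naming in a self-contained deserialization helper. Flink's InstantiationUtil is not available here, so plain java.io deserialization stands in for it; the method and message are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

class DeserializeSketch {

  static Object deserialize(byte[] resultBuf) throws IOException {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(resultBuf))) {
      return in.readObject();
    } catch (ClassNotFoundException cnfe) { // conventional abbreviation, per the review
      throw new IOException("Failed to deserialize committable", cnfe);
    }
  }

  public static void main(String[] args) throws IOException {
    // Round-trip a value to show the helper in use.
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject("commit-42");
    }
    System.out.println(deserialize(bos.toByteArray())); // prints commit-42
  }
}
```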

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
…tion (apache#10179)

Co-authored-by: Liwei Li <[email protected]>
Co-authored-by: Kyle Bendickson <[email protected]>
Co-authored-by: Peter Vary <[email protected]>
Projects
Status: Done