Introduces the new IcebergSink based on the new V2 Flink Sink Abstraction #10179
Conversation
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.junit.Assert;
Please update these to JUnit5 tests that use AssertJ assertions. You can use TestRewriteDataFilesAction as a reference.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
please update this (and other tests in this PR) to JUnit5 + AssertJ, because we're in the process of migrating JUnit4 to JUnit5 and new tests should be written using JUnit5 + AssertJ
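For illustration, a minimal sketch of the requested style, a JUnit 5 parameterized test with AssertJ assertions; the class and test method below are hypothetical and not taken from this PR (TestRewriteDataFilesAction in the Iceberg codebase is the reference the reviewers point to):

```java
// Hypothetical example only: JUnit 5 + AssertJ in place of JUnit 4's @RunWith(Parameterized.class)
// and org.junit.Assert. Class and method names are made up for illustration.
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class TestSinkParallelism {

  @ParameterizedTest
  @ValueSource(ints = {1, 2, 4})
  void testParallelismIsPositive(int parallelism) {
    // AssertJ's assertThat replaces the JUnit 4 Assert calls.
    assertThat(parallelism).isGreaterThan(0);
  }
}
```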
Should we do that in a different PR, either before or after this one?
One of the important things about this PR is that the original behaviour is not changing. It is hard to reason about that if we change the test files in the same PR.
Hi @nastra,
I will have all new unit tests use JUnit 5.
We can update all the others in another PR once this is merged.
What do you think?
Please update the description of the PR. Also link the previous versions, docs, and other relevant material, so that it is easier to find them in the future.
Thanks @rodmeneses for the contribution and @pvary for the review
Thanks a lot. This was a great team collaboration effort. Thanks for all the reviews and the time spent on this PR: @stevenzwu @pvary @nastra Liwei Li
Hi, when will this change be released, please?
It will be released in Iceberg 1.7.0 |
Hi, is there a plan to replace the previous implementation (FlinkSink)?
Hi @arkadius
Thank you for the quick response. By coexistence, do you mean that it will be possible to pick the new implementation for dynamic tables, for example via some property in the catalog configuration? Currently, the old one is hardcoded. For a test, I changed the implementation to the new one here: https://github.com/arkadius/iceberg/pull/1/files#diff-af1fa2eee823938a1ceaf6f212ebeb12226c7d853a2f47109c284a59c44f39a3R80 and so far I see that everything works. Are there any known limitations of the new implementation?
No code has been implemented for this yet. I see at least 2 options:
@rodmeneses: I would follow the steps taken by the IcebergTableSource, but instead of
Thanks @pvary. That sounds reasonable.
Hi @arkadius. Are you interested in taking this work? We would also need to add unit tests to get coverage in the Dynamic Tables Flink codebase.
Hi @rodmeneses, by "everything works" I meant that I did some manual tests and the results were the same as with the old one. Probably "everything" was an overstatement ;-) Yes, I can take this. I'll let you know on Slack if I need any help with this task.
Yes, we should have a config to determine which sink implementation is used for the Table API/SQL. The default should be the old FlinkSink.
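A rough sketch of what such a switch could look like, assuming it follows the usual Flink ConfigOption pattern; the option name and default below are hypothetical, since no such option exists in this PR:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class SinkVersionOption {
  // Hypothetical option: lets Table API/SQL writes choose between the old FlinkSink
  // (default, preserving current behaviour) and the new V2-based IcebergSink.
  public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
      ConfigOptions.key("table.exec.iceberg.use-v2-sink")
          .booleanType()
          .defaultValue(false)
          .withDescription(
              "If true, use the SinkV2-based IcebergSink for Table API/SQL writes; "
                  + "otherwise keep using the existing FlinkSink.");
}
```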
public void close() throws Exception {
  if (writer != null) {
    writer.close();
  }
The writer should be set to null after the close call.
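A minimal sketch of the suggested fix, reusing the field name from the snippet above:

```java
public void close() throws Exception {
  if (writer != null) {
    writer.close();
    // Null out the reference so a repeated close() is a no-op and the
    // closed writer cannot be reused accidentally.
    writer = null;
  }
}
```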
try {
  return InstantiationUtil.deserializeObject(
      resultBuf, IcebergCommittableSerializer.class.getClassLoader());
} catch (ClassNotFoundException cnc) {
it seems cnf or cnfe is a better variable name.
…tion (apache#10179) Co-authored-by: Liwei Li <[email protected]> Co-authored-by: Kyle Bendickson <[email protected]> Co-authored-by: Peter Vary <[email protected]>
Summary
The Flink community created a new Sink specification in FLIP-143 with the explicit goal of guaranteeing unified handling of bounded and unbounded data streams. It was later enhanced in FLIP-191 so that there is a well-defined place to execute small-file compaction. The deprecation of the old SinkFunction has been postponed to somewhere around Flink 2.0 based on the discussion on the dev mailing list, so the migration is not extremely urgent, but the possibility of using the PostCommitTopology to execute small-file compaction could provide immediate benefits for users of the Iceberg-Flink integration.
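As a rough illustration of the hook referred to above (a sketch only, not this PR's code; the package and method signature are given as recalled from Flink 1.19's SinkV2 decorators and should be verified against the Flink javadocs), a sink can expose a post-commit topology where small-file compaction would run:

```java
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

// Sketch: a sink that opts into the FLIP-191 style post-commit topology.
// A real sink would also implement the Sink interface; the body is a placeholder.
abstract class PostCommitAwareSink<CommT> implements SupportsPostCommitTopology<CommT> {

  @Override
  public void addPostCommitTopology(DataStream<CommittableMessage<CommT>> committables) {
    // Placeholder: hang small-file compaction or other table maintenance
    // off the stream of successfully committed committables here.
  }
}
```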
Previous work
There is an existing Iceberg PR, Flink: new sink base on the unified sink API #4904, for the Sink migration by Liwei Li (https://github.com/hililiwei) and Kyle Bendickson (https://github.com/kbendick), with related documentation authored by the same team. The discussion there has stalled, and the PR has been out of date for almost a year now. The current proposal builds heavily on their work and keeps them as co-authors of the proposed change.
@pvary opened Flink: new sink base on the unified sink API - WIP #8653, which this code is based on. In that PR, a lot of code was refactored so that the V1 sink (FlinkSink) and the new V2 sink (IcebergSink) can reuse and share many code paths and components.
New Implementation
This PR introduces a brand new IcebergSink, but it doesn't change the existing FlinkSink.
With Flink 1.19 released, we now have access to the new hooks SupportsPreWriteTopology, SupportsCommitter, SupportsPreCommitTopology and SupportsPostCommitTopology. The code in this PR takes advantage of those interfaces to fully implement a brand new IcebergSink using the new V2 sink model.
The new IcebergSink implemented here is needed for the Flink Table maintenance effort led by @pvary, which is described in: https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?pli=1#heading=h.jl0dmuup12gz
In particular, the new IcebergSink implements SupportsPostCommitTopology, which can be used to perform cleaning/compacting as described in the above doc.
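A usage sketch, assuming the new IcebergSink builder mirrors the existing FlinkSink API (forRowData / tableLoader / append); the exact builder surface should be checked against the final code in this PR:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergSink;

public class IcebergSinkUsage {

  // Attaches the new V2-based sink to a RowData stream, appending to the target table.
  public static void appendToTable(DataStream<RowData> rows, TableLoader tableLoader) {
    IcebergSink.forRowData(rows)
        .tableLoader(tableLoader)
        .overwrite(false) // plain append; overwrite semantics would replace data per checkpoint
        .append();
  }
}
```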