Flink: Add table.exec.iceberg.use-v2-sink option #11244
Conversation
@rodmeneses, @pvary please have a look at this PR. In this change, I added the configuration option that we talked about, and I also added documentation. In the docs I described the differences between the interfaces that I found while working on unifying the unit tests for both sinks. This is still a draft, as I have no idea how to show in unit tests that this configuration option does anything 😅. I've only checked in the debugger that the code enters the correct code blocks. I could add a test on … Do you know of any differences in the behaviour of these sinks that I could show in the test? I see that there are plans to add small file compaction to the SinkV2-based implementation. Perhaps we should add this test when that feature becomes available. WDYT?
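For readers following this thread, here is a minimal sketch of how the new option can be toggled from the Table API. Only the option key `table.exec.iceberg.use-v2-sink` is taken from this PR; the surrounding code is illustrative, not the PR's actual implementation.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UseV2SinkToggle {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Opt in to the SinkV2-based implementation for Iceberg writes from SQL / Table API.
    // Per this PR, the option defaults to false, i.e. the existing FlinkSink path.
    tEnv.getConfig().getConfiguration().setString("table.exec.iceberg.use-v2-sink", "true");

    // Subsequent INSERTs into Iceberg tables are then planned against the v2 sink, e.g.:
    // tEnv.executeSql("INSERT INTO iceberg_table SELECT * FROM source_table");
  }
}
```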
I found a way to verify that the correct API is used, and I added the test. This change is ready for review.
docs/docs/flink-writes.md
Outdated
The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API)
had some limitations - for example it created a lot of small files when writing to it. This problem is called
the `small-file-compaction` problem in
the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
The table maintenance is not available yet, so I would not mention it in this PR yet.
The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API) will be deprecated and removed from Flink.
Ok, I see your point, but I think we need to change it even more; I'm currently thinking about how to do this. The problems with the proposed changes to the introduction section are:
- Flink has had 3 approaches to defining a sink: `SinkFunction`, `SinkV1` (FLIP-143) and `SinkV2` (FLIP-191). In `FlinkSink` we used the first approach, not `SinkV1`, but ...
- ... it is not so important, because the main sinking logic is done in operators before the sink. The sink itself is a `DiscardingSink`, which BTW can be replaced by its SinkV2 version to avoid problems with the removal of the previous interface (I've just done that).

IMO we should give some introduction explaining why we have 2 implementations, and that is hard to do without exposing future plans. Do you have another idea?
I've rewritten the introduction part. Please correct me if I've gotten something wrong or if you see it differently.
docs/docs/flink-writes.md
Outdated
## Writing with DataStream

To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets.
Warning: some settings are not available in this class (e.g. `rangeDistributionStatisticsType`),
The `RANGE` distribution mode is not yet available for the `IcebergSink`.
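As an aside, a hedged sketch of the `FlinkSink` to `IcebergSink` swap described in the quoted docs; it assumes both builders expose the `forRowData`/`tableLoader`/`append` shape shown in the documentation snippets, and the stream and loader are placeholders.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;

class SinkSwapSketch {
  // SinkV1-era path: the long-standing FlinkSink builder.
  static void writeWithFlinkSink(DataStream<RowData> stream, TableLoader loader) {
    FlinkSink.forRowData(stream).tableLoader(loader).append();
  }

  // SinkV2-based path: same builder shape, IcebergSink class instead.
  // As noted above, the RANGE distribution mode is not yet available here.
  static void writeWithIcebergSink(DataStream<RowData> stream, TableLoader loader) {
    IcebergSink.forRowData(stream).tableLoader(loader).append();
  }
}
```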
docs/docs/flink-writes.md
Outdated
To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets.
Warning: some settings are not available in this class (e.g. `rangeDistributionStatisticsType`),
some others are slightly different (e.g. there is a `uidSuffix` method instead of `uidPrefix`).
When using `IcebergSink`, use `uidPrefix` instead of `uidSuffix`.
👍 with one little change - it should be the opposite: `IcebergSink` has the `uidSuffix` method and `FlinkSink` has the `uidPrefix` method.
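Continuing the sketch above, the corrected naming looks roughly like this (the uid value is a placeholder):

```java
  // FlinkSink exposes uidPrefix(...).
  static void flinkSinkUid(DataStream<RowData> stream, TableLoader loader) {
    FlinkSink.forRowData(stream).tableLoader(loader).uidPrefix("iceberg-writer").append();
  }

  // IcebergSink exposes uidSuffix(...) instead.
  static void icebergSinkUid(DataStream<RowData> stream, TableLoader loader) {
    IcebergSink.forRowData(stream).tableLoader(loader).uidSuffix("iceberg-writer").append();
  }
```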
@@ -115,6 +125,9 @@ protected synchronized TableEnvironment getTableEnv() {
      tEnv = TableEnvironment.create(settingsBuilder.build());
    }
  }
+ tEnv.getConfig()
nit: we add an empty line between blocks
@rodmeneses, @stevenzwu: Please review the PR, especially the documentation part.
@@ -381,7 +381,7 @@ public Builder toBranch(String branch) {
    return this;
  }

- private <T> DataStreamSink<T> chainIcebergOperators() {
+ private DataStreamSink<Void> chainIcebergOperators() {
why is this change being introduced?
This is related to this discussion: #11244 (comment). I realized that we don't need to use the legacy, deprecated interface in the `FlinkSink` implementation, as the sink part is not important in this chain of transformations - the discarding sink behaves the same whether it is used as the legacy SinkFunction or via the new SinkV2 interface.
committerStream
-   .addSink(new DiscardingSink())
+   .sinkTo(new DiscardingSink<>())
why is this change being introduced?
The same as above
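For clarity, the two `DiscardingSink` classes being swapped are different types with the same behaviour; a sketch, assuming Flink's legacy and v2 discarding sinks (fully-qualified names used to make the difference visible):

```java
import org.apache.flink.streaming.api.datastream.DataStream;

class DiscardingSinkSwapSketch {
  // Legacy path: SinkFunction-based DiscardingSink, tied to the interfaces being removed.
  static void legacy(DataStream<Void> committerStream) {
    committerStream.addSink(
        new org.apache.flink.streaming.api.functions.sink.DiscardingSink<>());
  }

  // New path: SinkV2-based DiscardingSink; records are dropped exactly the same way,
  // but without depending on the deprecated SinkFunction API.
  static void v2(DataStream<Void> committerStream) {
    committerStream.sinkTo(
        new org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink<>());
  }
}
```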
@@ -60,10 +63,23 @@ public static List<Object[]> parameters() {
  for (Object[] catalogParams : CatalogTestBase.parameters()) {
    String catalogName = (String) catalogParams[0];
    Namespace baseNamespace = (Namespace) catalogParams[1];
-   parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+   boolean doNotUseV2Sink = false;
We have a kind of double negation here: `doNotUseV2Sink = false`, but below in the other block we have `boolean useV2Sink = true;`. Could we just do

parameters.add(
    new Object[] {catalogName, baseNamespace, format, isStreaming, false});

i.e., remove this local variable?
The Contribute guide says that boolean arguments should be named. Shouldn't that rule be applied here as well? Or would you prefer a comment instead?
I replaced it with a comment - take a look now.
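For reference, the two styles discussed in this thread look roughly like this (reusing the identifiers from the diff above; purely illustrative):

```java
// Style 1: named local variable, per the Contribute guide's rule on boolean arguments.
boolean useV2Sink = false;
parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, useV2Sink});

// Style 2: inline literal with an explanatory comment, as settled on here.
parameters.add(
    new Object[] {catalogName, baseNamespace, format, isStreaming, false /* useV2Sink */});
```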
@@ -115,6 +126,9 @@ protected synchronized TableEnvironment getTableEnv() {
      tEnv = TableEnvironment.create(settingsBuilder.build());
    }
  }
+ tEnv.getConfig()
+     .getConfiguration()
+     .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
nit: I see we added this block in this PR, and we provide the testTemplate with `useV2Sink=true` and `useV2Sink=false`. I wonder if it would be a good idea to also test the scenario where we don't explicitly set this ConfigOption? In my mind, it should behave identically to when the user does `.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, false);`. But I may be overcomplicating this. What do you think? @pvary
Good point, I'll add a test case for that.
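A minimal sketch of such a default-behaviour check, assuming the `FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK` option added in this PR and a fresh Flink `Configuration`:

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.junit.jupiter.api.Test;

class TestUseV2SinkDefault {
  @Test
  void defaultMatchesExplicitFalse() {
    Configuration conf = new Configuration();
    // Reading the option without setting it yields its declared default,
    // which this PR documents as false (the legacy FlinkSink path).
    boolean defaultValue = conf.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK);
    assertThat(defaultValue).isFalse();
  }
}
```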
assertThat(input.getParallelism())
    .as("Should have the expected parallelism.")
    .isEqualTo(isStreamingJob ? 2 : 4));
Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
Do you plan to leverage the marker interface for the sink builder in #11219 for these tests?
I don't think that it is possible, as we don't have builders here. Instead, we already have a built chain of operators. This test works at the table connector level - the builders are hidden behind the table connector API as an implementation detail.
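One hedged way such a check can distinguish the sinks at the translated-plan level; this is a sketch of the idea using Flink's transformation classes, not the test's actual code:

```java
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;

class SinkKindSketch {
  // Reports whether a translated sink transformation came from sinkTo(...) (SinkV2)
  // or from addSink(...) (legacy SinkFunction).
  static String sinkKind(Transformation<?> sink) {
    if (sink instanceof SinkTransformation) {
      return "SinkV2";
    } else if (sink instanceof LegacySinkTransformation) {
      return "SinkFunction";
    }
    return "unknown: " + sink.getClass().getSimpleName();
  }
}
```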
@rodmeneses @pvary I've made all the fixes that were discussed in the comments. Can we move forward with this change? Do we need more eyes on the docs part? @stevenzwu can you take a look?
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
I think this option would benefit the project as a good transitional path for migrating to the new interface in the Table API / Flink SQL. What is the alternative plan for the migration from the old implementation to the new one?
We definitely need this.
Sure, I rebased the branch onto main to make sure that all tests run against the state that will be present after the merge.
Thanks for the PR, @arkadius, and thanks @rodmeneses for the review!
@arkadius: Could you please backport the change to the relevant older Flink branches?
This PR adds a `table.exec.iceberg.use-v2-sink` configuration option allowing the use of Flink's SinkV2 API described in the FLIP-191 document. The configuration option is set to `false` by default. This PR is a follow-up to the discussions in #10179 and #11219.
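Putting the summary together, a hedged end-to-end sketch of opting into the v2 sink for a SQL write; the table names are placeholders, and a real job would register the catalog and tables first:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkConfigOptions;

public class V2SinkSqlWrite {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Flip the option introduced by this PR; the default is false (legacy FlinkSink).
    tEnv.getConfig()
        .getConfiguration()
        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, true);

    // This INSERT is now planned against the SinkV2-based IcebergSink.
    tEnv.executeSql("INSERT INTO db.target_table SELECT * FROM db.source_table");
  }
}
```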