
Flink: Add table.exec.iceberg.use-v2-sink option #11244

Merged
6 commits merged into apache:main on Nov 25, 2024

Conversation

arkadius (Contributor) commented Oct 1, 2024

This PR adds a table.exec.iceberg.use-v2-sink configuration option that allows using Flink's Sink V2 API, described in the FLIP-191 document.

The configuration option defaults to false.

This PR is a follow-up to the discussions in #10179 and #11219.
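For context, here is how the new option can be switched on from the Table API; a minimal sketch that assumes only the option name from this PR (the surrounding setup is generic Flink code, not part of this change):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UseV2SinkExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt in to the SinkV2-based implementation; the option defaults to false,
    // which keeps the legacy FlinkSink behavior.
    conf.setString("table.exec.iceberg.use-v2-sink", "true");

    TableEnvironment tEnv =
        TableEnvironment.create(
            EnvironmentSettings.newInstance().withConfiguration(conf).build());
    // ... register an Iceberg catalog and run INSERT INTO statements as usual ...
  }
}
```

From a SQL client, the same option can be set with `SET 'table.exec.iceberg.use-v2-sink' = 'true';`.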

@github-actions github-actions bot added the flink label Oct 1, 2024
@arkadius changed the title from "Add table.exec.iceberg.use-v2-sink option" to "Fflink: Add table.exec.iceberg.use-v2-sink option" on Oct 1, 2024
@github-actions github-actions bot added the docs label Oct 1, 2024
arkadius (Contributor, Author) commented Oct 1, 2024

@rodmeneses, @pvary please have a look at this PR. In this change, I added the configuration option that we talked about. I also added documentation; in the docs I show the differences between the interfaces that I found while working on unifying the unit tests for both sinks.

This is still a draft, as I have no idea how to demonstrate in unit tests that this configuration option does anything 😅. I've only checked in the debugger that the code enters the correct code blocks. I could add a test at the IcebergTableSink.getSinkRuntimeProvider level that checks which implementation is chosen, but that doesn't seem like a good idea.

Do you know of any differences in the behaviour of these sinks that I could demonstrate in a test? I see that there are plans to add small-file compaction to the SinkV2-based implementation. Perhaps we should add this test when that feature becomes available. WDYT?

@arkadius force-pushed the sinkv2-configuration-option branch from 70857ec to 213668f on October 2, 2024
@arkadius arkadius marked this pull request as ready for review October 3, 2024 10:46
arkadius (Contributor, Author) commented Oct 3, 2024

> This is still a draft, as I have no idea how to demonstrate in unit tests that this configuration option does anything 😅. I've only checked in the debugger that the code enters the correct code blocks. I could add a test at the IcebergTableSink.getSinkRuntimeProvider level that checks which implementation is chosen, but that doesn't seem like a good idea.

I found a way to verify that the correct API is used, and I added the test. This change is ready for review.

Comment on lines 379 to 382
The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API)
had some limitations - for example it created a lot of small files when writing to it. This problem is called
the `small-file-compaction` problem in
the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
Reviewer (Contributor) commented:

Table maintenance is not available yet, so I would not mention it in this PR.

The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API) will be deprecated and removed from Flink.

arkadius (Contributor, Author) replied Oct 4, 2024:

Ok, I see your point, but I think we need to change it even more; I'm currently thinking about how to do this. The problems with the proposed changes to the introductory section are:

  1. Flink has had 3 approaches to defining a sink: SinkFunction, SinkV1 (FLIP-143) and SinkV2 (FLIP-191). In FlinkSink we used the first approach, not SinkV1, but ...
  2. ... it is not so important, because the main sinking logic is done in operators before the sink. The sink itself is a DiscardingSink, which BTW can be replaced by the SinkV2 version to avoid problems with the removal of the previous interface (I've just done that).

IMO we should give some introduction explaining why we have two implementations, and that is hard to do without exposing future plans. Do you see another option?

arkadius (Contributor, Author) replied:

I've rewritten the introduction part. Please correct me if I've twisted something or if you see it differently.

## Writing with DataStream

To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets.
Warning: some settings are not available in this class (e.g. `rangeDistributionStatisticsType`),
Reviewer (Contributor) commented:

The `RANGE` distribution mode is not yet available for the `IcebergSink`.


To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets.
Warning: some settings are not available in this class (e.g. `rangeDistributionStatisticsType`),
some others are slightly different (e.g. there is a `uidSuffix` method instead of `uidPrefix`).
Reviewer (Contributor) commented:

When using IcebergSink use uidPrefix instead of the uidSuffix

arkadius (Contributor, Author) replied:

👍 with one little change, it should be the opposite: IcebergSink has the uidSuffix method and FlinkSink has the uidPrefix method.
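For illustration, a hedged sketch of the two builders side by side; the stream and table loader are placeholders, and the method names are the ones discussed in this thread:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;

public class SinkBuildersSketch {
  static void appendWithLegacySink(DataStream<RowData> stream, TableLoader loader) {
    // Legacy implementation: operator uids are controlled with uidPrefix.
    FlinkSink.forRowData(stream)
        .tableLoader(loader)
        .uidPrefix("iceberg-sink")
        .append();
  }

  static void appendWithV2Sink(DataStream<RowData> stream, TableLoader loader) {
    // SinkV2-based implementation: uidSuffix instead of uidPrefix; note that
    // some FlinkSink settings (e.g. RANGE distribution statistics) are not
    // available here yet.
    IcebergSink.forRowData(stream)
        .tableLoader(loader)
        .uidSuffix("iceberg-sink")
        .append();
  }
}
```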

@@ -115,6 +125,9 @@ protected synchronized TableEnvironment getTableEnv() {
        tEnv = TableEnvironment.create(settingsBuilder.build());
      }
    }
+   tEnv.getConfig()
Reviewer (Contributor) commented:

nit: we add an empty line between blocks

pvary (Contributor) commented Oct 4, 2024

@rodmeneses, @stevenzwu: Please review the PR, especially the documentation part.

@arkadius force-pushed the sinkv2-configuration-option branch 2 times, most recently from 23fa5e8 to 525b6ee on October 4, 2024
@arkadius arkadius requested a review from pvary October 7, 2024 07:23
@@ -381,7 +381,7 @@ public Builder toBranch(String branch) {
return this;
}

- private <T> DataStreamSink<T> chainIcebergOperators() {
+ private DataStreamSink<Void> chainIcebergOperators() {
Reviewer (Contributor) commented:

why is this change being introduced?

arkadius (Contributor, Author) replied:

This is related to this discussion: #11244 (comment). I realized that we don't need to use the legacy, deprecated interface in the FlinkSink implementation, as the sink part is not important in this chain of transformations; the discarding sink behaves the same whether it is used as the legacy SinkFunction or the new SinkV2 interface.

  committerStream
-     .addSink(new DiscardingSink())
+     .sinkTo(new DiscardingSink<>())
Reviewer (Contributor) commented:

why is this change being introduced?

arkadius (Contributor, Author) replied:

The same as above

@@ -60,10 +63,23 @@ public static List<Object[]> parameters() {
for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
-     parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
+     boolean doNotUseV2Sink = false;
Reviewer (Contributor) commented:

we have a kind of double negation, doNotUseV2Sink = false, but below in the other block we have boolean useV2Sink = true;
could we just do

parameters.add(
              new Object[] {catalogName, baseNamespace, format, isStreaming, false});

i.e., removing this local variable?

arkadius (Contributor, Author) replied Oct 7, 2024:

The Contribute guide says that boolean arguments should be named. Shouldn't this rule apply here as well? Or do you prefer a comment instead?

arkadius (Contributor, Author) replied:

I replaced it with a comment; take a look now.
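For reference, the inline-comment style the reply refers to could look like this (hypothetical, based on the parameter list in the diff above):

```java
// A comment names the boolean in place of a single-use local variable:
parameters.add(
    new Object[] {catalogName, baseNamespace, format, isStreaming, false /* useV2Sink */});
```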

@@ -115,6 +126,9 @@ protected synchronized TableEnvironment getTableEnv() {
        tEnv = TableEnvironment.create(settingsBuilder.build());
      }
    }
+   tEnv.getConfig()
+       .getConfiguration()
+       .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
Reviewer (Contributor) commented:

nit:
I see we added this block in this PR, and we provide the testTemplate with useV2Sink=true and useV2Sink=false.
I wonder if it would be a good idea to also test the scenario where we don't explicitly set this ConfigOption? In my mind, it should behave identically to when the user does .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, false);

But I may be overcomplicating this. What do you think? @pvary

arkadius (Contributor, Author) replied:

Good point, I'll add a test case for that.
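A minimal sketch of such a default-value check, assuming TABLE_EXEC_ICEBERG_USE_V2_SINK is a ConfigOption<Boolean> as the diff above suggests (the test class and method names are illustrative):

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.junit.jupiter.api.Test;

class UseV2SinkDefaultTest {
  @Test
  void defaultIsLegacySink() {
    // Reading the option from an empty configuration yields its declared
    // default, which should match the behavior of explicitly setting false.
    Configuration conf = new Configuration();
    assertThat(conf.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)).isFalse();
  }
}
```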

@arkadius arkadius requested a review from rodmeneses October 7, 2024 21:17
@arkadius changed the title from "Fflink: Add table.exec.iceberg.use-v2-sink option" to "Flink: Add table.exec.iceberg.use-v2-sink option" on Oct 8, 2024
assertThat(input.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4));
Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
Reviewer (Contributor) commented:

do you plan to leverage the marker interface for the sink builder in #11219
for these tests?

arkadius (Contributor, Author) replied:

I don't think that is possible, as we don't have builders here; instead, we have an already-built chain of operators. This test works at the table connector level: the builders are hidden behind the table connector API as an implementation detail.
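One hedged way such a check could look: after this PR both code paths terminate in a SinkV2 `sinkTo(...)` call, so a test could inspect which `Sink` instance the terminal `SinkTransformation` carries. The cast and accessor below are assumptions about Flink's internal transformation classes, not necessarily the exact test in this PR; `planner` and `operation` are the same objects as in the test diff above:

```java
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.iceberg.flink.sink.IcebergSink;

// Sketch: translate the INSERT operation and inspect the resulting sink transformation.
void assertExpectedSinkImplementation(boolean useV2Sink) {
  Transformation<?> sinkTransformation =
      planner.translate(Collections.singletonList(operation)).get(0);
  Sink<?> sink = ((SinkTransformation<?, ?>) sinkTransformation).getSink();
  if (useV2Sink) {
    // The v2 path wires the IcebergSink itself into the terminal transformation.
    assertThat(sink).isInstanceOf(IcebergSink.class);
  } else {
    // The legacy path ends in a discarding SinkV2 sink after the committer operators.
    assertThat(sink).isNotInstanceOf(IcebergSink.class);
  }
}
```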

@arkadius arkadius requested a review from rodmeneses October 21, 2024 08:37
arkadius (Contributor, Author) commented:
@rodmeneses @pvary I've done all the fixes that were discussed in the comments. Can we move forward with this change? Do we need more eyes on the docs part? @stevenzwu, can you take a look?

github-actions (bot) commented Nov 21, 2024:

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Nov 21, 2024
rodmeneses (Contributor) commented:

Hi @arkadius, is this still needed/relevant? Please advise, as it will be closed due to inactivity soon.
cc @pvary

arkadius (Contributor, Author) commented:

I think this option would benefit the project by enabling a smooth, transitional migration to the new interface in the Table API / Flink SQL. What is the alternative plan for migrating between the old implementation and the new one?

pvary (Contributor) commented Nov 21, 2024

We definitely need this.
@arkadius: Could you please make sure the tests are run? Just change the commit message of the last commit and force-push the branch, or something?

@arkadius force-pushed the sinkv2-configuration-option branch from ee48c23 to 799de35 on November 21, 2024
arkadius (Contributor, Author) commented:

> @arkadius: Could you please make sure the tests are run? Just change the commit message of the last commit and force-push the branch, or something?

Sure, I rebased the branch onto main to make sure that all tests run against the state that will be present after the merge.

@github-actions github-actions bot removed the stale label Nov 22, 2024
@pvary pvary merged commit fa47f31 into apache:main Nov 25, 2024
20 checks passed
pvary (Contributor) commented Nov 25, 2024

Thanks for the PR @arkadius and @rodmeneses for the review!

pvary (Contributor) commented Nov 25, 2024

@arkadius: Could you please backport the change to the relevant older Flink branches?

arkadius added a commit to arkadius/iceberg that referenced this pull request Nov 27, 2024
arkadius (Contributor, Author) commented Nov 27, 2024

> @arkadius: Could you please backport the change to the relevant older Flink branches?

Sure, here is the backport for Flink 1.19: #11665
The SinkV2 API was added in Flink 1.19, so the backport is only relevant for that version.

pvary pushed a commit that referenced this pull request Nov 27, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024