Flink: Tests alignment for the Flink Sink v2-based implementation (IcebergSink) #11219
@@ -0,0 +1,82 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

@Internal
/*
 * This class exists for the internal purpose of the transition between the previous Flink sink
 * implementation (FlinkSink) and the new implementation based on the Flink Sink V2 API
 * (IcebergSink). After the previous implementation is removed, all occurrences of this class
 * will be replaced by direct IcebergSink usage.
 */
I'm not entirely convinced that this is the path we should move forward with. One of the design choices during the implementation of the new IcebergSink was exactly to avoid this: instead of refactoring common code, we would just add a brand new IcebergSink implementation.

In my mind, this PR is about adding additional tests using the new sink in places where only the old one is used. If we want to keep the design choice from my previous PR, that would imply creating new test cases altogether, but only targeting the IcebergSink: for each existing test class we could add a new one covering the new sink.

And even though this will require creating extra files, I think it will be worth it, as we can keep the two sinks (implementations and unit tests) separate.

Now, for use cases like the Dynamic Tables, which sink should we use? We could introduce a config option:

    public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
        ConfigOptions.key("table.exec.iceberg.use-v2-sink")
            .booleanType()
            .defaultValue(false)
            .withDescription("Use the SinkV2 based IcebergSink implementation.");

and then, in the dynamic table code:

    public DataStreamSink<?> consumeDataStream( ...
      .....
      .....
      if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
        return IcebergSink.forRowData(dataStream)
            .tableLoader(tableLoader)
            .tableSchema(tableSchema)
            .equalityFieldColumns(equalityColumns)
            .overwrite(overwrite)
            .setAll(writeProps)
            .flinkConf(readableConfig)
            .append();
      } else {
        return FlinkSink.forRowData(dataStream)
            .tableLoader(tableLoader)
            .tableSchema(tableSchema)
            .equalityFieldColumns(equalityColumns)
            .overwrite(overwrite)
            .setAll(writeProps)
            .flinkConf(readableConfig)
            .append();
      }

Wdyt? @pvary @stevenzwu
Here are the priorities in my head:

From my perspective, it's ok to add an interface to separate the common methods, but I prefer not to have any implementation in the interface, as this class should be removed when the deprecation happens. If this approach is not enough, then we need test-only util methods to create the sinks for the tests.

Thank you for the review, folks. The listed priorities sound reasonable. Would you agree, @pvary, that we should add two more bullets to this list?

With regards to this: I think that the approach you suggested, with an interface without any implementation, shouldn't break the user's code. I can do a manual test simulating the removal of the interface and verify that no exception is thrown.

@rodmeneses, with regards to this comment: I wanted to do this, but I thought that it would be better to move it to a separate PR, as I mentioned in this comment: #11219 (comment). WDYT? I started with lower-level unit tests because I saw that they cover a lot of use cases, and as a potential user of this class, I would sleep better if I knew that all tests pass also for IcebergSink.

I changed the abstract class to the interface.

Hi @arkadius. During the implementation of the IcebergSink, the design choice was to keep it fully separate from FlinkSink. Having said that, I really liked the idea of your new marker interface, as it minimally changes behavior and APIs in the current code, and at the same time it makes it very simple and clear to add a "useSinkV2" parameter to the existing unit tests, so that you can use either sink implementation.
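To illustrate that last point, a test parameterized over both sinks could look roughly like the following (a minimal sketch assuming JUnit 5; dataStream, tableLoader, and env are hypothetical names for the surrounding test fixture):

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;

    // Minimal sketch: the same test body runs against both FlinkSink and IcebergSink,
    // selected through the marker interface's factory method.
    @ParameterizedTest
    @ValueSource(booleans = {false, true})
    void testWriteRowData(boolean useV2Sink) throws Exception {
      IcebergSinkBuilder.forRowData(dataStream, useV2Sink)
          .tableLoader(tableLoader)
          .writeParallelism(1)
          .append();
      env.execute();
      // ... assertions on the written table contents would follow here ...
    }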
public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> {

  T tableSchema(TableSchema newTableSchema);

  T tableLoader(TableLoader newTableLoader);

  T equalityFieldColumns(List<String> columns);

  T overwrite(boolean newOverwrite);

  T setAll(Map<String, String> properties);

  T flinkConf(ReadableConfig config);

  T table(Table newTable);

  T writeParallelism(int newWriteParallelism);

  T distributionMode(DistributionMode mode);

  T toBranch(String branch);

  T upsert(boolean enabled);

  DataStreamSink<?> append();

  static IcebergSinkBuilder<?> forRow(
      DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
    if (useV2Sink) {
      return IcebergSink.forRow(input, tableSchema);
    } else {
      return FlinkSink.forRow(input, tableSchema);
    }
  }

  static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean useV2Sink) {
    if (useV2Sink) {
      return IcebergSink.forRowData(input);
    } else {
      return FlinkSink.forRowData(input);
    }
  }
}
Why are we adding this method here?

This is a copy-paste from FlinkSink. It is used in TestFlinkIcebergSinkV2. I see that this test behaves differently for IcebergSink with WRITE_DISTRIBUTION_MODE_RANGE and partitioning turned off: "Equality field columns shouldn't be empty when configuring to use UPSERT data stream" is thrown instead of "Invalid write distribution mode ...". Do you have an idea whether this is correct or unexpected behaviour?
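If the divergence turns out to be expected, a test parameterized over both sinks could branch its assertion on the implementation, along these lines (a hypothetical sketch using AssertJ; builder and useV2Sink are assumed to come from the surrounding test, and the messages are the ones quoted above):

    import static org.assertj.core.api.Assertions.assertThatThrownBy;

    // Hypothetical sketch: asserting the implementation-specific error message.
    String expectedMessage =
        useV2Sink
            ? "Equality field columns shouldn't be empty when configuring to use UPSERT data stream"
            : "Invalid write distribution mode";
    assertThatThrownBy(builder::append).hasMessageContaining(expectedMessage);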
checkAndGetEqualityFieldIds() was moved to the SinkUtil class. This method is already being referenced by both sink implementations. Looks like I forgot to delete this method from FlinkSink. So we can do the following: make both IcebergSink and FlinkSink rely on the SinkUtil version, delete the leftover copy from FlinkSink, and then TestFlinkIcebergSinkV2::testCheckAndGetEqualityFieldIds can be rewritten as shown in the sketch below. This should work, but please let me know if it doesn't. Thanks!
Yep, it looks better, fixed.