-
Notifications
You must be signed in to change notification settings - Fork 2.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Tests alignment for the Flink Sink v2-based implemenation (IcebergSink) #11219
Conversation
import org.apache.iceberg.Table; | ||
import org.apache.iceberg.flink.TableLoader; | ||
|
||
public interface BaseIcebergSinkBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand where you are coming from, but we are planning to add features (table maintenance) to the new IcebergSink
which will not be available to the FlinkSink
.
We also need to think about the deprecation and the eventual removal of the FlinkSink
.
Maybe something like I propose for the MaintenanceTaskBuilder
could help: https://github.com/apache/iceberg/pull/11144/files#diff-1b7c33ba8eb300e4a1c92844418384756a07ba32882de39855c74c9a40778f48
Or we could just move this class to the tests, and don't add this to the public API?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my previous solution, it was also possible, because each Builder implementation overrode the return type of each method to the specific one, but your suggestion looks better - changed.
With regards to the visibility, in the referenced PR, we discussed that there is a plan to allow to use IcebergSink
for dynamic tables (IcebergTableSink:80
). I thought about adding the property allowing to use this implementation there. Thanks to the common parent for builders, we could check the property value at the beginning of method and do rest of builder's chain on it. This would ensure that we keep these builder synchronized. I though about doing this in the next PR. WDYT?
{FileFormat.PARQUET, 1, false}, | ||
{FileFormat.PARQUET, 2, true}, | ||
{FileFormat.PARQUET, 2, false} | ||
{FileFormat.AVRO, 1, true, false}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need the whole testing matrix here.
I usually:
- Test everything with the new feature
- Run some choice tests that makes sure the old feature is intact
Some possible cuts:
- Run 1 Avro, 1 ORC test and keep all the PARQUET
- Run 1 with single parallelism and use parallelism 2 for the others
- Run 1 with unparitioned, and use partitioned for the others
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, reduced
private TableLoader tableLoader; | ||
|
||
@Parameters(name = "formatVersion = {0}, branch = {1}") | ||
@Parameters(name = "formatVersion = {0}, branch = {1}, useV2Sink = {2}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cut the number of tests, like above
@Parameter(index = 2) | ||
private boolean useV2Sink; | ||
|
||
@Parameters(name = "parallelism = {0}, partitioned = {1}, useV2Sink = {3}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cut the number of tests, like above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I eventually hardcoded it to DO_NOT_USE_V2_SINK
because in some tests there were used rangeDistributionStatisticsType
which is not available in IcebergSink
.
@rodmeneses: Please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary I added tests for IcebergSink
in most places where FlinkSink
was tested. In three classes I'm not sure if I should extract parameter tests - take a look.
Also, in two places, it wasn't possible to add tests for IcebergSink
because its builder doesn't have rangeDistributionStatisticsType
and has uidSuffix
instead of uidPrefix
. WDYAT? Should we align these builders or these tests shouldn't be run for IcebergSink
?
@@ -57,6 +57,9 @@ public class TestFlinkIcebergSinkExtended extends TestFlinkIcebergSinkBase { | |||
private final boolean partitioned = true; | |||
private final int parallelism = 2; | |||
private final FileFormat format = FileFormat.PARQUET; | |||
// FIXME (before merge): should it be parameterized test or hardcoded value with some comment that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary WDYT? Should I extract parameter tests or keep this hardcoded as other things in this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's agree on the general approach first, before going into these details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good plan.
@@ -108,6 +108,9 @@ public class TestFlinkIcebergSinkRangeDistributionBucketing { | |||
private static final PartitionSpec SPEC = | |||
PartitionSpec.builderFor(SCHEMA).hour("ts").bucket("uuid", NUM_BUCKETS).build(); | |||
private static final RowType ROW_TYPE = FlinkSchemaUtil.convert(SCHEMA); | |||
// FIXME (before merge): should it be parameterized test or hardcoded value with some comment that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary WDYT? Should I extract parameter tests or keep this hardcoded to not produce too much tests?
@@ -76,6 +76,9 @@ public class TestIcebergSourceFailover { | |||
// The goal is to allow some splits to remain in the enumerator when restoring the state | |||
private static final int PARALLELISM = 2; | |||
private static final int DO_NOT_FAIL = Integer.MAX_VALUE; | |||
// FIXME (before merge): should it be parameterized test or hardcoded value with some comment that |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@pvary WDYT? Should I extract parameter tests or keep this hardcoded to not produce too much tests?
Hi @arkadius . I would update the PR description to something like: My reasoning here is that the current description kind of implies that |
/* | ||
This class is for internal purpose of transition between the previous implementation of Flink's sink (FlinkSink) | ||
and the new one implementation based on Flink v2 sink's API (IcebergSink) | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not entirely convinced that this is the path we should move forward to. One of the design choices during the implementation of the new IceberSink was exactly to avoid this: instead of refactoring common code, we would just add a brand new IcebergSink
and keep the old one untouched (or minimally touched).
In my mind, this PR is about adding additional tests using the new sink, in places where only the old one is used. If we want to keep the design choice from my previous PR, then that would imply that we would be creating new test cases altogether, but only targeting the IcebergSink.
For example, we have
TestFlinkIcebergSinkExtended
then we could add a new one called
TestFlinkIcebergSinkExtendedSinkV2
which is identical as the original one, but replacing the FlinkSink.
with IcebergSink.
And even though this will require creating extra files, I think it will be worth it as we can keep the 2 sinks (implementations and unit testing) separate.
Now, for use cases like the Dynamic Tables - which sink to use??:
I think we should follow a similar approach as the one @pvary mentioned before: defining a config, and relaying on it. For example:
adding the following to FlinkConfigOptions
public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
ConfigOptions.key("table.exec.iceberg.use-v2-sink")
.booleanType()
.defaultValue(false)
.withDescription("Use the SinkV2 based IcebergSink implementation.");
and then, in dynamic table code:
public DataStreamSink<?> consumeDataStream( ...
.....
.....
if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
return IcebergSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
} else {
return FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
}
Wdyt? @pvary @stevenzwu
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are the priorities in my head:
- When removing the FlinkSink from the Iceberg connector codebase, the users of the IcebergSink should not need to change any code.
- It should be as easy as possible to remove the FlinkSink and related classes. There should not be any dangling classes left (unused interfaces and such)
- We should reuse as much of the testing code as possible
From my perspective, it's ok to add an interface to separate the common methods, but I prefer not to have any implementation in the interface, as this class should be removed when the deprecation happens. If this approach is not enough, then we need test only util methods to create the sinks for the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the review folks. The listed priorities sounds reasonable. Would you agree @pvary that we should add two more bullets to this lists:
- We should ensure that
IcebergSink
is covered by unit tests to a similar level asFlinkSink
- We should ensure as transitive migration from
FlinkSink
toIcebergSink
as possible. For example, try to keep features and allowed properties synchronized (of course, I'm not talking about features that are not possible to synchronize because of Flink's API differences)
?
With regards to this:
When removing the FlinkSink from the Iceberg connector codebase, the users of the IcebergSink should not need to change any code.
I think that the suggested by you approach with the interface without any implementation shouldn't break the user's code. I can do a manual test simulating the removal of the interface and verify if no exception is thrown.
@rodmeneses with regards to this comment:
I think we should follow a similar approach as the one @pvary mentioned before: defining a config, and relaying on it. For example:
adding the following to FlinkConfigOptions
I wanted to do this, but I thought that it would be better to move this to the separated PR as I mentioned in this comment: #11219 (comment) WDYAT?
I started with a lower level unit tests because I saw that they cover a lot of use cases and as a potential user of this class, I would sleep better if I knew that all tests passes also for IcebergSink
. For the purpose of this feature flag, I think that we should add a higher level tests that would use Flink's table API/Flink SQL. Do you agree with that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed abstract class to the interface.
BTW, I've just realized more about what's going on around the current status of unit tests for these two classes. I see that we already had some tests of IcebergSink
that are copy-pastes of FlinkSink
tests (e.g. TestIcebergSink
vs TestFlinkIcebergSink
). I see that these tests are already desynchronized. Don't you think that this state is becoming hard to maintain for a longer period of time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @arkadius
I'm OK with adding a new interface IcerbergSinkBuilder
and having the FlinkSink
and IcebergSink
implement it. So on my mind, the only things that are changing in this PR are exactly those ones. I can see that in the PR already, with only one thing I noticed which I will comment directly in the PR itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed abstract class to the interface. BTW, I've just realized more about what's going on around the current status of unit tests for these two classes. I see that we already had some tests of
IcebergSink
that are copy-pastes ofFlinkSink
tests (e.g.TestIcebergSink
vsTestFlinkIcebergSink
). I see that these tests are already desynchronized. Don't you think that this state is becoming hard to maintain for a longer period of time?
During the implementation of the IcerbergSink
I wrote unit test to cover all the new functionality specific to the new sink, and I also added unit test to cover similar functionality from the FlinkSink
. I didn't want to modify existing code (which was a design choice in this PR) so I went ahead and created new test classes that will only feature the new IcebergSink
. This is the reason I suggest yesterday about adding new test classes that only target the new IcbergSink
, so that it is similar to the approach that I followed.
Having said that, I really liked the idea of your new marker interface as it minimally change behavior or APIs in the current code, and at the same time it makes very simple and clear adding a parameter "useSinkV2" for the existing unit tests, so that you can use either FlinkSink
or IcebergSink
.
You are right, The summary is confusing. I'll propose the change of it when we close the discussion about the scope of this change.
I'm a little bit confused about that. By dynamic tables, I thought about this concept https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/ . From this perspective, everywhere where we use Table API or Flink SQL, and the test code touches |
@@ -545,6 +561,34 @@ public DataStreamSink<RowData> append() { | |||
} | |||
return rowDataDataStreamSink; | |||
} | |||
|
|||
@VisibleForTesting | |||
List<Integer> checkAndGetEqualityFieldIds() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we adding this method here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a copy-paste from FlinkSink
. It is used in TestFlinkIcebergSinkV2
. I see that this test behaves differently for IcebergSink
for WRITE_DISTRIBUTION_MODE_RANGE
and turned off partitioning: Equality field columns shouldn't be empty when configuring to use UPSERT data stream
is thrown, instead of Invalid write distribution mode ...
. Do you have an idea whether it is correct or unexpected behaviour?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
checkAndGetEqualityFieldIds()
was moved to SinkUtil
class. This method is already being referenced by both Sinks implementations.
Looks Like I forgot to delete this method from FlinkSink
. So we can do the following
- Don't add this method to
IcebergSink
- remove the method from
FlinkSink
- the test
TestFlinkIcebergSinkV2::testCheckAndGetEqualityFieldIds
can be rewritten as:
@TestTemplate
public void testCheckAndGetEqualityFieldIds() {
table
.updateSchema()
.allowIncompatibleChanges()
.addRequiredColumn("type", Types.StringType.get())
.setIdentifierFields("type")
.commit();
DataStream<Row> dataStream =
env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);
// Use schema identifier field IDs as equality field id list by default
assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, null))
.containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds());
// Use user-provided equality field column as equality field id list
assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("id")))
.containsExactlyInAnyOrder(table.schema().findField("id").fieldId());
assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("type")))
.containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
}
This should work but please let me know if it doesn't. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, it looks better, fixed
@rodmeneses , @pvary I've opened the new PR #11249 with non-controversial changes that I've discovered during this work. It contains mostly goodies that were added in IcebergSink tests and haven't been migrated to FlinkSink tests. Can you take a look at them? |
Hi @arkadius @pvary does this PR look good to you, or you have anything pending that would like to be done? The PR is looking good to me. Also, please @pvary and @stevenzwu please help starting the CI pipelines on this PR. |
@rodmeneses I think that it would be better if you copied the parts of the code from this PR into your PR. I can rebase into your changes after you merge them. This PR isn't finished yet. I have a plan how the final shape of unit test could look like to make them easier to maintain and to fill some gaps in |
It could make sense, to open a PR with only Builder Marker Interface. I can raise that PR, if you're ok with it @arkadius. But before jumping into that, I'd like to hear about this from @pvary and/or @stevenzwu. What do you think? |
It's fine for me. I raised it here #11305. We can move this discussion there. |
Merged #11305 |
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions. |
Hi @rodmeneses. No, this change is not relevant. I'll close it. |
The scope of this PR is to provide the same level of test use case coverage for the new implementation of Flink's sink (
IcebergSink
) based on Flink's v2 sinks API as it was done for the previous implementation (FlinkSink
). The next step after this change will be to add a configuration option allowing to choose which implementation of the sink to use - this will be done in a separate PR.This PR is the follow-up of discussion in #10179