Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: Tests alignment for the Flink Sink v2-based implemenation (IcebergSink) #11219

Closed
wants to merge 11 commits into from

Conversation

arkadius
Copy link
Contributor

@arkadius arkadius commented Sep 27, 2024

The scope of this PR is to provide the same level of test use case coverage for the new implementation of Flink's sink (IcebergSink) based on Flink's v2 sinks API as it was done for the previous implementation (FlinkSink). The next step after this change will be to add a configuration option allowing to choose which implementation of the sink to use - this will be done in a separate PR.

This PR is the follow-up of discussion in #10179

@github-actions github-actions bot added the flink label Sep 27, 2024
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public interface BaseIcebergSinkBuilder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand where you are coming from, but we are planning to add features (table maintenance) to the new IcebergSink which will not be available to the FlinkSink.
We also need to think about the deprecation and the eventual removal of the FlinkSink.

Maybe something like I propose for the MaintenanceTaskBuilder could help: https://github.com/apache/iceberg/pull/11144/files#diff-1b7c33ba8eb300e4a1c92844418384756a07ba32882de39855c74c9a40778f48

Or we could just move this class to the tests, and don't add this to the public API?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my previous solution, it was also possible, because each Builder implementation overrode the return type of each method to the specific one, but your suggestion looks better - changed.

With regards to the visibility, in the referenced PR, we discussed that there is a plan to allow to use IcebergSink for dynamic tables (IcebergTableSink:80). I thought about adding the property allowing to use this implementation there. Thanks to the common parent for builders, we could check the property value at the beginning of method and do rest of builder's chain on it. This would ensure that we keep these builder synchronized. I though about doing this in the next PR. WDYT?

{FileFormat.PARQUET, 1, false},
{FileFormat.PARQUET, 2, true},
{FileFormat.PARQUET, 2, false}
{FileFormat.AVRO, 1, true, false},
Copy link
Contributor

@pvary pvary Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need the whole testing matrix here.
I usually:

  • Test everything with the new feature
  • Run some choice tests that makes sure the old feature is intact

Some possible cuts:

  • Run 1 Avro, 1 ORC test and keep all the PARQUET
  • Run 1 with single parallelism and use parallelism 2 for the others
  • Run 1 with unparitioned, and use partitioned for the others

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, reduced

private TableLoader tableLoader;

@Parameters(name = "formatVersion = {0}, branch = {1}")
@Parameters(name = "formatVersion = {0}, branch = {1}, useV2Sink = {2}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cut the number of tests, like above

@Parameter(index = 2)
private boolean useV2Sink;

@Parameters(name = "parallelism = {0}, partitioned = {1}, useV2Sink = {3}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cut the number of tests, like above

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I eventually hardcoded it to DO_NOT_USE_V2_SINK because in some tests there were used rangeDistributionStatisticsType which is not available in IcebergSink.

@pvary
Copy link
Contributor

pvary commented Sep 30, 2024

@rodmeneses: Please review

Copy link
Contributor Author

@arkadius arkadius left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary I added tests for IcebergSink in most places where FlinkSink was tested. In three classes I'm not sure if I should extract parameter tests - take a look.

Also, in two places, it wasn't possible to add tests for IcebergSink because its builder doesn't have rangeDistributionStatisticsType and has uidSuffix instead of uidPrefix. WDYAT? Should we align these builders or these tests shouldn't be run for IcebergSink?

@@ -57,6 +57,9 @@ public class TestFlinkIcebergSinkExtended extends TestFlinkIcebergSinkBase {
private final boolean partitioned = true;
private final int parallelism = 2;
private final FileFormat format = FileFormat.PARQUET;
// FIXME (before merge): should it be parameterized test or hardcoded value with some comment that
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary WDYT? Should I extract parameter tests or keep this hardcoded as other things in this test?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's agree on the general approach first, before going into these details.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good plan.

@@ -108,6 +108,9 @@ public class TestFlinkIcebergSinkRangeDistributionBucketing {
private static final PartitionSpec SPEC =
PartitionSpec.builderFor(SCHEMA).hour("ts").bucket("uuid", NUM_BUCKETS).build();
private static final RowType ROW_TYPE = FlinkSchemaUtil.convert(SCHEMA);
// FIXME (before merge): should it be parameterized test or hardcoded value with some comment that
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary WDYT? Should I extract parameter tests or keep this hardcoded to not produce too much tests?

@@ -76,6 +76,9 @@ public class TestIcebergSourceFailover {
// The goal is to allow some splits to remain in the enumerator when restoring the state
private static final int PARALLELISM = 2;
private static final int DO_NOT_FAIL = Integer.MAX_VALUE;
// FIXME (before merge): should it be parameterized test or hardcoded value with some comment that
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary WDYT? Should I extract parameter tests or keep this hardcoded to not produce too much tests?

@rodmeneses
Copy link
Contributor

The scope of this PR is to provide unit tests for the new implementation of Flink's sink (IcebergSink) based on Flink's v2 sinks API. This PR is the follow-up of discussion in #10179

Hi @arkadius . I would update the PR description to something like:

The scope of this PR is to add unit tests for the IcebergSink implementation in all the modules where FlinkSink is
referenced, like: <....$$$ please list what components you are referring to. ie: DynamicTables [however, keep in mind that I think there are no existing unit tests for dynamicTables, even with the FlinkSink implementation.$$$

My reasoning here is that the current description kind of implies that IcebergSink doesn't have unit tests, which it does.

/*
This class is for internal purpose of transition between the previous implementation of Flink's sink (FlinkSink)
and the new one implementation based on Flink v2 sink's API (IcebergSink)
*/
Copy link
Contributor

@rodmeneses rodmeneses Sep 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely convinced that this is the path we should move forward to. One of the design choices during the implementation of the new IceberSink was exactly to avoid this: instead of refactoring common code, we would just add a brand new IcebergSink and keep the old one untouched (or minimally touched).

In my mind, this PR is about adding additional tests using the new sink, in places where only the old one is used. If we want to keep the design choice from my previous PR, then that would imply that we would be creating new test cases altogether, but only targeting the IcebergSink.
For example, we have
TestFlinkIcebergSinkExtended

then we could add a new one called
TestFlinkIcebergSinkExtendedSinkV2
which is identical as the original one, but replacing the FlinkSink. with IcebergSink.

And even though this will require creating extra files, I think it will be worth it as we can keep the 2 sinks (implementations and unit testing) separate.

Now, for use cases like the Dynamic Tables - which sink to use??:
I think we should follow a similar approach as the one @pvary mentioned before: defining a config, and relaying on it. For example:
adding the following to FlinkConfigOptions

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
          ConfigOptions.key("table.exec.iceberg.use-v2-sink")
                  .booleanType()
                  .defaultValue(false)
                  .withDescription("Use the SinkV2 based IcebergSink implementation.");

and then, in dynamic table code:

public DataStreamSink<?> consumeDataStream( ...
.....
.....

 if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
          return IcebergSink.forRowData(dataStream)
                  .tableLoader(tableLoader)
                  .tableSchema(tableSchema)
                  .equalityFieldColumns(equalityColumns)
                  .overwrite(overwrite)
                  .setAll(writeProps)
                  .flinkConf(readableConfig)
                  .append();
        } else {
          return FlinkSink.forRowData(dataStream)
                  .tableLoader(tableLoader)
                  .tableSchema(tableSchema)
                  .equalityFieldColumns(equalityColumns)
                  .overwrite(overwrite)
                  .setAll(writeProps)
                  .flinkConf(readableConfig)
                  .append();
        }

Wdyt? @pvary @stevenzwu

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here are the priorities in my head:

  • When removing the FlinkSink from the Iceberg connector codebase, the users of the IcebergSink should not need to change any code.
  • It should be as easy as possible to remove the FlinkSink and related classes. There should not be any dangling classes left (unused interfaces and such)
  • We should reuse as much of the testing code as possible

From my perspective, it's ok to add an interface to separate the common methods, but I prefer not to have any implementation in the interface, as this class should be removed when the deprecation happens. If this approach is not enough, then we need test only util methods to create the sinks for the tests

Copy link
Contributor Author

@arkadius arkadius Oct 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review folks. The listed priorities sounds reasonable. Would you agree @pvary that we should add two more bullets to this lists:

  • We should ensure that IcebergSink is covered by unit tests to a similar level as FlinkSink
  • We should ensure as transitive migration from FlinkSink to IcebergSink as possible. For example, try to keep features and allowed properties synchronized (of course, I'm not talking about features that are not possible to synchronize because of Flink's API differences)
    ?

With regards to this:

When removing the FlinkSink from the Iceberg connector codebase, the users of the IcebergSink should not need to change any code.

I think that the suggested by you approach with the interface without any implementation shouldn't break the user's code. I can do a manual test simulating the removal of the interface and verify if no exception is thrown.

@rodmeneses with regards to this comment:

I think we should follow a similar approach as the one @pvary mentioned before: defining a config, and relaying on it. For example:
adding the following to FlinkConfigOptions

I wanted to do this, but I thought that it would be better to move this to the separated PR as I mentioned in this comment: #11219 (comment) WDYAT?

I started with a lower level unit tests because I saw that they cover a lot of use cases and as a potential user of this class, I would sleep better if I knew that all tests passes also for IcebergSink. For the purpose of this feature flag, I think that we should add a higher level tests that would use Flink's table API/Flink SQL. Do you agree with that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed abstract class to the interface.
BTW, I've just realized more about what's going on around the current status of unit tests for these two classes. I see that we already had some tests of IcebergSink that are copy-pastes of FlinkSink tests (e.g. TestIcebergSink vs TestFlinkIcebergSink). I see that these tests are already desynchronized. Don't you think that this state is becoming hard to maintain for a longer period of time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @arkadius
I'm OK with adding a new interface IcerbergSinkBuilder and having the FlinkSink and IcebergSink implement it. So on my mind, the only things that are changing in this PR are exactly those ones. I can see that in the PR already, with only one thing I noticed which I will comment directly in the PR itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed abstract class to the interface. BTW, I've just realized more about what's going on around the current status of unit tests for these two classes. I see that we already had some tests of IcebergSink that are copy-pastes of FlinkSink tests (e.g. TestIcebergSink vs TestFlinkIcebergSink). I see that these tests are already desynchronized. Don't you think that this state is becoming hard to maintain for a longer period of time?

During the implementation of the IcerbergSink I wrote unit test to cover all the new functionality specific to the new sink, and I also added unit test to cover similar functionality from the FlinkSink. I didn't want to modify existing code (which was a design choice in this PR) so I went ahead and created new test classes that will only feature the new IcebergSink. This is the reason I suggest yesterday about adding new test classes that only target the new IcbergSink, so that it is similar to the approach that I followed.

Having said that, I really liked the idea of your new marker interface as it minimally change behavior or APIs in the current code, and at the same time it makes very simple and clear adding a parameter "useSinkV2" for the existing unit tests, so that you can use either FlinkSink or IcebergSink.

@arkadius
Copy link
Contributor Author

arkadius commented Oct 1, 2024

The scope of this PR is to provide unit tests for the new implementation of Flink's sink (IcebergSink) based on Flink's v2 sinks API. This PR is the follow-up of discussion in #10179

Hi @arkadius . I would update the PR description to something like: The scope of this PR is to add unit tests for the IcebergSink implementation in all the modules where FlinkSink is referenced, like: <....$$$ please list what components you are referring to. ie: DynamicTables [however, keep in mind that I think there are no existing unit tests for dynamicTables, even with the FlinkSink implementation.$$$

My reasoning here is that the current description kind of implies that IcebergSink doesn't have unit tests, which it does.

You are right, The summary is confusing. I'll propose the change of it when we close the discussion about the scope of this change.

however, keep in mind that I think there are no existing unit tests for dynamicTables, even with the FlinkSink implementation.$$$

I'm a little bit confused about that. By dynamic tables, I thought about this concept https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/dynamic_tables/ . From this perspective, everywhere where we use Table API or Flink SQL, and the test code touches FlinkDynamicTableFactory (via SPI or directly), we test Iceberg's dynamic tables implementation. Did you think about something else?

@arkadius arkadius changed the title Flink: Tests for the Flink Sink v2-based implemenation (IcebergSink) Flink: Tests alignment for the Flink Sink v2-based implemenation (IcebergSink) Oct 1, 2024
@@ -545,6 +561,34 @@ public DataStreamSink<RowData> append() {
}
return rowDataDataStreamSink;
}

@VisibleForTesting
List<Integer> checkAndGetEqualityFieldIds() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we adding this method here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a copy-paste from FlinkSink. It is used in TestFlinkIcebergSinkV2. I see that this test behaves differently for IcebergSink for WRITE_DISTRIBUTION_MODE_RANGE and turned off partitioning: Equality field columns shouldn't be empty when configuring to use UPSERT data stream is thrown, instead of Invalid write distribution mode .... Do you have an idea whether it is correct or unexpected behaviour?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkAndGetEqualityFieldIds() was moved to SinkUtil class. This method is already being referenced by both Sinks implementations.
Looks Like I forgot to delete this method from FlinkSink. So we can do the following

  1. Don't add this method to IcebergSink
  2. remove the method from FlinkSink
  3. the test TestFlinkIcebergSinkV2::testCheckAndGetEqualityFieldIds can be rewritten as:
  @TestTemplate
  public void testCheckAndGetEqualityFieldIds() {
    table
        .updateSchema()
        .allowIncompatibleChanges()
        .addRequiredColumn("type", Types.StringType.get())
        .setIdentifierFields("type")
        .commit();

    DataStream<Row> dataStream =
        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);

    // Use schema identifier field IDs as equality field id list by default
    assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, null))
        .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds());

    // Use user-provided equality field column as equality field id list
    assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("id")))
        .containsExactlyInAnyOrder(table.schema().findField("id").fieldId());

    assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("type")))
        .containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
  }

This should work but please let me know if it doesn't. Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it looks better, fixed

@arkadius
Copy link
Contributor Author

arkadius commented Oct 2, 2024

@rodmeneses , @pvary I've opened the new PR #11249 with non-controversial changes that I've discovered during this work. It contains mostly goodies that were added in IcebergSink tests and haven't been migrated to FlinkSink tests. Can you take a look at them?

@rodmeneses
Copy link
Contributor

Hi @arkadius
I have started working in backporting the RANGE distribution to the IcebergSink. The unit tests in my code will benefit from the new marker interface you are introducing in this PR, so I'd like to merge this one so that I can rebase properly.
I see this PR is still on "draft". Is anything pending?

@pvary does this PR look good to you, or you have anything pending that would like to be done? The PR is looking good to me.

Also, please @pvary and @stevenzwu please help starting the CI pipelines on this PR.

@arkadius
Copy link
Contributor Author

Hi @arkadius I have started working in backporting the RANGE distribution to the IcebergSink. The unit tests in my code will benefit from the new marker interface you are introducing in this PR, so I'd like to merge this one so that I can rebase properly. I see this PR is still on "draft". Is anything pending?

@pvary does this PR look good to you, or you have anything pending that would like to be done? The PR is looking good to me.

Also, please @pvary and @stevenzwu please help starting the CI pipelines on this PR.

@rodmeneses I think that it would be better if you copied the parts of the code from this PR into your PR. I can rebase into your changes after you merge them.

This PR isn't finished yet. I have a plan how the final shape of unit test could look like to make them easier to maintain and to fill some gaps in IcebergSink's tests but before I show it, I would like to merge this PR: #11249 (and maybe this: #11244). Thanks to that, it will be more clear what is the scope of this change.

@rodmeneses
Copy link
Contributor

Hi @arkadius I have started working in backporting the RANGE distribution to the IcebergSink. The unit tests in my code will benefit from the new marker interface you are introducing in this PR, so I'd like to merge this one so that I can rebase properly. I see this PR is still on "draft". Is anything pending?
@pvary does this PR look good to you, or you have anything pending that would like to be done? The PR is looking good to me.
Also, please @pvary and @stevenzwu please help starting the CI pipelines on this PR.

@rodmeneses I think that it would be better if you copied the parts of the code from this PR into your PR. I can rebase into your changes after you merge them.

This PR isn't finished yet. I have a plan how the final shape of unit test could look like to make them easier to maintain and to fill some gaps in IcebergSink's tests but before I show it, I would like to merge this PR: #11249 (and maybe this: #11244). Thanks to that, it will be more clear what is the scope of this change.

It could make sense, to open a PR with only Builder Marker Interface. I can raise that PR, if you're ok with it @arkadius. But before jumping into that, I'd like to hear about this from @pvary and/or @stevenzwu. What do you think?

@arkadius
Copy link
Contributor Author

Hi @arkadius I have started working in backporting the RANGE distribution to the IcebergSink. The unit tests in my code will benefit from the new marker interface you are introducing in this PR, so I'd like to merge this one so that I can rebase properly. I see this PR is still on "draft". Is anything pending?
@pvary does this PR look good to you, or you have anything pending that would like to be done? The PR is looking good to me.
Also, please @pvary and @stevenzwu please help starting the CI pipelines on this PR.

@rodmeneses I think that it would be better if you copied the parts of the code from this PR into your PR. I can rebase into your changes after you merge them.
This PR isn't finished yet. I have a plan how the final shape of unit test could look like to make them easier to maintain and to fill some gaps in IcebergSink's tests but before I show it, I would like to merge this PR: #11249 (and maybe this: #11244). Thanks to that, it will be more clear what is the scope of this change.

It could make sense, to open a PR with only Builder Marker Interface. I can raise that PR, if you're ok with it @arkadius. But before jumping into that, I'd like to hear about this from @pvary and/or @stevenzwu. What do you think?

It's fine for me. I raised it here #11305. We can move this discussion there.

@pvary
Copy link
Contributor

pvary commented Oct 21, 2024

Merged #11305

Copy link

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Nov 21, 2024
@rodmeneses
Copy link
Contributor

Hi @arkadius is this still needed/relevant ? please advise, as it will be closed due to inactivity soon
cc @pvary

@arkadius
Copy link
Contributor Author

Hi @rodmeneses. No, this change is not relevant. I'll close it.

@arkadius arkadius closed this Nov 21, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants