Flink: Tests alignment for the Flink Sink v2-based implementation (IcebergSink) #11219

Closed · wants to merge 11 commits
FlinkSink.java
@@ -134,7 +134,7 @@ public static Builder forRowData(DataStream<RowData> input) {
return new Builder().forRowData(input);
}

public static class Builder {
public static class Builder implements IcebergSinkBuilder<Builder> {
private Function<String, DataStream<RowData>> inputCreator = null;
private TableLoader tableLoader;
private Table table;
@@ -179,6 +179,7 @@ private <T> Builder forMapperOutputType(
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder table(Table newTable) {
this.table = newTable;
return this;
@@ -192,6 +193,7 @@ public Builder table(Table newTable) {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -210,21 +212,25 @@ public Builder set(String property, String value) {
* Set the write properties for Flink sink. View the supported properties in {@link
* FlinkWriteOptions}
*/
@Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}

@Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}

@Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -237,6 +243,7 @@ public Builder flinkConf(ReadableConfig config) {
* @param mode to specify the write distribution mode.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder distributionMode(DistributionMode mode) {
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
@@ -306,6 +313,7 @@ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -321,6 +329,7 @@ public Builder writeParallelism(int newWriteParallelism) {
* @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
return this;
@@ -332,6 +341,7 @@ public Builder upsert(boolean enabled) {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -376,6 +386,7 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
@@ -436,6 +447,7 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
*
* @return {@link DataStreamSink} for sink.
*/
@Override
public DataStreamSink<Void> append() {
return chainIcebergOperators();
}
IcebergSink.java
@@ -31,6 +31,7 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.flink.annotation.Experimental;
@@ -73,7 +74,10 @@
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
@@ -255,7 +259,7 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
return new WriteResultSerializer();
}

public static class Builder {
public static class Builder implements IcebergSinkBuilder<Builder> {
private TableLoader tableLoader;
private String uidSuffix = "";
private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +315,7 @@ private <T> Builder forMapperOutputType(
* @param newTable the loaded iceberg table instance.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder table(Table newTable) {
this.table = (SerializableTable) SerializableTable.copyOf(newTable);
return this;
@@ -325,6 +330,7 @@ public Builder table(Table newTable) {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -347,21 +353,25 @@ public Builder set(String property, String value) {
* Set the write properties for IcebergSink. View the supported properties in {@link
* FlinkWriteOptions}
*/
@Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}

@Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}

@Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -374,6 +384,7 @@ public Builder flinkConf(ReadableConfig config) {
* @param mode to specify the write distribution mode.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder distributionMode(DistributionMode mode) {
Preconditions.checkArgument(
!DistributionMode.RANGE.equals(mode),
@@ -390,6 +401,7 @@ public Builder distributionMode(DistributionMode mode) {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -405,6 +417,7 @@ public Builder upsert(boolean enabled) {
* @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
return this;
@@ -416,6 +429,7 @@ public Builder equalityFieldColumns(List<String> columns) {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -458,6 +472,7 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
@@ -527,6 +542,7 @@ IcebergSink build() {
*
* @return {@link DataStreamSink} for sink.
*/
@Override
public DataStreamSink<RowData> append() {
IcebergSink sink = build();
String suffix = defaultSuffix(uidSuffix, table.name());
@@ -545,6 +561,34 @@ public DataStreamSink<RowData> append() {
}
return rowDataDataStreamSink;
}

@VisibleForTesting
List<Integer> checkAndGetEqualityFieldIds() {
Contributor:

Why are we adding this method here?

Contributor Author:

This is a copy-paste from FlinkSink. It is used in TestFlinkIcebergSinkV2. I see that this test behaves differently for IcebergSink with WRITE_DISTRIBUTION_MODE_RANGE and partitioning turned off: "Equality field columns shouldn't be empty when configuring to use UPSERT data stream" is thrown instead of "Invalid write distribution mode ...". Do you have an idea whether this is correct or unexpected behaviour?

Contributor:

checkAndGetEqualityFieldIds() was moved to the SinkUtil class. That method is already referenced by both sink implementations.
Looks like I forgot to delete this method from FlinkSink, so we can do the following:

  1. Don't add this method to IcebergSink
  2. Remove the method from FlinkSink
  3. The test TestFlinkIcebergSinkV2::testCheckAndGetEqualityFieldIds can be rewritten as:
  @TestTemplate
  public void testCheckAndGetEqualityFieldIds() {
    table
        .updateSchema()
        .allowIncompatibleChanges()
        .addRequiredColumn("type", Types.StringType.get())
        .setIdentifierFields("type")
        .commit();

    DataStream<Row> dataStream =
        env.addSource(new BoundedTestSource<>(ImmutableList.of()), ROW_TYPE_INFO);

    // Use schema identifier field IDs as equality field id list by default
    assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, null))
        .containsExactlyInAnyOrderElementsOf(table.schema().identifierFieldIds());

    // Use user-provided equality field column as equality field id list
    assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("id")))
        .containsExactlyInAnyOrder(table.schema().findField("id").fieldId());

    assertThat(SinkUtil.checkAndGetEqualityFieldIds(table, Lists.newArrayList("type")))
        .containsExactlyInAnyOrder(table.schema().findField("type").fieldId());
  }

This should work but please let me know if it doesn't. Thanks!

Contributor Author:

Yep, it looks better, fixed.

List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
Set<Integer> equalityFieldSet =
Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
for (String column : equalityFieldColumns) {
org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(
field,
"Missing required equality field column '%s' in table schema %s",
column,
table.schema());
equalityFieldSet.add(field.fieldId());
}

if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
LOG.warn(
"The configured equality field column IDs {} are not matched with the schema identifier field IDs"
+ " {}, use job specified equality field columns as the equality fields by default.",
equalityFieldSet,
table.schema().identifierFieldIds());
}
equalityFieldIds = Lists.newArrayList(equalityFieldSet);
}
return equalityFieldIds;
}
}

private static String defaultSuffix(String uidSuffix, String defaultSuffix) {
IcebergSinkBuilder.java (new file)
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

@Internal
/*
This class is for the internal purpose of transitioning between the previous Flink sink implementation (FlinkSink)
and the new implementation based on Flink's SinkV2 API (IcebergSink). After we remove the previous implementation,
all occurrences of this class will be replaced by direct IcebergSink usage.
*/
Contributor (@rodmeneses, Sep 30, 2024):

I'm not entirely convinced that this is the path we should move forward with. One of the design choices during the implementation of the new IcebergSink was exactly to avoid this: instead of refactoring common code, we would just add a brand new IcebergSink and keep the old one untouched (or minimally touched).

In my mind, this PR is about adding additional tests using the new sink in places where only the old one is used. If we want to keep the design choice from my previous PR, that would imply creating new test cases altogether, but only targeting the IcebergSink. For example, we have TestFlinkIcebergSinkExtended; we could then add a new one called TestFlinkIcebergSinkExtendedSinkV2, which is identical to the original one but replaces FlinkSink with IcebergSink.

And even though this will require creating extra files, I think it will be worth it, as we can keep the two sinks (implementations and unit tests) separate.

Now, for use cases like the Dynamic Tables, which sink should we use? I think we should follow a similar approach to the one @pvary mentioned before: defining a config option and relying on it. For example, adding the following to FlinkConfigOptions:

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
    ConfigOptions.key("table.exec.iceberg.use-v2-sink")
        .booleanType()
        .defaultValue(false)
        .withDescription("Use the SinkV2 based IcebergSink implementation.");

and then, in the dynamic table code:

public DataStreamSink<?> consumeDataStream( ...
.....
.....

if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
  return IcebergSink.forRowData(dataStream)
      .tableLoader(tableLoader)
      .tableSchema(tableSchema)
      .equalityFieldColumns(equalityColumns)
      .overwrite(overwrite)
      .setAll(writeProps)
      .flinkConf(readableConfig)
      .append();
} else {
  return FlinkSink.forRowData(dataStream)
      .tableLoader(tableLoader)
      .tableSchema(tableSchema)
      .equalityFieldColumns(equalityColumns)
      .overwrite(overwrite)
      .setAll(writeProps)
      .flinkConf(readableConfig)
      .append();
}

Wdyt? @pvary @stevenzwu

Contributor:

Here are the priorities in my head:

  • When removing the FlinkSink from the Iceberg connector codebase, the users of the IcebergSink should not need to change any code.
  • It should be as easy as possible to remove the FlinkSink and related classes. There should not be any dangling classes left (unused interfaces and such)
  • We should reuse as much of the testing code as possible

From my perspective, it's OK to add an interface to separate the common methods, but I prefer not to have any implementation in the interface, as this class should be removed when the deprecation happens. If this approach is not enough, then we need test-only util methods to create the sinks for the tests.

Contributor Author (@arkadius, Oct 1, 2024):

Thank you for the review, folks. The listed priorities sound reasonable. Would you agree, @pvary, that we should add two more bullets to this list?

  • We should ensure that IcebergSink is covered by unit tests to a similar level as FlinkSink.
  • We should keep the migration from FlinkSink to IcebergSink as seamless as possible. For example, try to keep features and allowed properties synchronized (of course, I'm not talking about features that cannot be synchronized because of differences in Flink's APIs).

With regards to this:

When removing the FlinkSink from the Iceberg connector codebase, the users of the IcebergSink should not need to change any code.

I think the approach you suggested, an interface without any implementation, shouldn't break users' code. I can do a manual test simulating the removal of the interface and verify that no exception is thrown.

@rodmeneses, with regards to this comment:

I think we should follow a similar approach to the one @pvary mentioned before: defining a config option and relying on it. For example, adding the following to FlinkConfigOptions

I wanted to do this, but I thought it would be better to move it to a separate PR, as I mentioned in this comment: #11219 (comment). WDYT?

I started with the lower-level unit tests because I saw that they cover a lot of use cases, and as a potential user of this class I would sleep better knowing that all of them also pass for IcebergSink. For the purpose of the feature flag, I think we should add higher-level tests that use Flink's Table API/Flink SQL. Do you agree with that?
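
To make this concrete, below is a minimal sketch (not code from this PR) of what such a higher-level test could look like, assuming the table.exec.iceberg.use-v2-sink option proposed above ends up in FlinkConfigOptions; the catalog, database, table names and warehouse path are illustrative only:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class UseV2SinkSqlSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // Toggle the proposed flag; with "false" the planner would keep using FlinkSink.
    tEnv.getConfig().getConfiguration().setString("table.exec.iceberg.use-v2-sink", "true");

    // Hadoop catalog pointed at a local warehouse, purely for illustration.
    tEnv.executeSql(
        "CREATE CATALOG iceberg_catalog WITH ("
            + "'type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='file:///tmp/iceberg-warehouse')");
    tEnv.executeSql("USE CATALOG iceberg_catalog");
    tEnv.executeSql("CREATE DATABASE IF NOT EXISTS db");
    tEnv.executeSql("CREATE TABLE IF NOT EXISTS db.t (id INT, data STRING)");

    // The INSERT exercises the consumeDataStream path discussed above, which is where the
    // flag would choose between FlinkSink and IcebergSink.
    tEnv.executeSql("INSERT INTO db.t VALUES (1, 'a'), (2, 'b')").await();
  }
}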

Contributor Author:

I changed the abstract class to an interface.
BTW, I've just realized what's going on with the current state of the unit tests for these two classes: we already have some IcebergSink tests that are copy-pastes of FlinkSink tests (e.g. TestIcebergSink vs. TestFlinkIcebergSink), and these tests are already out of sync. Don't you think this state will become hard to maintain over a longer period of time?

Contributor:

Hi @arkadius,
I'm OK with adding a new interface IcebergSinkBuilder and having FlinkSink and IcebergSink implement it. To my mind, those are the only things that are changing in this PR. I can see that in the PR already, with only one thing I noticed, which I will comment on directly in the PR itself.

Contributor:

I changed the abstract class to an interface. BTW, I've just realized what's going on with the current state of the unit tests for these two classes: we already have some IcebergSink tests that are copy-pastes of FlinkSink tests (e.g. TestIcebergSink vs. TestFlinkIcebergSink), and these tests are already out of sync. Don't you think this state will become hard to maintain over a longer period of time?

During the implementation of the IcebergSink I wrote unit tests to cover all the new functionality specific to the new sink, and I also added unit tests to cover similar functionality from the FlinkSink. I didn't want to modify existing code (which was a design choice in this PR), so I went ahead and created new test classes that only feature the new IcebergSink. This is the reason I suggested yesterday adding new test classes that only target the new IcebergSink, so that it stays similar to the approach I followed.

Having said that, I really like the idea of your new marker interface, as it minimally changes behavior and APIs in the current code, and at the same time it makes it very simple and clear to add a parameter "useSinkV2" to the existing unit tests, so that you can use either FlinkSink or IcebergSink (a small sketch of this follows the interface below).

public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> {

  T tableSchema(TableSchema newTableSchema);

  T tableLoader(TableLoader newTableLoader);

  T equalityFieldColumns(List<String> columns);

  T overwrite(boolean newOverwrite);

  T setAll(Map<String, String> properties);

  T flinkConf(ReadableConfig config);

  T table(Table newTable);

  T writeParallelism(int newWriteParallelism);

  T distributionMode(DistributionMode mode);

  T toBranch(String branch);

  T upsert(boolean enabled);

  DataStreamSink<?> append();

  static IcebergSinkBuilder<?> forRow(
      DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
    if (useV2Sink) {
      return IcebergSink.forRow(input, tableSchema);
    } else {
      return FlinkSink.forRow(input, tableSchema);
    }
  }

  static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean useV2Sink) {
    if (useV2Sink) {
      return IcebergSink.forRowData(input);
    } else {
      return FlinkSink.forRowData(input);
    }
  }
}
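
For illustration, here is a minimal sketch (not code from this PR) of how an existing RowData-based test could route through this interface once a useV2Sink parameter is added; the helper name, the fixture arguments, and the fixed parallelism are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;

// Assumed to live in org.apache.iceberg.flink.sink, next to the tests.
class SinkBuilderSelectionSketch {

  // The test body stays identical for both implementations: the boolean decides whether
  // FlinkSink.Builder or IcebergSink.Builder sits behind the common interface.
  static DataStreamSink<?> appendSink(
      DataStream<RowData> rowDataStream, TableLoader tableLoader, boolean useV2Sink) {
    return IcebergSinkBuilder.forRowData(rowDataStream, useV2Sink)
        .tableLoader(tableLoader)
        .writeParallelism(1)
        .append();
  }
}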
@@ -22,6 +22,7 @@

import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
@@ -59,4 +60,18 @@ static CommittableWithLineage<IcebergCommittable> extractAndAssertCommittableWit
assertThat(value).isInstanceOf(CommittableWithLineage.class);
return (CommittableWithLineage<IcebergCommittable>) value;
}

static <R> R invokeIcebergSinkBuilderMethod(
IcebergSinkBuilder<?> sinkBuilder,
Function<FlinkSink.Builder, R> processSinkV1Builder,
Function<IcebergSink.Builder, R> processSinkV2Builder) {
if (sinkBuilder instanceof FlinkSink.Builder) {
return processSinkV1Builder.apply((FlinkSink.Builder) sinkBuilder);
} else if (sinkBuilder instanceof IcebergSink.Builder) {
return processSinkV2Builder.apply((IcebergSink.Builder) sinkBuilder);
} else {
throw new IllegalArgumentException(
"Not expected sinkBuilder class: " + sinkBuilder.getClass());
}
}
}
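
A hedged usage sketch for the helper above: it lets a test that is otherwise sink-agnostic reach a builder-specific method without casting at the call site. The uidPrefix/uidSuffix calls are assumed to be the respective per-implementation methods (verify against the actual builder APIs), rowDataStream, tableLoader and useV2Sink are assumed test fixtures, and the snippet assumes it runs next to invokeIcebergSinkBuilderMethod (same class or static import):

IcebergSinkBuilder<?> builder =
    IcebergSinkBuilder.forRowData(rowDataStream, useV2Sink).tableLoader(tableLoader);

IcebergSinkBuilder<?> withUid =
    invokeIcebergSinkBuilderMethod(
        builder,
        v1 -> v1.uidPrefix("test-sink"), // FlinkSink.Builder-specific call
        v2 -> v2.uidSuffix("test-sink")); // IcebergSink.Builder-specific call

withUid.append();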