
Flink: Tests alignment for the Flink Sink v2-based implementation (IcebergSink) #11219

Closed · wants to merge 11 commits
@@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public interface BaseIcebergSinkBuilder {
Contributor:

I understand where you are coming from, but we are planning to add features (table maintenance) to the new IcebergSink which will not be available in the FlinkSink.
We also need to think about the deprecation and eventual removal of the FlinkSink.

Maybe something like what I propose for the MaintenanceTaskBuilder could help: https://github.com/apache/iceberg/pull/11144/files#diff-1b7c33ba8eb300e4a1c92844418384756a07ba32882de39855c74c9a40778f48

Or we could just move this class to the tests and not add it to the public API?
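
For reference, the self-typed builder pattern along those lines might look roughly like the following minimal sketch (the class and method names here are hypothetical, not the actual MaintenanceTaskBuilder code from #11144):

// Hypothetical sketch of a self-referencing generic builder, similar in
// spirit to the MaintenanceTaskBuilder proposal: each concrete builder binds
// T to itself, so inherited chained calls keep the concrete type.
abstract class SinkBuilderBase<T extends SinkBuilderBase<T>> {

  // Each subclass returns itself so chaining preserves the concrete type.
  protected abstract T self();

  public T writeParallelism(int newWriteParallelism) {
    // ... store the parallelism in a field shared by all builders ...
    return self();
  }
}

class ExampleSinkBuilder extends SinkBuilderBase<ExampleSinkBuilder> {
  @Override
  protected ExampleSinkBuilder self() {
    return this;
  }

  // A sink-specific option that plain interface return types would lose
  // in the middle of a chain.
  public ExampleSinkBuilder exampleOption(String value) {
    return self();
  }
}

With that shape, new ExampleSinkBuilder().writeParallelism(2).exampleOption("x") keeps the concrete builder type through the whole chain, so sink-specific methods remain reachable.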

Contributor Author:

In my previous solution that was also possible, because each Builder implementation overrode the return type of each method with its specific type, but your suggestion looks better; changed.

Regarding visibility: in the referenced PR we discussed that there is a plan to allow using IcebergSink for dynamic tables (IcebergTableSink:80). I thought about adding a property there that enables this implementation. Thanks to the common parent for the builders, we could check the property value at the beginning of the method and run the rest of the builder chain on whichever implementation it selects. This would ensure that we keep these builders synchronized. I thought about doing this in the next PR. WDYT?
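
For illustration, such a property-driven switch inside IcebergTableSink might look roughly like this (a minimal sketch; the option key and the createSink wiring are assumptions, not code from this PR):

// Hypothetical sketch: choosing the sink implementation from a Flink config
// option. The option key below is made up for illustration; the real flag
// would be introduced in the follow-up PR.
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;

class SinkDispatchSketch {
  static final ConfigOption<Boolean> USE_V2_SINK =
      ConfigOptions.key("table.exec.iceberg.use-v2-sink")
          .booleanType()
          .defaultValue(false);

  DataStreamSink<?> createSink(
      DataStream<RowData> dataStream, ReadableConfig readableConfig, TableLoader tableLoader) {
    // The common parent keeps the two builder chains synchronized: only the
    // factory call differs, the rest of the chain is shared.
    return BaseIcebergSinkBuilder.forRowData(dataStream, readableConfig.get(USE_V2_SINK))
        .tableLoader(tableLoader)
        .append();
  }
}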


BaseIcebergSinkBuilder tableSchema(TableSchema newTableSchema);

BaseIcebergSinkBuilder tableLoader(TableLoader newTableLoader);

BaseIcebergSinkBuilder equalityFieldColumns(List<String> columns);

BaseIcebergSinkBuilder overwrite(boolean newOverwrite);

BaseIcebergSinkBuilder setAll(Map<String, String> properties);

BaseIcebergSinkBuilder flinkConf(ReadableConfig config);

BaseIcebergSinkBuilder table(Table newTable);

BaseIcebergSinkBuilder writeParallelism(int newWriteParallelism);

BaseIcebergSinkBuilder distributionMode(DistributionMode mode);

BaseIcebergSinkBuilder toBranch(String branch);

DataStreamSink<?> append();

static BaseIcebergSinkBuilder forRow(
    DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
if (useV2Sink) {
return IcebergSink.forRow(input, tableSchema);
} else {
return FlinkSink.forRow(input, tableSchema);
}
}

static BaseIcebergSinkBuilder forRowData(DataStream<RowData> input, boolean useV2Sink) {
if (useV2Sink) {
return IcebergSink.forRowData(input);
} else {
return FlinkSink.forRowData(input);
}
}

}
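
For example, a test could drive either implementation through the shared factory methods; a minimal usage sketch (assuming an existing env, dataStream, and tableLoader from the surrounding test):

// Minimal usage sketch: the same chain drives FlinkSink (useV2Sink = false)
// or IcebergSink (useV2Sink = true).
boolean useV2Sink = true;

BaseIcebergSinkBuilder.forRowData(dataStream, useV2Sink)
    .tableLoader(tableLoader)
    .writeParallelism(2)
    .append();

env.execute("Test Iceberg DataStream");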
@@ -134,7 +134,7 @@ public static Builder forRowData(DataStream<RowData> input) {
return new Builder().forRowData(input);
}

public static class Builder {
public static class Builder implements BaseIcebergSinkBuilder {
private Function<String, DataStream<RowData>> inputCreator = null;
private TableLoader tableLoader;
private Table table;
@@ -179,6 +179,7 @@ private <T> Builder forMapperOutputType(
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder table(Table newTable) {
this.table = newTable;
return this;
@@ -192,6 +193,7 @@ public Builder table(Table newTable) {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -210,21 +212,25 @@ public Builder set(String property, String value) {
* Set the write properties for Flink sink. View the supported properties in {@link
* FlinkWriteOptions}
*/
@Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}

@Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}

@Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -237,6 +243,7 @@ public Builder flinkConf(ReadableConfig config) {
* @param mode to specify the write distribution mode.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder distributionMode(DistributionMode mode) {
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
@@ -306,6 +313,7 @@ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -332,6 +340,7 @@ public Builder upsert(boolean enabled) {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -376,6 +385,7 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
@@ -436,6 +446,7 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
*
* @return {@link DataStreamSink} for sink.
*/
@Override
public DataStreamSink<Void> append() {
return chainIcebergOperators();
}
@@ -255,7 +255,7 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
return new WriteResultSerializer();
}

public static class Builder {
public static class Builder implements BaseIcebergSinkBuilder {
private TableLoader tableLoader;
private String uidSuffix = "";
private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +311,7 @@ private <T> Builder forMapperOutputType(
* @param newTable the loaded iceberg table instance.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder table(Table newTable) {
this.table = (SerializableTable) SerializableTable.copyOf(newTable);
return this;
@@ -325,6 +326,7 @@ public Builder table(Table newTable) {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -347,21 +349,25 @@ public Builder set(String property, String value) {
* Set the write properties for IcebergSink. View the supported properties in {@link
* FlinkWriteOptions}
*/
@Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}

@Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}

@Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -374,6 +380,7 @@ public Builder flinkConf(ReadableConfig config) {
* @param mode to specify the write distribution mode.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder distributionMode(DistributionMode mode) {
Preconditions.checkArgument(
!DistributionMode.RANGE.equals(mode),
@@ -390,6 +397,7 @@ public Builder distributionMode(DistributionMode mode) {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -416,6 +424,7 @@ public Builder upsert(boolean enabled) {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -458,6 +467,7 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
@@ -527,6 +537,7 @@ IcebergSink build() {
*
* @return {@link DataStreamSink} for sink.
*/
@Override
public DataStreamSink<RowData> append() {
IcebergSink sink = build();
String suffix = defaultSuffix(uidSuffix, table.name());
@@ -52,21 +52,36 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
@Parameter(index = 2)
private boolean partitioned;

@Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
@Parameter(index = 3)
private boolean useV2Sink;

@Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, useV2Sink = {3}")
public static Object[][] parameters() {
return new Object[][] {
{FileFormat.AVRO, 1, true},
{FileFormat.AVRO, 1, false},
{FileFormat.AVRO, 2, true},
{FileFormat.AVRO, 2, false},
{FileFormat.ORC, 1, true},
{FileFormat.ORC, 1, false},
{FileFormat.ORC, 2, true},
{FileFormat.ORC, 2, false},
{FileFormat.PARQUET, 1, true},
{FileFormat.PARQUET, 1, false},
{FileFormat.PARQUET, 2, true},
{FileFormat.PARQUET, 2, false}
{FileFormat.AVRO, 1, true, false},
pvary (Contributor) commented on Sep 30, 2024:

We don't need the whole testing matrix here.
I usually:

  • Test everything with the new feature
  • Run a few selected tests to make sure the old feature is intact

Some possible cuts:

  • Run 1 AVRO and 1 ORC test, and keep all the PARQUET ones
  • Run 1 with single parallelism, and use parallelism 2 for the others
  • Run 1 unpartitioned, and use partitioned for the others

Contributor Author:

Ok, reduced (one possible trimmed matrix is sketched after the parameter list below).

{FileFormat.AVRO, 1, false, false},
{FileFormat.AVRO, 2, true, false},
{FileFormat.AVRO, 2, false, false},
{FileFormat.ORC, 1, true, false},
{FileFormat.ORC, 1, false, false},
{FileFormat.ORC, 2, true, false},
{FileFormat.ORC, 2, false, false},
{FileFormat.PARQUET, 1, true, false},
{FileFormat.PARQUET, 1, false, false},
{FileFormat.PARQUET, 2, true, false},
{FileFormat.PARQUET, 2, false, false},
{FileFormat.AVRO, 1, true, true},
{FileFormat.AVRO, 1, false, true},
{FileFormat.AVRO, 2, true, true},
{FileFormat.AVRO, 2, false, true},
{FileFormat.ORC, 1, true, true},
{FileFormat.ORC, 1, false, true},
{FileFormat.ORC, 2, true, true},
{FileFormat.ORC, 2, false, true},
{FileFormat.PARQUET, 1, true, true},
{FileFormat.PARQUET, 1, false, true},
{FileFormat.PARQUET, 2, true, true},
{FileFormat.PARQUET, 2, false, true}
};
}
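
For illustration, a trimmed matrix along the lines pvary suggests might look like this (a hypothetical cut, not the rows the PR finally kept):

// Hypothetical reduced matrix: fuller coverage for the new v2 sink, plus a
// few spot checks that the old FlinkSink path stays intact.
@Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, useV2Sink = {3}")
public static Object[][] parameters() {
  return new Object[][] {
    // spot checks for the existing FlinkSink
    {FileFormat.AVRO, 2, true, false},
    {FileFormat.ORC, 2, true, false},
    {FileFormat.PARQUET, 1, false, false},
    {FileFormat.PARQUET, 2, true, false},
    // fuller coverage for the new IcebergSink
    {FileFormat.AVRO, 1, true, true},
    {FileFormat.AVRO, 2, false, true},
    {FileFormat.ORC, 2, true, true},
    {FileFormat.PARQUET, 1, true, true},
    {FileFormat.PARQUET, 2, false, true},
    {FileFormat.PARQUET, 2, true, true}
  };
}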

@@ -100,7 +115,7 @@ public void testWriteRowData() throws Exception {
env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
.map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));

FlinkSink.forRowData(dataStream)
BaseIcebergSinkBuilder.forRowData(dataStream, useV2Sink)
.table(table)
.tableLoader(tableLoader)
.writeParallelism(parallelism)
Expand All @@ -115,11 +130,11 @@ public void testWriteRowData() throws Exception {

@TestTemplate
public void testWriteRow() throws Exception {
testWriteRow(parallelism, null, DistributionMode.NONE);
testWriteRow(parallelism, null, DistributionMode.NONE, useV2Sink);
}

@TestTemplate
public void testWriteRowWithTableSchema() throws Exception {
testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE, useV2Sink);
}
}
@@ -86,12 +86,12 @@ protected List<RowData> convertToRowData(List<Row> rows) {
}

protected void testWriteRow(
int writerParallelism, TableSchema tableSchema, DistributionMode distributionMode)
int writerParallelism, TableSchema tableSchema, DistributionMode distributionMode, boolean useV2Sink)
throws Exception {
List<Row> rows = createRows("");
DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);

FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
BaseIcebergSinkBuilder.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA, useV2Sink)
.table(table)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
@@ -59,15 +59,22 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
@Parameter(index = 1)
private String branch;

@Parameter(index = 2)
private boolean useV2Sink;

private TableLoader tableLoader;

@Parameters(name = "formatVersion = {0}, branch = {1}")
@Parameters(name = "formatVersion = {0}, branch = {1}, useV2Sink = {2}")
Contributor:

Cut the number of tests, like above

public static Object[][] parameters() {
return new Object[][] {
{"1", "main"},
{"1", "testBranch"},
{"2", "main"},
{"2", "testBranch"}
{"1", "main", false},
{"1", "testBranch", false},
{"2", "main", false},
{"2", "testBranch", false},
{"1", "main", true},
{"1", "testBranch", true},
{"2", "main", true},
{"2", "testBranch", true}
};
}

@@ -105,7 +112,7 @@ private void testWriteRow(TableSchema tableSchema, DistributionMode distribution
List<Row> rows = createRows("");
DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);

FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
BaseIcebergSinkBuilder.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA, useV2Sink)
.table(table)
.tableLoader(tableLoader)
.tableSchema(tableSchema)