From 5ab0cced0f074f112b2fddecd5857d4c8f62c2a2 Mon Sep 17 00:00:00 2001 From: Arek Burdach <> Date: Tue, 1 Oct 2024 16:51:35 +0200 Subject: [PATCH 1/6] Add table.exec.iceberg.use-v2-sink option --- docs/docs/flink-writes.md | 25 +++++++++++++++++ .../iceberg/flink/FlinkConfigOptions.java | 6 ++++ .../iceberg/flink/IcebergTableSink.java | 28 +++++++++++++------ .../iceberg/flink/TestFlinkTableSink.java | 22 +++++++++++++-- .../flink/TestFlinkTableSinkExtended.java | 17 +++++++++-- 5 files changed, 85 insertions(+), 13 deletions(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index f53b5d832efe..2671ec0309e2 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -371,3 +371,28 @@ and [deleting orphan files](maintenance.md#delete-orphan-files) could possibly c the state of the Flink job. To avoid that, make sure to keep the last snapshot created by the Flink job (which can be identified by the `flink.job-id` property in the summary), and only delete orphan files that are old enough. + +# Flink Writes (SinkV2 based implementation) + +The [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) +was introduced in Flink 1.15. +The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API) +had some limitations - for example it created a lot of small files when writing to it. This problem is called +the `small-file-compaction` problem in +the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction). +The default `FlinkSink` implementation available in `iceberg-flink` module builds its own `StreamOperator`s chain ends with `DiscardingSink`. +However, in the same module, there is also `IcebergSink` which is based on the SinkV2 API. +The SinkV2 based `IcebergSink` is currently an experimental feature. + +## Writing with SQL + +To turn on SinkV2 based implementation in SQL, set this configuration option: +```sql +SET table.exec.iceberg.use-v2-sink = true; +``` + +## Writing with DataStream + +To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets. +Warning: some settings are not available in this class (e.g. `rangeDistributionStatisticsType`), +some others are slightly different (e.g. there is a `uidSuffix` method instead of `uidPrefix`). diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java index 099925101571..97e2c70d348e 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java @@ -91,6 +91,12 @@ private FlinkConfigOptions() {} .defaultValue(true) .withDescription("Use the FLIP-27 based Iceberg source implementation."); + public static final ConfigOption TABLE_EXEC_ICEBERG_USE_V2_SINK = + ConfigOptions.key("table.exec.iceberg.use-v2-sink") + .booleanType() + .defaultValue(false) + .withDescription("Use the SinkV2 API based Iceberg sink implementation."); + public static final ConfigOption TABLE_EXEC_SPLIT_ASSIGNER_TYPE = ConfigOptions.key("table.exec.iceberg.split-assigner-type") .enumType(SplitAssignerType.class) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java index 1b9268569d9a..c2c5a6706e92 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java @@ -35,6 +35,7 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.sink.IcebergSink; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { @@ -77,14 +78,25 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { @Override public DataStreamSink consumeDataStream( ProviderContext providerContext, DataStream dataStream) { - return FlinkSink.forRowData(dataStream) - .tableLoader(tableLoader) - .tableSchema(tableSchema) - .equalityFieldColumns(equalityColumns) - .overwrite(overwrite) - .setAll(writeProps) - .flinkConf(readableConfig) - .append(); + if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) { + return IcebergSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .append(); + } else { + return FlinkSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .append(); + } } }; } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index 2978a92945a2..c5a9dd2d8960 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -51,7 +51,11 @@ public class TestFlinkTableSink extends CatalogTestBase { @Parameter(index = 3) private boolean isStreamingJob; - @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + @Parameter(index = 4) + private boolean useV2Sink; + + @Parameters( + name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, useV2Sink={4}") public static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : @@ -60,10 +64,21 @@ public static List parameters() { for (Object[] catalogParams : CatalogTestBase.parameters()) { String catalogName = (String) catalogParams[0]; Namespace baseNamespace = (Namespace) catalogParams[1]; - parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + boolean doNotUseV2Sink = false; + parameters.add( + new Object[] {catalogName, baseNamespace, format, isStreaming, doNotUseV2Sink}); } } } + for (FileFormat format : + new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + String catalogName = "testhadoop_basenamespace"; + Namespace baseNamespace = Namespace.of("l0", "l1"); + boolean useV2Sink = true; + parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, useV2Sink}); + } + } return parameters; } @@ -87,6 +102,9 @@ protected TableEnvironment getTableEnv() { } } } + tEnv.getConfig() + .getConfiguration() + .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); return tEnv; } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java index b63547d433a4..fe50cfa0557d 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java @@ -91,11 +91,19 @@ public class TestFlinkTableSinkExtended extends SqlBase { private TableEnvironment tEnv; - @Parameter protected boolean isStreamingJob; + @Parameter(index = 0) + protected boolean isStreamingJob; - @Parameters(name = "isStreamingJob={0}") + @Parameter(index = 1) + protected boolean useV2Sink; + + @Parameters(name = "isStreamingJob={0}, useV2Sink={1}") protected static List parameters() { - return Arrays.asList(new Boolean[] {true}, new Boolean[] {false}); + return Arrays.asList( + new Boolean[] {true, false}, + new Boolean[] {false, false}, + new Boolean[] {true, true}, + new Boolean[] {false, true}); } protected synchronized TableEnvironment getTableEnv() { @@ -115,6 +123,9 @@ protected synchronized TableEnvironment getTableEnv() { tEnv = TableEnvironment.create(settingsBuilder.build()); } } + tEnv.getConfig() + .getConfiguration() + .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); return tEnv; } From 18e1996135d23466c5f856028243eae15995a936 Mon Sep 17 00:00:00 2001 From: Arek Burdach <> Date: Thu, 3 Oct 2024 12:45:53 +0200 Subject: [PATCH 2/6] Added test for correct sink interface usage --- docs/docs/flink-writes.md | 2 +- .../iceberg/flink/TestFlinkTableSink.java | 1 - .../flink/TestFlinkTableSinkExtended.java | 54 ++++++++++++++----- 3 files changed, 43 insertions(+), 14 deletions(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 2671ec0309e2..0da6c08e4763 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -380,7 +380,7 @@ The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLIN had some limitations - for example it created a lot of small files when writing to it. This problem is called the `small-file-compaction` problem in the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction). -The default `FlinkSink` implementation available in `iceberg-flink` module builds its own `StreamOperator`s chain ends with `DiscardingSink`. +The default `FlinkSink` implementation available in `iceberg-flink` module builds its own chain of `StreamOperator`s terminated by `DiscardingSink`. However, in the same module, there is also `IcebergSink` which is based on the SinkV2 API. The SinkV2 based `IcebergSink` is currently an experimental feature. diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index c5a9dd2d8960..24822a3725ae 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -40,7 +40,6 @@ public class TestFlinkTableSink extends CatalogTestBase { - private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source"; private static final String TABLE_NAME = "test_table"; private TableEnvironment tEnv; private Table icebergTable; diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java index fe50cfa0557d..da596959cf70 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java @@ -31,6 +31,8 @@ import java.util.stream.IntStream; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -157,6 +159,27 @@ public void clean() throws Exception { catalog.close(); } + @TestTemplate + public void testUsedFlinkSinkInterface() { + String dataId = BoundedTableFactory.registerDataSet(Collections.emptyList()); + sql( + "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" + + " WITH ('connector'='BoundedSource', 'data-id'='%s')", + SOURCE_TABLE, dataId); + + PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner(); + String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE); + ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); + Transformation sink = planner.translate(Collections.singletonList(operation)).get(0); + if (useV2Sink) { + assertThat(sink).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class); + } else { + assertThat(sink) + .as("Should use custom chain of StreamOperators terminated by DiscardingSink") + .isInstanceOf(LegacySinkTransformation.class); + } + } + @TestTemplate public void testWriteParallelism() { List dataSet = @@ -176,18 +199,25 @@ public void testWriteParallelism() { "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s", TABLE, SOURCE_TABLE); ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); - Transformation dummySink = planner.translate(Collections.singletonList(operation)).get(0); - Transformation committer = dummySink.getInputs().get(0); - Transformation writer = committer.getInputs().get(0); - - assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1); - writer - .getInputs() - .forEach( - input -> - assertThat(input.getParallelism()) - .as("Should have the expected parallelism.") - .isEqualTo(isStreamingJob ? 2 : 4)); + Transformation sink = planner.translate(Collections.singletonList(operation)).get(0); + if (useV2Sink) { + assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1); + Transformation writerInput = sink.getInputs().get(0); + assertThat(writerInput.getParallelism()) + .as("Should have the expected parallelism.") + .isEqualTo(isStreamingJob ? 2 : 4); + } else { + Transformation committer = sink.getInputs().get(0); + Transformation writer = committer.getInputs().get(0); + + assertThat(writer.getParallelism()) + .as("Should have the expected 1 parallelism.") + .isEqualTo(1); + Transformation writerInput = writer.getInputs().get(0); + assertThat(writerInput.getParallelism()) + .as("Should have the expected parallelism.") + .isEqualTo(isStreamingJob ? 2 : 4); + } } @TestTemplate From 032b71ab90798ed04682058ec773062cd605804c Mon Sep 17 00:00:00 2001 From: Arek Burdach <> Date: Fri, 4 Oct 2024 11:57:29 +0200 Subject: [PATCH 3/6] Review fixes --- docs/docs/flink-writes.md | 5 +++-- .../java/org/apache/iceberg/flink/TestFlinkTableSink.java | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 0da6c08e4763..c87f49585aec 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -394,5 +394,6 @@ SET table.exec.iceberg.use-v2-sink = true; ## Writing with DataStream To use SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets. -Warning: some settings are not available in this class (e.g. `rangeDistributionStatisticsType`), -some others are slightly different (e.g. there is a `uidSuffix` method instead of `uidPrefix`). +Warning: There are some slight differences between these implementations: +- The `RANGE` distribution mode is not yet available for the `IcebergSink` +- When using `IcebergSink` use `uidSuffix` instead of the `uidPrefix` diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index 24822a3725ae..4e3ccd2f0e13 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -69,6 +69,7 @@ public static List parameters() { } } } + for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { for (Boolean isStreaming : new Boolean[] {true, false}) { @@ -78,6 +79,7 @@ public static List parameters() { parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, useV2Sink}); } } + return parameters; } @@ -101,9 +103,11 @@ protected TableEnvironment getTableEnv() { } } } + tEnv.getConfig() .getConfiguration() .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); + return tEnv; } From 8591ba0d130fe64cf275410658c28e406c1d3e40 Mon Sep 17 00:00:00 2001 From: Arek Burdach <> Date: Fri, 4 Oct 2024 12:01:48 +0200 Subject: [PATCH 4/6] FlinkSink using v2 sinks instead of SinkFunction --- .../org/apache/iceberg/flink/sink/FlinkSink.java | 12 +++++------- .../flink/TestFlinkTableSinkExtended.java | 16 +++++++++++----- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index e862e88c968c..2e586b960c22 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; @@ -392,7 +392,7 @@ public Builder toBranch(String branch) { return this; } - private DataStreamSink chainIcebergOperators() { + private DataStreamSink chainIcebergOperators() { Preconditions.checkArgument( inputCreator != null, "Please use forRowData() or forMapperOutputType() to initialize the input DataStream."); @@ -484,12 +484,10 @@ List checkAndGetEqualityFieldIds() { return equalityFieldIds; } - @SuppressWarnings("unchecked") - private DataStreamSink appendDummySink( - SingleOutputStreamOperator committerStream) { - DataStreamSink resultStream = + private DataStreamSink appendDummySink(SingleOutputStreamOperator committerStream) { + DataStreamSink resultStream = committerStream - .addSink(new DiscardingSink()) + .sinkTo(new DiscardingSink<>()) .name(operatorName(String.format("IcebergSink %s", this.table.name()))) .setParallelism(1); if (uidPrefix != null) { diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java index da596959cf70..3c210f658865 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java @@ -31,7 +31,7 @@ import java.util.stream.IntStream; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; @@ -54,6 +54,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.sink.IcebergSink; import org.apache.iceberg.flink.source.BoundedTableFactory; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -170,13 +171,18 @@ public void testUsedFlinkSinkInterface() { PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner(); String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE); ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); - Transformation sink = planner.translate(Collections.singletonList(operation)).get(0); + Transformation transformation = + planner.translate(Collections.singletonList(operation)).get(0); + assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class); + SinkTransformation sinkTransformation = (SinkTransformation) transformation; if (useV2Sink) { - assertThat(sink).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class); + assertThat(sinkTransformation.getSink()) + .as("Should use SinkV2 API based implementation") + .isInstanceOf(IcebergSink.class); } else { - assertThat(sink) + assertThat(sinkTransformation.getSink()) .as("Should use custom chain of StreamOperators terminated by DiscardingSink") - .isInstanceOf(LegacySinkTransformation.class); + .isInstanceOf(DiscardingSink.class); } } From 839fe1f8deaf2f5690f735d866e1226cea281064 Mon Sep 17 00:00:00 2001 From: Arek Burdach <> Date: Fri, 4 Oct 2024 16:26:26 +0200 Subject: [PATCH 5/6] introduction rewritten --- docs/docs/flink-writes.md | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index c87f49585aec..1e4f26e2857b 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -374,15 +374,14 @@ orphan files that are old enough. # Flink Writes (SinkV2 based implementation) -The [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) -was introduced in Flink 1.15. -The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A+Unified+Sink+API) -had some limitations - for example it created a lot of small files when writing to it. This problem is called -the `small-file-compaction` problem in -the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction). -The default `FlinkSink` implementation available in `iceberg-flink` module builds its own chain of `StreamOperator`s terminated by `DiscardingSink`. -However, in the same module, there is also `IcebergSink` which is based on the SinkV2 API. -The SinkV2 based `IcebergSink` is currently an experimental feature. +At the time when the current default, `FlinkSink` implementation was created, Flink Sink's interface had some +limitations that were not acceptable for the Iceberg tables purpose. Due to these limitations, `FlinkSink` is based +on a custom chain of `StreamOperator`s terminated by `DiscardingSink`. + +In the Flink 1.15 version, [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) +was introduced. This interface is used in the new, `IcebergSink` implementation that is also available in the `iceberg-flink` module. +The new implementation will be a base for further work on features such as [table maintenance](maintenance.md). +The SinkV2 based implementation is currently an experimental feature so please use it with caution. ## Writing with SQL From 799de3521c1e8c0e38ff6ef9e81bb0a868e48f8d Mon Sep 17 00:00:00 2001 From: Arek Burdach <> Date: Mon, 7 Oct 2024 23:16:35 +0200 Subject: [PATCH 6/6] review fixes --- docs/docs/flink-writes.md | 8 +++--- .../iceberg/flink/TestFlinkTableSink.java | 9 ++++--- .../flink/TestFlinkTableSinkExtended.java | 25 +++++++++++-------- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md index 1e4f26e2857b..3edd2720a1f4 100644 --- a/docs/docs/flink-writes.md +++ b/docs/docs/flink-writes.md @@ -378,10 +378,10 @@ At the time when the current default, `FlinkSink` implementation was created, Fl limitations that were not acceptable for the Iceberg tables purpose. Due to these limitations, `FlinkSink` is based on a custom chain of `StreamOperator`s terminated by `DiscardingSink`. -In the Flink 1.15 version, [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) -was introduced. This interface is used in the new, `IcebergSink` implementation that is also available in the `iceberg-flink` module. -The new implementation will be a base for further work on features such as [table maintenance](maintenance.md). -The SinkV2 based implementation is currently an experimental feature so please use it with caution. +In the 1.15 version of Flink [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction) +was introduced. This interface is used in the new `IcebergSink` implementation which is available in the `iceberg-flink` module. +The new implementation is a base for further work on features such as [table maintenance](maintenance.md). +The SinkV2 based implementation is currently an experimental feature so use it with caution. ## Writing with SQL diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index 4e3ccd2f0e13..fad30f9c1e67 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -63,9 +63,10 @@ public static List parameters() { for (Object[] catalogParams : CatalogTestBase.parameters()) { String catalogName = (String) catalogParams[0]; Namespace baseNamespace = (Namespace) catalogParams[1]; - boolean doNotUseV2Sink = false; parameters.add( - new Object[] {catalogName, baseNamespace, format, isStreaming, doNotUseV2Sink}); + new Object[] { + catalogName, baseNamespace, format, isStreaming, false /* don't use v2 sink */ + }); } } } @@ -75,8 +76,8 @@ public static List parameters() { for (Boolean isStreaming : new Boolean[] {true, false}) { String catalogName = "testhadoop_basenamespace"; Namespace baseNamespace = Namespace.of("l0", "l1"); - boolean useV2Sink = true; - parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, useV2Sink}); + parameters.add( + new Object[] {catalogName, baseNamespace, format, isStreaming, true /* use v2 sink */}); } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java index 3c210f658865..3afabf6e0795 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java @@ -98,15 +98,16 @@ public class TestFlinkTableSinkExtended extends SqlBase { protected boolean isStreamingJob; @Parameter(index = 1) - protected boolean useV2Sink; + protected Boolean useV2Sink; @Parameters(name = "isStreamingJob={0}, useV2Sink={1}") protected static List parameters() { return Arrays.asList( - new Boolean[] {true, false}, - new Boolean[] {false, false}, - new Boolean[] {true, true}, - new Boolean[] {false, true}); + new Object[] {true, false}, + new Object[] {false, false}, + new Object[] {true, true}, + new Object[] {false, true}, + new Object[] {true, null}); } protected synchronized TableEnvironment getTableEnv() { @@ -126,9 +127,13 @@ protected synchronized TableEnvironment getTableEnv() { tEnv = TableEnvironment.create(settingsBuilder.build()); } } - tEnv.getConfig() - .getConfiguration() - .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); + + if (useV2Sink != null) { + tEnv.getConfig() + .getConfiguration() + .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); + } + return tEnv; } @@ -175,7 +180,7 @@ public void testUsedFlinkSinkInterface() { planner.translate(Collections.singletonList(operation)).get(0); assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class); SinkTransformation sinkTransformation = (SinkTransformation) transformation; - if (useV2Sink) { + if (useV2Sink != null && useV2Sink) { assertThat(sinkTransformation.getSink()) .as("Should use SinkV2 API based implementation") .isInstanceOf(IcebergSink.class); @@ -206,7 +211,7 @@ public void testWriteParallelism() { TABLE, SOURCE_TABLE); ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); Transformation sink = planner.translate(Collections.singletonList(operation)).get(0); - if (useV2Sink) { + if (useV2Sink != null && useV2Sink) { assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1); Transformation writerInput = sink.getInputs().get(0); assertThat(writerInput.getParallelism())