diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md
index 1e4f26e2857b..3edd2720a1f4 100644
--- a/docs/docs/flink-writes.md
+++ b/docs/docs/flink-writes.md
@@ -378,10 +378,10 @@ At the time when the current default, `FlinkSink` implementation was created, Fl
 limitations that were not acceptable for the Iceberg tables purpose. Due to these limitations,
 `FlinkSink` is based on a custom chain of `StreamOperator`s terminated by `DiscardingSink`.
 
-In the Flink 1.15 version, [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
-was introduced. This interface is used in the new, `IcebergSink` implementation that is also available in the `iceberg-flink` module.
-The new implementation will be a base for further work on features such as [table maintenance](maintenance.md).
-The SinkV2 based implementation is currently an experimental feature so please use it with caution.
+In the 1.15 version of Flink, the [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
+was introduced. This interface is used in the new `IcebergSink` implementation, which is available in the `iceberg-flink` module.
+The new implementation is a base for further work on features such as [table maintenance](maintenance.md).
+The SinkV2-based implementation is currently an experimental feature, so use it with caution.
 
 ## Writing with SQL
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index 4e3ccd2f0e13..fad30f9c1e67 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -63,9 +63,10 @@ public static List<Object[]> parameters() {
       for (Object[] catalogParams : CatalogTestBase.parameters()) {
         String catalogName = (String) catalogParams[0];
         Namespace baseNamespace = (Namespace) catalogParams[1];
-        boolean doNotUseV2Sink = false;
         parameters.add(
-            new Object[] {catalogName, baseNamespace, format, isStreaming, doNotUseV2Sink});
+            new Object[] {
+              catalogName, baseNamespace, format, isStreaming, false /* don't use v2 sink */
+            });
       }
     }
   }
@@ -75,8 +76,8 @@ public static List<Object[]> parameters() {
     for (Boolean isStreaming : new Boolean[] {true, false}) {
       String catalogName = "testhadoop_basenamespace";
       Namespace baseNamespace = Namespace.of("l0", "l1");
-      boolean useV2Sink = true;
-      parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming, useV2Sink});
+      parameters.add(
+          new Object[] {catalogName, baseNamespace, format, isStreaming, true /* use v2 sink */});
     }
   }
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
index 3c210f658865..3afabf6e0795 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
@@ -98,15 +98,16 @@ public class TestFlinkTableSinkExtended extends SqlBase {
   protected boolean isStreamingJob;
 
   @Parameter(index = 1)
-  protected boolean useV2Sink;
+  protected Boolean useV2Sink;
 
   @Parameters(name = "isStreamingJob={0}, useV2Sink={1}")
   protected static List<Object[]> parameters() {
     return Arrays.asList(
-        new Boolean[] {true, false},
-        new Boolean[] {false, false},
-        new Boolean[] {true, true},
-        new Boolean[] {false, true});
+        new Object[] {true, false},
+        new Object[] {false, false},
+        new Object[] {true, true},
+        new Object[] {false, true},
+        new Object[] {true, null});
   }
 
   protected synchronized TableEnvironment getTableEnv() {
@@ -126,9 +127,13 @@ protected synchronized TableEnvironment getTableEnv() {
         tEnv = TableEnvironment.create(settingsBuilder.build());
       }
     }
-    tEnv.getConfig()
-        .getConfiguration()
-        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
+
+    if (useV2Sink != null) {
+      tEnv.getConfig()
+          .getConfiguration()
+          .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
+    }
+
     return tEnv;
   }
 
@@ -175,7 +180,7 @@ public void testUsedFlinkSinkInterface() {
         planner.translate(Collections.singletonList(operation)).get(0);
     assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class);
     SinkTransformation sinkTransformation = (SinkTransformation) transformation;
-    if (useV2Sink) {
+    if (useV2Sink != null && useV2Sink) {
       assertThat(sinkTransformation.getSink())
           .as("Should use SinkV2 API based implementation")
           .isInstanceOf(IcebergSink.class);
@@ -206,7 +211,7 @@ public void testWriteParallelism() {
             TABLE, SOURCE_TABLE);
     ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
     Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
-    if (useV2Sink) {
+    if (useV2Sink != null && useV2Sink) {
      assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
       Transformation<?> writerInput = sink.getInputs().get(0);
       assertThat(writerInput.getParallelism())
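
Note on the `TestFlinkTableSinkExtended` changes: switching `useV2Sink` from `boolean` to the boxed `Boolean` lets the new `{true, null}` parameter row exercise the code path where `TABLE_EXEC_ICEBERG_USE_V2_SINK` is never set, so the planner's default sink choice is covered alongside the explicit `true`/`false` cases. As a rough sketch of what this option toggles from a user's perspective, assuming the option key backing `FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK` is `table.exec.iceberg.use-v2-sink`, and using a hypothetical catalog and tables (`my_catalog`, `db.tbl`, `src`) purely for illustration:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class V2SinkToggleSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Opt into the experimental SinkV2-based IcebergSink for SQL writes.
    // The key below is assumed from the FlinkConfigOptions constant name;
    // leaving it unset keeps the default sink path, which is exactly what
    // the new {true, null} parameter row pins down in the test.
    tEnv.getConfig().getConfiguration().setString("table.exec.iceberg.use-v2-sink", "true");

    // Hypothetical tables: any INSERT planned after this point should
    // translate to a SinkTransformation backed by IcebergSink.
    tEnv.executeSql("INSERT INTO my_catalog.db.tbl SELECT * FROM my_catalog.db.src");
  }
}
```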