diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java index 7c7afd24ed8e..6362bc447634 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/FlinkConfigOptions.java @@ -91,6 +91,12 @@ private FlinkConfigOptions() {} .defaultValue(false) .withDescription("Use the FLIP-27 based Iceberg source implementation."); + public static final ConfigOption TABLE_EXEC_ICEBERG_USE_V2_SINK = + ConfigOptions.key("table.exec.iceberg.use-v2-sink") + .booleanType() + .defaultValue(false) + .withDescription("Use the SinkV2 API based Iceberg sink implementation."); + public static final ConfigOption TABLE_EXEC_SPLIT_ASSIGNER_TYPE = ConfigOptions.key("table.exec.iceberg.split-assigner-type") .enumType(SplitAssignerType.class) diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java index 1b9268569d9a..c2c5a6706e92 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/IcebergTableSink.java @@ -35,6 +35,7 @@ import org.apache.flink.types.RowKind; import org.apache.flink.util.Preconditions; import org.apache.iceberg.flink.sink.FlinkSink; +import org.apache.iceberg.flink.sink.IcebergSink; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite { @@ -77,14 +78,25 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { @Override public DataStreamSink consumeDataStream( ProviderContext providerContext, DataStream dataStream) { - return FlinkSink.forRowData(dataStream) - .tableLoader(tableLoader) - .tableSchema(tableSchema) - .equalityFieldColumns(equalityColumns) - .overwrite(overwrite) - .setAll(writeProps) - .flinkConf(readableConfig) - .append(); + if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) { + return IcebergSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .append(); + } else { + return FlinkSink.forRowData(dataStream) + .tableLoader(tableLoader) + .tableSchema(tableSchema) + .equalityFieldColumns(equalityColumns) + .overwrite(overwrite) + .setAll(writeProps) + .flinkConf(readableConfig) + .append(); + } } }; } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java index c53431490984..3f1fef6580ef 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java @@ -41,7 +41,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.flink.streaming.api.functions.sink.DiscardingSink; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.util.DataFormatConverters; @@ -381,7 +381,7 @@ public Builder toBranch(String branch) { return this; } - private DataStreamSink chainIcebergOperators() { + private DataStreamSink chainIcebergOperators() { Preconditions.checkArgument( inputCreator != null, "Please use forRowData() or forMapperOutputType() to initialize the input DataStream."); @@ -472,12 +472,10 @@ List checkAndGetEqualityFieldIds() { return equalityFieldIds; } - @SuppressWarnings("unchecked") - private DataStreamSink appendDummySink( - SingleOutputStreamOperator committerStream) { - DataStreamSink resultStream = + private DataStreamSink appendDummySink(SingleOutputStreamOperator committerStream) { + DataStreamSink resultStream = committerStream - .addSink(new DiscardingSink()) + .sinkTo(new DiscardingSink<>()) .name(operatorName(String.format("IcebergSink %s", this.table.name()))) .setParallelism(1); if (uidPrefix != null) { diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java index 2978a92945a2..fad30f9c1e67 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java @@ -40,7 +40,6 @@ public class TestFlinkTableSink extends CatalogTestBase { - private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source"; private static final String TABLE_NAME = "test_table"; private TableEnvironment tEnv; private Table icebergTable; @@ -51,7 +50,11 @@ public class TestFlinkTableSink extends CatalogTestBase { @Parameter(index = 3) private boolean isStreamingJob; - @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}") + @Parameter(index = 4) + private boolean useV2Sink; + + @Parameters( + name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, useV2Sink={4}") public static List parameters() { List parameters = Lists.newArrayList(); for (FileFormat format : @@ -60,10 +63,24 @@ public static List parameters() { for (Object[] catalogParams : CatalogTestBase.parameters()) { String catalogName = (String) catalogParams[0]; Namespace baseNamespace = (Namespace) catalogParams[1]; - parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming}); + parameters.add( + new Object[] { + catalogName, baseNamespace, format, isStreaming, false /* don't use v2 sink */ + }); } } } + + for (FileFormat format : + new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) { + for (Boolean isStreaming : new Boolean[] {true, false}) { + String catalogName = "testhadoop_basenamespace"; + Namespace baseNamespace = Namespace.of("l0", "l1"); + parameters.add( + new Object[] {catalogName, baseNamespace, format, isStreaming, true /* use v2 sink */}); + } + } + return parameters; } @@ -87,6 +104,11 @@ protected TableEnvironment getTableEnv() { } } } + + tEnv.getConfig() + .getConfiguration() + .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); + return tEnv; } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java index b63547d433a4..3afabf6e0795 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java @@ -31,6 +31,8 @@ import java.util.stream.IntStream; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink; +import org.apache.flink.streaming.api.transformations.SinkTransformation; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; @@ -52,6 +54,7 @@ import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.sink.IcebergSink; import org.apache.iceberg.flink.source.BoundedTableFactory; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -91,11 +94,20 @@ public class TestFlinkTableSinkExtended extends SqlBase { private TableEnvironment tEnv; - @Parameter protected boolean isStreamingJob; + @Parameter(index = 0) + protected boolean isStreamingJob; - @Parameters(name = "isStreamingJob={0}") + @Parameter(index = 1) + protected Boolean useV2Sink; + + @Parameters(name = "isStreamingJob={0}, useV2Sink={1}") protected static List parameters() { - return Arrays.asList(new Boolean[] {true}, new Boolean[] {false}); + return Arrays.asList( + new Object[] {true, false}, + new Object[] {false, false}, + new Object[] {true, true}, + new Object[] {false, true}, + new Object[] {true, null}); } protected synchronized TableEnvironment getTableEnv() { @@ -115,6 +127,13 @@ protected synchronized TableEnvironment getTableEnv() { tEnv = TableEnvironment.create(settingsBuilder.build()); } } + + if (useV2Sink != null) { + tEnv.getConfig() + .getConfiguration() + .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink); + } + return tEnv; } @@ -146,6 +165,32 @@ public void clean() throws Exception { catalog.close(); } + @TestTemplate + public void testUsedFlinkSinkInterface() { + String dataId = BoundedTableFactory.registerDataSet(Collections.emptyList()); + sql( + "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)" + + " WITH ('connector'='BoundedSource', 'data-id'='%s')", + SOURCE_TABLE, dataId); + + PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner(); + String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE); + ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); + Transformation transformation = + planner.translate(Collections.singletonList(operation)).get(0); + assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class); + SinkTransformation sinkTransformation = (SinkTransformation) transformation; + if (useV2Sink != null && useV2Sink) { + assertThat(sinkTransformation.getSink()) + .as("Should use SinkV2 API based implementation") + .isInstanceOf(IcebergSink.class); + } else { + assertThat(sinkTransformation.getSink()) + .as("Should use custom chain of StreamOperators terminated by DiscardingSink") + .isInstanceOf(DiscardingSink.class); + } + } + @TestTemplate public void testWriteParallelism() { List dataSet = @@ -165,18 +210,25 @@ public void testWriteParallelism() { "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s", TABLE, SOURCE_TABLE); ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0); - Transformation dummySink = planner.translate(Collections.singletonList(operation)).get(0); - Transformation committer = dummySink.getInputs().get(0); - Transformation writer = committer.getInputs().get(0); - - assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1); - writer - .getInputs() - .forEach( - input -> - assertThat(input.getParallelism()) - .as("Should have the expected parallelism.") - .isEqualTo(isStreamingJob ? 2 : 4)); + Transformation sink = planner.translate(Collections.singletonList(operation)).get(0); + if (useV2Sink != null && useV2Sink) { + assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1); + Transformation writerInput = sink.getInputs().get(0); + assertThat(writerInput.getParallelism()) + .as("Should have the expected parallelism.") + .isEqualTo(isStreamingJob ? 2 : 4); + } else { + Transformation committer = sink.getInputs().get(0); + Transformation writer = committer.getInputs().get(0); + + assertThat(writer.getParallelism()) + .as("Should have the expected 1 parallelism.") + .isEqualTo(1); + Transformation writerInput = writer.getInputs().get(0); + assertThat(writerInput.getParallelism()) + .as("Should have the expected parallelism.") + .isEqualTo(isStreamingJob ? 2 : 4); + } } @TestTemplate