Flink: Add table.exec.iceberg.use-v2-sink option (apache#11244)
arkadius authored and zachdisc committed Dec 12, 2024
1 parent a7e51b4 commit c1a432d
Showing 6 changed files with 148 additions and 33 deletions.
25 changes: 25 additions & 0 deletions docs/docs/flink-writes.md
@@ -371,3 +371,28 @@
and [deleting orphan files](maintenance.md#delete-orphan-files) could possibly corrupt
the state of the Flink job. To avoid that, make sure to keep the last snapshot created by the Flink
job (which can be identified by the `flink.job-id` property in the summary), and only delete
orphan files that are old enough.

# Flink Writes (SinkV2 based implementation)

At the time the current default `FlinkSink` implementation was created, Flink's Sink interface had some
limitations that made it unsuitable for writing to Iceberg tables. Because of these limitations, `FlinkSink` is based
on a custom chain of `StreamOperator`s terminated by `DiscardingSink`.

Flink 1.15 introduced the [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
This interface is used by the new `IcebergSink` implementation, which is available in the `iceberg-flink` module
and serves as the basis for further work on features such as [table maintenance](maintenance.md).
The SinkV2 based implementation is currently an experimental feature, so use it with caution.

## Writing with SQL

To turn on the SinkV2 based implementation in SQL, set this configuration option:
```sql
SET table.exec.iceberg.use-v2-sink = true;
```
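
The option can also be set programmatically before executing SQL statements. A minimal sketch, assuming an Iceberg table named `sample` already exists in the configured catalog (the class name, table name, and INSERT statement are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkConfigOptions;

public class SqlWriteWithV2Sink {
  public static void main(String[] args) {
    TableEnvironment tEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // Enable the SinkV2 based IcebergSink for SQL writes (the option defaults to false).
    tEnv.getConfig()
        .getConfiguration()
        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, true);

    // Illustrative statement; `sample` is assumed to be an existing Iceberg table.
    tEnv.executeSql("INSERT INTO sample VALUES (1, 'a')");
  }
}
```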

## Writing with DataStream

To use the SinkV2 based implementation, replace `FlinkSink` with `IcebergSink` in the provided snippets, as shown in the sketch below.
Warning: There are some slight differences between these implementations:
- The `RANGE` distribution mode is not yet available for `IcebergSink`.
- When using `IcebergSink`, use `uidSuffix` instead of `uidPrefix`.
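
For reference, here is a minimal sketch of the `IcebergSink` builder chain mirroring the existing `FlinkSink` snippets; the input stream, Hadoop configuration, and warehouse path are placeholders to replace with your own:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder input stream; in practice this is the DataStream<RowData> you already build today.
DataStream<RowData> input = ...;

// Placeholder table location; the target Iceberg table is assumed to exist.
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

IcebergSink.forRowData(input)
    .tableLoader(tableLoader)
    .uidSuffix("iceberg-sink") // IcebergSink takes uidSuffix instead of uidPrefix
    .append();

env.execute("Iceberg SinkV2 example");
```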
FlinkConfigOptions.java
@@ -91,6 +91,12 @@ private FlinkConfigOptions() {}
.defaultValue(true)
.withDescription("Use the FLIP-27 based Iceberg source implementation.");

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
ConfigOptions.key("table.exec.iceberg.use-v2-sink")
.booleanType()
.defaultValue(false)
.withDescription("Use the SinkV2 API based Iceberg sink implementation.");

public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
ConfigOptions.key("table.exec.iceberg.split-assigner-type")
.enumType(SplitAssignerType.class)
IcebergTableSink.java
@@ -35,6 +35,7 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
@@ -77,14 +78,25 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
return IcebergSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
} else {
return FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
}
}
};
}
FlinkSink.java
@@ -41,7 +41,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -392,7 +392,7 @@ public Builder toBranch(String branch) {
return this;
}

private <T> DataStreamSink<T> chainIcebergOperators() {
private DataStreamSink<Void> chainIcebergOperators() {
Preconditions.checkArgument(
inputCreator != null,
"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
@@ -484,12 +484,10 @@ List<Integer> checkAndGetEqualityFieldIds() {
return equalityFieldIds;
}

@SuppressWarnings("unchecked")
private <T> DataStreamSink<T> appendDummySink(
SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<T> resultStream =
private DataStreamSink<Void> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<Void> resultStream =
committerStream
.addSink(new DiscardingSink())
.sinkTo(new DiscardingSink<>())
.name(operatorName(String.format("IcebergSink %s", this.table.name())))
.setParallelism(1);
if (uidPrefix != null) {
TestFlinkTableSink.java
@@ -40,7 +40,6 @@

public class TestFlinkTableSink extends CatalogTestBase {

private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
@@ -51,7 +50,11 @@ public class TestFlinkTableSink extends CatalogTestBase {
@Parameter(index = 3)
private boolean isStreamingJob;

@Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
@Parameter(index = 4)
private boolean useV2Sink;

@Parameters(
name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, useV2Sink={4}")
public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
@@ -60,10 +63,24 @@ public static List<Object[]> parameters() {
for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
parameters.add(
new Object[] {
catalogName, baseNamespace, format, isStreaming, false /* don't use v2 sink */
});
}
}
}

for (FileFormat format :
new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
for (Boolean isStreaming : new Boolean[] {true, false}) {
String catalogName = "testhadoop_basenamespace";
Namespace baseNamespace = Namespace.of("l0", "l1");
parameters.add(
new Object[] {catalogName, baseNamespace, format, isStreaming, true /* use v2 sink */});
}
}

return parameters;
}

@@ -87,6 +104,11 @@ protected TableEnvironment getTableEnv() {
}
}
}

tEnv.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);

return tEnv;
}

TestFlinkTableSinkExtended.java
@@ -31,6 +31,8 @@
import java.util.stream.IntStream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -52,6 +54,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -91,11 +94,20 @@ public class TestFlinkTableSinkExtended extends SqlBase {

private TableEnvironment tEnv;

@Parameter protected boolean isStreamingJob;
@Parameter(index = 0)
protected boolean isStreamingJob;

@Parameters(name = "isStreamingJob={0}")
@Parameter(index = 1)
protected Boolean useV2Sink;

@Parameters(name = "isStreamingJob={0}, useV2Sink={1}")
protected static List<Object[]> parameters() {
return Arrays.asList(new Boolean[] {true}, new Boolean[] {false});
return Arrays.asList(
new Object[] {true, false},
new Object[] {false, false},
new Object[] {true, true},
new Object[] {false, true},
new Object[] {true, null});
}

protected synchronized TableEnvironment getTableEnv() {
@@ -115,6 +127,13 @@ protected synchronized TableEnvironment getTableEnv() {
tEnv = TableEnvironment.create(settingsBuilder.build());
}
}

if (useV2Sink != null) {
tEnv.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
}

return tEnv;
}

@@ -146,6 +165,32 @@ public void clean() throws Exception {
catalog.close();
}

@TestTemplate
public void testUsedFlinkSinkInterface() {
String dataId = BoundedTableFactory.registerDataSet(Collections.emptyList());
sql(
"CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ " WITH ('connector'='BoundedSource', 'data-id'='%s')",
SOURCE_TABLE, dataId);

PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE);
ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
Transformation<?> transformation =
planner.translate(Collections.singletonList(operation)).get(0);
assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class);
SinkTransformation<?, ?> sinkTransformation = (SinkTransformation<?, ?>) transformation;
if (useV2Sink != null && useV2Sink) {
assertThat(sinkTransformation.getSink())
.as("Should use SinkV2 API based implementation")
.isInstanceOf(IcebergSink.class);
} else {
assertThat(sinkTransformation.getSink())
.as("Should use custom chain of StreamOperators terminated by DiscardingSink")
.isInstanceOf(DiscardingSink.class);
}
}

@TestTemplate
public void testWriteParallelism() {
List<Row> dataSet =
@@ -165,18 +210,25 @@ public void testWriteParallelism() {
"INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
TABLE, SOURCE_TABLE);
ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
Transformation<?> committer = dummySink.getInputs().get(0);
Transformation<?> writer = committer.getInputs().get(0);

assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
writer
.getInputs()
.forEach(
input ->
assertThat(input.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4));
Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
if (useV2Sink != null && useV2Sink) {
assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
Transformation<?> writerInput = sink.getInputs().get(0);
assertThat(writerInput.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4);
} else {
Transformation<?> committer = sink.getInputs().get(0);
Transformation<?> writer = committer.getInputs().get(0);

assertThat(writer.getParallelism())
.as("Should have the expected 1 parallelism.")
.isEqualTo(1);
Transformation<?> writerInput = writer.getInputs().get(0);
assertThat(writerInput.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4);
}
}

@TestTemplate
