Flink: Add table.exec.iceberg.use-v2-sink option #11244

Merged: 6 commits, Nov 25, 2024
25 changes: 25 additions & 0 deletions docs/docs/flink-writes.md
@@ -371,3 +371,28 @@ and [deleting orphan files](maintenance.md#delete-orphan-files) could possibly corrupt
the state of the Flink job. To avoid that, make sure to keep the last snapshot created by the Flink
job (which can be identified by the `flink.job-id` property in the summary), and only delete
orphan files that are old enough.

# Flink Writes (SinkV2-based implementation)

At the time the current default `FlinkSink` implementation was created, Flink's Sink interface had some
limitations that made it unsuitable for writing to Iceberg tables. Because of these limitations, `FlinkSink` is based
on a custom chain of `StreamOperator`s terminated by a `DiscardingSink`.

Flink 1.15 introduced the [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
This interface is used by the new `IcebergSink` implementation, which is available in the `iceberg-flink` module.
The new implementation is the basis for further work on features such as [table maintenance](maintenance.md).
The SinkV2-based implementation is currently an experimental feature, so use it with caution.

## Writing with SQL

To turn on the SinkV2-based implementation in SQL, set this configuration option:
```sql
SET table.exec.iceberg.use-v2-sink = true;
```
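
The same flag can also be set programmatically before SQL is submitted through a `TableEnvironment`. The following is a minimal sketch, assuming the `iceberg-flink` runtime is on the classpath; the `FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK` option comes from this PR, while the class and statement names are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkConfigOptions;

public class EnableV2SinkExample {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Equivalent to `SET table.exec.iceberg.use-v2-sink = true;` in SQL.
    tEnv.getConfig()
        .getConfiguration()
        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, true);

    // Any INSERT INTO an Iceberg table planned after this point uses the SinkV2-based IcebergSink.
    // tEnv.executeSql("INSERT INTO db.tbl SELECT ...");
  }
}
```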

## Writing with DataStream

To use the SinkV2-based implementation with the DataStream API, replace `FlinkSink` with `IcebergSink` in the provided snippets (see the sketch after this list).
Warning: there are some slight differences between the two implementations:
- The `RANGE` distribution mode is not yet available for the `IcebergSink`
- When using `IcebergSink`, use `uidSuffix` instead of `uidPrefix`
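
A minimal DataStream sketch follows; the `IcebergSink.forRowData(...)` builder calls mirror the ones used by `IcebergTableSink` in this PR, while the input stream, warehouse path, and table name are hypothetical placeholders:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergSink;

public class IcebergSinkV2Example {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical: build a DataStream<RowData> from your actual source.
    DataStream<RowData> rowDataStream = createRowDataStream(env);

    // Hypothetical warehouse path; any TableLoader that resolves the target table works.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/table");

    // Same builder shape as FlinkSink, but terminated by the SinkV2-based IcebergSink.
    IcebergSink.forRowData(rowDataStream)
        .tableLoader(tableLoader)
        .overwrite(false)
        .append();

    env.execute("Write to Iceberg with IcebergSink");
  }

  private static DataStream<RowData> createRowDataStream(StreamExecutionEnvironment env) {
    // Placeholder: replace with a real RowData-producing source.
    throw new UnsupportedOperationException("Provide a RowData source for your job");
  }
}
```

Note that a `uidSuffix(...)` call would replace a `uidPrefix(...)` call from the `FlinkSink` snippets, per the warning above.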
@@ -91,6 +91,12 @@ private FlinkConfigOptions() {}
.defaultValue(true)
.withDescription("Use the FLIP-27 based Iceberg source implementation.");

public static final ConfigOption<Boolean> TABLE_EXEC_ICEBERG_USE_V2_SINK =
ConfigOptions.key("table.exec.iceberg.use-v2-sink")
.booleanType()
.defaultValue(false)
.withDescription("Use the SinkV2 API based Iceberg sink implementation.");

public static final ConfigOption<SplitAssignerType> TABLE_EXEC_SPLIT_ASSIGNER_TYPE =
ConfigOptions.key("table.exec.iceberg.split-assigner-type")
.enumType(SplitAssignerType.class)
@@ -35,6 +35,7 @@
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class IcebergTableSink implements DynamicTableSink, SupportsPartitioning, SupportsOverwrite {
@@ -77,14 +78,25 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
if (readableConfig.get(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK)) {
return IcebergSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
} else {
return FlinkSink.forRowData(dataStream)
.tableLoader(tableLoader)
.tableSchema(tableSchema)
.equalityFieldColumns(equalityColumns)
.overwrite(overwrite)
.setAll(writeProps)
.flinkConf(readableConfig)
.append();
}
}
};
}
@@ -41,7 +41,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -392,7 +392,7 @@ public Builder toBranch(String branch) {
return this;
}

private <T> DataStreamSink<T> chainIcebergOperators() {
private DataStreamSink<Void> chainIcebergOperators() {
Contributor: why is this change being introduced?

Contributor Author: This is related to this discussion: #11244 (comment). I realized that we don't need the legacy, deprecated interface in the `FlinkSink` implementation, because the sink at the end of this chain of transformations does not matter: a discarding sink behaves the same whether it is used as the legacy `SinkFunction` or as the new SinkV2 interface.

Preconditions.checkArgument(
inputCreator != null,
"Please use forRowData() or forMapperOutputType() to initialize the input DataStream.");
@@ -484,12 +484,10 @@ List<Integer> checkAndGetEqualityFieldIds() {
return equalityFieldIds;
}

@SuppressWarnings("unchecked")
private <T> DataStreamSink<T> appendDummySink(
SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<T> resultStream =
private DataStreamSink<Void> appendDummySink(SingleOutputStreamOperator<Void> committerStream) {
DataStreamSink<Void> resultStream =
committerStream
.addSink(new DiscardingSink())
.sinkTo(new DiscardingSink<>())
Contributor: why is this change being introduced?

Contributor Author: The same as above.

.name(operatorName(String.format("IcebergSink %s", this.table.name())))
.setParallelism(1);
if (uidPrefix != null) {
@@ -40,7 +40,6 @@

public class TestFlinkTableSink extends CatalogTestBase {

private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
@@ -51,7 +50,11 @@ public class TestFlinkTableSink extends CatalogTestBase {
@Parameter(index = 3)
private boolean isStreamingJob;

@Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}")
@Parameter(index = 4)
private boolean useV2Sink;

@Parameters(
name = "catalogName={0}, baseNamespace={1}, format={2}, isStreaming={3}, useV2Sink={4}")
public static List<Object[]> parameters() {
List<Object[]> parameters = Lists.newArrayList();
for (FileFormat format :
@@ -60,10 +63,24 @@ public static List<Object[]> parameters() {
for (Object[] catalogParams : CatalogTestBase.parameters()) {
String catalogName = (String) catalogParams[0];
Namespace baseNamespace = (Namespace) catalogParams[1];
parameters.add(new Object[] {catalogName, baseNamespace, format, isStreaming});
parameters.add(
new Object[] {
catalogName, baseNamespace, format, isStreaming, false /* don't use v2 sink */
});
}
}
}

for (FileFormat format :
new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
for (Boolean isStreaming : new Boolean[] {true, false}) {
String catalogName = "testhadoop_basenamespace";
Namespace baseNamespace = Namespace.of("l0", "l1");
parameters.add(
new Object[] {catalogName, baseNamespace, format, isStreaming, true /* use v2 sink */});
}
}

return parameters;
}

@@ -87,6 +104,11 @@ protected TableEnvironment getTableEnv() {
}
}
}

tEnv.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);

return tEnv;
}

@@ -31,6 +31,8 @@
import java.util.stream.IntStream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -52,6 +54,7 @@
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.flink.source.BoundedTableFactory;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -91,11 +94,20 @@ public class TestFlinkTableSinkExtended extends SqlBase {

private TableEnvironment tEnv;

@Parameter protected boolean isStreamingJob;
@Parameter(index = 0)
protected boolean isStreamingJob;

@Parameters(name = "isStreamingJob={0}")
@Parameter(index = 1)
protected Boolean useV2Sink;

@Parameters(name = "isStreamingJob={0}, useV2Sink={1}")
protected static List<Object[]> parameters() {
return Arrays.asList(new Boolean[] {true}, new Boolean[] {false});
return Arrays.asList(
new Object[] {true, false},
new Object[] {false, false},
new Object[] {true, true},
new Object[] {false, true},
new Object[] {true, null});
}

protected synchronized TableEnvironment getTableEnv() {
@@ -115,6 +127,13 @@ protected synchronized TableEnvironment getTableEnv() {
tEnv = TableEnvironment.create(settingsBuilder.build());
}
}

if (useV2Sink != null) {
tEnv.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
}

return tEnv;
}

@@ -146,6 +165,32 @@ public void clean() throws Exception {
catalog.close();
}

@TestTemplate
public void testUsedFlinkSinkInterface() {
String dataId = BoundedTableFactory.registerDataSet(Collections.emptyList());
sql(
"CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+ " WITH ('connector'='BoundedSource', 'data-id'='%s')",
SOURCE_TABLE, dataId);

PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE);
ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
Transformation<?> transformation =
planner.translate(Collections.singletonList(operation)).get(0);
assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class);
SinkTransformation<?, ?> sinkTransformation = (SinkTransformation<?, ?>) transformation;
if (useV2Sink != null && useV2Sink) {
assertThat(sinkTransformation.getSink())
.as("Should use SinkV2 API based implementation")
.isInstanceOf(IcebergSink.class);
} else {
assertThat(sinkTransformation.getSink())
.as("Should use custom chain of StreamOperators terminated by DiscardingSink")
.isInstanceOf(DiscardingSink.class);
}
}

@TestTemplate
public void testWriteParallelism() {
List<Row> dataSet =
@@ -165,18 +210,25 @@ public void testWriteParallelism() {
"INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
TABLE, SOURCE_TABLE);
ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
Transformation<?> committer = dummySink.getInputs().get(0);
Transformation<?> writer = committer.getInputs().get(0);

assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
writer
.getInputs()
.forEach(
input ->
assertThat(input.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4));
Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
Contributor: do you plan to leverage the marker interface for the sink builder in #11219 for these tests?

Contributor Author: I don't think that is possible, because we don't have builders here; we already have a built chain of operators. This test works at the table connector level, where the builders are hidden behind the table connector API as an implementation detail.

if (useV2Sink != null && useV2Sink) {
assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
Transformation<?> writerInput = sink.getInputs().get(0);
assertThat(writerInput.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4);
} else {
Transformation<?> committer = sink.getInputs().get(0);
Transformation<?> writer = committer.getInputs().get(0);

assertThat(writer.getParallelism())
.as("Should have the expected 1 parallelism.")
.isEqualTo(1);
Transformation<?> writerInput = writer.getInputs().get(0);
assertThat(writerInput.getParallelism())
.as("Should have the expected parallelism.")
.isEqualTo(isStreamingJob ? 2 : 4);
}
}

@TestTemplate