Skip to content

Commit

Permalink
review fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Arek Burdach committed Oct 7, 2024
1 parent 525b6ee commit ee48c23
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
8 changes: 4 additions & 4 deletions docs/docs/flink-writes.md
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,10 @@ At the time when the current default, `FlinkSink` implementation was created, Fl
limitations that were not acceptable for the Iceberg tables purpose. Due to these limitations, `FlinkSink` is based
on a custom chain of `StreamOperator`s terminated by `DiscardingSink`.

In Flink 1.15, the [SinkV2 interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction)
was introduced. This interface is used in the new `IcebergSink` implementation, which is available in the `iceberg-flink` module.
The new implementation is a base for further work on features such as [table maintenance](maintenance.md).
The SinkV2-based implementation is currently an experimental feature, so use it with caution.

## Writing with SQL

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ public static List<Object[]> parameters() {
for (Object[] catalogParams : CatalogTestBase.parameters()) {
  String catalogName = (String) catalogParams[0];
  Namespace baseNamespace = (Namespace) catalogParams[1];
  // Legacy-sink variant of each catalog parameterization. The last element is
  // the useV2Sink flag; it is inlined with an explanatory comment instead of
  // being bound to a single-use local variable.
  parameters.add(
      new Object[] {
        catalogName, baseNamespace, format, isStreaming, false /* don't use v2 sink */
      });
}
}
}
Expand All @@ -75,8 +76,8 @@ public static List<Object[]> parameters() {
for (Boolean isStreaming : new Boolean[] {true, false}) {
  String catalogName = "testhadoop_basenamespace";
  Namespace baseNamespace = Namespace.of("l0", "l1");
  // V2-sink variant of the hadoop-catalog parameterization; the flag is
  // inlined with a comment rather than bound to a single-use local.
  parameters.add(
      new Object[] {catalogName, baseNamespace, format, isStreaming, true /* use v2 sink */});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,16 @@ public class TestFlinkTableSinkExtended extends SqlBase {
protected boolean isStreamingJob;

// Tri-state sink selector: TRUE forces the v2 IcebergSink, FALSE forces the
// legacy FlinkSink, and null leaves TABLE_EXEC_ICEBERG_USE_V2_SINK unset so
// the default configuration path is exercised. Boxed Boolean (not boolean)
// is required so the null case is representable.
@Parameter(index = 1)
protected Boolean useV2Sink;

@Parameters(name = "isStreamingJob={0}, useV2Sink={1}")
protected static List<Object[]> parameters() {
return Arrays.asList(
new Boolean[] {true, false},
new Boolean[] {false, false},
new Boolean[] {true, true},
new Boolean[] {false, true});
new Object[] {true, false},
new Object[] {false, false},
new Object[] {true, true},
new Object[] {false, true},
new Object[] {true, null});
}

protected synchronized TableEnvironment getTableEnv() {
Expand All @@ -126,9 +127,13 @@ protected synchronized TableEnvironment getTableEnv() {
tEnv = TableEnvironment.create(settingsBuilder.build());
}
}
// NOTE(review): the next three lines are the pre-change code retained by this
// diff view (they set the option unconditionally); the post-change replacement
// is the guarded block that follows.
tEnv.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);

// Only apply the option when the parameter is non-null; a null useV2Sink
// leaves TABLE_EXEC_ICEBERG_USE_V2_SINK at its default (option never set).
if (useV2Sink != null) {
tEnv.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_V2_SINK, useV2Sink);
}

return tEnv;
}

Expand Down Expand Up @@ -175,7 +180,7 @@ public void testUsedFlinkSinkInterface() {
planner.translate(Collections.singletonList(operation)).get(0);
assertThat(transformation).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class);
SinkTransformation<?, ?> sinkTransformation = (SinkTransformation<?, ?>) transformation;
if (useV2Sink) {
if (useV2Sink != null && useV2Sink) {
assertThat(sinkTransformation.getSink())
.as("Should use SinkV2 API based implementation")
.isInstanceOf(IcebergSink.class);
Expand Down Expand Up @@ -206,7 +211,7 @@ public void testWriteParallelism() {
TABLE, SOURCE_TABLE);
ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
if (useV2Sink) {
if (useV2Sink != null && useV2Sink) {
assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
Transformation<?> writerInput = sink.getInputs().get(0);
assertThat(writerInput.getParallelism())
Expand Down

0 comments on commit ee48c23

Please sign in to comment.