Flink: Test alignment for the Flink Sink v2-based implementation (IcebergSink) #11219
@@ -0,0 +1,76 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.flink.sink;

import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public interface BaseIcebergSinkBuilder {

  BaseIcebergSinkBuilder tableSchema(TableSchema newTableSchema);

  BaseIcebergSinkBuilder tableLoader(TableLoader newTableLoader);

  BaseIcebergSinkBuilder equalityFieldColumns(List<String> columns);

  BaseIcebergSinkBuilder overwrite(boolean newOverwrite);

  BaseIcebergSinkBuilder setAll(Map<String, String> properties);

  BaseIcebergSinkBuilder flinkConf(ReadableConfig config);

  BaseIcebergSinkBuilder table(Table newTable);

  BaseIcebergSinkBuilder writeParallelism(int newWriteParallelism);

  BaseIcebergSinkBuilder distributionMode(DistributionMode mode);

  BaseIcebergSinkBuilder toBranch(String branch);

  DataStreamSink<?> append();

  static BaseIcebergSinkBuilder forRow(
      DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) {
    if (useV2Sink) {
      return IcebergSink.forRow(input, tableSchema);
    } else {
      return FlinkSink.forRow(input, tableSchema);
    }
  }

  static BaseIcebergSinkBuilder forRowData(DataStream<RowData> input, boolean useV2Sink) {
    if (useV2Sink) {
      return IcebergSink.forRowData(input);
    } else {
      return FlinkSink.forRowData(input);
    }
  }
}
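Not part of the PR itself: a minimal usage sketch of the facade above, showing that one fluent chain drives either sink implementation depending on the boolean flag. The wrapper class, method name, and the column/branch values are illustrative assumptions.

```java
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.BaseIcebergSinkBuilder;

class BuilderUsageSketch {
  // The flag is the only point where the two implementations diverge;
  // everything after the factory call is shared, which is what lets the
  // tests below exercise FlinkSink and IcebergSink through one code path.
  static DataStreamSink<?> write(
      DataStream<RowData> stream, TableLoader loader, boolean useV2Sink) {
    return BaseIcebergSinkBuilder.forRowData(stream, useV2Sink)
        .tableLoader(loader)
        .equalityFieldColumns(Arrays.asList("id")) // hypothetical equality key
        .distributionMode(DistributionMode.HASH)
        .writeParallelism(2)
        .toBranch("main")
        .append();
  }
}
```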
@@ -52,21 +52,36 @@ public class TestFlinkIcebergSink extends TestFlinkIcebergSinkBase {
   @Parameter(index = 2)
   private boolean partitioned;

-  @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+  @Parameter(index = 3)
+  private boolean useV2Sink;
+
+  @Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}, useV2Sink = {3}")
   public static Object[][] parameters() {
     return new Object[][] {
-      {FileFormat.AVRO, 1, true},
-      {FileFormat.AVRO, 1, false},
-      {FileFormat.AVRO, 2, true},
-      {FileFormat.AVRO, 2, false},
-      {FileFormat.ORC, 1, true},
-      {FileFormat.ORC, 1, false},
-      {FileFormat.ORC, 2, true},
-      {FileFormat.ORC, 2, false},
-      {FileFormat.PARQUET, 1, true},
-      {FileFormat.PARQUET, 1, false},
-      {FileFormat.PARQUET, 2, true},
-      {FileFormat.PARQUET, 2, false}
+      {FileFormat.AVRO, 1, true, false},

[Review comment] We don't need the whole testing matrix here. Some possible cuts:

[Reply] Ok, reduced.

+      {FileFormat.AVRO, 1, false, false},
+      {FileFormat.AVRO, 2, true, false},
+      {FileFormat.AVRO, 2, false, false},
+      {FileFormat.ORC, 1, true, false},
+      {FileFormat.ORC, 1, false, false},
+      {FileFormat.ORC, 2, true, false},
+      {FileFormat.ORC, 2, false, false},
+      {FileFormat.PARQUET, 1, true, false},
+      {FileFormat.PARQUET, 1, false, false},
+      {FileFormat.PARQUET, 2, true, false},
+      {FileFormat.PARQUET, 2, false, false},
+      {FileFormat.AVRO, 1, true, true},
+      {FileFormat.AVRO, 1, false, true},
+      {FileFormat.AVRO, 2, true, true},
+      {FileFormat.AVRO, 2, false, true},
+      {FileFormat.ORC, 1, true, true},
+      {FileFormat.ORC, 1, false, true},
+      {FileFormat.ORC, 2, true, true},
+      {FileFormat.ORC, 2, false, true},
+      {FileFormat.PARQUET, 1, true, true},
+      {FileFormat.PARQUET, 1, false, true},
+      {FileFormat.PARQUET, 2, true, true},
+      {FileFormat.PARQUET, 2, false, true}
     };
   }
@@ -100,7 +115,7 @@ public void testWriteRowData() throws Exception {
     env.addSource(createBoundedSource(rows), ROW_TYPE_INFO)
         .map(CONVERTER::toInternal, FlinkCompatibilityUtil.toTypeInfo(SimpleDataUtil.ROW_TYPE));

-    FlinkSink.forRowData(dataStream)
+    BaseIcebergSinkBuilder.forRowData(dataStream, useV2Sink)
         .table(table)
         .tableLoader(tableLoader)
         .writeParallelism(parallelism)
@@ -115,11 +130,11 @@ public void testWriteRowData() throws Exception {

   @TestTemplate
   public void testWriteRow() throws Exception {
-    testWriteRow(parallelism, null, DistributionMode.NONE);
+    testWriteRow(parallelism, null, DistributionMode.NONE, useV2Sink);
   }

   @TestTemplate
   public void testWriteRowWithTableSchema() throws Exception {
-    testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE);
+    testWriteRow(parallelism, SimpleDataUtil.FLINK_SCHEMA, DistributionMode.NONE, useV2Sink);
   }
 }
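The review thread above asks to trim the hand-written 24-row cross-product. As a purely illustrative aside (not code from the PR), the same matrix could be generated programmatically, which would make such cuts a one-line change; the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileFormat;

class ParameterMatrixSketch {
  // Builds the same 3 x 2 x 2 x 2 = 24-row matrix as the literal array above.
  static Object[][] parameters() {
    List<Object[]> rows = new ArrayList<>();
    for (FileFormat format :
        new FileFormat[] {FileFormat.AVRO, FileFormat.ORC, FileFormat.PARQUET}) {
      for (int parallelism : new int[] {1, 2}) {
        for (boolean partitioned : new boolean[] {true, false}) {
          for (boolean useV2Sink : new boolean[] {false, true}) {
            rows.add(new Object[] {format, parallelism, partitioned, useV2Sink});
          }
        }
      }
    }
    return rows.toArray(new Object[0][]);
  }
}
```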
@@ -59,15 +59,22 @@ public class TestFlinkIcebergSinkBranch extends TestFlinkIcebergSinkBase {
   @Parameter(index = 1)
   private String branch;

+  @Parameter(index = 2)
+  private boolean useV2Sink;
+
   private TableLoader tableLoader;

-  @Parameters(name = "formatVersion = {0}, branch = {1}")
+  @Parameters(name = "formatVersion = {0}, branch = {1}, useV2Sink = {2}")

[Review comment] Cut the number of tests, like above.

   public static Object[][] parameters() {
     return new Object[][] {
-      {"1", "main"},
-      {"1", "testBranch"},
-      {"2", "main"},
-      {"2", "testBranch"}
+      {"1", "main", false},
+      {"1", "testBranch", false},
+      {"2", "main", false},
+      {"2", "testBranch", false},
+      {"1", "main", true},
+      {"1", "testBranch", true},
+      {"2", "main", true},
+      {"2", "testBranch", true}
     };
   }
@@ -105,7 +112,7 @@ private void testWriteRow(TableSchema tableSchema, DistributionMode distribution
     List<Row> rows = createRows("");
     DataStream<Row> dataStream = env.addSource(createBoundedSource(rows), ROW_TYPE_INFO);

-    FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+    BaseIcebergSinkBuilder.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA, useV2Sink)
         .table(table)
         .tableLoader(tableLoader)
         .tableSchema(tableSchema)
[Review comment] I understand where you are coming from, but we are planning to add features (table maintenance) to the new `IcebergSink` which will not be available in the `FlinkSink`. We also need to think about the deprecation and eventual removal of the `FlinkSink`.

Maybe something like what I propose for the `MaintenanceTaskBuilder` could help: https://github.com/apache/iceberg/pull/11144/files#diff-1b7c33ba8eb300e4a1c92844418384756a07ba32882de39855c74c9a40778f48

Or we could just move this class to the tests and not add it to the public API?
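For context, a minimal sketch of the self-typed ("curiously recurring") builder pattern that the linked `MaintenanceTaskBuilder` proposal appears to rely on; all names here are hypothetical, not the PR's API.

```java
// Hypothetical names throughout; this only illustrates the pattern.
abstract class SinkWriteBuilder<B extends SinkWriteBuilder<B>> {
  protected int writeParallelism = 1;

  @SuppressWarnings("unchecked")
  protected B self() {
    return (B) this;
  }

  // Shared setters return the concrete subtype, so chained calls keep
  // access to subtype-only methods without each subclass overriding
  // every return type by hand.
  public B writeParallelism(int parallelism) {
    this.writeParallelism = parallelism;
    return self();
  }
}

final class V2SinkWriteBuilder extends SinkWriteBuilder<V2SinkWriteBuilder> {
  // A feature that would exist only on the new sink, e.g. table maintenance.
  public V2SinkWriteBuilder tableMaintenance(boolean enabled) {
    return self();
  }
}
```

With this shape, `new V2SinkWriteBuilder().writeParallelism(2).tableMaintenance(true)` compiles, whereas a setter returning the shared base type would lose the subtype after the first call.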
[Reply] In my previous solution this was also possible, because each Builder implementation overrode the return type of each method to the specific one, but your suggestion looks better - changed.

With regard to visibility: in the referenced PR we discussed that there is a plan to allow using `IcebergSink` for dynamic tables (`IcebergTableSink:80`). I thought about adding a property there that allows opting into this implementation. Thanks to the common parent for the builders, we could check the property value at the beginning of the method and run the rest of the builder chain on it. This would ensure that we keep these builders synchronized. I thought about doing this in the next PR. WDYT?
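A rough sketch of the property-based selection described in that reply, assuming a write property toggles the implementation; the property key, class, and method names are made up for illustration.

```java
import java.util.Map;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.BaseIcebergSinkBuilder;

class SinkSelectionSketch {
  // "write.flink.use-v2-sink" is a hypothetical property key, not an
  // existing Iceberg option.
  static DataStreamSink<?> appendSink(
      DataStream<RowData> input, TableLoader loader, Map<String, String> writeProps) {
    boolean useV2Sink =
        Boolean.parseBoolean(writeProps.getOrDefault("write.flink.use-v2-sink", "false"));
    // Because both builders share BaseIcebergSinkBuilder, the decision is a
    // single branch here and the rest of the chain stays identical.
    return BaseIcebergSinkBuilder.forRowData(input, useV2Sink)
        .tableLoader(loader)
        .append();
  }
}
```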