Flink: Tests alignment for the Flink Sink v2-based implementation (IcebergSink) #11219

Closed
wants to merge 11 commits
FlinkSink.java
@@ -31,7 +31,6 @@
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -67,11 +66,8 @@
import org.apache.iceberg.flink.sink.shuffle.StatisticsOrRecord;
import org.apache.iceberg.flink.sink.shuffle.StatisticsType;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.SerializableSupplier;
import org.slf4j.Logger;
@@ -134,7 +130,7 @@ public static Builder forRowData(DataStream<RowData> input) {
return new Builder().forRowData(input);
}

public static class Builder {
public static class Builder extends IcebergSinkBuilder<Builder> {
private Function<String, DataStream<RowData>> inputCreator = null;
private TableLoader tableLoader;
private Table table;
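
Both builders in this PR now extend a common IcebergSinkBuilder base type, which is introduced elsewhere in the change set and does not appear in this diff. A minimal sketch consistent with the @Override annotations added throughout both builders (the self-typed generic parameter, exact method list, and modifiers are assumptions, not the PR's actual definition):

import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.TableSchema;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.SerializableTable;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

public abstract class IcebergSinkBuilder<B extends IcebergSinkBuilder<B>> {
  public abstract B table(Table newTable);
  public abstract B tableLoader(TableLoader newTableLoader);
  public abstract B setAll(Map<String, String> properties);
  public abstract B tableSchema(TableSchema newTableSchema);
  public abstract B overwrite(boolean newOverwrite);
  public abstract B flinkConf(ReadableConfig config);
  public abstract B distributionMode(DistributionMode mode);
  public abstract B writeParallelism(int newWriteParallelism);
  public abstract B upsert(boolean enabled);
  public abstract B equalityFieldColumns(List<String> columns);
  public abstract B toBranch(String branch);

  // Covariant overrides: FlinkSink returns DataStreamSink<Void>,
  // IcebergSink returns DataStreamSink<RowData>.
  public abstract DataStreamSink<?> append();

  // Hooks for shared tests; each concrete builder exposes its state here.
  protected abstract SerializableTable getTable();

  protected abstract List<String> getEqualityFieldColumns();
}
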
@@ -179,6 +175,7 @@ private <T> Builder forMapperOutputType(
* @param newTable the loaded iceberg table instance.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder table(Table newTable) {
this.table = newTable;
return this;
@@ -192,6 +189,7 @@ public Builder table(Table newTable) {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -210,21 +208,25 @@ public Builder set(String property, String value) {
* Set the write properties for Flink sink. View the supported properties in {@link
* FlinkWriteOptions}
*/
@Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}

@Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}

@Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -237,6 +239,7 @@ public Builder flinkConf(ReadableConfig config) {
* @param mode to specify the write distribution mode.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder distributionMode(DistributionMode mode) {
if (mode != null) {
writeOptions.put(FlinkWriteOptions.DISTRIBUTION_MODE.key(), mode.modeName());
@@ -306,6 +309,7 @@ public Builder rangeDistributionSortKeyBaseWeight(double weight) {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -321,6 +325,7 @@ public Builder writeParallelism(int newWriteParallelism) {
* @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
return this;
@@ -332,6 +337,7 @@ public Builder upsert(boolean enabled) {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -376,11 +382,22 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
}

@Override
protected SerializableTable getTable() {
return null;
}

@Override
protected List<String> getEqualityFieldColumns() {
return List.of();
}
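
Note the asymmetry with IcebergSink.Builder further down: FlinkSink.Builder holds a plain Table rather than a SerializableTable, so its getTable() hook returns null, and getEqualityFieldColumns() returns an empty list even though the builder does track equalityFieldColumns. Shared tests built on these hooks presumably have to tolerate that difference for the v1 sink.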

private <T> DataStreamSink<T> chainIcebergOperators() {
Preconditions.checkArgument(
inputCreator != null,
@@ -436,6 +453,7 @@ private <T> DataStreamSink<T> chainIcebergOperators() {
*
* @return {@link DataStreamSink} for sink.
*/
@Override
public DataStreamSink<Void> append() {
return chainIcebergOperators();
}
@@ -444,34 +462,6 @@ private String operatorName(String suffix) {
return uidPrefix != null ? uidPrefix + "-" + suffix : suffix;
}

@VisibleForTesting
List<Integer> checkAndGetEqualityFieldIds() {
List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
Set<Integer> equalityFieldSet =
Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
for (String column : equalityFieldColumns) {
org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
Preconditions.checkNotNull(
field,
"Missing required equality field column '%s' in table schema %s",
column,
table.schema());
equalityFieldSet.add(field.fieldId());
}

if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
LOG.warn(
"The configured equality field column IDs {} are not matched with the schema identifier field IDs"
+ " {}, use job specified equality field columns as the equality fields by default.",
equalityFieldSet,
table.schema().identifierFieldIds());
}
equalityFieldIds = Lists.newArrayList(equalityFieldSet);
}
return equalityFieldIds;
}
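
The checkAndGetEqualityFieldIds() method removed above presumably moves to a helper shared by both sink implementations, so that FlinkSink and IcebergSink resolve equality field IDs identically. A sketch under that assumption (the SinkUtil class name and location are illustrative; the body mirrors the deleted method):

import java.util.List;
import java.util.Set;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative shared helper; name and package are assumptions.
class SinkUtil {
  private static final Logger LOG = LoggerFactory.getLogger(SinkUtil.class);

  private SinkUtil() {}

  // Same resolution rules as the method removed from FlinkSink.Builder:
  // default to the schema's identifier fields, but let explicitly
  // configured equality columns win, warning on a mismatch.
  static List<Integer> checkAndGetEqualityFieldIds(Table table, List<String> equalityFieldColumns) {
    List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().identifierFieldIds());
    if (equalityFieldColumns != null && !equalityFieldColumns.isEmpty()) {
      Set<Integer> equalityFieldSet = Sets.newHashSetWithExpectedSize(equalityFieldColumns.size());
      for (String column : equalityFieldColumns) {
        Types.NestedField field = table.schema().findField(column);
        Preconditions.checkNotNull(
            field,
            "Missing required equality field column '%s' in table schema %s",
            column,
            table.schema());
        equalityFieldSet.add(field.fieldId());
      }

      if (!equalityFieldSet.equals(table.schema().identifierFieldIds())) {
        LOG.warn(
            "The configured equality field column IDs {} are not matched with the schema identifier field IDs"
                + " {}, use job specified equality field columns as the equality fields by default.",
            equalityFieldSet,
            table.schema().identifierFieldIds());
      }
      equalityFieldIds = Lists.newArrayList(equalityFieldSet);
    }
    return equalityFieldIds;
  }
}
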

@SuppressWarnings("unchecked")
private <T> DataStreamSink<T> appendDummySink(
SingleOutputStreamOperator<Void> committerStream) {
IcebergSink.java
@@ -255,7 +255,7 @@ public SimpleVersionedSerializer<WriteResult> getWriteResultSerializer() {
return new WriteResultSerializer();
}

public static class Builder {
public static class Builder extends IcebergSinkBuilder<Builder> {
private TableLoader tableLoader;
private String uidSuffix = "";
private Function<String, DataStream<RowData>> inputCreator = null;
@@ -311,6 +311,7 @@ private <T> Builder forMapperOutputType(
* @param newTable the loaded iceberg table instance.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder table(Table newTable) {
this.table = (SerializableTable) SerializableTable.copyOf(newTable);
return this;
@@ -325,6 +326,7 @@ public Builder table(Table newTable) {
* @param newTableLoader to load iceberg table inside tasks.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder tableLoader(TableLoader newTableLoader) {
this.tableLoader = newTableLoader;
return this;
@@ -347,21 +349,25 @@ public Builder set(String property, String value) {
* Set the write properties for IcebergSink. View the supported properties in {@link
* FlinkWriteOptions}
*/
@Override
public Builder setAll(Map<String, String> properties) {
writeOptions.putAll(properties);
return this;
}

@Override
public Builder tableSchema(TableSchema newTableSchema) {
this.tableSchema = newTableSchema;
return this;
}

@Override
public Builder overwrite(boolean newOverwrite) {
writeOptions.put(FlinkWriteOptions.OVERWRITE_MODE.key(), Boolean.toString(newOverwrite));
return this;
}

@Override
public Builder flinkConf(ReadableConfig config) {
this.readableConfig = config;
return this;
@@ -374,6 +380,7 @@ public Builder flinkConf(ReadableConfig config) {
* @param mode to specify the write distribution mode.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder distributionMode(DistributionMode mode) {
Preconditions.checkArgument(
!DistributionMode.RANGE.equals(mode),
@@ -390,6 +397,7 @@ public Builder distributionMode(DistributionMode mode) {
* @param newWriteParallelism the number of parallel iceberg stream writer.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder writeParallelism(int newWriteParallelism) {
writeOptions.put(
FlinkWriteOptions.WRITE_PARALLELISM.key(), Integer.toString(newWriteParallelism));
@@ -405,6 +413,7 @@ public Builder writeParallelism(int newWriteParallelism) {
* @param enabled indicate whether it should transform all INSERT/UPDATE_AFTER events to UPSERT.
* @return {@link IcebergSink.Builder} to connect the iceberg table.
*/
@Override
public Builder upsert(boolean enabled) {
writeOptions.put(FlinkWriteOptions.WRITE_UPSERT_ENABLED.key(), Boolean.toString(enabled));
return this;
@@ -416,6 +425,7 @@ public Builder upsert(boolean enabled) {
* @param columns defines the iceberg table's key.
* @return {@link Builder} to connect the iceberg table.
*/
@Override
public Builder equalityFieldColumns(List<String> columns) {
this.equalityFieldColumns = columns;
return this;
@@ -458,11 +468,22 @@ public Builder setSnapshotProperty(String property, String value) {
return this;
}

@Override
public Builder toBranch(String branch) {
writeOptions.put(FlinkWriteOptions.BRANCH.key(), branch);
return this;
}

@Override
protected SerializableTable getTable() {
return table;
}

@Override
protected List<String> getEqualityFieldColumns() {
return equalityFieldColumns;
}
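
In contrast to the FlinkSink.Builder overrides above, these hooks surface the builder's real state: the SerializableTable copy captured by table(Table) and the configured equality field columns. This is what lets tests written against the shared base type introspect an IcebergSink.Builder directly.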

IcebergSink build() {

Preconditions.checkArgument(
@@ -527,6 +548,7 @@ IcebergSink build() {
*
* @return {@link DataStreamSink} for sink.
*/
@Override
public DataStreamSink<RowData> append() {
IcebergSink sink = build();
String suffix = defaultSuffix(uidSuffix, table.name());
@@ -545,6 +567,7 @@ public DataStreamSink<RowData> append() {
}
return rowDataDataStreamSink;
}

}

private static String defaultSuffix(String uidSuffix, String defaultSuffix) {
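
The shared base type is what makes the "tests alignment" in the PR title possible: a single parameterized test body can drive either sink implementation. A sketch under stated assumptions (IcebergSink.forRowData is assumed to mirror FlinkSink.forRowData, since only the latter appears in this diff, and the stream and table loader come from the surrounding test fixture):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;
import org.apache.iceberg.flink.sink.IcebergSinkBuilder; // assumed package
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SinkAlignmentSketch {
  // Supplied by the test fixture in a real test; placeholders here.
  private DataStream<RowData> rowDataStream;
  private TableLoader tableLoader;

  @ParameterizedTest
  @ValueSource(booleans = {true, false})
  void writesThroughEitherSink(boolean useV2Sink) {
    // The common IcebergSinkBuilder type makes the configuration code
    // identical regardless of which implementation is under test.
    IcebergSinkBuilder<?> builder =
        useV2Sink
            ? IcebergSink.forRowData(rowDataStream) // assumed factory
            : FlinkSink.forRowData(rowDataStream);

    builder
        .tableLoader(tableLoader)
        .overwrite(true)
        .writeParallelism(1)
        .append();
  }
}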