Flink: infer source parallelism for FLIP-27 source in batch execution mode
stevenzwu committed Aug 1, 2024
1 parent 506fee4 commit de63e1d
Showing 5 changed files with 300 additions and 55 deletions.
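For context, a minimal usage sketch (not part of this commit) of the buildStream(...) path added below: a bounded (batch) read whose source parallelism is inferred from the number of planned splits. The table location, assigner choice, and class name are illustrative assumptions, not taken from the diff.

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;
import org.apache.iceberg.flink.source.assigner.SimpleSplitAssignerFactory;

public class IcebergBatchReadExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    // Placeholder table location; any TableLoader works here.
    TableLoader tableLoader =
        TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/sample_table");

    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .assignerFactory(new SimpleSplitAssignerFactory())
            .flinkConfig(new Configuration())
            // buildStream() wires the source into the environment; for a bounded scan it
            // also sets the source parallelism inferred from the planned split count.
            .buildStream(env);

    stream.print();
    env.execute("Iceberg FLIP-27 batch read");
  }
}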
@@ -28,6 +28,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,13 +39,18 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.FlinkReadConf;
@@ -60,6 +67,7 @@
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorState;
import org.apache.iceberg.flink.source.enumerator.IcebergEnumeratorStateSerializer;
import org.apache.iceberg.flink.source.enumerator.StaticIcebergEnumerator;
import org.apache.iceberg.flink.source.reader.AvroGenericRecordReaderFunction;
import org.apache.iceberg.flink.source.reader.ColumnStatsWatermarkExtractor;
import org.apache.iceberg.flink.source.reader.IcebergSourceReader;
import org.apache.iceberg.flink.source.reader.IcebergSourceReaderMetrics;
@@ -211,6 +219,27 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
}
}

boolean shouldInferParallelism() {
return !scanContext.isStreaming();
}

int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
int parallelism =
SourceUtil.inferParallelism(
flinkConf,
scanContext.limit(),
() -> {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
return splits.size();
});

if (env.getMaxParallelism() > 0) {
parallelism = Math.min(parallelism, env.getMaxParallelism());
}

return parallelism;
}

public static <T> Builder<T> builder() {
return new Builder<>();
}
@@ -225,10 +254,13 @@ public static class Builder<T> {
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private ReaderFunction<T> readerFunction;
private TypeInformation<T> outputTypeInfo;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
private TableSchema projectedFlinkSchema;
private Boolean exposeLocality;
private WatermarkStrategy<T> watermarkStrategy = WatermarkStrategy.noWatermarks();
private ScanContext context;

private final Map<String, String> readOptions = Maps.newHashMap();

@@ -260,6 +292,15 @@ public Builder<T> readerFunction(ReaderFunction<T> newReaderFunction) {
return this;
}

/**
* Optional. Only provide if using custom reader function different from provided {@link
* RowDataReaderFunction} and {@link AvroGenericRecordReaderFunction}
*/
public Builder<T> outputTypeInfo(TypeInformation<T> newOutputTypeInfo) {
this.outputTypeInfo = newOutputTypeInfo;
return this;
}

public Builder<T> flinkConfig(ReadableConfig config) {
this.flinkConfig = config;
return this;
@@ -464,6 +505,15 @@ public Builder<T> watermarkColumnTimeUnit(TimeUnit timeUnit) {
return this;
}

/**
* Optional. Default is no watermark strategy. Only relevant if using the {@link
* Builder#buildStream(StreamExecutionEnvironment)}.
*/
public Builder<T> watermarkStrategy(WatermarkStrategy<T> newStrategy) {
this.watermarkStrategy = newStrategy;
return this;
}

/** @deprecated Use {@link #setAll} instead. */
@Deprecated
public Builder<T> properties(Map<String, String> properties) {
@@ -481,6 +531,8 @@ public IcebergSource<T> build() {
}
}

contextBuilder.exposeLocality(
SourceUtil.isLocalityEnabled(table, flinkConfig, exposeLocality));
contextBuilder.resolveConfig(table, readOptions, flinkConfig);
Schema icebergSchema = table.schema();
if (projectedFlinkSchema != null) {
@@ -503,28 +555,10 @@ public IcebergSource<T> build() {
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}

ScanContext context = contextBuilder.build();
this.context = contextBuilder.build();
context.validate();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
new MetaDataReaderFunction(
flinkConfig, table.schema(), context.project(), table.io(), table.encryption());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
} else {
RowDataReaderFunction rowDataReaderFunction =
new RowDataReaderFunction(
flinkConfig,
table.schema(),
context.project(),
context.nameMapping(),
context.caseSensitive(),
table.io(),
table.encryption(),
context.filters(),
context.limit());
this.readerFunction = (ReaderFunction<T>) rowDataReaderFunction;
}
this.readerFunction = defaultReaderFunction(table, context, flinkConfig);
}

if (splitAssignerFactory == null) {
Expand All @@ -545,5 +579,66 @@ public IcebergSource<T> build() {
table,
emitter);
}

/**
* Build the {@link IcebergSource} and create a {@link DataStream} from the source.
*
* @return data stream from the Iceberg source
*/
public DataStream<T> buildStream(StreamExecutionEnvironment env) {
IcebergSource<T> source = build();
// inferOutputTypeInfo depends on the ScanContext constructed by build() call above
if (outputTypeInfo == null) {
this.outputTypeInfo = inferOutputTypeInfo(table, context, readerFunction);
}

DataStreamSource<T> stream =
env.fromSource(source, watermarkStrategy, source.name(), outputTypeInfo);

if (source.shouldInferParallelism()) {
stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
}

return stream;
}

@SuppressWarnings("unchecked")
private static <T> ReaderFunction<T> defaultReaderFunction(
Table table, ScanContext context, ReadableConfig flinkConfig) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
new MetaDataReaderFunction(
flinkConfig, table.schema(), context.project(), table.io(), table.encryption());
return (ReaderFunction<T>) rowDataReaderFunction;
} else {
RowDataReaderFunction rowDataReaderFunction =
new RowDataReaderFunction(
flinkConfig,
table.schema(),
context.project(),
context.nameMapping(),
context.caseSensitive(),
table.io(),
table.encryption(),
context.filters(),
context.limit());
return (ReaderFunction<T>) rowDataReaderFunction;
}
}

@SuppressWarnings("unchecked")
private static <T> TypeInformation<T> inferOutputTypeInfo(
Table table, ScanContext context, ReaderFunction<T> readerFunction) {
if (readerFunction instanceof RowDataReaderFunction) {
return (TypeInformation<T>) TypeInformation.of(RowData.class);
} else if (readerFunction instanceof AvroGenericRecordReaderFunction) {
Schema readSchema = context.project() != null ? context.project() : table.schema();
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(readSchema, table.name());
return (TypeInformation<T>) new GenericRecordAvroTypeInfo(avroSchema);
} else {
throw new IllegalStateException(
"Output type info must be provided for custom reader function");
}
}
}
}
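A short sketch of how the inference added in this file interacts with configuration, continuing the batch example above. It assumes the pre-existing FlinkConfigOptions constants that SourceUtil.inferParallelism consults (table.exec.iceberg.infer-source-parallelism and its -max companion); the values are illustrative. As the inferParallelism method in the diff shows, the result is additionally capped by the environment's max parallelism when one is set.

// Sketch only: cap or disable split-based parallelism inference for batch reads.
// Assumes the existing FlinkConfigOptions constants defined outside this commit.
Configuration conf = new Configuration();
conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 64);

DataStream<RowData> stream =
    IcebergSource.forRowData()
        .tableLoader(tableLoader) // tableLoader and env as in the earlier sketch
        .flinkConfig(conf)
        .buildStream(env);        // parallelism = min(inferred splits, configured max, env max parallelism)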
@@ -23,11 +23,8 @@
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
.build();
}

private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
IcebergSource<RowData> source =
IcebergSource.forRowData()
.tableLoader(loader)
.assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConfig(readableConfig)
.build();
DataStreamSource stream =
env.fromSource(
source,
WatermarkStrategy.noWatermarks(),
source.name(),
TypeInformation.of(RowData.class));
return stream;
return IcebergSource.forRowData()
.tableLoader(loader)
.assignerFactory(assignerType.factory())
.properties(properties)
.project(getProjectedSchema())
.limit(limit)
.filters(filters)
.flinkConfig(readableConfig)
.buildStream(env);
}

private TableSchema getProjectedSchema() {
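For the SQL/Table API path refactored above, the FLIP-27 source (and, with this change, its inferred batch parallelism) applies when the table source is routed through createFLIP27Stream. A hedged sketch follows; it assumes the pre-existing option FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE ("table.exec.iceberg.use-flip27-source"), and the catalog/table name in the query is a placeholder.

// Sketch only: route batch SQL reads through the FLIP-27 IcebergSource so that the
// inferred-parallelism behavior above applies.
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
tEnv.getConfig()
    .getConfiguration()
    .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
tEnv.executeSql("SELECT * FROM iceberg_catalog.db.sample_table").print();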
@@ -24,8 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ protected List<Row> run(
sourceBuilder.properties(options);

DataStream<Row> stream =
env.fromSource(
sourceBuilder.build(),
WatermarkStrategy.noWatermarks(),
"testBasicRead",
TypeInformation.of(RowData.class))
sourceBuilder
.buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
@@ -24,9 +24,7 @@
import java.util.List;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.types.logical.RowType;
@@ -178,11 +176,8 @@ private List<Row> run(
AvroSchemaUtil.convert(readSchema, TestFixtures.TABLE_IDENTIFIER.name());

DataStream<Row> stream =
env.fromSource(
sourceBuilder.build(),
WatermarkStrategy.noWatermarks(),
"testBasicRead",
new GenericRecordAvroTypeInfo(avroSchema))
sourceBuilder
.buildStream(env)
// There are two reasons for converting GenericRecord back to Row.
// 1. Avro GenericRecord/Schema is not serializable.
// 2. leverage the TestHelpers.assertRecords for validation.