Flink: infer source parallelism for FLIP-27 source in batch execution mode (#10832)
stevenzwu authored Aug 26, 2024
1 parent 99e4ab7 commit 2ed61a1
Showing 7 changed files with 293 additions and 36 deletions.
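
In short, the commit adds a buildStream(env) entry point to IcebergSource.Builder that wires the FLIP-27 source into a DataStream and, for bounded (batch) scans, sets the source parallelism from the number of planned splits. A minimal usage sketch, assuming a Hadoop-table location and table name that exist only for illustration:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class BatchReadExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Hypothetical warehouse path; replace with a real table location.
    TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events");

    // buildStream() builds the source, applies WatermarkStrategy.noWatermarks(), and in
    // batch mode infers the parallelism from the discovered split count (capped by
    // env.getMaxParallelism() when that is set).
    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .streaming(false)
            .buildStream(env);

    stream.print();
    env.execute("Iceberg batch read");
  }
}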
@@ -28,6 +28,8 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
import org.apache.iceberg.flink.source.split.SplitComparators;
import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
private final SerializableRecordEmitter<T> emitter;
private final String tableName;

// Cache the splits discovered by planSplitsForBatch, which can be called twice from two
// different threads: (1) source/stream construction by the main thread and (2) enumerator
// creation. Hence the field needs to be volatile.
private volatile List<IcebergSourceSplit> batchSplits;

IcebergSource(
TableLoader tableLoader,
ScanContext scanContext,
@@ -132,16 +143,26 @@ private String planningThreadName() {
return tableName + "-" + UUID.randomUUID();
}

/**
* Cache the enumerated splits for batch execution to avoid double planning as there are two code
* paths obtaining splits: (1) infer parallelism (2) enumerator creation.
*/
private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
if (batchSplits != null) {
return batchSplits;
}

ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try (TableLoader loader = tableLoader.clone()) {
loader.open();
- List<IcebergSourceSplit> splits =
+ this.batchSplits =
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
- "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
- return splits;
+ "Discovered {} splits from table {} during job initialization",
+ batchSplits.size(),
+ tableName);
+ return batchSplits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
@@ -207,12 +228,35 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
// Only do scan planning if nothing is restored from checkpoint state
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
assigner.onDiscoveredSplits(splits);
// clear the cached splits after enumerator creation as they won't be needed anymore
this.batchSplits = null;
}

return new StaticIcebergEnumerator(enumContext, assigner);
}
}

private boolean shouldInferParallelism() {
return !scanContext.isStreaming();
}

private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
int parallelism =
SourceUtil.inferParallelism(
flinkConf,
scanContext.limit(),
() -> {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
return splits.size();
});

if (env.getMaxParallelism() > 0) {
parallelism = Math.min(parallelism, env.getMaxParallelism());
}

return parallelism;
}

/**
* Create a source builder.
*
@@ -571,6 +615,41 @@ public IcebergSource<T> build() {
emitter);
}

/**
* Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark
* strategy is set to {@link WatermarkStrategy#noWatermarks()}.
*
* @return data stream from the Iceberg source
*/
public DataStream<T> buildStream(StreamExecutionEnvironment env) {
// buildStream should only be called with RowData or Converter paths.
Preconditions.checkState(
readerFunction == null,
"Cannot set reader function when building a data stream from the source");
IcebergSource<T> source = build();
TypeInformation<T> outputTypeInfo =
outputTypeInfo(converter, table.schema(), source.scanContext.project());
DataStreamSource<T> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
if (source.shouldInferParallelism()) {
stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
}

return stream;
}

private static <T> TypeInformation<T> outputTypeInfo(
RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
if (converter != null) {
return converter.getProducedType();
} else {
// output type is RowData
Schema readSchema = projected != null ? projected : tableSchema;
return (TypeInformation<T>)
FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
}
}

private ReaderFunction<T> readerFunction(ScanContext context) {
if (table instanceof BaseMetadataTable) {
MetaDataReaderFunction rowDataReaderFunction =
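
The inferParallelism() helper above delegates to SourceUtil.inferParallelism, so the same Flink options that gate parallelism inference for the legacy source path apply here as well. A hedged sketch of capping the inferred value, assuming the existing FlinkConfigOptions constants that SourceUtil reads:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

class CappedParallelismRead {
  static DataStream<RowData> read(StreamExecutionEnvironment env, TableLoader loader) {
    Configuration flinkConf = new Configuration();
    // Assumption: these are the ConfigOptions consulted by SourceUtil.inferParallelism.
    // Keep inference enabled (the default) but cap the inferred parallelism at 32 subtasks.
    flinkConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
    flinkConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 32);

    return IcebergSource.forRowData()
        .tableLoader(loader)
        .streaming(false)
        .flinkConfig(flinkConf)
        .buildStream(env);
  }
}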
@@ -23,11 +23,8 @@
import java.util.Map;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
.build();
}

- private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
+ private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
SplitAssignerType assignerType =
readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
- IcebergSource<RowData> source =
- IcebergSource.forRowData()
- .tableLoader(loader)
- .assignerFactory(assignerType.factory())
- .properties(properties)
- .project(getProjectedSchema())
- .limit(limit)
- .filters(filters)
- .flinkConfig(readableConfig)
- .build();
- DataStreamSource stream =
- env.fromSource(
- source,
- WatermarkStrategy.noWatermarks(),
- source.name(),
- TypeInformation.of(RowData.class));
- return stream;
+ return IcebergSource.forRowData()
+ .tableLoader(loader)
+ .assignerFactory(assignerType.factory())
+ .properties(properties)
+ .project(getProjectedSchema())
+ .limit(limit)
+ .filters(filters)
+ .flinkConfig(readableConfig)
+ .buildStream(env);
}

private TableSchema getProjectedSchema() {
@@ -24,8 +24,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
- import org.apache.flink.api.common.eventtime.WatermarkStrategy;
- import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ protected List<Row> run(
sourceBuilder.properties(options);

DataStream<Row> stream =
- env.fromSource(
- sourceBuilder.build(),
- WatermarkStrategy.noWatermarks(),
- "testBasicRead",
- TypeInformation.of(RowData.class))
+ sourceBuilder
+ .buildStream(env)
.map(
new RowDataToRowMapper(
FlinkSchemaUtil.convert(
@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
@BeforeEach
public void before() throws IOException {
Configuration tableConf = getTableEnv().getConfig().getConfiguration();
- tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+ tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
SqlHelpers.sql(
getTableEnv(),
"create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
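
For the SQL/Table API path (IcebergTableSource above), nothing else is required once the FLIP-27 source is enabled; bounded scans then pick up the inferred parallelism through buildStream. A small sketch of enabling it in a batch TableEnvironment, mirroring the test change above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkConfigOptions;

class EnableFlip27Source {
  static TableEnvironment batchEnvWithFlip27() {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
    // Route table scans through the FLIP-27 IcebergSource; bounded scans then get
    // their parallelism inferred from the planned splits.
    tEnv.getConfig()
        .getConfiguration()
        .set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
    return tEnv;
  }
}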