Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink: infer source parallelism for FLIP-27 source in batch execution mode #10832

Merged
merged 1 commit into from
Aug 26, 2024

Conversation

stevenzwu
Copy link
Contributor

No description provided.

@stevenzwu stevenzwu requested a review from pvary July 31, 2024 23:53
@github-actions github-actions bot added the flink label Jul 31, 2024
@stevenzwu stevenzwu force-pushed the flip27-source-infer-parallelism branch 2 times, most recently from e6c3a59 to de63e1d Compare August 1, 2024 00:13
"testBasicRead",
TypeInformation.of(RowData.class))
sourceBuilder
.buildStream(env)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switched this class to test the new buildStream API

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have tests remaining to check the fromSource API for bounded sources?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will only switch this class

Field privateField =
MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
privateField.setAccessible(true);
InternalMiniClusterExtension internalExtension =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use reflect to retrieve InternalMiniClusterExtension to get MiniCluster in order to get execution graph to verify source parallelism.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you call

env.getTransformations().get(0).getParallelism()

before env.executeAsync() then you could get the parallelism. Would this help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just tried it with debugger. the value is the default parallelism of 4 while the expected inferred source parallelism is 1 after the executeAsync()

    DataStream<Row> dataStream =
        IcebergSource.forRowData()
            .tableLoader(CATALOG_EXTENSION.tableLoader())
            .table(table)
            .flinkConfig(config)
            // force one file per split
            .splitSize(1L)
            .buildStream(env)
            .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));

    int sourceParallelism = env.getTransformations().get(0).getParallelism();

*
* @return data stream from the Iceberg source
*/
public DataStream<T> buildStream(StreamExecutionEnvironment env) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new public API. I also thought about the method name as createStream. but decided this name for now. open to other suggestion.

also think it is better to require StreamExecutionEnvironment here instead of having it a builder method so that it is clear it is not required for the build() method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sentence about env is also true for outputTypeInfo, and watermarkStrategy. Maybe adding them as a parameter would be reasonable too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently, outputTypeInfo can be inferred from the ReaderFunction if using provided RowData or Avro reader.

      if (outputTypeInfo == null) {
        this.outputTypeInfo = inferOutputTypeInfo(table, context, readerFunction);
      }

watermarkStrategy is defaulted WatermarkStrategy.noWatermarks. so it is not mandatory either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

outputTypeInfo is not needed anymore with the Converter interface. Removed watermark strategy for now. We can always add it back in the future if it is needed.

@stevenzwu
Copy link
Contributor Author

stevenzwu commented Aug 1, 2024

the new TestIcebergSpeculativeExecutionSupport hangs after this change. Stuck in the wait. need to investigate.

    tEnv.fromDataStream(slowStream)
        .executeInsert(String.format("%s.%s", DATABASE_NAME, OUTPUT_TABLE_NAME))
        .await();

@stevenzwu stevenzwu force-pushed the flip27-source-infer-parallelism branch from 72ea6fb to 3ead397 Compare August 1, 2024 21:37
if (getRuntimeContext().getTaskInfo().getAttemptNumber() <= 0) {
// Simulate slow subtask 0 with attempt 0
TaskInfo taskInfo = getRuntimeContext().getTaskInfo();
if (taskInfo.getIndexOfThisSubtask() == 0 && taskInfo.getAttemptNumber() <= 0) {
Copy link
Contributor Author

@stevenzwu stevenzwu Aug 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after this change of inferring source parallelism, this test would hang (even with the inferring parallelism flag turned off). It seems that speculative execution won't kick in somehow. This line of change seems to fix the problem however (tried local run 50 times without a failure). Not sure exactly why. Regardless of the reason, this seems like a good change anyway.

@pvary @venkata91 @becketqin let me know if you have any idea.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. Thanks for tagging me. Do you mean adding taskInfo.getIndexOfThisSubtask() == 0 solves the hang issue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@venkata91 that is correct.

new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false)
// disable inferring source parallelism
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inferred parallelism might mess up the watermark and record ordering comparison. disable it to avoid the flakiness

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this so?
Do you have any idea? Would it be an issue in prod?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good question. let me dig more.

Copy link
Contributor Author

@stevenzwu stevenzwu Aug 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestIcebergSourceSql assume the parallelism is 1 for testWatermarkOptionsAscending and testWatermarkOptionsDescending. Table has 2 files. This test just check split assignment is ordered with single reader.

    tableEnvironment.getConfig().set("table.exec.resource.default-parallelism", "1");

I think TestIcebergSourceWithWatermarkExtractor similarly assumes parallelism of 4. inferring parallelism would change source parallelism to the number of splits and potentially inferring with the assertion on ordering of the read records..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe explicitly setting the parallelism in those tests would be better.
WDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually, we don't need the disabling for this particular test as it doesn't go through the buildStream(env) path where infer parallelism happens. will revert the change

@stevenzwu stevenzwu force-pushed the flip27-source-infer-parallelism branch 2 times, most recently from 1035dad to aaf3765 Compare August 6, 2024 16:28
* Optional. Default is no watermark strategy. Only relevant if using the {@link
* Builder#buildStream(StreamExecutionEnvironment)}.
*/
public Builder<T> watermarkStrategy(WatermarkStrategy<T> newStrategy) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to provide a useful WatermarkStrategy?
I think it is possible to provide a useful TimestampAssigner, but I don't see how can someone provide a useful WatermarkGenerator. The only possible way to generate watermarks are with the watermarkColumn is provided, but even then the WatermarkGenerator should not be used.

Do I miss something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only for the buildStream method. this just moved the watermark strategy from env.fromSource to the builder.

for regular build method, users would also need to set the watermark strategy, which most likely would be none.

    DataStream<RowData> stream =
        env.fromSource(
            sourceBuilder().build(),
            WatermarkStrategy.noWatermarks(),
            "IcebergSource",
            TypeInformation.of(RowData.class));

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will remove WatermarkStrategy from here. we can always add another buildStream with a new arg in the future if there is really a need.

@@ -503,28 +569,10 @@ public IcebergSource<T> build() {
new OrderedSplitAssignerFactory(SplitComparators.watermark(watermarkExtractor));
}

ScanContext context = contextBuilder.build();
this.context = contextBuilder.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is ugly as hell.
Side-effect of the build method....
Maybe creating an init() with all the side-effects?
And then build() which uses the initialized attributes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree side-effect is undesirable. let me think of a way to refactor the code. maybe extract the ScanContext building into a separate method from build()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the Converter interface will obsolete this problem.

@stevenzwu stevenzwu force-pushed the flip27-source-infer-parallelism branch 2 times, most recently from be4ce67 to 09cb0eb Compare August 14, 2024 05:05
@stevenzwu stevenzwu marked this pull request as draft August 17, 2024 22:24
@stevenzwu stevenzwu force-pushed the flip27-source-infer-parallelism branch 2 times, most recently from 32310d4 to 3aabbad Compare August 18, 2024 21:51
@stevenzwu stevenzwu force-pushed the flip27-source-infer-parallelism branch from 3aabbad to f86449a Compare August 21, 2024 22:11
@stevenzwu stevenzwu marked this pull request as ready for review August 21, 2024 22:16
@stevenzwu stevenzwu merged commit 2ed61a1 into apache:main Aug 26, 2024
20 checks passed
@stevenzwu
Copy link
Contributor Author

thanks @pvary for the review

@stevenzwu stevenzwu deleted the flip27-source-infer-parallelism branch August 26, 2024 15:01
stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Aug 26, 2024
jenbaldwin pushed a commit to Teradata/iceberg that referenced this pull request Sep 17, 2024
* main: (208 commits)
  Docs: Fix Flink 1.20 support versions (apache#11065)
  Flink: Fix compile warning (apache#11072)
  Docs: Initial committer guidelines and requirements for merging (apache#10780)
  Core: Refactor ZOrderByteUtils (apache#10624)
  API: implement types timestamp_ns and timestamptz_ns (apache#9008)
  Build: Bump com.google.errorprone:error_prone_annotations (apache#11055)
  Build: Bump mkdocs-material from 9.5.33 to 9.5.34 (apache#11062)
  Flink: Backport PR apache#10526 to v1.18 and v1.20 (apache#11018)
  Kafka Connect: Disable publish tasks in runtime project (apache#11032)
  Flink: add unit tests for range distribution on bucket partition column (apache#11033)
  Spark 3.5: Use FileGenerationUtil in PlanningBenchmark (apache#11027)
  Core: Add benchmark for appending files (apache#11029)
  Build: Ignore benchmark output folders across all modules (apache#11030)
  Spec: Add RemovePartitionSpecsUpdate REST update type (apache#10846)
  Docs: bump latest version to 1.6.1 (apache#11036)
  OpenAPI, Build: Apply spotless to testFixtures source code (apache#11024)
  Core: Generate realistic bounds in benchmarks (apache#11022)
  Add REST Compatibility Kit (apache#10908)
  Flink: backport PR apache#10832 of inferring parallelism in FLIP-27 source (apache#11009)
  Docs: Add Druid docs url to sidebar (apache#10997)
  ...
flinkConf,
scanContext.limit(),
() -> {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure whether it is intentional to modify the split list instance field in planSplitsForBatch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intentional. See the comment for the method. We cache it as it will be reused later.

zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants