Spark 3.4: Take shuffle partitions into account for parallelism (#8327)
aokolnychyi authored Aug 15, 2023
1 parent 86737e6 commit bfa0529
Showing 2 changed files with 7 additions and 1 deletion.
@@ -275,4 +275,10 @@ public boolean adaptiveSplitSizeEnabled() {
         .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT)
         .parse();
   }
+
+  public int parallelism() {
+    int defaultParallelism = spark.sparkContext().defaultParallelism();
+    int numShufflePartitions = spark.sessionState().conf().numShufflePartitions();
+    return Math.max(defaultParallelism, numShufflePartitions);
+  }
 }
@@ -227,7 +227,7 @@ public CustomMetric[] supportedCustomMetrics() {
   protected long adjustSplitSize(List<? extends ScanTask> tasks, long splitSize) {
     if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) {
       long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum();
-      int parallelism = sparkContext.defaultParallelism();
+      int parallelism = readConf.parallelism();
       return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize);
     } else {
       return splitSize;
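For context (not part of the commit itself): a minimal, self-contained Java sketch of what the change does. The class name and the concrete values (8 executor slots, the default 200 shuffle partitions) are hypothetical and only illustrate how Math.max(defaultParallelism, numShufflePartitions) changes the parallelism handed to TableScanUtil.adjustSplitSize.

// Illustrative sketch only; the cluster values below are assumed, not from the commit.
public class ParallelismSketch {
  public static void main(String[] args) {
    int defaultParallelism = 8;      // e.g. sparkContext().defaultParallelism() on a small cluster
    int numShufflePartitions = 200;  // e.g. spark.sql.shuffle.partitions left at its default

    // Before this commit: split planning only considered executor slots.
    int oldParallelism = defaultParallelism;                                  // 8

    // After this commit: the larger of the two values is used.
    int newParallelism = Math.max(defaultParallelism, numShufflePartitions);  // 200

    System.out.printf("old parallelism = %d, new parallelism = %d%n", oldParallelism, newParallelism);
  }
}

With the larger parallelism value, adaptive split sizing can pick smaller splits when a scan would otherwise produce fewer tasks than the session's shuffle partitions, giving the read a better chance of keeping all of them busy.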
