From bfa052924ecb9ca14b571b55ca98489bae4f524b Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Tue, 15 Aug 2023 10:12:47 -0700 Subject: [PATCH] Spark 3.4: Take shuffle partitions into account for parallelism (#8327) --- .../main/java/org/apache/iceberg/spark/SparkReadConf.java | 6 ++++++ .../java/org/apache/iceberg/spark/source/SparkScan.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 85e368d8cf69..75e060e55692 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -275,4 +275,10 @@ public boolean adaptiveSplitSizeEnabled() { .defaultValue(TableProperties.ADAPTIVE_SPLIT_SIZE_ENABLED_DEFAULT) .parse(); } + + public int parallelism() { + int defaultParallelism = spark.sparkContext().defaultParallelism(); + int numShufflePartitions = spark.sessionState().conf().numShufflePartitions(); + return Math.max(defaultParallelism, numShufflePartitions); + } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 535d43853f3b..6c3b2db14367 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -227,7 +227,7 @@ public CustomMetric[] supportedCustomMetrics() { protected long adjustSplitSize(List tasks, long splitSize) { if (readConf.splitSizeOption() == null && readConf.adaptiveSplitSizeEnabled()) { long scanSize = tasks.stream().mapToLong(ScanTask::sizeBytes).sum(); - int parallelism = sparkContext.defaultParallelism(); + int parallelism = readConf.parallelism(); return TableScanUtil.adjustSplitSize(scanSize, parallelism, splitSize); } else { return splitSize;