Skip to content

Commit

Permalink
chore: Enable Comet shuffle with AQE coalesce partitions (apache#834)
Browse files Browse the repository at this point in the history
* chore: Remove COMET_SHUFFLE_ENFORCE_MODE_ENABLED

* Update plan stability

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Fix

* Remove test

* Update

* Use same allocator

* test

* Add synchronized

* test

* Revert "test"

This reverts commit 5574bf5.

* Revert "Add synchronized"

This reverts commit aac200a.

* Fix

* fix

* Update diffs

* Update diffs

* Add CometColumnarBatch

* Change to ubuntu-20.04.

* Change to macos-latest

* Change to ubuntu-24.04

* update 3.4.3..diff

* Update to ubuntu-24.04 for Spark 4.0.0 pipeline

* Revert some changes

* Disable Comet shuffle for Spark SQL core-1 test on Spark 3.5 and 4.0.0

* Disable Comet shuffle for Spark SQL core-1 on Spark 3.4.3 too.
  • Loading branch information
viirya authored Aug 17, 2024
1 parent c9af3f4 commit 3cff826
Show file tree
Hide file tree
Showing 819 changed files with 86,478 additions and 88,173 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/spark_sql_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
spark-sql-catalyst:
strategy:
matrix:
os: [ubuntu-latest]
os: [ubuntu-24.04]
java-version: [11]
spark-version: [{short: '3.4', full: '3.4.3'}, {short: '3.5', full: '3.5.1'}]
module:
Expand Down Expand Up @@ -76,7 +76,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
ENABLE_COMET=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
ENABLE_COMET=true ENABLE_COMET_SHUFFLE=${{ matrix.module.name == 'sql/core-1' && 'false' || 'true' }} build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

4 changes: 2 additions & 2 deletions .github/workflows/spark_sql_test_ansi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
spark-sql-catalyst:
strategy:
matrix:
os: [ubuntu-latest]
os: [ubuntu-24.04]
java-version: [17]
spark-version: [{short: '4.0', full: '4.0.0-preview1'}]
module:
Expand Down Expand Up @@ -74,7 +74,7 @@ jobs:
run: |
cd apache-spark
rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
RUST_BACKTRACE=1 ENABLE_COMET=true ENABLE_COMET_ANSI_MODE=true ENABLE_COMET_SHUFFLE=${{ matrix.module.name == 'sql/core-1' && 'false' || 'true' }} build/sbt ${{ matrix.module.args1 }} "${{ matrix.module.args2 }}"
env:
LC_ALL: "C.UTF-8"

11 changes: 0 additions & 11 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -222,17 +222,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("jvm")

val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.shuffle.enforceMode.enabled")
.doc(
"Comet shuffle doesn't support Spark AQE coalesce partitions. If AQE coalesce " +
"partitions is enabled, Comet shuffle won't be triggered even enabled. This config " +
"is used to enforce Comet to trigger shuffle even if AQE coalesce partitions is " +
"enabled. This is for testing purpose only.")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
Expand Down
426 changes: 338 additions & 88 deletions dev/diffs/3.4.3.diff

Large diffs are not rendered by default.

413 changes: 339 additions & 74 deletions dev/diffs/3.5.1.diff

Large diffs are not rendered by default.

774 changes: 691 additions & 83 deletions dev/diffs/4.0.0-preview1.diff

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion docs/source/contributor-guide/benchmarking.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ $SPARK_HOME/bin/spark-submit \
--conf spark.comet.cast.allowIncompatible=true \
--conf spark.comet.exec.shuffle.enabled=true \
--conf spark.comet.exec.shuffle.mode=auto \
--conf spark.comet.shuffle.enforceMode.enabled=true \
--conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
tpcbench.py \
--benchmark tpch \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,21 +1071,13 @@ object CometSparkSessionExtensions extends Logging {
}

private[comet] def isCometShuffleEnabled(conf: SQLConf): Boolean =
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf) &&
// TODO: AQE coalesce partitions feature causes Comet shuffle memory leak.
// We should disable Comet shuffle when AQE coalesce partitions is enabled.
(!conf.coalesceShufflePartitionsEnabled || COMET_SHUFFLE_ENFORCE_MODE_ENABLED.get())
COMET_EXEC_SHUFFLE_ENABLED.get(conf) && isCometShuffleManagerEnabled(conf)

private[comet] def getCometShuffleNotEnabledReason(conf: SQLConf): Option[String] = {
if (!COMET_EXEC_SHUFFLE_ENABLED.get(conf)) {
Some(s"${COMET_EXEC_SHUFFLE_ENABLED.key} is not enabled")
} else if (!isCometShuffleManagerEnabled(conf)) {
Some(s"spark.shuffle.manager is not set to ${CometShuffleManager.getClass.getName}")
} else if (conf.coalesceShufflePartitionsEnabled && !COMET_SHUFFLE_ENFORCE_MODE_ENABLED
.get()) {
Some(
s"${SQLConf.COALESCE_PARTITIONS_ENABLED.key} is enabled and " +
s"${COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key} is not enabled")
} else {
None
}
Expand Down
Loading

0 comments on commit 3cff826

Please sign in to comment.