From 18c5d54fbcd1784270829e690b38c4fe1487e686 Mon Sep 17 00:00:00 2001 From: yikaifei Date: Wed, 20 Nov 2024 10:15:58 +0800 Subject: [PATCH] RAS support merge agg --- .../backendsapi/velox/VeloxRuleApi.scala | 1 + .../clickhouse/ClickHouseTestSettings.scala | 1 - .../clickhouse/ClickHouseTestSettings.scala | 1 - .../GlutenReplaceHashWithSortAggSuite.scala | 37 +++++++++---------- 4 files changed, 18 insertions(+), 22 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 67f9d21a5e7f..c5f46dae67dd 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -125,6 +125,7 @@ object VeloxRuleApi { injector.injectPreTransform(_ => RemoveTransitions) injector.injectPreTransform(_ => PushDownInputFileExpression.PreOffload) injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session)) + injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate(c.session)) injector.injectPreTransform(_ => RewriteSubqueryBroadcast()) injector.injectPreTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session)) injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session)) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index e91f1495fbe9..cddf59462b50 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -843,7 +843,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("do not replace hash aggregate if child does not have sort order") .exclude("do not replace hash aggregate if there is no group-by column") .excludeGlutenTest("replace partial hash aggregate with sort aggregate") - .excludeGlutenTest("replace partial and final hash aggregate together with sort aggregate") enableSuite[GlutenReuseExchangeAndSubquerySuite] enableSuite[GlutenSQLAggregateFunctionSuite] enableSuite[GlutenSQLWindowFunctionSuite] diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index f0637839a762..f30813e44f04 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -843,7 +843,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("do not replace hash aggregate if child does not have sort order") .exclude("do not replace hash aggregate if there is no group-by column") .excludeGlutenTest("replace partial hash aggregate with sort aggregate") - .excludeGlutenTest("replace partial and final hash aggregate together with sort aggregate") enableSuite[GlutenReuseExchangeAndSubquerySuite] enableSuite[GlutenSQLAggregateFunctionSuite] enableSuite[GlutenSQLWindowFunctionSuite] diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala index 29ccd9b71dcd..33bf1a1ec97e 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenReplaceHashWithSortAggSuite.scala @@ -81,27 +81,24 @@ class GlutenReplaceHashWithSortAggSuite } testGluten("replace partial and final hash aggregate together with sort aggregate") { - // TODO: Should can merge aggregates even if RAS support. - withSQLConf("spark.gluten.ras.enabled" -> "false") { - withTempView("t1", "t2") { - spark.range(100).selectExpr("id as key").createOrReplaceTempView("t1") - spark.range(50).selectExpr("id as key").createOrReplaceTempView("t2") - Seq(("COUNT", 1, 0, 1, 0), ("COLLECT_LIST", 1, 0, 1, 0)).foreach { - aggExprInfo => - val query = - s""" - |SELECT key, ${aggExprInfo._1}(key) - |FROM - |( - | SELECT /*+ SHUFFLE_MERGE(t1) */ t1.key AS key - | FROM t1 - | JOIN t2 - | ON t1.key = t2.key - |) - |GROUP BY key + withTempView("t1", "t2") { + spark.range(100).selectExpr("id as key").createOrReplaceTempView("t1") + spark.range(50).selectExpr("id as key").createOrReplaceTempView("t2") + Seq(("COUNT", 1, 0, 1, 0), ("COLLECT_LIST", 1, 0, 1, 0)).foreach { + aggExprInfo => + val query = + s""" + |SELECT key, ${aggExprInfo._1}(key) + |FROM + |( + | SELECT /*+ SHUFFLE_MERGE(t1) */ t1.key AS key + | FROM t1 + | JOIN t2 + | ON t1.key = t2.key + |) + |GROUP BY key """.stripMargin - checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5) - } + checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5) } } }