Skip to content

Commit

Permalink
RAS support merge agg
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf committed Nov 20, 2024
1 parent 5d7b963 commit 18c5d54
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ object VeloxRuleApi {
injector.injectPreTransform(_ => RemoveTransitions)
injector.injectPreTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
injector.injectPreTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("do not replace hash aggregate if child does not have sort order")
.exclude("do not replace hash aggregate if there is no group-by column")
.excludeGlutenTest("replace partial hash aggregate with sort aggregate")
.excludeGlutenTest("replace partial and final hash aggregate together with sort aggregate")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
enableSuite[GlutenSQLWindowFunctionSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("do not replace hash aggregate if child does not have sort order")
.exclude("do not replace hash aggregate if there is no group-by column")
.excludeGlutenTest("replace partial hash aggregate with sort aggregate")
.excludeGlutenTest("replace partial and final hash aggregate together with sort aggregate")
enableSuite[GlutenReuseExchangeAndSubquerySuite]
enableSuite[GlutenSQLAggregateFunctionSuite]
enableSuite[GlutenSQLWindowFunctionSuite]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,27 +81,24 @@ class GlutenReplaceHashWithSortAggSuite
}

testGluten("replace partial and final hash aggregate together with sort aggregate") {
// TODO: Should can merge aggregates even if RAS support.
withSQLConf("spark.gluten.ras.enabled" -> "false") {
withTempView("t1", "t2") {
spark.range(100).selectExpr("id as key").createOrReplaceTempView("t1")
spark.range(50).selectExpr("id as key").createOrReplaceTempView("t2")
Seq(("COUNT", 1, 0, 1, 0), ("COLLECT_LIST", 1, 0, 1, 0)).foreach {
aggExprInfo =>
val query =
s"""
|SELECT key, ${aggExprInfo._1}(key)
|FROM
|(
| SELECT /*+ SHUFFLE_MERGE(t1) */ t1.key AS key
| FROM t1
| JOIN t2
| ON t1.key = t2.key
|)
|GROUP BY key
withTempView("t1", "t2") {
spark.range(100).selectExpr("id as key").createOrReplaceTempView("t1")
spark.range(50).selectExpr("id as key").createOrReplaceTempView("t2")
Seq(("COUNT", 1, 0, 1, 0), ("COLLECT_LIST", 1, 0, 1, 0)).foreach {
aggExprInfo =>
val query =
s"""
|SELECT key, ${aggExprInfo._1}(key)
|FROM
|(
| SELECT /*+ SHUFFLE_MERGE(t1) */ t1.key AS key
| FROM t1
| JOIN t2
| ON t1.key = t2.key
|)
|GROUP BY key
""".stripMargin
checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5)
}
checkAggs(query, aggExprInfo._2, aggExprInfo._3, aggExprInfo._4, aggExprInfo._5)
}
}
}
Expand Down

0 comments on commit 18c5d54

Please sign in to comment.