Skip to content

Commit

Permalink
Velox backend support merge two aggregate to one complete mode aggregate
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf committed Oct 31, 2024
1 parent b008a05 commit 3005519
Show file tree
Hide file tree
Showing 31 changed files with 2,345 additions and 2,537 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ private object VeloxRuleApi {
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => BloomFilterMightContainJointRewriteRule.apply(c.session))
injector.injectTransform(c => ArrowScanReplaceRule.apply(c.session))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ abstract class HashAggregateExecTransformer(
VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction)
)
aggregateNodeList.add(aggFunctionNode)
case Final =>
case Final | Complete =>
val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
VeloxAggregateFunctionsBuilder.create(args, aggregateFunction, aggregateMode),
childrenNodeList,
Expand Down Expand Up @@ -242,7 +242,7 @@ abstract class HashAggregateExecTransformer(
aggregateFunction.inputAggBufferAttributes.head.nullable)
)
aggregateNodeList.add(partialNode)
case Final =>
case Final | Complete =>
val aggFunctionNode = ExpressionBuilder.makeAggregateFunction(
VeloxAggregateFunctionsBuilder.create(args, aggregateFunction, aggregateMode),
childrenNodeList,
Expand Down Expand Up @@ -275,7 +275,7 @@ abstract class HashAggregateExecTransformer(
expression.mode match {
case Partial | PartialMerge =>
typeNodeList.add(VeloxIntermediateData.getIntermediateTypeNode(aggregateFunction))
case Final =>
case Final | Complete =>
typeNodeList.add(
ConverterUtils
.getTypeNode(aggregateFunction.dataType, aggregateFunction.nullable))
Expand Down Expand Up @@ -356,7 +356,7 @@ abstract class HashAggregateExecTransformer(
// The process of handling the inconsistency in column types and order between
// Spark and Velox is exactly the opposite of applyExtractStruct.
aggregateExpression.mode match {
case PartialMerge | Final =>
case PartialMerge | Final | Complete =>
val newInputAttributes = new ArrayBuffer[Attribute]()
val childNodes = new JArrayList[ExpressionNode]()
val (sparkOrders, sparkTypes) =
Expand Down Expand Up @@ -467,7 +467,7 @@ abstract class HashAggregateExecTransformer(
// by previous projection.
childrenNodes.add(ExpressionBuilder.makeSelection(colIdx))
colIdx += 1
case Partial =>
case Partial | Complete =>
aggFunc.children.foreach {
_ =>
childrenNodes.add(ExpressionBuilder.makeSelection(colIdx))
Expand Down Expand Up @@ -600,7 +600,7 @@ abstract class HashAggregateExecTransformer(
}
val aggregateFunc = aggExpr.aggregateFunction
val childrenNodes = aggExpr.mode match {
case Partial =>
case Partial | Complete =>
aggregateFunc.children.toList.map(
expr => {
ExpressionConverter
Expand Down Expand Up @@ -784,7 +784,7 @@ case class HashAggregateExecPullOutHelper(
expr.mode match {
case Partial | PartialMerge =>
expr.aggregateFunction.aggBufferAttributes
case Final =>
case Final | Complete =>
Seq(aggregateAttributes(index))
case other =>
throw new GlutenNotSupportException(s"Unsupported aggregate mode: $other.")
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3005519

Please sign in to comment.