Skip to content

Commit

Permalink
[GLUTEN-7526][VL] Scala code style for VeloxCollect (#7527)
Browse files Browse the repository at this point in the history
Closes #7526
  • Loading branch information
beliefer authored Oct 15, 2024
1 parent 27f30ad commit 9120101
Showing 1 changed file with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,55 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.trees.UnaryLike
import org.apache.spark.sql.types.{ArrayType, DataType}

abstract class VeloxCollect extends DeclarativeAggregate with UnaryLike[Expression] {
abstract class VeloxCollect(child: Expression)
extends DeclarativeAggregate
with UnaryLike[Expression] {

protected lazy val buffer: AttributeReference = AttributeReference("buffer", dataType)()

override def dataType: DataType = ArrayType(child.dataType, false)

override def aggBufferAttributes: Seq[AttributeReference] = List(buffer)
override def aggBufferAttributes: Seq[AttributeReference] = Seq(buffer)

override lazy val initialValues: Seq[Expression] = List(Literal.create(Seq.empty, dataType))
override lazy val initialValues: Seq[Expression] = Seq(Literal.create(Array(), dataType))

override lazy val updateExpressions: Seq[Expression] = List(
override lazy val updateExpressions: Seq[Expression] = Seq(
If(
IsNull(child),
buffer,
Concat(List(buffer, CreateArray(List(child), useStringTypeWhenEmpty = false))))
Concat(Seq(buffer, CreateArray(Seq(child), useStringTypeWhenEmpty = false))))
)

override lazy val mergeExpressions: Seq[Expression] = List(
Concat(List(buffer.left, buffer.right))
override lazy val mergeExpressions: Seq[Expression] = Seq(
Concat(Seq(buffer.left, buffer.right))
)

override def defaultResult: Option[Literal] = Option(Literal.create(Array(), dataType))
}

case class VeloxCollectSet(override val child: Expression) extends VeloxCollect {
override def prettyName: String = "velox_collect_set"
case class VeloxCollectSet(child: Expression) extends VeloxCollect(child) {

// Velox's collect_set implementation allows null output. Thus we usually wrap
// the function to enforce non-null output. See CollectRewriteRule#ensureNonNull.
override def nullable: Boolean = true

override protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)

override lazy val evaluateExpression: Expression =
ArrayDistinct(buffer)

override def prettyName: String = "velox_collect_set"

override protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)
}

case class VeloxCollectList(override val child: Expression) extends VeloxCollect {
override def prettyName: String = "velox_collect_list"
case class VeloxCollectList(child: Expression) extends VeloxCollect(child) {

override def nullable: Boolean = false

override val evaluateExpression: Expression = buffer

override def prettyName: String = "velox_collect_list"

override protected def withNewChildInternal(newChild: Expression): Expression =
copy(child = newChild)

override val evaluateExpression: Expression = buffer
}

0 comments on commit 9120101

Please sign in to comment.