diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index d84181fec1da..6a4f0c9a6f77 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -165,7 +165,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
)
- override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
+ override def genFilterTransformerMetricsUpdater(
+ metrics: Map[String, SQLMetric],
+ extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new FilterMetricsUpdater(metrics)
override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -182,7 +184,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
)
override def genProjectTransformerMetricsUpdater(
- metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
+ metrics: Map[String, SQLMetric],
+ extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+ new ProjectMetricsUpdater(metrics)
override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
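Note: the new `extraMetrics` parameter defaults to `Seq.empty`, so pre-existing call sites stay source-compatible. The ClickHouse updaters above accept the extras but drop them, while the Velox updaters below consume them. A minimal sketch of the two call shapes, with hypothetical `metricsApi`, `metrics`, and `rowsIncremented` values:

```scala
import org.apache.gluten.backendsapi.MetricsApi
import org.apache.spark.sql.execution.metric.SQLMetric

// Sketch only; the parameters stand in for values Gluten supplies at runtime.
def wireUpdaters(
    metricsApi: MetricsApi,
    metrics: Map[String, SQLMetric],
    rowsIncremented: SQLMetric): Unit = {
  // Old call shape: compiles unchanged thanks to the default argument.
  metricsApi.genFilterTransformerMetricsUpdater(metrics)
  // New call shape: threads metric pairs the native filter cannot carry itself.
  metricsApi.genFilterTransformerMetricsUpdater(
    metrics,
    Seq("increment_metric" -> rowsIncremented))
}
```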
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 00cba4372891..1f87ffdba5ab 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -175,8 +175,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"number of memory allocations")
)
- override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
- new FilterMetricsUpdater(metrics)
+ override def genFilterTransformerMetricsUpdater(
+ metrics: Map[String, SQLMetric],
+ extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+ new FilterMetricsUpdater(metrics, extraMetrics)
override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
@@ -192,7 +194,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
)
override def genProjectTransformerMetricsUpdater(
- metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
+ metrics: Map[String, SQLMetric],
+ extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+ new ProjectMetricsUpdater(metrics, extraMetrics)
override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
index daa08498bee3..fdfe1d7ea917 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
@@ -16,30 +16,11 @@
*/
package org.apache.gluten.execution
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.SparkPlan
case class FilterExecTransformer(condition: Expression, child: SparkPlan)
extends FilterExecTransformerBase(condition, child) {
- // FIXME: Should use field "condition" to store the actual executed filter expressions.
- // To make optimization easier (like to remove filter when it actually does nothing)
- override protected def getRemainingCondition: Expression = {
- val scanFilters = child match {
- // Get the filters including the manually pushed down ones.
- case basicScanExecTransformer: BasicScanExecTransformer =>
- basicScanExecTransformer.filterExprs()
- // For fallback scan, we need to keep original filter.
- case _ =>
- Seq.empty[Expression]
- }
- if (scanFilters.isEmpty) {
- condition
- } else {
- val remainingFilters =
- FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
- remainingFilters.reduceLeftOption(And).orNull
- }
- }
override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
copy(child = newChild)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
index c877fdbb0785..e8c27f6a43c4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics
import org.apache.spark.sql.execution.metric.SQLMetric
-class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+class FilterMetricsUpdater(
+ val metrics: Map[String, SQLMetric],
+ val extraMetrics: Seq[(String, SQLMetric)])
+ extends MetricsUpdater {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
@@ -30,6 +33,13 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+ extraMetrics.foreach {
+ case (name, metric) =>
+ name match {
+ case "increment_metric" => metric += operatorMetrics.outputRows
+ case _ => // do nothing
+ }
+ }
}
}
}
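For context, a hedged sketch of how the extra pair is consumed once a native task reports back; only entries registered under the `"increment_metric"` key are acted on, mirroring the match above. `metrics`, `rowsIncremented`, and `opMetrics` are stand-ins:

```scala
import org.apache.gluten.metrics.{FilterMetricsUpdater, IOperatorMetrics}
import org.apache.spark.sql.execution.metric.SQLMetric

def reportNativeMetrics(
    metrics: Map[String, SQLMetric],
    rowsIncremented: SQLMetric,
    opMetrics: IOperatorMetrics): Unit = {
  val updater = new FilterMetricsUpdater(metrics, Seq("increment_metric" -> rowsIncremented))
  // Performs the regular metric updates, then adds outputRows to
  // rowsIncremented; pairs registered under other names are ignored.
  updater.updateNativeMetrics(opMetrics)
}
```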
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
index ff8335c861d5..cbd195bb804f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics
import org.apache.spark.sql.execution.metric.SQLMetric
-class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+class ProjectMetricsUpdater(
+ val metrics: Map[String, SQLMetric],
+ val extraMetrics: Seq[(String, SQLMetric)])
+ extends MetricsUpdater {
override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
@@ -30,6 +33,13 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+ extraMetrics.foreach {
+ case (name, metric) =>
+ name match {
+ case "increment_metric" => metric += operatorMetrics.outputRows
+ case _ => // do nothing
+ }
+ }
}
}
}
diff --git a/gluten-delta/pom.xml b/gluten-delta/pom.xml
index 48d47d906ba0..6a6b7291d756 100755
--- a/gluten-delta/pom.xml
+++ b/gluten-delta/pom.xml
@@ -130,6 +130,19 @@
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
+ <configuration>
+   <scala>
+     <scalafmt>
+       <file>${project.basedir}/../.scalafmt.conf</file>
+     </scalafmt>
+     <includes>
+       <include>src/main/scala/**/*.scala</include>
+       <include>src/test/scala/**/*.scala</include>
+       <include>src/main/delta-${delta.binary.version}/**/*.scala</include>
+       <include>src/test/delta-${delta.binary.version}/**/*.scala</include>
+     </includes>
+   </scala>
+ </configuration>
<groupId>org.scalatest</groupId>
@@ -154,6 +167,24 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
+ <executions>
+   <execution>
+     <id>copy-resources</id>
+     <phase>generate-sources</phase>
+     <goals>
+       <goal>copy-resources</goal>
+     </goals>
+     <configuration>
+       <outputDirectory>src/main/scala/org/apache/gluten/execution</outputDirectory>
+       <resources>
+         <resource>
+           <directory>src/main/delta-${delta.binary.version}/org/apache/gluten/execution</directory>
+         </resource>
+       </resources>
+       <overwrite>true</overwrite>
+     </configuration>
+   </execution>
+ </executions>
diff --git a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 000000000000..0c8cd54902c2
--- /dev/null
+++ b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+ extends FilterExecTransformerBase(condition, child) {
+
+ private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
+
+ override def metricsUpdater(): MetricsUpdater =
+ BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
+ metrics,
+ extraMetrics.toSeq)
+
+ override def getRelNode(
+ context: SubstraitContext,
+ condExpr: Expression,
+ originalInputAttributes: Seq[Attribute],
+ operatorId: Long,
+ input: RelNode,
+ validation: Boolean): RelNode = {
+ assert(condExpr != null)
+ val args = context.registeredFunction
+ val condExprNode = condExpr match {
+ case IncrementMetric(child, metric) =>
+ extraMetrics :+= (condExpr.prettyName, metric)
+ ExpressionConverter
+ .replaceWithExpressionTransformer(child, attributeSeq = originalInputAttributes)
+ .doTransform(args)
+ case _ =>
+ ExpressionConverter
+ .replaceWithExpressionTransformer(condExpr, attributeSeq = originalInputAttributes)
+ .doTransform(args)
+ }
+
+ if (!validation) {
+ RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
+ } else {
+ // Use an extension node to send the input types through the Substrait plan for validation.
+ val inputTypeNodeList = originalInputAttributes
+ .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+ .asJava
+ val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+ BackendsApiManager.getTransformerApiInstance.packPBMessage(
+ TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+ RelBuilder.makeFilterRel(input, condExprNode, extensionNode, context, operatorId)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
+ copy(child = newChild)
+}
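The essential move in `getRelNode` is unwrapping `IncrementMetric` before translation, so the Substrait plan carries only the real predicate while the metric is rerouted to the metrics updater. A minimal standalone sketch of that step (the helper name is hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.metric.SQLMetric

// Peel a top-level IncrementMetric off a filter condition, keeping the metric
// so the updater can later credit outputRows to it.
def unwrapIncrementMetric(cond: Expression): (Expression, Seq[(String, SQLMetric)]) =
  cond match {
    case im @ IncrementMetric(child, metric) => (child, Seq(im.prettyName -> metric))
    case other => (other, Seq.empty)
  }
```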
diff --git a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 000000000000..a2be01a1f024
--- /dev/null
+++ b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen, NamedExpression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+ extends ProjectExecTransformerBase(projectList, child) {
+
+ private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]
+
+ override def metricsUpdater(): MetricsUpdater =
+ BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
+ metrics,
+ extraMetrics.toSeq)
+
+ override def getRelNode(
+ context: SubstraitContext,
+ projectList: Seq[NamedExpression],
+ originalInputAttributes: Seq[Attribute],
+ operatorId: Long,
+ input: RelNode,
+ validation: Boolean): RelNode = {
+ val args = context.registeredFunction
+ val newProjectList = genNewProjectList(projectList)
+ val columnarProjExprs: Seq[ExpressionTransformer] = ExpressionConverter
+ .replaceWithExpressionTransformer(newProjectList, attributeSeq = originalInputAttributes)
+ val projExprNodeList = columnarProjExprs.map(_.doTransform(args)).asJava
+ val emitStartIndex = originalInputAttributes.size
+ if (!validation) {
+ RelBuilder.makeProjectRel(input, projExprNodeList, context, operatorId, emitStartIndex)
+ } else {
+ // Use an extension node to send the input types through the Substrait plan for validation.
+ val inputTypeNodeList = originalInputAttributes
+ .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+ .asJava
+ val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+ BackendsApiManager.getTransformerApiInstance.packPBMessage(
+ TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+ RelBuilder.makeProjectRel(
+ input,
+ projExprNodeList,
+ extensionNode,
+ context,
+ operatorId,
+ emitStartIndex)
+ }
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
+ copy(child = newChild)
+
+ def genNewProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
+ projectList.map {
+ case alias: Alias =>
+ alias.child match {
+ case IncrementMetric(child, metric) =>
+ extraMetrics :+= (alias.child.prettyName, metric)
+ Alias(child = child, name = alias.name)()
+
+ case CaseWhen(branches, elseValue) =>
+ val newBranches = branches.map {
+ case (expr1, expr2: IncrementMetric) =>
+ extraMetrics :+= (expr2.prettyName, expr2.metric)
+ (expr1, expr2.child)
+ case other => other
+ }
+
+ val newElseValue = elseValue match {
+ case Some(IncrementMetric(child: IncrementMetric, metric)) =>
+ extraMetrics :+= (child.prettyName, metric)
+ extraMetrics :+= (child.prettyName, child.metric)
+ Some(child.child)
+ case _ => elseValue
+ }
+
+ Alias(
+ child = CaseWhen(newBranches, newElseValue),
+ name = alias.name
+ )(alias.exprId)
+
+ case _ =>
+ alias
+ }
+ case other => other
+ }
+ }
+}
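To make the projection rewrite concrete: Delta MERGE plans wrap `CaseWhen` branch values in `IncrementMetric`, and `genNewProjectList` must strip the wrapper while keeping the metric. A hedged sketch of that branch handling in isolation (helper name hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.CaseWhen
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.metric.SQLMetric

// Replace each IncrementMetric branch value with its child, collecting the
// stripped metrics for the updater.
def stripBranchMetrics(cw: CaseWhen): (CaseWhen, Seq[(String, SQLMetric)]) = {
  var collected = Seq.empty[(String, SQLMetric)]
  val newBranches = cw.branches.map {
    case (pred, im @ IncrementMetric(child, metric)) =>
      collected :+= (im.prettyName -> metric)
      (pred, child)
    case other => other
  }
  (CaseWhen(newBranches, cw.elseValue), collected)
}
```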
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 000000000000..ca4665c0d0cb
--- /dev/null
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+ extends FilterExecTransformerBase(condition, child) {
+
+ override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
+ copy(child = newChild)
+}
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 000000000000..9b720b19c5ba
--- /dev/null
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+ extends ProjectExecTransformerBase(projectList, child) {
+
+ override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
+ copy(child = newChild)
+}
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
index 3dc126032144..011184724e68 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
@@ -16,8 +16,8 @@
*/
package org.apache.gluten.extension
-import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, pushDownInputFileExprRule}
+import org.apache.gluten.execution.{DeltaFilterExecTransformer, DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
+import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, filterRule, projectRule, pushDownInputFileExprRule}
import org.apache.gluten.extension.columnar.RewriteTransformerRules
import org.apache.spark.sql.SparkSession
@@ -25,13 +25,14 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping}
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat
import scala.collection.mutable.ListBuffer
class DeltaRewriteTransformerRules extends RewriteTransformerRules {
- override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: pushDownInputFileExprRule :: Nil
+ override def rules: Seq[Rule[SparkPlan]] =
+ columnMappingRule :: filterRule :: projectRule :: pushDownInputFileExprRule :: Nil
}
object DeltaRewriteTransformerRules {
@@ -58,6 +59,18 @@ object DeltaRewriteTransformerRules {
transformColumnMappingPlan(p)
}
+ val filterRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+ plan.transformUp {
+ case FilterExec(condition, child) if containsIncrementMetricExpr(condition) =>
+ DeltaFilterExecTransformer(condition, child)
+ }
+
+ val projectRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+ plan.transformUp {
+ case ProjectExec(projectList, child) if projectList.exists(containsIncrementMetricExpr) =>
+ DeltaProjectExecTransformer(projectList, child)
+ }
+
val pushDownInputFileExprRule: Rule[SparkPlan] = (plan: SparkPlan) =>
plan.transformUp {
case p @ ProjectExec(projectList, child: DeltaScanTransformer)
@@ -79,6 +92,13 @@ object DeltaRewriteTransformerRules {
}
}
+ private def containsIncrementMetricExpr(expr: Expression): Boolean = {
+ expr match {
+ case e if e.prettyName == "increment_metric" => true
+ case _ => expr.children.exists(containsIncrementMetricExpr)
+ }
+ }
+
/**
* This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping)
* transform the metadata of Delta into Parquet's, each plan should only be transformed once.
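A sketch of the rules' net effect on a physical plan; because both rules match only when `increment_metric` appears in the expressions, ordinary filters and projections keep their existing transformers:

```scala
import org.apache.gluten.extension.DeltaRewriteTransformerRules.{filterRule, projectRule}

import org.apache.spark.sql.execution.SparkPlan

// A FilterExec or ProjectExec containing increment_metric comes back as the
// Delta-specific transformer; every other node passes through unchanged.
def applyDeltaMetricRules(plan: SparkPlan): SparkPlan =
  projectRule(filterRule(plan))
```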
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index a96f27f5a8a3..62008767f51b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -57,11 +57,15 @@ trait MetricsApi extends Serializable {
def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
- def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
+ def genFilterTransformerMetricsUpdater(
+ metrics: Map[String, SQLMetric],
+ extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
- def genProjectTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
+ def genProjectTransformerMetricsUpdater(
+ metrics: Map[String, SQLMetric],
+ extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
def genHashAggregateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 0801ffb27ae3..cc5c2325dce4 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -106,7 +106,25 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
override protected def outputExpressions: Seq[NamedExpression] = child.output
- protected def getRemainingCondition: Expression
+ // FIXME: Should use field "condition" to store the actual executed filter expressions,
+ // to make optimization easier (e.g. removing a filter that actually does nothing).
+ protected def getRemainingCondition: Expression = {
+ val scanFilters = child match {
+ // Get the filters including the manually pushed down ones.
+ case basicScanExecTransformer: BasicScanExecTransformer =>
+ basicScanExecTransformer.filterExprs()
+ // For fallback scan, we need to keep original filter.
+ case _ =>
+ Seq.empty[Expression]
+ }
+ if (scanFilters.isEmpty) {
+ cond
+ } else {
+ val remainingFilters =
+ FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
+ remainingFilters.reduceLeftOption(And).orNull
+ }
+ }
override protected def doValidateInternal(): ValidationResult = {
val remainingCondition = getRemainingCondition
@@ -160,7 +178,7 @@ object FilterExecTransformerBase {
}
}
-case class ProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
+abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val input: SparkPlan)
extends UnaryTransformSupport
with OrderPreservingNodeShim
with PartitioningPreservingNodeShim
@@ -176,7 +194,7 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
// Firstly, need to check if the Substrait plan for this operator can be successfully generated.
val operatorId = substraitContext.nextOperatorId(this.nodeName)
val relNode =
- getRelNode(substraitContext, projectList, child.output, operatorId, null, validation = true)
+ getRelNode(substraitContext, list, child.output, operatorId, null, validation = true)
// Then, validate the generated plan in native engine.
doNativeValidation(substraitContext, relNode)
}
@@ -192,7 +210,7 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
override def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
val operatorId = context.nextOperatorId(this.nodeName)
- if ((projectList == null || projectList.isEmpty) && childCtx != null) {
+ if ((list == null || list.isEmpty) && childCtx != null) {
// The computing for this project is not needed.
// the child may be an input adapter and childCtx is null. In this case we want to
// make a read node with non-empty base_schema.
@@ -201,16 +219,16 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
}
val currRel =
- getRelNode(context, projectList, child.output, operatorId, childCtx.root, validation = false)
+ getRelNode(context, list, child.output, operatorId, childCtx.root, validation = false)
assert(currRel != null, "Project Rel should be valid")
TransformContext(childCtx.outputAttributes, output, currRel)
}
- override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+ override def output: Seq[Attribute] = list.map(_.toAttribute)
override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
- override protected def outputExpressions: Seq[NamedExpression] = projectList
+ override protected def outputExpressions: Seq[NamedExpression] = list
def getRelNode(
context: SubstraitContext,
@@ -247,23 +265,10 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
override def verboseStringWithOperatorId(): String = {
s"""
|$formattedNodeName
- |${ExplainUtils.generateFieldString("Output", projectList)}
+ |${ExplainUtils.generateFieldString("Output", list)}
|${ExplainUtils.generateFieldString("Input", child.output)}
|""".stripMargin
}
-
- override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
- copy(child = newChild)
-}
-object ProjectExecTransformer {
- def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer = {
- BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList, child)
- }
-
- // Directly creating a project transformer may not be considered safe since some backends, E.g.,
- // Clickhouse may require to intercept the instantiation procedure.
- def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
- new ProjectExecTransformer(projectList, child)
}
// An alternatives for UnionExec.
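Since `getRemainingCondition` now lives in the base class, it is shared by `FilterExecTransformer` and `DeltaFilterExecTransformer`. A hedged worked example of what it computes, using hypothetical attributes:

```scala
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, LessThan, Literal}
import org.apache.spark.sql.types.IntegerType

object RemainingConditionExample {
  val a = AttributeReference("a", IntegerType)()
  val b = AttributeReference("b", IntegerType)()
  // cond = (a > 1) AND (b < 2), with `a > 1` already pushed into the scan.
  val cond = And(GreaterThan(a, Literal(1)), LessThan(b, Literal(2)))
  val scanFilters = Seq(GreaterThan(a, Literal(1)))
  // getRemainingFilters(scanFilters, splitConjunctivePredicates(cond)) leaves
  // Seq(b < 2), which reduceLeftOption(And) folds to `b < 2`. Had every
  // conjunct been pushed down, the method would return null.
}
```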
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
new file mode 100644
index 000000000000..bb8361d993db
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class ProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+ extends ProjectExecTransformerBase(projectList, child) {
+
+ override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
+ copy(child = newChild)
+}
+
+object ProjectExecTransformer {
+
+ def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer = {
+ BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList, child)
+ }
+
+ // Directly creating a project transformer is not considered safe since some backends,
+ // e.g. ClickHouse, may need to intercept the instantiation procedure.
+ def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
+ new ProjectExecTransformer(projectList, child)
+}
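Usage note on the relocated companion (behavior unchanged): `apply` routes construction through the backend, while `createUnsafe` bypasses that interception. A hedged sketch with `projectList` and `child` as stand-ins:

```scala
import org.apache.gluten.execution.ProjectExecTransformer

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.execution.SparkPlan

def buildProject(projectList: Seq[NamedExpression], child: SparkPlan) = {
  // Routed through the backend, which may substitute its own node (ClickHouse).
  val viaBackend = ProjectExecTransformer(projectList, child)
  // Direct instantiation, skipping backend interception.
  val direct = ProjectExecTransformer.createUnsafe(projectList, child)
  (viaBackend, direct)
}
```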