From 27f30ad826be0e085d321c302b54eb789ad9476e Mon Sep 17 00:00:00 2001
From: Qian Sun
Date: Tue, 15 Oct 2024 09:16:11 +0800
Subject: [PATCH] [GLUTEN-7110][VL][DELTA] support IncrementMetric in gluten
 (#7111)

* [GLUTEN-7110][VL][DELTA] support IncrementMetric in gluten
* init DeltaFilterExecTransformer
* init DeltaProjectExecTransformer
* update metric
* add FilterTransformerRegister
* fix Scala 2.13 compile error
* add ProjectTransformerRegister
* remove redundant function
* update supportedDelta
* update veloxSparkPlanExecApi
* use createUnsafe for ProjectExec
* init ReplaceDeltaTransformer
* use delta write rule
* fix error
* use copy-resources to reduce redundant resources
---
 .../backendsapi/clickhouse/CHMetricsApi.scala |   8 +-
 .../backendsapi/velox/VeloxMetricsApi.scala   |  10 +-
 .../execution/FilterExecTransformer.scala     |  21 +---
 .../gluten/metrics/FilterMetricsUpdater.scala |  12 +-
 .../metrics/ProjectMetricsUpdater.scala       |  12 +-
 gluten-delta/pom.xml                          |  31 +++++
 .../DeltaFilterExecTransformer.scala          |  81 ++++++++++++
 .../DeltaProjectExecTransformer.scala         | 116 ++++++++++++++++++
 .../DeltaFilterExecTransformer.scala          |  27 ++++
 .../DeltaProjectExecTransformer.scala         |  27 ++++
 .../DeltaRewriteTransformerRules.scala        |  28 ++++-
 .../gluten/backendsapi/MetricsApi.scala       |   8 +-
 .../BasicPhysicalOperatorTransformer.scala    |  47 +++----
 .../execution/ProjectExecTransformer.scala    |  41 +++++++
 14 files changed, 415 insertions(+), 54 deletions(-)
 create mode 100644 gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
 create mode 100644 gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
 create mode 100644 gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
 create mode 100644 gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
 create mode 100644 gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index d84181fec1da..6a4f0c9a6f77 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -165,7 +165,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
     )
 
-  override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
+  override def genFilterTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
     new FilterMetricsUpdater(metrics)
 
   override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -182,7 +184,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
     )
 
   override def genProjectTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+    new ProjectMetricsUpdater(metrics)
 
   override def genHashAggregateTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 00cba4372891..1f87ffdba5ab 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -175,8 +175,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "number of memory allocations")
   )
 
-  override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
-    new FilterMetricsUpdater(metrics)
+  override def genFilterTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+    new FilterMetricsUpdater(metrics, extraMetrics)
 
   override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
@@ -192,7 +194,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
     )
 
   override def genProjectTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
+    new ProjectMetricsUpdater(metrics, extraMetrics)
 
   override def genHashAggregateTransformerMetrics(
       sparkContext: SparkContext): Map[String, SQLMetric] =
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
index daa08498bee3..fdfe1d7ea917 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala
@@ -16,30 +16,11 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.spark.sql.catalyst.expressions.{And, Expression}
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.SparkPlan
 
 case class FilterExecTransformer(condition: Expression, child: SparkPlan)
   extends FilterExecTransformerBase(condition, child) {
-  // FIXME: Should use field "condition" to store the actual executed filter expressions.
-  // To make optimization easier (like to remove filter when it actually does nothing)
-  override protected def getRemainingCondition: Expression = {
-    val scanFilters = child match {
-      // Get the filters including the manually pushed down ones.
-      case basicScanExecTransformer: BasicScanExecTransformer =>
-        basicScanExecTransformer.filterExprs()
-      // For fallback scan, we need to keep original filter.
-      case _ =>
-        Seq.empty[Expression]
-    }
-    if (scanFilters.isEmpty) {
-      condition
-    } else {
-      val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
-      remainingFilters.reduceLeftOption(And).orNull
-    }
-  }
 
   override protected def withNewChildInternal(newChild: SparkPlan): FilterExecTransformer =
     copy(child = newChild)
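The two backend diffs above widen genFilterTransformerMetricsUpdater and genProjectTransformerMetricsUpdater with an extraMetrics parameter defaulting to Seq.empty, so every existing call site stays source-compatible; Velox forwards the pairs to its updaters (changed further below), while ClickHouse ignores them for now. A minimal, self-contained sketch of that plumbing, using mock stand-ins (Metric, MockFilterMetricsUpdater, MockMetricsApi are not the real SQLMetric/FilterMetricsUpdater/MetricsApi classes):

// Mock stand-ins for SQLMetric, FilterMetricsUpdater and MetricsApi.
final class Metric(var value: Long = 0L) { def +=(v: Long): Unit = value += v }

class MockFilterMetricsUpdater(
    val metrics: Map[String, Metric],
    val extraMetrics: Seq[(String, Metric)]) {
  def updateNativeMetrics(outputRows: Long): Unit = {
    metrics("numOutputRows") += outputRows
    // Same dispatch as the real updaters: only "increment_metric" pairs are bumped.
    extraMetrics.foreach {
      case ("increment_metric", m) => m += outputRows
      case _ => // unknown extras are ignored
    }
  }
}

trait MockMetricsApi {
  // The default keeps pre-existing callers compiling unchanged.
  def genFilterTransformerMetricsUpdater(
      metrics: Map[String, Metric],
      extraMetrics: Seq[(String, Metric)] = Seq.empty): MockFilterMetricsUpdater
}

object VeloxLikeMetricsApi extends MockMetricsApi {
  override def genFilterTransformerMetricsUpdater(
      metrics: Map[String, Metric],
      extraMetrics: Seq[(String, Metric)] = Seq.empty): MockFilterMetricsUpdater =
    new MockFilterMetricsUpdater(metrics, extraMetrics) // Velox forwards the extras
  // A ClickHouse-like backend would instead pass Seq.empty and drop them.
}

object PlumbingDemo extends App {
  val delta = new Metric
  val updater = VeloxLikeMetricsApi.genFilterTransformerMetricsUpdater(
    Map("numOutputRows" -> new Metric),
    Seq("increment_metric" -> delta))
  updater.updateNativeMetrics(outputRows = 42)
  assert(delta.value == 42) // the Delta-side metric now reflects native output rows
}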
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
index c877fdbb0785..e8c27f6a43c4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/FilterMetricsUpdater.scala
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+class FilterMetricsUpdater(
+    val metrics: Map[String, SQLMetric],
+    val extraMetrics: Seq[(String, SQLMetric)])
+  extends MetricsUpdater {
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
@@ -30,6 +33,13 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      extraMetrics.foreach {
+        case (name, metric) =>
+          name match {
+            case "increment_metric" => metric += operatorMetrics.outputRows
+            case _ => // do nothing
+          }
+      }
     }
   }
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
index ff8335c861d5..cbd195bb804f 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/metrics/ProjectMetricsUpdater.scala
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+class ProjectMetricsUpdater(
+    val metrics: Map[String, SQLMetric],
+    val extraMetrics: Seq[(String, SQLMetric)])
+  extends MetricsUpdater {
 
   override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
     if (opMetrics != null) {
@@ -30,6 +33,13 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      extraMetrics.foreach {
+        case (name, metric) =>
+          name match {
+            case "increment_metric" => metric += operatorMetrics.outputRows
+            case _ => // do nothing
+          }
+      }
     }
   }
 }
diff --git a/gluten-delta/pom.xml b/gluten-delta/pom.xml
index 48d47d906ba0..6a6b7291d756 100755
--- a/gluten-delta/pom.xml
+++ b/gluten-delta/pom.xml
@@ -130,6 +130,19 @@
       <plugin>
         <groupId>com.diffplug.spotless</groupId>
         <artifactId>spotless-maven-plugin</artifactId>
+        <configuration>
+          <scala>
+            <scalafmt>
+              <file>${project.basedir}/../.scalafmt.conf</file>
+            </scalafmt>
+            <includes>
+              <include>src/main/scala/**/*.scala</include>
+              <include>src/test/scala/**/*.scala</include>
+              <include>src/main/delta-${delta.binary.version}/**/*.scala</include>
+              <include>src/test/delta-${delta.binary.version}/**/*.scala</include>
+            </includes>
+          </scala>
+        </configuration>
       </plugin>
       <plugin>
         <groupId>org.scalatest</groupId>
@@ -154,6 +167,24 @@
       <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-resources-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-resources</id>
+            <phase>generate-sources</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>src/main/scala/org/apache/gluten/execution</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>src/main/delta-${delta.binary.version}/org/apache/gluten/execution</directory>
+                </resource>
+              </resources>
+              <overwrite>true</overwrite>
+            </configuration>
+          </execution>
+        </executions>
       </plugin>
     </plugins>
   </build>
diff --git a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 000000000000..0c8cd54902c2
--- /dev/null
+++ b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+  extends FilterExecTransformerBase(condition, child) {
+
+  private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty
+
+  override def metricsUpdater(): MetricsUpdater =
+    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
+      metrics,
+      extraMetrics.toSeq)
+
+  override def getRelNode(
+      context: SubstraitContext,
+      condExpr: Expression,
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    assert(condExpr != null)
+    val args = context.registeredFunction
+    val condExprNode = condExpr match {
+      case IncrementMetric(child, metric) =>
+        extraMetrics :+= (condExpr.prettyName, metric)
+        ExpressionConverter
+          .replaceWithExpressionTransformer(child, attributeSeq = originalInputAttributes)
+          .doTransform(args)
+      case _ =>
+        ExpressionConverter
+          .replaceWithExpressionTransformer(condExpr, attributeSeq = originalInputAttributes)
+          .doTransform(args)
+    }
+
+    if (!validation) {
+      RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
+    } else {
+      // Use an extension node to send the input types through the Substrait plan for validation.
+      val inputTypeNodeList = originalInputAttributes
+        .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+        .asJava
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.packPBMessage(
+          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeFilterRel(input, condExprNode, extensionNode, context, operatorId)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
+    copy(child = newChild)
+}
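The heart of the file above is the condExpr match in getRelNode: a condition wrapped in Delta's IncrementMetric cannot be offloaded as-is (the native engine has no side-effecting metric expression), so the wrapper is peeled off, its SQLMetric is remembered in extraMetrics, and only the inner predicate goes into the Substrait plan. A self-contained sketch of that unwrapping, with mock types (Expr, IncrementMetric, Metric are stand-ins, not the Catalyst/Delta classes):

sealed trait Expr { def prettyName: String }
final class Metric(var value: Long = 0L)
case class Predicate(sql: String) extends Expr { def prettyName = "predicate" }
case class IncrementMetric(child: Expr, metric: Metric) extends Expr {
  def prettyName = "increment_metric"
}

object FilterUnwrapDemo extends App {
  // Mirrors DeltaFilterExecTransformer.getRelNode: peel the wrapper, keep the metric.
  def unwrap(cond: Expr): (Expr, Seq[(String, Metric)]) = cond match {
    case im @ IncrementMetric(child, metric) => (child, Seq(im.prettyName -> metric))
    case other => (other, Seq.empty)
  }

  val numRowsDeleted = new Metric
  val (offloaded, extra) = unwrap(IncrementMetric(Predicate("id > 10"), numRowsDeleted))
  println(offloaded)       // Predicate(id > 10): what the native plan evaluates
  println(extra.map(_._1)) // List(increment_metric): later handed to the metrics
                           // updater, which adds the operator's outputRows to it
}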
diff --git a/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 000000000000..a2be01a1f024
--- /dev/null
+++ b/gluten-delta/src/main/delta-32/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
+import org.apache.gluten.metrics.MetricsUpdater
+import org.apache.gluten.substrait.`type`.TypeBuilder
+import org.apache.gluten.substrait.SubstraitContext
+import org.apache.gluten.substrait.extensions.ExtensionBuilder
+import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen, NamedExpression}
+import org.apache.spark.sql.delta.metric.IncrementMetric
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
+
+  private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]
+
+  override def metricsUpdater(): MetricsUpdater =
+    BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
+      metrics,
+      extraMetrics.toSeq)
+
+  override def getRelNode(
+      context: SubstraitContext,
+      projectList: Seq[NamedExpression],
+      originalInputAttributes: Seq[Attribute],
+      operatorId: Long,
+      input: RelNode,
+      validation: Boolean): RelNode = {
+    val args = context.registeredFunction
+    val newProjectList = genNewProjectList(projectList)
+    val columnarProjExprs: Seq[ExpressionTransformer] = ExpressionConverter
+      .replaceWithExpressionTransformer(newProjectList, attributeSeq = originalInputAttributes)
+    val projExprNodeList = columnarProjExprs.map(_.doTransform(args)).asJava
+    val emitStartIndex = originalInputAttributes.size
+    if (!validation) {
+      RelBuilder.makeProjectRel(input, projExprNodeList, context, operatorId, emitStartIndex)
+    } else {
+      // Use an extension node to send the input types through the Substrait plan for validation.
+      val inputTypeNodeList = originalInputAttributes
+        .map(attr => ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
+        .asJava
+      val extensionNode = ExtensionBuilder.makeAdvancedExtension(
+        BackendsApiManager.getTransformerApiInstance.packPBMessage(
+          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
+      RelBuilder.makeProjectRel(
+        input,
+        projExprNodeList,
+        extensionNode,
+        context,
+        operatorId,
+        emitStartIndex)
+    }
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
+    copy(child = newChild)
+
+  def genNewProjectList(projectList: Seq[NamedExpression]): Seq[NamedExpression] = {
+    projectList.map {
+      case alias: Alias =>
+        alias.child match {
+          case IncrementMetric(child, metric) =>
+            extraMetrics :+= (alias.child.prettyName, metric)
+            Alias(child = child, name = alias.name)()
+
+          case CaseWhen(branches, elseValue) =>
+            val newBranches = branches.map {
+              case (expr1, expr2: IncrementMetric) =>
+                extraMetrics :+= (expr2.prettyName, expr2.metric)
+                (expr1, expr2.child)
+              case other => other
+            }
+
+            val newElseValue = elseValue match {
+              case Some(IncrementMetric(child: IncrementMetric, metric)) =>
+                extraMetrics :+= (child.prettyName, metric)
+                extraMetrics :+= (child.prettyName, child.metric)
+                Some(child.child)
+              case _ => elseValue
+            }
+
+            Alias(
+              child = CaseWhen(newBranches, newElseValue),
+              name = alias.name
+            )(alias.exprId)
+
+          case _ =>
+            alias
+        }
+      case other => other
+    }
+  }
+}
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
new file mode 100644
index 000000000000..ca4665c0d0cb
--- /dev/null
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaFilterExecTransformer.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
+  extends FilterExecTransformerBase(condition, child) {
+
+  override protected def withNewChildInternal(newChild: SparkPlan): DeltaFilterExecTransformer =
+    copy(child = newChild)
+}
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
new file mode 100644
index 000000000000..9b720b19c5ba
--- /dev/null
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaProjectExecTransformer.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class DeltaProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
+
+  override protected def withNewChildInternal(newChild: SparkPlan): DeltaProjectExecTransformer =
+    copy(child = newChild)
+}
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
index 3dc126032144..011184724e68 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaRewriteTransformerRules.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.execution.{DeltaScanTransformer, ProjectExecTransformer}
-import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, pushDownInputFileExprRule}
+import org.apache.gluten.execution.{DeltaFilterExecTransformer, DeltaProjectExecTransformer, DeltaScanTransformer, ProjectExecTransformer}
+import org.apache.gluten.extension.DeltaRewriteTransformerRules.{columnMappingRule, filterRule, projectRule, pushDownInputFileExprRule}
 import org.apache.gluten.extension.columnar.RewriteTransformerRules
 
 import org.apache.spark.sql.SparkSession
@@ -25,13 +25,14 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaParquetFileFormat, NoMapping}
-import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.FileFormat
 
 import scala.collection.mutable.ListBuffer
 
 class DeltaRewriteTransformerRules extends RewriteTransformerRules {
-  override def rules: Seq[Rule[SparkPlan]] = columnMappingRule :: pushDownInputFileExprRule :: Nil
+  override def rules: Seq[Rule[SparkPlan]] =
+    columnMappingRule :: filterRule :: projectRule :: pushDownInputFileExprRule :: Nil
 }
 
 object DeltaRewriteTransformerRules {
@@ -58,6 +59,18 @@ object DeltaRewriteTransformerRules {
       transformColumnMappingPlan(p)
   }
 
+  val filterRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case FilterExec(condition, child) if containsIncrementMetricExpr(condition) =>
+        DeltaFilterExecTransformer(condition, child)
+    }
+
+  val projectRule: Rule[SparkPlan] = (plan: SparkPlan) =>
+    plan.transformUp {
+      case ProjectExec(projectList, child) if projectList.exists(containsIncrementMetricExpr) =>
+        DeltaProjectExecTransformer(projectList, child)
+    }
+
   val pushDownInputFileExprRule: Rule[SparkPlan] = (plan: SparkPlan) =>
     plan.transformUp {
       case p @ ProjectExec(projectList, child: DeltaScanTransformer)
@@ -79,6 +92,13 @@ object DeltaRewriteTransformerRules {
     }
   }
 
+  private def containsIncrementMetricExpr(expr: Expression): Boolean = {
+    expr match {
+      case e if e.prettyName == "increment_metric" => true
+      case _ => expr.children.exists(containsIncrementMetricExpr)
+    }
+  }
+
   /**
    * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping)
    * transform the metadata of Delta into Parquet's, each plan should only be transformed once.
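The two new rules just above fire whenever a vanilla FilterExec or ProjectExec still carries an increment_metric expression, swapping in the Delta-aware transformers before offload. A toy model of the detection plus rewrite, with mock plan and expression nodes standing in for Catalyst (FilterExec, transformUp and friends are simplified here, not the Spark classes):

sealed trait Expr { def prettyName: String; def children: Seq[Expr] }
case class Leaf(prettyName: String) extends Expr { def children = Seq.empty }
case class And(left: Expr, right: Expr) extends Expr {
  def prettyName = "and"; def children = Seq(left, right)
}

sealed trait Plan
case class FilterExec(condition: Expr) extends Plan
case class DeltaFilterExecTransformer(condition: Expr) extends Plan

object RewriteRuleDemo extends App {
  // Same shape as containsIncrementMetricExpr: the node itself, or any descendant.
  def containsIncrementMetric(e: Expr): Boolean =
    e.prettyName == "increment_metric" || e.children.exists(containsIncrementMetric)

  def filterRule(plan: Plan): Plan = plan match {
    case FilterExec(cond) if containsIncrementMetric(cond) => DeltaFilterExecTransformer(cond)
    case other => other
  }

  println(filterRule(FilterExec(And(Leaf("increment_metric"), Leaf("isnotnull")))))
  // DeltaFilterExecTransformer(...): rewritten, so the metric survives the offload
  println(filterRule(FilterExec(Leaf("isnotnull"))))
  // FilterExec(...): untouched, the ordinary transformer handles it
}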
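DeltaProjectExecTransformer.genNewProjectList (two files up) does the same peeling inside a project list: an IncrementMetric can sit directly under an Alias or inside a CaseWhen branch, as Delta's MERGE/UPDATE plans produce. A simplified sketch with mock nodes; it handles a single level of nesting only, whereas the real method also unwraps a nested IncrementMetric in the else branch and preserves exprIds:

sealed trait Expr
final class Metric(var value: Long = 0L)
case class Col(name: String) extends Expr
case class IncrementMetric(child: Expr, metric: Metric) extends Expr
case class CaseWhen(branches: Seq[(Expr, Expr)], elseValue: Option[Expr]) extends Expr
case class Alias(child: Expr, name: String) extends Expr

object ProjectRewriteDemo extends App {
  private var collected = Seq.empty[Metric]

  def rewrite(projectList: Seq[Alias]): Seq[Alias] = projectList.map {
    case Alias(IncrementMetric(child, m), name) =>
      collected :+= m
      Alias(child, name)
    case Alias(CaseWhen(branches, elseValue), name) =>
      val newBranches = branches.map {
        case (cond, IncrementMetric(child, m)) =>
          collected :+= m
          (cond, child)
        case other => other
      }
      val newElse = elseValue.map {
        case IncrementMetric(child, m) =>
          collected :+= m
          child
        case other => other
      }
      Alias(CaseWhen(newBranches, newElse), name)
    case other => other
  }

  val list = Seq(
    Alias(IncrementMetric(Col("value"), new Metric), "out1"),
    Alias(CaseWhen(Seq(Col("matched") -> IncrementMetric(Col("update"), new Metric)), None), "out2"))
  println(rewrite(list))  // both wrappers gone; Col(value) / Col(update) remain
  println(collected.size) // 2 metrics captured for the ProjectMetricsUpdater
}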
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
index a96f27f5a8a3..62008767f51b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala
@@ -57,11 +57,15 @@ trait MetricsApi extends Serializable {
 
   def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
-  def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
+  def genFilterTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
 
   def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
 
-  def genProjectTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
+  def genProjectTransformerMetricsUpdater(
+      metrics: Map[String, SQLMetric],
+      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
 
   def genHashAggregateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 0801ffb27ae3..cc5c2325dce4 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -106,7 +106,25 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
 
   override protected def outputExpressions: Seq[NamedExpression] = child.output
 
-  protected def getRemainingCondition: Expression
+  // FIXME: Should use field "condition" to store the actual executed filter expressions.
+  // To make optimization easier (like to remove filter when it actually does nothing)
+  protected def getRemainingCondition: Expression = {
+    val scanFilters = child match {
+      // Get the filters including the manually pushed down ones.
+      case basicScanExecTransformer: BasicScanExecTransformer =>
+        basicScanExecTransformer.filterExprs()
+      // For fallback scan, we need to keep original filter.
+      case _ =>
+        Seq.empty[Expression]
+    }
+    if (scanFilters.isEmpty) {
+      cond
+    } else {
+      val remainingFilters =
+        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
+      remainingFilters.reduceLeftOption(And).orNull
+    }
+  }
 
   override protected def doValidateInternal(): ValidationResult = {
     val remainingCondition = getRemainingCondition
@@ -160,7 +178,7 @@ object FilterExecTransformerBase {
   }
 }
 
-case class ProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
+abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val input: SparkPlan)
   extends UnaryTransformSupport
   with OrderPreservingNodeShim
   with PartitioningPreservingNodeShim
@@ -176,7 +194,7 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
     // Firstly, need to check if the Substrait plan for this operator can be successfully generated.
     val operatorId = substraitContext.nextOperatorId(this.nodeName)
     val relNode =
-      getRelNode(substraitContext, projectList, child.output, operatorId, null, validation = true)
+      getRelNode(substraitContext, list, child.output, operatorId, null, validation = true)
     // Then, validate the generated plan in native engine.
     doNativeValidation(substraitContext, relNode)
   }
@@ -192,7 +210,7 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
   override def doTransform(context: SubstraitContext): TransformContext = {
     val childCtx = child.asInstanceOf[TransformSupport].transform(context)
     val operatorId = context.nextOperatorId(this.nodeName)
-    if ((projectList == null || projectList.isEmpty) && childCtx != null) {
+    if ((list == null || list.isEmpty) && childCtx != null) {
       // The computing for this project is not needed.
       // the child may be an input adapter and childCtx is null. In this case we want to
       // make a read node with non-empty base_schema.
@@ -201,16 +219,16 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
     }
 
     val currRel =
-      getRelNode(context, projectList, child.output, operatorId, childCtx.root, validation = false)
+      getRelNode(context, list, child.output, operatorId, childCtx.root, validation = false)
     assert(currRel != null, "Project Rel should be valid")
     TransformContext(childCtx.outputAttributes, output, currRel)
   }
 
-  override def output: Seq[Attribute] = projectList.map(_.toAttribute)
+  override def output: Seq[Attribute] = list.map(_.toAttribute)
 
   override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering
 
-  override protected def outputExpressions: Seq[NamedExpression] = projectList
+  override protected def outputExpressions: Seq[NamedExpression] = list
 
   def getRelNode(
       context: SubstraitContext,
@@ -247,23 +265,10 @@ case class ProjectExecTransformer private (projectList: Seq[NamedExpression], ch
   override def verboseStringWithOperatorId(): String = {
     s"""
        |$formattedNodeName
-       |${ExplainUtils.generateFieldString("Output", projectList)}
+       |${ExplainUtils.generateFieldString("Output", list)}
        |${ExplainUtils.generateFieldString("Input", child.output)}
        |""".stripMargin
   }
-
-  override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
-    copy(child = newChild)
-}
-object ProjectExecTransformer {
-  def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer = {
-    BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList, child)
-  }
-
-  // Directly creating a project transformer may not be considered safe since some backends, E.g.,
-  // Clickhouse may require to intercept the instantiation procedure.
-  def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
-    new ProjectExecTransformer(projectList, child)
 }
 
 // An alternatives for UnionExec.
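getRemainingCondition, now hoisted into FilterExecTransformerBase above, subtracts filters already pushed into the scan from the filter's own conjunction so predicates are not evaluated twice. A standalone sketch of that set subtraction over conjuncts; plain strings stand in for Catalyst expressions, and reduceLeftOption plays the role of re-And-ing the remainder:

object RemainingConditionDemo extends App {
  // Stand-in for splitConjunctivePredicates: "a AND b AND c" -> Seq(a, b, c).
  def splitConjuncts(cond: String): Seq[String] = cond.split(" AND ").toSeq

  // Stand-in for FilterHandler.getRemainingFilters: drop conjuncts the scan already applies.
  def remainingFilters(scanFilters: Seq[String], conjuncts: Seq[String]): Seq[String] =
    conjuncts.filterNot(scanFilters.contains)

  val condition = "isnotnull(id) AND id > 10 AND name = 'a'"
  val pushedToScan = Seq("isnotnull(id)", "id > 10")

  val remaining = remainingFilters(pushedToScan, splitConjuncts(condition))
    .reduceLeftOption((l, r) => s"$l AND $r")
    .orNull
  println(remaining) // name = 'a' -- only this predicate stays in the FilterRel
}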
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
new file mode 100644
index 000000000000..bb8361d993db
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ProjectExecTransformer.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.execution.SparkPlan
+
+case class ProjectExecTransformer(projectList: Seq[NamedExpression], child: SparkPlan)
+  extends ProjectExecTransformerBase(projectList, child) {
+
+  override protected def withNewChildInternal(newChild: SparkPlan): ProjectExecTransformer =
+    copy(child = newChild)
+}
+
+object ProjectExecTransformer {
+
+  def apply(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer = {
+    BackendsApiManager.getSparkPlanExecApiInstance.genProjectExecTransformer(projectList, child)
+  }
+
+  // Directly creating a project transformer may not be considered safe since some backends,
+  // e.g. ClickHouse, may need to intercept the instantiation procedure.
+  def createUnsafe(projectList: Seq[NamedExpression], child: SparkPlan): ProjectExecTransformer =
+    new ProjectExecTransformer(projectList, child)
+}
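The companion object in the new file above separates two construction paths: apply goes through the backend's SparkPlanExecApi, so a backend can substitute its own subclass, while createUnsafe bypasses that indirection for callers, like the Delta rewrite rules, that knowingly want the plain transformer. A compact sketch of the pattern with mock types; BackendFactory stands in for BackendsApiManager, and CHProjectExecTransformer is a hypothetical backend-specific node invented for illustration:

sealed trait Plan
case class ProjectExecTransformer(projectList: Seq[String]) extends Plan
case class CHProjectExecTransformer(projectList: Seq[String]) extends Plan

trait BackendFactory {
  def genProjectExecTransformer(projectList: Seq[String]): Plan
}

object FactoryDemo extends App {
  // A ClickHouse-like backend intercepts construction and returns its own node.
  val backend: BackendFactory = new BackendFactory {
    def genProjectExecTransformer(projectList: Seq[String]): Plan =
      CHProjectExecTransformer(projectList)
  }

  // Mirrors the companion object: apply defers to the backend...
  def apply(projectList: Seq[String]): Plan = backend.genProjectExecTransformer(projectList)
  // ...while createUnsafe pins the concrete class, skipping backend interception.
  def createUnsafe(projectList: Seq[String]): Plan = ProjectExecTransformer(projectList)

  println(apply(Seq("a + 1")))        // CHProjectExecTransformer(List(a + 1))
  println(createUnsafe(Seq("a + 1"))) // ProjectExecTransformer(List(a + 1))
}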