From c1ab70d080a3f4d8a4cc2c7b3a307c04e484357a Mon Sep 17 00:00:00 2001
From: Mingliang Zhu
Date: Mon, 9 Dec 2024 13:51:08 +0800
Subject: [PATCH] [CORE] Add nativeFilters info for simpleString of scan (#8169)

---
 .../execution/BatchScanExecTransformer.scala       | 10 ++++++++++
 .../execution/FileSourceScanExecTransformer.scala  | 15 +++++++++++++++
 .../datasources/v2/AbstractBatchScanExec.scala     |  9 +--------
 3 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 4f603e102443..d229117aa4e1 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -27,6 +27,7 @@ import org.apache.gluten.utils.FileIndexUtil
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
@@ -169,4 +170,13 @@ abstract class BatchScanExecTransformerBase(
     case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
     case _ => ReadFileFormat.UnknownFormat
   }
+
+  override def simpleString(maxFields: Int): String = {
+    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
+    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
+    val nativeFiltersString = s"NativeFilters: ${filterExprs().mkString("[", ",", "]")}"
+    val result = s"$nodeName$truncatedOutputString ${scan.description()}" +
+      s" $runtimeFiltersString $nativeFiltersString"
+    redact(result)
+  }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index d2f8237b6969..7f3c6d4f9f47 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -26,6 +26,7 @@ import org.apache.gluten.utils.FileIndexUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
@@ -33,6 +34,8 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.collection.BitSet
 
+import org.apache.commons.lang3.StringUtils
+
 case class FileSourceScanExecTransformer(
     @transient override val relation: HadoopFsRelation,
     override val output: Seq[Attribute],
@@ -190,6 +193,18 @@ abstract class FileSourceScanExecTransformerBase(
     case "CSVFileFormat" => ReadFileFormat.TextReadFormat
     case _ => ReadFileFormat.UnknownFormat
   }
+
+  override def simpleString(maxFields: Int): String = {
+    val metadataEntries = metadata.toSeq.sorted.map {
+      case (key, value) =>
+        key + ": " + StringUtils.abbreviate(redact(value), maxMetadataValueLength)
+    }
+    val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields)
+    val nativeFiltersString = s"NativeFilters: ${filterExprs().mkString("[", ",", "]")}"
+    redact(
+      s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]", maxFields)}$metadataStr" +
+        s" $nativeFiltersString")
+  }
 }
 
 object FileSourceScanExecTransformerBase {
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
index e3e9659ed33a..3313c3c76842 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/datasources/v2/AbstractBatchScanExec.scala
@@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, SinglePartition}
-import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
+import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.internal.SQLConf
@@ -252,13 +252,6 @@ abstract class AbstractBatchScanExec(
     rdd
   }
 
-  override def simpleString(maxFields: Int): String = {
-    val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
-    val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
-    val result = s"$nodeName$truncatedOutputString ${scan.description()} $runtimeFiltersString"
-    redact(result)
-  }
-
  override def nodeName: String = {
     s"BatchScanTransformer ${table.name()}".trim
   }
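
Note (not part of the patch): the snippet below is a minimal, self-contained Scala sketch of the string-building pattern the patch applies, i.e. rendering a NativeFilters section (built from the transformer's filterExprs()) next to the existing RuntimeFilters section in a scan node's one-line plan description. All names in the sketch (ScanNodeSketch, ToyScanNode, and its fields) are illustrative stand-ins, not Gluten or Spark APIs.

// Illustrative only: a toy scan "node" that renders a one-line description
// in the same shape as BatchScanExecTransformerBase.simpleString above.
object ScanNodeSketch {
  final case class ToyScanNode(
      nodeName: String,
      output: Seq[String],          // column names, e.g. "l_quantity#4"
      scanDescription: String,      // stands in for scan.description()
      runtimeFilters: Seq[String],  // filters injected at runtime (e.g. DPP)
      filterExprs: Seq[String]) {   // filters pushed down to the native scan

    def simpleString: String = {
      val outputString = output.mkString("[", ", ", "]")
      val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
      val nativeFiltersString = s"NativeFilters: ${filterExprs.mkString("[", ",", "]")}"
      s"$nodeName$outputString $scanDescription $runtimeFiltersString $nativeFiltersString"
    }
  }

  def main(args: Array[String]): Unit = {
    val node = ToyScanNode(
      nodeName = "BatchScanTransformer lineitem",
      output = Seq("l_orderkey#0L", "l_quantity#4"),
      scanDescription = "ParquetScan",
      runtimeFilters = Seq.empty,
      filterExprs = Seq("isnotnull(l_quantity#4)", "(l_quantity#4 < 24.0)"))
    // Prints one plan line such as:
    // BatchScanTransformer lineitem[l_orderkey#0L, l_quantity#4] ParquetScan RuntimeFilters: [] NativeFilters: [isnotnull(l_quantity#4),(l_quantity#4 < 24.0)]
    println(node.simpleString)
  }
}

Unlike this sketch, the real overrides in the patch also truncate the attribute list via truncatedString(..., maxFields) and pass the assembled string through redact(...), so long schemas and sensitive literals are handled by Spark's existing plan-string machinery.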