Skip to content

Commit

Permalink
[CORE] Add nativeFilters info to the simpleString output of scan
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Dec 6, 2024
1 parent 5002106 commit 7f89544
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.gluten.utils.FileIndexUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
Expand Down Expand Up @@ -169,4 +170,13 @@ abstract class BatchScanExecTransformerBase(
case "ClickHouseScan" => ReadFileFormat.MergeTreeReadFormat
case _ => ReadFileFormat.UnknownFormat
}

/**
 * One-line plan description shown in EXPLAIN / the Spark UI.
 *
 * Mirrors vanilla Spark's `BatchScanExec.simpleString`, additionally appending
 * the native (offloaded) filter expressions so users can see which predicates
 * were pushed down to the native engine. Sensitive text is redacted.
 */
override def simpleString(maxFields: Int): String = {
  val outputStr = truncatedString(output, "[", ", ", "]", maxFields)
  val runtimeStr = runtimeFilters.mkString("[", ",", "]")
  val nativeStr = filterExprs().mkString("[", ",", "]")
  redact(
    s"$nodeName$outputStr ${scan.description()}" +
      s" RuntimeFilters: $runtimeStr nativeFilters: $nativeStr")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ import org.apache.gluten.utils.FileIndexUtil
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.BitSet

import org.apache.commons.lang3.StringUtils

case class FileSourceScanExecTransformer(
@transient override val relation: HadoopFsRelation,
override val output: Seq[Attribute],
Expand Down Expand Up @@ -190,6 +193,18 @@ abstract class FileSourceScanExecTransformerBase(
case "CSVFileFormat" => ReadFileFormat.TextReadFormat
case _ => ReadFileFormat.UnknownFormat
}

/**
 * One-line plan description shown in EXPLAIN / the Spark UI.
 *
 * Follows vanilla Spark's `FileSourceScanExec.simpleString` (sorted, redacted,
 * length-abbreviated metadata entries), with the native (offloaded) filter
 * expressions appended at the end.
 */
override def simpleString(maxFields: Int): String = {
  // Render each metadata entry as "key: value"; values are redacted and
  // abbreviated so very long entries (e.g. file listings) stay readable.
  val entries = metadata.toSeq.sorted.map {
    case (k, v) => s"$k: ${StringUtils.abbreviate(redact(v), maxMetadataValueLength)}"
  }
  val metadataStr = truncatedString(entries, " ", ", ", "", maxFields)
  val outputStr = truncatedString(output, "[", ",", "]", maxFields)
  val nativeStr = s"nativeFilters: ${filterExprs().mkString("[", ",", "]")}"
  redact(s"$nodeNamePrefix$nodeName$outputStr$metadataStr $nativeStr")
}
}

object FileSourceScanExecTransformerBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{KeyGroupedPartitioning, Partitioning, SinglePartition}
import org.apache.spark.sql.catalyst.util.{truncatedString, InternalRowComparableWrapper}
import org.apache.spark.sql.catalyst.util.InternalRowComparableWrapper
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -252,13 +252,6 @@ abstract class AbstractBatchScanExec(
rdd
}

/**
 * One-line plan description shown in EXPLAIN / the Spark UI, following vanilla
 * Spark's `BatchScanExec` format: node name, truncated output attributes, the
 * scan's own description, and the runtime filters. Sensitive text is redacted.
 */
override def simpleString(maxFields: Int): String = {
  val outputStr = truncatedString(output, "[", ", ", "]", maxFields)
  val filtersStr = runtimeFilters.mkString("[", ",", "]")
  redact(s"$nodeName$outputStr ${scan.description()} RuntimeFilters: $filtersStr")
}

/**
 * Display name for this node; `trim` drops the trailing space when
 * `table.name()` is empty.
 */
override def nodeName: String = {
  ("BatchScanTransformer " + table.name()).trim
}
Expand Down

0 comments on commit 7f89544

Please sign in to comment.