[CORE] Use pushedFilters to offload scan

zml1206 committed Dec 4, 2024
1 parent ca66d7b commit eaeb8f2

Showing 3 changed files with 7 additions and 14 deletions. The three diffs below touch BatchScanExecTransformer, ScanTransformerFactory, and PushDownFilterToScan, in that order.
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.utils.PushDownUtil
 
 /** Columnar Based BatchScanExec. */
 case class BatchScanExecTransformer(
@@ -105,7 +106,7 @@ abstract class BatchScanExecTransformerBase(
 
   override def filterExprs(): Seq[Expression] = scan match {
     case fileScan: FileScan =>
-      pushdownFilters.getOrElse(fileScan.dataFilters)
+      pushdownFilters.getOrElse(PushDownUtil.pushFilters(fileScan.dataFilters))
     case _ =>
       throw new GlutenNotSupportException(s"${scan.getClass.toString} is not supported")
   }
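
With this change, filterExprs() falls back to PushDownUtil.pushFilters(fileScan.dataFilters) instead of the raw fileScan.dataFilters, so the offloaded scan only ever sees filters that survived pushdown. The commit does not show PushDownUtil itself; the sketch below is a minimal, hypothetical reading of such a helper, assuming it keeps only predicates that Spark's DataSourceStrategy.translateFilter can convert into a data source Filter. The object name and logic here are illustrative, not Gluten's actual implementation.

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.DataSourceStrategy

// Illustrative sketch only, not Gluten's code: keep the data filters that can
// be translated into the sources.Filter API (EqualTo, GreaterThan, IsNotNull,
// ...), dropping any predicate a data source scan cannot accept.
object PushDownUtilSketch {
  def pushFilters(dataFilters: Seq[Expression]): Seq[Expression] =
    dataFilters.filter { expr =>
      DataSourceStrategy
        .translateFilter(expr, supportNestedPredicatePushdown = true)
        .isDefined
    }
}
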
@@ -45,22 +45,18 @@ object ScanTransformerFactory {
           .asInstanceOf[DataSourceScanTransformerRegister]
           .createDataSourceTransformer(scanExec)
       case _ =>
-        val transform = FileSourceScanExecTransformer(
+        val pushedFilters = PushDownUtil.pushFilters(scanExec.dataFilters)
+        FileSourceScanExecTransformer(
           scanExec.relation,
           scanExec.output,
           scanExec.requiredSchema,
           scanExec.partitionFilters,
           scanExec.optionalBucketSet,
           scanExec.optionalNumCoalescedBuckets,
-          scanExec.dataFilters,
+          pushedFilters,
           scanExec.tableIdentifier,
           scanExec.disableBucketedScan
         )
-        if (!transform.doValidate().ok()) {
-          transform.copy(dataFilters = PushDownUtil.pushFilters(scanExec.dataFilters))
-        } else {
-          transform
-        }
     }
   }

@@ -77,16 +73,12 @@
       case _ =>
         scan match {
           case _: FileScan =>
-            val transform = BatchScanExecTransformer(
+            BatchScanExecTransformer(
               batchScanExec.output,
               batchScanExec.scan,
               batchScanExec.runtimeFilters,
               table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScanExec)
             )
-            if (!transform.doValidate().ok()) {
-              transform.setPushDownFilters(PushDownUtil.pushFilters(transform.filterExprs))
-            }
-            transform
           case _ =>
             throw new GlutenNotSupportException(s"Unsupported scan $scan")
         }
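
Both factory paths now pass the pre-pushed filters straight into the constructor and build the transformer exactly once, where the old code built it with raw dataFilters, validated, and rebuilt (or mutated) it on failure. The toy below restates that refactor in self-contained form; the types and the even-numbers validation rule are invented stand-ins, not Gluten's.

// Toy stand-ins, not Gluten types: contrast the old build-validate-rebuild
// flow with the new push-eagerly-and-build-once flow.
final case class ToyScan(filters: Seq[Int]) {
  // Stand-in for doValidate().ok(): pretend only even filters are supported.
  def validateOk: Boolean = filters.forall(_ % 2 == 0)
}

object ToyScanFactory {
  private def pushFilters(filters: Seq[Int]): Seq[Int] = filters.filter(_ % 2 == 0)

  // Old shape: construct with raw filters, then rebuild on validation failure.
  def createOld(filters: Seq[Int]): ToyScan = {
    val scan = ToyScan(filters)
    if (!scan.validateOk) scan.copy(filters = pushFilters(filters)) else scan
  }

  // New shape: compute the pushable subset once, construct once.
  def createNew(filters: Seq[Int]): ToyScan = ToyScan(pushFilters(filters))
}
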
@@ -50,7 +50,7 @@ object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper {
         // If BatchScanExecTransformerBase's parent is filter, pushdownFilters can't be None.
         batchScan.setPushDownFilters(Seq.empty)
         val newScan = batchScan
-        if (pushDownFilters.size > 0) {
+        if (pushDownFilters.nonEmpty) {
           newScan.setPushDownFilters(pushDownFilters)
           if (newScan.doValidate().ok()) {
             filter.withNewChildren(Seq(newScan))
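
The last hunk is a small cleanup: pushDownFilters.size > 0 becomes the idiomatic pushDownFilters.nonEmpty. For orientation, the sketch below reconstructs the rule's overall shape from the fragment above; it is a hedged approximation (the real PushDownFilterToScan also handles FileSourceScanExecTransformer and other cases), and the Gluten import path is an assumption.

import org.apache.gluten.execution.BatchScanExecTransformerBase // assumed package
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FilterExec, SparkPlan}

// Sketch only: when a FilterExec sits directly on a Gluten batch scan, split
// its condition into conjuncts, hand them to the scan as pushdown filters,
// and keep the rewrite only if the native backend validates it.
object PushDownFilterToScanSketch extends Rule[SparkPlan] with PredicateHelper {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case filter @ FilterExec(condition, batchScan: BatchScanExecTransformerBase) =>
      batchScan.setPushDownFilters(Seq.empty)
      val pushDownFilters = splitConjunctivePredicates(condition)
      if (pushDownFilters.nonEmpty) {
        batchScan.setPushDownFilters(pushDownFilters)
        if (batchScan.doValidate().ok()) filter.withNewChildren(Seq(batchScan))
        else filter
      } else filter
  }
}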
