Skip to content

Commit

Permalink
[GLUTEN-7283][CORE] Support DynamicPruningExpression conversion (#7284)
Browse files Browse the repository at this point in the history
Closes #7283
  • Loading branch information
wForget authored Sep 26, 2024
1 parent 2f75821 commit 0983ec2
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,8 @@ object ExpressionConverter extends SQLConfHelper with Logging {
if child.dataType
.isInstanceOf[DecimalType] && !BackendsApiManager.getSettings.transformCheckOverflow =>
replaceWithExpressionTransformer0(child, attributeSeq, expressionsMap)
case _: NormalizeNaNAndZero | _: PromotePrecision | _: TaggingExpression =>
case _: NormalizeNaNAndZero | _: PromotePrecision | _: TaggingExpression |
_: DynamicPruningExpression =>
ChildTransformer(
substraitExprName,
replaceWithExpressionTransformer0(expr.children.head, attributeSeq, expressionsMap),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ object ExpressionMappings {
Sig[In](IN),
Sig[InSet](IN_SET),
Sig[ScalarSubquery](SCALAR_SUBQUERY),
Sig[DynamicPruningExpression](DYNAMIC_PRUNING_EXPRESSION),
Sig[CheckOverflow](CHECK_OVERFLOW),
Sig[MakeDecimal](MAKE_DECIMAL),
Sig[PromotePrecision](PROMOTE_PRECISION),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,49 @@ class GlutenDynamicPartitionPruningV1SuiteAEOn
checkAnswer(df, Row(1000, 1) :: Row(1010, 2) :: Row(1020, 2) :: Nil)
}
}

testGluten("Filter with DynamicPruningExpression") {
withTable("fact_stats_non_partition") {
spark
.table("fact_stats")
.write
.format(tableFormat)
.saveAsTable("fact_stats_non_partition")

withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
val df = sql("""
|SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM (
| select * from fact_stats
| union all
| select * from fact_stats_non_partition
|) f
|JOIN dim_stats s
|ON f.store_id = s.store_id WHERE s.country = 'DE'
""".stripMargin)
checkAnswer(
df,
Row(1030, 2, 10, 3) ::
Row(1040, 2, 50, 3) ::
Row(1050, 2, 50, 3) ::
Row(1060, 2, 50, 3) ::
Row(1030, 2, 10, 3) ::
Row(1040, 2, 50, 3) ::
Row(1050, 2, 50, 3) ::
Row(1060, 2, 50, 3) :: Nil
)
val filters = collect(df.queryExecution.executedPlan) { case f: FilterExec => f }
assert(filters.isEmpty)
val filterTransformerWithDPPs = collect(df.queryExecution.executedPlan) {
case f: FilterExecTransformerBase
if f.cond.exists(_.isInstanceOf[DynamicPruningExpression]) =>
f
}
assert(filterTransformerWithDPPs.nonEmpty)
}
}
}
}

abstract class GlutenDynamicPartitionPruningV2Suite extends GlutenDynamicPartitionPruningSuiteBase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ object ExpressionNames {
final val IN = "in"
final val IN_SET = "in_set"
final val SCALAR_SUBQUERY = "scalar_subquery"
final val DYNAMIC_PRUNING_EXPRESSION = "dynamic_pruning_expression"
final val AGGREGATE = "aggregate"
final val LAMBDAFUNCTION = "lambdafunction"
final val EXPLODE = "explode"
Expand Down

0 comments on commit 0983ec2

Please sign in to comment.