Skip to content

Commit

Permalink
[GLUTEN-6853][CORE] Move more general query planner APIs from gluten-…
Browse files Browse the repository at this point in the history
…substrait to gluten-core (apache#7972)
  • Loading branch information
zhztheplayer authored Nov 19, 2024
1 parent 5dda71a commit 32bf3a2
Show file tree
Hide file tree
Showing 57 changed files with 1,280 additions and 927 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicTransform}
import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers}
import org.apache.gluten.extension.columnar.rewrite._
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits
import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
Expand All @@ -45,8 +50,8 @@ class CHRuleApi extends RuleApi {
}
}

private object CHRuleApi {
def injectSpark(injector: SparkInjector): Unit = {
object CHRuleApi {
private def injectSpark(injector: SparkInjector): Unit = {
// Inject the regular Spark rules directly.
injector.injectQueryStagePrepRule(FallbackBroadcastHashJoinPrepQueryStage.apply)
injector.injectQueryStagePrepRule(spark => CHAQEPropagateEmptyRelation(spark))
Expand All @@ -64,36 +69,51 @@ private object CHRuleApi {
injector.injectPreCBORule(spark => new CHOptimizeMetadataOnlyDeltaQuery(spark))
}

def injectLegacy(injector: LegacyInjector): Unit = {
private def injectLegacy(injector: LegacyInjector): Unit = {
// Legacy: Pre-transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))

// Legacy: The Legacy transform rule.
injector.injectTransform(_ => intercept(HeuristicTransform()))
injector.injectPreTransform(_ => RemoveTransitions)
injector.injectPreTransform(_ => PushDownInputFileExpression.PreOffload)
injector.injectPreTransform(c => FallbackOnANSIMode.apply(c.session))
injector.injectPreTransform(c => FallbackMultiCodegens.apply(c.session))
injector.injectPreTransform(_ => RewriteSubqueryBroadcast())
injector.injectPreTransform(c => FallbackBroadcastHashJoin.apply(c.session))
injector.injectPreTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))

// Legacy: The legacy transform rule.
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.fallbackByNativeValidation()
.build()
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
injector.injectTransform(
c => intercept(HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads)))

// Legacy: Post-transform rules.
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectTransform(c => intercept(RewriteTransformer.apply(c.session)))
injector.injectTransform(_ => PushDownFilterToScan)
injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectTransform(_ => EnsureLocalSortRequirements)
injector.injectTransform(_ => EliminateLocalSort)
injector.injectTransform(_ => CollapseProjectExecTransformer)
injector.injectTransform(c => RewriteSortMergeJoinToHashJoinRule.apply(c.session))
injector.injectTransform(c => PushdownAggregatePreProjectionAheadExpand.apply(c.session))
injector.injectTransform(c => LazyAggregateExpandRule.apply(c.session))
injector.injectTransform(
injector.injectPostTransform(_ => RemoveNativeWriteFilesSortAndProject())
injector.injectPostTransform(c => intercept(RewriteTransformer.apply(c.session)))
injector.injectPostTransform(_ => PushDownFilterToScan)
injector.injectPostTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectPostTransform(_ => EnsureLocalSortRequirements)
injector.injectPostTransform(_ => EliminateLocalSort)
injector.injectPostTransform(_ => CollapseProjectExecTransformer)
injector.injectPostTransform(c => RewriteSortMergeJoinToHashJoinRule.apply(c.session))
injector.injectPostTransform(c => PushdownAggregatePreProjectionAheadExpand.apply(c.session))
injector.injectPostTransform(c => LazyAggregateExpandRule.apply(c.session))
injector.injectPostTransform(
c =>
intercept(
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))
injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))

// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(
Expand All @@ -105,7 +125,7 @@ private object CHRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => intercept(each(c.session))))
injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf))
injector.injectTransform(
injector.injectPost(
c =>
intercept(
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session)))
Expand All @@ -116,10 +136,10 @@ private object CHRuleApi {
injector.injectFinal(_ => RemoveFallbackTagRule())
}

def injectRas(injector: RasInjector): Unit = {
private def injectRas(injector: RasInjector): Unit = {
// CH backend doesn't work with RAS at the moment. Inject a rule that aborts any
// execution calls.
injector.inject(
injector.injectPreTransform(
_ =>
new SparkPlanRules.AbortRule(
"Clickhouse backend doesn't yet have RAS support, please try disabling RAS and" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
// Caller may not know it adds project on top of the shuffle.
// FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
val project =
HeuristicTransform()(ProjectExec(plan.child.output ++ projectExpressions, plan.child))
HeuristicTransform.static()(
ProjectExec(plan.child.output ++ projectExpressions, plan.child))
var newExprs = Seq[Expression]()
for (i <- exprs.indices) {
val pos = newExpressionsPosition(i)
Expand All @@ -250,7 +251,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
// Caller may not know it adds project on top of the shuffle.
// FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
val project =
HeuristicTransform()(ProjectExec(plan.child.output ++ projectExpressions, plan.child))
HeuristicTransform.static()(
ProjectExec(plan.child.output ++ projectExpressions, plan.child))
var newOrderings = Seq[SortOrder]()
for (i <- orderings.indices) {
val oldOrdering = orderings(i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.FallbackTags

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ class VeloxListenerApi extends ListenerApi with Logging {

// Initial native backend with configurations.
var parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)

// Workaround for https://github.com/apache/incubator-gluten/issues/7837
if (isDriver && !inLocalMode(conf)) {
parsed += (GlutenConfig.COLUMNAR_VELOX_CACHE_ENABLED.key -> "false")
Expand Down
Loading

0 comments on commit 32bf3a2

Please sign in to comment.