From 8bd82eeb91f5207605ef27f7ea92e842c15770d5 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Mon, 9 Dec 2024 09:23:18 +0800 Subject: [PATCH 1/7] fixup --- .../backendsapi/clickhouse/CHRuleApi.scala | 18 ++--- backends-velox/pom.xml | 3 + .../org.apache.gluten.component.Component | 1 + .../component/VeloxIcebergComponent.scala | 66 +++++++++++++++++++ .../backendsapi/velox/VeloxRuleApi.scala | 27 ++------ .../heuristic/HeuristicTransform.scala | 35 +++++++++- .../columnar/validator/Validator.scala | 17 ++--- ...xecution.DataSourceScanTransformerRegister | 1 - .../execution/IcebergScanTransformer.scala | 8 +++ .../IcebergTransformerProvider.scala | 29 -------- .../execution/BatchScanExecTransformer.scala | 8 ++- .../execution/ScanTransformerFactory.scala | 8 +-- .../columnar/validator/Validators.scala | 36 ++++++++++ 13 files changed, 174 insertions(+), 83 deletions(-) create mode 100644 backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component create mode 100644 backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala delete mode 100644 gluten-iceberg/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister delete mode 100644 gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 9e129f224dcf..fc5d1df91840 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -26,8 +26,7 @@ import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, Heu import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers} import org.apache.gluten.extension.columnar.rewrite._ import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} -import org.apache.gluten.extension.columnar.validator.Validator -import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits +import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.extension.injector.{Injector, SparkInjector} import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector} import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser} @@ -84,20 +83,13 @@ object CHRuleApi { // Legacy: The legacy transform rule. val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin()) val validatorBuilder: GlutenConfig => Validator = conf => - Validator - .builder() - .fallbackByHint() - .fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly) - .fallbackComplexExpressions() - .fallbackByBackendSettings() - .fallbackByUserOptions() - .fallbackByTestInjects() - .fallbackByNativeValidation(offloads) - .build() + Validators.newValidator(conf, offloads) val rewrites = Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject) injector.injectTransform( - c => intercept(HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads))) + c => + intercept( + HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads))) // Legacy: Post-transform rules. injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan) diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index a1fcb52ce763..90b5b33334a5 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -260,6 +260,9 @@ ${resource.dir} + + ${project.basedir}/src/main-iceberg/resource + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component b/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component new file mode 100644 index 000000000000..0f674cf783d8 --- /dev/null +++ b/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component @@ -0,0 +1 @@ +org.apache.gluten.component.VeloxIcebergComponent \ No newline at end of file diff --git a/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala b/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala new file mode 100644 index 000000000000..45a05e7db0f7 --- /dev/null +++ b/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.component +import org.apache.gluten.backendsapi.velox.VeloxBackend +import org.apache.gluten.execution.IcebergScanTransformer +import org.apache.gluten.extension.columnar.enumerated.RasOffload +import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform +import org.apache.gluten.extension.columnar.offload.OffloadSingleNode +import org.apache.gluten.extension.columnar.validator.Validators +import org.apache.gluten.extension.injector.Injector + +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +class VeloxIcebergComponent extends Component { + import VeloxIcebergComponent._ + override def name(): String = "velox-iceberg" + override def buildInfo(): Component.BuildInfo = + Component.BuildInfo("VeloxIceberg", "N/A", "N/A", "N/A") + override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil + override def injectRules(injector: Injector): Unit = { + // Inject legacy rule. + injector.gluten.legacy.injectTransform { + c => + val offload = Seq(OffloadIcebergScan()) + HeuristicTransform.Simple( + Validators.newValidator(c.glutenConf, offload), + offload + ) + } + + // Inject RAS rule. + injector.gluten.ras.injectRasRule { + c => + RasOffload.Rule( + RasOffload.from[BatchScanExec](OffloadIcebergScan()), + Validators.newValidator(c.glutenConf), + Nil) + } + } +} + +object VeloxIcebergComponent { + private case class OffloadIcebergScan() extends OffloadSingleNode { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case scan: BatchScanExec if IcebergScanTransformer.supportsBatchScan(scan.scan) => + IcebergScanTransformer(scan) + case other => other + } + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index d6887f0463ac..22919538ff4d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -28,8 +28,7 @@ import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, Heu import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers} import org.apache.gluten.extension.columnar.rewrite._ import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions} -import org.apache.gluten.extension.columnar.validator.Validator -import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits +import org.apache.gluten.extension.columnar.validator.{Validator, Validators} import org.apache.gluten.extension.injector.{Injector, SparkInjector} import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector} import org.apache.gluten.sql.shims.SparkShimLoader @@ -76,20 +75,11 @@ object VeloxRuleApi { // Legacy: The legacy transform rule. val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin()) val validatorBuilder: GlutenConfig => Validator = conf => - Validator - .builder() - .fallbackByHint() - .fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly) - .fallbackComplexExpressions() - .fallbackByBackendSettings() - .fallbackByUserOptions() - .fallbackByTestInjects() - .fallbackByNativeValidation(offloads) - .build() + Validators.newValidator(conf, offloads) val rewrites = Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject) injector.injectTransform( - c => HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads)) + c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads)) // Legacy: Post-transform rules. injector.injectPostTransform(_ => UnionTransformerRule()) @@ -132,16 +122,7 @@ object VeloxRuleApi { injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session)) // Gluten RAS: The RAS rule. - val validatorBuilder: GlutenConfig => Validator = conf => - Validator - .builder() - .fallbackByHint() - .fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly) - .fallbackComplexExpressions() - .fallbackByBackendSettings() - .fallbackByUserOptions() - .fallbackByTestInjects() - .build() + val validatorBuilder: GlutenConfig => Validator = conf => Validators.newValidator(conf) val rewrites = Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject) injector.injectCoster(_ => LegacyCoster) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala index f9def8c94ba2..5a0fdfeefeae 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicTransform.scala @@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.Injector import org.apache.gluten.extension.util.AdaptiveContext import org.apache.gluten.logging.LogLevelUtil +import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SparkSessionExtensions} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.SparkPlan @@ -64,7 +65,39 @@ object HeuristicTransform { new HeuristicTransform(all) } - case class Single( + /** + * A simple heuristic transform rule with a validator and some offload rules. + * + * Validator will be called before applying the offload rules. + */ + case class Simple(validator: Validator, offloadRules: Seq[OffloadSingleNode]) + extends Rule[SparkPlan] + with Logging { + override def apply(plan: SparkPlan): SparkPlan = { + offloadRules.foldLeft(plan) { + case (p, rule) => + p.transformUp { + node => + validator.validate(node) match { + case Validator.Passed => + rule.offload(node) + case Validator.Failed(reason) => + logDebug(s"Validation failed by reason: $reason on query plan: ${node.nodeName}") + node + } + } + } + } + } + + /** + * A heuristic transform rule with given rewrite rules. Fallback tags will be used in the + * procedure to determine which part of the plan is or is not eligible to be offloaded. The tags + * should also be correctly handled in the offload rules. + * + * TODO: Handle tags internally. Remove tag handling code in user offload rules. + */ + case class WithRewrites( validator: Validator, rewriteRules: Seq[RewriteSingleNode], offloadRules: Seq[OffloadSingleNode]) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala index 63a3a0af07b2..22a39a42804f 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala @@ -46,6 +46,7 @@ object Validator { def builder(): Builder = Builder() class Builder private { + import Builder._ private val buffer: ListBuffer[Validator] = mutable.ListBuffer() /** Add a custom validator to pipeline. */ @@ -70,6 +71,14 @@ object Validator { case other => Seq(other) } + private object NoopValidator extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = pass() + } + } + + private object Builder { + def apply(): Builder = new Builder() + private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator { assert(!validators.exists(_.isInstanceOf[ValidatorPipeline])) @@ -85,14 +94,6 @@ object Validator { finalOut } } - - private object NoopValidator extends Validator { - override def validate(plan: SparkPlan): Validator.OutCome = pass() - } - } - - private object Builder { - def apply(): Builder = new Builder() } implicit class ValidatorImplicits(v: Validator) { diff --git a/gluten-iceberg/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister b/gluten-iceberg/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister deleted file mode 100644 index 904608211fc3..000000000000 --- a/gluten-iceberg/src/main/resources/META-INF/services/org.apache.gluten.execution.DataSourceScanTransformerRegister +++ /dev/null @@ -1 +0,0 @@ -org.apache.gluten.execution.IcebergTransformerProvider \ No newline at end of file diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala index 1cbeb52a9213..56041a6a99c1 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala @@ -46,6 +46,10 @@ case class IcebergScanTransformer( commonPartitionValues = commonPartitionValues ) { + protected[this] def supportsBatchScan(scan: Scan): Boolean = { + IcebergScanTransformer.supportsBatchScan(scan) + } + override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty) override lazy val getPartitionSchema: StructType = @@ -94,4 +98,8 @@ object IcebergScanTransformer { commonPartitionValues = SparkShimLoader.getSparkShims.getCommonPartitionValues(batchScan) ) } + + def supportsBatchScan(scan: Scan): Boolean = { + scan.getClass.getName == "org.apache.iceberg.spark.source.SparkBatchQueryScan" + } } diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala deleted file mode 100644 index dc521f39c1b9..000000000000 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergTransformerProvider.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.execution - -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec - -class IcebergTransformerProvider extends DataSourceScanTransformerRegister { - - override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan" - - override def createDataSourceV2Transformer( - batchScan: BatchScanExec): BatchScanExecTransformerBase = { - IcebergScanTransformer(batchScan) - } -} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala index d229117aa4e1..55777c11c1bb 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala @@ -56,6 +56,10 @@ case class BatchScanExecTransformer( applyPartialClustering, replicatePartitions) { + protected[this] def supportsBatchScan(scan: Scan): Boolean = { + scan.isInstanceOf[FileScan] + } + override def doCanonicalize(): BatchScanExecTransformer = { this.copy( output = output.map(QueryPlan.normalizeExpressions(_, output)), @@ -134,8 +138,10 @@ abstract class BatchScanExecTransformerBase( } } + protected[this] def supportsBatchScan(scan: Scan): Boolean + override def doValidateInternal(): ValidationResult = { - if (!ScanTransformerFactory.supportedBatchScan(scan)) { + if (!supportsBatchScan(scan)) { return ValidationResult.failed(s"Unsupported scan $scan") } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala index dfdf2d2f34e1..745c895688c9 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala @@ -18,9 +18,8 @@ package org.apache.gluten.execution import org.apache.gluten.sql.shims.SparkShimLoader -import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution.FileSourceScanExec -import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import java.util.ServiceLoader import java.util.concurrent.ConcurrentHashMap @@ -75,11 +74,6 @@ object ScanTransformerFactory { } } - def supportedBatchScan(scan: Scan): Boolean = scan match { - case _: FileScan => true - case _ => lookupDataSourceScanTransformer(scan.getClass.getName).nonEmpty - } - private def lookupDataSourceScanTransformer(scanClassName: String): Option[Class[_]] = { val clz = scanTransformerMap.computeIfAbsent( scanClassName, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index b0eeccef7a10..93c444fbd01b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -237,4 +237,40 @@ object Validators { out } } + + /** + * A standard validator for legacy planner that does native validation. + * + * The native validation is ordered in the latest validator, namely the one created by + * #fallbackByNativeValidation. The validator accepts offload rules for doing offload attempts, + * then call native validation code on the offloaded plan. + * + * Once the native validation fails, the validator then gives negative outcome. + */ + def newValidator(conf: GlutenConfig, offloads: Seq[OffloadSingleNode]): Validator = { + val nativeValidator = Validator.builder().fallbackByNativeValidation(offloads).build() + newValidator(conf).andThen(nativeValidator) + } + + /** + * A validator that doesn't involve native validation. + * + * This is typically RAS planner that does native validation inline without relying on tags. Thus, + * validator `#fallbackByNativeValidation` is not required. See + * [[org.apache.gluten.extension.columnar.enumerated.RasOffload]]. + * + * This could also be used in legacy planner for doing trivial offload without the help of rewrite + * rules. + */ + def newValidator(conf: GlutenConfig): Validator = { + Validator + .builder() + .fallbackByHint() + .fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly) + .fallbackComplexExpressions() + .fallbackByBackendSettings() + .fallbackByUserOptions() + .fallbackByTestInjects() + .build() + } } From efa80da8a609ba5c31126b1927a0ffc554a8b9c4 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 10 Dec 2024 10:59:44 +0800 Subject: [PATCH 2/7] fixup --- backends-clickhouse/pom.xml | 5 ++ .../org.apache.gluten.component.Component | 1 + .../gluten/component/CHIcebergComponent.scala | 32 +++++++++++ backends-velox/pom.xml | 8 ++- .../component/VeloxIcebergComponent.scala | 39 +------------ .../gluten/execution/OffloadIcebergScan.scala | 57 +++++++++++++++++++ 6 files changed, 102 insertions(+), 40 deletions(-) create mode 100644 backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component create mode 100644 backends-clickhouse/src/main-iceberg/scala/org/apache/gluten/component/CHIcebergComponent.scala create mode 100644 gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 3a4a9422479b..2e7edf50d228 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -39,6 +39,11 @@ + + + ${project.basedir}/src/main-iceberg/resource + + org.codehaus.mojo diff --git a/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component b/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component new file mode 100644 index 000000000000..ad88c75a2cb0 --- /dev/null +++ b/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component @@ -0,0 +1 @@ +org.apache.gluten.component.CHIcebergComponent \ No newline at end of file diff --git a/backends-clickhouse/src/main-iceberg/scala/org/apache/gluten/component/CHIcebergComponent.scala b/backends-clickhouse/src/main-iceberg/scala/org/apache/gluten/component/CHIcebergComponent.scala new file mode 100644 index 000000000000..8ee694cefd4b --- /dev/null +++ b/backends-clickhouse/src/main-iceberg/scala/org/apache/gluten/component/CHIcebergComponent.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.component + +import org.apache.gluten.backendsapi.clickhouse.CHBackend +import org.apache.gluten.execution.OffloadIcebergScan +import org.apache.gluten.extension.injector.Injector + +class CHIcebergComponent extends Component { + override def name(): String = "clickhouse-iceberg" + override def buildInfo(): Component.BuildInfo = + Component.BuildInfo("ClickHouseIceberg", "N/A", "N/A", "N/A") + override def dependencies(): Seq[Class[_ <: Component]] = classOf[CHBackend] :: Nil + override def injectRules(injector: Injector): Unit = { + OffloadIcebergScan.inject(injector) + } +} diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 90b5b33334a5..f7607652df1a 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -61,6 +61,11 @@ + + + ${project.basedir}/src/main-iceberg/resource + + org.codehaus.mojo @@ -260,9 +265,6 @@ ${resource.dir} - - ${project.basedir}/src/main-iceberg/resource - target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala b/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala index 45a05e7db0f7..e29aaa656bdf 100644 --- a/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala +++ b/backends-velox/src/main-iceberg/scala/org/apache/gluten/component/VeloxIcebergComponent.scala @@ -17,50 +17,15 @@ package org.apache.gluten.component import org.apache.gluten.backendsapi.velox.VeloxBackend -import org.apache.gluten.execution.IcebergScanTransformer -import org.apache.gluten.extension.columnar.enumerated.RasOffload -import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform -import org.apache.gluten.extension.columnar.offload.OffloadSingleNode -import org.apache.gluten.extension.columnar.validator.Validators +import org.apache.gluten.execution.OffloadIcebergScan import org.apache.gluten.extension.injector.Injector -import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.datasources.v2.BatchScanExec - class VeloxIcebergComponent extends Component { - import VeloxIcebergComponent._ override def name(): String = "velox-iceberg" override def buildInfo(): Component.BuildInfo = Component.BuildInfo("VeloxIceberg", "N/A", "N/A", "N/A") override def dependencies(): Seq[Class[_ <: Component]] = classOf[VeloxBackend] :: Nil override def injectRules(injector: Injector): Unit = { - // Inject legacy rule. - injector.gluten.legacy.injectTransform { - c => - val offload = Seq(OffloadIcebergScan()) - HeuristicTransform.Simple( - Validators.newValidator(c.glutenConf, offload), - offload - ) - } - - // Inject RAS rule. - injector.gluten.ras.injectRasRule { - c => - RasOffload.Rule( - RasOffload.from[BatchScanExec](OffloadIcebergScan()), - Validators.newValidator(c.glutenConf), - Nil) - } - } -} - -object VeloxIcebergComponent { - private case class OffloadIcebergScan() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { - case scan: BatchScanExec if IcebergScanTransformer.supportsBatchScan(scan.scan) => - IcebergScanTransformer(scan) - case other => other - } + OffloadIcebergScan.inject(injector) } } diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala new file mode 100644 index 000000000000..6747b79ffc2a --- /dev/null +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.extension.columnar.enumerated.RasOffload +import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform +import org.apache.gluten.extension.columnar.offload.OffloadSingleNode +import org.apache.gluten.extension.columnar.validator.Validators +import org.apache.gluten.extension.injector.Injector + +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec + +case class OffloadIcebergScan() extends OffloadSingleNode { + override def offload(plan: SparkPlan): SparkPlan = plan match { + case scan: BatchScanExec if IcebergScanTransformer.supportsBatchScan(scan.scan) => + IcebergScanTransformer(scan) + case other => other + } +} + +object OffloadIcebergScan { + def inject(injector: Injector): Unit = { + // Inject legacy rule. + injector.gluten.legacy.injectTransform { + c => + val offload = Seq(OffloadIcebergScan()) + HeuristicTransform.Simple( + Validators.newValidator(c.glutenConf, offload), + offload + ) + } + + // Inject RAS rule. + injector.gluten.ras.injectRasRule { + c => + RasOffload.Rule( + RasOffload.from[BatchScanExec](OffloadIcebergScan()), + Validators.newValidator(c.glutenConf), + Nil) + } + } +} From 860b4518bce0ba04b4afc562d71951a4c55faa90 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 10 Dec 2024 11:13:51 +0800 Subject: [PATCH 3/7] fixup --- .../META-INF/services/org.apache.gluten.component.Component | 2 +- .../META-INF/services/org.apache.gluten.component.Component | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component b/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component index ad88c75a2cb0..a13f6fa739e8 100644 --- a/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component +++ b/backends-clickhouse/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component @@ -1 +1 @@ -org.apache.gluten.component.CHIcebergComponent \ No newline at end of file +org.apache.gluten.component.CHIcebergComponent diff --git a/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component b/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component index 0f674cf783d8..e9e844c6bb47 100644 --- a/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component +++ b/backends-velox/src/main-iceberg/resource/META-INF/services/org.apache.gluten.component.Component @@ -1 +1 @@ -org.apache.gluten.component.VeloxIcebergComponent \ No newline at end of file +org.apache.gluten.component.VeloxIcebergComponent From 88522a1318fad36617f57e914ea12a91f08a13ee Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 10 Dec 2024 11:14:54 +0800 Subject: [PATCH 4/7] fixup --- .../gluten/extension/columnar/validator/Validator.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala index 22a39a42804f..b6f1313ff502 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala @@ -70,15 +70,15 @@ object Validator { p.validators.flatMap(flatten) case other => Seq(other) } - - private object NoopValidator extends Validator { - override def validate(plan: SparkPlan): Validator.OutCome = pass() - } } private object Builder { def apply(): Builder = new Builder() + private object NoopValidator extends Validator { + override def validate(plan: SparkPlan): Validator.OutCome = pass() + } + private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator { assert(!validators.exists(_.isInstanceOf[ValidatorPipeline])) From 22f463e9faeaea1617560e709c8240b7e7ea22d4 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 10 Dec 2024 11:54:21 +0800 Subject: [PATCH 5/7] fixup --- .../src/main/scala/org/apache/gluten/component/Component.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala index 6a3b74699b71..cba67853e792 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala @@ -180,6 +180,9 @@ object Component { dependencies.foreach { case (uid, dependencyCompClass) => + require( + registry.isClassRegistered(dependencyCompClass), + s"Dependency class not registered yet: ${dependencyCompClass.getName}") val dependencyUid = registry.findByClass(dependencyCompClass).uid require(uid != dependencyUid) require(lookup.contains(uid)) From 834084cd177d7230352d5927d845f564a0d5e339 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 10 Dec 2024 13:40:50 +0800 Subject: [PATCH 6/7] fixup --- backends-clickhouse/pom.xml | 5 +++++ backends-velox/pom.xml | 3 +-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 2e7edf50d228..a6042fd1cce5 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -323,6 +323,11 @@ + + + ${project.basedir}/src/main/resources + + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index f7607652df1a..36755d7faa6f 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -14,7 +14,6 @@ Gluten Backends Velox - ${project.basedir}/src/main/resources ../cpp/build/ ${cpp.build.dir}/releases/ @@ -263,7 +262,7 @@ ${platform}/${arch} - ${resource.dir} + ${project.basedir}/src/main/resources target/scala-${scala.binary.version}/classes From 59c119fe3d3a762325820c32f1150712f891f2a1 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Tue, 10 Dec 2024 13:43:03 +0800 Subject: [PATCH 7/7] fixup --- .../src/main/scala/org/apache/gluten/component/Component.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala index cba67853e792..4a066e1484c8 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/component/Component.scala @@ -32,7 +32,8 @@ import scala.collection.mutable * should be placed to Gluten's classpath with a Java service file. Gluten will discover all the * component implementations then register them at the booting time. * - * Experimental: This is not expected to be used in production yet. Use [[Backend]] instead. + * Experimental: This is not expected to be used in production yet. Use + * [[org.apache.gluten.backend.Backend]] instead. */ @Experimental trait Component {