Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-8356][CORE][VL][CH] Make Iceberg code implement component API #8192

Merged
merged 7 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions backends-clickhouse/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main-iceberg/resource</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.component.CHIcebergComponent
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.component

import org.apache.gluten.backendsapi.clickhouse.CHBackend
import org.apache.gluten.execution.OffloadIcebergScan
import org.apache.gluten.extension.injector.Injector

class CHIcebergComponent extends Component {
/** Identifier of this component; always the literal "clickhouse-iceberg". */
override def name(): String = "clickhouse-iceberg"
/**
 * Build metadata for this component. Only the first argument carries a real value
 * ("ClickHouseIceberg"); the remaining three are filled with the placeholder "N/A".
 */
override def buildInfo(): Component.BuildInfo =
Component.BuildInfo("ClickHouseIceberg", "N/A", "N/A", "N/A")
Copy link
Contributor

@PHILO-HE PHILO-HE Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems buildInfo is not necessary for any component other than Backend. Maybe we could restrict it to Backend in a future refactor.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, this part needs some cleanup. BuildInfo is indeed not that general. I'd like to see it totally decoupled from the Component API somehow.

/** Declares `CHBackend` as the only component this one depends on. */
override def dependencies(): Seq[Class[_ <: Component]] = List(classOf[CHBackend])
/** Registers this component's rules by handing the injector to `OffloadIcebergScan.inject`. */
override def injectRules(injector: Injector): Unit =
  OffloadIcebergScan.inject(injector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, Heu
import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers}
import org.apache.gluten.extension.columnar.rewrite._
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.parser.{GlutenCacheFilesSqlParser, GlutenClickhouseSqlParser}
Expand Down Expand Up @@ -84,20 +83,13 @@ object CHRuleApi {
// Legacy: The legacy transform rule.
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.fallbackByNativeValidation(offloads)
.build()
Validators.newValidator(conf, offloads)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
injector.injectTransform(
c => intercept(HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads)))
c =>
intercept(
HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads)))

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => PruneNestedColumnsInHiveTableScan)
Expand Down
5 changes: 5 additions & 0 deletions backends-velox/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>${project.basedir}/src/main-iceberg/resource</directory>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.gluten.component.VeloxIcebergComponent
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
package org.apache.gluten.component
import org.apache.gluten.backendsapi.velox.VeloxBackend
import org.apache.gluten.execution.OffloadIcebergScan
import org.apache.gluten.extension.injector.Injector

class IcebergTransformerProvider extends DataSourceScanTransformerRegister {

override val scanClassName: String = "org.apache.iceberg.spark.source.SparkBatchQueryScan"

override def createDataSourceV2Transformer(
batchScan: BatchScanExec): BatchScanExecTransformerBase = {
IcebergScanTransformer(batchScan)
/**
 * Component that plugs Iceberg scan offloading into Gluten for the Velox backend.
 *
 * It depends on `VeloxBackend` and contributes its rules through `OffloadIcebergScan.inject`.
 */
class VeloxIcebergComponent extends Component {

  /** Identifier of this component. */
  override def name(): String = "velox-iceberg"

  /** Build metadata; only the first field is populated, the other three are "N/A" placeholders. */
  override def buildInfo(): Component.BuildInfo = {
    val notAvailable = "N/A"
    Component.BuildInfo("VeloxIceberg", notAvailable, notAvailable, notAvailable)
  }

  /** Declares `VeloxBackend` as the only component this one depends on. */
  override def dependencies(): Seq[Class[_ <: Component]] = List(classOf[VeloxBackend])

  /** Registers this component's rules by handing the injector to `OffloadIcebergScan.inject`. */
  override def injectRules(injector: Injector): Unit =
    OffloadIcebergScan.inject(injector)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, Heu
import org.apache.gluten.extension.columnar.offload.{OffloadExchange, OffloadJoin, OffloadOthers}
import org.apache.gluten.extension.columnar.rewrite._
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.extension.columnar.validator.Validators.ValidatorBuilderImplicits
import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.extension.injector.{Injector, SparkInjector}
import org.apache.gluten.extension.injector.GlutenInjector.{LegacyInjector, RasInjector}
import org.apache.gluten.sql.shims.SparkShimLoader
Expand Down Expand Up @@ -76,20 +75,11 @@ object VeloxRuleApi {
// Legacy: The legacy transform rule.
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.fallbackByNativeValidation(offloads)
.build()
Validators.newValidator(conf, offloads)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
injector.injectTransform(
c => HeuristicTransform.Single(validatorBuilder(c.glutenConf), rewrites, offloads))
c => HeuristicTransform.WithRewrites(validatorBuilder(c.glutenConf), rewrites, offloads))

// Legacy: Post-transform rules.
injector.injectPostTransform(_ => UnionTransformerRule())
Expand Down Expand Up @@ -132,16 +122,7 @@ object VeloxRuleApi {
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))

// Gluten RAS: The RAS rule.
val validatorBuilder: GlutenConfig => Validator = conf =>
Validator
.builder()
.fallbackByHint()
.fallbackIfScanOnlyWithFilterPushed(conf.enableScanOnly)
.fallbackComplexExpressions()
.fallbackByBackendSettings()
.fallbackByUserOptions()
.fallbackByTestInjects()
.build()
val validatorBuilder: GlutenConfig => Validator = conf => Validators.newValidator(conf)
val rewrites =
Seq(RewriteIn, RewriteMultiChildrenCount, RewriteJoin, PullOutPreProject, PullOutPostProject)
injector.injectCoster(_ => LegacyCoster)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.gluten.extension.injector.Injector
import org.apache.gluten.extension.util.AdaptiveContext
import org.apache.gluten.logging.LogLevelUtil

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
Expand Down Expand Up @@ -64,7 +65,39 @@ object HeuristicTransform {
new HeuristicTransform(all)
}

case class Single(
/**
 * A minimal heuristic transform made of one validator and a sequence of offload rules.
 *
 * The offload rules are applied one after another, each in a bottom-up pass over the plan
 * produced by the previous rule. Within a pass, every node is validated first: a node that
 * passes is handed to the rule, a node that fails is kept as is and the failure reason is
 * logged at debug level.
 */
case class Simple(validator: Validator, offloadRules: Seq[OffloadSingleNode])
  extends Rule[SparkPlan]
  with Logging {

  override def apply(plan: SparkPlan): SparkPlan =
    offloadRules.foldLeft(plan) {
      (current, rule) => current.transformUp { case node => offloadIfValid(rule, node) }
    }

  /** Applies `rule` to `node` only when the validator accepts the node. */
  private def offloadIfValid(rule: OffloadSingleNode, node: SparkPlan): SparkPlan =
    validator.validate(node) match {
      case Validator.Failed(reason) =>
        logDebug(s"Validation failed by reason: $reason on query plan: ${node.nodeName}")
        node
      case Validator.Passed =>
        rule.offload(node)
    }
}

/**
* A heuristic transform rule with given rewrite rules. Fallback tags will be used in the
* procedure to determine which part of the plan is or is not eligible to be offloaded. The tags
* should also be correctly handled in the offload rules.
*
* TODO: Handle tags internally. Remove tag handling code in user offload rules.
*/
case class WithRewrites(
validator: Validator,
rewriteRules: Seq[RewriteSingleNode],
offloadRules: Seq[OffloadSingleNode])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ object Validator {
def builder(): Builder = Builder()

class Builder private {
import Builder._
private val buffer: ListBuffer[Validator] = mutable.ListBuffer()

/** Add a custom validator to pipeline. */
Expand All @@ -69,6 +70,14 @@ object Validator {
p.validators.flatMap(flatten)
case other => Seq(other)
}
}

private object Builder {
def apply(): Builder = new Builder()

private object NoopValidator extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = pass()
}

private class ValidatorPipeline(val validators: Seq[Validator]) extends Validator {
assert(!validators.exists(_.isInstanceOf[ValidatorPipeline]))
Expand All @@ -85,14 +94,6 @@ object Validator {
finalOut
}
}

private object NoopValidator extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = pass()
}
}

private object Builder {
def apply(): Builder = new Builder()
}

implicit class ValidatorImplicits(v: Validator) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ case class IcebergScanTransformer(
commonPartitionValues = commonPartitionValues
) {

/** Decides support for `scan` by delegating to the companion object's `supportsBatchScan`. */
protected[this] def supportsBatchScan(scan: Scan): Boolean =
  IcebergScanTransformer.supportsBatchScan(scan)

override def filterExprs(): Seq[Expression] = pushdownFilters.getOrElse(Seq.empty)

override lazy val getPartitionSchema: StructType =
Expand Down Expand Up @@ -94,4 +98,8 @@ object IcebergScanTransformer {
commonPartitionValues = SparkShimLoader.getSparkShims.getCommonPartitionValues(batchScan)
)
}

def supportsBatchScan(scan: Scan): Boolean = {
scan.getClass.getName == "org.apache.iceberg.spark.source.SparkBatchQueryScan"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just reference this class to get its name for comparison?

Copy link
Member Author

@zhztheplayer zhztheplayer Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkBatchQueryScan is package-private.

cc @liujiayi771 in case there is a better practice for this.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

/**
 * Offload rule for Iceberg batch scans.
 *
 * A `BatchScanExec` whose scan is accepted by `IcebergScanTransformer.supportsBatchScan` is
 * replaced with an `IcebergScanTransformer`; every other plan node is returned unchanged.
 */
case class OffloadIcebergScan() extends OffloadSingleNode {
  override def offload(plan: SparkPlan): SparkPlan = plan match {
    case batchScan: BatchScanExec =>
      if (IcebergScanTransformer.supportsBatchScan(batchScan.scan)) {
        IcebergScanTransformer(batchScan)
      } else {
        batchScan
      }
    case untouched => untouched
  }
}

object OffloadIcebergScan {
def inject(injector: Injector): Unit = {
// Inject legacy rule.
injector.gluten.legacy.injectTransform {
c =>
val offload = Seq(OffloadIcebergScan())
HeuristicTransform.Simple(
Validators.newValidator(c.glutenConf, offload),
offload
)
}

// Inject RAS rule.
injector.gluten.ras.injectRasRule {
c =>
RasOffload.Rule(
RasOffload.from[BatchScanExec](OffloadIcebergScan()),
Validators.newValidator(c.glutenConf),
Nil)
}
Comment on lines +39 to +55
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code can be simplified in future PRs.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ case class BatchScanExecTransformer(
applyPartialClustering,
replicatePartitions) {

/** Accepts only scans that are instances of `FileScan`. */
protected[this] def supportsBatchScan(scan: Scan): Boolean = scan match {
  case _: FileScan => true
  case _ => false
}

override def doCanonicalize(): BatchScanExecTransformer = {
this.copy(
output = output.map(QueryPlan.normalizeExpressions(_, output)),
Expand Down Expand Up @@ -134,8 +138,10 @@ abstract class BatchScanExecTransformerBase(
}
}

/**
 * Whether this transformer can handle the given scan. Consulted by `doValidateInternal`, which
 * fails validation with an "Unsupported scan" message when this returns false.
 */
protected[this] def supportsBatchScan(scan: Scan): Boolean

override def doValidateInternal(): ValidationResult = {
if (!ScanTransformerFactory.supportedBatchScan(scan)) {
if (!supportsBatchScan(scan)) {
return ValidationResult.failed(s"Unsupported scan $scan")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ package org.apache.gluten.execution

import org.apache.gluten.sql.shims.SparkShimLoader

import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec

import java.util.ServiceLoader
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -75,11 +74,6 @@ object ScanTransformerFactory {
}
}

def supportedBatchScan(scan: Scan): Boolean = scan match {
case _: FileScan => true
case _ => lookupDataSourceScanTransformer(scan.getClass.getName).nonEmpty
}

private def lookupDataSourceScanTransformer(scanClassName: String): Option[Class[_]] = {
val clz = scanTransformerMap.computeIfAbsent(
scanClassName,
Expand Down
Loading
Loading