
Commit cd7cdd0

Rename GlutenColumnarWriteFilesExec to ColumnarWriteFilesExec and GlutenColumnarWriteFilesRDD to ColumnarWriteFilesRDD
baibaichen committed Jul 19, 2024
1 parent 197ddcd commit cd7cdd0
Showing 6 changed files with 23 additions and 23 deletions.
@@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
       override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
         if (!nativeUsed) {
           nativeUsed = if (isSparkVersionGE("3.4")) {
-            qe.executedPlan.find(_.isInstanceOf[GlutenColumnarWriteFilesExec]).isDefined
+            qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
           } else {
             qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
           }
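For context, the version check above can be read on its own. The following is a minimal, self-contained sketch in the same spirit as that test fragment; the listener class name and the registration snippet are illustrative only and are not part of this commit:

import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, QueryExecution}
import org.apache.spark.sql.util.QueryExecutionListener

// Illustrative sketch only: records whether a query's executed plan contained the
// renamed ColumnarWriteFilesExec operator, i.e. whether the native columnar write
// path was taken on Spark >= 3.4.
class NativeWriteListener extends QueryExecutionListener {
  @volatile var nativeUsed: Boolean = false

  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
    if (!nativeUsed) {
      nativeUsed = qe.executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
    }
  }

  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
}

// Registration, assuming an active SparkSession named `spark`:
//   spark.listenerManager.register(new NativeWriteListener)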
@@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{BackendWrite, FileSourceScanExec, GenerateExec, GlutenColumnarWriteFilesExec, LeafExecNode, SparkPlan}
+import org.apache.spark.sql.execution.{BackendWrite, ColumnarWriteFilesExec, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -389,7 +389,7 @@ trait SparkPlanExecApi {
       bucketSpec: Option[BucketSpec],
       options: Map[String, String],
       staticPartitions: TablePartitionSpec): SparkPlan = {
-    GlutenColumnarWriteFilesExec(
+    ColumnarWriteFilesExec(
       child,
       fileFormat,
       partitionColumns,
@@ -40,7 +40,7 @@ import org.apache.hadoop.fs.FileAlreadyExistsException
 import java.util.Date
 
 /**
- * This trait is used in [[GlutenColumnarWriteFilesRDD]] to inject the staging write path before
+ * This trait is used in [[ColumnarWriteFilesRDD]] to inject the staging write path before
  * initializing the native plan and collect native write files metrics for each backend.
  */
 trait BackendWrite {
@@ -51,7 +51,7 @@ trait BackendWrite {
 * This RDD is used to make sure we have injected staging write path before initializing the native
 * plan, and support Spark file commit protocol.
 */
-class GlutenColumnarWriteFilesRDD(
+class ColumnarWriteFilesRDD(
     var prev: RDD[ColumnarBatch],
     description: WriteJobDescription,
     committer: FileCommitProtocol,
@@ -156,7 +156,7 @@ class GlutenColumnarWriteFilesRDD(
 // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark
 // choose the new write code path (version >= 3.4). The actual plan to write is the left child
 // of this operator.
-case class GlutenColumnarWriteFilesExec private (
+case class ColumnarWriteFilesExec private (
     override val left: SparkPlan,
     override val right: SparkPlan,
     fileFormat: FileFormat,
@@ -166,7 +166,7 @@ case class GlutenColumnarWriteFilesExec private (
     staticPartitions: TablePartitionSpec)
   extends BinaryExecNode
   with GlutenPlan
-  with GlutenColumnarWriteFilesExec.ExecuteWriteCompatible {
+  with ColumnarWriteFilesExec.ExecuteWriteCompatible {
 
   val child: SparkPlan = left
 
@@ -217,7 +217,7 @@ case class GlutenColumnarWriteFilesExec private (
       // partition rdd to make sure we at least set up one write task to write the metadata.
       writeFilesForEmptyRDD(description, committer, jobTrackerID)
     } else {
-      new GlutenColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
+      new ColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID)
     }
   }
   override protected def withNewChildrenInternal(
@@ -226,15 +226,15 @@ case class GlutenColumnarWriteFilesExec private (
     copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions)
 }
 
-object GlutenColumnarWriteFilesExec {
+object ColumnarWriteFilesExec {
 
   def apply(
       child: SparkPlan,
       fileFormat: FileFormat,
       partitionColumns: Seq[Attribute],
       bucketSpec: Option[BucketSpec],
       options: Map[String, String],
-      staticPartitions: TablePartitionSpec): GlutenColumnarWriteFilesExec = {
+      staticPartitions: TablePartitionSpec): ColumnarWriteFilesExec = {
     // This is a workaround for FileFormatWriter#write. Vanilla Spark (version >= 3.4) requires for
     // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala
     // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute`
@@ -253,7 +253,7 @@ object GlutenColumnarWriteFilesExec {
       options,
       staticPartitions)
 
-    GlutenColumnarWriteFilesExec(
+    ColumnarWriteFilesExec(
       child,
       right,
       fileFormat,
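Per the comments in the hunks above, the renamed operator keeps the plan that actually performs the write as its left child (exposed as `child`) and attaches only a dummy vanilla `WriteFilesExec` as its right child so that Spark >= 3.4 selects the `executeWrite` code path. A small hedged sketch of extracting that write plan from an executed query, mirroring how the test suites below do it (the helper name is illustrative, not from this commit):

import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, SparkPlan}

// Illustrative helper only: walk an executed plan and return the native write plan,
// which ColumnarWriteFilesExec stores as `child` (an alias for its left child); the
// right child is just the dummy WriteFilesExec used to steer Spark onto executeWrite.
def findNativeWritePlan(plan: SparkPlan): Option[SparkPlan] =
  plan.collectFirst { case w: ColumnarWriteFilesExec => w.child }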
@@ -21,7 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer
 import org.apache.spark.sql.GlutenSQLTestsBaseTrait
 import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{GlutenColumnarWriteFilesExec, QueryExecution, SortExec}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, QueryExecution, SortExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegerType, StringType}
@@ -122,8 +122,8 @@ class GlutenV1WriteCommandSuite
         val executedPlan = FileFormatWriter.executedPlan.get
 
         val plan = if (enabled) {
-          assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec])
-          executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child
+          assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec])
+          executedPlan.asInstanceOf[ColumnarWriteFilesExec].child
         } else {
           executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan }
         }
@@ -204,8 +204,8 @@ class GlutenV1WriteCommandSuite
         val executedPlan = FileFormatWriter.executedPlan.get
 
         val plan = if (enabled) {
-          assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec])
-          executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child
+          assert(executedPlan.isInstanceOf[ColumnarWriteFilesExec])
+          executedPlan.asInstanceOf[ColumnarWriteFilesExec].child
         } else {
           executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan }
         }
@@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.{CommandResultExec, GlutenColumnarWriteFilesExec, QueryExecution}
+import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, CommandResultExec, QueryExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -60,13 +60,13 @@ class GlutenInsertSuite
     super.afterAll()
   }
 
-  private def checkAndGetWriteFiles(df: DataFrame): GlutenColumnarWriteFilesExec = {
+  private def checkAndGetWriteFiles(df: DataFrame): ColumnarWriteFilesExec = {
     val writeFiles = stripAQEPlan(
       df.queryExecution.executedPlan
         .asInstanceOf[CommandResultExec]
         .commandPhysicalPlan).children.head
-    assert(writeFiles.isInstanceOf[GlutenColumnarWriteFilesExec])
-    writeFiles.asInstanceOf[GlutenColumnarWriteFilesExec]
+    assert(writeFiles.isInstanceOf[ColumnarWriteFilesExec])
+    writeFiles.asInstanceOf[ColumnarWriteFilesExec]
   }
 
   testGluten("insert partition table") {
@@ -16,12 +16,12 @@
  */
 package org.apache.gluten
 
-import org.apache.spark.sql.execution.{SparkPlan, GlutenColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{SparkPlan, ColumnarWriteFilesExec}
 
 trait GlutenColumnarWriteTestSupport {
 
   def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
-    assert(sparkPlan.isInstanceOf[GlutenColumnarWriteFilesExec])
-    sparkPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child
+    assert(sparkPlan.isInstanceOf[ColumnarWriteFilesExec])
+    sparkPlan.asInstanceOf[ColumnarWriteFilesExec].child
   }
 }
