GlutenColumnarWriteFilesExec
injectWriteFilesTempPath with optional fileName
remove supportTransformWriteFiles
baibaichen committed Jul 17, 2024
1 parent 4e1fede commit 04ca20e
Showing 19 changed files with 239 additions and 170 deletions.
@@ -28,6 +28,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -78,6 +79,13 @@ private static Map<String, String> getNativeBackendConf() {
BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs());
}

public static void injectWriteFilesTempPath(String path, String fileName) {
ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
path.getBytes(StandardCharsets.UTF_8),
fileName.getBytes(StandardCharsets.UTF_8));
}

// Used by WholeStageTransform to create the native computing pipeline and
// return a columnar result iterator.
public static BatchIterator createKernelWithBatchIterator(
@@ -42,4 +42,13 @@ public static native long nativeCreateKernelWithIterator(
GeneralInIterator[] batchItr,
byte[] confArray,
boolean materializeInput);

/**
* Sets the temp path for writing files.
*
* @param allocatorId allocator id for the current task attempt (or thread)
* @param path the temp path for writing files
* @param filename the file name used by the native write pipeline
*/
public static native void injectWriteFilesTempPath(
long allocatorId, byte[] path, byte[] filename);
}
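
For orientation, a brief usage sketch (not part of the commit) of the JNI entry point declared above, mirroring the helper added in the first hunk. Class names are taken from this diff; Gluten-internal imports are elided, and the path and file name literals are placeholders.

import java.nio.charset.StandardCharsets

// Both strings cross the JNI boundary as explicit UTF-8 bytes, and the allocator id ties
// the injected path to the native writer owned by the current task attempt (or thread).
val allocatorId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId()
ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
  allocatorId,
  "/tmp/gluten-staging".getBytes(StandardCharsets.UTF_8), // temp directory (placeholder)
  "part-00000-gluten.parquet".getBytes(StandardCharsets.UTF_8)) // target file name (placeholder)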
@@ -25,13 +25,11 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}

@@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
.getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT)
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult =
ValidationResult.failed("CH backend is unsupported.")

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
}
@@ -290,6 +290,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
None,
createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators))
}

override def injectWriteFilesTempPath(path: String, fileName: Option[String]): Unit = {
if (fileName.isEmpty) {
throw new IllegalArgumentException("fileName should not be empty.")
}
CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName.get)
}
}

class CollectMetricIterator(
@@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
@@ -679,6 +679,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.")
}

def createBackendWrite(description: WriteJobDescription): BackendWrite =
throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.")

override def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
resultAttrs: Seq[Attribute],
@@ -494,8 +494,6 @@ object VeloxBackendSettings extends BackendSettingsApi {

override def staticPartitionWriteOnly(): Boolean = true

override def supportTransformWriteFiles: Boolean = true

override def allowDecimalArithmetic: Boolean = true

override def enableNativeWriteFiles(): Boolean = {
@@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
(paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns)
}

override def injectWriteFilesTempPath(path: String): Unit = {
override def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit = {
val transKernel = NativePlanEvaluator.create()
transKernel.injectWriteFilesTempPath(path)
}
@@ -171,7 +171,7 @@
inputPartition: BaseGlutenPartition,
context: TaskContext,
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = {
@@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetric
@@ -557,7 +557,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
bucketSpec: Option[BucketSpec],
options: Map[String, String],
staticPartitions: TablePartitionSpec): SparkPlan = {
VeloxColumnarWriteFilesExec(
GlutenColumnarWriteFilesExec(
child,
fileFormat,
partitionColumns,
@@ -566,6 +566,10 @@
staticPartitions)
}

override def createBackendWrite(description: WriteJobDescription): BackendWrite = {
VeloxBackendWrite(description)
}

override def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
resultAttrs: Seq[Attribute],
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.vectorized.ColumnarBatch

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.collection.mutable

// Velox write files metrics start
//
// Follows the code in velox `HiveDataSink::close()`
// The JSON can look like the following:
// {
// "inMemoryDataSizeInBytes":0,
// "containsNumberedFileNames":true,
// "onDiskDataSizeInBytes":307,
// "fileWriteInfos":[
// {
// "fileSize":307,
// "writeFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet",
// "targetFileName":
// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet"
// }
// ],
// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "rowCount":1,
// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1",
// "updateMode":"NEW",
// "name":"part1=1/part2=1"
// }
case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long)

case class VeloxWriteFilesMetrics(
name: String,
updateMode: String,
writePath: String,
targetPath: String,
fileWriteInfos: Seq[VeloxWriteFilesInfo],
rowCount: Long,
inMemoryDataSizeInBytes: Long,
onDiskDataSizeInBytes: Long,
containsNumberedFileNames: Boolean)

// Velox write files metrics end

case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging {

override def injectTempPath(commitProtocol: SparkWriteFilesCommitProtocol): String = {
val writePath = commitProtocol.newTaskAttemptTempPath()
logDebug(s"Velox staging write path: $writePath")
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)
writePath
}

override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
// Currently, the cb contains three columns: row, fragments, and context.
// The first row of the row column contains the number of rows written.
// The fragments column contains detailed information about the file writes.
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()
var numBytes = 0L
val objectMapper = new ObjectMapper()
objectMapper.registerModule(DefaultScalaModule)
for (i <- 0 until loadedCb.numRows() - 1) {
val fragments = loadedCb.column(1).getUTF8String(i + 1)
val metrics = objectMapper
.readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
logDebug(s"Velox write files metrics: $metrics")

val fileWriteInfos = metrics.fileWriteInfos
assert(fileWriteInfos.length == 1)
val fileWriteInfo = fileWriteInfos.head
numBytes += fileWriteInfo.fileSize
val targetFileName = fileWriteInfo.targetFileName
val outputPath = description.path

// part1=1/part2=1
val partitionFragment = metrics.name
// Write a partitioned table
if (partitionFragment != "") {
updatedPartitions += partitionFragment
val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName
val customOutputPath = description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partitionFragment))
if (customOutputPath.isDefined) {
addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
}

val numFiles = loadedCb.numRows() - 1
val partitionsInternalRows = updatedPartitions.map {
part =>
val parts = new Array[Any](1)
parts(0) = part
new GenericInternalRow(parts)
}.toSeq
val stats = BasicWriteTaskStats(
partitions = partitionsInternalRows,
numFiles = numFiles,
numBytes = numBytes,
numRows = numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

// Write an empty iterator
if (numFiles == 0) {
None
} else {
Some(
WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary))
}
}
}
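
As a reference for the mapping between the JSON documented at the top of this file and the case classes above, here is a hedged, standalone sketch of the Jackson round trip that collectNativeWriteFilesMetrics relies on. It assumes jackson-module-scala is on the classpath and that VeloxWriteFilesMetrics and VeloxWriteFilesInfo are in scope; the object name and sample values are made-up placeholders.

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object VeloxWriteMetricsParseExample {
  def main(args: Array[String]): Unit = {
    // A fragment shaped like the JSON emitted by Velox HiveDataSink::close().
    val json =
      """{
        |  "name": "part1=1/part2=1",
        |  "updateMode": "NEW",
        |  "writePath": "file:/tmp/warehouse/inserttable/part1=1/part2=1",
        |  "targetPath": "file:/tmp/warehouse/inserttable/part1=1/part2=1",
        |  "fileWriteInfos": [
        |    {"writeFileName": "Gluten_Stage_1_TID_2_0_2.parquet",
        |     "targetFileName": "Gluten_Stage_1_TID_2_0_2.parquet",
        |     "fileSize": 307}
        |  ],
        |  "rowCount": 1,
        |  "inMemoryDataSizeInBytes": 0,
        |  "onDiskDataSizeInBytes": 307,
        |  "containsNumberedFileNames": true
        |}""".stripMargin

    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    // Same call pattern as collectNativeWriteFilesMetrics: UTF-8 bytes in, case class out.
    val metrics = mapper.readValue(json.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])
    assert(metrics.fileWriteInfos.length == 1) // each fragment describes exactly one written file
    println(s"wrote ${metrics.rowCount} row(s) into ${metrics.fileWriteInfos.head.targetFileName}")
  }
}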
@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.util.QueryExecutionListener

class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
private var _spark: SparkSession = null
private var _spark: SparkSession = _

override protected def beforeAll(): Unit = {
super.beforeAll()
@@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils {
override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
if (!nativeUsed) {
nativeUsed = if (isSparkVersionGE("3.4")) {
qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined
qe.executedPlan.find(_.isInstanceOf[GlutenColumnarWriteFilesExec]).isDefined
} else {
qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
}
@@ -129,8 +129,6 @@ trait BackendSettingsApi {

def staticPartitionWriteOnly(): Boolean = false

def supportTransformWriteFiles: Boolean = false

def requiredInputFilePaths(): Boolean = false

// TODO: Move this to test settings as used in UT only.
@@ -47,8 +47,13 @@ trait IteratorApi {
/**
* Injects the task attempt temporary path for native write files; this method should be called
* before `genFirstStageIterator` or `genFinalStageIterator`.
* @param path
* the temporary directory for the native write pipeline
* @param fileName
* the file name for the native write pipeline; if None, the backend will generate it.
*/
def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException()
def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit =
throw new UnsupportedOperationException()

/**
* Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other
@@ -58,7 +63,7 @@
inputPartition: BaseGlutenPartition,
context: TaskContext,
pipelineTime: SQLMetric,
updateInputMetrics: (InputMetricsWrapper) => Unit,
updateInputMetrics: InputMetricsWrapper => Unit,
updateNativeMetrics: IMetrics => Unit,
partitionIndex: Int,
inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()
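
To make the widened hook concrete, here is a hedged usage sketch (not part of the commit) contrasting the two call shapes this change supports. The staging path is a made-up placeholder; in a real task it comes from the commit protocol.

import org.apache.gluten.backendsapi.BackendsApiManager

val writePath = "/tmp/gluten-staging/task-attempt-0" // placeholder staging directory

// Velox style: only the directory is injected; fileName keeps its default of None and the
// native writer generates the file name itself.
BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath)

// ClickHouse style: the target file name must be supplied explicitly, otherwise the
// CHIteratorApi override throws IllegalArgumentException.
BackendsApiManager.getIteratorApiInstance
  .injectWriteFilesTempPath(writePath, Some("part-00000-gluten.parquet"))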
@@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.{BackendWrite, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan}
import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.BuildSideRelation
@@ -390,6 +390,9 @@
options: Map[String, String],
staticPartitions: TablePartitionSpec): SparkPlan

/** Create BackendWrite */
def createBackendWrite(description: WriteJobDescription): BackendWrite

/** Create ColumnarArrowEvalPythonExec, for velox backend */
def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
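
Together with VeloxBackendWrite above, the new factory implies the following task-side flow. This is a hedged sketch only: GlutenColumnarWriteFilesExec itself is not part of the loaded diff, so the helper below and its parameters are illustrative stand-ins rather than the commit's actual wiring, and the import location of SparkWriteFilesCommitProtocol is assumed from its unqualified use in VeloxBackendWrite.

import org.apache.spark.sql.execution.{BackendWrite, SparkWriteFilesCommitProtocol}
import org.apache.spark.sql.execution.datasources.WriteTaskResult
import org.apache.spark.sql.vectorized.ColumnarBatch

// Illustrative only: how a write task could combine the pieces introduced in this commit.
def runNativeWriteTask(
    backendWrite: BackendWrite, // obtained via SparkPlanExecApi.createBackendWrite(description)
    commitProtocol: SparkWriteFilesCommitProtocol,
    nativeOutput: Iterator[ColumnarBatch]): Option[WriteTaskResult] = {
  // Stage the temp path (and, for CH, a target file name) before the native pipeline runs.
  backendWrite.injectTempPath(commitProtocol)
  // The native writer reports what it wrote as columnar batches; fold them into the
  // WriteTaskResult that Spark's commit protocol expects from the task.
  nativeOutput
    .map(backendWrite.collectNativeWriteFilesMetrics)
    .collectFirst { case Some(result) => result }
}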
@@ -141,8 +141,7 @@ object Validators {
override def validate(plan: SparkPlan): Validator.OutCome = plan match {
case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => fail(p)
case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p)
case p: WriteFilesExec
if !(settings.enableNativeWriteFiles() && settings.supportTransformWriteFiles) =>
case p: WriteFilesExec if !settings.enableNativeWriteFiles() =>
fail(p)
case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
fail(p)
(Diffs for the remaining changed files in this commit are not loaded in this view.)