diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java index 5b2b64909eafb..01f38cb3b90be 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java @@ -28,6 +28,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.sql.internal.SQLConf; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -78,6 +79,13 @@ private static Map getNativeBackendConf() { BackendsApiManager.getSettings().getBackendConfigPrefix(), SQLConf.get().getAllConfs()); } + public static void injectWriteFilesTempPath(String path, String fileName) { + ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath( + CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(), + path.getBytes(StandardCharsets.UTF_8), + fileName.getBytes(StandardCharsets.UTF_8)); + } + // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. public static BatchIterator createKernelWithBatchIterator( diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java index 7c67f1008e383..a5a474d2a2521 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java @@ -42,4 +42,13 @@ public static native long nativeCreateKernelWithIterator( GeneralInIterator[] batchItr, byte[] confArray, boolean materializeInput); + + /** + * Set the temp path for writing files. 
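+   * The given file name is used by the native write pipeline for the output file created under that path.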
+ * + * @param allocatorId allocator id for current task attempt(or thread) + * @param path the temp path for writing files + */ + public static native void injectWriteFilesTempPath( + long allocatorId, byte[] path, byte[] filename); } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 341a3e0f0a52c..07129e69a9875 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -25,13 +25,11 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate.HashAggregateExec -import org.apache.spark.sql.execution.datasources.FileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} @@ -286,13 +284,6 @@ object CHBackendSettings extends BackendSettingsApi with Logging { .getLong(GLUTEN_MAX_SHUFFLE_READ_BYTES, GLUTEN_MAX_SHUFFLE_READ_BYTES_DEFAULT) } - override def supportWriteFilesExec( - format: FileFormat, - fields: Array[StructField], - bucketSpec: Option[BucketSpec], - options: Map[String, String]): ValidationResult = - ValidationResult.failed("CH backend is unsupported.") - override def enableNativeWriteFiles(): Boolean = { GlutenConfig.getConf.enableNativeWriter.getOrElse(false) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 0c8495c6f67e7..7f775b0f5d4ce 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -290,6 +290,13 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { None, createNativeIterator(splitInfoByteArray, wsPlan, materializeInput, inputIterators)) } + + override def injectWriteFilesTempPath(path: String, fileName: Option[String]): Unit = { + if (fileName.isEmpty) { + throw new IllegalArgumentException("fileName should not be empty.") + } + CHNativeExpressionEvaluator.injectWriteFilesTempPath(path, fileName.get) + } } class CollectMetricIterator( diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 547458d8c2545..4d7d731b90265 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -49,7 +49,7 @@ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution._ import 
org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec -import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation} +import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, WriteJobDescription} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -679,6 +679,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { throw new GlutenNotSupportException("ColumnarWriteFilesExec is not support in ch backend.") } + def createBackendWrite(description: WriteJobDescription): BackendWrite = + throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.") + override def createColumnarArrowEvalPythonExec( udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index e4efa22055fc8..14dfd3d9779de 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -494,8 +494,6 @@ object VeloxBackendSettings extends BackendSettingsApi { override def staticPartitionWriteOnly(): Boolean = true - override def supportTransformWriteFiles: Boolean = true - override def allowDecimalArithmetic: Boolean = true override def enableNativeWriteFiles(): Boolean = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala index 613e539456ecf..020d457ee6458 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala @@ -161,7 +161,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { (paths, starts, lengths, fileSizes, modificationTimes, partitionColumns, metadataColumns) } - override def injectWriteFilesTempPath(path: String): Unit = { + override def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit = { val transKernel = NativePlanEvaluator.create() transKernel.injectWriteFilesTempPath(path) } @@ -171,7 +171,7 @@ class VeloxIteratorApi extends IteratorApi with Logging { inputPartition: BaseGlutenPartition, context: TaskContext, pipelineTime: SQLMetric, - updateInputMetrics: (InputMetricsWrapper) => Unit, + updateInputMetrics: InputMetricsWrapper => Unit, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq()): Iterator[ColumnarBatch] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala index 37b46df3e23d9..e5658978d20cb 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala @@ -50,7 +50,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution._ import 
org.apache.spark.sql.execution.columnar.InMemoryTableScanExec -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BuildSideRelation, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.SQLMetric @@ -557,7 +557,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { bucketSpec: Option[BucketSpec], options: Map[String, String], staticPartitions: TablePartitionSpec): SparkPlan = { - VeloxColumnarWriteFilesExec( + GlutenColumnarWriteFilesExec( child, fileFormat, partitionColumns, @@ -566,6 +566,10 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi { staticPartitions) } + override def createBackendWrite(description: WriteJobDescription): BackendWrite = { + VeloxBackendWrite(description) + } + override def createColumnarArrowEvalPythonExec( udfs: Seq[PythonUDF], resultAttrs: Seq[Attribute], diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala new file mode 100644 index 0000000000000..4354b16d729b7 --- /dev/null +++ b/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxBackendWrite.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.gluten.backendsapi.BackendsApiManager +import org.apache.gluten.columnarbatch.ColumnarBatches +import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import scala.collection.mutable + +// Velox write files metrics start +// +// Follows the code in velox `HiveDataSink::close()` +// The json can be as following: +// { +// "inMemoryDataSizeInBytes":0, +// "containsNumberedFileNames":true, +// "onDiskDataSizeInBytes":307, +// "fileWriteInfos":[ +// { +// "fileSize":307, +// "writeFileName": +// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", +// "targetFileName": +// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" +// } +// ], +// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", +// "rowCount":1, +// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", +// "updateMode":"NEW", +// "name":"part1=1/part2=1" +// } +case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long) + +case class VeloxWriteFilesMetrics( + name: String, + updateMode: String, + writePath: String, + targetPath: String, + fileWriteInfos: Seq[VeloxWriteFilesInfo], + rowCount: Long, + inMemoryDataSizeInBytes: Long, + onDiskDataSizeInBytes: Long, + containsNumberedFileNames: Boolean) + +// Velox write files metrics end + +case class VeloxBackendWrite(description: WriteJobDescription) extends BackendWrite with Logging { + + override def injectTempPath(commitProtocol: SparkWriteFilesCommitProtocol): String = { + val writePath = commitProtocol.newTaskAttemptTempPath() + logDebug(s"Velox staging write path: $writePath") + BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath) + writePath + } + + override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = { + // Currently, the cb contains three columns: row, fragments, and context. + // The first row in the row column contains the number of written numRows. + // The fragments column contains detailed information about the file writes. 
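+    // Illustrative example (values are hypothetical): a task that wrote a single file into one
+    // partition might return a batch whose columns look like
+    //   column 0 "row":       [3, null]               -> row 0 holds the written-row count
+    //   column 1 "fragments": [null, "{...json...}"]  -> rows 1..n hold one JSON metrics blob per file
+    //   column 2 "context":   additional context from the native writer, not read here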
+ val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) + assert(loadedCb.numCols() == 3) + val numWrittenRows = loadedCb.column(0).getLong(0) + + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + var numBytes = 0L + val objectMapper = new ObjectMapper() + objectMapper.registerModule(DefaultScalaModule) + for (i <- 0 until loadedCb.numRows() - 1) { + val fragments = loadedCb.column(1).getUTF8String(i + 1) + val metrics = objectMapper + .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) + logDebug(s"Velox write files metrics: $metrics") + + val fileWriteInfos = metrics.fileWriteInfos + assert(fileWriteInfos.length == 1) + val fileWriteInfo = fileWriteInfos.head + numBytes += fileWriteInfo.fileSize + val targetFileName = fileWriteInfo.targetFileName + val outputPath = description.path + + // part1=1/part2=1 + val partitionFragment = metrics.name + // Write a partitioned table + if (partitionFragment != "") { + updatedPartitions += partitionFragment + val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName + val customOutputPath = description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partitionFragment)) + if (customOutputPath.isDefined) { + addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName + } + } + } + + val numFiles = loadedCb.numRows() - 1 + val partitionsInternalRows = updatedPartitions.map { + part => + val parts = new Array[Any](1) + parts(0) = part + new GenericInternalRow(parts) + }.toSeq + val stats = BasicWriteTaskStats( + partitions = partitionsInternalRows, + numFiles = numFiles, + numBytes = numBytes, + numRows = numWrittenRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + // Write an empty iterator + if (numFiles == 0) { + None + } else { + Some( + WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary)) + } + } +} diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala index 731f5ef4845c1..576714c27a245 100644 --- a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala +++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteForHiveSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.util.QueryExecutionListener class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { - private var _spark: SparkSession = null + private var _spark: SparkSession = _ override protected def beforeAll(): Unit = { super.beforeAll() @@ -86,7 +86,7 @@ class VeloxParquetWriteForHiveSuite extends GlutenQueryTest with SQLTestUtils { override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = { if (!nativeUsed) { nativeUsed = if (isSparkVersionGE("3.4")) { - qe.executedPlan.find(_.isInstanceOf[VeloxColumnarWriteFilesExec]).isDefined + qe.executedPlan.find(_.isInstanceOf[GlutenColumnarWriteFilesExec]).isDefined } else { qe.executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined } diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index 
8b4c18b01970d..358043cc5f6be 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -129,8 +129,6 @@ trait BackendSettingsApi { def staticPartitionWriteOnly(): Boolean = false - def supportTransformWriteFiles: Boolean = false - def requiredInputFilePaths(): Boolean = false // TODO: Move this to test settings as used in UT only. diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala index 53dc8f47861fb..45dce0c5c5fe2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala @@ -47,8 +47,13 @@ trait IteratorApi { /** * Inject the task attempt temporary path for native write files, this method should be called * before `genFirstStageIterator` or `genFinalStageIterator` + * @param path + * is the temporary directory for native write pipeline + * @param fileName + * is the file name for native write pipeline, if None, backend will generate it. */ - def injectWriteFilesTempPath(path: String): Unit = throw new UnsupportedOperationException() + def injectWriteFilesTempPath(path: String, fileName: Option[String] = None): Unit = + throw new UnsupportedOperationException() /** * Generate Iterator[ColumnarBatch] for first stage. ("first" means it does not depend on other @@ -58,7 +63,7 @@ trait IteratorApi { inputPartition: BaseGlutenPartition, context: TaskContext, pipelineTime: SQLMetric, - updateInputMetrics: (InputMetricsWrapper) => Unit, + updateInputMetrics: InputMetricsWrapper => Unit, updateNativeMetrics: IMetrics => Unit, partitionIndex: Int, inputIterators: Seq[Iterator[ColumnarBatch]] = Seq() diff --git a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala index 58a08192dec91..511d187fa5b83 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala @@ -39,8 +39,8 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.{FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan} -import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.{BackendWrite, FileSourceScanExec, GenerateExec, LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.datasources.{FileFormat, WriteJobDescription} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.BuildSideRelation @@ -390,6 +390,9 @@ trait SparkPlanExecApi { options: Map[String, String], staticPartitions: TablePartitionSpec): SparkPlan + /** Create BackendWrite */ + def createBackendWrite(description: WriteJobDescription): BackendWrite + /** Create ColumnarArrowEvalPythonExec, for velox backend */ def createColumnarArrowEvalPythonExec( udfs: Seq[PythonUDF], diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 903723ccb56b5..a85cb163ceaa7 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -141,8 +141,7 @@ object Validators { override def validate(plan: SparkPlan): Validator.OutCome = plan match { case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => fail(p) case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p) - case p: WriteFilesExec - if !(settings.enableNativeWriteFiles() && settings.supportTransformWriteFiles) => + case p: WriteFilesExec if !settings.enableNativeWriteFiles() => fail(p) case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg => fail(p) diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala similarity index 66% rename from backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala rename to gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala index c87b8d4f688d7..a86b8c5cba8e5 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/VeloxColumnarWriteFilesExec.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenColumnarWriteFilesExec.scala @@ -17,150 +17,48 @@ package org.apache.spark.sql.execution import org.apache.gluten.backendsapi.BackendsApiManager -import org.apache.gluten.columnarbatch.ColumnarBatches import org.apache.gluten.exception.GlutenException import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.spark.{Partition, SparkException, TaskContext, TaskOutputFileAlreadyExistException} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} -import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage import org.apache.spark.rdd.RDD import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericInternalRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet} import org.apache.spark.sql.connector.write.WriterCommitMessage import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils -import com.fasterxml.jackson.databind.ObjectMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.hadoop.fs.FileAlreadyExistsException import java.util.Date -import scala.collection.mutable - -// Velox write files metrics start -// -// Follows the code in velox `HiveDataSink::close()` -// The json can be as following: -// { -// "inMemoryDataSizeInBytes":0, -// "containsNumberedFileNames":true, -// "onDiskDataSizeInBytes":307, -// "fileWriteInfos":[ -// { -// "fileSize":307, -// "writeFileName": -// "Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet", -// "targetFileName": -// 
"Gluten_Stage_1_TID_2_0_2_d1db3b31-4f99-41cb-a4e7-3b8607506168.parquet" -// } -// ], -// "writePath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", -// "rowCount":1, -// "targetPath":"file:/home/gluten/spark-warehouse/inserttable/part1=1/part2=1", -// "updateMode":"NEW", -// "name":"part1=1/part2=1" -// } -case class VeloxWriteFilesInfo(writeFileName: String, targetFileName: String, fileSize: Long) - -case class VeloxWriteFilesMetrics( - name: String, - updateMode: String, - writePath: String, - targetPath: String, - fileWriteInfos: Seq[VeloxWriteFilesInfo], - rowCount: Long, - inMemoryDataSizeInBytes: Long, - onDiskDataSizeInBytes: Long, - containsNumberedFileNames: Boolean) - -// Velox write files metrics end +/** + * This trait is used in [[GlutenColumnarWriteFilesRDD]] to inject the staging write path before + * initializing the native plan and collect native write files metrics for each backend. + */ +trait BackendWrite { + def injectTempPath(committer: SparkWriteFilesCommitProtocol): String + def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult] +} /** * This RDD is used to make sure we have injected staging write path before initializing the native * plan, and support Spark file commit protocol. */ -class VeloxColumnarWriteFilesRDD( +class GlutenColumnarWriteFilesRDD( var prev: RDD[ColumnarBatch], description: WriteJobDescription, committer: FileCommitProtocol, jobTrackerID: String) extends RDD[WriterCommitMessage](prev) { - private def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = { - // Currently, the cb contains three columns: row, fragments, and context. - // The first row in the row column contains the number of written numRows. - // The fragments column contains detailed information about the file writes. 
- val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb) - assert(loadedCb.numCols() == 3) - val numWrittenRows = loadedCb.column(0).getLong(0) - - var updatedPartitions = Set.empty[String] - val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() - var numBytes = 0L - val objectMapper = new ObjectMapper() - objectMapper.registerModule(DefaultScalaModule) - for (i <- 0 until loadedCb.numRows() - 1) { - val fragments = loadedCb.column(1).getUTF8String(i + 1) - val metrics = objectMapper - .readValue(fragments.toString.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics]) - logDebug(s"Velox write files metrics: $metrics") - - val fileWriteInfos = metrics.fileWriteInfos - assert(fileWriteInfos.length == 1) - val fileWriteInfo = fileWriteInfos.head - numBytes += fileWriteInfo.fileSize - val targetFileName = fileWriteInfo.targetFileName - val outputPath = description.path - - // part1=1/part2=1 - val partitionFragment = metrics.name - // Write a partitioned table - if (partitionFragment != "") { - updatedPartitions += partitionFragment - val tmpOutputPath = outputPath + "/" + partitionFragment + "/" + targetFileName - val customOutputPath = description.customPartitionLocations.get( - PartitioningUtils.parsePathFragment(partitionFragment)) - if (customOutputPath.isDefined) { - addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName - } - } - } - - val numFiles = loadedCb.numRows() - 1 - val partitionsInternalRows = updatedPartitions.map { - part => - val parts = new Array[Any](1) - parts(0) = part - new GenericInternalRow(parts) - }.toSeq - val stats = BasicWriteTaskStats( - partitions = partitionsInternalRows, - numFiles = numFiles, - numBytes = numBytes, - numRows = numWrittenRows) - val summary = - ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) - - // Write an empty iterator - if (numFiles == 0) { - None - } else { - Some( - WriteTaskResult( - new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), - summary)) - } - } - private def reportTaskMetrics(writeTaskResult: WriteTaskResult): Unit = { val stats = writeTaskResult.summary.stats.head.asInstanceOf[BasicWriteTaskStats] val (numBytes, numWrittenRows) = (stats.numBytes, stats.numRows) @@ -194,21 +92,21 @@ class VeloxColumnarWriteFilesRDD( override def compute(split: Partition, context: TaskContext): Iterator[WriterCommitMessage] = { val commitProtocol = new SparkWriteFilesCommitProtocol(jobTrackerID, description, committer) + val backendWrite = + BackendsApiManager.getSparkPlanExecApiInstance.createBackendWrite(description) commitProtocol.setupTask() - val writePath = commitProtocol.newTaskAttemptTempPath() - logDebug(s"Velox staging write path: $writePath") + val writePath = backendWrite.injectTempPath(commitProtocol) + var writeTaskResult: WriteTaskResult = null try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - BackendsApiManager.getIteratorApiInstance.injectWriteFilesTempPath(writePath) - // Initialize the native plan val iter = firstParent[ColumnarBatch].iterator(split, context) assert(iter.hasNext) val resultColumnarBatch = iter.next() assert(resultColumnarBatch != null) - val nativeWriteTaskResult = collectNativeWriteFilesMetrics(resultColumnarBatch) + val nativeWriteTaskResult = backendWrite.collectNativeWriteFilesMetrics(resultColumnarBatch) if (nativeWriteTaskResult.isEmpty) { // If we are writing an empty iterator, then velox would do nothing. 
// Here we fallback to use vanilla Spark write files to generate an empty file for @@ -255,7 +153,7 @@ class VeloxColumnarWriteFilesRDD( // we need to expose a dummy child (as right child) with type "WriteFilesExec" to let Spark // choose the new write code path (version >= 3.4). The actual plan to write is the left child // of this operator. -case class VeloxColumnarWriteFilesExec private ( +case class GlutenColumnarWriteFilesExec private ( override val left: SparkPlan, override val right: SparkPlan, fileFormat: FileFormat, @@ -265,7 +163,7 @@ case class VeloxColumnarWriteFilesExec private ( staticPartitions: TablePartitionSpec) extends BinaryExecNode with GlutenPlan - with VeloxColumnarWriteFilesExec.ExecuteWriteCompatible { + with GlutenColumnarWriteFilesExec.ExecuteWriteCompatible { val child: SparkPlan = left @@ -316,7 +214,7 @@ case class VeloxColumnarWriteFilesExec private ( // partition rdd to make sure we at least set up one write task to write the metadata. writeFilesForEmptyRDD(description, committer, jobTrackerID) } else { - new VeloxColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID) + new GlutenColumnarWriteFilesRDD(rdd, description, committer, jobTrackerID) } } override protected def withNewChildrenInternal( @@ -325,7 +223,7 @@ case class VeloxColumnarWriteFilesExec private ( copy(newLeft, newRight, fileFormat, partitionColumns, bucketSpec, options, staticPartitions) } -object VeloxColumnarWriteFilesExec { +object GlutenColumnarWriteFilesExec { def apply( child: SparkPlan, @@ -333,7 +231,7 @@ object VeloxColumnarWriteFilesExec { partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], options: Map[String, String], - staticPartitions: TablePartitionSpec): VeloxColumnarWriteFilesExec = { + staticPartitions: TablePartitionSpec): GlutenColumnarWriteFilesExec = { // This is a workaround for FileFormatWriter#write. 
Vanilla Spark (version >= 3.4) requires for // a plan that has at least one node exactly of type `WriteFilesExec` that is a Scala // case-class, to decide to choose new `#executeWrite` code path over the legacy `#execute` @@ -352,7 +250,7 @@ object VeloxColumnarWriteFilesExec { options, staticPartitions) - VeloxColumnarWriteFilesExec( + GlutenColumnarWriteFilesExec( child, right, fileFormat, diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala similarity index 95% rename from backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala rename to gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala index 845f2f98fb8c8..6095c202684fe 100644 --- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala +++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/SparkWriteFilesCommitProtocol.scala @@ -41,9 +41,9 @@ class SparkWriteFilesCommitProtocol( extends Logging { assert(committer.isInstanceOf[HadoopMapReduceCommitProtocol]) - val sparkStageId = TaskContext.get().stageId() - val sparkPartitionId = TaskContext.get().partitionId() - val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue + val sparkStageId: Int = TaskContext.get().stageId() + val sparkPartitionId: Int = TaskContext.get().partitionId() + private val sparkAttemptNumber = TaskContext.get().taskAttemptId().toInt & Int.MaxValue private val jobId = createJobID(jobTrackerID, sparkStageId) private val taskId = new TaskID(jobId, TaskType.MAP, sparkPartitionId) diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala index 3d277b94cc3e7..14605c403f4c1 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/GlutenV1WriteCommandSuite.scala @@ -21,7 +21,7 @@ import org.apache.gluten.execution.SortExecTransformer import org.apache.spark.sql.GlutenSQLTestsBaseTrait import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, NullsFirst, SortOrder} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort} -import org.apache.spark.sql.execution.{QueryExecution, SortExec, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{GlutenColumnarWriteFilesExec, QueryExecution, SortExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{IntegerType, StringType} @@ -122,8 +122,8 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) + executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } @@ -204,8 +204,8 @@ class GlutenV1WriteCommandSuite val executedPlan = FileFormatWriter.executedPlan.get val plan = if (enabled) { - 
assert(executedPlan.isInstanceOf[VeloxColumnarWriteFilesExec]) - executedPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child + assert(executedPlan.isInstanceOf[GlutenColumnarWriteFilesExec]) + executedPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child } else { executedPlan.transformDown { case a: AdaptiveSparkPlanExec => a.executedPlan } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala index ca4b3740a7bc8..6db1f66845acb 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.executor.OutputMetrics import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.{CommandResultExec, QueryExecution, VeloxColumnarWriteFilesExec} +import org.apache.spark.sql.execution.{CommandResultExec, GlutenColumnarWriteFilesExec, QueryExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.command.DataWritingCommandExec import org.apache.spark.sql.execution.metric.SQLMetric @@ -60,13 +60,13 @@ class GlutenInsertSuite super.afterAll() } - private def checkAndGetWriteFiles(df: DataFrame): VeloxColumnarWriteFilesExec = { + private def checkAndGetWriteFiles(df: DataFrame): GlutenColumnarWriteFilesExec = { val writeFiles = stripAQEPlan( df.queryExecution.executedPlan .asInstanceOf[CommandResultExec] .commandPhysicalPlan).children.head - assert(writeFiles.isInstanceOf[VeloxColumnarWriteFilesExec]) - writeFiles.asInstanceOf[VeloxColumnarWriteFilesExec] + assert(writeFiles.isInstanceOf[GlutenColumnarWriteFilesExec]) + writeFiles.asInstanceOf[GlutenColumnarWriteFilesExec] } testGluten("insert partition table") { @@ -405,7 +405,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -425,7 +425,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -452,7 +452,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -474,7 +474,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -501,7 +501,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if (config.useDataFrames) { - Seq((false)).toDF.write.insertInto("t") + Seq(false).toDF.write.insertInto("t") } else { sql("insert into t select false") } @@ -571,7 +571,7 @@ class GlutenInsertSuite withTable("t") { sql(s"create table t(i boolean) using ${config.dataSource}") if 
(config.useDataFrames) {
-          Seq((false)).toDF.write.insertInto("t")
+          Seq(false).toDF.write.insertInto("t")
         } else {
           sql("insert into t select false")
         }
diff --git a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
index c7ad606bcf8d0..28f315da49e75 100644
--- a/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
+++ b/gluten-ut/spark35/src/test/backends-velox/org/apache/gluten/GlutenColumnarWriteTestSupport.scala
@@ -16,12 +16,12 @@
  */
 package org.apache.gluten
 
-import org.apache.spark.sql.execution.{SparkPlan, VeloxColumnarWriteFilesExec}
+import org.apache.spark.sql.execution.{GlutenColumnarWriteFilesExec, SparkPlan}
 
 trait GlutenColumnarWriteTestSupport {
 
   def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = {
-    assert(sparkPlan.isInstanceOf[VeloxColumnarWriteFilesExec])
-    sparkPlan.asInstanceOf[VeloxColumnarWriteFilesExec].child
+    assert(sparkPlan.isInstanceOf[GlutenColumnarWriteFilesExec])
+    sparkPlan.asInstanceOf[GlutenColumnarWriteFilesExec].child
   }
 }
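
For reference, a minimal sketch (not part of the patch) of what a hypothetical additional backend would implement to plug into GlutenColumnarWriteFilesRDD after this refactoring. Only BackendWrite, SparkWriteFilesCommitProtocol and IteratorApi.injectWriteFilesTempPath come from the code above; the class name MyBackendWrite and the injected file name are made up, and the metrics translation is left as a stub.

package org.apache.spark.sql.execution

import org.apache.gluten.backendsapi.BackendsApiManager

import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult}
import org.apache.spark.sql.vectorized.ColumnarBatch

case class MyBackendWrite(description: WriteJobDescription) extends BackendWrite {

  override def injectTempPath(committer: SparkWriteFilesCommitProtocol): String = {
    // Resolve the task-attempt staging directory from the commit protocol and hand it to the
    // native pipeline before the plan is initialized. A backend that pre-assigns output file
    // names (as the ClickHouse backend now requires) passes one through the optional argument.
    val writePath = committer.newTaskAttemptTempPath()
    BackendsApiManager.getIteratorApiInstance
      .injectWriteFilesTempPath(writePath, Some("part-00000-demo.parquet"))
    writePath
  }

  override def collectNativeWriteFilesMetrics(batch: ColumnarBatch): Option[WriteTaskResult] = {
    // Translate the backend-specific metrics batch into Spark's WriteTaskResult here; returning
    // None makes GlutenColumnarWriteFilesRDD fall back to writing an empty file via vanilla Spark.
    None
  }
}

// The backend would expose it through SparkPlanExecApi, mirroring VeloxSparkPlanExecApi:
//   override def createBackendWrite(description: WriteJobDescription): BackendWrite =
//     MyBackendWrite(description)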
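
The fragments-to-VeloxWriteFilesMetrics mapping used by VeloxBackendWrite can be exercised on its own. Below is a standalone sketch; the JSON literal is adapted from the sample in the VeloxBackendWrite comment, and the object name, paths and sizes are illustrative.

package org.apache.spark.sql.execution

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

object VeloxWriteMetricsJsonExample {
  def main(args: Array[String]): Unit = {
    val fragmentJson =
      """{
        |  "name": "part1=1/part2=1",
        |  "updateMode": "NEW",
        |  "writePath": "file:/tmp/warehouse/t/part1=1/part2=1",
        |  "targetPath": "file:/tmp/warehouse/t/part1=1/part2=1",
        |  "fileWriteInfos": [
        |    {"fileSize": 307,
        |     "writeFileName": "example.parquet",
        |     "targetFileName": "example.parquet"}
        |  ],
        |  "rowCount": 1,
        |  "inMemoryDataSizeInBytes": 0,
        |  "onDiskDataSizeInBytes": 307,
        |  "containsNumberedFileNames": true
        |}""".stripMargin

    // Same ObjectMapper setup as VeloxBackendWrite.collectNativeWriteFilesMetrics.
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    val metrics = mapper.readValue(fragmentJson.getBytes("UTF-8"), classOf[VeloxWriteFilesMetrics])

    assert(metrics.fileWriteInfos.length == 1)
    println(s"wrote ${metrics.rowCount} row(s) into ${metrics.fileWriteInfos.head.targetFileName}")
  }
}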
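
Finally, a usage sketch of the simplified validator gate: after this change, offloading WriteFilesExec depends only on enableNativeWriteFiles. The snippet assumes a spark-shell session with the Gluten plugin loaded on Spark 3.4+; the config key is assumed to be spark.gluten.sql.native.writer.enabled (what GlutenConfig.enableNativeWriter reads) and may differ across Gluten versions.

// Assumed config key; see GlutenConfig.enableNativeWriter.
spark.conf.set("spark.gluten.sql.native.writer.enabled", "true")
spark.range(100).write.mode("overwrite").parquet("/tmp/gluten_native_write_demo")
// With the flag on, the physical plan for the write contains GlutenColumnarWriteFilesExec
// (its left child is the native write plan, the right child is the dummy WriteFilesExec).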