
Commit

Support Native Write
baibaichen committed Jul 25, 2024
1 parent 5e9b3ec commit f78ffb3
Showing 28 changed files with 1,942 additions and 214 deletions.
@@ -27,6 +27,9 @@ public class OperatorMetrics implements IOperatorMetrics {
public JoinParams joinParams;
public AggregationParams aggParams;

public long physicalWrittenBytes;
public long numWrittenFiles;

/** Create an instance for operator metrics. */
public OperatorMetrics(
List<MetricsData> metricsList, JoinParams joinParams, AggregationParams aggParams) {
@@ -28,6 +28,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.sql.internal.SQLConf;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -79,8 +80,10 @@ private static Map<String, String> getNativeBackendConf() {
}

public static void injectWriteFilesTempPath(String path, String fileName) {
-  throw new UnsupportedOperationException(
-      "injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator");
+  ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath(
+      CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(),
+      path.getBytes(StandardCharsets.UTF_8),
+      fileName.getBytes(StandardCharsets.UTF_8));
}
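
A minimal sketch (not part of this commit) of how a write task on the JVM side might inject the per-task staging location before starting the native pipeline; the package path, call site, and file-naming scheme below are assumptions:

    // Illustrative only: package path, call site, and naming are assumptions, not from this commit.
    import java.util.UUID
    import org.apache.gluten.vectorized.CHNativeExpressionEvaluator

    object NativeWriteTempPathSketch {
      def injectForCurrentTask(stagingDir: String): Unit = {
        // Hypothetical file name; the real name is chosen by the Spark write job.
        val fileName = s"part-${UUID.randomUUID()}.parquet"
        CHNativeExpressionEvaluator.injectWriteFilesTempPath(stagingDir, fileName)
      }
    }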

// Used by WholeStageTransform to create the native computing pipeline and
@@ -42,4 +42,13 @@ public static native long nativeCreateKernelWithIterator(
GeneralInIterator[] batchItr,
byte[] confArray,
boolean materializeInput);

/**
* Set the temp path for writing files.
*
   * @param allocatorId allocator id for the current task attempt (or thread)
   * @param path the temp path for writing files
   * @param filename the temp file name for writing files
   */
public static native void injectWriteFilesTempPath(
long allocatorId, byte[] path, byte[] filename);
}
@@ -18,20 +18,25 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig}
import org.apache.gluten.backendsapi._
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, StructField, StructType}

import java.util.Locale

@@ -187,6 +192,73 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}

override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult = {

def validateCompressionCodec(): Option[String] = {
// FIXME: verify that the compression codec is supported
val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
None
}

def validateFileFormat(): Option[String] = {
format match {
case _: ParquetFileFormat => None
case _: OrcFileFormat => None
case f: FileFormat => Some(s"Not support FileFormat: ${f.getClass.getSimpleName}")
}
}

// Validate if all types are supported.
def validateDateTypes(): Option[String] = {
None
}

def validateFieldMetadata(): Option[String] = {
// copied from CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY
val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING"
fields
.find(_.metadata != Metadata.empty)
.filterNot(_.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY))
.map {
field =>
s"StructField contains metadata information: $field, metadata: ${field.metadata}"
}
}

def validateWriteFilesOptions(): Option[String] = {
val maxRecordsPerFile = options
.get("maxRecordsPerFile")
.map(_.toLong)
.getOrElse(SQLConf.get.maxRecordsPerFile)
if (maxRecordsPerFile > 0) {
Some("Unsupported native write: maxRecordsPerFile not supported.")
} else {
None
}
}

def validateBucketSpec(): Option[String] = {
if (bucketSpec.nonEmpty) {
Some("Unsupported native write: bucket write is not supported.")
} else {
None
}
}

validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
.orElse(validateDateTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
case Some(reason) => ValidationResult.failed(reason)
case _ => ValidationResult.succeeded
}
}
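
The checks above each return Option[String] (Some(reason) on failure) and are chained with orElse, so the first failing check determines the ValidationResult. A self-contained sketch of that pattern, with illustrative formats and messages only:

    object ValidationChainSketch {
      private def validateFileFormat(format: String): Option[String] =
        if (Set("parquet", "orc").contains(format)) None
        else Some(s"Not support FileFormat: $format")

      private def validateBucketSpec(bucketed: Boolean): Option[String] =
        if (bucketed) Some("Unsupported native write: bucket write is not supported.") else None

      def main(args: Array[String]): Unit = {
        // orElse evaluates the next check only when the previous one returned None.
        val failure = validateFileFormat("csv").orElse(validateBucketSpec(bucketed = false))
        println(failure.getOrElse("validation succeeded")) // prints the file-format reason
      }
    }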

override def supportShuffleWithProject(
outputPartitioning: Partitioning,
child: SparkPlan): Boolean = {
@@ -383,13 +383,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
s"SampleTransformer metrics update is not supported in CH backend")
}

- def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
-   throw new UnsupportedOperationException(
-     s"WriteFilesTransformer metrics update is not supported in CH backend")
- }
+ def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+   Map(
+     "physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"),
+     "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files")
+   )

def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = {
- throw new UnsupportedOperationException(
-   s"WriteFilesTransformer metrics update is not supported in CH backend")
+ new WriteFilesMetricsUpdater(metrics)
}
}
@@ -674,8 +674,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHRegExpReplaceTransformer(substraitExprName, children, expr)
}

- def createBackendWrite(description: WriteJobDescription): BackendWrite =
-   throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.")
+ def createBackendWrite(description: WriteJobDescription): BackendWrite = ClickhouseBackendWrite(
+   description)

override def createColumnarArrowEvalPythonExec(
udfs: Seq[PythonUDF],
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class WriteFilesMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
metrics("physicalWrittenBytes") += operatorMetrics.physicalWrittenBytes
metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles
}
}
}
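
A hedged sketch of how this updater and the write metrics registered in CHMetricsApi are expected to fit together; the helper below is hypothetical and assumes an OperatorMetrics instance already populated from native execution:

    import org.apache.gluten.metrics.{OperatorMetrics, WriteFilesMetricsUpdater} // assumed package for OperatorMetrics
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.metric.SQLMetrics

    object WriteMetricsSketch {
      // Hypothetical helper, not part of the commit.
      def updateWriteMetrics(sc: SparkContext, opMetrics: OperatorMetrics): Unit = {
        val metrics = Map(
          "physicalWrittenBytes" -> SQLMetrics.createMetric(sc, "number of written bytes"),
          "numWrittenFiles" -> SQLMetrics.createMetric(sc, "number of written files"))
        // Adds the native counters onto the registered SQL metrics.
        new WriteFilesMetricsUpdater(metrics).updateNativeMetrics(opMetrics)
      }
    }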
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.internal.Logging
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.vectorized.ColumnarBatch

import scala.collection.mutable

case class ClickhouseBackendWrite(description: WriteJobDescription)
extends BackendWrite
with Logging {

override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = {
val numFiles = cb.numRows()
// Empty input iterator: the native writer produced no files.
if (numFiles == 0) {
None
} else {
val file_col = cb.column(0)
val partition_col = cb.column(1)
val count_col = cb.column(2)

val outputPath = description.path
var updatedPartitions = Set.empty[String]
val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]()

val write_stats = Range(0, cb.numRows()).map {
i =>
val targetFileName = file_col.getUTF8String(i).toString
val partition = partition_col.getUTF8String(i).toString
if (partition != "__NO_PARTITION_ID__") {
updatedPartitions += partition
val tmpOutputPath = outputPath + "/" + partition + "/" + targetFileName
val customOutputPath =
description.customPartitionLocations.get(
PartitioningUtils.parsePathFragment(partition))
if (customOutputPath.isDefined) {
addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName
}
}
count_col.getLong(i)
}

val partitionsInternalRows = updatedPartitions.map {
part =>
val parts = new Array[Any](1)
parts(0) = part
new GenericInternalRow(parts)
}.toSeq

val numWrittenRows = write_stats.sum
val stats = BasicWriteTaskStats(
partitions = partitionsInternalRows,
numFiles = numFiles,
numBytes = 101, // FIXME: placeholder value; written bytes are not reported by the native stats batch here
numRows = numWrittenRows)
val summary =
ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats))

Some(
WriteTaskResult(
new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions),
summary))
}
}
}
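
collectNativeWriteFilesMetrics above assumes the native writer returns one row per written file with columns (file name, partition id, record count) and uses "__NO_PARTITION_ID__" for unpartitioned output. A minimal sketch of the same bookkeeping over plain tuples, with invented sample values:

    object WriteStatsSketch {
      def main(args: Array[String]): Unit = {
        // (fileName, partitionId, recordCount): sample values only.
        val writtenFiles = Seq(
          ("part-00000.parquet", "dt=2024-07-25", 100L),
          ("part-00001.parquet", "__NO_PARTITION_ID__", 50L))

        var updatedPartitions = Set.empty[String]
        val numWrittenRows = writtenFiles.map {
          case (_, partition, count) =>
            if (partition != "__NO_PARTITION_ID__") updatedPartitions += partition
            count
        }.sum
        println(s"rows=$numWrittenRows, partitions=$updatedPartitions")
        // prints: rows=150, partitions=Set(dt=2024-07-25)
      }
    }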
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Common/CHUtil.h
@@ -42,6 +42,8 @@ namespace local_engine
static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage";
static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert";
static const std::string DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = "spark.sql.decimalOperations.allowPrecisionLoss";
static const std::string SPARK_TASK_WRITE_TMEP_DIR = "gluten.write.temp.dir";
static const std::string SPARK_TASK_WRITE_FILENAME = "gluten.write.file.name";

static const std::unordered_set<String> BOOL_VALUE_SETTINGS{
MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS};
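
For illustration only: the two new keys above are assumed to carry the per-task staging directory and target file name to the native engine; the values below are invented placeholders (Scala, worksheet-style):

    // Invented placeholder values; the real ones come from the Spark task context.
    val nativeWriteSettings = Map(
      "gluten.write.temp.dir" -> "/tmp/gluten-write/task-staging",
      "gluten.write.file.name" -> "part-00000-<uuid>.c000.snappy.parquet")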
21 changes: 15 additions & 6 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -57,6 +57,7 @@
#include <Parser/RelParser.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parser/TypeParser.h>
#include <Parser/WriteRelParser.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ExpressionListParsers.h>
#include <Processors/Formats/Impl/ArrowBlockOutputFormat.h>
@@ -423,12 +424,13 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan)
if (!root_rel.has_root())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "must have root rel!");

- if (root_rel.root().input().has_write())
-     throw Exception(ErrorCodes::BAD_ARGUMENTS, "write pipeline is not supported yet!");
+ const bool writePipeline = root_rel.root().input().has_write();
+ const substrait::Rel & first_read_rel = writePipeline ? root_rel.root().input().write().input() : root_rel.root().input();

std::list<const substrait::Rel *> rel_stack;
- auto query_plan = parseOp(root_rel.root().input(), rel_stack);
- adjustOutput(query_plan, root_rel);
+ auto query_plan = parseOp(first_read_rel, rel_stack);
+ if (!writePipeline)
+     adjustOutput(query_plan, root_rel);

#ifndef NDEBUG
PlanUtil::checkOuputType(*query_plan);
@@ -1339,9 +1341,16 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
Stopwatch stopwatch;

const Settings & settings = context->getSettingsRef();
- auto pipeline_builder = buildQueryPipeline(*query_plan);
+ auto builder = buildQueryPipeline(*query_plan);

- QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder));
+ ///
+ assert(s_plan.relations_size() == 1);
+ const substrait::PlanRel & root_rel = s_plan.relations().at(0);
+ assert(root_rel.has_root());
+ if (root_rel.root().input().has_write())
+     addSinkTransfrom(context, root_rel.root().input().write(), builder);
+ ///
+ QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));

auto * logger = &Poco::Logger::get("SerializedPlanParser");
LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);