From f78ffb3de2035e10ae82326b28623ca350c400ae Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Thu, 25 Jul 2024 20:31:04 +0800 Subject: [PATCH] Support Native Write --- .../gluten/metrics/OperatorMetrics.java | 3 + .../CHNativeExpressionEvaluator.java | 7 +- .../ExpressionEvaluatorJniWrapper.java | 9 + .../backendsapi/clickhouse/CHBackend.scala | 74 ++++- .../backendsapi/clickhouse/CHMetricsApi.scala | 12 +- .../clickhouse/CHSparkPlanExecApi.scala | 4 +- .../metrics/WriteFilesMetricsUpdater.scala | 30 ++ .../execution/ClickhouseBackendWrite.scala | 84 ++++++ cpp-ch/local-engine/Common/CHUtil.h | 2 + .../Parser/SerializedPlanParser.cpp | 21 +- cpp-ch/local-engine/Parser/WriteRelParser.cpp | 221 ++++++++++++++ cpp-ch/local-engine/Parser/WriteRelParser.h | 46 +++ .../Storages/Output/FileWriterWrappers.cpp | 3 + .../Storages/Output/FileWriterWrappers.h | 218 ++++++++++++++ cpp-ch/local-engine/local_engine_jni.cpp | 17 ++ .../tests/benchmark_local_engine.cpp | 5 +- .../local-engine/tests/gluten_test_util.cpp | 1 + cpp-ch/local-engine/tests/gluten_test_util.h | 78 +---- ...881.cpp => gtest_clickhouse_pr_verify.cpp} | 20 +- .../local-engine/tests/gtest_local_engine.cpp | 3 +- .../tests/gtest_parquet_columnindex.cpp | 7 +- .../local-engine/tests/gtest_parquet_read.cpp | 7 +- cpp-ch/local-engine/tests/gtest_parser.cpp | 183 ++++------- .../tests/gtest_write_pipeline.cpp | 253 ++++++++++++++++ .../json/native_write_one_partition.json | 283 ++++++++++++++++++ .../tests/json/native_write_plan.json | 203 +++++++++++++ .../json/native_write_plan_1_spark33.json | 116 +++++++ .../json/native_write_plan_1_spark35.json | 246 +++++++++++++++ 28 files changed, 1942 insertions(+), 214 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala create mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala create mode 100644 cpp-ch/local-engine/Parser/WriteRelParser.cpp create mode 100644 cpp-ch/local-engine/Parser/WriteRelParser.h rename cpp-ch/local-engine/tests/{gtest_clickhouse_54881.cpp => gtest_clickhouse_pr_verify.cpp} (74%) create mode 100644 cpp-ch/local-engine/tests/gtest_write_pipeline.cpp create mode 100644 cpp-ch/local-engine/tests/json/native_write_one_partition.json create mode 100644 cpp-ch/local-engine/tests/json/native_write_plan.json create mode 100644 cpp-ch/local-engine/tests/json/native_write_plan_1_spark33.json create mode 100644 cpp-ch/local-engine/tests/json/native_write_plan_1_spark35.json diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index 8dcb0ef74b13..53c822b416ce 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -27,6 +27,9 @@ public class OperatorMetrics implements IOperatorMetrics { public JoinParams joinParams; public AggregationParams aggParams; + public long physicalWrittenBytes; + public long numWrittenFiles; + /** Create an instance for operator metrics. 
*/ public OperatorMetrics( List metricsList, JoinParams joinParams, AggregationParams aggParams) { diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java index c92e33ca8605..01f38cb3b90b 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHNativeExpressionEvaluator.java @@ -28,6 +28,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.sql.internal.SQLConf; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -79,8 +80,10 @@ private static Map getNativeBackendConf() { } public static void injectWriteFilesTempPath(String path, String fileName) { - throw new UnsupportedOperationException( - "injectWriteFilesTempPath Not supported in CHNativeExpressionEvaluator"); + ExpressionEvaluatorJniWrapper.injectWriteFilesTempPath( + CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(), + path.getBytes(StandardCharsets.UTF_8), + fileName.getBytes(StandardCharsets.UTF_8)); } // Used by WholeStageTransform to create the native computing pipeline and diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java index 7c67f1008e38..a5a474d2a252 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/ExpressionEvaluatorJniWrapper.java @@ -42,4 +42,13 @@ public static native long nativeCreateKernelWithIterator( GeneralInIterator[] batchItr, byte[] confArray, boolean materializeInput); + + /** + * Set the temp path for writing files. 
+ * + * @param allocatorId allocator id for current task attempt(or thread) + * @param path the temp path for writing files + */ + public static native void injectWriteFilesTempPath( + long allocatorId, byte[] path, byte[] filename); } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 320483beb3b6..06fe8c34ca4a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse import org.apache.gluten.{CH_BRANCH, CH_COMMIT, GlutenConfig} import org.apache.gluten.backendsapi._ +import org.apache.gluten.execution.WriteFilesExecTransformer import org.apache.gluten.expression.WindowFunctionsBuilder import org.apache.gluten.extension.ValidationResult import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat @@ -25,13 +26,17 @@ import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._ import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.aggregate.HashAggregateExec +import org.apache.spark.sql.execution.datasources.FileFormat +import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType} +import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, StructField, StructType} import java.util.Locale @@ -187,6 +192,73 @@ object CHBackendSettings extends BackendSettingsApi with Logging { } } + override def supportWriteFilesExec( + format: FileFormat, + fields: Array[StructField], + bucketSpec: Option[BucketSpec], + options: Map[String, String]): ValidationResult = { + + def validateCompressionCodec(): Option[String] = { + // FIXME: verify Support compression codec + val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options) + None + } + + def validateFileFormat(): Option[String] = { + format match { + case _: ParquetFileFormat => None + case _: OrcFileFormat => None + case f: FileFormat => Some(s"Not support FileFormat: ${f.getClass.getSimpleName}") + } + } + + // Validate if all types are supported. 
+ def validateDateTypes(): Option[String] = { + None + } + + def validateFieldMetadata(): Option[String] = { + // copy CharVarcharUtils.CHAR_VARCHAR_TYPE_STRING_METADATA_KEY + val CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING" + fields + .find(_.metadata != Metadata.empty) + .filterNot(_.metadata.contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) + .map { + filed => + s"StructField contain the metadata information: $filed, metadata: ${filed.metadata}" + } + } + def validateWriteFilesOptions(): Option[String] = { + val maxRecordsPerFile = options + .get("maxRecordsPerFile") + .map(_.toLong) + .getOrElse(SQLConf.get.maxRecordsPerFile) + if (maxRecordsPerFile > 0) { + Some("Unsupported native write: maxRecordsPerFile not supported.") + } else { + None + } + } + + def validateBucketSpec(): Option[String] = { + if (bucketSpec.nonEmpty) { + Some("Unsupported native write: bucket write is not supported.") + } else { + None + } + } + + validateCompressionCodec() + .orElse(validateFileFormat()) + .orElse(validateFieldMetadata()) + .orElse(validateDateTypes()) + .orElse(validateWriteFilesOptions()) + .orElse(validateBucketSpec()) match { + case Some(reason) => ValidationResult.failed(reason) + case _ => ValidationResult.succeeded + } + } + override def supportShuffleWithProject( outputPartitioning: Partitioning, child: SparkPlan): Boolean = { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala index 5465e9b60b67..85b298fa4835 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala @@ -383,13 +383,13 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { s"SampleTransformer metrics update is not supported in CH backend") } - def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = { - throw new UnsupportedOperationException( - s"WriteFilesTransformer metrics update is not supported in CH backend") - } + def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = + Map( + "physicalWrittenBytes" -> SQLMetrics.createMetric(sparkContext, "number of written bytes"), + "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files") + ) def genWriteFilesTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater = { - throw new UnsupportedOperationException( - s"WriteFilesTransformer metrics update is not supported in CH backend") + new WriteFilesMetricsUpdater(metrics) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 32f372956f0a..b8a76b4210c3 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -674,8 +674,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi { CHRegExpReplaceTransformer(substraitExprName, children, expr) } - def createBackendWrite(description: WriteJobDescription): BackendWrite = - throw new UnsupportedOperationException("createBackendWrite is not supported in ch backend.") + def 
createBackendWrite(description: WriteJobDescription): BackendWrite = ClickhouseBackendWrite( + description) override def createColumnarArrowEvalPythonExec( udfs: Seq[PythonUDF], diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala new file mode 100644 index 000000000000..5a04b404334f --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.metrics + +import org.apache.spark.sql.execution.metric.SQLMetric + +class WriteFilesMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { + + override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { + if (opMetrics != null) { + val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] + metrics("physicalWrittenBytes") += operatorMetrics.physicalWrittenBytes + metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles + } + } +} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala new file mode 100644 index 000000000000..225d9688c7df --- /dev/null +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/ClickhouseBackendWrite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.execution + +import org.apache.spark.internal.Logging +import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.vectorized.ColumnarBatch + +import scala.collection.mutable + +case class ClickhouseBackendWrite(description: WriteJobDescription) + extends BackendWrite + with Logging { + + override def collectNativeWriteFilesMetrics(cb: ColumnarBatch): Option[WriteTaskResult] = { + val numFiles = cb.numRows() + // Write an empty iterator + if (numFiles == 0) { + None + } else { + val file_col = cb.column(0) + val partition_col = cb.column(1) + val count_col = cb.column(2) + + val outputPath = description.path + var updatedPartitions = Set.empty[String] + val addedAbsPathFiles: mutable.Map[String, String] = mutable.Map[String, String]() + + val write_stats = Range(0, cb.numRows()).map { + i => + val targetFileName = file_col.getUTF8String(i).toString + val partition = partition_col.getUTF8String(i).toString + if (partition != "__NO_PARTITION_ID__") { + updatedPartitions += partition + val tmpOutputPath = outputPath + "/" + partition + "/" + targetFileName + val customOutputPath = + description.customPartitionLocations.get( + PartitioningUtils.parsePathFragment(partition)) + if (customOutputPath.isDefined) { + addedAbsPathFiles(tmpOutputPath) = customOutputPath.get + "/" + targetFileName + } + } + count_col.getLong(i) + } + + val partitionsInternalRows = updatedPartitions.map { + part => + val parts = new Array[Any](1) + parts(0) = part + new GenericInternalRow(parts) + }.toSeq + + val numWrittenRows = write_stats.sum + val stats = BasicWriteTaskStats( + partitions = partitionsInternalRows, + numFiles = numFiles, + numBytes = 101, + numRows = numWrittenRows) + val summary = + ExecutedWriteSummary(updatedPartitions = updatedPartitions, stats = Seq(stats)) + + Some( + WriteTaskResult( + new TaskCommitMessage(addedAbsPathFiles.toMap -> updatedPartitions), + summary)) + } + } +} diff --git a/cpp-ch/local-engine/Common/CHUtil.h b/cpp-ch/local-engine/Common/CHUtil.h index 8b5de0a86700..98139fb49a5b 100644 --- a/cpp-ch/local-engine/Common/CHUtil.h +++ b/cpp-ch/local-engine/Common/CHUtil.h @@ -42,6 +42,8 @@ namespace local_engine static const String MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE = "mergetree.insert_without_local_storage"; static const String MERGETREE_MERGE_AFTER_INSERT = "mergetree.merge_after_insert"; static const std::string DECIMAL_OPERATIONS_ALLOW_PREC_LOSS = "spark.sql.decimalOperations.allowPrecisionLoss"; +static const std::string SPARK_TASK_WRITE_TMEP_DIR = "gluten.write.temp.dir"; +static const std::string SPARK_TASK_WRITE_FILENAME = "gluten.write.file.name"; static const std::unordered_set BOOL_VALUE_SETTINGS{ MERGETREE_MERGE_AFTER_INSERT, MERGETREE_INSERT_WITHOUT_LOCAL_STORAGE, DECIMAL_OPERATIONS_ALLOW_PREC_LOSS}; diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 17612b9f9d10..151e24c6da45 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -57,6 +57,7 @@ #include #include #include +#include #include #include #include @@ -423,12 +424,13 @@ QueryPlanPtr SerializedPlanParser::parse(const substrait::Plan & plan) if (!root_rel.has_root()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "must have root rel!"); - if 
(root_rel.root().input().has_write()) - throw Exception(ErrorCodes::BAD_ARGUMENTS, "write pipeline is not supported yet!"); + const bool writePipeline = root_rel.root().input().has_write(); + const substrait::Rel & first_read_rel = writePipeline ? root_rel.root().input().write().input() : root_rel.root().input(); std::list rel_stack; - auto query_plan = parseOp(root_rel.root().input(), rel_stack); - adjustOutput(query_plan, root_rel); + auto query_plan = parseOp(first_read_rel, rel_stack); + if (!writePipeline) + adjustOutput(query_plan, root_rel); #ifndef NDEBUG PlanUtil::checkOuputType(*query_plan); @@ -1339,9 +1341,16 @@ std::unique_ptr SerializedPlanParser::createExecutor(DB::QueryPla Stopwatch stopwatch; const Settings & settings = context->getSettingsRef(); - auto pipeline_builder = buildQueryPipeline(*query_plan); + auto builder = buildQueryPipeline(*query_plan); - QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder)); + /// + assert(s_plan.relations_size() == 1); + const substrait::PlanRel & root_rel = s_plan.relations().at(0); + assert(root_rel.has_root()); + if (root_rel.root().input().has_write()) + addSinkTransfrom(context, root_rel.root().input().write(), builder); + /// + QueryPipeline pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder)); auto * logger = &Poco::Logger::get("SerializedPlanParser"); LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0); diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/WriteRelParser.cpp new file mode 100644 index 000000000000..b32b7bc6337b --- /dev/null +++ b/cpp-ch/local-engine/Parser/WriteRelParser.cpp @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "WriteRelParser.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace local_engine; + +DB::ProcessorPtr makeSink( + const DB::ContextPtr & context, + const DB::Names & partition_by, + const DB::Block & input_header, + const DB::Block & output_header, + const std::string & base_path, + const std::string & filename, + const std::string & format_hint, + const std::shared_ptr & stats) +{ + if (partition_by.empty()) + { + auto file_sink = std::make_shared(context, base_path, "", filename, format_hint, input_header); + file_sink->setStats(stats); + return file_sink; + } + + auto file_sink = std::make_shared( + context, partition_by, input_header, output_header, base_path, filename, format_hint); + file_sink->setStats(stats); + return file_sink; +} + +bool need_fix_tuple(const DB::DataTypePtr& input, const DB::DataTypePtr& output) +{ + const auto orgial = typeid_cast(input.get()); + const auto output_type = typeid_cast(output.get()); + return orgial != nullptr && output_type != nullptr && !orgial->equals(*output_type); +} + +DB::ExpressionActionsPtr create_rename_action(const DB::Block & input, const DB::Block & output) +{ + DB::NamesWithAliases aliases; + for (auto ouput_name = output.begin(), input_iter = input.begin(); ouput_name != output.end(); ++ouput_name, ++input_iter) + aliases.emplace_back(DB::NameWithAlias(input_iter->name, ouput_name->name)); + + const auto actions_dag = std::make_shared(blockToNameAndTypeList(input)); + actions_dag->project(aliases); + return std::make_shared(actions_dag); +} + +DB::ExpressionActionsPtr create_project_action(const DB::Block & input, const DB::Block & output) +{ + DB::ColumnsWithTypeAndName final_cols; + std::ranges::transform( + output, std::back_inserter(final_cols), [](const DB::ColumnWithTypeAndName& out_ocl) { + const auto out_type = out_ocl.type; + return DB::ColumnWithTypeAndName(out_type->createColumn(), out_type, out_ocl.name); + }); + assert(final_cols.size() == output.columns()); + + const auto & original_cols = input.getColumnsWithTypeAndName(); + ActionsDAGPtr final_project = ActionsDAG::makeConvertingActions(original_cols, final_cols, ActionsDAG::MatchColumnsMode::Position); + return std::make_shared(final_project); +} + +void adjust_output(const DB::QueryPipelineBuilderPtr & builder, const DB::Block& output) +{ + const auto input = builder->getHeader(); + if (input.columns() != output.columns()) + { + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, + "Missmatch result columns size, input size is {}, but output size is {}", input.columns(), output.columns()); + } + + auto mismatch_pair = + std::mismatch(input.begin(), input.end(), output.begin(), + [](const DB::ColumnWithTypeAndName& lhs, const DB::ColumnWithTypeAndName& rhs) {return lhs.name == rhs.name;}); + bool name_is_diffient = mismatch_pair.first != input.end(); + + mismatch_pair = std::mismatch(input.begin(), input.end(), output.begin(), + [](const DB::ColumnWithTypeAndName& lhs, const DB::ColumnWithTypeAndName& rhs) + { + return lhs.type->equals(*rhs.type); + }); + bool type_is_diffient = mismatch_pair.first != input.end(); + + DB::ExpressionActionsPtr convert_action; + + if (type_is_diffient) + convert_action = create_project_action(input, output); + + if(name_is_diffient && !convert_action) + convert_action = create_rename_action(input, output); + + if (!convert_action) + return; + + builder->addSimpleTransform( + [&](const DB::Block & cur_header, const 
DB::QueryPipelineBuilder::StreamType stream_type) -> DB::ProcessorPtr + { + if (stream_type != DB::QueryPipelineBuilder::StreamType::Main) + return nullptr; + return std::make_shared(cur_header, convert_action); + }); +} + +namespace local_engine +{ + +void addSinkTransfrom(const DB::ContextPtr & context, const substrait::WriteRel & write_rel, const DB::QueryPipelineBuilderPtr & builder) +{ + const DB::Settings & settings = context->getSettingsRef(); + + DB::Field field_tmp_dir; + if (!settings.tryGet(SPARK_TASK_WRITE_TMEP_DIR, field_tmp_dir)) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Write Pipeline need inject temp directory."); + const auto & tmp_dir = field_tmp_dir.get(); + + DB::Field field_filename; + if (!settings.tryGet(SPARK_TASK_WRITE_FILENAME, field_filename)) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Write Pipeline need inject file name."); + const auto & filename = field_filename.get(); + + assert(write_rel.has_named_table()); + const substrait::NamedObjectWrite & named_table = write_rel.named_table(); + google::protobuf::StringValue optimization; + named_table.advanced_extension().optimization().UnpackTo(&optimization); + auto config = local_engine::parse_write_parameter(optimization.value()); + + //TODO : set compression codec according to format + assert(config["isSnappy"] == "1"); + assert(config.contains("format")); + + assert(write_rel.has_table_schema()); + const substrait::NamedStruct & table_schema = write_rel.table_schema(); + auto blockHeader = TypeParser::buildBlockFromNamedStruct(table_schema); + const auto partitionCols = collect_partition_cols(blockHeader, table_schema); + + auto stats = std::make_shared(blockHeader); + + adjust_output(builder, blockHeader); + + builder->addSimpleTransform( + [&](const Block & cur_header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr + { + if (stream_type != QueryPipelineBuilder::StreamType::Main) + return nullptr; + return makeSink(context, partitionCols, cur_header, blockHeader, tmp_dir, filename, config["format"], stats); + }); + builder->addSimpleTransform( + [&](const Block &, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr + { + if (stream_type != QueryPipelineBuilder::StreamType::Main) + return nullptr; + return stats; + }); +} + +std::map parse_write_parameter(const std::string & input) +{ + std::map reuslt; + const std::string prefix = "WriteParameters:"; + const size_t prefix_pos = input.find(prefix); + if (prefix_pos == std::string::npos) + return reuslt; + + const size_t start_pos = prefix_pos + prefix.length(); + const size_t end_pos = input.find('\n', start_pos); + + if (end_pos == std::string::npos) + return reuslt; + + for (const Poco::StringTokenizer tok(input.substr(start_pos, end_pos - start_pos), ";", Poco::StringTokenizer::TOK_TRIM); + const auto & parameter : tok) + { + const size_t pos = parameter.find('='); + if (pos == std::string::npos) + continue; + reuslt[parameter.substr(0, pos)] = parameter.substr(pos + 1); + } + return reuslt; +} + +DB::Names collect_partition_cols(const DB::Block & header, const substrait::NamedStruct & struct_) +{ + DB::Names result; + assert(struct_.column_types_size() == header.columns()); + assert(struct_.column_types_size() == struct_.struct_().types_size()); + + auto name_iter = header.begin(); + auto type_iter = struct_.column_types().begin(); + for (; name_iter != header.end(); ++name_iter, ++type_iter) + if (*type_iter == ::substrait::NamedStruct::PARTITION_COL) + result.push_back(name_iter->name); + return result; +} + 
+} \ No newline at end of file diff --git a/cpp-ch/local-engine/Parser/WriteRelParser.h b/cpp-ch/local-engine/Parser/WriteRelParser.h new file mode 100644 index 000000000000..9d896e7ca53a --- /dev/null +++ b/cpp-ch/local-engine/Parser/WriteRelParser.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include +#include + +namespace substrait +{ +class WriteRel; +class NamedStruct; +} + +namespace DB +{ +class QueryPipelineBuilder; +using QueryPipelineBuilderPtr = std::unique_ptr; +} + +namespace local_engine +{ + +void addSinkTransfrom(const DB::ContextPtr & context, const substrait::WriteRel & write_rel, const DB::QueryPipelineBuilderPtr & builder); + +/// Visible for UTs +std::map parse_write_parameter(const std::string & input); +DB::Names collect_partition_cols(const DB::Block & header, const substrait::NamedStruct & struct_); + +} diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp index 619207bb6269..fc4b3a72f75b 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp +++ b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp @@ -20,6 +20,9 @@ namespace local_engine { +const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"}; +const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"}; + NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : FileWriterWrapper(file_), context(context_) { diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h index 727f70e863d9..57cb47e41a55 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h +++ b/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h @@ -20,8 +20,17 @@ #include #include #include +#include +#include +#include #include +#include +#include #include +#include +#include +#include +#include #include namespace local_engine @@ -70,4 +79,213 @@ OutputFormatFilePtr create_output_format_file( const std::string & file_uri, const DB::Names & preferred_column_names, const std::string & format_hint); + +class WriteStats : public DB::ISimpleTransform +{ + bool all_chunks_processed_ = false; /// flag to determine if we have already processed all chunks + Arena partition_keys_arena_; + std::string filename_; + + absl::flat_hash_map fiel_to_count_; + + static Block statsHeader() + { + return makeBlockHeader({{STRING(), "filename"}, {STRING(), "partition_id"}, {BIGINT(), "record_count"}}); + } + + Chunk final_result() const + { + ///TODO: improve performance + auto file_col = STRING()->createColumn(); + auto partition_col = 
STRING()->createColumn(); + auto countCol = BIGINT()->createColumn(); + UInt64 num_rows = 0; + for (const auto & [relative_path, rows] : fiel_to_count_) + { + if (rows == 0) + continue; + file_col->insertData(filename_.c_str(), filename_.size()); + partition_col->insertData(relative_path.data, relative_path.size); + countCol->insert(rows); + num_rows++; + } + + const DB::Columns res_columns{std::move(file_col), std::move(partition_col), std::move(countCol)}; + return DB::Chunk(res_columns, num_rows); + } + +public: + explicit WriteStats(const Block & input_header_) : ISimpleTransform(input_header_, statsHeader(), true) { } + + Status prepare() override + { + if (input.isFinished() && !output.isFinished() && !has_input && !all_chunks_processed_) + { + all_chunks_processed_ = true; + /// return Ready to call transform() for generating filling rows after latest chunk was processed + return Status::Ready; + } + + return ISimpleTransform::prepare(); + } + + String getName() const override { return "WriteStats"; } + void transform(Chunk & chunk) override + { + if (all_chunks_processed_) + chunk = final_result(); + else + chunk = {}; + } + + void addFilePath(const String & patition_id, const String & filename) + { + assert(!filename.empty()); + + if (filename_.empty()) + filename_ = filename; + + assert(filename_ == filename); + + if (patition_id.empty()) + return; + fiel_to_count_.emplace(copyStringInArena(partition_keys_arena_, patition_id), 0); + } + + void collectStats(const String & file_path, size_t rows) + { + if (const auto it = fiel_to_count_.find(file_path); it != fiel_to_count_.end()) + { + it->second += rows; + return; + } + throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "File path {} not found in the stats map", file_path); + } +}; + +class SubstraitFileSink final : public SinkToStorage +{ + const std::string partition_id_; + const std::string relative_path_; + OutputFormatFile::OutputFormatPtr output_format_; + std::shared_ptr stats_{nullptr}; + + static std::string makeFilename(const std::string & base_path, const std::string & partition_id, const std::string & relative) + { + if (partition_id.empty()) + return fmt::format("{}/{}", base_path, relative); + return fmt::format("{}/{}/{}", base_path, partition_id, relative); + } + +public: + /// visible for UTs + static const std::string NO_PARTITION_ID; + + explicit SubstraitFileSink( + const DB::ContextPtr & context, + const std::string & base_path, + const std::string & partition_id, + const std::string & relative, + const std::string & format_hint, + const Block & header) + : SinkToStorage(header) + , partition_id_(partition_id.empty() ? 
NO_PARTITION_ID : partition_id) + , relative_path_(relative) + , output_format_(create_output_format_file(context, makeFilename(base_path, partition_id, relative), header.getNames(), format_hint) + ->createOutputFormat(header)) + { + } + String getName() const override { return "SubstraitFileSink"; } + + ///TODO: remove this function + void setStats(const std::shared_ptr & stats) + { + stats_ = stats; + stats_->addFilePath(partition_id_, relative_path_); + } + +protected: + void consume(Chunk & chunk) override + { + const size_t row_count = chunk.getNumRows(); + output_format_->output->write(materializeBlock(getHeader().cloneWithColumns(chunk.detachColumns()))); + + if (stats_) + stats_->collectStats(partition_id_, row_count); + } + void onFinish() override + { + output_format_->output->finalize(); + output_format_->output->flush(); + output_format_->write_buffer->finalize(); + } +}; + +class SubstraitPartitionedFileSink final : public DB::PartitionedSink +{ + static const std::string DEFAULT_PARTITION_NAME; + +public: + /// visible for UTs + static ASTPtr make_partition_expression(const DB::Names & partition_columns) + { + /// Parse the following expression into ASTs + /// cancat('/col_name=', 'toString(col_name)') + bool add_slash = false; + ASTs arguments; + for (const auto & column : partition_columns) + { + // partition_column= + std::string key = add_slash ? fmt::format("/{}=", column) : fmt::format("{}=", column); + add_slash = true; + arguments.emplace_back(std::make_shared(key)); + + // ifNull(toString(partition_column), DEFAULT_PARTITION_NAME) + // FIXME if toString(partition_column) is empty + auto column_ast = std::make_shared(column); + ASTs if_null_args{makeASTFunction("toString", ASTs{column_ast}), std::make_shared(DEFAULT_PARTITION_NAME)}; + arguments.emplace_back(makeASTFunction("ifNull", std::move(if_null_args))); + } + return DB::makeASTFunction("concat", std::move(arguments)); + } + +private: + const std::string base_path_; + const std::string filenmame_; + ContextPtr context_; + const Block sample_block_; + const std::string format_hint_; + std::shared_ptr stats_{nullptr}; + +public: + SubstraitPartitionedFileSink( + const ContextPtr & context, + const Names & partition_by, + const Block & input_header, + const Block & sample_block, + const std::string & base_path, + const std::string & filename, + const std::string & format_hint) + : PartitionedSink(make_partition_expression(partition_by), context, input_header) + , base_path_(base_path) + , filenmame_(filename) + , context_(context) + , sample_block_(sample_block) + , format_hint_(format_hint) + { + } + SinkPtr createSinkForPartition(const String & partition_id) override + { + assert(stats_); + const auto partition_path = fmt::format("{}/{}", partition_id, filenmame_); + PartitionedSink::validatePartitionKey(partition_path, true); + auto file_sink = std::make_shared(context_, base_path_, partition_id, filenmame_, format_hint_, sample_block_); + file_sink->setStats(stats_); + return file_sink; + } + String getName() const override { return "SubstraitPartitionedFileSink"; } + + ///TODO: remove this function + void setStats(const std::shared_ptr & stats) { stats_ = stats; } +}; } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 3c3e839a3b29..17d087bb82ff 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -217,6 +217,23 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_n 
LOCAL_ENGINE_JNI_METHOD_END(env, ) } +JNIEXPORT void Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_injectWriteFilesTempPath( + JNIEnv * env, jclass, jlong allocator_id, jbyteArray temp_path, jbyteArray filename) +{ + LOCAL_ENGINE_JNI_METHOD_START + const auto query_context = local_engine::getAllocator(allocator_id)->query_context; + + const auto path_array = local_engine::getByteArrayElementsSafe(env, temp_path); + const std::string c_path{reinterpret_cast(path_array.elems()), static_cast(path_array.length())}; + query_context->setSetting(local_engine::SPARK_TASK_WRITE_TMEP_DIR, c_path); + + const auto filename_array = local_engine::getByteArrayElementsSafe(env, filename); + const std::string c_filename{reinterpret_cast(filename_array.elems()), static_cast(filename_array.length())}; + query_context->setSetting(local_engine::SPARK_TASK_WRITE_FILENAME, c_filename); + + LOCAL_ENGINE_JNI_METHOD_END(env, ) +} + JNIEXPORT jlong Java_org_apache_gluten_vectorized_ExpressionEvaluatorJniWrapper_nativeCreateKernelWithIterator( JNIEnv * env, jclass , diff --git a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp index 43cdab8a41fa..cf9ecf37dd30 100644 --- a/cpp-ch/local-engine/tests/benchmark_local_engine.cpp +++ b/cpp-ch/local-engine/tests/benchmark_local_engine.cpp @@ -234,7 +234,7 @@ DB::ContextMutablePtr global_context; std::ifstream t(path); std::string str((std::istreambuf_iterator(t)), std::istreambuf_iterator()); std::cout << "the plan from: " << path << std::endl; - auto local_executor = parser.createExecutor(str); + auto local_executor = parser.createExecutor(str); state.ResumeTiming(); while (local_executor->hasNext()) [[maybe_unused]] auto * x = local_executor->nextColumnar(); @@ -585,8 +585,7 @@ DB::ContextMutablePtr global_context; readIntBinary(x, buf); readIntBinary(y, buf); readIntBinary(z, buf); - std::cout << std::to_string(x) + " " << std::to_string(y) + " " << std::to_string(z) + " " - << "\n"; + std::cout << std::to_string(x) + " " << std::to_string(y) + " " << std::to_string(z) + " " << "\n"; data_buf.seek(x, SEEK_SET); assert(!data_buf.eof()); std::string data; diff --git a/cpp-ch/local-engine/tests/gluten_test_util.cpp b/cpp-ch/local-engine/tests/gluten_test_util.cpp index 0448092b960d..1f1bd9983696 100644 --- a/cpp-ch/local-engine/tests/gluten_test_util.cpp +++ b/cpp-ch/local-engine/tests/gluten_test_util.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include namespace fs = std::filesystem; diff --git a/cpp-ch/local-engine/tests/gluten_test_util.h b/cpp-ch/local-engine/tests/gluten_test_util.h index 338d53be788a..34e05b8b188b 100644 --- a/cpp-ch/local-engine/tests/gluten_test_util.h +++ b/cpp-ch/local-engine/tests/gluten_test_util.h @@ -18,6 +18,7 @@ #pragma once #include +#include #include #include #include @@ -25,7 +26,6 @@ #include #include #include -#include #include using BlockRowType = DB::ColumnsWithTypeAndName; @@ -65,23 +65,6 @@ AnotherRowType readParquetSchema(const std::string & file); DB::ActionsDAGPtr parseFilter(const std::string & filter, const AnotherRowType & name_and_types); -namespace pb_util -{ -template -std::string JsonStringToBinary(const std::string_view & json) -{ - Message message; - std::string binary; - auto s = google::protobuf::util::JsonStringToMessage(json, &message); - if (!s.ok()) - { - const std::string err_msg{s.message()}; - throw std::runtime_error(err_msg); - } - message.SerializeToString(&binary); - return binary; -} -} } inline std::string 
replaceLocalFilesWildcards(const String & haystack, const String & replaced) @@ -90,54 +73,6 @@ inline std::string replaceLocalFilesWildcards(const String & haystack, const Str return boost::replace_all_copy(haystack, _WILDCARD_, replaced); } -inline DB::DataTypePtr BIGINT() -{ - return std::make_shared(); -} -inline DB::DataTypePtr INT() -{ - return std::make_shared(); -} -inline DB::DataTypePtr INT16() -{ - return std::make_shared(); -} -inline DB::DataTypePtr INT8() -{ - return std::make_shared(); -} -inline DB::DataTypePtr UBIGINT() -{ - return std::make_shared(); -} -inline DB::DataTypePtr UINT() -{ - return std::make_shared(); -} -inline DB::DataTypePtr UINT16() -{ - return std::make_shared(); -} -inline DB::DataTypePtr UINT8() -{ - return std::make_shared(); -} - -inline DB::DataTypePtr DOUBLE() -{ - return std::make_shared(); -} - -inline DB::DataTypePtr STRING() -{ - return std::make_shared(); -} - -inline DB::DataTypePtr DATE() -{ - return std::make_shared(); -} - inline BlockFieldType toBlockFieldType(const AnotherFieldType & type) { return BlockFieldType(type.type, type.name); @@ -148,6 +83,17 @@ inline AnotherFieldType toAnotherFieldType(const parquet::ColumnDescriptor & typ return {type.name(), local_engine::test::toDataType(type)}; } +inline AnotherRowType toAnotherRowType(const DB::Block & header) +{ + AnotherRowType types; + for (const auto & name : header.getNames()) + { + const auto * column = header.findByName(name); + types.push_back(DB::NameAndTypePair(column->name, column->type)); + } + return types; +} + inline BlockRowType toBlockRowType(const AnotherRowType & type, const bool reverse = false) { BlockRowType result; diff --git a/cpp-ch/local-engine/tests/gtest_clickhouse_54881.cpp b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp similarity index 74% rename from cpp-ch/local-engine/tests/gtest_clickhouse_54881.cpp rename to cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp index 69966ef93c0b..6352a819927c 100644 --- a/cpp-ch/local-engine/tests/gtest_clickhouse_54881.cpp +++ b/cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -42,11 +43,13 @@ TEST(Clickhouse, PR54881) = replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/54881.snappy.parquet")); SerializedPlanParser parser(context1); - parser.addSplitInfo(test::pb_util::JsonStringToBinary(split)); + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); - const auto local_executor = parser.createExecutor( + const auto plan = local_engine::JsonStringToMessage( {reinterpret_cast(gresource_embedded_pr_54881_jsonData), gresource_embedded_pr_54881_jsonSize}); + auto local_executor = parser.createExecutor(plan); + EXPECT_TRUE(local_executor->hasNext()); const Block & block = *local_executor->nextColumnar(); @@ -81,3 +84,16 @@ TEST(Clickhouse, PR54881) EXPECT_FALSE(local_executor->hasNext()); } + +// Plan for https://github.com/ClickHouse/ClickHouse/pull/65234 +INCBIN(resource_embedded_pr_65234_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_65234.json"); + +TEST(Clickhouse, PR65234) +{ + const std::string split = R"({"items":[{"uriFile":"file:///foo","length":"84633","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; + SerializedPlanParser parser(SerializedPlanParser::global_context); + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); + const auto plan = local_engine::JsonStringToMessage( + 
{reinterpret_cast(gresource_embedded_pr_65234_jsonData), gresource_embedded_pr_65234_jsonSize}); + auto query_plan = parser.parse(plan); +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_local_engine.cpp b/cpp-ch/local-engine/tests/gtest_local_engine.cpp index 962bf9def52e..50e527afc8d5 100644 --- a/cpp-ch/local-engine/tests/gtest_local_engine.cpp +++ b/cpp-ch/local-engine/tests/gtest_local_engine.cpp @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -95,7 +96,7 @@ void registerOutputFormatParquet(DB::FormatFactory & factory); int main(int argc, char ** argv) { - BackendInitializerUtil::init(test::pb_util::JsonStringToBinary( + BackendInitializerUtil::init(local_engine::JsonStringToBinary( {reinterpret_cast(gresource_embedded_config_jsonData), gresource_embedded_config_jsonSize})); auto & factory = FormatFactory::instance(); diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp index 76c0d028a58e..fbd7fbc63c27 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp @@ -15,13 +15,12 @@ * limitations under the License. */ -#include - #include "config.h" #if USE_PARQUET #include #include +#include #include #include #include @@ -29,6 +28,7 @@ #include #include +#include #include #include #include @@ -470,7 +470,9 @@ TEST(ColumnIndex, FilteringWithAllNullPages) } TEST(ColumnIndex, FilteringWithNotFoundColumnName) { + using namespace test_utils; + using namespace local_engine; const local_engine::ColumnIndexStore column_index_store = buildTestColumnIndexStore(); { @@ -1040,6 +1042,7 @@ TEST_P(TestBuildPageReadStates, BuildPageReadStates) TEST(ColumnIndex, VectorizedParquetRecordReader) { + using namespace local_engine; //TODO: move test parquet to s3 and download to CI machine. const std::string filename = "/home/chang/test/tpch/parquet/Index/60001/part-00000-76ef9b89-f292-495f-9d0d-98325f3d8956-c000.snappy.parquet"; diff --git a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp index 9623ffa98d28..e11343834459 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_read.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_read.cpp @@ -15,15 +15,13 @@ * limitations under the License. 
*/ -#include - - #include "config.h" #if USE_PARQUET #include #include +#include #include #include #include @@ -41,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -426,7 +425,7 @@ TEST(ParquetRead, LowLevelRead) TEST(ParquetRead, VectorizedColumnReader) { const std::string sample(local_engine::test::data_file("sample.parquet")); - Block blockHeader({{DOUBLE(), "b"}, {BIGINT(), "a"}}); + Block blockHeader({{local_engine::DOUBLE(), "b"}, {local_engine::BIGINT(), "a"}}); ReadBufferFromFile in(sample); const FormatSettings format_settings{}; auto arrow_file = local_engine::test::asArrowFileForParquet(in, format_settings); diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp index aaaa3679ab94..135f81a9149e 100644 --- a/cpp-ch/local-engine/tests/gtest_parser.cpp +++ b/cpp-ch/local-engine/tests/gtest_parser.cpp @@ -16,141 +16,86 @@ */ #include #include +#include #include #include +#include +#include +#include +#include #include +#include +#include using namespace local_engine; using namespace DB; -// Plan for https://github.com/ClickHouse/ClickHouse/pull/65234 -INCBIN(resource_embedded_pr_65234_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_65234.json"); -TEST(SerializedPlanParser, PR65234) +INCBIN(resource_embedded_readcsv_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/read_student_option_schema.csv.json"); +TEST(LocalExecutor, ReadCSV) { - const std::string split - = R"({"items":[{"uriFile":"file:///home/chang/SourceCode/rebase_gluten/backends-clickhouse/target/scala-2.12/test-classes/tests-working-home/tpch-data/supplier/part-00000-16caa751-9774-470c-bd37-5c84c53373c8-c000.snappy.parquet","length":"84633","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; + const std::string split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"56","text":{"fieldDelimiter":",","maxBlockSize":"8192","header":"1"},"schema":{"names":["id","name","language"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; + const std::string split = replaceLocalFilesWildcards( + split_template, GLUTEN_SOURCE_DIR("/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv")); SerializedPlanParser parser(SerializedPlanParser::global_context); - parser.addSplitInfo(test::pb_util::JsonStringToBinary(split)); - auto query_plan - = parser.parseJson({reinterpret_cast(gresource_embedded_pr_65234_jsonData), gresource_embedded_pr_65234_jsonSize}); -} - -#include -#include -#include -#include -#include - -Chunk testChunk() -{ - auto nameCol = STRING()->createColumn(); - nameCol->insert("one"); - nameCol->insert("two"); - nameCol->insert("three"); - - auto valueCol = UINT()->createColumn(); - valueCol->insert(1); - valueCol->insert(2); - valueCol->insert(3); - MutableColumns x; - x.push_back(std::move(nameCol)); - x.push_back(std::move(valueCol)); - return {std::move(x), 3}; -} - -TEST(LocalExecutor, StorageObjectStorageSink) -{ - /// 0. 
Create ObjectStorage for HDFS - auto settings = SerializedPlanParser::global_context->getSettingsRef(); - const std::string query - = R"(CREATE TABLE hdfs_engine_xxxx (name String, value UInt32) ENGINE=HDFS('hdfs://localhost:8020/clickhouse/test2', 'Parquet'))"; - DB::ParserCreateQuery parser; - std::string error_message; - const char * pos = query.data(); - auto ast = DB::tryParseQuery( - parser, - pos, - pos + query.size(), - error_message, - /* hilite = */ false, - "QUERY TEST", - /* allow_multi_statements = */ false, - 0, - settings.max_parser_depth, - settings.max_parser_backtracks, - true); - auto & create = ast->as(); - auto arg = create.storage->children[0]; - const auto * func = arg->as(); - EXPECT_TRUE(func && func->name == "HDFS"); - - DB::StorageHDFSConfiguration config; - StorageObjectStorage::Configuration::initialize(config, arg->children[0]->children, SerializedPlanParser::global_context, false); - - const std::shared_ptr object_storage - = std::dynamic_pointer_cast(config.createObjectStorage(SerializedPlanParser::global_context, false)); - EXPECT_TRUE(object_storage != nullptr); - - RelativePathsWithMetadata files_with_metadata; - object_storage->listObjects("/clickhouse", files_with_metadata, 0); - - /// 1. Create ObjectStorageSink - DB::StorageObjectStorageSink sink{ - object_storage, config.clone(), {}, {{STRING(), "name"}, {UINT(), "value"}}, SerializedPlanParser::global_context, ""}; - - /// 2. Create Chunk - /// 3. comsume - sink.consume(testChunk()); - sink.onFinish(); -} + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); + auto plan = local_engine::JsonStringToMessage( + {reinterpret_cast(gresource_embedded_readcsv_jsonData), gresource_embedded_readcsv_jsonSize}); -namespace DB -{ -SinkToStoragePtr createFilelinkSink( - const StorageMetadataPtr & metadata_snapshot, - const String & table_name_for_log, - const String & path, - CompressionMethod compression_method, - const std::optional & format_settings, - const String & format_name, - const ContextPtr & context, - int flags); + auto query_plan = parser.parse(plan); + const auto pipeline = parser.buildQueryPipeline(*query_plan); + LocalExecutor local_executor{std::move(query_plan), QueryPipelineBuilder::getPipeline(std::move(*pipeline))}; + EXPECT_TRUE(local_executor.hasNext()); + const Block & x = *local_executor.nextColumnar(); + EXPECT_EQ(4, x.rows()); } -INCBIN(resource_embedded_readcsv_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/read_student_option_schema.csv.json"); -TEST(LocalExecutor, StorageFileSink) +size_t count(const substrait::Type_Struct & type) { - const std::string split - = R"({"items":[{"uriFile":"file:///home/chang/SourceCode/rebase_gluten/backends-velox/src/test/resources/datasource/csv/student_option_schema.csv","length":"56","text":{"fieldDelimiter":",","maxBlockSize":"8192","header":"1"},"schema":{"names":["id","name","language"],"struct":{"types":[{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; - SerializedPlanParser parser(SerializedPlanParser::global_context); - parser.addSplitInfo(test::pb_util::JsonStringToBinary(split)); - auto local_executor = parser.createExecutor( - {reinterpret_cast(gresource_embedded_readcsv_jsonData), gresource_embedded_readcsv_jsonSize}); - - while (local_executor->hasNext()) + size_t ret = 0; + for (const auto & t : type.types()) { - const Block & x = *local_executor->nextColumnar(); - EXPECT_EQ(4, 
x.rows()); + if (t.has_struct_()) + ret += 1 + count(t.struct_()); + else + ret++; } + return ret; +} - StorageInMemoryMetadata metadata; - metadata.setColumns(ColumnsDescription::fromNamesAndTypes({{"name", STRING()}, {"value", UINT()}})); - StorageMetadataPtr metadata_ptr = std::make_shared(metadata); - - /* - auto sink = createFilelinkSink( - metadata_ptr, - "test_table", - "/tmp/test_table.parquet", - CompressionMethod::None, - {}, - "Parquet", - SerializedPlanParser::global_context, - 0); - - sink->consume(testChunk()); - sink->onFinish(); - */ +TEST(TypeParser, SchemaTest) +{ + const std::string scheam_str = R"({ + "names": [ + "count#16#Partial#count", + "anonymousfield0" + ], + "struct": { + "types": [ + { + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_REQUIRED" + } + } + ], + "nullability": "NULLABILITY_REQUIRED", + "names": [ + "anonymousField0" + ] + } + } + ] + } +})"; + + const auto schema = local_engine::JsonStringToMessage(scheam_str); + EXPECT_EQ(schema.names_size(), count(schema.struct_())); + const auto block = TypeParser::buildBlockFromNamedStruct(schema); + EXPECT_EQ(1, block.columns()); + debug::headBlock(block); } \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp new file mode 100644 index 000000000000..e56184a1417c --- /dev/null +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace local_engine; +using namespace DB; + +Chunk testChunk() +{ + auto nameCol = STRING()->createColumn(); + nameCol->insert("one"); + nameCol->insert("two"); + nameCol->insert("three"); + + auto valueCol = UINT()->createColumn(); + valueCol->insert(1); + valueCol->insert(2); + valueCol->insert(3); + MutableColumns x; + x.push_back(std::move(nameCol)); + x.push_back(std::move(valueCol)); + return {std::move(x), 3}; +} + +TEST(LocalExecutor, StorageObjectStorageSink) +{ + /// 0. 
Create ObjectStorage for HDFS + auto settings = SerializedPlanParser::global_context->getSettingsRef(); + const std::string query + = R"(CREATE TABLE hdfs_engine_xxxx (name String, value UInt32) ENGINE=HDFS('hdfs://localhost:8020/clickhouse/test2', 'Parquet'))"; + DB::ParserCreateQuery parser; + std::string error_message; + const char * pos = query.data(); + auto ast = DB::tryParseQuery( + parser, + pos, + pos + query.size(), + error_message, + /* hilite = */ false, + "QUERY TEST", + /* allow_multi_statements = */ false, + 0, + settings.max_parser_depth, + settings.max_parser_backtracks, + true); + auto & create = ast->as(); + auto arg = create.storage->children[0]; + const auto * func = arg->as(); + EXPECT_TRUE(func && func->name == "HDFS"); + + DB::StorageHDFSConfiguration config; + StorageObjectStorage::Configuration::initialize(config, arg->children[0]->children, SerializedPlanParser::global_context, false); + + const std::shared_ptr object_storage + = std::dynamic_pointer_cast(config.createObjectStorage(SerializedPlanParser::global_context, false)); + EXPECT_TRUE(object_storage != nullptr); + + RelativePathsWithMetadata files_with_metadata; + object_storage->listObjects("/clickhouse", files_with_metadata, 0); + + /// 1. Create ObjectStorageSink + DB::StorageObjectStorageSink sink{ + object_storage, config.clone(), {}, {{STRING(), "name"}, {UINT(), "value"}}, SerializedPlanParser::global_context, ""}; + + /// 2. Create Chunk + auto chunk = testChunk(); + /// 3. comsume + sink.consume(chunk); + sink.onFinish(); +} + + +INCBIN(resource_embedded_write_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_plan.json"); +TEST(WritePipeline, SubstraitFileSink) +{ + const auto tmpdir = std::string{"file:///tmp/test_table/test"}; + const auto filename = std::string{"data.parquet"}; + const std::string split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; + const std::string split + = replaceLocalFilesWildcards(split_template, GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")); + + const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + context->setSetting(local_engine::SPARK_TASK_WRITE_TMEP_DIR, tmpdir); + context->setSetting(local_engine::SPARK_TASK_WRITE_FILENAME, filename); + SerializedPlanParser parser(context); + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); + + const auto plan = local_engine::JsonStringToMessage( + {reinterpret_cast(gresource_embedded_write_jsonData), gresource_embedded_write_jsonSize}); + + EXPECT_EQ(1, plan.relations_size()); + const substrait::PlanRel & root_rel = plan.relations().at(0); + EXPECT_TRUE(root_rel.has_root()); + EXPECT_TRUE(root_rel.root().input().has_write()); + + const substrait::WriteRel & write_rel = root_rel.root().input().write(); + EXPECT_TRUE(write_rel.has_named_table()); + + const substrait::NamedObjectWrite & named_table = write_rel.named_table(); + + 
google::protobuf::StringValue optimization; + named_table.advanced_extension().optimization().UnpackTo(&optimization); + auto config = local_engine::parse_write_parameter(optimization.value()); + EXPECT_EQ(2, config.size()); + EXPECT_EQ("parquet", config["format"]); + EXPECT_EQ("1", config["isSnappy"]); + + + EXPECT_TRUE(write_rel.has_table_schema()); + const substrait::NamedStruct & table_schema = write_rel.table_schema(); + auto block = TypeParser::buildBlockFromNamedStruct(table_schema); + auto names = block.getNames(); + DB::Names expected{"s_suppkey", "s_name", "s_address", "s_nationkey", "s_phone", "s_acctbal", "s_comment111"}; + EXPECT_EQ(expected, names); + + auto partitionCols = collect_partition_cols(block, table_schema); + DB::Names expected_partition_cols; + EXPECT_EQ(expected_partition_cols, partitionCols); + + + auto local_executor = parser.createExecutor(plan); + EXPECT_TRUE(local_executor->hasNext()); + const Block & x = *local_executor->nextColumnar(); + debug::headBlock(x); + EXPECT_EQ(1, x.rows()); + const auto & col_a = *(x.getColumns()[0]); + EXPECT_EQ(filename, col_a.getDataAt(0)); + const auto & col_b = *(x.getColumns()[1]); + EXPECT_EQ(SubstraitFileSink::NO_PARTITION_ID, col_b.getDataAt(0)); + const auto & col_c = *(x.getColumns()[2]); + EXPECT_EQ(10000, col_c.getInt(0)); +} + +INCBIN(resource_embedded_write_one_partition_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/native_write_one_partition.json"); + +TEST(WritePipeline, SubstraitPartitionedFileSink) +{ + const std::string split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"1399183","text":{"fieldDelimiter":"|","maxBlockSize":"8192"},"schema":{"names":["s_suppkey","s_name","s_address","s_nationkey","s_phone","s_acctbal","s_comment"],"struct":{"types":[{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"i64":{"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}},{"decimal":{"scale":2,"precision":15,"nullability":"NULLABILITY_NULLABLE"}},{"string":{"nullability":"NULLABILITY_NULLABLE"}}]}},"metadataColumns":[{}]}]})"; + const std::string split + = replaceLocalFilesWildcards(split_template, GLUTEN_SOURCE_DIR("/backends-clickhouse/src/test/resources/csv-data/supplier.csv")); + + const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + context->setSetting(local_engine::SPARK_TASK_WRITE_TMEP_DIR, std::string{"file:///tmp/test_table/test_partition"}); + context->setSetting(local_engine::SPARK_TASK_WRITE_FILENAME, std::string{"data.parquet"}); + SerializedPlanParser parser(context); + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); + + const auto plan = local_engine::JsonStringToMessage( + {reinterpret_cast(gresource_embedded_write_one_partition_jsonData), gresource_embedded_write_one_partition_jsonSize}); + + EXPECT_EQ(1, plan.relations_size()); + const substrait::PlanRel & root_rel = plan.relations().at(0); + EXPECT_TRUE(root_rel.has_root()); + EXPECT_TRUE(root_rel.root().input().has_write()); + + const substrait::WriteRel & write_rel = root_rel.root().input().write(); + EXPECT_TRUE(write_rel.has_named_table()); + + const substrait::NamedObjectWrite & named_table = write_rel.named_table(); + + google::protobuf::StringValue optimization; + named_table.advanced_extension().optimization().UnpackTo(&optimization); + auto config = local_engine::parse_write_parameter(optimization.value()); + EXPECT_EQ(2, 
config.size()); + EXPECT_EQ("parquet", config["format"]); + EXPECT_EQ("1", config["isSnappy"]); + + + EXPECT_TRUE(write_rel.has_table_schema()); + const substrait::NamedStruct & table_schema = write_rel.table_schema(); + auto block = TypeParser::buildBlockFromNamedStruct(table_schema); + auto names = block.getNames(); + DB::Names expected{"s_suppkey", "s_name", "s_address", "s_phone", "s_acctbal", "s_comment", "s_nationkey"}; + EXPECT_EQ(expected, names); + + auto partitionCols = local_engine::collect_partition_cols(block, table_schema); + DB::Names expected_partition_cols{"s_nationkey"}; + EXPECT_EQ(expected_partition_cols, partitionCols); + + auto local_executor = parser.createExecutor(plan); + EXPECT_TRUE(local_executor->hasNext()); + const Block & x = *local_executor->nextColumnar(); + debug::headBlock(x, 25); + EXPECT_EQ(25, x.rows()); + // const auto & col_b = *(x.getColumns()[1]); + // EXPECT_EQ(16, col_b.getInt(0)); +} + +TEST(WritePipeline, ComputePartitionedExpression) +{ + const auto context = DB::Context::createCopy(SerializedPlanParser::global_context); + + auto partition_by = SubstraitPartitionedFileSink::make_partition_expression({"s_nationkey", "name"}); + + ASTs arguments(1, partition_by); + ASTPtr partition_by_string = makeASTFunction("toString", std::move(arguments)); + + Block sample_block{{STRING(), "name"}, {UINT(), "s_nationkey"}}; + auto syntax_result = TreeRewriter(context).analyze(partition_by_string, sample_block.getNamesAndTypesList()); + auto partition_by_expr = ExpressionAnalyzer(partition_by_string, syntax_result, context).getActions(false); + + + auto partition_by_column_name = partition_by_string->getColumnName(); + + Chunk chunk = testChunk(); + const auto & columns = chunk.getColumns(); + Block block_with_partition_by_expr = sample_block.cloneWithoutColumns(); + block_with_partition_by_expr.setColumns(columns); + partition_by_expr->execute(block_with_partition_by_expr); + + size_t chunk_rows = chunk.getNumRows(); + EXPECT_EQ(3, chunk_rows); + + const auto * partition_by_result_column = block_with_partition_by_expr.getByName(partition_by_column_name).column.get(); + EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0)); + EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1)); + EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2)); +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/native_write_one_partition.json b/cpp-ch/local-engine/tests/json/native_write_one_partition.json new file mode 100644 index 000000000000..45b3f60e41fa --- /dev/null +++ b/cpp-ch/local-engine/tests/json/native_write_one_partition.json @@ -0,0 +1,283 @@ +{ + "relations": [ + { + "root": { + "input": { + "write": { + "namedTable": { + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "WriteParameters:isSnappy=1;format=parquet\n" + }, + "enhancement": { + "@type": "type.googleapis.com/substrait.Type", + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 2, + "precision": 15, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + 
"nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "s_suppkey", + "s_name", + "s_address", + "s_phone", + "s_acctbal", + "s_comment", + "s_nationkey" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 2, + "precision": 15, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "PARTITION_COL" + ] + }, + "input": { + "project": { + "common": { + "emit": { + "outputMapping": [ + 7, + 8, + 9, + 10, + 11, + 12, + 13 + ] + } + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "s_suppkey", + "s_name", + "s_address", + "s_nationkey", + "s_phone", + "s_acctbal", + "s_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 2, + "precision": 15, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + }, + "expressions": [ + { + "selection": { + "directReference": { + "structField": {} + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 1 + } + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 2 + } + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 4 + } + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 5 + } + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 6 + } + } + } + }, + { + "selection": { + "directReference": { + "structField": { + "field": 3 + } + } + } + } + ] + } + } + } + }, + "outputSchema": { + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/native_write_plan.json b/cpp-ch/local-engine/tests/json/native_write_plan.json new file mode 100644 index 000000000000..8d5ffce7b163 --- /dev/null +++ b/cpp-ch/local-engine/tests/json/native_write_plan.json @@ -0,0 +1,203 @@ +{ + "relations": [ + { + "root": { + "input": { + "write": { + "namedTable": { + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "WriteParameters:isSnappy=1;format=parquet\n" + }, + "enhancement": { + "@type": "type.googleapis.com/substrait.Type", + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": 
{ + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 2, + "precision": 15, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "s_suppkey", + "s_name", + "s_address", + "s_nationkey", + "s_phone", + "s_acctbal", + "s_comment111" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 2, + "precision": 15, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "s_suppkey", + "s_name", + "s_address", + "s_nationkey", + "s_phone", + "s_acctbal", + "s_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 2, + "precision": 15, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + } + } + }, + "outputSchema": { + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/native_write_plan_1_spark33.json b/cpp-ch/local-engine/tests/json/native_write_plan_1_spark33.json new file mode 100644 index 000000000000..e053c9935264 --- /dev/null +++ b/cpp-ch/local-engine/tests/json/native_write_plan_1_spark33.json @@ -0,0 +1,116 @@ +{ + "extensions": [ + { + "extensionFunction": { + "name": "sum:req_i32" + } + } + ], + "relations": [ + { + "root": { + "input": { + "aggregate": { + "common": { + "direct": {} + }, + "input": { + "read": { + "baseSchema": { + "names": [ + "string_field#0", + "int_field#1" + ], + "struct": { + "types": [ + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i32": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + } + }, + "localFiles": { + "items": [ + { + "uriFile": "iterator:0" + } + ] + } + } + }, + "groupings": [ + { + "groupingExpressions": [ + { + "selection": { + "directReference": { + "structField": {} + } + } + } + ] + } + ], + "measures": [ + { + "measure": { + "phase": "AGGREGATION_PHASE_INITIAL_TO_INTERMEDIATE", + "outputType": { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + "arguments": [ + { + "value": { + "selection": { + 
"directReference": { + "structField": { + "field": 1 + } + } + } + } + } + ] + } + } + ], + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "has_required_child_distribution_expressions=false\n" + } + } + } + }, + "names": [ + "string_field#0", + "sum#31" + ], + "outputSchema": { + "types": [ + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/native_write_plan_1_spark35.json b/cpp-ch/local-engine/tests/json/native_write_plan_1_spark35.json new file mode 100644 index 000000000000..31592e5c75df --- /dev/null +++ b/cpp-ch/local-engine/tests/json/native_write_plan_1_spark35.json @@ -0,0 +1,246 @@ +{ + "relations": [ + { + "root": { + "input": { + "write": { + "namedTable": { + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "WriteParameters:isSnappy=1;format=orc\n" + }, + "enhancement": { + "@type": "type.googleapis.com/substrait.Type", + "struct": { + "types": [ + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i16": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i8": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "bool": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 18, + "precision": 38, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "string_field", + "int_field", + "long_field", + "float_field", + "double_field", + "short_field", + "byte_field", + "boolean_field", + "decimal_field", + "date_field" + ], + "struct": { + "types": [ + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i16": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i8": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "bool": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 18, + "precision": 38, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "input": { + "read": { + "baseSchema": { + "names": [ + "string_field#0", + "int_field#1", + "long_field#2", + "float_field#3", + "double_field#4", + "short_field#5", + "byte_field#6", + "boolean_field#7", + "decimal_field#8", + "date_field#9" + ], + "struct": { + "types": [ + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + 
{ + "fp32": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i16": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i8": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "bool": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "decimal": { + "scale": 18, + "precision": 38, + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + } + }, + "localFiles": { + "items": [ + { + "uriFile": "iterator:0" + } + ] + } + } + } + } + }, + "outputSchema": { + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file