From 6aa83c82d3618bbc47087ebe21d3c3c49158e2ff Mon Sep 17 00:00:00 2001 From: Chang Chen Date: Wed, 25 Sep 2024 14:16:17 +0800 Subject: [PATCH] Add NativeOutputWriter --- .../OptimizeTableCommandOverwrites.scala | 1 - .../OptimizeTableCommandOverwrites.scala | 1 - .../OptimizeTableCommandOverwrites.scala | 3 +- .../datasources/CHDatasourceJniWrapper.java | 51 +-- .../v1/CHFormatWriterInjects.scala | 2 +- .../v1/CHMergeTreeWriterInjects.scala | 3 +- .../MergeTreeFileFormatDataWriter.scala | 33 +- .../v1/clickhouse/MergeTreeOutputWriter.scala | 4 +- .../GlutenClickHouseMergeTreeWriteSuite.scala | 1 + cpp-ch/local-engine/CMakeLists.txt | 4 +- cpp-ch/local-engine/Common/CHUtil.cpp | 3 +- .../Parser/RelParsers/WriteRelParser.cpp | 2 +- .../Parser/SerializedPlanParser.cpp | 2 +- .../Parser/SubstraitParserUtils.cpp | 6 +- .../local-engine/Shuffle/NativeSplitter.cpp | 16 +- .../MergeTree/SparkMergeTreeWriter.cpp | 70 ++-- .../Storages/MergeTree/SparkMergeTreeWriter.h | 13 +- .../Storages/NativeOutputWriter.h | 36 ++ ...riterWrappers.cpp => NormalFileWriter.cpp} | 13 +- ...ileWriterWrappers.h => NormalFileWriter.h} | 27 +- cpp-ch/local-engine/local_engine_jni.cpp | 58 +-- .../tests/gtest_parquet_columnindex.cpp | 31 +- cpp-ch/local-engine/tests/gtest_parser.cpp | 1 - .../tests/gtest_write_pipeline.cpp | 207 +--------- .../tests/gtest_write_pipeline_mergetree.cpp | 279 +++++++++++++ .../tests/json/mergetree/2_one_pipeline.json | 368 ++++++++++++++++++ 26 files changed, 824 insertions(+), 411 deletions(-) create mode 100644 cpp-ch/local-engine/Storages/NativeOutputWriter.h rename cpp-ch/local-engine/Storages/Output/{FileWriterWrappers.cpp => NormalFileWriter.cpp} (93%) rename cpp-ch/local-engine/Storages/Output/{FileWriterWrappers.h => NormalFileWriter.h} (93%) create mode 100644 cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp create mode 100644 cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index b897010d5bb38..6d0d361b7dffe 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -118,7 +118,6 @@ object OptimizeTableCommandOverwrites extends Logging { val datasourceJniWrapper = new CHDatasourceJniWrapper() val returnedMetrics = datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, planWithSplitInfo.splitInfo, uuid, taskId.getId.toString, diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index b897010d5bb38..6d0d361b7dffe 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -118,7 +118,6 @@ object OptimizeTableCommandOverwrites extends Logging { val datasourceJniWrapper = new CHDatasourceJniWrapper() val returnedMetrics = datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, planWithSplitInfo.splitInfo, uuid, taskId.getId.toString, diff --git 
a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index ef30aaad2294f..58ec497459d39 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -120,7 +120,6 @@ object OptimizeTableCommandOverwrites extends Logging { val datasourceJniWrapper = new CHDatasourceJniWrapper() val returnedMetrics = datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, planWithSplitInfo.splitInfo, uuid, taskId.getId.toString, @@ -172,7 +171,7 @@ object OptimizeTableCommandOverwrites extends Logging { bucketNum: String, bin: Seq[AddFile], maxFileSize: Long): Seq[FileAction] = { - val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog); + val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog) val sparkSession = SparkSession.getActiveSession.get diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java index f19c5d39df1d8..e8746dbedad62 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java @@ -18,11 +18,15 @@ public class CHDatasourceJniWrapper { - public native long nativeInitFileWriterWrapper( - String filePath, byte[] preferredSchema, String formatHint); + public native void write(long instanceId, long blockAddress); + + public native String close(long instanceId); + + /// FileWriter + public native long createFilerWriter(String filePath, byte[] preferredSchema, String formatHint); - public native long nativeInitMergeTreeWriterWrapper( - byte[] plan, + /// MergeTreeWriter + public native long createMergeTreeWriter( byte[] splitInfo, String uuid, String taskId, @@ -31,43 +35,28 @@ public native long nativeInitMergeTreeWriterWrapper( byte[] confArray); public native String nativeMergeMTParts( - byte[] plan, - byte[] splitInfo, - String uuid, - String taskId, - String partition_dir, - String bucket_dir); + byte[] splitInfo, String uuid, String taskId, String partition_dir, String bucket_dir); public static native String filterRangesOnDriver(byte[] plan, byte[] read); - public native void write(long instanceId, long blockAddress); - - public native void writeToMergeTree(long instanceId, long blockAddress); - - public native void close(long instanceId); - - public native String closeMergeTreeWriter(long instanceId); - - /*- + /** * The input block is already sorted by partition columns + bucket expressions. (check - * org.apache.spark.sql.execution.datasources.FileFormatWriter#write) - * However, the input block may contain parts(we call it stripe here) belonging to - * different partition/buckets. + * org.apache.spark.sql.execution.datasources.FileFormatWriter#write) However, the input block may + * contain parts(we call it stripe here) belonging to different partition/buckets. * - * If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_. + *

<p>If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_. * - * This function splits the input block in to several blocks, each of which belonging - * to the same partition/bucket. Notice the stripe will NOT contain partition columns + *

<p>This function splits the input block into several blocks, each of which belongs to the + * same partition/bucket. Notice the stripe will NOT contain partition columns * - * Since all rows in a stripe share the same partition/bucket, - * we only need to check the heading row. - * So, for each stripe, the native code also returns each stripe's first row's index. - * Caller can use these indice to get UnsafeRows from the input block, - * to help FileFormatDataWriter to aware partition/bucket changes. + *

Since all rows in a stripe share the same partition/bucket, we only need to check the + * heading row. So, for each stripe, the native code also returns each stripe's first row's index. + * Caller can use these indices to get UnsafeRows from the input block, to help + * FileFormatDataWriter to aware partition/bucket changes. */ public static native BlockStripes splitBlockByPartitionAndBucket( long blockAddress, - int[] partitionColIndice, + int[] partitionColIndices, boolean hasBucket, boolean reserve_partition_columns); } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala index 69c001e461d8c..ce927e7453d32 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala @@ -52,7 +52,7 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase { val namedStruct = namedStructBuilder.build val instance = - datasourceJniWrapper.nativeInitFileWriterWrapper(path, namedStruct.toByteArray, formatName) + datasourceJniWrapper.createFilerWriter(path, namedStruct.toByteArray, formatName) new OutputWriter { override def write(row: InternalRow): Unit = { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index 521b59d60e29b..1529beecb6f01 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -67,8 +67,7 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects { splitInfo: Array[Byte]): OutputWriter = { val datasourceJniWrapper = new CHDatasourceJniWrapper() val instance = - datasourceJniWrapper.nativeInitMergeTreeWriterWrapper( - null, + datasourceJniWrapper.createMergeTreeWriter( splitInfo, UUID.randomUUID.toString, context.getTaskAttemptID.getTaskID.getId.toString, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala index 7b803e250278d..29f2b7e16ec8a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala @@ -59,7 +59,7 @@ abstract class MergeTreeFileFormatDataWriter( protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]() protected var currentWriter: OutputWriter = _ - protected val returnedMetrics = mutable.HashMap[String, AddFile]() + protected val returnedMetrics: mutable.Map[String, AddFile] = mutable.HashMap[String, AddFile]() /** Trackers for computing various statistics on the data as it's being written out. 
*/ protected val statsTrackers: Seq[WriteTaskStatsTracker] = @@ -71,10 +71,10 @@ abstract class MergeTreeFileFormatDataWriter( try { currentWriter.close() statsTrackers.foreach(_.closeFile(currentWriter.path())) - val ret = currentWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles - if (ret.nonEmpty) { - ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile)) - } + currentWriter + .asInstanceOf[MergeTreeOutputWriter] + .getAddFiles + .foreach(addFile => returnedMetrics.put(addFile.path, addFile)) } finally { currentWriter = null } @@ -117,12 +117,7 @@ abstract class MergeTreeFileFormatDataWriter( releaseResources() val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs { // committer.commitTask(taskAttemptContext) - val statuses = returnedMetrics - .map( - v => { - v._2 - }) - .toSeq + val statuses = returnedMetrics.map(_._2).toSeq new TaskCommitMessage(statuses) } @@ -142,7 +137,7 @@ abstract class MergeTreeFileFormatDataWriter( override def close(): Unit = {} - def getReturnedMetrics(): mutable.Map[String, AddFile] = returnedMetrics + def getReturnedMetrics: mutable.Map[String, AddFile] = returnedMetrics } /** FileFormatWriteTask for empty partitions */ @@ -443,7 +438,11 @@ class MergeTreeDynamicPartitionDataSingleWriter( case fakeRow: FakeRow => if (fakeRow.batch.numRows() > 0) { val blockStripes = GlutenRowSplitter.getInstance - .splitBlockByPartitionAndBucket(fakeRow, partitionColIndice, isBucketed, true) + .splitBlockByPartitionAndBucket( + fakeRow, + partitionColIndice, + isBucketed, + reserve_partition_columns = true) val iter = blockStripes.iterator() while (iter.hasNext) { @@ -526,10 +525,10 @@ class MergeTreeDynamicPartitionDataConcurrentWriter( if (status.outputWriter != null) { try { status.outputWriter.close() - val ret = status.outputWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles - if (ret.nonEmpty) { - ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile)) - } + status.outputWriter + .asInstanceOf[MergeTreeOutputWriter] + .getAddFiles + .foreach(addFile => returnedMetrics.put(addFile.path, addFile)) } finally { status.outputWriter = null } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala index 52593d7c17952..69b760887a2d3 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala @@ -42,12 +42,12 @@ class MergeTreeOutputWriter( if (nextBatch.numRows > 0) { val col = nextBatch.column(0).asInstanceOf[CHColumnVector] - datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress) + datasourceJniWrapper.write(instance, col.getBlockAddress) } // else ignore this empty block } override def close(): Unit = { - val returnedMetrics = datasourceJniWrapper.closeMergeTreeWriter(instance) + val returnedMetrics = datasourceJniWrapper.close(instance) if (returnedMetrics != null && returnedMetrics.nonEmpty) { addFiles.appendAll( AddFileTags.partsMetricsToAddFile( diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index b4eca622c89f4..186078c18dd18 100644 
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -55,6 +55,7 @@ class GlutenClickHouseMergeTreeWriteSuite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.files.maxPartitionBytes", "20000000") + .set("spark.gluten.sql.native.writer.enabled", "true") .setCHSettings("min_insert_block_size_rows", 100000) .setCHSettings("mergetree.merge_after_insert", false) .setCHSettings("input_format_parquet_max_block_size", 8192) diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index ca25692bf0135..d145ed339ff55 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -128,9 +128,9 @@ foreach(child ${children}) add_headers_and_sources(function_parsers ${child}) endforeach() -# Notice: soures files under Parser/*_udf subdirectories must be built into +# Notice: sources files under Parser/*_udf subdirectories must be built into # target ${LOCALENGINE_SHARED_LIB} directly to make sure all function parsers -# are registered successly. +# are registered successfully. add_library( ${LOCALENGINE_SHARED_LIB} SHARED local_engine_jni.cpp ${local_udfs_sources} ${function_parsers_sources} diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index c1fc49b0b96bc..5e57be5ae91e3 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -49,7 +49,7 @@ #include #include #include -#include +#include #include #include #include @@ -62,7 +62,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp index 16a5d72a9e857..6d44213ec73b1 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index bb0ea00b97e6d..e8f1196a66340 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -65,12 +65,12 @@ #include #include #include -#include #include #include #include #include #include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp index e72454ba6fb24..164cf0392dfbf 100644 --- a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp +++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp @@ -26,14 +26,12 @@ namespace local_engine { void logDebugMessage(const google::protobuf::Message & message, const char * type) { - auto * logger = &Poco::Logger::get("SubstraitPlan"); - if (logger->debug()) + if (auto * logger = &Poco::Logger::get("SubstraitPlan"); logger->debug()) { namespace pb_util = google::protobuf::util; pb_util::JsonOptions options; std::string json; - auto s = pb_util::MessageToJsonString(message, &json, options); - if (!s.ok()) + if (auto s = pb_util::MessageToJsonString(message, &json, options); !s.ok()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type); LOG_DEBUG(logger, "{}:\n{}", type, json); } diff 
--git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp index 6cd74c8af343b..cee45aa34c4cb 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp @@ -21,16 +21,10 @@ #include #include #include -#include -#include #include #include -#include #include -#include #include -#include -#include namespace local_engine { @@ -86,7 +80,7 @@ void NativeSplitter::split(DB::Block & block) { if (partition_buffer[i]->size() >= options.buffer_size) { - output_buffer.emplace(std::pair(i, std::make_unique(partition_buffer[i]->releaseColumns()))); + output_buffer.emplace(std::pair(i, std::make_unique(partition_buffer[i]->releaseColumns()))); } } } @@ -116,7 +110,7 @@ bool NativeSplitter::hasNext() { if (inputHasNext()) { - split(*reinterpret_cast(inputNext())); + split(*reinterpret_cast(inputNext())); } else { @@ -125,7 +119,7 @@ bool NativeSplitter::hasNext() auto buffer = partition_buffer.at(i); if (buffer->size() > 0) { - output_buffer.emplace(std::pair(i, new Block(buffer->releaseColumns()))); + output_buffer.emplace(std::pair(i, new DB::Block(buffer->releaseColumns()))); } } break; @@ -214,7 +208,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject selector_builder = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm); } -void HashNativeSplitter::computePartitionId(Block & block) +void HashNativeSplitter::computePartitionId(DB::Block & block) { partition_info = selector_builder->build(block); } @@ -229,7 +223,7 @@ RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options optio selector_builder = std::make_unique(options_.partition_num); } -void RoundRobinNativeSplitter::computePartitionId(Block & block) +void RoundRobinNativeSplitter::computePartitionId(DB::Block & block) { partition_info = selector_builder->build(block); } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index dc32565f8420b..44a95cb2a4a6d 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -47,11 +47,44 @@ Block removeColumnSuffix(const Block & block) } return Block(columns); } + } namespace local_engine { +std::string PartInfo::toJson(const std::vector & part_infos) +{ + rapidjson::StringBuffer result; + rapidjson::Writer writer(result); + writer.StartArray(); + for (const auto & item : part_infos) + { + writer.StartObject(); + writer.Key("part_name"); + writer.String(item.part_name.c_str()); + writer.Key("mark_count"); + writer.Uint(item.mark_count); + writer.Key("disk_size"); + writer.Uint(item.disk_size); + writer.Key("row_count"); + writer.Uint(item.row_count); + writer.Key("bucket_id"); + writer.String(item.bucket_id.c_str()); + writer.Key("partition_values"); + writer.StartObject(); + for (const auto & key_value : item.partition_values) + { + writer.Key(key_value.first.c_str()); + writer.String(key_value.second.c_str()); + } + writer.EndObject(); + writer.EndObject(); + } + writer.EndArray(); + return result.GetString(); +} + std::unique_ptr SparkMergeTreeWriter::create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWritePartitionSettings & write_settings_, @@ -86,7 +119,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter( { } -void SparkMergeTreeWriter::write(const DB::Block & block) +void SparkMergeTreeWriter::write(DB::Block & block) { auto 
new_block = removeColumnSuffix(block); auto converter = ActionsDAG::makeConvertingActions( @@ -96,9 +129,10 @@ void SparkMergeTreeWriter::write(const DB::Block & block) executor.push(new_block); } -void SparkMergeTreeWriter::finalize() +std::string SparkMergeTreeWriter::close() { executor.finish(); + return PartInfo::toJson(getAllPartInfo()); } std::vector SparkMergeTreeWriter::getAllPartInfo() const @@ -120,36 +154,4 @@ std::vector SparkMergeTreeWriter::getAllPartInfo() const return res; } -String SparkMergeTreeWriter::partInfosToJson(const std::vector & part_infos) -{ - rapidjson::StringBuffer result; - rapidjson::Writer writer(result); - writer.StartArray(); - for (const auto & item : part_infos) - { - writer.StartObject(); - writer.Key("part_name"); - writer.String(item.part_name.c_str()); - writer.Key("mark_count"); - writer.Uint(item.mark_count); - writer.Key("disk_size"); - writer.Uint(item.disk_size); - writer.Key("row_count"); - writer.Uint(item.row_count); - writer.Key("bucket_id"); - writer.String(item.bucket_id.c_str()); - writer.Key("partition_values"); - writer.StartObject(); - for (const auto & key_value : item.partition_values) - { - writer.Key(key_value.first.c_str()); - writer.String(key_value.second.c_str()); - } - writer.EndObject(); - writer.EndObject(); - } - writer.EndArray(); - return result.GetString(); -} - } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h index 699fd3d80b5b5..59535cef094dd 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB { @@ -44,12 +45,13 @@ struct PartInfo String bucket_id; bool operator<(const PartInfo & rhs) const { return disk_size < rhs.disk_size; } + + static std::string toJson(const std::vector & part_infos); }; -class SparkMergeTreeWriter +class SparkMergeTreeWriter : public NativeOutputWriter { public: - static String partInfosToJson(const std::vector & part_infos); static std::unique_ptr create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWritePartitionSettings & write_settings_, @@ -61,9 +63,8 @@ class SparkMergeTreeWriter DB::QueryPipeline && pipeline_, std::unordered_map && partition_values_); - void write(const DB::Block & block); - void finalize(); - std::vector getAllPartInfo() const; + void write(DB::Block & block) override; + std::string close() override; private: DB::Block header; @@ -71,5 +72,7 @@ class SparkMergeTreeWriter DB::QueryPipeline pipeline; DB::PushingPipelineExecutor executor; std::unordered_map partition_values; + + std::vector getAllPartInfo() const; }; } diff --git a/cpp-ch/local-engine/Storages/NativeOutputWriter.h b/cpp-ch/local-engine/Storages/NativeOutputWriter.h new file mode 100644 index 0000000000000..bfcbe0b092a53 --- /dev/null +++ b/cpp-ch/local-engine/Storages/NativeOutputWriter.h @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include + +namespace DB +{ +class Block; +} +namespace local_engine +{ +class NativeOutputWriter +{ +public: + NativeOutputWriter() = default; + virtual ~NativeOutputWriter() = default; + + //TODO: change to write(const DB::Block & block) + virtual void write(DB::Block & block) = 0; + virtual std::string close() = 0; +}; +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp similarity index 93% rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp index 975e5046bac25..d8b7f3240386f 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#include "FileWriterWrappers.h" +#include "NormalFileWriter.h" #include #include @@ -25,12 +25,11 @@ namespace local_engine const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"}; const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"}; -NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) - : FileWriterWrapper(file_), context(context_) +NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_) { } -void NormalFileWriter::consume(DB::Block & block) +void NormalFileWriter::write(DB::Block & block) { if (!writer) [[unlikely]] { @@ -63,12 +62,14 @@ void NormalFileWriter::consume(DB::Block & block) writer->push(materializeBlock(block)); } -void NormalFileWriter::close() +std::string NormalFileWriter::close() { /// When insert into a table with empty dataset, NormalFileWriter::consume would be never called. /// So we need to skip when writer is nullptr. 
if (writer) writer->finish(); + + return std::string{}; } OutputFormatFilePtr createOutputFormatFile( @@ -84,7 +85,7 @@ OutputFormatFilePtr createOutputFormatFile( return OutputFormatFileUtil::createFile(context, write_buffer_builder, encoded, preferred_schema, format_hint); } -std::unique_ptr createFileWriterWrapper( +std::unique_ptr NormalFileWriter::create( const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint) { return std::make_unique(createOutputFormatFile(context, file_uri, preferred_schema, format_hint), context); diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h similarity index 93% rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.h index 49383f8de42cb..6d054a04fad39 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -36,31 +37,20 @@ namespace local_engine { -class FileWriterWrapper +class NormalFileWriter : public NativeOutputWriter { public: - explicit FileWriterWrapper(const OutputFormatFilePtr & file_) : file(file_) { } - virtual ~FileWriterWrapper() = default; + static std::unique_ptr create( + const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); - virtual void consume(DB::Block & block) = 0; - virtual void close() = 0; - -protected: - OutputFormatFilePtr file; -}; - -using FileWriterWrapperPtr = std::shared_ptr; - -class NormalFileWriter : public FileWriterWrapper -{ -public: NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_); ~NormalFileWriter() override = default; - void consume(DB::Block & block) override; - void close() override; + void write(DB::Block & block) override; + std::string close() override; private: + OutputFormatFilePtr file; DB::ContextPtr context; OutputFormatFile::OutputFormatPtr output_format; @@ -68,9 +58,6 @@ class NormalFileWriter : public FileWriterWrapper std::unique_ptr writer; }; -std::unique_ptr createFileWriterWrapper( - const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); - OutputFormatFilePtr createOutputFormatFile( const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index af6fce0607baa..5cec7af689cc7 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -45,9 +45,8 @@ #include #include #include -#include +#include #include -#include #include #include #include @@ -906,7 +905,7 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeC LOCAL_ENGINE_JNI_METHOD_END(env, ) } -JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitFileWriterWrapper( +JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createFilerWriter( JNIEnv * env, jobject, jstring file_uri_, jbyteArray preferred_schema_, jstring format_hint_) { LOCAL_ENGINE_JNI_METHOD_START @@ -918,8 +917,7 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW 
reinterpret_cast(preferred_schema_ref.elems()), static_cast(preferred_schema_ref.length())}; substrait::NamedStruct res; - bool ok = res.ParseFromString(view); - if (!ok) + if (!res.ParseFromString(view)) return {}; return std::move(res); }; @@ -933,15 +931,14 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW // for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint const auto format_hint = jstring2string(env, format_hint_); const auto context = local_engine::QueryContext::instance().currentQueryContext(); - auto * writer = local_engine::createFileWriterWrapper(context, file_uri, preferred_schema, format_hint).release(); + auto * writer = local_engine::NormalFileWriter::create(context, file_uri, preferred_schema, format_hint).release(); return reinterpret_cast(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } -JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper( +JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter( JNIEnv * env, jobject, - jbyteArray plan_, jbyteArray split_info_, jstring uuid_, jstring task_id_, @@ -1003,54 +1000,24 @@ Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNI { LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); + auto * writer = reinterpret_cast(instanceId); auto * block = reinterpret_cast(block_address); - writer->consume(*block); - LOCAL_ENGINE_JNI_METHOD_END(env, ) -} - -JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId) -{ - LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); - SCOPE_EXIT({ delete writer; }); - writer->close(); - LOCAL_ENGINE_JNI_METHOD_END(env, ) -} - -JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree( - JNIEnv * env, jobject, jlong instanceId, jlong block_address) -{ - LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); - const auto * block = reinterpret_cast(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } -JNIEXPORT jstring -Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId) +JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId) { LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); + auto * writer = reinterpret_cast(instanceId); SCOPE_EXIT({ delete writer; }); - - writer->finalize(); - const auto part_infos = writer->getAllPartInfo(); - const auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos); - return local_engine::charTojstring(env, json_info.c_str()); + const auto result = writer->close(); + return local_engine::charTojstring(env, result.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts( - JNIEnv * env, - jobject, - jbyteArray plan_, - jbyteArray split_info_, - jstring uuid_, - jstring task_id_, - jstring partition_dir_, - jstring bucket_dir_) + JNIEnv * env, jobject, jbyteArray split_info_, jstring uuid_, jstring task_id_, jstring partition_dir_, jstring bucket_dir_) { LOCAL_ENGINE_JNI_METHOD_START @@ -1089,8 +1056,7 @@ JNIEXPORT jstring 
Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir}); } - auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res); - + auto json_info = local_engine::PartInfo::toJson(res); return local_engine::charTojstring(env, json_info.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp index ac0ec2145757c..5a39580c19fdc 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -39,21 +40,21 @@ #include #include -# define ASSERT_DURATION_LE(secs, stmt) \ - { \ - std::promise completed; \ - auto stmt_future = completed.get_future(); \ - std::thread( \ - [&](std::promise & completed) \ - { \ - stmt; \ - completed.set_value(true); \ - }, \ - std::ref(completed)) \ - .detach(); \ - if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \ - GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \ - } +#define ASSERT_DURATION_LE(secs, stmt) \ + { \ + std::promise completed; \ + auto stmt_future = completed.get_future(); \ + std::thread( \ + [&](std::promise & completed) \ + { \ + stmt; \ + completed.set_value(true); \ + }, \ + std::ref(completed)) \ + .detach(); \ + if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \ + GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \ + } namespace DB::ErrorCodes diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp index b6c431ff25f35..da46ff6fdcdea 100644 --- a/cpp-ch/local-engine/tests/gtest_parser.cpp +++ b/cpp-ch/local-engine/tests/gtest_parser.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp index 992c91c942e0e..233456992dff5 100644 --- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -19,32 +19,20 @@ #include #include #include -#include #include #include -#include -#include #include #include -#include #include -#include #include -#include #include #include -#include -#include -#include -#include -#include #include #include -#include +#include #include #include #include -#include #include #include @@ -263,197 +251,4 @@ TEST(WritePipeline, ComputePartitionedExpression) EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0)); EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1)); EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2)); -} - -void do_remove(const std::string & folder) -{ - namespace fs = std::filesystem; - if (const std::filesystem::path ph(folder); fs::exists(ph)) - fs::remove_all(ph); -} - -Chunk person_chunk() -{ - auto id = INT()->createColumn(); - id->insert(100); - id->insert(200); - id->insert(300); - id->insert(400); - id->insert(500); - id->insert(600); - id->insert(700); - - auto name = STRING()->createColumn(); - name->insert("Joe"); - name->insert("Marry"); - name->insert("Mike"); - name->insert("Fred"); - name->insert("Albert"); - 
name->insert("Michelle"); - name->insert("Dan"); - - auto age = makeNullable(INT())->createColumn(); - Field null_field; - age->insert(30); - age->insert(null_field); - age->insert(18); - age->insert(50); - age->insert(null_field); - age->insert(30); - age->insert(50); - - - MutableColumns x; - x.push_back(std::move(id)); - x.push_back(std::move(name)); - x.push_back(std::move(age)); - return {std::move(x), 7}; -} - -TEST(WritePipeline, MergeTree) -{ - ThreadStatus thread_status; - - const auto context = DB::Context::createCopy(QueryContext::globalContext()); - context->setPath("./"); - const Settings & settings = context->getSettingsRef(); - - const std::string query - = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)"; - - const char * begin = query.data(); - const char * end = query.data() + query.size(); - ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]); - - ASTPtr ast = parseQuery(parser, begin, end, "", settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); - - EXPECT_TRUE(ast->as()); - auto & create = ast->as(); - - ColumnsDescription column_descriptions - = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE); - - StorageInMemoryMetadata metadata; - metadata.setColumns(column_descriptions); - metadata.setComment("args.comment"); - ASTPtr partition_by_key; - metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context); - - MergeTreeData::MergingParams merging_params; - merging_params.mode = MergeTreeData::MergingParams::Ordinary; - - - /// This merging param maybe used as part of sorting key - std::optional merging_param_key_arg; - /// Get sorting key from engine arguments. - /// - /// NOTE: store merging_param_key_arg as additional key column. We do it - /// before storage creation. After that storage will just copy this - /// column if sorting key will be changed. 
- metadata.sorting_key - = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg); - - std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); - - UUID uuid; - UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull; - UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull; - - SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); }); - - auto merge_tree = std::make_shared( - StorageID("", "", uuid), - "WritePipeline_MergeTree", - metadata, - LoadingStrictnessLevel::CREATE, - context, - "", - merging_params, - std::move(storage_settings)); - - Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}}; - DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]); - squashing.add(person_chunk()); - auto x = Squashing::squash(squashing.flush()); - x.getChunkInfos().add(std::make_shared()); - - ASSERT_EQ(7, x.getNumRows()); - ASSERT_EQ(3, x.getNumColumns()); - - - auto metadata_snapshot = std::make_shared(metadata); - ASTPtr none; - auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false)); - - sink->consume(x); - sink->onFinish(); -} - -INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json"); -INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json"); -INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json"); - -TEST(WritePipeline, SparkMergeTree) -{ - ThreadStatus thread_status; - - const auto context = DB::Context::createCopy(QueryContext::globalContext()); - context->setPath("./"); - const Settings & settings = context->getSettingsRef(); - - const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_)); - MergeTreeTableInstance merge_tree_table(extension_table); - - EXPECT_EQ(merge_tree_table.database, "default"); - EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree"); - EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree"); - EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default"); - - do_remove(merge_tree_table.relative_path); - - const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext()); - EXPECT_TRUE(dest_storage); - EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); - DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); - Block header = metadata_snapshot->getSampleBlock(); - - constexpr std::string_view split_template - = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})"; - constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")}; - - SparkMergeTreeWritePartitionSettings gm_write_settings{ - .part_name_prefix{"this_is_prefix"}, - }; - gm_write_settings.set(context); - - auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context); - SparkMergeTreeWriter & spark_merge_tree_writer = *writer; - - auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file); - EXPECT_TRUE(local_executor->hasNext()); - - do - { - 
spark_merge_tree_writer.write(*local_executor->nextColumnar()); - } while (local_executor->hasNext()); - - spark_merge_tree_writer.finalize(); - auto part_infos = spark_merge_tree_writer.getAllPartInfo(); - auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos); - std::cerr << json_info << std::endl; - - /// - { - const auto extension_table_hdfs - = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_)); - MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs); - EXPECT_EQ(merge_tree_table_hdfs.database, "default"); - EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs"); - EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs"); - EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main"); - - const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext()); - EXPECT_TRUE(dest_storage_hdfs); - EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote()); - } } \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp new file mode 100644 index 0000000000000..5ede32c890734 --- /dev/null +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::Setting +{ +extern const SettingsUInt64 max_parser_depth; +extern const SettingsUInt64 max_parser_backtracks; +extern const SettingsBool allow_settings_after_format_in_insert; +extern const SettingsUInt64 max_query_size; +extern const SettingsUInt64 min_insert_block_size_rows; +extern const SettingsUInt64 min_insert_block_size_bytes; +} + +using namespace local_engine; +using namespace DB; + +namespace +{ +void do_remove(const std::string & folder) +{ + namespace fs = std::filesystem; + if (const std::filesystem::path ph(folder); fs::exists(ph)) + fs::remove_all(ph); +} + +Chunk person_chunk() +{ + auto id = INT()->createColumn(); + id->insert(100); + id->insert(200); + id->insert(300); + id->insert(400); + id->insert(500); + id->insert(600); + id->insert(700); + + auto name = STRING()->createColumn(); + name->insert("Joe"); + name->insert("Marry"); + name->insert("Mike"); + name->insert("Fred"); + name->insert("Albert"); + name->insert("Michelle"); + name->insert("Dan"); + + auto age = makeNullable(INT())->createColumn(); + Field null_field; + age->insert(30); + age->insert(null_field); + age->insert(18); + age->insert(50); + age->insert(null_field); + age->insert(30); + age->insert(50); + + + MutableColumns x; + x.push_back(std::move(id)); + x.push_back(std::move(name)); + x.push_back(std::move(age)); + return {std::move(x), 7}; +} +} + +TEST(MergeTree, ClickhouseMergeTree) +{ + ThreadStatus thread_status; + + const auto context = DB::Context::createCopy(QueryContext::globalContext()); + context->setPath("./"); + const Settings & settings = context->getSettingsRef(); + + const std::string query + = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)"; + + const char * begin = query.data(); + const char * end = query.data() + query.size(); + ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]); + + ASTPtr ast = parseQuery( + parser, + begin, + end, + "", + settings[Setting::max_query_size], + settings[Setting::max_parser_depth], + settings[Setting::max_parser_backtracks]); + + EXPECT_TRUE(ast->as()); + auto & create = ast->as(); + + ColumnsDescription column_descriptions + = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE); + + StorageInMemoryMetadata metadata; + metadata.setColumns(column_descriptions); + metadata.setComment("args.comment"); + ASTPtr partition_by_key; + metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context); + + MergeTreeData::MergingParams merging_params; + merging_params.mode = MergeTreeData::MergingParams::Ordinary; + + + /// This merging param maybe used as part of sorting key + std::optional merging_param_key_arg; + /// Get sorting key from engine arguments. + /// + /// NOTE: store merging_param_key_arg as additional key column. We do it + /// before storage creation. After that storage will just copy this + /// column if sorting key will be changed. 
+ metadata.sorting_key + = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg); + + std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); + + UUID uuid; + UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull; + UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull; + + SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); }); + + auto merge_tree = std::make_shared( + StorageID("", "", uuid), + "WritePipeline_MergeTree", + metadata, + LoadingStrictnessLevel::CREATE, + context, + "", + merging_params, + std::move(storage_settings)); + + Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}}; + DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]); + squashing.add(person_chunk()); + auto x = Squashing::squash(squashing.flush()); + x.getChunkInfos().add(std::make_shared()); + + ASSERT_EQ(7, x.getNumRows()); + ASSERT_EQ(3, x.getNumColumns()); + + + auto metadata_snapshot = std::make_shared(metadata); + ASTPtr none; + auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false)); + + sink->consume(x); + sink->onFinish(); +} + +INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json"); +INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json"); +INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json"); + +TEST(MergeTree, SparkMergeTree) +{ + ThreadStatus thread_status; + + const auto context = DB::Context::createCopy(QueryContext::globalContext()); + context->setPath("./"); + const Settings & settings = context->getSettingsRef(); + + const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_)); + MergeTreeTableInstance merge_tree_table(extension_table); + + EXPECT_EQ(merge_tree_table.database, "default"); + EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree"); + EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree"); + EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default"); + + do_remove(merge_tree_table.relative_path); + + const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext()); + EXPECT_TRUE(dest_storage); + EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); + DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); + Block header = metadata_snapshot->getSampleBlock(); + + constexpr std::string_view split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})"; + constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")}; + + SparkMergeTreeWritePartitionSettings gm_write_settings{ + .part_name_prefix{"this_is_prefix"}, + }; + gm_write_settings.set(context); + + auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context); + SparkMergeTreeWriter & spark_merge_tree_writer = *writer; + + auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file); + EXPECT_TRUE(local_executor->hasNext()); + + do + { + 
spark_merge_tree_writer.write(*local_executor->nextColumnar()); + } while (local_executor->hasNext()); + + auto json_info = spark_merge_tree_writer.close(); + std::cerr << json_info << std::endl; + + /// + { + const auto extension_table_hdfs + = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_)); + MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs); + EXPECT_EQ(merge_tree_table_hdfs.database, "default"); + EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs"); + EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs"); + EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main"); + + const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext()); + EXPECT_TRUE(dest_storage_hdfs); + EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote()); + } +} + +INCBIN(_2_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/2_one_pipeline.json"); + +TEST(MergeTree, Pipeline) +{ + GTEST_SKIP(); + const auto context = DB::Context::createCopy(QueryContext::globalContext()); + GlutenWriteSettings settings{ + .task_write_tmp_dir = "file:///tmp/lineitem_mergetree", + .task_write_filename = "part-00000-a09f9d59-2dc6-43bc-a485-dcab8384b2ff.c000.mergetree", + }; + settings.set(context); + + constexpr std::string_view split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})"; + auto [_, local_executor] = test::create_plan_and_executor( + EMBEDDED_PLAN(_2_mergetree_plan_), + split_template, + GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet"), + context); + EXPECT_TRUE(local_executor->hasNext()); + const Block & x = *local_executor->nextColumnar(); +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json new file mode 100644 index 0000000000000..fbc593267464d --- /dev/null +++ b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json @@ -0,0 +1,368 @@ +{ + "relations": [ + { + "root": { + "input": { + "write": { + "namedTable": { + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "WriteParameters:isSnappy=1;format=mergetree\n" + }, + "enhancement": { + "@type": "type.googleapis.com/substrait.Type", + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": 
"NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, 
+ "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + } + } + }, + "outputSchema": { + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file
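
The core of this patch is the new NativeOutputWriter interface: both NormalFileWriter and SparkMergeTreeWriter now implement the same write(DB::Block &) / close() pair, which is why CHDatasourceJniWrapper keeps only a single write(instanceId, blockAddress) and close(instanceId) on the Java side. Below is a minimal sketch of that contract and of the opaque-handle round-trip the JNI layer performs; Block is a stand-in for DB::Block and RowCountingWriter is a hypothetical implementation used only to exercise the lifecycle:

#include <cstdint>
#include <iostream>
#include <string>

// Stand-in for DB::Block: the real interface in Storages/NativeOutputWriter.h
// takes DB::Block &, but only a row count is needed to show the contract.
struct Block
{
    uint64_t rows = 0;
};

class NativeOutputWriter
{
public:
    virtual ~NativeOutputWriter() = default;
    virtual void write(Block & block) = 0;
    // Finalize and return a writer-specific result: empty for plain files,
    // part-info JSON for the MergeTree writer in this patch.
    virtual std::string close() = 0;
};

// Hypothetical implementation, used only to exercise the lifecycle.
class RowCountingWriter : public NativeOutputWriter
{
public:
    void write(Block & block) override { total_rows += block.rows; }
    std::string close() override { return "rows=" + std::to_string(total_rows); }

private:
    uint64_t total_rows = 0;
};

// The JNI layer hands the writer to the JVM as an opaque integer handle and
// casts it back to the base class for both write() and close().
int64_t createWriter()
{
    return reinterpret_cast<int64_t>(static_cast<NativeOutputWriter *>(new RowCountingWriter()));
}

void writeBlock(int64_t instance_id, Block & block)
{
    reinterpret_cast<NativeOutputWriter *>(instance_id)->write(block);
}

std::string closeWriter(int64_t instance_id)
{
    auto * writer = reinterpret_cast<NativeOutputWriter *>(instance_id);
    std::string result = writer->close();
    delete writer; // the JNI close() also owns deletion (SCOPE_EXIT in the patch)
    return result;
}

int main()
{
    const int64_t handle = createWriter();
    Block b1{3}, b2{4};
    writeBlock(handle, b1);
    writeBlock(handle, b2);
    std::cout << closeWriter(handle) << '\n'; // rows=7
    return 0;
}

Returning the result of close() as a string lets the plain file path report nothing while the MergeTree path reports its written parts as JSON, without a format-specific JNI method for either.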
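
NormalFileWriter::write builds its pushing pipeline lazily on the first block, and the comment kept in close() notes that an INSERT over an empty dataset never calls write, so finishing must be skipped when nothing was ever initialized. A small sketch of that pattern, with a stand-in Sink in place of the real output-format pipeline (an assumption for illustration):

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Stand-in for the pushing-pipeline machinery NormalFileWriter drives.
struct Sink
{
    void push(const std::string & row) { rows.push_back(row); }
    void finish() { std::cout << "flushed " << rows.size() << " rows\n"; }
    std::vector<std::string> rows;
};

class LazyFileWriter
{
public:
    void write(const std::string & row)
    {
        // The pipeline is created only when the first block arrives.
        if (!sink)
            sink = std::make_unique<Sink>();
        sink->push(row);
    }

    std::string close()
    {
        // An INSERT over an empty dataset never calls write(), so the sink
        // may still be null here; finishing must be skipped in that case.
        if (sink)
            sink->finish();
        return {};
    }

private:
    std::unique_ptr<Sink> sink;
};

int main()
{
    LazyFileWriter empty;
    empty.close(); // no-op, nothing was written

    LazyFileWriter w;
    w.write("a");
    w.write("b");
    w.close(); // flushed 2 rows
    return 0;
}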
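
SparkMergeTreeWriter::close() now finishes the executor and returns the part metadata directly, serialized by the new static PartInfo::toJson. A sketch of the JSON shape it produces, reusing the same rapidjson Writer calls with a trimmed copy of the PartInfo fields from the patch (the stand-alone struct and sample values here are illustrative):

#include <iostream>
#include <map>
#include <string>
#include <vector>

#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

// Trimmed copy of the fields serialized by PartInfo::toJson in the patch.
// std::map is used here only to keep the demo output deterministic.
struct PartInfo
{
    std::string part_name;
    unsigned mark_count = 0;
    unsigned disk_size = 0;
    unsigned row_count = 0;
    std::map<std::string, std::string> partition_values;
    std::string bucket_id;
};

std::string toJson(const std::vector<PartInfo> & parts)
{
    rapidjson::StringBuffer result;
    rapidjson::Writer<rapidjson::StringBuffer> writer(result);
    writer.StartArray();
    for (const auto & p : parts)
    {
        writer.StartObject();
        writer.Key("part_name");
        writer.String(p.part_name.c_str());
        writer.Key("mark_count");
        writer.Uint(p.mark_count);
        writer.Key("disk_size");
        writer.Uint(p.disk_size);
        writer.Key("row_count");
        writer.Uint(p.row_count);
        writer.Key("bucket_id");
        writer.String(p.bucket_id.c_str());
        writer.Key("partition_values");
        writer.StartObject();
        for (const auto & [key, value] : p.partition_values)
        {
            writer.Key(key.c_str());
            writer.String(value.c_str());
        }
        writer.EndObject();
        writer.EndObject();
    }
    writer.EndArray();
    return result.GetString();
}

int main()
{
    std::vector<PartInfo> parts;
    parts.push_back(PartInfo{"all_1_1_0", 2, 1024, 100, {{"l_shipdate", "1996-01-01"}}, ""});
    // Prints one JSON object per written part.
    std::cout << toJson(parts) << '\n';
    return 0;
}

This is the string the JVM side receives from close(instanceId) and feeds to AddFileTags.partsMetricsToAddFile in MergeTreeOutputWriter.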
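
The reworked javadoc on splitBlockByPartitionAndBucket relies on the input block being pre-sorted by partition columns and bucket, so each stripe is fully described by its heading row and the native side only has to report every stripe's first-row index. A small sketch of that boundary computation over already-sorted keys (plain strings standing in for the partition/bucket values, purely for illustration):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Given rows already sorted by their partition/bucket key, return the index of
// the first row of every stripe (run of equal keys). Only the heading row of a
// stripe needs to be inspected to know which partition/bucket it belongs to.
std::vector<std::size_t> stripeHeadIndices(const std::vector<std::string> & sorted_keys)
{
    std::vector<std::size_t> heads;
    for (std::size_t i = 0; i < sorted_keys.size(); ++i)
        if (i == 0 || sorted_keys[i] != sorted_keys[i - 1])
            heads.push_back(i);
    return heads;
}

int main()
{
    // Keys in the order FileFormatWriter emits them: sorted by partition + bucket.
    const std::vector<std::string> keys{"p=1/b=0", "p=1/b=0", "p=1/b=1", "p=2/b=0", "p=2/b=0"};
    for (std::size_t head : stripeHeadIndices(keys))
        std::cout << "stripe starts at row " << head << " (" << keys[head] << ")\n";
    // stripe starts at row 0 (p=1/b=0)
    // stripe starts at row 2 (p=1/b=1)
    // stripe starts at row 3 (p=2/b=0)
    return 0;
}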