From 5d28de617578c4dd933cdd103413c61d72db5294 Mon Sep 17 00:00:00 2001 From: Chang chen Date: Wed, 9 Oct 2024 17:38:44 +0800 Subject: [PATCH] [GLUTEN-7028][CH][Part-5] Refactor: add NativeOutputWriter to unify CHDatasourceJniWrapper (#7395) * Add NativeOutputWriter * refactor CHDatasourceJniWrapper * WriteConfiguration * using hadoop Configuration to pass parameter * Implement CHMergeTreeWriterInjects::createNativeWrite * Rename datasources.clickhouse.ClickhouseMetaSerializer => datasources.mergetree.MetaSerializer * delete MergeTreeDeltaUtil and move its functionality to StorageMeta * WriteConfiguration => StorageConfigProvider * fix prefixof * WriteConfiguration => StorageConfigProvider 2 * withStorageID --- backends-clickhouse/pom.xml | 22 ++ .../sql/delta/catalog/ClickHouseTableV2.scala | 8 +- .../OptimizeTableCommandOverwrites.scala | 10 +- .../source/DeltaMergeTreeFileFormat.scala | 29 +- .../sql/delta/catalog/ClickHouseTableV2.scala | 8 +- .../OptimizeTableCommandOverwrites.scala | 12 +- .../source/DeltaMergeTreeFileFormat.scala | 30 +- .../sql/delta/catalog/ClickHouseTableV2.scala | 7 +- .../OptimizeTableCommandOverwrites.scala | 12 +- .../source/DeltaMergeTreeFileFormat.scala | 29 +- .../datasources/CHDatasourceJniWrapper.java | 76 ++-- .../clickhouse/ExtensionTableBuilder.java | 7 +- .../datasources/v1/write_optimization.proto | 37 ++ .../clickhouse/CHIteratorApi.scala | 5 +- .../delta/catalog/ClickHouseTableV2Base.scala | 25 +- .../commands/GlutenCHCacheDataCommand.scala | 8 +- .../clickhouse/utils/MergeTreeDeltaUtil.scala | 47 --- .../utils/MergeTreePartsPartitionsUtil.scala | 6 +- .../mergetree/DeltaMetaReader.scala | 38 +- .../MetaSerializer.scala} | 84 ++-- .../datasources/mergetree/StorageMeta.scala | 93 +++-- .../v1/CHFormatWriterInjects.scala | 64 ++- .../v1/CHMergeTreeWriterInjects.scala | 87 +++-- .../datasources/v1/CHOrcWriterInjects.scala | 18 +- .../v1/CHParquetWriterInjects.scala | 16 +- .../MergeTreeFileFormatDataWriter.scala | 33 +- .../v1/clickhouse/MergeTreeOutputWriter.scala | 13 +- .../v2/clickhouse/ClickHouseConfig.scala | 5 +- .../GlutenClickHouseMergeTreeWriteSuite.scala | 1 + cpp-ch/local-engine/CMakeLists.txt | 4 +- cpp-ch/local-engine/Common/CHUtil.cpp | 3 +- .../Parser/RelParsers/WriteRelParser.cpp | 2 +- .../Parser/SerializedPlanParser.cpp | 2 +- .../Parser/SubstraitParserUtils.cpp | 6 +- .../local-engine/Shuffle/NativeSplitter.cpp | 16 +- .../Storages/MergeTree/SparkMergeTreeMeta.cpp | 34 +- .../Storages/MergeTree/SparkMergeTreeMeta.h | 3 + .../MergeTree/SparkMergeTreeWriter.cpp | 70 ++-- .../Storages/MergeTree/SparkMergeTreeWriter.h | 13 +- .../Storages/NativeOutputWriter.h | 36 ++ ...riterWrappers.cpp => NormalFileWriter.cpp} | 13 +- ...ileWriterWrappers.h => NormalFileWriter.h} | 27 +- cpp-ch/local-engine/local_engine_jni.cpp | 116 ++---- .../proto/write_optimization.proto | 1 + .../tests/gtest_parquet_columnindex.cpp | 31 +- cpp-ch/local-engine/tests/gtest_parser.cpp | 1 - .../tests/gtest_write_pipeline.cpp | 207 +--------- .../tests/gtest_write_pipeline_mergetree.cpp | 279 +++++++++++++ .../tests/json/mergetree/2_one_pipeline.json | 368 ++++++++++++++++++ 49 files changed, 1247 insertions(+), 815 deletions(-) create mode 100644 backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto delete mode 100644 backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala rename 
backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/{clickhouse/ClickhouseMetaSerializer.scala => mergetree/MetaSerializer.scala} (66%) create mode 100644 cpp-ch/local-engine/Storages/NativeOutputWriter.h rename cpp-ch/local-engine/Storages/Output/{FileWriterWrappers.cpp => NormalFileWriter.cpp} (93%) rename cpp-ch/local-engine/Storages/Output/{FileWriterWrappers.h => NormalFileWriter.h} (93%) create mode 120000 cpp-ch/local-engine/proto/write_optimization.proto create mode 100644 cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp create mode 100644 cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml index 130fe88552e7..6bc9f8ec5336 100644 --- a/backends-clickhouse/pom.xml +++ b/backends-clickhouse/pom.xml @@ -253,6 +253,28 @@ target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + org.xolstice.maven.plugins + protobuf-maven-plugin + + + compile-gluten-proto + generate-sources + + compile + test-compile + + + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + src/main/resources/org/apache/spark/sql/execution/datasources/v1 + false + + + + org.apache.maven.plugins maven-resources-plugin diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index ae8ec32bd0a4..25f691d2e8cb 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -96,12 +96,8 @@ class ClickHouseTableV2( def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( - StorageMeta.withMoreStorageInfo( - meta, - ClickhouseSnapshot.genSnapshotId(snapshot), - deltaLog.dataPath, - dataBaseName, - tableName)) + StorageMeta + .withStorageID(meta, dataBaseName, tableName, ClickhouseSnapshot.genSnapshotId(snapshot))) } override def deltaProperties: Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index b897010d5bb3..242a5c3c277a 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import java.util.{Date, UUID} +import java.util.Date import scala.collection.mutable.ArrayBuffer object OptimizeTableCommandOverwrites extends Logging { @@ -95,8 +95,6 @@ object OptimizeTableCommandOverwrites extends Logging { try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - val uuid = UUID.randomUUID.toString - val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel( description.path, description.database, @@ -115,13 +113,9 @@ object OptimizeTableCommandOverwrites extends Logging { description.tableSchema.toAttributes ) - val datasourceJniWrapper = new 
CHDatasourceJniWrapper() val returnedMetrics = - datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, + CHDatasourceJniWrapper.nativeMergeMTParts( planWithSplitInfo.splitInfo, - uuid, - taskId.getId.toString, description.partitionDir.getOrElse(""), description.bucketDir.getOrElse("") ) diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 19b3d396bd6f..5b2dc164b56a 100644 --- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader -import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} +import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects import org.apache.spark.sql.types.StructType import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -50,22 +49,15 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { // pass compression to job conf so that the file extension can be aware of it. 
- // val conf = ContextUtil.getConfiguration(job) + val conf = job.getConfiguration + val nativeConf = GlutenMergeTreeWriterInjects .getInstance() .nativeConf(options, "") @transient val deltaMetaReader = DeltaMetaReader(metadata) - - val database = deltaMetaReader.storageDB - val tableName = deltaMetaReader.storageTable - val deltaPath = deltaMetaReader.storagePath - - val extensionTableBC = sparkSession.sparkContext.broadcast( - ClickhouseMetaSerializer - .forWrite(deltaMetaReader, metadata.schema) - .toByteArray) + deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) } new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { @@ -76,19 +68,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - require(path == deltaPath) + GlutenMergeTreeWriterInjects .getInstance() - .asInstanceOf[CHMergeTreeWriterInjects] - .createOutputWriter( - path, - metadata.schema, - context, - nativeConf, - database, - tableName, - extensionTableBC.value - ) + .createOutputWriter(path, metadata.schema, context, nativeConf) } } } diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index ae8ec32bd0a4..25f691d2e8cb 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -96,12 +96,8 @@ class ClickHouseTableV2( def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( - StorageMeta.withMoreStorageInfo( - meta, - ClickhouseSnapshot.genSnapshotId(snapshot), - deltaLog.dataPath, - dataBaseName, - tableName)) + StorageMeta + .withStorageID(meta, dataBaseName, tableName, ClickhouseSnapshot.genSnapshotId(snapshot))) } override def deltaProperties: Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index b897010d5bb3..e7b8ce1d3053 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import java.util.{Date, UUID} +import java.util.Date import scala.collection.mutable.ArrayBuffer object OptimizeTableCommandOverwrites extends Logging { @@ -95,8 +95,6 @@ object OptimizeTableCommandOverwrites extends Logging { try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - val uuid = UUID.randomUUID.toString - val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel( description.path, description.database, @@ -115,13 +113,9 @@ object OptimizeTableCommandOverwrites extends Logging { description.tableSchema.toAttributes ) - val datasourceJniWrapper = new CHDatasourceJniWrapper() val returnedMetrics = - datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, + 
CHDatasourceJniWrapper.nativeMergeMTParts( planWithSplitInfo.splitInfo, - uuid, - taskId.getId.toString, description.partitionDir.getOrElse(""), description.bucketDir.getOrElse("") ) @@ -170,7 +164,7 @@ object OptimizeTableCommandOverwrites extends Logging { bucketNum: String, bin: Seq[AddFile], maxFileSize: Long): Seq[FileAction] = { - val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog); + val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog) val sparkSession = SparkSession.getActiveSession.get diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index c2d1ec47b9ca..24dbc6e03bef 100644 --- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.Metadata import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader -import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} +import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects import org.apache.spark.sql.types.StructType import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -53,22 +52,16 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { // pass compression to job conf so that the file extension can be aware of it. 
- // val conf = ContextUtil.getConfiguration(job) + val conf = job.getConfiguration + + // just for the sake of compatibility val nativeConf = GlutenMergeTreeWriterInjects .getInstance() .nativeConf(options, "") @transient val deltaMetaReader = DeltaMetaReader(metadata) - - val database = deltaMetaReader.storageDB - val tableName = deltaMetaReader.storageTable - val deltaPath = deltaMetaReader.storagePath - - val extensionTableBC = sparkSession.sparkContext.broadcast( - ClickhouseMetaSerializer - .forWrite(deltaMetaReader, metadata.schema) - .toByteArray) + deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) } new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { @@ -79,19 +72,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - require(path == deltaPath) + GlutenMergeTreeWriterInjects .getInstance() - .asInstanceOf[CHMergeTreeWriterInjects] - .createOutputWriter( - path, - metadata.schema, - context, - nativeConf, - database, - tableName, - extensionTableBC.value - ) + .createOutputWriter(path, metadata.schema, context, nativeConf) } } } diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala index f730b42e4db0..5f6a2dc3d712 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala @@ -99,12 +99,11 @@ class ClickHouseTableV2( def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = { new DeltaMergeTreeFileFormat( protocol, - StorageMeta.withMoreStorageInfo( + StorageMeta.withStorageID( meta, - ClickhouseSnapshot.genSnapshotId(initialSnapshot), - deltaLog.dataPath, dataBaseName, - tableName)) + tableName, + ClickhouseSnapshot.genSnapshotId(initialSnapshot))) } override def deltaProperties: Map[String, String] = properties().asScala.toMap diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala index ef30aaad2294..0a3b2ef5ef26 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala @@ -44,7 +44,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path} import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl -import java.util.{Date, UUID} +import java.util.Date import scala.collection.mutable.ArrayBuffer object OptimizeTableCommandOverwrites extends Logging { @@ -97,8 +97,6 @@ object OptimizeTableCommandOverwrites extends Logging { try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { - val uuid = UUID.randomUUID.toString - val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel( description.path, description.database, @@ -117,13 +115,9 @@ object OptimizeTableCommandOverwrites extends Logging { DataTypeUtils.toAttributes(description.tableSchema) ) - val datasourceJniWrapper = new CHDatasourceJniWrapper() val 
returnedMetrics = - datasourceJniWrapper.nativeMergeMTParts( - planWithSplitInfo.plan, + CHDatasourceJniWrapper.nativeMergeMTParts( planWithSplitInfo.splitInfo, - uuid, - taskId.getId.toString, description.partitionDir.getOrElse(""), description.bucketDir.getOrElse("") ) @@ -172,7 +166,7 @@ object OptimizeTableCommandOverwrites extends Logging { bucketNum: String, bin: Seq[AddFile], maxFileSize: Long): Seq[FileAction] = { - val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog); + val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog) val sparkSession = SparkSession.getActiveSession.get diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala index 1489ae4dbf49..6cc431f4f99c 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala @@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.delta.DeltaParquetFileFormat import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory} -import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader -import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects} +import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects import org.apache.spark.sql.types.StructType import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext} @@ -53,22 +52,15 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) options: Map[String, String], dataSchema: StructType): OutputWriterFactory = { // pass compression to job conf so that the file extension can be aware of it. 
- // val conf = ContextUtil.getConfiguration(job) + val conf = job.getConfiguration + val nativeConf = GlutenMergeTreeWriterInjects .getInstance() .nativeConf(options, "") @transient val deltaMetaReader = DeltaMetaReader(metadata) - - val database = deltaMetaReader.storageDB - val tableName = deltaMetaReader.storageTable - val deltaPath = deltaMetaReader.storagePath - - val extensionTableBC = sparkSession.sparkContext.broadcast( - ClickhouseMetaSerializer - .forWrite(deltaMetaReader, metadata.schema) - .toByteArray) + deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) } new OutputWriterFactory { override def getFileExtension(context: TaskAttemptContext): String = { @@ -79,19 +71,10 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata) path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = { - require(path == deltaPath) + GlutenMergeTreeWriterInjects .getInstance() - .asInstanceOf[CHMergeTreeWriterInjects] - .createOutputWriter( - path, - metadata.schema, - context, - nativeConf, - database, - tableName, - extensionTableBC.value - ) + .createOutputWriter(path, metadata.schema, context, nativeConf) } } } diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java index f19c5d39df1d..441feba67c57 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java @@ -16,58 +16,64 @@ */ package org.apache.spark.sql.execution.datasources; +import io.substrait.proto.WriteRel; + public class CHDatasourceJniWrapper { - public native long nativeInitFileWriterWrapper( - String filePath, byte[] preferredSchema, String formatHint); + private final long instance; - public native long nativeInitMergeTreeWriterWrapper( - byte[] plan, - byte[] splitInfo, - String uuid, - String taskId, - String partition_dir, - String bucket_dir, - byte[] confArray); + public CHDatasourceJniWrapper(String filePath, WriteRel write) { + this.instance = createFilerWriter(filePath, write.toByteArray()); + } - public native String nativeMergeMTParts( - byte[] plan, - byte[] splitInfo, - String uuid, - String taskId, - String partition_dir, - String bucket_dir); + public CHDatasourceJniWrapper( + String taskId, String partition_dir, String bucket_dir, WriteRel write, byte[] confArray) { + this.instance = + createMergeTreeWriter(taskId, partition_dir, bucket_dir, write.toByteArray(), confArray); + } - public static native String filterRangesOnDriver(byte[] plan, byte[] read); + public void write(long blockAddress) { + write(instance, blockAddress); + } + + public String close() { + return close(instance); + } - public native void write(long instanceId, long blockAddress); + private native void write(long instanceId, long blockAddress); - public native void writeToMergeTree(long instanceId, long blockAddress); + private native String close(long instanceId); - public native void close(long instanceId); + /// FileWriter + private native long createFilerWriter(String filePath, byte[] writeRel); - public native String closeMergeTreeWriter(long instanceId); + /// MergeTreeWriter + private native long createMergeTreeWriter( + String taskId, String partition_dir, String bucket_dir, byte[] writeRel, byte[] confArray); + + public static native String 
nativeMergeMTParts( + byte[] splitInfo, String partition_dir, String bucket_dir); + + public static native String filterRangesOnDriver(byte[] plan, byte[] read); - /*- + /** * The input block is already sorted by partition columns + bucket expressions. (check - * org.apache.spark.sql.execution.datasources.FileFormatWriter#write) - * However, the input block may contain parts(we call it stripe here) belonging to - * different partition/buckets. + * org.apache.spark.sql.execution.datasources.FileFormatWriter#write) However, the input block may + * contain parts(we call it stripe here) belonging to different partition/buckets. * - * If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_. + *
<p>
If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_. * - * This function splits the input block in to several blocks, each of which belonging - * to the same partition/bucket. Notice the stripe will NOT contain partition columns + *
<p>
This function splits the input block into several blocks, each of which belongs to the + * same partition/bucket. Notice the stripe will NOT contain partition columns * - * Since all rows in a stripe share the same partition/bucket, - * we only need to check the heading row. - * So, for each stripe, the native code also returns each stripe's first row's index. - * Caller can use these indice to get UnsafeRows from the input block, - * to help FileFormatDataWriter to aware partition/bucket changes. + *
<p>
Since all rows in a stripe share the same partition/bucket, we only need to check the + * heading row. So, for each stripe, the native code also returns each stripe's first row's index. + * Caller can use these indices to get UnsafeRows from the input block, to help + * FileFormatDataWriter to aware partition/bucket changes. */ public static native BlockStripes splitBlockByPartitionAndBucket( long blockAddress, - int[] partitionColIndice, + int[] partitionColIndices, boolean hasBucket, boolean reserve_partition_columns); } diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java index 9d6ed6868ec1..a305972045b9 100644 --- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java +++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java @@ -16,6 +16,9 @@ */ package org.apache.spark.sql.execution.datasources.clickhouse; +import org.apache.spark.sql.execution.datasources.mergetree.MetaSerializer; +import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer; + import java.util.List; import java.util.Map; @@ -34,13 +37,13 @@ public static ExtensionTableNode makeExtensionTable( String bfIndexKey, String setIndexKey, String primaryKey, - ClickhousePartSerializer partSerializer, + PartSerializer partSerializer, String tableSchemaJson, Map clickhouseTableConfigs, List preferredLocations) { String result = - ClickhouseMetaSerializer.apply( + MetaSerializer.apply( database, tableName, snapshotId, diff --git a/backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto b/backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto new file mode 100644 index 000000000000..a09f3ea0940c --- /dev/null +++ b/backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 +syntax = "proto3"; + +package local_engine; + +option java_package = "org.apache.spark.sql.execution.datasources.v1"; +option java_multiple_files = true; + +message Write { + message Common { + string format = 1; + } + message ParquetWrite{} + message OrcWrite{} + message MergeTreeWrite{ + string database = 1; + string table = 2; + string snapshot_id = 3; + string order_by_key = 4; + string low_card_key = 5; + string minmax_index_key = 6; + string bf_index_key = 7; + string set_index_key = 8; + string primary_key = 9; + string relative_path = 10; + string absolute_path = 11; + + string storage_policy = 12; + } + + Common common = 1; + oneof file_format { + ParquetWrite parquet = 2; + OrcWrite orc = 3; + MergeTreeWrite mergetree = 4; + } +} \ No newline at end of file diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala index 3c834b7ca847..6760d29561a3 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala @@ -38,7 +38,8 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils import 
org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter} import org.apache.spark.sql.connector.read.InputPartition import org.apache.spark.sql.execution.datasources.FilePartition -import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode} +import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode} +import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.types._ import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper @@ -148,7 +149,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { p.bfIndexKey, p.setIndexKey, p.primaryKey, - ClickhousePartSerializer.fromMergeTreePartSplits(p.partList.toSeq), + PartSerializer.fromMergeTreePartSplits(p.partList.toSeq), p.tableSchemaJson, p.clickhouseTableConfigs.asJava, CHAffinity.getNativeMergeTreePartitionLocations(p).toList.asJava diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala index 062e96962297..346ed4a17576 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala @@ -16,12 +16,10 @@ */ package org.apache.spark.sql.delta.catalog -import org.apache.gluten.expression.ConverterUtils.normalizeColName +import org.apache.gluten.expression.ConverterUtils import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.delta.Snapshot -import org.apache.spark.sql.delta.actions.Metadata -import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader} import org.apache.hadoop.fs.Path @@ -38,7 +36,8 @@ trait ClickHouseTableV2Base extends TablePropertiesReader { def configuration: Map[String, String] = deltaProperties - def metadata: Metadata = deltaSnapshot.metadata + override lazy val partitionColumns: Seq[String] = + deltaSnapshot.metadata.partitionColumns.map(ConverterUtils.normalizeColName) lazy val dataBaseName: String = deltaCatalog .map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE)) @@ -49,18 +48,16 @@ trait ClickHouseTableV2Base extends TablePropertiesReader { .getOrElse(deltaPath.toUri.getPath) lazy val clickhouseTableConfigs: Map[String, String] = { - Map("storage_policy" -> deltaProperties.getOrElse("storage_policy", "default")) + Map(StorageMeta.POLICY -> configuration.getOrElse(StorageMeta.POLICY, "default")) } - def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption) + def primaryKey(): String = StorageMeta.columnsToStr(primaryKeyOption) - def orderByKey(): String = orderByKeyOption match { - case Some(keys) => keys.map(normalizeColName).mkString(",") - case None => "tuple()" - } + def orderByKey(): String = + StorageMeta.columnsToStr(orderByKeyOption, StorageMeta.DEFAULT_ORDER_BY_KEY) - def lowCardKey(): String = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption) - def minmaxIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption) - def bfIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption) - def setIndexKey(): String = 
MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption) + def lowCardKey(): String = StorageMeta.columnsToStr(lowCardKeyOption) + def minmaxIndexKey(): String = StorageMeta.columnsToStr(minmaxIndexKeyOption) + def bfIndexKey(): String = StorageMeta.columnsToStr(bfIndexKeyOption) + def setIndexKey(): String = StorageMeta.columnsToStr(setIndexKeyOption) } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala index 341018fb590b..1c7b4f232205 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, import org.apache.spark.sql.delta._ import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.commands.GlutenCacheBase._ -import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder} -import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil +import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder +import org.apache.spark.sql.execution.datasources.mergetree.{PartSerializer, StorageMeta} import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.types.{BooleanType, StringType} @@ -176,13 +176,13 @@ case class GlutenCHCacheDataCommand( onePart.tablePath, pathToCache.toString, snapshot.metadata.configuration - .getOrElse("orderByKey", MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY), + .getOrElse("orderByKey", StorageMeta.DEFAULT_ORDER_BY_KEY), snapshot.metadata.configuration.getOrElse("lowCardKey", ""), snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""), snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""), snapshot.metadata.configuration.getOrElse("setIndexKey", ""), snapshot.metadata.configuration.getOrElse("primaryKey", ""), - ClickhousePartSerializer.fromPartNames(parts.map(_.name).toSeq), + PartSerializer.fromPartNames(parts.map(_.name).toSeq), ConverterUtils.convertNamedStructJson(snapshot.metadata.schema), snapshot.metadata.configuration.asJava, new JList[String]() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala deleted file mode 100644 index 854c6f91c917..000000000000 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.datasources.clickhouse.utils - -import org.apache.gluten.expression.ConverterUtils.normalizeColName - -object MergeTreeDeltaUtil { - - val DEFAULT_ORDER_BY_KEY = "tuple()" - - def genOrderByAndPrimaryKeyStr( - orderByKeyOption: Option[Seq[String]], - primaryKeyOption: Option[Seq[String]]): (String, String) = { - - val orderByKey = - orderByKeyOption.filter(_.nonEmpty).map(columnsToStr).getOrElse(DEFAULT_ORDER_BY_KEY) - val primaryKey = primaryKeyOption - .filter(p => orderByKey != DEFAULT_ORDER_BY_KEY && p.nonEmpty) - .map(columnsToStr) - .getOrElse("") - - (orderByKey, primaryKey) - } - - def columnsToStr(option: Option[Seq[String]]): String = option match { - case Some(keys) => keys.map(normalizeColName).mkString(",") - case None => "" - } - - def columnsToStr(keys: Seq[String]): String = { - keys.map(normalizeColName).mkString(",") - } -} diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala index dc31822fd73e..9347d8679b98 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala @@ -35,8 +35,8 @@ import org.apache.spark.sql.delta.ClickhouseSnapshot import org.apache.spark.sql.delta.catalog.ClickHouseTableV2 import org.apache.spark.sql.delta.files.TahoeFileIndex import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory} -import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, MergeTreePartFilterReturnedRange} -import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta +import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, MergeTreePartFilterReturnedRange} +import org.apache.spark.sql.execution.datasources.mergetree.{PartSerializer, StorageMeta} import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat import org.apache.spark.sql.types.BooleanType @@ -589,7 +589,7 @@ object MergeTreePartsPartitionsUtil extends Logging { table.bfIndexKey(), table.setIndexKey(), table.primaryKey(), - ClickhousePartSerializer.fromAddMergeTreeParts(selectPartsFiles), + PartSerializer.fromAddMergeTreeParts(selectPartsFiles), tableSchemaJson, clickhouseTableConfigs.asJava, new JArrayList[String]() diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala index de322b65dd8e..a69b1c651248 100644 --- 
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala @@ -16,21 +16,35 @@ */ package org.apache.spark.sql.execution.datasources.mergetree +import org.apache.gluten.expression.ConverterUtils + import org.apache.spark.sql.delta.actions.Metadata -class DeltaMetaReader( - override val metadata: Metadata, - override val configuration: Map[String, String]) - extends TablePropertiesReader { +case class DeltaMetaReader(metadata: Metadata) + extends TablePropertiesReader + with StorageConfigProvider { - def storageDB: String = configuration(StorageMeta.STORAGE_DB) - def storageTable: String = configuration(StorageMeta.STORAGE_TABLE) - def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID) - def storagePath: String = configuration(StorageMeta.STORAGE_PATH) -} + override lazy val partitionColumns: Seq[String] = + metadata.partitionColumns.map(ConverterUtils.normalizeColName) + + override lazy val configuration: Map[String, String] = metadata.configuration -object DeltaMetaReader { - def apply(metadata: Metadata): DeltaMetaReader = { - new DeltaMetaReader(metadata, metadata.configuration) + lazy val storageConf: Map[String, String] = { + val (orderByKey0, primaryKey0) = StorageMeta.genOrderByAndPrimaryKeyStr( + orderByKeyOption, + primaryKeyOption + ) + Map( + StorageMeta.DB -> configuration(StorageMeta.DB), + StorageMeta.TABLE -> configuration(StorageMeta.TABLE), + StorageMeta.SNAPSHOT_ID -> configuration(StorageMeta.SNAPSHOT_ID), + StorageMeta.POLICY -> configuration.getOrElse(StorageMeta.POLICY, "default"), + StorageMeta.ORDER_BY_KEY -> orderByKey0, + StorageMeta.LOW_CARD_KEY -> StorageMeta.columnsToStr(lowCardKeyOption), + StorageMeta.MINMAX_INDEX_KEY -> StorageMeta.columnsToStr(minmaxIndexKeyOption), + StorageMeta.BF_INDEX_KEY -> StorageMeta.columnsToStr(bfIndexKeyOption), + StorageMeta.SET_INDEX_KEY -> StorageMeta.columnsToStr(setIndexKeyOption), + StorageMeta.PRIMARY_KEY -> primaryKey0 + ) } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/MetaSerializer.scala similarity index 66% rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/MetaSerializer.scala index 5c863d76c947..94a74bd2fa15 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/MetaSerializer.scala @@ -14,25 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.spark.sql.execution.datasources.clickhouse +package org.apache.spark.sql.execution.datasources.mergetree import org.apache.gluten.execution.MergeTreePartSplit import org.apache.gluten.expression.ConverterUtils -import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil -import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta} +import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableNode import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts -import org.apache.spark.sql.types.StructType import com.fasterxml.jackson.databind.ObjectMapper import io.substrait.proto.ReadRel -import java.net.URI import java.util.{Map => jMap} -import scala.collection.JavaConverters._ - -case class ClickhousePartSerializer( +case class PartSerializer( partList: Seq[String], starts: Seq[Long], lengths: Seq[Long] @@ -58,59 +53,29 @@ case class ClickhousePartSerializer( } } -object ClickhousePartSerializer { - def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): ClickhousePartSerializer = { +object PartSerializer { + def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): PartSerializer = { val partList = partLists.map(_.name) val starts = partLists.map(_.start) val lengths = partLists.map(_.length) - ClickhousePartSerializer(partList, starts, lengths) + PartSerializer(partList, starts, lengths) } - def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): ClickhousePartSerializer = { + def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): PartSerializer = { val partList = parts.map(_.name) val starts = parts.map(_ => 0L) val lengths = parts.map(_.marks) - ClickhousePartSerializer(partList, starts, lengths) + PartSerializer(partList, starts, lengths) } - def fromPartNames(partNames: Seq[String]): ClickhousePartSerializer = { + def fromPartNames(partNames: Seq[String]): PartSerializer = { // starts and lengths is useless for writing val partRanges = Seq.range(0L, partNames.length) - ClickhousePartSerializer(partNames, partRanges, partRanges) + PartSerializer(partNames, partRanges, partRanges) } } -object ClickhouseMetaSerializer { - - def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = { - val clickhouseTableConfigs = deltaMetaReader.writeConfiguration - - val orderByKey = clickhouseTableConfigs("storage_orderByKey") - val lowCardKey = clickhouseTableConfigs("storage_lowCardKey") - val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey") - val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey") - val setIndexKey = clickhouseTableConfigs("storage_setIndexKey") - val primaryKey = clickhouseTableConfigs("storage_primaryKey") - - val result = apply( - deltaMetaReader.storageDB, - deltaMetaReader.storageTable, - deltaMetaReader.storageSnapshotId, - deltaMetaReader.storagePath, - "", // absolutePath - orderByKey, - lowCardKey, - minmaxIndexKey, - bfIndexKey, - setIndexKey, - primaryKey, - ClickhousePartSerializer.fromPartNames(Seq()), - ConverterUtils.convertNamedStructJson(dataSchema), - clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava - ) - ExtensionTableNode.toProtobuf(result) - - } +object MetaSerializer { // scalastyle:off argcount def apply1( database: String, @@ -124,11 +89,11 @@ object ClickhouseMetaSerializer { bfIndexKeyOption: Option[Seq[String]], setIndexKeyOption: Option[Seq[String]], primaryKeyOption: Option[Seq[String]], - partSerializer: ClickhousePartSerializer, + 
partSerializer: PartSerializer, tableSchemaJson: String, clickhouseTableConfigs: jMap[String, String]): ReadRel.ExtensionTable = { - val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( + val (orderByKey0, primaryKey0) = StorageMeta.genOrderByAndPrimaryKeyStr( orderByKeyOption, primaryKeyOption ) @@ -140,10 +105,10 @@ object ClickhouseMetaSerializer { relativePath, absolutePath, orderByKey0, - lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - minmaxIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), + StorageMeta.columnsToStr(lowCardKeyOption), + StorageMeta.columnsToStr(minmaxIndexKeyOption), + StorageMeta.columnsToStr(bfIndexKeyOption), + StorageMeta.columnsToStr(setIndexKeyOption), primaryKey0, partSerializer, tableSchemaJson, @@ -164,7 +129,7 @@ object ClickhouseMetaSerializer { bfIndexKey0: String, setIndexKey0: String, primaryKey0: String, - partSerializer: ClickhousePartSerializer, + partSerializer: PartSerializer, tableSchemaJson: String, clickhouseTableConfigs: jMap[String, String]): String = { // scalastyle:on argcount @@ -192,7 +157,9 @@ object ClickhouseMetaSerializer { .append(orderByKey) .append("\n") - if (orderByKey.nonEmpty && !(orderByKey == "tuple()")) { + if (orderByKey.isEmpty || orderByKey == StorageMeta.DEFAULT_ORDER_BY_KEY) { + extensionTableStr.append("").append("\n") + } else { extensionTableStr.append(primaryKey).append("\n") } @@ -200,7 +167,7 @@ object ClickhouseMetaSerializer { extensionTableStr.append(minmaxIndexKey).append("\n") extensionTableStr.append(bfIndexKey).append("\n") extensionTableStr.append(setIndexKey).append("\n") - extensionTableStr.append(normalizeRelativePath(relativePath)).append("\n") + extensionTableStr.append(StorageMeta.normalizeRelativePath(relativePath)).append("\n") extensionTableStr.append(absolutePath).append("\n") appendConfigs(extensionTableStr, clickhouseTableConfigs) extensionTableStr.append(partSerializer()) @@ -208,13 +175,6 @@ object ClickhouseMetaSerializer { extensionTableStr.toString() } - private def normalizeRelativePath(relativePath: String): String = { - val table_uri = URI.create(relativePath) - if (table_uri.getPath.startsWith("/")) { - table_uri.getPath.substring(1) - } else table_uri.getPath - } - private def appendConfigs( extensionTableStr: StringBuilder, clickhouseTableConfigs: jMap[String, String]): Unit = { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala index 6a7ebc3c39d2..bdd8c9b34683 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala @@ -16,13 +16,12 @@ */ package org.apache.spark.sql.execution.datasources.mergetree -import org.apache.gluten.expression.ConverterUtils.normalizeColName +import org.apache.gluten.expression.ConverterUtils import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.delta.actions.Metadata -import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil -import org.apache.hadoop.fs.Path +import java.net.URI /** Reserved table property for MergeTree table. 
*/ object StorageMeta { @@ -30,42 +29,75 @@ object StorageMeta { // Storage properties val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db" val DEFAULT_CREATE_TABLE_DATABASE: String = "default" - val STORAGE_DB: String = "storage_db" - val STORAGE_TABLE: String = "storage_table" - val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id" - val STORAGE_PATH: String = "storage_path" + val DEFAULT_ORDER_BY_KEY = "tuple()" + val STORAGE_PREFIX: String = "storage_" + val DB: String = prefixOf("db") + val TABLE: String = prefixOf("table") + val SNAPSHOT_ID: String = prefixOf("snapshot_id") + val STORAGE_PATH: String = prefixOf("path") + val POLICY: String = prefixOf("policy") + val ORDER_BY_KEY: String = prefixOf("orderByKey") + val LOW_CARD_KEY: String = prefixOf("lowCardKey") + val MINMAX_INDEX_KEY: String = prefixOf("minmaxIndexKey") + val BF_INDEX_KEY: String = prefixOf("bfIndexKey") + val SET_INDEX_KEY: String = prefixOf("setIndexKey") + val PRIMARY_KEY: String = prefixOf("primaryKey") val SERIALIZER_HEADER: String = "MergeTree;" - def withMoreStorageInfo( + private def prefixOf(key: String): String = s"$STORAGE_PREFIX$key" + + def withStorageID( metadata: Metadata, - snapshotId: String, - deltaPath: Path, database: String, - tableName: String): Metadata = { - val moreOptions = Seq( - STORAGE_DB -> database, - STORAGE_SNAPSHOT_ID -> snapshotId, - STORAGE_TABLE -> tableName, - STORAGE_PATH -> deltaPath.toString) + tableName: String, + snapshotId: String): Metadata = { + val moreOptions = Seq(DB -> database, SNAPSHOT_ID -> snapshotId, TABLE -> tableName) withMoreOptions(metadata, moreOptions) } private def withMoreOptions(metadata: Metadata, newOptions: Seq[(String, String)]): Metadata = { metadata.copy(configuration = metadata.configuration ++ newOptions) } + + def normalizeRelativePath(relativePath: String): String = { + val table_uri = URI.create(relativePath) + if (table_uri.getPath.startsWith("/")) { + table_uri.getPath.substring(1) + } else table_uri.getPath + } + + // TODO: remove this method + def genOrderByAndPrimaryKeyStr( + orderByKeyOption: Option[Seq[String]], + primaryKeyOption: Option[Seq[String]]): (String, String) = { + + val orderByKey = columnsToStr(orderByKeyOption, DEFAULT_ORDER_BY_KEY) + val primaryKey = if (orderByKey == DEFAULT_ORDER_BY_KEY) "" else columnsToStr(primaryKeyOption) + (orderByKey, primaryKey) + } + + def columnsToStr(option: Option[Seq[String]], default: String = ""): String = + option + .filter(_.nonEmpty) + .map(keys => keys.map(ConverterUtils.normalizeColName).mkString(",")) + .getOrElse(default) +} + +/** all properties start with 'storage_' */ +trait StorageConfigProvider { + val storageConf: Map[String, String] } trait TablePropertiesReader { def configuration: Map[String, String] - /** delta */ - def metadata: Metadata + val partitionColumns: Seq[String] private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = { configuration.get(keyName).map { v => - val keys = v.split(",").map(n => normalizeColName(n.trim)).toSeq + val keys = v.split(",").map(n => ConverterUtils.normalizeColName(n.trim)).toSeq keys.foreach { s => if (s.contains(".")) { @@ -107,13 +139,10 @@ trait TablePropertiesReader { getCommaSeparatedColumns("setIndexKey") } - lazy val partitionColumns: Seq[String] = - metadata.partitionColumns.map(normalizeColName) - lazy val orderByKeyOption: Option[Seq[String]] = { val orderByKeys = if (bucketOption.exists(_.sortColumnNames.nonEmpty)) { - bucketOption.map(_.sortColumnNames.map(normalizeColName)) + 
bucketOption.map(_.sortColumnNames.map(ConverterUtils.normalizeColName)) } else { getCommaSeparatedColumns("orderByKey") } @@ -142,22 +171,4 @@ trait TablePropertiesReader { primaryKeys } } - - lazy val writeConfiguration: Map[String, String] = { - val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr( - orderByKeyOption, - primaryKeyOption - ) - Map( - "storage_policy" -> configuration.getOrElse("storage_policy", "default"), - "storage_orderByKey" -> orderByKey0, - "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - "storage_minmaxIndexKey" -> minmaxIndexKeyOption - .map(MergeTreeDeltaUtil.columnsToStr) - .getOrElse(""), - "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""), - "storage_primaryKey" -> primaryKey0 - ) - } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala index 69c001e461d8..03ee31daba08 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v1 import org.apache.gluten.execution.datasource.GlutenRowSplitter import org.apache.gluten.expression.ConverterUtils import org.apache.gluten.memory.CHThreadGroup +import org.apache.gluten.utils.SubstraitUtil import org.apache.gluten.vectorized.CHColumnVector import org.apache.spark.sql.SparkSession @@ -27,32 +28,57 @@ import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.types.StructType -import io.substrait.proto.{NamedStruct, Type} +import com.google.protobuf.Any +import io.substrait.proto +import io.substrait.proto.{AdvancedExtension, NamedObjectWrite, NamedStruct} import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.mapreduce.TaskAttemptContext +import java.{util => ju} + +import scala.collection.JavaConverters.seqAsJavaListConverter + trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase { + // TODO: move to SubstraitUtil + private def toNameStruct(dataSchema: StructType): NamedStruct = { + SubstraitUtil + .createNameStructBuilder( + ConverterUtils.collectAttributeTypeNodes(dataSchema), + dataSchema.fieldNames.map(ConverterUtils.normalizeColName).toSeq.asJava, + ju.Collections.emptyList() + ) + .build() + } + def createWriteRel( + outputPath: String, + dataSchema: StructType, + context: TaskAttemptContext): proto.WriteRel = { + proto.WriteRel + .newBuilder() + .setTableSchema(toNameStruct(dataSchema)) + .setNamedTable( + NamedObjectWrite.newBuilder + .setAdvancedExtension( + AdvancedExtension + .newBuilder() + .setOptimization(Any.pack(createNativeWrite(outputPath, context))) + .build()) + .build()) + .build() + } + + def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write + override def createOutputWriter( - path: String, + outputPath: String, dataSchema: StructType, context: TaskAttemptContext, - nativeConf: java.util.Map[String, String]): OutputWriter = { - val originPath = path - val datasourceJniWrapper = new CHDatasourceJniWrapper() + nativeConf: ju.Map[String, 
String]): OutputWriter = { CHThreadGroup.registerNewThreadGroup() - val namedStructBuilder = NamedStruct.newBuilder - val structBuilder = Type.Struct.newBuilder - for (field <- dataSchema.fields) { - namedStructBuilder.addNames(field.name) - structBuilder.addTypes(ConverterUtils.getTypeNode(field.dataType, field.nullable).toProtobuf) - } - namedStructBuilder.setStruct(structBuilder.build) - val namedStruct = namedStructBuilder.build - - val instance = - datasourceJniWrapper.nativeInitFileWriterWrapper(path, namedStruct.toByteArray, formatName) + val datasourceJniWrapper = + new CHDatasourceJniWrapper(outputPath, createWriteRel(outputPath, dataSchema, context)) new OutputWriter { override def write(row: InternalRow): Unit = { @@ -61,17 +87,17 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase { if (nextBatch.numRows > 0) { val col = nextBatch.column(0).asInstanceOf[CHColumnVector] - datasourceJniWrapper.write(instance, col.getBlockAddress) + datasourceJniWrapper.write(col.getBlockAddress) } // else just ignore this empty block } override def close(): Unit = { - datasourceJniWrapper.close(instance) + datasourceJniWrapper.close() } // Do NOT add override keyword for compatibility on spark 3.1. def path(): String = { - originPath + outputPath } } } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala index 521b59d60e29..d25758b0f703 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala @@ -26,59 +26,86 @@ import org.apache.gluten.utils.ConfigUtil import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter} -import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer} +import org.apache.spark.sql.execution.datasources.mergetree.{MetaSerializer, PartSerializer, StorageConfigProvider, StorageMeta} import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter import org.apache.spark.sql.types.StructType import com.google.common.collect.Lists import com.google.protobuf.{Any, StringValue} import io.substrait.proto.NamedStruct +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.TaskAttemptContext -import java.util.{Map => JMap, UUID} +import java.{util => ju} import scala.collection.JavaConverters._ case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte]) +case class HadoopConfReader(conf: Configuration) extends StorageConfigProvider { + lazy val storageConf: Map[String, String] = { + conf + .iterator() + .asScala + .filter(_.getKey.startsWith(StorageMeta.STORAGE_PREFIX)) + .map(entry => entry.getKey -> entry.getValue) + .toMap + } +} + class CHMergeTreeWriterInjects extends CHFormatWriterInjects { override def nativeConf( options: Map[String, String], - compressionCodec: String): JMap[String, String] = { + compressionCodec: String): ju.Map[String, String] = { options.asJava } + override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = { + val conf = HadoopConfReader(context.getConfiguration).storageConf + Write + .newBuilder() + 
.setCommon(Write.Common.newBuilder().setFormat(formatName).build()) + .setMergetree( + Write.MergeTreeWrite + .newBuilder() + .setDatabase(conf(StorageMeta.DB)) + .setTable(conf(StorageMeta.TABLE)) + .setSnapshotId(conf(StorageMeta.SNAPSHOT_ID)) + .setOrderByKey(conf(StorageMeta.ORDER_BY_KEY)) + .setLowCardKey(conf(StorageMeta.LOW_CARD_KEY)) + .setMinmaxIndexKey(conf(StorageMeta.MINMAX_INDEX_KEY)) + .setBfIndexKey(conf(StorageMeta.BF_INDEX_KEY)) + .setSetIndexKey(conf(StorageMeta.SET_INDEX_KEY)) + .setPrimaryKey(conf(StorageMeta.PRIMARY_KEY)) + .setRelativePath(StorageMeta.normalizeRelativePath(outputPath)) + .setAbsolutePath("") + .setStoragePolicy(conf(StorageMeta.POLICY)) + .build()) + .build() + } + override def createOutputWriter( - path: String, + outputPath: String, dataSchema: StructType, context: TaskAttemptContext, - nativeConf: JMap[String, String]): OutputWriter = null + nativeConf: ju.Map[String, String]): OutputWriter = { + + val storage = HadoopConfReader(context.getConfiguration) + val database = storage.storageConf(StorageMeta.DB) + val tableName = storage.storageConf(StorageMeta.TABLE) + + val datasourceJniWrapper = new CHDatasourceJniWrapper( + context.getTaskAttemptID.getTaskID.getId.toString, + context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"), + context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"), + createWriteRel(outputPath, dataSchema, context), + ConfigUtil.serialize(nativeConf) + ) + new MergeTreeOutputWriter(datasourceJniWrapper, database, tableName, outputPath) + } override val formatName: String = "mergetree" - - def createOutputWriter( - path: String, - dataSchema: StructType, - context: TaskAttemptContext, - nativeConf: JMap[String, String], - database: String, - tableName: String, - splitInfo: Array[Byte]): OutputWriter = { - val datasourceJniWrapper = new CHDatasourceJniWrapper() - val instance = - datasourceJniWrapper.nativeInitMergeTreeWriterWrapper( - null, - splitInfo, - UUID.randomUUID.toString, - context.getTaskAttemptID.getTaskID.getId.toString, - context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"), - context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"), - ConfigUtil.serialize(nativeConf) - ) - - new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path) - } } object CHMergeTreeWriterInjects { @@ -114,7 +141,7 @@ object CHMergeTreeWriterInjects { val substraitContext = new SubstraitContext - val extensionTable = ClickhouseMetaSerializer.apply1( + val extensionTable = MetaSerializer.apply1( database, tableName, snapshotId, @@ -126,7 +153,7 @@ object CHMergeTreeWriterInjects { bfIndexKeyOption, setIndexKeyOption, primaryKeyOption, - ClickhousePartSerializer.fromPartNames(partList), + PartSerializer.fromPartNames(partList), tableSchemaJson, clickhouseTableConfigs.asJava ) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala index 5beee25510b9..7c791bcf87d2 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala @@ -16,17 +16,25 @@ */ package org.apache.spark.sql.execution.datasources.v1 -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.mutable 
+import org.apache.hadoop.mapreduce.TaskAttemptContext + +import java.{util => ju} class CHOrcWriterInjects extends CHFormatWriterInjects { + override def nativeConf( options: Map[String, String], - compressionCodec: String): java.util.Map[String, String] = { - val sparkOptions = new mutable.HashMap[String, String]() + compressionCodec: String): ju.Map[String, String] = { + // TODO: implement it - sparkOptions.asJava + ju.Collections.emptyMap() } + override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = Write + .newBuilder() + .setCommon(Write.Common.newBuilder().setFormat(formatName).build()) + .setOrc(Write.OrcWrite.newBuilder().build()) + .build() + override val formatName: String = "orc" } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala index 80214c67bba5..8c700c413346 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala @@ -20,16 +20,17 @@ import org.apache.gluten.GlutenConfig import org.apache.spark.sql.internal.SQLConf -import scala.collection.JavaConverters.mapAsJavaMapConverter -import scala.collection.mutable +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import java.{util => ju} class CHParquetWriterInjects extends CHFormatWriterInjects { override def nativeConf( options: Map[String, String], - compressionCodec: String): java.util.Map[String, String] = { + compressionCodec: String): ju.Map[String, String] = { // pass options to native so that velox can take user-specified conf to write parquet, // i.e., compression, block size, block rows. 
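    // For instance (illustrative only): with compressionCodec = "snappy" and default
    // Gluten settings, the map returned below carries SQLConf.PARQUET_COMPRESSION.key
    // plus the GlutenConfig.PARQUET_BLOCK_SIZE and GlutenConfig.PARQUET_BLOCK_ROWS
    // entries, populated from options or the session defaults.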
- val sparkOptions = new mutable.HashMap[String, String]() + val sparkOptions = new ju.HashMap[String, String]() sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec) val blockSize = options.getOrElse( GlutenConfig.PARQUET_BLOCK_SIZE, @@ -39,9 +40,14 @@ class CHParquetWriterInjects extends CHFormatWriterInjects { GlutenConfig.PARQUET_BLOCK_ROWS, GlutenConfig.getConf.columnarParquetWriteBlockRows.toString) sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows) - sparkOptions.asJava + sparkOptions } + override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = Write + .newBuilder() + .setCommon(Write.Common.newBuilder().setFormat(formatName).build()) + .setParquet(Write.ParquetWrite.newBuilder().build()) + .build() override val formatName: String = "parquet" } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala index 7b803e250278..29f2b7e16ec8 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala @@ -59,7 +59,7 @@ abstract class MergeTreeFileFormatDataWriter( protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]() protected var currentWriter: OutputWriter = _ - protected val returnedMetrics = mutable.HashMap[String, AddFile]() + protected val returnedMetrics: mutable.Map[String, AddFile] = mutable.HashMap[String, AddFile]() /** Trackers for computing various statistics on the data as it's being written out. 
*/ protected val statsTrackers: Seq[WriteTaskStatsTracker] = @@ -71,10 +71,10 @@ abstract class MergeTreeFileFormatDataWriter( try { currentWriter.close() statsTrackers.foreach(_.closeFile(currentWriter.path())) - val ret = currentWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles - if (ret.nonEmpty) { - ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile)) - } + currentWriter + .asInstanceOf[MergeTreeOutputWriter] + .getAddFiles + .foreach(addFile => returnedMetrics.put(addFile.path, addFile)) } finally { currentWriter = null } @@ -117,12 +117,7 @@ abstract class MergeTreeFileFormatDataWriter( releaseResources() val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs { // committer.commitTask(taskAttemptContext) - val statuses = returnedMetrics - .map( - v => { - v._2 - }) - .toSeq + val statuses = returnedMetrics.map(_._2).toSeq new TaskCommitMessage(statuses) } @@ -142,7 +137,7 @@ abstract class MergeTreeFileFormatDataWriter( override def close(): Unit = {} - def getReturnedMetrics(): mutable.Map[String, AddFile] = returnedMetrics + def getReturnedMetrics: mutable.Map[String, AddFile] = returnedMetrics } /** FileFormatWriteTask for empty partitions */ @@ -443,7 +438,11 @@ class MergeTreeDynamicPartitionDataSingleWriter( case fakeRow: FakeRow => if (fakeRow.batch.numRows() > 0) { val blockStripes = GlutenRowSplitter.getInstance - .splitBlockByPartitionAndBucket(fakeRow, partitionColIndice, isBucketed, true) + .splitBlockByPartitionAndBucket( + fakeRow, + partitionColIndice, + isBucketed, + reserve_partition_columns = true) val iter = blockStripes.iterator() while (iter.hasNext) { @@ -526,10 +525,10 @@ class MergeTreeDynamicPartitionDataConcurrentWriter( if (status.outputWriter != null) { try { status.outputWriter.close() - val ret = status.outputWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles - if (ret.nonEmpty) { - ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile)) - } + status.outputWriter + .asInstanceOf[MergeTreeOutputWriter] + .getAddFiles + .foreach(addFile => returnedMetrics.put(addFile.path, addFile)) } finally { status.outputWriter = null } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala index 52593d7c1795..14ac659deabd 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala @@ -27,11 +27,10 @@ import org.apache.spark.util.Utils import scala.collection.mutable.ArrayBuffer class MergeTreeOutputWriter( + datasourceJniWrapper: CHDatasourceJniWrapper, database: String, tableName: String, - datasourceJniWrapper: CHDatasourceJniWrapper, - instance: Long, - originPath: String) + outputPath: String) extends OutputWriter { protected var addFiles: ArrayBuffer[AddFile] = new ArrayBuffer[AddFile]() @@ -42,18 +41,18 @@ class MergeTreeOutputWriter( if (nextBatch.numRows > 0) { val col = nextBatch.column(0).asInstanceOf[CHColumnVector] - datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress) + datasourceJniWrapper.write(col.getBlockAddress) } // else ignore this empty block } override def close(): Unit = { - val returnedMetrics = datasourceJniWrapper.closeMergeTreeWriter(instance) + val returnedMetrics = datasourceJniWrapper.close() 
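    // close() returns the part metadata JSON emitted natively by
    // SparkMergeTreeWriter::close() via PartInfo::toJson; an illustrative (not actual)
    // payload looks like:
    //   [{"part_name":"<prefix>_0","mark_count":1,"disk_size":1024,"row_count":7,
    //     "bucket_id":"","partition_values":{}}]
    // AddFileTags.partsMetricsToAddFile below parses this string into the AddFile
    // entries exposed through getAddFiles.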
if (returnedMetrics != null && returnedMetrics.nonEmpty) { addFiles.appendAll( AddFileTags.partsMetricsToAddFile( database, tableName, - originPath, + outputPath, returnedMetrics, Seq(Utils.localHostName()))) } @@ -61,7 +60,7 @@ class MergeTreeOutputWriter( // Do NOT add override keyword for compatibility on spark 3.1. def path(): String = { - originPath + outputPath } def getAddFiles: ArrayBuffer[AddFile] = { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala index 53ccb8d1c046..38f4fe7e26ab 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.clickhouse import org.apache.gluten.backendsapi.clickhouse.CHConf import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta import java.util @@ -64,8 +65,8 @@ object ClickHouseConfig { if (!configurations.contains("sampling_key")) { configurations += ("sampling_key" -> "") } - if (!configurations.contains("storage_policy")) { - configurations += ("storage_policy" -> "default") + if (!configurations.contains(StorageMeta.POLICY)) { + configurations += (StorageMeta.POLICY -> "default") } if (!configurations.contains("is_distribute")) { configurations += ("is_distribute" -> "true") diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala index b4eca622c89f..186078c18dd1 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala @@ -55,6 +55,7 @@ class GlutenClickHouseMergeTreeWriteSuite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.sql.adaptive.enabled", "true") .set("spark.sql.files.maxPartitionBytes", "20000000") + .set("spark.gluten.sql.native.writer.enabled", "true") .setCHSettings("min_insert_block_size_rows", 100000) .setCHSettings("mergetree.merge_after_insert", false) .setCHSettings("input_format_parquet_max_block_size", 8192) diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt index ca25692bf013..d145ed339ff5 100644 --- a/cpp-ch/local-engine/CMakeLists.txt +++ b/cpp-ch/local-engine/CMakeLists.txt @@ -128,9 +128,9 @@ foreach(child ${children}) add_headers_and_sources(function_parsers ${child}) endforeach() -# Notice: soures files under Parser/*_udf subdirectories must be built into +# Notice: sources files under Parser/*_udf subdirectories must be built into # target ${LOCALENGINE_SHARED_LIB} directly to make sure all function parsers -# are registered successly. +# are registered successfully. 
add_library( ${LOCALENGINE_SHARED_LIB} SHARED local_engine_jni.cpp ${local_udfs_sources} ${function_parsers_sources} diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp index c1fc49b0b96b..5e57be5ae91e 100644 --- a/cpp-ch/local-engine/Common/CHUtil.cpp +++ b/cpp-ch/local-engine/Common/CHUtil.cpp @@ -49,7 +49,7 @@ #include #include #include -#include +#include #include #include #include @@ -62,7 +62,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp index 16a5d72a9e85..6d44213ec73b 100644 --- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp +++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp @@ -23,7 +23,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index bb0ea00b97e6..e8f1196a6634 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -65,12 +65,12 @@ #include #include #include -#include #include #include #include #include #include +#include #include #include #include diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp index e72454ba6fb2..164cf0392dfb 100644 --- a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp +++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp @@ -26,14 +26,12 @@ namespace local_engine { void logDebugMessage(const google::protobuf::Message & message, const char * type) { - auto * logger = &Poco::Logger::get("SubstraitPlan"); - if (logger->debug()) + if (auto * logger = &Poco::Logger::get("SubstraitPlan"); logger->debug()) { namespace pb_util = google::protobuf::util; pb_util::JsonOptions options; std::string json; - auto s = pb_util::MessageToJsonString(message, &json, options); - if (!s.ok()) + if (auto s = pb_util::MessageToJsonString(message, &json, options); !s.ok()) throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type); LOG_DEBUG(logger, "{}:\n{}", type, json); } diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp index 6cd74c8af343..cee45aa34c4c 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp @@ -21,16 +21,10 @@ #include #include #include -#include -#include #include #include -#include #include -#include #include -#include -#include namespace local_engine { @@ -86,7 +80,7 @@ void NativeSplitter::split(DB::Block & block) { if (partition_buffer[i]->size() >= options.buffer_size) { - output_buffer.emplace(std::pair(i, std::make_unique(partition_buffer[i]->releaseColumns()))); + output_buffer.emplace(std::pair(i, std::make_unique(partition_buffer[i]->releaseColumns()))); } } } @@ -116,7 +110,7 @@ bool NativeSplitter::hasNext() { if (inputHasNext()) { - split(*reinterpret_cast(inputNext())); + split(*reinterpret_cast(inputNext())); } else { @@ -125,7 +119,7 @@ bool NativeSplitter::hasNext() auto buffer = partition_buffer.at(i); if (buffer->size() > 0) { - output_buffer.emplace(std::pair(i, new Block(buffer->releaseColumns()))); + output_buffer.emplace(std::pair(i, new DB::Block(buffer->releaseColumns()))); } } break; @@ -214,7 +208,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject 
selector_builder = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm); } -void HashNativeSplitter::computePartitionId(Block & block) +void HashNativeSplitter::computePartitionId(DB::Block & block) { partition_info = selector_builder->build(block); } @@ -229,7 +223,7 @@ RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options optio selector_builder = std::make_unique(options_.partition_num); } -void RoundRobinNativeSplitter::computePartitionId(Block & block) +void RoundRobinNativeSplitter::computePartitionId(DB::Block & block) { partition_info = selector_builder->build(block); } diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp index 2921fc887f32..b7c60552468d 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp @@ -29,6 +29,8 @@ #include #include +#include + using namespace DB; using namespace local_engine; namespace @@ -139,11 +141,8 @@ void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString & assertChar('\n', in); readString(table.order_by_key, in); assertChar('\n', in); - if (table.order_by_key != MergeTreeTable::TUPLE) - { - readString(table.primary_key, in); - assertChar('\n', in); - } + readString(table.primary_key, in); + assertChar('\n', in); readString(table.low_card_key, in); assertChar('\n', in); readString(table.minmax_index_key, in); @@ -204,7 +203,7 @@ SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const ContextMutab return merge_tree_table.getStorage(context); } -MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info) +MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info) : MergeTreeTable() { ReadBufferFromString in(info); doParseMergeTreeTableString(*this, in); @@ -244,6 +243,29 @@ std::shared_ptr MergeTreeTable::buildMetaData(const return doBuildMetadata(header.getNamesAndTypesList(), context, *this); } +MergeTreeTable::MergeTreeTable(const substrait::WriteRel & write_rel) +{ + assert(write_rel.has_named_table()); + const substrait::NamedObjectWrite & named_table = write_rel.named_table(); + local_engine::Write write_opt; + named_table.advanced_extension().optimization().UnpackTo(&write_opt); + assert(write_opt.has_mergetree()); + const Write_MergeTreeWrite & write = write_opt.mergetree(); + database = write.database(); + table = write.table(); + snapshot_id = write.snapshot_id(); + schema = write_rel.table_schema(); + order_by_key = write.order_by_key(); + low_card_key = write.low_card_key(); + minmax_index_key = write.minmax_index_key(); + bf_index_key = write.bf_index_key(); + set_index_key = write.set_index_key(); + primary_key = write.primary_key(); + relative_path = write.relative_path(); + absolute_path = write.absolute_path(); // always empty, see createNativeWrite in java + table_configs.storage_policy = write.storage_policy(); +} + std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config) { auto settings = std::make_unique(); diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h index 2a20430205a6..d015e78b3471 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h @@ -77,6 +77,9 @@ struct MergeTreeTable SparkStorageMergeTreePtr copyToVirtualStorage(const ContextMutablePtr & context) const; 
std::shared_ptr buildMetaData(const DB::Block & header, const ContextPtr & context) const; + + MergeTreeTable() = default; + explicit MergeTreeTable(const substrait::WriteRel & write_rel); }; struct MergeTreeTableInstance : MergeTreeTable diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp index dc32565f8420..44a95cb2a4a6 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp @@ -47,11 +47,44 @@ Block removeColumnSuffix(const Block & block) } return Block(columns); } + } namespace local_engine { +std::string PartInfo::toJson(const std::vector & part_infos) +{ + rapidjson::StringBuffer result; + rapidjson::Writer writer(result); + writer.StartArray(); + for (const auto & item : part_infos) + { + writer.StartObject(); + writer.Key("part_name"); + writer.String(item.part_name.c_str()); + writer.Key("mark_count"); + writer.Uint(item.mark_count); + writer.Key("disk_size"); + writer.Uint(item.disk_size); + writer.Key("row_count"); + writer.Uint(item.row_count); + writer.Key("bucket_id"); + writer.String(item.bucket_id.c_str()); + writer.Key("partition_values"); + writer.StartObject(); + for (const auto & key_value : item.partition_values) + { + writer.Key(key_value.first.c_str()); + writer.String(key_value.second.c_str()); + } + writer.EndObject(); + writer.EndObject(); + } + writer.EndArray(); + return result.GetString(); +} + std::unique_ptr SparkMergeTreeWriter::create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWritePartitionSettings & write_settings_, @@ -86,7 +119,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter( { } -void SparkMergeTreeWriter::write(const DB::Block & block) +void SparkMergeTreeWriter::write(DB::Block & block) { auto new_block = removeColumnSuffix(block); auto converter = ActionsDAG::makeConvertingActions( @@ -96,9 +129,10 @@ void SparkMergeTreeWriter::write(const DB::Block & block) executor.push(new_block); } -void SparkMergeTreeWriter::finalize() +std::string SparkMergeTreeWriter::close() { executor.finish(); + return PartInfo::toJson(getAllPartInfo()); } std::vector SparkMergeTreeWriter::getAllPartInfo() const @@ -120,36 +154,4 @@ std::vector SparkMergeTreeWriter::getAllPartInfo() const return res; } -String SparkMergeTreeWriter::partInfosToJson(const std::vector & part_infos) -{ - rapidjson::StringBuffer result; - rapidjson::Writer writer(result); - writer.StartArray(); - for (const auto & item : part_infos) - { - writer.StartObject(); - writer.Key("part_name"); - writer.String(item.part_name.c_str()); - writer.Key("mark_count"); - writer.Uint(item.mark_count); - writer.Key("disk_size"); - writer.Uint(item.disk_size); - writer.Key("row_count"); - writer.Uint(item.row_count); - writer.Key("bucket_id"); - writer.String(item.bucket_id.c_str()); - writer.Key("partition_values"); - writer.StartObject(); - for (const auto & key_value : item.partition_values) - { - writer.Key(key_value.first.c_str()); - writer.String(key_value.second.c_str()); - } - writer.EndObject(); - writer.EndObject(); - } - writer.EndArray(); - return result.GetString(); -} - } \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h index 699fd3d80b5b..59535cef094d 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h +++ 
b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h @@ -21,6 +21,7 @@ #include #include #include +#include namespace DB { @@ -44,12 +45,13 @@ struct PartInfo String bucket_id; bool operator<(const PartInfo & rhs) const { return disk_size < rhs.disk_size; } + + static std::string toJson(const std::vector & part_infos); }; -class SparkMergeTreeWriter +class SparkMergeTreeWriter : public NativeOutputWriter { public: - static String partInfosToJson(const std::vector & part_infos); static std::unique_ptr create( const MergeTreeTable & merge_tree_table, const SparkMergeTreeWritePartitionSettings & write_settings_, @@ -61,9 +63,8 @@ class SparkMergeTreeWriter DB::QueryPipeline && pipeline_, std::unordered_map && partition_values_); - void write(const DB::Block & block); - void finalize(); - std::vector getAllPartInfo() const; + void write(DB::Block & block) override; + std::string close() override; private: DB::Block header; @@ -71,5 +72,7 @@ class SparkMergeTreeWriter DB::QueryPipeline pipeline; DB::PushingPipelineExecutor executor; std::unordered_map partition_values; + + std::vector getAllPartInfo() const; }; } diff --git a/cpp-ch/local-engine/Storages/NativeOutputWriter.h b/cpp-ch/local-engine/Storages/NativeOutputWriter.h new file mode 100644 index 000000000000..bfcbe0b092a5 --- /dev/null +++ b/cpp-ch/local-engine/Storages/NativeOutputWriter.h @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include + +namespace DB +{ +class Block; +} +namespace local_engine +{ +class NativeOutputWriter +{ +public: + NativeOutputWriter() = default; + virtual ~NativeOutputWriter() = default; + + //TODO: change to write(const DB::Block & block) + virtual void write(DB::Block & block) = 0; + virtual std::string close() = 0; +}; +} \ No newline at end of file diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp similarity index 93% rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp index 975e5046bac2..d8b7f3240386 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -#include "FileWriterWrappers.h" +#include "NormalFileWriter.h" #include #include @@ -25,12 +25,11 @@ namespace local_engine const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"}; const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"}; -NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) - : FileWriterWrapper(file_), context(context_) +NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_) { } -void NormalFileWriter::consume(DB::Block & block) +void NormalFileWriter::write(DB::Block & block) { if (!writer) [[unlikely]] { @@ -63,12 +62,14 @@ void NormalFileWriter::consume(DB::Block & block) writer->push(materializeBlock(block)); } -void NormalFileWriter::close() +std::string NormalFileWriter::close() { /// When insert into a table with empty dataset, NormalFileWriter::consume would be never called. /// So we need to skip when writer is nullptr. if (writer) writer->finish(); + + return std::string{}; } OutputFormatFilePtr createOutputFormatFile( @@ -84,7 +85,7 @@ OutputFormatFilePtr createOutputFormatFile( return OutputFormatFileUtil::createFile(context, write_buffer_builder, encoded, preferred_schema, format_hint); } -std::unique_ptr createFileWriterWrapper( +std::unique_ptr NormalFileWriter::create( const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint) { return std::make_unique(createOutputFormatFile(context, file_uri, preferred_schema, format_hint), context); diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h similarity index 93% rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.h index 49383f8de42c..6d054a04fad3 100644 --- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h +++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -36,31 +37,20 @@ namespace local_engine { -class FileWriterWrapper +class NormalFileWriter : public NativeOutputWriter { public: - explicit FileWriterWrapper(const OutputFormatFilePtr & file_) : file(file_) { } - virtual ~FileWriterWrapper() = default; + static std::unique_ptr create( + const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); - virtual void consume(DB::Block & block) = 0; - virtual void close() = 0; - -protected: - OutputFormatFilePtr file; -}; - -using FileWriterWrapperPtr = std::shared_ptr; - -class NormalFileWriter : public FileWriterWrapper -{ -public: NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_); ~NormalFileWriter() override = default; - void consume(DB::Block & block) override; - void close() override; + void write(DB::Block & block) override; + std::string close() override; private: + OutputFormatFilePtr file; DB::ContextPtr context; OutputFormatFile::OutputFormatPtr output_format; @@ -68,9 +58,6 @@ class NormalFileWriter : public FileWriterWrapper std::unique_ptr writer; }; -std::unique_ptr createFileWriterWrapper( - const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); - OutputFormatFilePtr createOutputFormatFile( const 
DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint); diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index af6fce0607ba..cce059badf57 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -45,12 +44,12 @@ #include #include #include -#include +#include #include -#include #include #include #include +#include #include #include #include @@ -906,58 +905,41 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeC LOCAL_ENGINE_JNI_METHOD_END(env, ) } -JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitFileWriterWrapper( - JNIEnv * env, jobject, jstring file_uri_, jbyteArray preferred_schema_, jstring format_hint_) +JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createFilerWriter( + JNIEnv * env, jobject, jstring file_uri_, jbyteArray writeRel) { LOCAL_ENGINE_JNI_METHOD_START - const auto preferred_schema_ref = local_engine::getByteArrayElementsSafe(env, preferred_schema_); - auto parse_named_struct = [&]() -> std::optional - { - std::string_view view{ - reinterpret_cast(preferred_schema_ref.elems()), static_cast(preferred_schema_ref.length())}; - - substrait::NamedStruct res; - bool ok = res.ParseFromString(view); - if (!ok) - return {}; - return std::move(res); - }; + const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel); + substrait::WriteRel write_rel = local_engine::BinaryToMessage( + {reinterpret_cast(writeRelBytes.elems()), static_cast(writeRelBytes.length())}); - auto named_struct = parse_named_struct(); - if (!named_struct.has_value()) - throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse schema from substrait protobuf failed"); + assert(write_rel.has_named_table()); + const substrait::NamedObjectWrite & named_table = write_rel.named_table(); + local_engine::Write write_opt; + named_table.advanced_extension().optimization().UnpackTo(&write_opt); + DB::Block preferred_schema = local_engine::TypeParser::buildBlockFromNamedStructWithoutDFS(write_rel.table_schema()); - DB::Block preferred_schema = local_engine::TypeParser::buildBlockFromNamedStructWithoutDFS(*named_struct); const auto file_uri = jstring2string(env, file_uri_); + // for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint - const auto format_hint = jstring2string(env, format_hint_); const auto context = local_engine::QueryContext::instance().currentQueryContext(); - auto * writer = local_engine::createFileWriterWrapper(context, file_uri, preferred_schema, format_hint).release(); + auto * writer = local_engine::NormalFileWriter::create(context, file_uri, preferred_schema, write_opt.common().format()).release(); return reinterpret_cast(writer); LOCAL_ENGINE_JNI_METHOD_END(env, 0) } -JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper( - JNIEnv * env, - jobject, - jbyteArray plan_, - jbyteArray split_info_, - jstring uuid_, - jstring task_id_, - jstring partition_dir_, - jstring bucket_dir_, - jbyteArray conf_plan) +JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter( + JNIEnv * env, jobject, jstring task_id_, jstring partition_dir_, jstring bucket_dir_, 
jbyteArray writeRel, jbyteArray conf_plan) { LOCAL_ENGINE_JNI_METHOD_START auto query_context = local_engine::QueryContext::instance().currentQueryContext(); // by task update new configs ( in case of dynamic config update ) const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan); - const std::string::size_type conf_plan_size = conf_plan_a.length(); - - local_engine::SparkConfigs::updateConfig(query_context, {reinterpret_cast(conf_plan_a.elems()), conf_plan_size}); + local_engine::SparkConfigs::updateConfig( + query_context, {reinterpret_cast(conf_plan_a.elems()), static_cast(conf_plan_a.length())}); - const auto uuid_str = jstring2string(env, uuid_); + const auto uuid_str = toString(DB::UUIDHelpers::generateV4()); const auto task_id = jstring2string(env, task_id_); const auto partition_dir = jstring2string(env, partition_dir_); const auto bucket_dir = jstring2string(env, bucket_dir_); @@ -966,10 +948,10 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW .part_name_prefix{uuid}, .partition_dir{partition_dir}, .bucket_dir{bucket_dir}}; settings.set(query_context); - const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_); - auto extension_table = local_engine::BinaryToMessage( - {reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())}); - local_engine::MergeTreeTableInstance merge_tree_table(extension_table); + const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel); + substrait::WriteRel write_rel = local_engine::BinaryToMessage( + {reinterpret_cast(writeRelBytes.elems()), static_cast(writeRelBytes.length())}); + local_engine::MergeTreeTable merge_tree_table(write_rel); auto * writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, settings, query_context).release(); return reinterpret_cast(writer); @@ -1003,59 +985,28 @@ Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNI { LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); + auto * writer = reinterpret_cast(instanceId); auto * block = reinterpret_cast(block_address); - writer->consume(*block); - LOCAL_ENGINE_JNI_METHOD_END(env, ) -} - -JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId) -{ - LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); - SCOPE_EXIT({ delete writer; }); - writer->close(); - LOCAL_ENGINE_JNI_METHOD_END(env, ) -} - -JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree( - JNIEnv * env, jobject, jlong instanceId, jlong block_address) -{ - LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); - const auto * block = reinterpret_cast(block_address); writer->write(*block); LOCAL_ENGINE_JNI_METHOD_END(env, ) } -JNIEXPORT jstring -Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId) +JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId) { LOCAL_ENGINE_JNI_METHOD_START - auto * writer = reinterpret_cast(instanceId); + auto * writer = reinterpret_cast(instanceId); SCOPE_EXIT({ delete writer; }); - - writer->finalize(); - const auto part_infos = writer->getAllPartInfo(); - const auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos); - return 
local_engine::charTojstring(env, json_info.c_str()); + const auto result = writer->close(); + return local_engine::charTojstring(env, result.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts( - JNIEnv * env, - jobject, - jbyteArray plan_, - jbyteArray split_info_, - jstring uuid_, - jstring task_id_, - jstring partition_dir_, - jstring bucket_dir_) + JNIEnv * env, jclass, jbyteArray split_info_, jstring partition_dir_, jstring bucket_dir_) { LOCAL_ENGINE_JNI_METHOD_START - const auto uuid_str = jstring2string(env, uuid_); - + const auto uuid_str = toString(DB::UUIDHelpers::generateV4()); const auto partition_dir = jstring2string(env, partition_dir_); const auto bucket_dir = jstring2string(env, bucket_dir_); @@ -1074,8 +1025,8 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn DB::StorageID storage_id = temp_storage->getStorageID(); SCOPE_EXIT({ local_engine::StorageMergeTreeFactory::freeStorage(storage_id); }); - std::vector selected_parts = local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames( - temp_storage->getStorageID(), "", merge_tree_table.getPartNames()); + std::vector selected_parts + = local_engine::StorageMergeTreeFactory::getDataPartsByNames(temp_storage->getStorageID(), "", merge_tree_table.getPartNames()); std::unordered_map partition_values; std::vector loaded @@ -1089,8 +1040,7 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir}); } - auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res); - + auto json_info = local_engine::PartInfo::toJson(res); return local_engine::charTojstring(env, json_info.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr) } diff --git a/cpp-ch/local-engine/proto/write_optimization.proto b/cpp-ch/local-engine/proto/write_optimization.proto new file mode 120000 index 000000000000..d1338a75fedb --- /dev/null +++ b/cpp-ch/local-engine/proto/write_optimization.proto @@ -0,0 +1 @@ +../../../backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp index ac0ec2145757..5a39580c19fd 100644 --- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp +++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -39,21 +40,21 @@ #include #include -# define ASSERT_DURATION_LE(secs, stmt) \ - { \ - std::promise completed; \ - auto stmt_future = completed.get_future(); \ - std::thread( \ - [&](std::promise & completed) \ - { \ - stmt; \ - completed.set_value(true); \ - }, \ - std::ref(completed)) \ - .detach(); \ - if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \ - GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). 
Check code for infinite loops"); \ - } +#define ASSERT_DURATION_LE(secs, stmt) \ + { \ + std::promise completed; \ + auto stmt_future = completed.get_future(); \ + std::thread( \ + [&](std::promise & completed) \ + { \ + stmt; \ + completed.set_value(true); \ + }, \ + std::ref(completed)) \ + .detach(); \ + if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \ + GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \ + } namespace DB::ErrorCodes diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp index b6c431ff25f3..da46ff6fdcde 100644 --- a/cpp-ch/local-engine/tests/gtest_parser.cpp +++ b/cpp-ch/local-engine/tests/gtest_parser.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include #include diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp index 992c91c942e0..233456992dff 100644 --- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp @@ -19,32 +19,20 @@ #include #include #include -#include #include #include -#include -#include #include #include -#include #include -#include #include -#include #include #include -#include -#include -#include -#include -#include #include #include -#include +#include #include #include #include -#include #include #include @@ -263,197 +251,4 @@ TEST(WritePipeline, ComputePartitionedExpression) EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0)); EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1)); EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2)); -} - -void do_remove(const std::string & folder) -{ - namespace fs = std::filesystem; - if (const std::filesystem::path ph(folder); fs::exists(ph)) - fs::remove_all(ph); -} - -Chunk person_chunk() -{ - auto id = INT()->createColumn(); - id->insert(100); - id->insert(200); - id->insert(300); - id->insert(400); - id->insert(500); - id->insert(600); - id->insert(700); - - auto name = STRING()->createColumn(); - name->insert("Joe"); - name->insert("Marry"); - name->insert("Mike"); - name->insert("Fred"); - name->insert("Albert"); - name->insert("Michelle"); - name->insert("Dan"); - - auto age = makeNullable(INT())->createColumn(); - Field null_field; - age->insert(30); - age->insert(null_field); - age->insert(18); - age->insert(50); - age->insert(null_field); - age->insert(30); - age->insert(50); - - - MutableColumns x; - x.push_back(std::move(id)); - x.push_back(std::move(name)); - x.push_back(std::move(age)); - return {std::move(x), 7}; -} - -TEST(WritePipeline, MergeTree) -{ - ThreadStatus thread_status; - - const auto context = DB::Context::createCopy(QueryContext::globalContext()); - context->setPath("./"); - const Settings & settings = context->getSettingsRef(); - - const std::string query - = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)"; - - const char * begin = query.data(); - const char * end = query.data() + query.size(); - ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]); - - ASTPtr ast = parseQuery(parser, begin, end, "", settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]); - - EXPECT_TRUE(ast->as()); - auto & create = ast->as(); - - ColumnsDescription column_descriptions - = 
InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE); - - StorageInMemoryMetadata metadata; - metadata.setColumns(column_descriptions); - metadata.setComment("args.comment"); - ASTPtr partition_by_key; - metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context); - - MergeTreeData::MergingParams merging_params; - merging_params.mode = MergeTreeData::MergingParams::Ordinary; - - - /// This merging param maybe used as part of sorting key - std::optional merging_param_key_arg; - /// Get sorting key from engine arguments. - /// - /// NOTE: store merging_param_key_arg as additional key column. We do it - /// before storage creation. After that storage will just copy this - /// column if sorting key will be changed. - metadata.sorting_key - = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg); - - std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); - - UUID uuid; - UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull; - UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull; - - SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); }); - - auto merge_tree = std::make_shared( - StorageID("", "", uuid), - "WritePipeline_MergeTree", - metadata, - LoadingStrictnessLevel::CREATE, - context, - "", - merging_params, - std::move(storage_settings)); - - Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}}; - DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]); - squashing.add(person_chunk()); - auto x = Squashing::squash(squashing.flush()); - x.getChunkInfos().add(std::make_shared()); - - ASSERT_EQ(7, x.getNumRows()); - ASSERT_EQ(3, x.getNumColumns()); - - - auto metadata_snapshot = std::make_shared(metadata); - ASTPtr none; - auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false)); - - sink->consume(x); - sink->onFinish(); -} - -INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json"); -INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json"); -INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json"); - -TEST(WritePipeline, SparkMergeTree) -{ - ThreadStatus thread_status; - - const auto context = DB::Context::createCopy(QueryContext::globalContext()); - context->setPath("./"); - const Settings & settings = context->getSettingsRef(); - - const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_)); - MergeTreeTableInstance merge_tree_table(extension_table); - - EXPECT_EQ(merge_tree_table.database, "default"); - EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree"); - EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree"); - EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default"); - - do_remove(merge_tree_table.relative_path); - - const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext()); - EXPECT_TRUE(dest_storage); - EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); - DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); - Block header = metadata_snapshot->getSampleBlock(); - - constexpr std::string_view split_template - = 
R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})"; - constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")}; - - SparkMergeTreeWritePartitionSettings gm_write_settings{ - .part_name_prefix{"this_is_prefix"}, - }; - gm_write_settings.set(context); - - auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context); - SparkMergeTreeWriter & spark_merge_tree_writer = *writer; - - auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file); - EXPECT_TRUE(local_executor->hasNext()); - - do - { - spark_merge_tree_writer.write(*local_executor->nextColumnar()); - } while (local_executor->hasNext()); - - spark_merge_tree_writer.finalize(); - auto part_infos = spark_merge_tree_writer.getAllPartInfo(); - auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos); - std::cerr << json_info << std::endl; - - /// - { - const auto extension_table_hdfs - = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_)); - MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs); - EXPECT_EQ(merge_tree_table_hdfs.database, "default"); - EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs"); - EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs"); - EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main"); - - const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext()); - EXPECT_TRUE(dest_storage_hdfs); - EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote()); - } } \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp new file mode 100644 index 000000000000..5ede32c89073 --- /dev/null +++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB::Setting +{ +extern const SettingsUInt64 max_parser_depth; +extern const SettingsUInt64 max_parser_backtracks; +extern const SettingsBool allow_settings_after_format_in_insert; +extern const SettingsUInt64 max_query_size; +extern const SettingsUInt64 min_insert_block_size_rows; +extern const SettingsUInt64 min_insert_block_size_bytes; +} + +using namespace local_engine; +using namespace DB; + +namespace +{ +void do_remove(const std::string & folder) +{ + namespace fs = std::filesystem; + if (const std::filesystem::path ph(folder); fs::exists(ph)) + fs::remove_all(ph); +} + +Chunk person_chunk() +{ + auto id = INT()->createColumn(); + id->insert(100); + id->insert(200); + id->insert(300); + id->insert(400); + id->insert(500); + id->insert(600); + id->insert(700); + + auto name = STRING()->createColumn(); + name->insert("Joe"); + name->insert("Marry"); + name->insert("Mike"); + name->insert("Fred"); + name->insert("Albert"); + name->insert("Michelle"); + name->insert("Dan"); + + auto age = makeNullable(INT())->createColumn(); + Field null_field; + age->insert(30); + age->insert(null_field); + age->insert(18); + age->insert(50); + age->insert(null_field); + age->insert(30); + age->insert(50); + + + MutableColumns x; + x.push_back(std::move(id)); + x.push_back(std::move(name)); + x.push_back(std::move(age)); + return {std::move(x), 7}; +} +} + +TEST(MergeTree, ClickhouseMergeTree) +{ + ThreadStatus thread_status; + + const auto context = DB::Context::createCopy(QueryContext::globalContext()); + context->setPath("./"); + const Settings & settings = context->getSettingsRef(); + + const std::string query + = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)"; + + const char * begin = query.data(); + const char * end = query.data() + query.size(); + ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]); + + ASTPtr ast = parseQuery( + parser, + begin, + end, + "", + settings[Setting::max_query_size], + settings[Setting::max_parser_depth], + settings[Setting::max_parser_backtracks]); + + EXPECT_TRUE(ast->as()); + auto & create = ast->as(); + + ColumnsDescription column_descriptions + = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE); + + StorageInMemoryMetadata metadata; + metadata.setColumns(column_descriptions); + metadata.setComment("args.comment"); + ASTPtr partition_by_key; + metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context); + + MergeTreeData::MergingParams merging_params; + merging_params.mode = MergeTreeData::MergingParams::Ordinary; + + + /// This merging param maybe used as part of sorting key + std::optional merging_param_key_arg; + /// Get sorting key from engine arguments. + /// + /// NOTE: store merging_param_key_arg as additional key column. We do it + /// before storage creation. After that storage will just copy this + /// column if sorting key will be changed. 
+ metadata.sorting_key + = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg); + + std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings()); + + UUID uuid; + UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull; + UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull; + + SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); }); + + auto merge_tree = std::make_shared( + StorageID("", "", uuid), + "WritePipeline_MergeTree", + metadata, + LoadingStrictnessLevel::CREATE, + context, + "", + merging_params, + std::move(storage_settings)); + + Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}}; + DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]); + squashing.add(person_chunk()); + auto x = Squashing::squash(squashing.flush()); + x.getChunkInfos().add(std::make_shared()); + + ASSERT_EQ(7, x.getNumRows()); + ASSERT_EQ(3, x.getNumColumns()); + + + auto metadata_snapshot = std::make_shared(metadata); + ASTPtr none; + auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false)); + + sink->consume(x); + sink->onFinish(); +} + +INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json"); +INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json"); +INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json"); + +TEST(MergeTree, SparkMergeTree) +{ + ThreadStatus thread_status; + + const auto context = DB::Context::createCopy(QueryContext::globalContext()); + context->setPath("./"); + const Settings & settings = context->getSettingsRef(); + + const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_)); + MergeTreeTableInstance merge_tree_table(extension_table); + + EXPECT_EQ(merge_tree_table.database, "default"); + EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree"); + EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree"); + EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default"); + + do_remove(merge_tree_table.relative_path); + + const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext()); + EXPECT_TRUE(dest_storage); + EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote()); + DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr(); + Block header = metadata_snapshot->getSampleBlock(); + + constexpr std::string_view split_template + = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})"; + constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")}; + + SparkMergeTreeWritePartitionSettings gm_write_settings{ + .part_name_prefix{"this_is_prefix"}, + }; + gm_write_settings.set(context); + + auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context); + SparkMergeTreeWriter & spark_merge_tree_writer = *writer; + + auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file); + EXPECT_TRUE(local_executor->hasNext()); + + do + { + 
+        spark_merge_tree_writer.write(*local_executor->nextColumnar());
+    } while (local_executor->hasNext());
+
+    auto json_info = spark_merge_tree_writer.close();
+    std::cerr << json_info << std::endl;
+
+    ///
+    {
+        const auto extension_table_hdfs
+            = local_engine::JsonStringToMessage<substrait::ReadRel::ExtensionTable>(EMBEDDED_PLAN(_1_mergetree_hdfs_));
+        MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
+        EXPECT_EQ(merge_tree_table_hdfs.database, "default");
+        EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
+        EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
+        EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");
+
+        const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
+        EXPECT_TRUE(dest_storage_hdfs);
+        EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
+    }
+}
+
+INCBIN(_2_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/2_one_pipeline.json");
+
+TEST(MergeTree, Pipeline)
+{
+    GTEST_SKIP();
+    const auto context = DB::Context::createCopy(QueryContext::globalContext());
+    GlutenWriteSettings settings{
+        .task_write_tmp_dir = "file:///tmp/lineitem_mergetree",
+        .task_write_filename = "part-00000-a09f9d59-2dc6-43bc-a485-dcab8384b2ff.c000.mergetree",
+    };
+    settings.set(context);
+
+    constexpr std::string_view split_template
+        = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
+    auto [_, local_executor] = test::create_plan_and_executor(
+        EMBEDDED_PLAN(_2_mergetree_plan_),
+        split_template,
+        GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet"),
+        context);
+    EXPECT_TRUE(local_executor->hasNext());
+    const Block & x = *local_executor->nextColumnar();
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json
new file mode 100644
index 000000000000..fbc593267464
--- /dev/null
+++ b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json
@@ -0,0 +1,368 @@
+{
+  "relations": [
+    {
+      "root": {
+        "input": {
+          "write": {
+            "namedTable": {
+              "advancedExtension": {
+                "optimization": {
+                  "@type": "type.googleapis.com/google.protobuf.StringValue",
+                  "value": "WriteParameters:isSnappy=1;format=mergetree\n"
+                },
+                "enhancement": {
+                  "@type": "type.googleapis.com/substrait.Type",
+                  "struct": {
+                    "types": [
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "i64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "fp64": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "date": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
+                          "nullability": "NULLABILITY_NULLABLE"
+                        }
+                      },
+                      {
+                        "string": {
"NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + }, + "tableSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "l_orderkey", + "l_partkey", + "l_suppkey", + "l_linenumber", + "l_quantity", + "l_extendedprice", + "l_discount", + "l_tax", + "l_returnflag", + "l_linestatus", + "l_shipdate", + "l_commitdate", + "l_receiptdate", + "l_shipinstruct", + "l_shipmode", + "l_comment" + ], + "struct": { + "types": [ + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "i64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "fp64": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "date": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + }, + { + "string": { + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL", + "NORMAL_COL" + ] + }, 
+ "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + } + } + }, + "outputSchema": { + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file