diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index ae8ec32bd0a4..25f691d2e8cb 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -96,12 +96,8 @@ class ClickHouseTableV2(
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
- StorageMeta.withMoreStorageInfo(
- meta,
- ClickhouseSnapshot.genSnapshotId(snapshot),
- deltaLog.dataPath,
- dataBaseName,
- tableName))
+ StorageMeta
+ .withStorageID(meta, dataBaseName, tableName, ClickhouseSnapshot.genSnapshotId(snapshot)))
}
override def deltaProperties: Map[String, String] = properties().asScala.toMap
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index b897010d5bb3..242a5c3c277a 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import java.util.{Date, UUID}
+import java.util.Date
import scala.collection.mutable.ArrayBuffer
object OptimizeTableCommandOverwrites extends Logging {
@@ -95,8 +95,6 @@ object OptimizeTableCommandOverwrites extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
- val uuid = UUID.randomUUID.toString
-
val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -115,13 +113,9 @@ object OptimizeTableCommandOverwrites extends Logging {
description.tableSchema.toAttributes
)
- val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
+ CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 19b3d396bd6f..5b2dc164b56a 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
-import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -50,22 +49,15 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
- // val conf = ContextUtil.getConfiguration(job)
+ val conf = job.getConfiguration
+
val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")
@transient val deltaMetaReader = DeltaMetaReader(metadata)
-
- val database = deltaMetaReader.storageDB
- val tableName = deltaMetaReader.storageTable
- val deltaPath = deltaMetaReader.storagePath
-
- val extensionTableBC = sparkSession.sparkContext.broadcast(
- ClickhouseMetaSerializer
- .forWrite(deltaMetaReader, metadata.schema)
- .toByteArray)
+ deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) }
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -76,19 +68,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- require(path == deltaPath)
+
GlutenMergeTreeWriterInjects
.getInstance()
- .asInstanceOf[CHMergeTreeWriterInjects]
- .createOutputWriter(
- path,
- metadata.schema,
- context,
- nativeConf,
- database,
- tableName,
- extensionTableBC.value
- )
+ .createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index ae8ec32bd0a4..25f691d2e8cb 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -96,12 +96,8 @@ class ClickHouseTableV2(
def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
- StorageMeta.withMoreStorageInfo(
- meta,
- ClickhouseSnapshot.genSnapshotId(snapshot),
- deltaLog.dataPath,
- dataBaseName,
- tableName))
+ StorageMeta
+ .withStorageID(meta, dataBaseName, tableName, ClickhouseSnapshot.genSnapshotId(snapshot)))
}
override def deltaProperties: Map[String, String] = properties().asScala.toMap
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index b897010d5bb3..e7b8ce1d3053 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import java.util.{Date, UUID}
+import java.util.Date
import scala.collection.mutable.ArrayBuffer
object OptimizeTableCommandOverwrites extends Logging {
@@ -95,8 +95,6 @@ object OptimizeTableCommandOverwrites extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
- val uuid = UUID.randomUUID.toString
-
val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -115,13 +113,9 @@ object OptimizeTableCommandOverwrites extends Logging {
description.tableSchema.toAttributes
)
- val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
+ CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
@@ -170,7 +164,7 @@ object OptimizeTableCommandOverwrites extends Logging {
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
- val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
+ val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)
val sparkSession = SparkSession.getActiveSession.get
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index c2d1ec47b9ca..24dbc6e03bef 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
-import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,16 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
- // val conf = ContextUtil.getConfiguration(job)
+ val conf = job.getConfiguration
+
+ // just for the sake of compatibility
val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")
@transient val deltaMetaReader = DeltaMetaReader(metadata)
-
- val database = deltaMetaReader.storageDB
- val tableName = deltaMetaReader.storageTable
- val deltaPath = deltaMetaReader.storagePath
-
- val extensionTableBC = sparkSession.sparkContext.broadcast(
- ClickhouseMetaSerializer
- .forWrite(deltaMetaReader, metadata.schema)
- .toByteArray)
+ deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) }
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +72,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- require(path == deltaPath)
+
GlutenMergeTreeWriterInjects
.getInstance()
- .asInstanceOf[CHMergeTreeWriterInjects]
- .createOutputWriter(
- path,
- metadata.schema,
- context,
- nativeConf,
- database,
- tableName,
- extensionTableBC.value
- )
+ .createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
index f730b42e4db0..5f6a2dc3d712 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/catalog/ClickHouseTableV2.scala
@@ -99,12 +99,11 @@ class ClickHouseTableV2(
def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
protocol,
- StorageMeta.withMoreStorageInfo(
+ StorageMeta.withStorageID(
meta,
- ClickhouseSnapshot.genSnapshotId(initialSnapshot),
- deltaLog.dataPath,
dataBaseName,
- tableName))
+ tableName,
+ ClickhouseSnapshot.genSnapshotId(initialSnapshot)))
}
override def deltaProperties: Map[String, String] = properties().asScala.toMap
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
index ef30aaad2294..0a3b2ef5ef26 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/delta/commands/OptimizeTableCommandOverwrites.scala
@@ -44,7 +44,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import java.util.{Date, UUID}
+import java.util.Date
import scala.collection.mutable.ArrayBuffer
object OptimizeTableCommandOverwrites extends Logging {
@@ -97,8 +97,6 @@ object OptimizeTableCommandOverwrites extends Logging {
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
- val uuid = UUID.randomUUID.toString
-
val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -117,13 +115,9 @@ object OptimizeTableCommandOverwrites extends Logging {
DataTypeUtils.toAttributes(description.tableSchema)
)
- val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
- datasourceJniWrapper.nativeMergeMTParts(
- planWithSplitInfo.plan,
+ CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
- uuid,
- taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
@@ -172,7 +166,7 @@ object OptimizeTableCommandOverwrites extends Logging {
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
- val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
+ val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)
val sparkSession = SparkSession.getActiveSession.get
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 1489ae4dbf49..6cc431f4f99c 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
-import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,15 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
- // val conf = ContextUtil.getConfiguration(job)
+ val conf = job.getConfiguration
+
val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")
@transient val deltaMetaReader = DeltaMetaReader(metadata)
-
- val database = deltaMetaReader.storageDB
- val tableName = deltaMetaReader.storageTable
- val deltaPath = deltaMetaReader.storagePath
-
- val extensionTableBC = sparkSession.sparkContext.broadcast(
- ClickhouseMetaSerializer
- .forWrite(deltaMetaReader, metadata.schema)
- .toByteArray)
+ deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) }
new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +71,10 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
- require(path == deltaPath)
+
GlutenMergeTreeWriterInjects
.getInstance()
- .asInstanceOf[CHMergeTreeWriterInjects]
- .createOutputWriter(
- path,
- metadata.schema,
- context,
- nativeConf,
- database,
- tableName,
- extensionTableBC.value
- )
+ .createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
index f19c5d39df1d..441feba67c57 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/CHDatasourceJniWrapper.java
@@ -16,58 +16,64 @@
*/
package org.apache.spark.sql.execution.datasources;
+import io.substrait.proto.WriteRel;
+
public class CHDatasourceJniWrapper {
- public native long nativeInitFileWriterWrapper(
- String filePath, byte[] preferredSchema, String formatHint);
+ private final long instance;
- public native long nativeInitMergeTreeWriterWrapper(
- byte[] plan,
- byte[] splitInfo,
- String uuid,
- String taskId,
- String partition_dir,
- String bucket_dir,
- byte[] confArray);
+ public CHDatasourceJniWrapper(String filePath, WriteRel write) {
+ this.instance = createFilerWriter(filePath, write.toByteArray());
+ }
- public native String nativeMergeMTParts(
- byte[] plan,
- byte[] splitInfo,
- String uuid,
- String taskId,
- String partition_dir,
- String bucket_dir);
+ public CHDatasourceJniWrapper(
+ String taskId, String partition_dir, String bucket_dir, WriteRel write, byte[] confArray) {
+ this.instance =
+ createMergeTreeWriter(taskId, partition_dir, bucket_dir, write.toByteArray(), confArray);
+ }
- public static native String filterRangesOnDriver(byte[] plan, byte[] read);
+ public void write(long blockAddress) {
+ write(instance, blockAddress);
+ }
+
+ public String close() {
+ return close(instance);
+ }
- public native void write(long instanceId, long blockAddress);
+ private native void write(long instanceId, long blockAddress);
- public native void writeToMergeTree(long instanceId, long blockAddress);
+ private native String close(long instanceId);
- public native void close(long instanceId);
+ /// FileWriter
+ private native long createFilerWriter(String filePath, byte[] writeRel);
- public native String closeMergeTreeWriter(long instanceId);
+ /// MergeTreeWriter
+ private native long createMergeTreeWriter(
+ String taskId, String partition_dir, String bucket_dir, byte[] writeRel, byte[] confArray);
+
+ public static native String nativeMergeMTParts(
+ byte[] splitInfo, String partition_dir, String bucket_dir);
+
+ public static native String filterRangesOnDriver(byte[] plan, byte[] read);
- /*-
+ /**
* The input block is already sorted by partition columns + bucket expressions. (check
- * org.apache.spark.sql.execution.datasources.FileFormatWriter#write)
- * However, the input block may contain parts(we call it stripe here) belonging to
- * different partition/buckets.
+ * org.apache.spark.sql.execution.datasources.FileFormatWriter#write) However, the input block may
+ * contain parts(we call it stripe here) belonging to different partition/buckets.
*
- * If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_.
+ * If bucketing is enabled, the input block's last column is guaranteed to be _bucket_value_.
*
- * This function splits the input block in to several blocks, each of which belonging
- * to the same partition/bucket. Notice the stripe will NOT contain partition columns
+ * <p>This function splits the input block in to several blocks, each of which belonging to the
+ * same partition/bucket. Notice the stripe will NOT contain partition columns
*
- * Since all rows in a stripe share the same partition/bucket,
- * we only need to check the heading row.
- * So, for each stripe, the native code also returns each stripe's first row's index.
- * Caller can use these indice to get UnsafeRows from the input block,
- * to help FileFormatDataWriter to aware partition/bucket changes.
+ * <p>Since all rows in a stripe share the same partition/bucket, we only need to check the
+ * heading row. So, for each stripe, the native code also returns each stripe's first row's index.
+ * Caller can use these indices to get UnsafeRows from the input block, to help
+ * FileFormatDataWriter to aware partition/bucket changes.
*/
public static native BlockStripes splitBlockByPartitionAndBucket(
long blockAddress,
- int[] partitionColIndice,
+ int[] partitionColIndices,
boolean hasBucket,
boolean reserve_partition_columns);
}
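The reworked CHDatasourceJniWrapper is instance-scoped: the native writer handle is created in the constructor and released by close(), instead of threading an instanceId through every call. A minimal usage sketch; writeRel and blockAddress are stand-ins for values produced elsewhere in this patch (createWriteRel(...) and CHColumnVector.getBlockAddress), and the CH native library is assumed to be loaded by the Gluten backend:

    import io.substrait.proto.WriteRel
    import org.apache.spark.sql.execution.datasources.CHDatasourceJniWrapper

    def writeOneBlock(outputPath: String, writeRel: WriteRel, blockAddress: Long): String = {
      // File-writer flavour: the native handle lives inside the wrapper for its whole lifetime.
      val wrapper = new CHDatasourceJniWrapper(outputPath, writeRel)
      wrapper.write(blockAddress) // pushes one native block by address
      wrapper.close()             // returns the native write result/metrics string
    }
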
diff --git a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java
index 9d6ed6868ec1..a305972045b9 100644
--- a/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java
+++ b/backends-clickhouse/src/main/java/org/apache/spark/sql/execution/datasources/clickhouse/ExtensionTableBuilder.java
@@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.execution.datasources.clickhouse;
+import org.apache.spark.sql.execution.datasources.mergetree.MetaSerializer;
+import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer;
+
import java.util.List;
import java.util.Map;
@@ -34,13 +37,13 @@ public static ExtensionTableNode makeExtensionTable(
String bfIndexKey,
String setIndexKey,
String primaryKey,
- ClickhousePartSerializer partSerializer,
+ PartSerializer partSerializer,
String tableSchemaJson,
Map<String, String> clickhouseTableConfigs,
List<String> preferredLocations) {
String result =
- ClickhouseMetaSerializer.apply(
+ MetaSerializer.apply(
database,
tableName,
snapshotId,
diff --git a/backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto b/backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto
new file mode 100644
index 000000000000..a09f3ea0940c
--- /dev/null
+++ b/backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto
@@ -0,0 +1,37 @@
+// SPDX-License-Identifier: Apache-2.0
+syntax = "proto3";
+
+package local_engine;
+
+option java_package = "org.apache.spark.sql.execution.datasources.v1";
+option java_multiple_files = true;
+
+message Write {
+ message Common {
+ string format = 1;
+ }
+ message ParquetWrite{}
+ message OrcWrite{}
+ message MergeTreeWrite{
+ string database = 1;
+ string table = 2;
+ string snapshot_id = 3;
+ string order_by_key = 4;
+ string low_card_key = 5;
+ string minmax_index_key = 6;
+ string bf_index_key = 7;
+ string set_index_key = 8;
+ string primary_key = 9;
+ string relative_path = 10;
+ string absolute_path = 11;
+
+ string storage_policy = 12;
+ }
+
+ Common common = 1;
+ oneof file_format {
+ ParquetWrite parquet = 2;
+ OrcWrite orc = 3;
+ MergeTreeWrite mergetree = 4;
+ }
+}
\ No newline at end of file
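The new Write message carries the metadata that was previously string-serialized by ClickhouseMetaSerializer.forWrite. A hedged sketch of building its mergetree variant from Scala, using the classes generated under org.apache.spark.sql.execution.datasources.v1; the field values below are placeholders, the real ones come from the storage_* entries of the job configuration (see CHMergeTreeWriterInjects further down):

    import org.apache.spark.sql.execution.datasources.v1.Write

    // Placeholder values for illustration only.
    val mergeTreeWrite: Write = Write
      .newBuilder()
      .setCommon(Write.Common.newBuilder().setFormat("mergetree").build())
      .setMergetree(
        Write.MergeTreeWrite
          .newBuilder()
          .setDatabase("default")
          .setTable("example_mergetree")
          .setSnapshotId("snapshot-0")
          .setOrderByKey("tuple()")
          .setRelativePath("example_mergetree")
          .setStoragePolicy("default")
          .build())
      .build()
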
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index 3c834b7ca847..6760d29561a3 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -38,7 +38,8 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.FilePartition
-import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, ExtensionTableNode}
+import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, ExtensionTableNode}
+import org.apache.spark.sql.execution.datasources.mergetree.PartSerializer
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
@@ -148,7 +149,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
p.bfIndexKey,
p.setIndexKey,
p.primaryKey,
- ClickhousePartSerializer.fromMergeTreePartSplits(p.partList.toSeq),
+ PartSerializer.fromMergeTreePartSplits(p.partList.toSeq),
p.tableSchemaJson,
p.clickhouseTableConfigs.asJava,
CHAffinity.getNativeMergeTreePartitionLocations(p).toList.asJava
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 062e96962297..346ed4a17576 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -16,12 +16,10 @@
*/
package org.apache.spark.sql.delta.catalog
-import org.apache.gluten.expression.ConverterUtils.normalizeColName
+import org.apache.gluten.expression.ConverterUtils
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.Snapshot
-import org.apache.spark.sql.delta.actions.Metadata
-import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader}
import org.apache.hadoop.fs.Path
@@ -38,7 +36,8 @@ trait ClickHouseTableV2Base extends TablePropertiesReader {
def configuration: Map[String, String] = deltaProperties
- def metadata: Metadata = deltaSnapshot.metadata
+ override lazy val partitionColumns: Seq[String] =
+ deltaSnapshot.metadata.partitionColumns.map(ConverterUtils.normalizeColName)
lazy val dataBaseName: String = deltaCatalog
.map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE))
@@ -49,18 +48,16 @@ trait ClickHouseTableV2Base extends TablePropertiesReader {
.getOrElse(deltaPath.toUri.getPath)
lazy val clickhouseTableConfigs: Map[String, String] = {
- Map("storage_policy" -> deltaProperties.getOrElse("storage_policy", "default"))
+ Map(StorageMeta.POLICY -> configuration.getOrElse(StorageMeta.POLICY, "default"))
}
- def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption)
+ def primaryKey(): String = StorageMeta.columnsToStr(primaryKeyOption)
- def orderByKey(): String = orderByKeyOption match {
- case Some(keys) => keys.map(normalizeColName).mkString(",")
- case None => "tuple()"
- }
+ def orderByKey(): String =
+ StorageMeta.columnsToStr(orderByKeyOption, StorageMeta.DEFAULT_ORDER_BY_KEY)
- def lowCardKey(): String = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
- def minmaxIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
- def bfIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
- def setIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
+ def lowCardKey(): String = StorageMeta.columnsToStr(lowCardKeyOption)
+ def minmaxIndexKey(): String = StorageMeta.columnsToStr(minmaxIndexKeyOption)
+ def bfIndexKey(): String = StorageMeta.columnsToStr(bfIndexKeyOption)
+ def setIndexKey(): String = StorageMeta.columnsToStr(setIndexKeyOption)
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
index 341018fb590b..1c7b4f232205 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala
@@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
import org.apache.spark.sql.delta._
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.commands.GlutenCacheBase._
-import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder}
-import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
+import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
+import org.apache.spark.sql.execution.datasources.mergetree.{PartSerializer, StorageMeta}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.{BooleanType, StringType}
@@ -176,13 +176,13 @@ case class GlutenCHCacheDataCommand(
onePart.tablePath,
pathToCache.toString,
snapshot.metadata.configuration
- .getOrElse("orderByKey", MergeTreeDeltaUtil.DEFAULT_ORDER_BY_KEY),
+ .getOrElse("orderByKey", StorageMeta.DEFAULT_ORDER_BY_KEY),
snapshot.metadata.configuration.getOrElse("lowCardKey", ""),
snapshot.metadata.configuration.getOrElse("minmaxIndexKey", ""),
snapshot.metadata.configuration.getOrElse("bloomfilterIndexKey", ""),
snapshot.metadata.configuration.getOrElse("setIndexKey", ""),
snapshot.metadata.configuration.getOrElse("primaryKey", ""),
- ClickhousePartSerializer.fromPartNames(parts.map(_.name).toSeq),
+ PartSerializer.fromPartNames(parts.map(_.name).toSeq),
ConverterUtils.convertNamedStructJson(snapshot.metadata.schema),
snapshot.metadata.configuration.asJava,
new JList[String]()
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala
deleted file mode 100644
index 854c6f91c917..000000000000
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreeDeltaUtil.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.datasources.clickhouse.utils
-
-import org.apache.gluten.expression.ConverterUtils.normalizeColName
-
-object MergeTreeDeltaUtil {
-
- val DEFAULT_ORDER_BY_KEY = "tuple()"
-
- def genOrderByAndPrimaryKeyStr(
- orderByKeyOption: Option[Seq[String]],
- primaryKeyOption: Option[Seq[String]]): (String, String) = {
-
- val orderByKey =
- orderByKeyOption.filter(_.nonEmpty).map(columnsToStr).getOrElse(DEFAULT_ORDER_BY_KEY)
- val primaryKey = primaryKeyOption
- .filter(p => orderByKey != DEFAULT_ORDER_BY_KEY && p.nonEmpty)
- .map(columnsToStr)
- .getOrElse("")
-
- (orderByKey, primaryKey)
- }
-
- def columnsToStr(option: Option[Seq[String]]): String = option match {
- case Some(keys) => keys.map(normalizeColName).mkString(",")
- case None => ""
- }
-
- def columnsToStr(keys: Seq[String]): String = {
- keys.map(normalizeColName).mkString(",")
- }
-}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
index dc31822fd73e..9347d8679b98 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/utils/MergeTreePartsPartitionsUtil.scala
@@ -35,8 +35,8 @@ import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, HadoopFsRelation, PartitionDirectory}
-import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhousePartSerializer, ExtensionTableBuilder, MergeTreePartFilterReturnedRange}
-import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
+import org.apache.spark.sql.execution.datasources.clickhouse.{ExtensionTableBuilder, MergeTreePartFilterReturnedRange}
+import org.apache.spark.sql.execution.datasources.mergetree.{PartSerializer, StorageMeta}
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.source.DeltaMergeTreeFileFormat
import org.apache.spark.sql.types.BooleanType
@@ -589,7 +589,7 @@ object MergeTreePartsPartitionsUtil extends Logging {
table.bfIndexKey(),
table.setIndexKey(),
table.primaryKey(),
- ClickhousePartSerializer.fromAddMergeTreeParts(selectPartsFiles),
+ PartSerializer.fromAddMergeTreeParts(selectPartsFiles),
tableSchemaJson,
clickhouseTableConfigs.asJava,
new JArrayList[String]()
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
index de322b65dd8e..a69b1c651248 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
@@ -16,21 +16,35 @@
*/
package org.apache.spark.sql.execution.datasources.mergetree
+import org.apache.gluten.expression.ConverterUtils
+
import org.apache.spark.sql.delta.actions.Metadata
-class DeltaMetaReader(
- override val metadata: Metadata,
- override val configuration: Map[String, String])
- extends TablePropertiesReader {
+case class DeltaMetaReader(metadata: Metadata)
+ extends TablePropertiesReader
+ with StorageConfigProvider {
- def storageDB: String = configuration(StorageMeta.STORAGE_DB)
- def storageTable: String = configuration(StorageMeta.STORAGE_TABLE)
- def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID)
- def storagePath: String = configuration(StorageMeta.STORAGE_PATH)
-}
+ override lazy val partitionColumns: Seq[String] =
+ metadata.partitionColumns.map(ConverterUtils.normalizeColName)
+
+ override lazy val configuration: Map[String, String] = metadata.configuration
-object DeltaMetaReader {
- def apply(metadata: Metadata): DeltaMetaReader = {
- new DeltaMetaReader(metadata, metadata.configuration)
+ lazy val storageConf: Map[String, String] = {
+ val (orderByKey0, primaryKey0) = StorageMeta.genOrderByAndPrimaryKeyStr(
+ orderByKeyOption,
+ primaryKeyOption
+ )
+ Map(
+ StorageMeta.DB -> configuration(StorageMeta.DB),
+ StorageMeta.TABLE -> configuration(StorageMeta.TABLE),
+ StorageMeta.SNAPSHOT_ID -> configuration(StorageMeta.SNAPSHOT_ID),
+ StorageMeta.POLICY -> configuration.getOrElse(StorageMeta.POLICY, "default"),
+ StorageMeta.ORDER_BY_KEY -> orderByKey0,
+ StorageMeta.LOW_CARD_KEY -> StorageMeta.columnsToStr(lowCardKeyOption),
+ StorageMeta.MINMAX_INDEX_KEY -> StorageMeta.columnsToStr(minmaxIndexKeyOption),
+ StorageMeta.BF_INDEX_KEY -> StorageMeta.columnsToStr(bfIndexKeyOption),
+ StorageMeta.SET_INDEX_KEY -> StorageMeta.columnsToStr(setIndexKeyOption),
+ StorageMeta.PRIMARY_KEY -> primaryKey0
+ )
}
}
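DeltaMetaReader now exposes its MergeTree properties as a flat storage_-prefixed map, which DeltaMergeTreeFileFormat copies into the Hadoop job configuration so the task side can read them back. A sketch of that hand-off, assuming metadata already carries the keys added by StorageMeta.withStorageID:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.delta.actions.Metadata
    import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta}

    def propagateStorageConf(metadata: Metadata, conf: Configuration): Unit = {
      val reader = DeltaMetaReader(metadata)
      // storageConf holds storage_db, storage_table, storage_snapshot_id, storage_policy,
      // plus the normalized key strings (orderByKey, primaryKey, index keys).
      reader.storageConf.foreach { case (k, v) => conf.set(k, v) }
      require(conf.get(StorageMeta.DB) != null) // e.g. "storage_db" is now visible to tasks
    }
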
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/MetaSerializer.scala
similarity index 66%
rename from backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
rename to backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/MetaSerializer.scala
index 5c863d76c947..94a74bd2fa15 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/MetaSerializer.scala
@@ -14,25 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution.datasources.clickhouse
+package org.apache.spark.sql.execution.datasources.mergetree
import org.apache.gluten.execution.MergeTreePartSplit
import org.apache.gluten.expression.ConverterUtils
-import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
-import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta}
+import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableNode
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
-import org.apache.spark.sql.types.StructType
import com.fasterxml.jackson.databind.ObjectMapper
import io.substrait.proto.ReadRel
-import java.net.URI
import java.util.{Map => jMap}
-import scala.collection.JavaConverters._
-
-case class ClickhousePartSerializer(
+case class PartSerializer(
partList: Seq[String],
starts: Seq[Long],
lengths: Seq[Long]
@@ -58,59 +53,29 @@ case class ClickhousePartSerializer(
}
}
-object ClickhousePartSerializer {
- def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): ClickhousePartSerializer = {
+object PartSerializer {
+ def fromMergeTreePartSplits(partLists: Seq[MergeTreePartSplit]): PartSerializer = {
val partList = partLists.map(_.name)
val starts = partLists.map(_.start)
val lengths = partLists.map(_.length)
- ClickhousePartSerializer(partList, starts, lengths)
+ PartSerializer(partList, starts, lengths)
}
- def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): ClickhousePartSerializer = {
+ def fromAddMergeTreeParts(parts: Seq[AddMergeTreeParts]): PartSerializer = {
val partList = parts.map(_.name)
val starts = parts.map(_ => 0L)
val lengths = parts.map(_.marks)
- ClickhousePartSerializer(partList, starts, lengths)
+ PartSerializer(partList, starts, lengths)
}
- def fromPartNames(partNames: Seq[String]): ClickhousePartSerializer = {
+ def fromPartNames(partNames: Seq[String]): PartSerializer = {
// starts and lengths is useless for writing
val partRanges = Seq.range(0L, partNames.length)
- ClickhousePartSerializer(partNames, partRanges, partRanges)
+ PartSerializer(partNames, partRanges, partRanges)
}
}
-object ClickhouseMetaSerializer {
-
- def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = {
- val clickhouseTableConfigs = deltaMetaReader.writeConfiguration
-
- val orderByKey = clickhouseTableConfigs("storage_orderByKey")
- val lowCardKey = clickhouseTableConfigs("storage_lowCardKey")
- val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey")
- val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey")
- val setIndexKey = clickhouseTableConfigs("storage_setIndexKey")
- val primaryKey = clickhouseTableConfigs("storage_primaryKey")
-
- val result = apply(
- deltaMetaReader.storageDB,
- deltaMetaReader.storageTable,
- deltaMetaReader.storageSnapshotId,
- deltaMetaReader.storagePath,
- "", // absolutePath
- orderByKey,
- lowCardKey,
- minmaxIndexKey,
- bfIndexKey,
- setIndexKey,
- primaryKey,
- ClickhousePartSerializer.fromPartNames(Seq()),
- ConverterUtils.convertNamedStructJson(dataSchema),
- clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava
- )
- ExtensionTableNode.toProtobuf(result)
-
- }
+object MetaSerializer {
// scalastyle:off argcount
def apply1(
database: String,
@@ -124,11 +89,11 @@ object ClickhouseMetaSerializer {
bfIndexKeyOption: Option[Seq[String]],
setIndexKeyOption: Option[Seq[String]],
primaryKeyOption: Option[Seq[String]],
- partSerializer: ClickhousePartSerializer,
+ partSerializer: PartSerializer,
tableSchemaJson: String,
clickhouseTableConfigs: jMap[String, String]): ReadRel.ExtensionTable = {
- val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
+ val (orderByKey0, primaryKey0) = StorageMeta.genOrderByAndPrimaryKeyStr(
orderByKeyOption,
primaryKeyOption
)
@@ -140,10 +105,10 @@ object ClickhouseMetaSerializer {
relativePath,
absolutePath,
orderByKey0,
- lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
- minmaxIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
- bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
- setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
+ StorageMeta.columnsToStr(lowCardKeyOption),
+ StorageMeta.columnsToStr(minmaxIndexKeyOption),
+ StorageMeta.columnsToStr(bfIndexKeyOption),
+ StorageMeta.columnsToStr(setIndexKeyOption),
primaryKey0,
partSerializer,
tableSchemaJson,
@@ -164,7 +129,7 @@ object ClickhouseMetaSerializer {
bfIndexKey0: String,
setIndexKey0: String,
primaryKey0: String,
- partSerializer: ClickhousePartSerializer,
+ partSerializer: PartSerializer,
tableSchemaJson: String,
clickhouseTableConfigs: jMap[String, String]): String = {
// scalastyle:on argcount
@@ -192,7 +157,9 @@ object ClickhouseMetaSerializer {
.append(orderByKey)
.append("\n")
- if (orderByKey.nonEmpty && !(orderByKey == "tuple()")) {
+ if (orderByKey.isEmpty || orderByKey == StorageMeta.DEFAULT_ORDER_BY_KEY) {
+ extensionTableStr.append("").append("\n")
+ } else {
extensionTableStr.append(primaryKey).append("\n")
}
@@ -200,7 +167,7 @@ object ClickhouseMetaSerializer {
extensionTableStr.append(minmaxIndexKey).append("\n")
extensionTableStr.append(bfIndexKey).append("\n")
extensionTableStr.append(setIndexKey).append("\n")
- extensionTableStr.append(normalizeRelativePath(relativePath)).append("\n")
+ extensionTableStr.append(StorageMeta.normalizeRelativePath(relativePath)).append("\n")
extensionTableStr.append(absolutePath).append("\n")
appendConfigs(extensionTableStr, clickhouseTableConfigs)
extensionTableStr.append(partSerializer())
@@ -208,13 +175,6 @@ object ClickhouseMetaSerializer {
extensionTableStr.toString()
}
- private def normalizeRelativePath(relativePath: String): String = {
- val table_uri = URI.create(relativePath)
- if (table_uri.getPath.startsWith("/")) {
- table_uri.getPath.substring(1)
- } else table_uri.getPath
- }
-
private def appendConfigs(
extensionTableStr: StringBuilder,
clickhouseTableConfigs: jMap[String, String]): Unit = {
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
index 6a7ebc3c39d2..bdd8c9b34683 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
@@ -16,13 +16,12 @@
*/
package org.apache.spark.sql.execution.datasources.mergetree
-import org.apache.gluten.expression.ConverterUtils.normalizeColName
+import org.apache.gluten.expression.ConverterUtils
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.delta.actions.Metadata
-import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
-import org.apache.hadoop.fs.Path
+import java.net.URI
/** Reserved table property for MergeTree table. */
object StorageMeta {
@@ -30,42 +29,75 @@ object StorageMeta {
// Storage properties
val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db"
val DEFAULT_CREATE_TABLE_DATABASE: String = "default"
- val STORAGE_DB: String = "storage_db"
- val STORAGE_TABLE: String = "storage_table"
- val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id"
- val STORAGE_PATH: String = "storage_path"
+ val DEFAULT_ORDER_BY_KEY = "tuple()"
+ val STORAGE_PREFIX: String = "storage_"
+ val DB: String = prefixOf("db")
+ val TABLE: String = prefixOf("table")
+ val SNAPSHOT_ID: String = prefixOf("snapshot_id")
+ val STORAGE_PATH: String = prefixOf("path")
+ val POLICY: String = prefixOf("policy")
+ val ORDER_BY_KEY: String = prefixOf("orderByKey")
+ val LOW_CARD_KEY: String = prefixOf("lowCardKey")
+ val MINMAX_INDEX_KEY: String = prefixOf("minmaxIndexKey")
+ val BF_INDEX_KEY: String = prefixOf("bfIndexKey")
+ val SET_INDEX_KEY: String = prefixOf("setIndexKey")
+ val PRIMARY_KEY: String = prefixOf("primaryKey")
val SERIALIZER_HEADER: String = "MergeTree;"
- def withMoreStorageInfo(
+ private def prefixOf(key: String): String = s"$STORAGE_PREFIX$key"
+
+ def withStorageID(
metadata: Metadata,
- snapshotId: String,
- deltaPath: Path,
database: String,
- tableName: String): Metadata = {
- val moreOptions = Seq(
- STORAGE_DB -> database,
- STORAGE_SNAPSHOT_ID -> snapshotId,
- STORAGE_TABLE -> tableName,
- STORAGE_PATH -> deltaPath.toString)
+ tableName: String,
+ snapshotId: String): Metadata = {
+ val moreOptions = Seq(DB -> database, SNAPSHOT_ID -> snapshotId, TABLE -> tableName)
withMoreOptions(metadata, moreOptions)
}
private def withMoreOptions(metadata: Metadata, newOptions: Seq[(String, String)]): Metadata = {
metadata.copy(configuration = metadata.configuration ++ newOptions)
}
+
+ def normalizeRelativePath(relativePath: String): String = {
+ val table_uri = URI.create(relativePath)
+ if (table_uri.getPath.startsWith("/")) {
+ table_uri.getPath.substring(1)
+ } else table_uri.getPath
+ }
+
+ // TODO: remove this method
+ def genOrderByAndPrimaryKeyStr(
+ orderByKeyOption: Option[Seq[String]],
+ primaryKeyOption: Option[Seq[String]]): (String, String) = {
+
+ val orderByKey = columnsToStr(orderByKeyOption, DEFAULT_ORDER_BY_KEY)
+ val primaryKey = if (orderByKey == DEFAULT_ORDER_BY_KEY) "" else columnsToStr(primaryKeyOption)
+ (orderByKey, primaryKey)
+ }
+
+ def columnsToStr(option: Option[Seq[String]], default: String = ""): String =
+ option
+ .filter(_.nonEmpty)
+ .map(keys => keys.map(ConverterUtils.normalizeColName).mkString(","))
+ .getOrElse(default)
+}
+
+/** all properties start with 'storage_' */
+trait StorageConfigProvider {
+ val storageConf: Map[String, String]
}
trait TablePropertiesReader {
def configuration: Map[String, String]
- /** delta */
- def metadata: Metadata
+ val partitionColumns: Seq[String]
private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = {
configuration.get(keyName).map {
v =>
- val keys = v.split(",").map(n => normalizeColName(n.trim)).toSeq
+ val keys = v.split(",").map(n => ConverterUtils.normalizeColName(n.trim)).toSeq
keys.foreach {
s =>
if (s.contains(".")) {
@@ -107,13 +139,10 @@ trait TablePropertiesReader {
getCommaSeparatedColumns("setIndexKey")
}
- lazy val partitionColumns: Seq[String] =
- metadata.partitionColumns.map(normalizeColName)
-
lazy val orderByKeyOption: Option[Seq[String]] = {
val orderByKeys =
if (bucketOption.exists(_.sortColumnNames.nonEmpty)) {
- bucketOption.map(_.sortColumnNames.map(normalizeColName))
+ bucketOption.map(_.sortColumnNames.map(ConverterUtils.normalizeColName))
} else {
getCommaSeparatedColumns("orderByKey")
}
@@ -142,22 +171,4 @@ trait TablePropertiesReader {
primaryKeys
}
}
-
- lazy val writeConfiguration: Map[String, String] = {
- val (orderByKey0, primaryKey0) = MergeTreeDeltaUtil.genOrderByAndPrimaryKeyStr(
- orderByKeyOption,
- primaryKeyOption
- )
- Map(
- "storage_policy" -> configuration.getOrElse("storage_policy", "default"),
- "storage_orderByKey" -> orderByKey0,
- "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
- "storage_minmaxIndexKey" -> minmaxIndexKeyOption
- .map(MergeTreeDeltaUtil.columnsToStr)
- .getOrElse(""),
- "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
- "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
- "storage_primaryKey" -> primaryKey0
- )
- }
}
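StorageMeta now owns the key-string helpers that used to live in MergeTreeDeltaUtil. A small sketch of their behaviour for the empty cases (the column name is a placeholder):

    import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta

    // An unset orderByKey falls back to DEFAULT_ORDER_BY_KEY ("tuple()"), and in that case
    // the primary key is forced to "" regardless of what was configured.
    val (orderBy, primary) =
      StorageMeta.genOrderByAndPrimaryKeyStr(None, Some(Seq("l_orderkey")))
    assert(orderBy == StorageMeta.DEFAULT_ORDER_BY_KEY)
    assert(primary == "")

    // columnsToStr joins normalized column names with commas and honours an optional default.
    assert(StorageMeta.columnsToStr(None) == "")
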
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
index 69c001e461d8..03ee31daba08 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHFormatWriterInjects.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v1
import org.apache.gluten.execution.datasource.GlutenRowSplitter
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.memory.CHThreadGroup
+import org.apache.gluten.utils.SubstraitUtil
import org.apache.gluten.vectorized.CHColumnVector
import org.apache.spark.sql.SparkSession
@@ -27,32 +28,57 @@ import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.orc.OrcUtils
import org.apache.spark.sql.types.StructType
-import io.substrait.proto.{NamedStruct, Type}
+import com.google.protobuf.Any
+import io.substrait.proto
+import io.substrait.proto.{AdvancedExtension, NamedObjectWrite, NamedStruct}
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.TaskAttemptContext
+import java.{util => ju}
+
+import scala.collection.JavaConverters.seqAsJavaListConverter
+
trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
+ // TODO: move to SubstraitUtil
+ private def toNameStruct(dataSchema: StructType): NamedStruct = {
+ SubstraitUtil
+ .createNameStructBuilder(
+ ConverterUtils.collectAttributeTypeNodes(dataSchema),
+ dataSchema.fieldNames.map(ConverterUtils.normalizeColName).toSeq.asJava,
+ ju.Collections.emptyList()
+ )
+ .build()
+ }
+ def createWriteRel(
+ outputPath: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): proto.WriteRel = {
+ proto.WriteRel
+ .newBuilder()
+ .setTableSchema(toNameStruct(dataSchema))
+ .setNamedTable(
+ NamedObjectWrite.newBuilder
+ .setAdvancedExtension(
+ AdvancedExtension
+ .newBuilder()
+ .setOptimization(Any.pack(createNativeWrite(outputPath, context)))
+ .build())
+ .build())
+ .build()
+ }
+
+ def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write
+
override def createOutputWriter(
- path: String,
+ outputPath: String,
dataSchema: StructType,
context: TaskAttemptContext,
- nativeConf: java.util.Map[String, String]): OutputWriter = {
- val originPath = path
- val datasourceJniWrapper = new CHDatasourceJniWrapper()
+ nativeConf: ju.Map[String, String]): OutputWriter = {
CHThreadGroup.registerNewThreadGroup()
- val namedStructBuilder = NamedStruct.newBuilder
- val structBuilder = Type.Struct.newBuilder
- for (field <- dataSchema.fields) {
- namedStructBuilder.addNames(field.name)
- structBuilder.addTypes(ConverterUtils.getTypeNode(field.dataType, field.nullable).toProtobuf)
- }
- namedStructBuilder.setStruct(structBuilder.build)
- val namedStruct = namedStructBuilder.build
-
- val instance =
- datasourceJniWrapper.nativeInitFileWriterWrapper(path, namedStruct.toByteArray, formatName)
+ val datasourceJniWrapper =
+ new CHDatasourceJniWrapper(outputPath, createWriteRel(outputPath, dataSchema, context))
new OutputWriter {
override def write(row: InternalRow): Unit = {
@@ -61,17 +87,17 @@ trait CHFormatWriterInjects extends GlutenFormatWriterInjectsBase {
if (nextBatch.numRows > 0) {
val col = nextBatch.column(0).asInstanceOf[CHColumnVector]
- datasourceJniWrapper.write(instance, col.getBlockAddress)
+ datasourceJniWrapper.write(col.getBlockAddress)
} // else just ignore this empty block
}
override def close(): Unit = {
- datasourceJniWrapper.close(instance)
+ datasourceJniWrapper.close()
}
// Do NOT add override keyword for compatibility on spark 3.1.
def path(): String = {
- originPath
+ outputPath
}
}
}
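createWriteRel packs the format-specific Write message into the WriteRel's advanced-extension optimization slot via com.google.protobuf.Any. A sketch of recovering it again, for instance in a test; this assumes the singular getOptimization accessor of the Substrait version used here:

    import com.google.protobuf.Any
    import io.substrait.proto.WriteRel
    import org.apache.spark.sql.execution.datasources.v1.Write

    def extractWrite(writeRel: WriteRel): Write = {
      // The Write payload travels inside named_table.advanced_extension.optimization.
      val packed: Any = writeRel.getNamedTable.getAdvancedExtension.getOptimization
      packed.unpack(classOf[Write])
    }
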
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index 521b59d60e29..d25758b0f703 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -26,59 +26,86 @@ import org.apache.gluten.utils.ConfigUtil
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter}
-import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer}
+import org.apache.spark.sql.execution.datasources.mergetree.{MetaSerializer, PartSerializer, StorageConfigProvider, StorageMeta}
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
import org.apache.spark.sql.types.StructType
import com.google.common.collect.Lists
import com.google.protobuf.{Any, StringValue}
import io.substrait.proto.NamedStruct
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptContext
-import java.util.{Map => JMap, UUID}
+import java.{util => ju}
import scala.collection.JavaConverters._
case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])
+case class HadoopConfReader(conf: Configuration) extends StorageConfigProvider {
+ lazy val storageConf: Map[String, String] = {
+ conf
+ .iterator()
+ .asScala
+ .filter(_.getKey.startsWith(StorageMeta.STORAGE_PREFIX))
+ .map(entry => entry.getKey -> entry.getValue)
+ .toMap
+ }
+}
+
class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
override def nativeConf(
options: Map[String, String],
- compressionCodec: String): JMap[String, String] = {
+ compressionCodec: String): ju.Map[String, String] = {
options.asJava
}
+ override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = {
+ val conf = HadoopConfReader(context.getConfiguration).storageConf
+ Write
+ .newBuilder()
+ .setCommon(Write.Common.newBuilder().setFormat(formatName).build())
+ .setMergetree(
+ Write.MergeTreeWrite
+ .newBuilder()
+ .setDatabase(conf(StorageMeta.DB))
+ .setTable(conf(StorageMeta.TABLE))
+ .setSnapshotId(conf(StorageMeta.SNAPSHOT_ID))
+ .setOrderByKey(conf(StorageMeta.ORDER_BY_KEY))
+ .setLowCardKey(conf(StorageMeta.LOW_CARD_KEY))
+ .setMinmaxIndexKey(conf(StorageMeta.MINMAX_INDEX_KEY))
+ .setBfIndexKey(conf(StorageMeta.BF_INDEX_KEY))
+ .setSetIndexKey(conf(StorageMeta.SET_INDEX_KEY))
+ .setPrimaryKey(conf(StorageMeta.PRIMARY_KEY))
+ .setRelativePath(StorageMeta.normalizeRelativePath(outputPath))
+ .setAbsolutePath("")
+ .setStoragePolicy(conf(StorageMeta.POLICY))
+ .build())
+ .build()
+ }
+
override def createOutputWriter(
- path: String,
+ outputPath: String,
dataSchema: StructType,
context: TaskAttemptContext,
- nativeConf: JMap[String, String]): OutputWriter = null
+ nativeConf: ju.Map[String, String]): OutputWriter = {
+
+ val storage = HadoopConfReader(context.getConfiguration)
+ val database = storage.storageConf(StorageMeta.DB)
+ val tableName = storage.storageConf(StorageMeta.TABLE)
+
+ val datasourceJniWrapper = new CHDatasourceJniWrapper(
+ context.getTaskAttemptID.getTaskID.getId.toString,
+ context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
+ context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
+ createWriteRel(outputPath, dataSchema, context),
+ ConfigUtil.serialize(nativeConf)
+ )
+ new MergeTreeOutputWriter(datasourceJniWrapper, database, tableName, outputPath)
+ }
override val formatName: String = "mergetree"
-
- def createOutputWriter(
- path: String,
- dataSchema: StructType,
- context: TaskAttemptContext,
- nativeConf: JMap[String, String],
- database: String,
- tableName: String,
- splitInfo: Array[Byte]): OutputWriter = {
- val datasourceJniWrapper = new CHDatasourceJniWrapper()
- val instance =
- datasourceJniWrapper.nativeInitMergeTreeWriterWrapper(
- null,
- splitInfo,
- UUID.randomUUID.toString,
- context.getTaskAttemptID.getTaskID.getId.toString,
- context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
- context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
- ConfigUtil.serialize(nativeConf)
- )
-
- new MergeTreeOutputWriter(database, tableName, datasourceJniWrapper, instance, path)
- }
}
object CHMergeTreeWriterInjects {
@@ -114,7 +141,7 @@ object CHMergeTreeWriterInjects {
val substraitContext = new SubstraitContext
- val extensionTable = ClickhouseMetaSerializer.apply1(
+ val extensionTable = MetaSerializer.apply1(
database,
tableName,
snapshotId,
@@ -126,7 +153,7 @@ object CHMergeTreeWriterInjects {
bfIndexKeyOption,
setIndexKeyOption,
primaryKeyOption,
- ClickhousePartSerializer.fromPartNames(partList),
+ PartSerializer.fromPartNames(partList),
tableSchemaJson,
clickhouseTableConfigs.asJava
)
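
For context: after this change the MergeTree write path no longer threads database, table and split info through method arguments. CHMergeTreeWriterInjects publishes everything under the StorageMeta keys into the Hadoop Configuration, and HadoopConfReader filters it back out inside the task. A minimal Scala sketch of that round trip, assuming a placeholder prefix instead of the real StorageMeta.STORAGE_PREFIX constant:

    import org.apache.hadoop.conf.Configuration
    import scala.collection.JavaConverters._

    object StorageConfRoundTrip {
      // Placeholder for StorageMeta.STORAGE_PREFIX; the real constant lives in the mergetree package.
      val Prefix = "spark.gluten.sql.storage."

      // Driver side: publish the table identity into the per-job Hadoop configuration.
      def publish(conf: Configuration, db: String, table: String, snapshotId: String): Unit = {
        conf.set(Prefix + "database", db)
        conf.set(Prefix + "table", table)
        conf.set(Prefix + "snapshot_id", snapshotId)
      }

      // Executor side: the same filter-by-prefix scan HadoopConfReader performs above.
      def readBack(conf: Configuration): Map[String, String] =
        conf.iterator().asScala
          .filter(_.getKey.startsWith(Prefix))
          .map(e => e.getKey -> e.getValue)
          .toMap
    }
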
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala
index 5beee25510b9..7c791bcf87d2 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHOrcWriterInjects.scala
@@ -16,17 +16,25 @@
*/
package org.apache.spark.sql.execution.datasources.v1
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.collection.mutable
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import java.{util => ju}
class CHOrcWriterInjects extends CHFormatWriterInjects {
+
override def nativeConf(
options: Map[String, String],
- compressionCodec: String): java.util.Map[String, String] = {
- val sparkOptions = new mutable.HashMap[String, String]()
+ compressionCodec: String): ju.Map[String, String] = {
+
// TODO: implement it
- sparkOptions.asJava
+ ju.Collections.emptyMap()
}
+ override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = Write
+ .newBuilder()
+ .setCommon(Write.Common.newBuilder().setFormat(formatName).build())
+ .setOrc(Write.OrcWrite.newBuilder().build())
+ .build()
+
override val formatName: String = "orc"
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
index 80214c67bba5..8c700c413346 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHParquetWriterInjects.scala
@@ -20,16 +20,17 @@ import org.apache.gluten.GlutenConfig
import org.apache.spark.sql.internal.SQLConf
-import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.collection.mutable
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import java.{util => ju}
class CHParquetWriterInjects extends CHFormatWriterInjects {
override def nativeConf(
options: Map[String, String],
- compressionCodec: String): java.util.Map[String, String] = {
+ compressionCodec: String): ju.Map[String, String] = {
// pass options to native so that velox can take user-specified conf to write parquet,
// i.e., compression, block size, block rows.
- val sparkOptions = new mutable.HashMap[String, String]()
+ val sparkOptions = new ju.HashMap[String, String]()
sparkOptions.put(SQLConf.PARQUET_COMPRESSION.key, compressionCodec)
val blockSize = options.getOrElse(
GlutenConfig.PARQUET_BLOCK_SIZE,
@@ -39,9 +40,14 @@ class CHParquetWriterInjects extends CHFormatWriterInjects {
GlutenConfig.PARQUET_BLOCK_ROWS,
GlutenConfig.getConf.columnarParquetWriteBlockRows.toString)
sparkOptions.put(GlutenConfig.PARQUET_BLOCK_ROWS, blockRows)
- sparkOptions.asJava
+ sparkOptions
}
+ override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = Write
+ .newBuilder()
+ .setCommon(Write.Common.newBuilder().setFormat(formatName).build())
+ .setParquet(Write.ParquetWrite.newBuilder().build())
+ .build()
override val formatName: String = "parquet"
}
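
The parquet and orc paths now build their Write message with only the common format name, while the options map is passed to the native writer separately. A small sketch of the options map nativeConf assembles here; the literal keys are stand-ins for SQLConf.PARQUET_COMPRESSION.key and the GlutenConfig block-size keys, and the values are examples only:

    import java.{util => ju}

    def parquetNativeConf(compressionCodec: String): ju.Map[String, String] = {
      val opts = new ju.HashMap[String, String]()
      // Stand-in for SQLConf.PARQUET_COMPRESSION.key.
      opts.put("spark.sql.parquet.compression.codec", compressionCodec)
      // Example block settings; the real defaults come from GlutenConfig.
      opts.put("parquet.block.size", (128 * 1024 * 1024).toString)
      opts.put("parquet.block.rows", "100000")
      opts
    }
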
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala
index 7b803e250278..29f2b7e16ec8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatDataWriter.scala
@@ -59,7 +59,7 @@ abstract class MergeTreeFileFormatDataWriter(
protected val updatedPartitions: mutable.Set[String] = mutable.Set[String]()
protected var currentWriter: OutputWriter = _
- protected val returnedMetrics = mutable.HashMap[String, AddFile]()
+ protected val returnedMetrics: mutable.Map[String, AddFile] = mutable.HashMap[String, AddFile]()
/** Trackers for computing various statistics on the data as it's being written out. */
protected val statsTrackers: Seq[WriteTaskStatsTracker] =
@@ -71,10 +71,10 @@ abstract class MergeTreeFileFormatDataWriter(
try {
currentWriter.close()
statsTrackers.foreach(_.closeFile(currentWriter.path()))
- val ret = currentWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles
- if (ret.nonEmpty) {
- ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile))
- }
+ currentWriter
+ .asInstanceOf[MergeTreeOutputWriter]
+ .getAddFiles
+ .foreach(addFile => returnedMetrics.put(addFile.path, addFile))
} finally {
currentWriter = null
}
@@ -117,12 +117,7 @@ abstract class MergeTreeFileFormatDataWriter(
releaseResources()
val (taskCommitMessage, taskCommitTime) = Utils.timeTakenMs {
// committer.commitTask(taskAttemptContext)
- val statuses = returnedMetrics
- .map(
- v => {
- v._2
- })
- .toSeq
+ val statuses = returnedMetrics.map(_._2).toSeq
new TaskCommitMessage(statuses)
}
@@ -142,7 +137,7 @@ abstract class MergeTreeFileFormatDataWriter(
override def close(): Unit = {}
- def getReturnedMetrics(): mutable.Map[String, AddFile] = returnedMetrics
+ def getReturnedMetrics: mutable.Map[String, AddFile] = returnedMetrics
}
/** FileFormatWriteTask for empty partitions */
@@ -443,7 +438,11 @@ class MergeTreeDynamicPartitionDataSingleWriter(
case fakeRow: FakeRow =>
if (fakeRow.batch.numRows() > 0) {
val blockStripes = GlutenRowSplitter.getInstance
- .splitBlockByPartitionAndBucket(fakeRow, partitionColIndice, isBucketed, true)
+ .splitBlockByPartitionAndBucket(
+ fakeRow,
+ partitionColIndice,
+ isBucketed,
+ reserve_partition_columns = true)
val iter = blockStripes.iterator()
while (iter.hasNext) {
@@ -526,10 +525,10 @@ class MergeTreeDynamicPartitionDataConcurrentWriter(
if (status.outputWriter != null) {
try {
status.outputWriter.close()
- val ret = status.outputWriter.asInstanceOf[MergeTreeOutputWriter].getAddFiles
- if (ret.nonEmpty) {
- ret.foreach(addFile => returnedMetrics.put(addFile.path, addFile))
- }
+ status.outputWriter
+ .asInstanceOf[MergeTreeOutputWriter]
+ .getAddFiles
+ .foreach(addFile => returnedMetrics.put(addFile.path, addFile))
} finally {
status.outputWriter = null
}
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
index 52593d7c1795..14ac659deabd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeOutputWriter.scala
@@ -27,11 +27,10 @@ import org.apache.spark.util.Utils
import scala.collection.mutable.ArrayBuffer
class MergeTreeOutputWriter(
+ datasourceJniWrapper: CHDatasourceJniWrapper,
database: String,
tableName: String,
- datasourceJniWrapper: CHDatasourceJniWrapper,
- instance: Long,
- originPath: String)
+ outputPath: String)
extends OutputWriter {
protected var addFiles: ArrayBuffer[AddFile] = new ArrayBuffer[AddFile]()
@@ -42,18 +41,18 @@ class MergeTreeOutputWriter(
if (nextBatch.numRows > 0) {
val col = nextBatch.column(0).asInstanceOf[CHColumnVector]
- datasourceJniWrapper.writeToMergeTree(instance, col.getBlockAddress)
+ datasourceJniWrapper.write(col.getBlockAddress)
} // else ignore this empty block
}
override def close(): Unit = {
- val returnedMetrics = datasourceJniWrapper.closeMergeTreeWriter(instance)
+ val returnedMetrics = datasourceJniWrapper.close()
if (returnedMetrics != null && returnedMetrics.nonEmpty) {
addFiles.appendAll(
AddFileTags.partsMetricsToAddFile(
database,
tableName,
- originPath,
+ outputPath,
returnedMetrics,
Seq(Utils.localHostName())))
}
@@ -61,7 +60,7 @@ class MergeTreeOutputWriter(
// Do NOT add override keyword for compatibility on spark 3.1.
def path(): String = {
- originPath
+ outputPath
}
def getAddFiles: ArrayBuffer[AddFile] = {
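
The writer lifecycle is also simpler after this refactor: CHDatasourceJniWrapper is constructed once per task with the task id, partition/bucket dirs, the serialized WriteRel and the serialized conf, blocks are pushed by address, and close() returns the part-metrics JSON that closeMergeTreeWriter() used to produce. A hedged sketch with a stand-in trait instead of the real JNI wrapper:

    // Stand-in for CHDatasourceJniWrapper; only the two calls used by MergeTreeOutputWriter are modeled.
    trait NativeWriterHandle {
      def write(blockAddress: Long): Unit // mirrors datasourceJniWrapper.write(col.getBlockAddress)
      def close(): String                 // returns the part-metrics JSON ("" when there is nothing to report)
    }

    def writeTask(writer: NativeWriterHandle, blockAddresses: Iterator[Long]): String = {
      blockAddresses.foreach(addr => writer.write(addr))
      writer.close()
    }
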
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala
index 53ccb8d1c046..38f4fe7e26ab 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v2/clickhouse/ClickHouseConfig.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2.clickhouse
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import java.util
@@ -64,8 +65,8 @@ object ClickHouseConfig {
if (!configurations.contains("sampling_key")) {
configurations += ("sampling_key" -> "")
}
- if (!configurations.contains("storage_policy")) {
- configurations += ("storage_policy" -> "default")
+ if (!configurations.contains(StorageMeta.POLICY)) {
+ configurations += (StorageMeta.POLICY -> "default")
}
if (!configurations.contains("is_distribute")) {
configurations += ("is_distribute" -> "true")
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index b4eca622c89f..186078c18dd1 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -55,6 +55,7 @@ class GlutenClickHouseMergeTreeWriteSuite
.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
.set("spark.sql.adaptive.enabled", "true")
.set("spark.sql.files.maxPartitionBytes", "20000000")
+ .set("spark.gluten.sql.native.writer.enabled", "true")
.setCHSettings("min_insert_block_size_rows", 100000)
.setCHSettings("mergetree.merge_after_insert", false)
.setCHSettings("input_format_parquet_max_block_size", 8192)
diff --git a/cpp-ch/local-engine/CMakeLists.txt b/cpp-ch/local-engine/CMakeLists.txt
index ca25692bf013..d145ed339ff5 100644
--- a/cpp-ch/local-engine/CMakeLists.txt
+++ b/cpp-ch/local-engine/CMakeLists.txt
@@ -128,9 +128,9 @@ foreach(child ${children})
add_headers_and_sources(function_parsers ${child})
endforeach()
-# Notice: soures files under Parser/*_udf subdirectories must be built into
+# Notice: source files under Parser/*_udf subdirectories must be built into
# target ${LOCALENGINE_SHARED_LIB} directly to make sure all function parsers
-# are registered successly.
+# are registered successfully.
add_library(
${LOCALENGINE_SHARED_LIB} SHARED
local_engine_jni.cpp ${local_udfs_sources} ${function_parsers_sources}
diff --git a/cpp-ch/local-engine/Common/CHUtil.cpp b/cpp-ch/local-engine/Common/CHUtil.cpp
index c1fc49b0b96b..5e57be5ae91e 100644
--- a/cpp-ch/local-engine/Common/CHUtil.cpp
+++ b/cpp-ch/local-engine/Common/CHUtil.cpp
@@ -49,7 +49,7 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
@@ -62,7 +62,6 @@
#include
#include
#include
-#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
index 16a5d72a9e85..6d44213ec73b 100644
--- a/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/RelParsers/WriteRelParser.cpp
@@ -23,7 +23,7 @@
#include
#include
#include
-#include
+#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index bb0ea00b97e6..e8f1196a6634 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -65,12 +65,12 @@
#include
#include
#include
-#include
#include
#include
#include
#include
#include
+#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
index e72454ba6fb2..164cf0392dfb 100644
--- a/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
+++ b/cpp-ch/local-engine/Parser/SubstraitParserUtils.cpp
@@ -26,14 +26,12 @@ namespace local_engine
{
void logDebugMessage(const google::protobuf::Message & message, const char * type)
{
- auto * logger = &Poco::Logger::get("SubstraitPlan");
- if (logger->debug())
+ if (auto * logger = &Poco::Logger::get("SubstraitPlan"); logger->debug())
{
namespace pb_util = google::protobuf::util;
pb_util::JsonOptions options;
std::string json;
- auto s = pb_util::MessageToJsonString(message, &json, options);
- if (!s.ok())
+ if (auto s = pb_util::MessageToJsonString(message, &json, options); !s.ok())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can not convert {} to Json", type);
LOG_DEBUG(logger, "{}:\n{}", type, json);
}
diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
index 6cd74c8af343..cee45aa34c4c 100644
--- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
+++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp
@@ -21,16 +21,10 @@
#include
#include
#include
-#include
-#include
#include
#include
-#include
#include
-#include
#include
-#include
-#include
namespace local_engine
{
@@ -86,7 +80,7 @@ void NativeSplitter::split(DB::Block & block)
{
if (partition_buffer[i]->size() >= options.buffer_size)
{
- output_buffer.emplace(std::pair(i, std::make_unique<Block>(partition_buffer[i]->releaseColumns())));
+ output_buffer.emplace(std::pair(i, std::make_unique<DB::Block>(partition_buffer[i]->releaseColumns())));
}
}
}
@@ -116,7 +110,7 @@ bool NativeSplitter::hasNext()
{
if (inputHasNext())
{
- split(*reinterpret_cast<Block *>(inputNext()));
+ split(*reinterpret_cast<DB::Block *>(inputNext()));
}
else
{
@@ -125,7 +119,7 @@ bool NativeSplitter::hasNext()
auto buffer = partition_buffer.at(i);
if (buffer->size() > 0)
{
- output_buffer.emplace(std::pair(i, new Block(buffer->releaseColumns())));
+ output_buffer.emplace(std::pair(i, new DB::Block(buffer->releaseColumns())));
}
}
break;
@@ -214,7 +208,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject
selector_builder = std::make_unique(options.partition_num, hash_fields, options_.hash_algorithm);
}
-void HashNativeSplitter::computePartitionId(Block & block)
+void HashNativeSplitter::computePartitionId(DB::Block & block)
{
partition_info = selector_builder->build(block);
}
@@ -229,7 +223,7 @@ RoundRobinNativeSplitter::RoundRobinNativeSplitter(NativeSplitter::Options optio
selector_builder = std::make_unique(options_.partition_num);
}
-void RoundRobinNativeSplitter::computePartitionId(Block & block)
+void RoundRobinNativeSplitter::computePartitionId(DB::Block & block)
{
partition_info = selector_builder->build(block);
}
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
index 2921fc887f32..b7c60552468d 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.cpp
@@ -29,6 +29,8 @@
#include
#include
+#include
+
using namespace DB;
using namespace local_engine;
namespace
@@ -139,11 +141,8 @@ void doParseMergeTreeTableString(MergeTreeTable & table, ReadBufferFromString &
assertChar('\n', in);
readString(table.order_by_key, in);
assertChar('\n', in);
- if (table.order_by_key != MergeTreeTable::TUPLE)
- {
- readString(table.primary_key, in);
- assertChar('\n', in);
- }
+ readString(table.primary_key, in);
+ assertChar('\n', in);
readString(table.low_card_key, in);
assertChar('\n', in);
readString(table.minmax_index_key, in);
@@ -204,7 +203,7 @@ SparkStorageMergeTreePtr MergeTreeTable::copyToVirtualStorage(const ContextMutab
return merge_tree_table.getStorage(context);
}
-MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info)
+MergeTreeTableInstance::MergeTreeTableInstance(const std::string & info) : MergeTreeTable()
{
ReadBufferFromString in(info);
doParseMergeTreeTableString(*this, in);
@@ -244,6 +243,29 @@ std::shared_ptr MergeTreeTable::buildMetaData(const
return doBuildMetadata(header.getNamesAndTypesList(), context, *this);
}
+MergeTreeTable::MergeTreeTable(const substrait::WriteRel & write_rel)
+{
+ assert(write_rel.has_named_table());
+ const substrait::NamedObjectWrite & named_table = write_rel.named_table();
+ local_engine::Write write_opt;
+ named_table.advanced_extension().optimization().UnpackTo(&write_opt);
+ assert(write_opt.has_mergetree());
+ const Write_MergeTreeWrite & write = write_opt.mergetree();
+ database = write.database();
+ table = write.table();
+ snapshot_id = write.snapshot_id();
+ schema = write_rel.table_schema();
+ order_by_key = write.order_by_key();
+ low_card_key = write.low_card_key();
+ minmax_index_key = write.minmax_index_key();
+ bf_index_key = write.bf_index_key();
+ set_index_key = write.set_index_key();
+ primary_key = write.primary_key();
+ relative_path = write.relative_path();
+ absolute_path = write.absolute_path(); // always empty, see createNativeWrite in java
+ table_configs.storage_policy = write.storage_policy();
+}
+
std::unique_ptr buildMergeTreeSettings(const MergeTreeTableSettings & config)
{
auto settings = std::make_unique();
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h
index 2a20430205a6..d015e78b3471 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeMeta.h
@@ -77,6 +77,9 @@ struct MergeTreeTable
SparkStorageMergeTreePtr copyToVirtualStorage(const ContextMutablePtr & context) const;
std::shared_ptr buildMetaData(const DB::Block & header, const ContextPtr & context) const;
+
+ MergeTreeTable() = default;
+ explicit MergeTreeTable(const substrait::WriteRel & write_rel);
};
struct MergeTreeTableInstance : MergeTreeTable
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
index dc32565f8420..44a95cb2a4a6 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.cpp
@@ -47,11 +47,44 @@ Block removeColumnSuffix(const Block & block)
}
return Block(columns);
}
+
}
namespace local_engine
{
+std::string PartInfo::toJson(const std::vector<PartInfo> & part_infos)
+{
+ rapidjson::StringBuffer result;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(result);
+ writer.StartArray();
+ for (const auto & item : part_infos)
+ {
+ writer.StartObject();
+ writer.Key("part_name");
+ writer.String(item.part_name.c_str());
+ writer.Key("mark_count");
+ writer.Uint(item.mark_count);
+ writer.Key("disk_size");
+ writer.Uint(item.disk_size);
+ writer.Key("row_count");
+ writer.Uint(item.row_count);
+ writer.Key("bucket_id");
+ writer.String(item.bucket_id.c_str());
+ writer.Key("partition_values");
+ writer.StartObject();
+ for (const auto & key_value : item.partition_values)
+ {
+ writer.Key(key_value.first.c_str());
+ writer.String(key_value.second.c_str());
+ }
+ writer.EndObject();
+ writer.EndObject();
+ }
+ writer.EndArray();
+ return result.GetString();
+}
+
std::unique_ptr SparkMergeTreeWriter::create(
const MergeTreeTable & merge_tree_table,
const SparkMergeTreeWritePartitionSettings & write_settings_,
@@ -86,7 +119,7 @@ SparkMergeTreeWriter::SparkMergeTreeWriter(
{
}
-void SparkMergeTreeWriter::write(const DB::Block & block)
+void SparkMergeTreeWriter::write(DB::Block & block)
{
auto new_block = removeColumnSuffix(block);
auto converter = ActionsDAG::makeConvertingActions(
@@ -96,9 +129,10 @@ void SparkMergeTreeWriter::write(const DB::Block & block)
executor.push(new_block);
}
-void SparkMergeTreeWriter::finalize()
+std::string SparkMergeTreeWriter::close()
{
executor.finish();
+ return PartInfo::toJson(getAllPartInfo());
}
std::vector SparkMergeTreeWriter::getAllPartInfo() const
@@ -120,36 +154,4 @@ std::vector SparkMergeTreeWriter::getAllPartInfo() const
return res;
}
-String SparkMergeTreeWriter::partInfosToJson(const std::vector & part_infos)
-{
- rapidjson::StringBuffer result;
- rapidjson::Writer writer(result);
- writer.StartArray();
- for (const auto & item : part_infos)
- {
- writer.StartObject();
- writer.Key("part_name");
- writer.String(item.part_name.c_str());
- writer.Key("mark_count");
- writer.Uint(item.mark_count);
- writer.Key("disk_size");
- writer.Uint(item.disk_size);
- writer.Key("row_count");
- writer.Uint(item.row_count);
- writer.Key("bucket_id");
- writer.String(item.bucket_id.c_str());
- writer.Key("partition_values");
- writer.StartObject();
- for (const auto & key_value : item.partition_values)
- {
- writer.Key(key_value.first.c_str());
- writer.String(key_value.second.c_str());
- }
- writer.EndObject();
- writer.EndObject();
- }
- writer.EndArray();
- return result.GetString();
-}
-
}
\ No newline at end of file
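
PartInfo::toJson keeps the JSON layout the JVM side already turns into AddFile entries; it is simply returned from close() now instead of a separate closeMergeTreeWriter() call. A hedged example of the shape, with made-up values and field names taken from the writer.Key(...) calls above:

    // Illustrative only; real part names, sizes and partition values come from the native writer.
    val samplePartMetrics: String =
      """[{"part_name":"this_is_prefix_0_1_0","mark_count":2,"disk_size":1048576,"row_count":8192,"bucket_id":"","partition_values":{"l_shipdate":"1996-01-01"}}]"""
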
diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
index 699fd3d80b5b..59535cef094d 100644
--- a/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
+++ b/cpp-ch/local-engine/Storages/MergeTree/SparkMergeTreeWriter.h
@@ -21,6 +21,7 @@
#include
#include
#include
+#include
namespace DB
{
@@ -44,12 +45,13 @@ struct PartInfo
String bucket_id;
bool operator<(const PartInfo & rhs) const { return disk_size < rhs.disk_size; }
+
+ static std::string toJson(const std::vector<PartInfo> & part_infos);
};
-class SparkMergeTreeWriter
+class SparkMergeTreeWriter : public NativeOutputWriter
{
public:
- static String partInfosToJson(const std::vector & part_infos);
static std::unique_ptr create(
const MergeTreeTable & merge_tree_table,
const SparkMergeTreeWritePartitionSettings & write_settings_,
@@ -61,9 +63,8 @@ class SparkMergeTreeWriter
DB::QueryPipeline && pipeline_,
std::unordered_map && partition_values_);
- void write(const DB::Block & block);
- void finalize();
- std::vector getAllPartInfo() const;
+ void write(DB::Block & block) override;
+ std::string close() override;
private:
DB::Block header;
@@ -71,5 +72,7 @@ class SparkMergeTreeWriter
DB::QueryPipeline pipeline;
DB::PushingPipelineExecutor executor;
std::unordered_map partition_values;
+
+ std::vector getAllPartInfo() const;
};
}
diff --git a/cpp-ch/local-engine/Storages/NativeOutputWriter.h b/cpp-ch/local-engine/Storages/NativeOutputWriter.h
new file mode 100644
index 000000000000..bfcbe0b092a5
--- /dev/null
+++ b/cpp-ch/local-engine/Storages/NativeOutputWriter.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+#include
+
+namespace DB
+{
+class Block;
+}
+namespace local_engine
+{
+class NativeOutputWriter
+{
+public:
+ NativeOutputWriter() = default;
+ virtual ~NativeOutputWriter() = default;
+
+ //TODO: change to write(const DB::Block & block)
+ virtual void write(DB::Block & block) = 0;
+ virtual std::string close() = 0;
+};
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
similarity index 93%
rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
index 975e5046bac2..d8b7f3240386 100644
--- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.cpp
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.cpp
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "FileWriterWrappers.h"
+#include "NormalFileWriter.h"
#include
#include
@@ -25,12 +25,11 @@ namespace local_engine
const std::string SubstraitFileSink::NO_PARTITION_ID{"__NO_PARTITION_ID__"};
const std::string SubstraitPartitionedFileSink::DEFAULT_PARTITION_NAME{"__HIVE_DEFAULT_PARTITION__"};
-NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_)
- : FileWriterWrapper(file_), context(context_)
+NormalFileWriter::NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_) : file(file_), context(context_)
{
}
-void NormalFileWriter::consume(DB::Block & block)
+void NormalFileWriter::write(DB::Block & block)
{
if (!writer) [[unlikely]]
{
@@ -63,12 +62,14 @@ void NormalFileWriter::consume(DB::Block & block)
writer->push(materializeBlock(block));
}
-void NormalFileWriter::close()
+std::string NormalFileWriter::close()
{
/// When insert into a table with empty dataset, NormalFileWriter::consume would be never called.
/// So we need to skip when writer is nullptr.
if (writer)
writer->finish();
+
+ return std::string{};
}
OutputFormatFilePtr createOutputFormatFile(
@@ -84,7 +85,7 @@ OutputFormatFilePtr createOutputFormatFile(
return OutputFormatFileUtil::createFile(context, write_buffer_builder, encoded, preferred_schema, format_hint);
}
-std::unique_ptr createFileWriterWrapper(
+std::unique_ptr NormalFileWriter::create(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint)
{
return std::make_unique(createOutputFormatFile(context, file_uri, preferred_schema, format_hint), context);
diff --git a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
similarity index 93%
rename from cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
rename to cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
index 49383f8de42c..6d054a04fad3 100644
--- a/cpp-ch/local-engine/Storages/Output/FileWriterWrappers.h
+++ b/cpp-ch/local-engine/Storages/Output/NormalFileWriter.h
@@ -26,6 +26,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -36,31 +37,20 @@
namespace local_engine
{
-class FileWriterWrapper
+class NormalFileWriter : public NativeOutputWriter
{
public:
- explicit FileWriterWrapper(const OutputFormatFilePtr & file_) : file(file_) { }
- virtual ~FileWriterWrapper() = default;
+ static std::unique_ptr create(
+ const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
- virtual void consume(DB::Block & block) = 0;
- virtual void close() = 0;
-
-protected:
- OutputFormatFilePtr file;
-};
-
-using FileWriterWrapperPtr = std::shared_ptr;
-
-class NormalFileWriter : public FileWriterWrapper
-{
-public:
NormalFileWriter(const OutputFormatFilePtr & file_, const DB::ContextPtr & context_);
~NormalFileWriter() override = default;
- void consume(DB::Block & block) override;
- void close() override;
+ void write(DB::Block & block) override;
+ std::string close() override;
private:
+ OutputFormatFilePtr file;
DB::ContextPtr context;
OutputFormatFile::OutputFormatPtr output_format;
@@ -68,9 +58,6 @@ class NormalFileWriter : public FileWriterWrapper
std::unique_ptr writer;
};
-std::unique_ptr createFileWriterWrapper(
- const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
-
OutputFormatFilePtr createOutputFormatFile(
const DB::ContextPtr & context, const std::string & file_uri, const DB::Block & preferred_schema, const std::string & format_hint);
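
With both writers behind NativeOutputWriter, the JNI layer needs only one write/close pair. The file writer's close() returns an empty string while the MergeTree writer returns the part-metrics JSON, so a JVM caller can treat the result uniformly; closeNative below is a stand-in for CHDatasourceJniWrapper.close():

    def collectPartMetrics(closeNative: () => String): Option[String] = {
      val metrics = closeNative()
      // Empty string => plain file write, nothing to turn into AddFile entries.
      Option(metrics).filter(_.nonEmpty)
    }
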
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index af6fce0607ba..cce059badf57 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -30,7 +30,6 @@
#include
#include
#include
-#include
#include
#include
#include
@@ -45,12 +44,12 @@
#include
#include
#include
-#include
+#include
#include
-#include
#include
#include
#include
+#include
#include
#include
#include
@@ -906,58 +905,41 @@ JNIEXPORT void Java_org_apache_gluten_vectorized_CHBlockWriterJniWrapper_nativeC
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
-JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitFileWriterWrapper(
- JNIEnv * env, jobject, jstring file_uri_, jbyteArray preferred_schema_, jstring format_hint_)
+JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createFilerWriter(
+ JNIEnv * env, jobject, jstring file_uri_, jbyteArray writeRel)
{
LOCAL_ENGINE_JNI_METHOD_START
- const auto preferred_schema_ref = local_engine::getByteArrayElementsSafe(env, preferred_schema_);
- auto parse_named_struct = [&]() -> std::optional<substrait::NamedStruct>
- {
- std::string_view view{
- reinterpret_cast(preferred_schema_ref.elems()), static_cast(preferred_schema_ref.length())};
-
- substrait::NamedStruct res;
- bool ok = res.ParseFromString(view);
- if (!ok)
- return {};
- return std::move(res);
- };
+ const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel);
+ substrait::WriteRel write_rel = local_engine::BinaryToMessage<substrait::WriteRel>(
+ {reinterpret_cast<const char *>(writeRelBytes.elems()), static_cast<size_t>(writeRelBytes.length())});
- auto named_struct = parse_named_struct();
- if (!named_struct.has_value())
- throw DB::Exception(DB::ErrorCodes::CANNOT_PARSE_PROTOBUF_SCHEMA, "Parse schema from substrait protobuf failed");
+ assert(write_rel.has_named_table());
+ const substrait::NamedObjectWrite & named_table = write_rel.named_table();
+ local_engine::Write write_opt;
+ named_table.advanced_extension().optimization().UnpackTo(&write_opt);
+ DB::Block preferred_schema = local_engine::TypeParser::buildBlockFromNamedStructWithoutDFS(write_rel.table_schema());
- DB::Block preferred_schema = local_engine::TypeParser::buildBlockFromNamedStructWithoutDFS(*named_struct);
const auto file_uri = jstring2string(env, file_uri_);
+
// for HiveFileFormat, the file url may not end with .parquet, so we pass in the format as a hint
- const auto format_hint = jstring2string(env, format_hint_);
const auto context = local_engine::QueryContext::instance().currentQueryContext();
- auto * writer = local_engine::createFileWriterWrapper(context, file_uri, preferred_schema, format_hint).release();
+ auto * writer = local_engine::NormalFileWriter::create(context, file_uri, preferred_schema, write_opt.common().format()).release();
return reinterpret_cast(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, 0)
}
-JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeInitMergeTreeWriterWrapper(
- JNIEnv * env,
- jobject,
- jbyteArray plan_,
- jbyteArray split_info_,
- jstring uuid_,
- jstring task_id_,
- jstring partition_dir_,
- jstring bucket_dir_,
- jbyteArray conf_plan)
+JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_createMergeTreeWriter(
+ JNIEnv * env, jobject, jstring task_id_, jstring partition_dir_, jstring bucket_dir_, jbyteArray writeRel, jbyteArray conf_plan)
{
LOCAL_ENGINE_JNI_METHOD_START
auto query_context = local_engine::QueryContext::instance().currentQueryContext();
// by task update new configs ( in case of dynamic config update )
const auto conf_plan_a = local_engine::getByteArrayElementsSafe(env, conf_plan);
- const std::string::size_type conf_plan_size = conf_plan_a.length();
-
- local_engine::SparkConfigs::updateConfig(query_context, {reinterpret_cast(conf_plan_a.elems()), conf_plan_size});
+ local_engine::SparkConfigs::updateConfig(
+ query_context, {reinterpret_cast(conf_plan_a.elems()), static_cast(conf_plan_a.length())});
- const auto uuid_str = jstring2string(env, uuid_);
+ const auto uuid_str = toString(DB::UUIDHelpers::generateV4());
const auto task_id = jstring2string(env, task_id_);
const auto partition_dir = jstring2string(env, partition_dir_);
const auto bucket_dir = jstring2string(env, bucket_dir_);
@@ -966,10 +948,10 @@ JNIEXPORT jlong Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniW
.part_name_prefix{uuid}, .partition_dir{partition_dir}, .bucket_dir{bucket_dir}};
settings.set(query_context);
- const auto split_info_a = local_engine::getByteArrayElementsSafe(env, split_info_);
- auto extension_table = local_engine::BinaryToMessage(
- {reinterpret_cast(split_info_a.elems()), static_cast(split_info_a.length())});
- local_engine::MergeTreeTableInstance merge_tree_table(extension_table);
+ const auto writeRelBytes = local_engine::getByteArrayElementsSafe(env, writeRel);
+ substrait::WriteRel write_rel = local_engine::BinaryToMessage<substrait::WriteRel>(
+ {reinterpret_cast<const char *>(writeRelBytes.elems()), static_cast<size_t>(writeRelBytes.length())});
+ local_engine::MergeTreeTable merge_tree_table(write_rel);
auto * writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, settings, query_context).release();
return reinterpret_cast(writer);
@@ -1003,59 +985,28 @@ Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_write(JNI
{
LOCAL_ENGINE_JNI_METHOD_START
- auto * writer = reinterpret_cast<local_engine::FileWriterWrapper *>(instanceId);
+ auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId);
auto * block = reinterpret_cast<DB::Block *>(block_address);
- writer->consume(*block);
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
-JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- auto * writer = reinterpret_cast<local_engine::FileWriterWrapper *>(instanceId);
- SCOPE_EXIT({ delete writer; });
- writer->close();
- LOCAL_ENGINE_JNI_METHOD_END(env, )
-}
-
-JNIEXPORT void Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_writeToMergeTree(
- JNIEnv * env, jobject, jlong instanceId, jlong block_address)
-{
- LOCAL_ENGINE_JNI_METHOD_START
- auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
- const auto * block = reinterpret_cast<DB::Block *>(block_address);
writer->write(*block);
LOCAL_ENGINE_JNI_METHOD_END(env, )
}
-JNIEXPORT jstring
-Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_closeMergeTreeWriter(JNIEnv * env, jobject, jlong instanceId)
+JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_close(JNIEnv * env, jobject, jlong instanceId)
{
LOCAL_ENGINE_JNI_METHOD_START
- auto * writer = reinterpret_cast<local_engine::SparkMergeTreeWriter *>(instanceId);
+ auto * writer = reinterpret_cast<local_engine::NativeOutputWriter *>(instanceId);
SCOPE_EXIT({ delete writer; });
-
- writer->finalize();
- const auto part_infos = writer->getAllPartInfo();
- const auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos);
- return local_engine::charTojstring(env, json_info.c_str());
+ const auto result = writer->close();
+ return local_engine::charTojstring(env, result.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJniWrapper_nativeMergeMTParts(
- JNIEnv * env,
- jobject,
- jbyteArray plan_,
- jbyteArray split_info_,
- jstring uuid_,
- jstring task_id_,
- jstring partition_dir_,
- jstring bucket_dir_)
+ JNIEnv * env, jclass, jbyteArray split_info_, jstring partition_dir_, jstring bucket_dir_)
{
LOCAL_ENGINE_JNI_METHOD_START
- const auto uuid_str = jstring2string(env, uuid_);
-
+ const auto uuid_str = toString(DB::UUIDHelpers::generateV4());
const auto partition_dir = jstring2string(env, partition_dir_);
const auto bucket_dir = jstring2string(env, bucket_dir_);
@@ -1074,8 +1025,8 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
DB::StorageID storage_id = temp_storage->getStorageID();
SCOPE_EXIT({ local_engine::StorageMergeTreeFactory::freeStorage(storage_id); });
- std::vector selected_parts = local_engine::StorageMergeTreeFactory::instance().getDataPartsByNames(
- temp_storage->getStorageID(), "", merge_tree_table.getPartNames());
+ std::vector selected_parts
+ = local_engine::StorageMergeTreeFactory::getDataPartsByNames(temp_storage->getStorageID(), "", merge_tree_table.getPartNames());
std::unordered_map partition_values;
std::vector loaded
@@ -1089,8 +1040,7 @@ JNIEXPORT jstring Java_org_apache_spark_sql_execution_datasources_CHDatasourceJn
partPtr->name, partPtr->getMarksCount(), partPtr->getBytesOnDisk(), partPtr->rows_count, partition_values, bucket_dir});
}
- auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(res);
-
+ auto json_info = local_engine::PartInfo::toJson(res);
return local_engine::charTojstring(env, json_info.c_str());
LOCAL_ENGINE_JNI_METHOD_END(env, nullptr)
}
diff --git a/cpp-ch/local-engine/proto/write_optimization.proto b/cpp-ch/local-engine/proto/write_optimization.proto
new file mode 120000
index 000000000000..d1338a75fedb
--- /dev/null
+++ b/cpp-ch/local-engine/proto/write_optimization.proto
@@ -0,0 +1 @@
+../../../backends-clickhouse/src/main/resources/org/apache/spark/sql/execution/datasources/v1/write_optimization.proto
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
index ac0ec2145757..5a39580c19fd 100644
--- a/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex.cpp
@@ -20,6 +20,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -39,21 +40,21 @@
#include
#include
-# define ASSERT_DURATION_LE(secs, stmt) \
- { \
- std::promise completed; \
- auto stmt_future = completed.get_future(); \
- std::thread( \
- [&](std::promise & completed) \
- { \
- stmt; \
- completed.set_value(true); \
- }, \
- std::ref(completed)) \
- .detach(); \
- if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \
- GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \
- }
+#define ASSERT_DURATION_LE(secs, stmt) \
+ { \
+ std::promise completed; \
+ auto stmt_future = completed.get_future(); \
+ std::thread( \
+ [&](std::promise & completed) \
+ { \
+ stmt; \
+ completed.set_value(true); \
+ }, \
+ std::ref(completed)) \
+ .detach(); \
+ if (stmt_future.wait_for(std::chrono::seconds(secs)) == std::future_status::timeout) \
+ GTEST_FATAL_FAILURE_(" timed out (> " #secs " seconds). Check code for infinite loops"); \
+ }
namespace DB::ErrorCodes
diff --git a/cpp-ch/local-engine/tests/gtest_parser.cpp b/cpp-ch/local-engine/tests/gtest_parser.cpp
index b6c431ff25f3..da46ff6fdcde 100644
--- a/cpp-ch/local-engine/tests/gtest_parser.cpp
+++ b/cpp-ch/local-engine/tests/gtest_parser.cpp
@@ -24,7 +24,6 @@
#include
#include
#include
-#include
#include
#include
#include
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
index 992c91c942e0..233456992dff 100644
--- a/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline.cpp
@@ -19,32 +19,20 @@
#include
#include
#include
-#include
#include
#include
-#include
-#include
#include
#include
-#include
#include
-#include
#include
-#include
#include
#include
-#include
-#include
-#include
-#include
-#include
#include
#include
-#include
+#include
#include
#include
#include
-#include
#include
#include
@@ -263,197 +251,4 @@ TEST(WritePipeline, ComputePartitionedExpression)
EXPECT_EQ("s_nationkey=1/name=one", partition_by_result_column->getDataAt(0));
EXPECT_EQ("s_nationkey=2/name=two", partition_by_result_column->getDataAt(1));
EXPECT_EQ("s_nationkey=3/name=three", partition_by_result_column->getDataAt(2));
-}
-
-void do_remove(const std::string & folder)
-{
- namespace fs = std::filesystem;
- if (const std::filesystem::path ph(folder); fs::exists(ph))
- fs::remove_all(ph);
-}
-
-Chunk person_chunk()
-{
- auto id = INT()->createColumn();
- id->insert(100);
- id->insert(200);
- id->insert(300);
- id->insert(400);
- id->insert(500);
- id->insert(600);
- id->insert(700);
-
- auto name = STRING()->createColumn();
- name->insert("Joe");
- name->insert("Marry");
- name->insert("Mike");
- name->insert("Fred");
- name->insert("Albert");
- name->insert("Michelle");
- name->insert("Dan");
-
- auto age = makeNullable(INT())->createColumn();
- Field null_field;
- age->insert(30);
- age->insert(null_field);
- age->insert(18);
- age->insert(50);
- age->insert(null_field);
- age->insert(30);
- age->insert(50);
-
-
- MutableColumns x;
- x.push_back(std::move(id));
- x.push_back(std::move(name));
- x.push_back(std::move(age));
- return {std::move(x), 7};
-}
-
-TEST(WritePipeline, MergeTree)
-{
- ThreadStatus thread_status;
-
- const auto context = DB::Context::createCopy(QueryContext::globalContext());
- context->setPath("./");
- const Settings & settings = context->getSettingsRef();
-
- const std::string query
- = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)";
-
- const char * begin = query.data();
- const char * end = query.data() + query.size();
- ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]);
-
- ASTPtr ast = parseQuery(parser, begin, end, "", settings[Setting::max_query_size], settings[Setting::max_parser_depth], settings[Setting::max_parser_backtracks]);
-
- EXPECT_TRUE(ast->as());
- auto & create = ast->as();
-
- ColumnsDescription column_descriptions
- = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE);
-
- StorageInMemoryMetadata metadata;
- metadata.setColumns(column_descriptions);
- metadata.setComment("args.comment");
- ASTPtr partition_by_key;
- metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context);
-
- MergeTreeData::MergingParams merging_params;
- merging_params.mode = MergeTreeData::MergingParams::Ordinary;
-
-
- /// This merging param maybe used as part of sorting key
- std::optional merging_param_key_arg;
- /// Get sorting key from engine arguments.
- ///
- /// NOTE: store merging_param_key_arg as additional key column. We do it
- /// before storage creation. After that storage will just copy this
- /// column if sorting key will be changed.
- metadata.sorting_key
- = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg);
-
- std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings());
-
- UUID uuid;
- UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull;
- UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull;
-
- SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); });
-
- auto merge_tree = std::make_shared(
- StorageID("", "", uuid),
- "WritePipeline_MergeTree",
- metadata,
- LoadingStrictnessLevel::CREATE,
- context,
- "",
- merging_params,
- std::move(storage_settings));
-
- Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}};
- DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]);
- squashing.add(person_chunk());
- auto x = Squashing::squash(squashing.flush());
- x.getChunkInfos().add(std::make_shared());
-
- ASSERT_EQ(7, x.getNumRows());
- ASSERT_EQ(3, x.getNumColumns());
-
-
- auto metadata_snapshot = std::make_shared(metadata);
- ASTPtr none;
- auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false));
-
- sink->consume(x);
- sink->onFinish();
-}
-
-INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json");
-INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json");
-INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json");
-
-TEST(WritePipeline, SparkMergeTree)
-{
- ThreadStatus thread_status;
-
- const auto context = DB::Context::createCopy(QueryContext::globalContext());
- context->setPath("./");
- const Settings & settings = context->getSettingsRef();
-
- const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_));
- MergeTreeTableInstance merge_tree_table(extension_table);
-
- EXPECT_EQ(merge_tree_table.database, "default");
- EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree");
- EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree");
- EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default");
-
- do_remove(merge_tree_table.relative_path);
-
- const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext());
- EXPECT_TRUE(dest_storage);
- EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
- DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
- Block header = metadata_snapshot->getSampleBlock();
-
- constexpr std::string_view split_template
- = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
- constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")};
-
- SparkMergeTreeWritePartitionSettings gm_write_settings{
- .part_name_prefix{"this_is_prefix"},
- };
- gm_write_settings.set(context);
-
- auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context);
- SparkMergeTreeWriter & spark_merge_tree_writer = *writer;
-
- auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file);
- EXPECT_TRUE(local_executor->hasNext());
-
- do
- {
- spark_merge_tree_writer.write(*local_executor->nextColumnar());
- } while (local_executor->hasNext());
-
- spark_merge_tree_writer.finalize();
- auto part_infos = spark_merge_tree_writer.getAllPartInfo();
- auto json_info = local_engine::SparkMergeTreeWriter::partInfosToJson(part_infos);
- std::cerr << json_info << std::endl;
-
- ///
- {
- const auto extension_table_hdfs
- = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_hdfs_));
- MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
- EXPECT_EQ(merge_tree_table_hdfs.database, "default");
- EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
- EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
- EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");
-
- const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
- EXPECT_TRUE(dest_storage_hdfs);
- EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
- }
}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
new file mode 100644
index 000000000000..5ede32c89073
--- /dev/null
+++ b/cpp-ch/local-engine/tests/gtest_write_pipeline_mergetree.cpp
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace DB::Setting
+{
+extern const SettingsUInt64 max_parser_depth;
+extern const SettingsUInt64 max_parser_backtracks;
+extern const SettingsBool allow_settings_after_format_in_insert;
+extern const SettingsUInt64 max_query_size;
+extern const SettingsUInt64 min_insert_block_size_rows;
+extern const SettingsUInt64 min_insert_block_size_bytes;
+}
+
+using namespace local_engine;
+using namespace DB;
+
+namespace
+{
+void do_remove(const std::string & folder)
+{
+ namespace fs = std::filesystem;
+ if (const std::filesystem::path ph(folder); fs::exists(ph))
+ fs::remove_all(ph);
+}
+
+Chunk person_chunk()
+{
+ auto id = INT()->createColumn();
+ id->insert(100);
+ id->insert(200);
+ id->insert(300);
+ id->insert(400);
+ id->insert(500);
+ id->insert(600);
+ id->insert(700);
+
+ auto name = STRING()->createColumn();
+ name->insert("Joe");
+ name->insert("Marry");
+ name->insert("Mike");
+ name->insert("Fred");
+ name->insert("Albert");
+ name->insert("Michelle");
+ name->insert("Dan");
+
+ auto age = makeNullable(INT())->createColumn();
+ Field null_field;
+ age->insert(30);
+ age->insert(null_field);
+ age->insert(18);
+ age->insert(50);
+ age->insert(null_field);
+ age->insert(30);
+ age->insert(50);
+
+
+ MutableColumns x;
+ x.push_back(std::move(id));
+ x.push_back(std::move(name));
+ x.push_back(std::move(age));
+ return {std::move(x), 7};
+}
+}
+
+TEST(MergeTree, ClickhouseMergeTree)
+{
+ ThreadStatus thread_status;
+
+ const auto context = DB::Context::createCopy(QueryContext::globalContext());
+ context->setPath("./");
+ const Settings & settings = context->getSettingsRef();
+
+ const std::string query
+ = R"(create table if not exists person (id Int32, Name String, Age Nullable(Int32)) engine = MergeTree() ORDER BY id)";
+
+ const char * begin = query.data();
+ const char * end = query.data() + query.size();
+ ParserQuery parser(end, settings[Setting::allow_settings_after_format_in_insert]);
+
+ ASTPtr ast = parseQuery(
+ parser,
+ begin,
+ end,
+ "",
+ settings[Setting::max_query_size],
+ settings[Setting::max_parser_depth],
+ settings[Setting::max_parser_backtracks]);
+
+ EXPECT_TRUE(ast->as());
+ auto & create = ast->as();
+
+ ColumnsDescription column_descriptions
+ = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, context, LoadingStrictnessLevel::CREATE);
+
+ StorageInMemoryMetadata metadata;
+ metadata.setColumns(column_descriptions);
+ metadata.setComment("args.comment");
+ ASTPtr partition_by_key;
+ metadata.partition_key = KeyDescription::getKeyFromAST(partition_by_key, metadata.columns, context);
+
+ MergeTreeData::MergingParams merging_params;
+ merging_params.mode = MergeTreeData::MergingParams::Ordinary;
+
+
+ /// This merging param maybe used as part of sorting key
+ std::optional merging_param_key_arg;
+ /// Get sorting key from engine arguments.
+ ///
+ /// NOTE: store merging_param_key_arg as additional key column. We do it
+ /// before storage creation. After that storage will just copy this
+ /// column if sorting key will be changed.
+ metadata.sorting_key
+ = KeyDescription::getSortingKeyFromAST(create.storage->order_by->ptr(), metadata.columns, context, merging_param_key_arg);
+
+ std::unique_ptr storage_settings = std::make_unique(context->getMergeTreeSettings());
+
+ UUID uuid;
+ UUIDHelpers::getHighBytes(uuid) = 0xffffffffffff0fffull | 0x0000000000004000ull;
+ UUIDHelpers::getLowBytes(uuid) = 0x3fffffffffffffffull | 0x8000000000000000ull;
+
+ SCOPE_EXIT({ do_remove("WritePipeline_MergeTree"); });
+
+ auto merge_tree = std::make_shared(
+ StorageID("", "", uuid),
+ "WritePipeline_MergeTree",
+ metadata,
+ LoadingStrictnessLevel::CREATE,
+ context,
+ "",
+ merging_params,
+ std::move(storage_settings));
+
+ Block header{{INT(), "id"}, {STRING(), "Name"}, {makeNullable(INT()), "Age"}};
+ DB::Squashing squashing(header, settings[Setting::min_insert_block_size_rows], settings[Setting::min_insert_block_size_bytes]);
+ squashing.add(person_chunk());
+ auto x = Squashing::squash(squashing.flush());
+ x.getChunkInfos().add(std::make_shared());
+
+ ASSERT_EQ(7, x.getNumRows());
+ ASSERT_EQ(3, x.getNumColumns());
+
+
+ auto metadata_snapshot = std::make_shared(metadata);
+ ASTPtr none;
+ auto sink = std::static_pointer_cast(merge_tree->write(none, metadata_snapshot, context, false));
+
+ sink->consume(x);
+ sink->onFinish();
+}
+
+INCBIN(_1_mergetree_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree.json");
+INCBIN(_1_mergetree_hdfs_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_mergetree_hdfs.json");
+INCBIN(_1_read_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/1_plan.json");
+
+TEST(MergeTree, SparkMergeTree)
+{
+ ThreadStatus thread_status;
+
+ const auto context = DB::Context::createCopy(QueryContext::globalContext());
+ context->setPath("./");
+ const Settings & settings = context->getSettingsRef();
+
+ const auto extension_table = local_engine::JsonStringToMessage(EMBEDDED_PLAN(_1_mergetree_));
+ MergeTreeTableInstance merge_tree_table(extension_table);
+
+ EXPECT_EQ(merge_tree_table.database, "default");
+ EXPECT_EQ(merge_tree_table.table, "lineitem_mergetree");
+ EXPECT_EQ(merge_tree_table.relative_path, "lineitem_mergetree");
+ EXPECT_EQ(merge_tree_table.table_configs.storage_policy, "default");
+
+ do_remove(merge_tree_table.relative_path);
+
+ const auto dest_storage = merge_tree_table.getStorage(QueryContext::globalMutableContext());
+ EXPECT_TRUE(dest_storage);
+ EXPECT_FALSE(dest_storage->getStoragePolicy()->getAnyDisk()->isRemote());
+ DB::StorageMetadataPtr metadata_snapshot = dest_storage->getInMemoryMetadataPtr();
+ Block header = metadata_snapshot->getSampleBlock();
+
+ constexpr std::string_view split_template
+ = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
+ constexpr std::string_view file{GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")};
+
+ SparkMergeTreeWritePartitionSettings gm_write_settings{
+ .part_name_prefix{"this_is_prefix"},
+ };
+ gm_write_settings.set(context);
+
+ auto writer = local_engine::SparkMergeTreeWriter::create(merge_tree_table, gm_write_settings, context);
+ SparkMergeTreeWriter & spark_merge_tree_writer = *writer;
+
+ auto [_, local_executor] = test::create_plan_and_executor(EMBEDDED_PLAN(_1_read_), split_template, file);
+ EXPECT_TRUE(local_executor->hasNext());
+
+ do
+ {
+ spark_merge_tree_writer.write(*local_executor->nextColumnar());
+ } while (local_executor->hasNext());
+
+ auto json_info = spark_merge_tree_writer.close();
+ std::cerr << json_info << std::endl;
+
+ ///
+ {
+ const auto extension_table_hdfs
+ = local_engine::JsonStringToMessage<substrait::ReadRel::ExtensionTable>(EMBEDDED_PLAN(_1_mergetree_hdfs_));
+ MergeTreeTableInstance merge_tree_table_hdfs(extension_table_hdfs);
+ EXPECT_EQ(merge_tree_table_hdfs.database, "default");
+ EXPECT_EQ(merge_tree_table_hdfs.table, "lineitem_mergetree_hdfs");
+ EXPECT_EQ(merge_tree_table_hdfs.relative_path, "3.5/test/lineitem_mergetree_hdfs");
+ EXPECT_EQ(merge_tree_table_hdfs.table_configs.storage_policy, "__hdfs_main");
+
+ const auto dest_storage_hdfs = merge_tree_table_hdfs.getStorage(QueryContext::globalMutableContext());
+ EXPECT_TRUE(dest_storage_hdfs);
+ EXPECT_TRUE(dest_storage_hdfs->getStoragePolicy()->getAnyDisk()->isRemote());
+ }
+}
+
+INCBIN(_2_mergetree_plan_, SOURCE_DIR "/utils/extern-local-engine/tests/json/mergetree/2_one_pipeline.json");
+
+TEST(MergeTree, Pipeline)
+{
+ GTEST_SKIP();
+ const auto context = DB::Context::createCopy(QueryContext::globalContext());
+ GlutenWriteSettings settings{
+ .task_write_tmp_dir = "file:///tmp/lineitem_mergetree",
+ .task_write_filename = "part-00000-a09f9d59-2dc6-43bc-a485-dcab8384b2ff.c000.mergetree",
+ };
+ settings.set(context);
+
+ constexpr std::string_view split_template
+ = R"({"items":[{"uriFile":"{replace_local_files}","length":"19230111","parquet":{},"schema":{},"metadataColumns":[{}],"properties":{"fileSize":"19230111","modificationTime":"1722330598029"}}]})";
+ auto [_, local_executor] = test::create_plan_and_executor(
+ EMBEDDED_PLAN(_2_mergetree_plan_),
+ split_template,
+ GLUTEN_SOURCE_TPCH_DIR("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet"),
+ context);
+ EXPECT_TRUE(local_executor->hasNext());
+ const Block & x = *local_executor->nextColumnar();
+}
\ No newline at end of file
diff --git a/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json
new file mode 100644
index 000000000000..fbc593267464
--- /dev/null
+++ b/cpp-ch/local-engine/tests/json/mergetree/2_one_pipeline.json
@@ -0,0 +1,368 @@
+{
+ "relations": [
+ {
+ "root": {
+ "input": {
+ "write": {
+ "namedTable": {
+ "advancedExtension": {
+ "optimization": {
+ "@type": "type.googleapis.com/google.protobuf.StringValue",
+ "value": "WriteParameters:isSnappy=1;format=mergetree\n"
+ },
+ "enhancement": {
+ "@type": "type.googleapis.com/substrait.Type",
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ],
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }
+ }
+ },
+ "tableSchema": {
+ "names": [
+ "l_orderkey",
+ "l_partkey",
+ "l_suppkey",
+ "l_linenumber",
+ "l_quantity",
+ "l_extendedprice",
+ "l_discount",
+ "l_tax",
+ "l_returnflag",
+ "l_linestatus",
+ "l_shipdate",
+ "l_commitdate",
+ "l_receiptdate",
+ "l_shipinstruct",
+ "l_shipmode",
+ "l_comment"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ]
+ },
+ "columnTypes": [
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL"
+ ]
+ },
+ "input": {
+ "read": {
+ "common": {
+ "direct": {}
+ },
+ "baseSchema": {
+ "names": [
+ "l_orderkey",
+ "l_partkey",
+ "l_suppkey",
+ "l_linenumber",
+ "l_quantity",
+ "l_extendedprice",
+ "l_discount",
+ "l_tax",
+ "l_returnflag",
+ "l_linestatus",
+ "l_shipdate",
+ "l_commitdate",
+ "l_receiptdate",
+ "l_shipinstruct",
+ "l_shipmode",
+ "l_comment"
+ ],
+ "struct": {
+ "types": [
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "i64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "fp64": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "date": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ },
+ {
+ "string": {
+ "nullability": "NULLABILITY_NULLABLE"
+ }
+ }
+ ]
+ },
+ "columnTypes": [
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL",
+ "NORMAL_COL"
+ ]
+ },
+ "advancedExtension": {
+ "optimization": {
+ "@type": "type.googleapis.com/google.protobuf.StringValue",
+ "value": "isMergeTree=0\n"
+ }
+ }
+ }
+ }
+ }
+ },
+ "outputSchema": {
+ "nullability": "NULLABILITY_REQUIRED"
+ }
+ }
+ }
+ ]
+}
\ No newline at end of file