From 2c351347791b2d1a9d829803118cf9f9ef83f04a Mon Sep 17 00:00:00 2001
From: Chang Chen
Date: Tue, 8 Oct 2024 17:12:30 +0800
Subject: [PATCH] Use Hadoop Configuration to pass parameters

---
 .../source/DeltaMergeTreeFileFormat.scala     | 29 ++++------------
 .../source/DeltaMergeTreeFileFormat.scala     | 30 ++++------------
 .../source/DeltaMergeTreeFileFormat.scala     | 29 ++++------------
 .../clickhouse/ClickhouseMetaSerializer.scala | 31 +++++++++--------
 .../v1/CHMergeTreeWriterInjects.scala         | 34 ++++++++++++-------
 5 files changed, 57 insertions(+), 96 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 19b3d396bd6f4..1760331a44e64 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
 import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
-import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
 import org.apache.spark.sql.types.StructType
 
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -50,22 +49,15 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     // pass compression to job conf so that the file extension can be aware of it.
-    // val conf = ContextUtil.getConfiguration(job)
+    val conf = job.getConfiguration
+
     val nativeConf = GlutenMergeTreeWriterInjects
       .getInstance()
       .nativeConf(options, "")
 
     @transient val deltaMetaReader = DeltaMetaReader(metadata)
-
-    val database = deltaMetaReader.storageDB
-    val tableName = deltaMetaReader.storageTable
-    val deltaPath = deltaMetaReader.storagePath
-
-    val extensionTableBC = sparkSession.sparkContext.broadcast(
-      ClickhouseMetaSerializer
-        .forWrite(deltaMetaReader, metadata.schema)
-        .toByteArray)
+    deltaMetaReader.updateToHadoopConf(conf)
 
     new OutputWriterFactory {
       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -76,19 +68,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        require(path == deltaPath)
+
         GlutenMergeTreeWriterInjects
           .getInstance()
-          .asInstanceOf[CHMergeTreeWriterInjects]
-          .createOutputWriter(
-            path,
-            metadata.schema,
-            context,
-            nativeConf,
-            database,
-            tableName,
-            extensionTableBC.value
-          )
+          .createOutputWriter(path, metadata.schema, context, nativeConf)
       }
     }
   }
diff --git a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index c2d1ec47b9ca1..adc5f2700f213 100644
--- a/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-23/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
 import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
-import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
 import org.apache.spark.sql.types.StructType
 
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,16 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     // pass compression to job conf so that the file extension can be aware of it.
-    // val conf = ContextUtil.getConfiguration(job)
+    val conf = job.getConfiguration
+
+    // just for the sake of compatibility
     val nativeConf = GlutenMergeTreeWriterInjects
       .getInstance()
       .nativeConf(options, "")
 
     @transient val deltaMetaReader = DeltaMetaReader(metadata)
-
-    val database = deltaMetaReader.storageDB
-    val tableName = deltaMetaReader.storageTable
-    val deltaPath = deltaMetaReader.storagePath
-
-    val extensionTableBC = sparkSession.sparkContext.broadcast(
-      ClickhouseMetaSerializer
-        .forWrite(deltaMetaReader, metadata.schema)
-        .toByteArray)
+    deltaMetaReader.updateToHadoopConf(conf)
 
     new OutputWriterFactory {
       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +72,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        require(path == deltaPath)
+
         GlutenMergeTreeWriterInjects
           .getInstance()
-          .asInstanceOf[CHMergeTreeWriterInjects]
-          .createOutputWriter(
-            path,
-            metadata.schema,
-            context,
-            nativeConf,
-            database,
-            tableName,
-            extensionTableBC.value
-          )
+          .createOutputWriter(path, metadata.schema, context, nativeConf)
       }
     }
   }
diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
index 1489ae4dbf490..df750f594187e 100644
--- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
+++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/datasources/v2/clickhouse/source/DeltaMergeTreeFileFormat.scala
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.DeltaParquetFileFormat
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
 import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
-import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
+import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
 import org.apache.spark.sql.types.StructType
 
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,15 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
     // pass compression to job conf so that the file extension can be aware of it.
-    // val conf = ContextUtil.getConfiguration(job)
+    val conf = job.getConfiguration
+
     val nativeConf = GlutenMergeTreeWriterInjects
       .getInstance()
       .nativeConf(options, "")
 
     @transient val deltaMetaReader = DeltaMetaReader(metadata)
-
-    val database = deltaMetaReader.storageDB
-    val tableName = deltaMetaReader.storageTable
-    val deltaPath = deltaMetaReader.storagePath
-
-    val extensionTableBC = sparkSession.sparkContext.broadcast(
-      ClickhouseMetaSerializer
-        .forWrite(deltaMetaReader, metadata.schema)
-        .toByteArray)
+    deltaMetaReader.updateToHadoopConf(conf)
 
     new OutputWriterFactory {
       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +71,10 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
-        require(path == deltaPath)
+
         GlutenMergeTreeWriterInjects
           .getInstance()
-          .asInstanceOf[CHMergeTreeWriterInjects]
-          .createOutputWriter(
-            path,
-            metadata.schema,
-            context,
-            nativeConf,
-            database,
-            tableName,
-            extensionTableBC.value
-          )
+          .createOutputWriter(path, metadata.schema, context, nativeConf)
       }
     }
   }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
index 5c863d76c947c..db3a9389637e1 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/clickhouse/ClickhouseMetaSerializer.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.MergeTreePartSplit
 import org.apache.gluten.expression.ConverterUtils
 
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
-import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta}
+import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
 import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
 import org.apache.spark.sql.types.StructType
 
@@ -81,22 +81,23 @@ object ClickhousePartSerializer {
 }
 
 object ClickhouseMetaSerializer {
+  def forWrite(
+      relativePath: String,
+      clickhouseTableConfigs: Map[String, String],
+      dataSchema: StructType): ReadRel.ExtensionTable = {
 
-  def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = {
-    val clickhouseTableConfigs = deltaMetaReader.writeConfiguration
-
-    val orderByKey = clickhouseTableConfigs("storage_orderByKey")
-    val lowCardKey = clickhouseTableConfigs("storage_lowCardKey")
-    val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey")
-    val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey")
-    val setIndexKey = clickhouseTableConfigs("storage_setIndexKey")
-    val primaryKey = clickhouseTableConfigs("storage_primaryKey")
+    val orderByKey = clickhouseTableConfigs(StorageMeta.ORDER_BY_KEY)
+    val lowCardKey = clickhouseTableConfigs(StorageMeta.LOW_CARD_KEY)
+    val minmaxIndexKey = clickhouseTableConfigs(StorageMeta.MINMAX_INDEX_KEY)
+    val bfIndexKey = clickhouseTableConfigs(StorageMeta.BF_INDEX_KEY)
+    val setIndexKey = clickhouseTableConfigs(StorageMeta.SET_INDEX_KEY)
+    val primaryKey = clickhouseTableConfigs(StorageMeta.PRIMARY_KEY)
 
     val result = apply(
-      deltaMetaReader.storageDB,
-      deltaMetaReader.storageTable,
-      deltaMetaReader.storageSnapshotId,
-      deltaMetaReader.storagePath,
+      clickhouseTableConfigs(StorageMeta.DB),
+      clickhouseTableConfigs(StorageMeta.TABLE),
+      clickhouseTableConfigs(StorageMeta.SNAPSHOT_ID),
+      relativePath,
       "", // absolutePath
       orderByKey,
       lowCardKey,
@@ -106,7 +107,7 @@ object ClickhouseMetaSerializer {
       primaryKey,
       ClickhousePartSerializer.fromPartNames(Seq()),
       ConverterUtils.convertNamedStructJson(dataSchema),
-      clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava
+      clickhouseTableConfigs.filter(_._1 == StorageMeta.POLICY).asJava
     )
 
     ExtensionTableNode.toProtobuf(result)
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index c17e321e444bc..2416d3e79243b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -27,12 +27,14 @@ import org.apache.gluten.utils.ConfigUtil
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter}
 import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer}
+import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, WriteConfiguration}
 import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
 import org.apache.spark.sql.types.StructType
 
 import com.google.common.collect.Lists
 import com.google.protobuf.{Any, StringValue}
 import io.substrait.proto.NamedStruct
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.TaskAttemptContext
 
 import java.{util => ju}
@@ -41,6 +43,17 @@ import scala.collection.JavaConverters._
 
 case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])
 
+case class HadoopConfReader(conf: Configuration) extends WriteConfiguration {
+  lazy val writeConfiguration: Map[String, String] = {
+    conf
+      .iterator()
+      .asScala
+      .filter(_.getKey.startsWith("storage_"))
+      .map(entry => entry.getKey -> entry.getValue)
+      .toMap
+  }
+}
+
 class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
 
   override def nativeConf(
@@ -58,28 +71,25 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
       path: String,
       dataSchema: StructType,
       context: TaskAttemptContext,
-      nativeConf: ju.Map[String, String]): OutputWriter = null
+      nativeConf: ju.Map[String, String]): OutputWriter = {
 
-  override val formatName: String = "mergetree"
+    val storage = HadoopConfReader(context.getConfiguration)
+    val database = storage.writeConfiguration(StorageMeta.DB)
+    val tableName = storage.writeConfiguration(StorageMeta.TABLE)
+    val extensionTable =
+      ClickhouseMetaSerializer.forWrite(path, storage.writeConfiguration, dataSchema)
 
-  def createOutputWriter(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext,
-      nativeConf: ju.Map[String, String],
-      database: String,
-      tableName: String,
-      splitInfo: Array[Byte]): OutputWriter = {
     val datasourceJniWrapper = new CHDatasourceJniWrapper(
-      splitInfo,
+      extensionTable.toByteArray,
       context.getTaskAttemptID.getTaskID.getId.toString,
      context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
      context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
      ConfigUtil.serialize(nativeConf)
     )
-
     new MergeTreeOutputWriter(datasourceJniWrapper, database, tableName, path)
   }
+
+  override val formatName: String = "mergetree"
 }
 
 object CHMergeTreeWriterInjects {
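
Note (reviewer sketch, not part of the patch): the change works because the Hadoop Configuration touched on the driver in prepareWrite is what each write task later sees as context.getConfiguration, so anything DeltaMetaReader.updateToHadoopConf stores under a "storage_" key can be read back by HadoopConfReader on the task side. The snippet below only illustrates that round trip; it assumes updateToHadoopConf is a plain key-by-key copy, and the literal keys "storage_db" and "storage_table" are placeholders for the StorageMeta constants, whose actual values are not shown in this patch.

import org.apache.hadoop.conf.Configuration

import scala.collection.JavaConverters._

object StorageConfRoundTrip {

  // Driver side: publish the MergeTree table metadata into the job configuration,
  // mimicking what updateToHadoopConf is assumed to do.
  def publish(conf: Configuration, writeConfiguration: Map[String, String]): Unit =
    writeConfiguration.foreach { case (key, value) => conf.set(key, value) }

  // Task side: mirror of HadoopConfReader -- recover every "storage_" entry from the
  // task's configuration instead of relying on a broadcast variable.
  def read(conf: Configuration): Map[String, String] =
    conf
      .iterator()
      .asScala
      .filter(_.getKey.startsWith("storage_"))
      .map(entry => entry.getKey -> entry.getValue)
      .toMap

  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)
    publish(conf, Map("storage_db" -> "default", "storage_table" -> "lineitem_mergetree"))
    assert(read(conf)("storage_db") == "default")
    assert(read(conf)("storage_table") == "lineitem_mergetree")
  }
}

This is also why the broadcast of the serialized extension table can be dropped: each writer task now rebuilds it locally through ClickhouseMetaSerializer.forWrite from the configuration entries plus the output path it is handed.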