Use Hadoop Configuration to pass parameters
baibaichen committed Oct 8, 2024
1 parent e7d502a commit 552e819
Showing 5 changed files with 57 additions and 96 deletions.
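
In outline, the change stops broadcasting the serialized ClickHouse table metadata to the writer tasks and instead passes it through the Hadoop job Configuration: the driver copies the table metadata into the job conf via DeltaMetaReader.updateToHadoopConf(conf), and each writer task reads the storage_* entries back from its TaskAttemptContext. Below is a minimal sketch of the driver-side half, assuming a hypothetical updateToHadoopConf helper and illustrative key names (the real keys are the StorageMeta constants used later in this diff):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

// Sketch only: stands in for DeltaMetaReader.updateToHadoopConf; key names are illustrative.
object DriverSideWrite {
  def updateToHadoopConf(conf: Configuration, meta: Map[String, String]): Unit =
    meta.foreach { case (k, v) => conf.set(s"storage_$k", v) }

  def main(args: Array[String]): Unit = {
    val job = Job.getInstance()                     // its Configuration travels with the Hadoop job
    updateToHadoopConf(job.getConfiguration, Map("db" -> "default", "table" -> "lineitem"))
    println(job.getConfiguration.get("storage_db")) // default
  }
}
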
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -50,22 +49,15 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
// val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

val database = deltaMetaReader.storageDB
val tableName = deltaMetaReader.storageTable
val deltaPath = deltaMetaReader.storagePath

val extensionTableBC = sparkSession.sparkContext.broadcast(
ClickhouseMetaSerializer
.forWrite(deltaMetaReader, metadata.schema)
.toByteArray)
deltaMetaReader.updateToHadoopConf(conf)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -76,19 +68,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
metadata.schema,
context,
nativeConf,
database,
tableName,
extensionTableBC.value
)
.createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,16 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
// val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

// just for the sake of compatibility
val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

val database = deltaMetaReader.storageDB
val tableName = deltaMetaReader.storageTable
val deltaPath = deltaMetaReader.storagePath

val extensionTableBC = sparkSession.sparkContext.broadcast(
ClickhouseMetaSerializer
.forWrite(deltaMetaReader, metadata.schema)
.toByteArray)
deltaMetaReader.updateToHadoopConf(conf)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +72,10 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileForma
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
metadata.schema,
context,
nativeConf,
database,
tableName,
extensionTableBC.value
)
.createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,15 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
// val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

val database = deltaMetaReader.storageDB
val tableName = deltaMetaReader.storageTable
val deltaPath = deltaMetaReader.storagePath

val extensionTableBC = sparkSession.sparkContext.broadcast(
ClickhouseMetaSerializer
.forWrite(deltaMetaReader, metadata.schema)
.toByteArray)
deltaMetaReader.updateToHadoopConf(conf)

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +71,10 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
metadata.schema,
context,
nativeConf,
database,
tableName,
extensionTableBC.value
)
.createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.MergeTreePartSplit
import org.apache.gluten.expression.ConverterUtils

import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
import org.apache.spark.sql.execution.datasources.mergetree.{DeltaMetaReader, StorageMeta}
import org.apache.spark.sql.execution.datasources.mergetree.StorageMeta
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.types.StructType

@@ -81,22 +81,23 @@ object ClickhousePartSerializer {
}

object ClickhouseMetaSerializer {
def forWrite(
relativePath: String,
clickhouseTableConfigs: Map[String, String],
dataSchema: StructType): ReadRel.ExtensionTable = {

def forWrite(deltaMetaReader: DeltaMetaReader, dataSchema: StructType): ReadRel.ExtensionTable = {
val clickhouseTableConfigs = deltaMetaReader.writeConfiguration

val orderByKey = clickhouseTableConfigs("storage_orderByKey")
val lowCardKey = clickhouseTableConfigs("storage_lowCardKey")
val minmaxIndexKey = clickhouseTableConfigs("storage_minmaxIndexKey")
val bfIndexKey = clickhouseTableConfigs("storage_bfIndexKey")
val setIndexKey = clickhouseTableConfigs("storage_setIndexKey")
val primaryKey = clickhouseTableConfigs("storage_primaryKey")
val orderByKey = clickhouseTableConfigs(StorageMeta.ORDER_BY_KEY)
val lowCardKey = clickhouseTableConfigs(StorageMeta.LOW_CARD_KEY)
val minmaxIndexKey = clickhouseTableConfigs(StorageMeta.MINMAX_INDEX_KEY)
val bfIndexKey = clickhouseTableConfigs(StorageMeta.BF_INDEX_KEY)
val setIndexKey = clickhouseTableConfigs(StorageMeta.SET_INDEX_KEY)
val primaryKey = clickhouseTableConfigs(StorageMeta.PRIMARY_KEY)

val result = apply(
deltaMetaReader.storageDB,
deltaMetaReader.storageTable,
deltaMetaReader.storageSnapshotId,
deltaMetaReader.storagePath,
clickhouseTableConfigs(StorageMeta.DB),
clickhouseTableConfigs(StorageMeta.TABLE),
clickhouseTableConfigs(StorageMeta.SNAPSHOT_ID),
relativePath,
"", // absolutePath
orderByKey,
lowCardKey,
@@ -106,7 +107,7 @@
primaryKey,
ClickhousePartSerializer.fromPartNames(Seq()),
ConverterUtils.convertNamedStructJson(dataSchema),
clickhouseTableConfigs.filter(_._1 == "storage_policy").asJava
clickhouseTableConfigs.filter(_._1 == StorageMeta.POLICY).asJava
)
ExtensionTableNode.toProtobuf(result)

@@ -27,12 +27,14 @@ import org.apache.gluten.utils.ConfigUtil
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter}
import org.apache.spark.sql.execution.datasources.clickhouse.{ClickhouseMetaSerializer, ClickhousePartSerializer}
import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, WriteConfiguration}
import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
import org.apache.spark.sql.types.StructType

import com.google.common.collect.Lists
import com.google.protobuf.{Any, StringValue}
import io.substrait.proto.NamedStruct
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptContext

import java.{util => ju}
@@ -41,6 +43,17 @@ import scala.collection.JavaConverters._

case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])

case class HadoopConfReader(conf: Configuration) extends WriteConfiguration {
lazy val writeConfiguration: Map[String, String] = {
conf
.iterator()
.asScala
.filter(_.getKey.startsWith("storage_"))
.map(entry => entry.getKey -> entry.getValue)
.toMap
}
}

class CHMergeTreeWriterInjects extends CHFormatWriterInjects {

override def nativeConf(
@@ -58,28 +71,25 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
path: String,
dataSchema: StructType,
context: TaskAttemptContext,
nativeConf: ju.Map[String, String]): OutputWriter = null
nativeConf: ju.Map[String, String]): OutputWriter = {

override val formatName: String = "mergetree"
val storage = HadoopConfReader(context.getConfiguration)
val database = storage.writeConfiguration(StorageMeta.DB)
val tableName = storage.writeConfiguration(StorageMeta.TABLE)
val extensionTable =
ClickhouseMetaSerializer.forWrite(path, storage.writeConfiguration, dataSchema)

def createOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext,
nativeConf: ju.Map[String, String],
database: String,
tableName: String,
splitInfo: Array[Byte]): OutputWriter = {
val datasourceJniWrapper = new CHDatasourceJniWrapper(
splitInfo,
extensionTable.toByteArray,
context.getTaskAttemptID.getTaskID.getId.toString,
context.getConfiguration.get("mapreduce.task.gluten.mergetree.partition.dir"),
context.getConfiguration.get("mapreduce.task.gluten.mergetree.bucketid.str"),
ConfigUtil.serialize(nativeConf)
)

new MergeTreeOutputWriter(datasourceJniWrapper, database, tableName, path)
}

override val formatName: String = "mergetree"
}

object CHMergeTreeWriterInjects {

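On the task side, the writer no longer needs the broadcast split-info byte array or the MergeTree-specific createOutputWriter overload: it rebuilds the database, table, and extension-table metadata from the storage_* entries in its TaskAttemptContext (see HadoopConfReader above). A sketch of that read path under the same illustrative keys; TaskAttemptContextImpl is used here only to simulate a task context locally:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import scala.collection.JavaConverters._

// Sketch only: mirrors the shape of the HadoopConfReader added in this commit.
object TaskSideRead {
  def storageEntries(conf: Configuration): Map[String, String] =
    conf
      .iterator()
      .asScala
      .filter(_.getKey.startsWith("storage_"))
      .map(e => e.getKey -> e.getValue)
      .toMap

  def main(args: Array[String]): Unit = {
    val conf = new Configuration(false)
    conf.set("storage_db", "default")     // written by the driver in the real flow
    conf.set("storage_table", "lineitem")

    // Hadoop hands each task a TaskAttemptContext wrapping the job conf; built locally here.
    val context = new TaskAttemptContextImpl(conf, new TaskAttemptID())
    println(storageEntries(context.getConfiguration)) // Map(storage_db -> default, storage_table -> lineitem)
  }
}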