
Commit 5d28de6

[GLUTEN-7028][CH][Part-5] Refactor: add NativeOutputWriter to unify CHDatasourceJniWrapper (#7395)

* Add NativeOutputWriter

* refactor CHDatasourceJniWrapper

* WriteConfiguration

* Use Hadoop Configuration to pass parameters

* Implement CHMergeTreeWriterInjects::createNativeWrite

* Rename datasources.clickhouse.ClickhouseMetaSerializer => datasources.mergetree.MetaSerializer

* delete MergeTreeDeltaUtil and move its functionality to StorageMeta

* WriteConfiguration => StorageConfigProvider

* fix prefixof

* WriteConfiguration => StorageConfigProvider 2

* withStorageID
baibaichen authored Oct 9, 2024
1 parent dfbf226 commit 5d28de6
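The bullet points above summarize the refactor; the hunks below show its two visible effects: storage identity is attached to the Delta metadata through StorageMeta.withStorageID, and per-table settings reach the writer through the Hadoop job Configuration instead of a broadcast, serialized extension table. A minimal sketch of that flow follows; all names and configuration keys here are invented for illustration and are not the actual Gluten API.

import org.apache.hadoop.conf.Configuration

// Illustrative stand-in for the storage identity that withStorageID attaches to the
// Delta metadata: database, table, and a snapshot id.
final case class StorageId(database: String, table: String, snapshotId: String)

object StorageConfSketch {
  // Flatten the identity into plain key/value pairs, mirroring what
  // deltaMetaReader.storageConf provides in the hunks below. Keys are hypothetical.
  def storageConf(id: StorageId): Map[String, String] = Map(
    "gluten.storage.db" -> id.database,
    "gluten.storage.table" -> id.table,
    "gluten.storage.snapshot" -> id.snapshotId
  )

  // Copy the pairs into the job Configuration on the driver; Hadoop ships the
  // Configuration to every task, so no Spark broadcast variable is needed.
  def propagate(conf: Configuration, id: StorageId): Unit =
    storageConf(id).foreach { case (k, v) => conf.set(k, v) }
}

Because the Configuration already travels with each task, the executor-side OutputWriter can recover these values locally, which is why the diffs below delete the extensionTableBC broadcast.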
Showing 49 changed files with 1,247 additions and 815 deletions.
22 changes: 22 additions & 0 deletions backends-clickhouse/pom.xml
@@ -253,6 +253,28 @@
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<!-- compile proto buffer files using copied protoc binary -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<executions>
<execution>
<id>compile-gluten-proto</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
<configuration>
<protocArtifact>
com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}
</protocArtifact>
<protoSourceRoot>src/main/resources/org/apache/spark/sql/execution/datasources/v1</protoSourceRoot>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
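A note on the plugin execution added above: protoc is resolved from the protocArtifact coordinates, ${os.detected.classifier} is normally supplied by the os-maven-plugin build extension, and the .proto sources under src/main/resources/org/apache/spark/sql/execution/datasources/v1 are compiled during generate-sources. The generated messages cross the JNI boundary as plain byte arrays, the same way the Scala hunks below pass planWithSplitInfo.plan and splitInfo. A tiny sketch of that flattening step, assuming only the standard protobuf-java API:

import com.google.protobuf.MessageLite

object ProtoBytesSketch {
  // Any generated protobuf message can be serialized to a byte array before being
  // handed to native code; the byte[] form is what a JNI wrapper consumes.
  def toNativeBytes(message: MessageLite): Array[Byte] = message.toByteArray
}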
@@ -96,12 +96,8 @@ class ClickHouseTableV2(

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
StorageMeta.withMoreStorageInfo(
meta,
ClickhouseSnapshot.genSnapshotId(snapshot),
deltaLog.dataPath,
dataBaseName,
tableName))
StorageMeta
.withStorageID(meta, dataBaseName, tableName, ClickhouseSnapshot.genSnapshotId(snapshot)))
}

override def deltaProperties: Map[String, String] = properties().asScala.toMap
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}
import java.util.Date
import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -95,8 +95,6 @@
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

val uuid = UUID.randomUUID.toString

val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -115,13 +113,9 @@
description.tableSchema.toAttributes
)

val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
datasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.plan,
CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
uuid,
taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
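As the hunk above suggests, merging MergeTree parts now goes through a static call on CHDatasourceJniWrapper: the serialized Substrait plan and the caller-generated UUID are gone, and the native side is keyed only by the split info plus the task, partition, and bucket identifiers. A sketch of such a JNI facade follows; the real CHDatasourceJniWrapper is a Java class in Gluten, and the parameter names, types, and String return type here are inferred from this call site, not taken from the actual source.

// Hypothetical Scala rendering of a Java-side static native method; signature assumed.
class NativeMergeSketch {
  @native def nativeMergeMTParts(
      splitInfo: Array[Byte],
      taskId: String,
      partitionDir: String,
      bucketDir: String): String
}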
@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -50,22 +49,15 @@ class DeltaMergeTreeFileFormat(metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
// val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

val database = deltaMetaReader.storageDB
val tableName = deltaMetaReader.storageTable
val deltaPath = deltaMetaReader.storagePath

val extensionTableBC = sparkSession.sparkContext.broadcast(
ClickhouseMetaSerializer
.forWrite(deltaMetaReader, metadata.schema)
.toByteArray)
deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) }

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -76,19 +68,10 @@
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
metadata.schema,
context,
nativeConf,
database,
tableName,
extensionTableBC.value
)
.createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
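The same pattern is visible in this hunk: the driver copies deltaMetaReader.storageConf into the job Configuration, and createOutputWriter shrinks to (path, schema, context, nativeConf) because the executor can read everything else back from the TaskAttemptContext. A minimal sketch of that read-back, using the same hypothetical key as the sketch near the top of this page:

import org.apache.hadoop.mapreduce.TaskAttemptContext

object WriterConfSketch {
  // The TaskAttemptContext carries the job Configuration to every task, so values the
  // driver set (such as a storage database name) are available without a broadcast.
  def storageDatabase(context: TaskAttemptContext): String =
    context.getConfiguration.get("gluten.storage.db", "default")
}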
@@ -96,12 +96,8 @@ class ClickHouseTableV2(

def getFileFormat(meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
StorageMeta.withMoreStorageInfo(
meta,
ClickhouseSnapshot.genSnapshotId(snapshot),
deltaLog.dataPath,
dataBaseName,
tableName))
StorageMeta
.withStorageID(meta, dataBaseName, tableName, ClickhouseSnapshot.genSnapshotId(snapshot)))
}

override def deltaProperties: Map[String, String] = properties().asScala.toMap
@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}
import java.util.Date
import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -95,8 +95,6 @@
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

val uuid = UUID.randomUUID.toString

val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -115,13 +113,9 @@
description.tableSchema.toAttributes
)

val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
datasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.plan,
CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
uuid,
taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
@@ -170,7 +164,7 @@
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)

val sparkSession = SparkSession.getActiveSession.get

@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.Metadata
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,16 @@ class DeltaMergeTreeFileFormat(metadata: Metadata) extends DeltaParquetFileFormat
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
// val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

// just for the sake of compatibility
val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

val database = deltaMetaReader.storageDB
val tableName = deltaMetaReader.storageTable
val deltaPath = deltaMetaReader.storagePath

val extensionTableBC = sparkSession.sparkContext.broadcast(
ClickhouseMetaSerializer
.forWrite(deltaMetaReader, metadata.schema)
.toByteArray)
deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) }

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +72,10 @@
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
metadata.schema,
context,
nativeConf,
database,
tableName,
extensionTableBC.value
)
.createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}
@@ -99,12 +99,11 @@ class ClickHouseTableV2(
def getFileFormat(protocol: Protocol, meta: Metadata): DeltaMergeTreeFileFormat = {
new DeltaMergeTreeFileFormat(
protocol,
StorageMeta.withMoreStorageInfo(
StorageMeta.withStorageID(
meta,
ClickhouseSnapshot.genSnapshotId(initialSnapshot),
deltaLog.dataPath,
dataBaseName,
tableName))
tableName,
ClickhouseSnapshot.genSnapshotId(initialSnapshot)))
}

override def deltaProperties: Map[String, String] = properties().asScala.toMap
@@ -44,7 +44,7 @@ import org.apache.hadoop.fs.{FileAlreadyExistsException, Path}
import org.apache.hadoop.mapreduce.{TaskAttemptContext, TaskAttemptID, TaskID, TaskType}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

import java.util.{Date, UUID}
import java.util.Date
import scala.collection.mutable.ArrayBuffer

object OptimizeTableCommandOverwrites extends Logging {
@@ -97,8 +97,6 @@
try {
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {

val uuid = UUID.randomUUID.toString

val planWithSplitInfo = CHMergeTreeWriterInjects.genMergeTreeWriteRel(
description.path,
description.database,
@@ -117,13 +115,9 @@
DataTypeUtils.toAttributes(description.tableSchema)
)

val datasourceJniWrapper = new CHDatasourceJniWrapper()
val returnedMetrics =
datasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.plan,
CHDatasourceJniWrapper.nativeMergeMTParts(
planWithSplitInfo.splitInfo,
uuid,
taskId.getId.toString,
description.partitionDir.getOrElse(""),
description.bucketDir.getOrElse("")
)
@@ -172,7 +166,7 @@
bucketNum: String,
bin: Seq[AddFile],
maxFileSize: Long): Seq[FileAction] = {
val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog);
val tableV2 = ClickHouseTableV2.getTable(txn.deltaLog)

val sparkSession = SparkSession.getActiveSession.get

@@ -20,9 +20,8 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.clickhouse.ClickhouseMetaSerializer
import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader
import org.apache.spark.sql.execution.datasources.v1.{CHMergeTreeWriterInjects, GlutenMergeTreeWriterInjects}
import org.apache.spark.sql.execution.datasources.v1.GlutenMergeTreeWriterInjects
import org.apache.spark.sql.types.StructType

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -53,22 +52,15 @@ class DeltaMergeTreeFileFormat(protocol: Protocol, metadata: Metadata)
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
// pass compression to job conf so that the file extension can be aware of it.
// val conf = ContextUtil.getConfiguration(job)
val conf = job.getConfiguration

val nativeConf =
GlutenMergeTreeWriterInjects
.getInstance()
.nativeConf(options, "")

@transient val deltaMetaReader = DeltaMetaReader(metadata)

val database = deltaMetaReader.storageDB
val tableName = deltaMetaReader.storageTable
val deltaPath = deltaMetaReader.storagePath

val extensionTableBC = sparkSession.sparkContext.broadcast(
ClickhouseMetaSerializer
.forWrite(deltaMetaReader, metadata.schema)
.toByteArray)
deltaMetaReader.storageConf.foreach { case (k, v) => conf.set(k, v) }

new OutputWriterFactory {
override def getFileExtension(context: TaskAttemptContext): String = {
@@ -79,19 +71,10 @@
path: String,
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
require(path == deltaPath)

GlutenMergeTreeWriterInjects
.getInstance()
.asInstanceOf[CHMergeTreeWriterInjects]
.createOutputWriter(
path,
metadata.schema,
context,
nativeConf,
database,
tableName,
extensionTableBC.value
)
.createOutputWriter(path, metadata.schema, context, nativeConf)
}
}
}