WriteConfiguration => StorageConfigProvider
baibaichen committed Oct 9, 2024
1 parent caf2481 commit 17bb049
Showing 3 changed files with 27 additions and 23 deletions.
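In short, this commit renames the trait WriteConfiguration to StorageConfigProvider and its sole member writeConfiguration to storageConf, reflecting that the map carries general 'storage_'-prefixed storage properties rather than write-only settings. A minimal sketch of the renamed contract (DemoProvider and its values are illustrative, not repository code):

  trait StorageConfigProvider {
    val storageConf: Map[String, String]
  }

  // Hypothetical provider, only to show how the contract is satisfied.
  object DemoProvider extends StorageConfigProvider {
    val storageConf: Map[String, String] =
      Map("storage_db" -> "default", "storage_table" -> "events")
  }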
File 1 of 3

@@ -32,7 +32,7 @@ case class DeltaMetaReader(metadata: Metadata) extends TablePropertiesReader {
     conf.set(StorageMeta.DB, storageDB)
     conf.set(StorageMeta.TABLE, storageTable)
     conf.set(StorageMeta.SNAPSHOT_ID, storageSnapshotId)
-    writeConfiguration.foreach { case (k, v) => conf.set(k, v) }
+    storageConf.foreach { case (k, v) => conf.set(k, v) }
   }
 
   override lazy val partitionColumns: Seq[String] =
File 2 of 3

@@ -32,19 +32,22 @@ object StorageMeta {
   val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db"
   val DEFAULT_CREATE_TABLE_DATABASE: String = "default"
   val DEFAULT_ORDER_BY_KEY = "tuple()"
-  val DB: String = "storage_db"
-  val TABLE: String = "storage_table"
-  val SNAPSHOT_ID: String = "storage_snapshot_id"
-  val STORAGE_PATH: String = "storage_path"
-  val POLICY: String = "storage_policy"
-  val ORDER_BY_KEY: String = "storage_orderByKey"
-  val LOW_CARD_KEY: String = "storage_lowCardKey"
-  val MINMAX_INDEX_KEY: String = "storage_minmaxIndexKey"
-  val BF_INDEX_KEY: String = "storage_bfIndexKey"
-  val SET_INDEX_KEY: String = "storage_setIndexKey"
-  val PRIMARY_KEY: String = "storage_primaryKey"
+  val STORAGE_PREFIX: String = "storage_"
+  val DB: String = prefixOf("db")
+  val TABLE: String = prefixOf("table")
+  val SNAPSHOT_ID: String = prefixOf("snapshot_id")
+  val STORAGE_PATH: String = prefixOf("path")
+  val POLICY: String = prefixOf("policy")
+  val ORDER_BY_KEY: String = prefixOf("orderByKey")
+  val LOW_CARD_KEY: String = prefixOf("lowCardKey")
+  val MINMAX_INDEX_KEY: String = prefixOf("minmaxIndexKey")
+  val BF_INDEX_KEY: String = prefixOf("bfIndexKey")
+  val SET_INDEX_KEY: String = prefixOf("setIndexKey")
+  val PRIMARY_KEY: String = prefixOf("primaryKey")
   val SERIALIZER_HEADER: String = "MergeTree;"
 
+  private def prefixOf(key: String): String = s"$STORAGE_PREFIX$key"
+
   def withMoreStorageInfo(
       metadata: Metadata,
       snapshotId: String,
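A note on the helper above: prefixOf concatenates the prefix directly onto the key, so every derived constant reproduces exactly the literal it replaces and existing configuration keys stay stable across this refactor. A quick self-contained check (illustrative, not repository code):

  object PrefixCheck extends App {
    val STORAGE_PREFIX: String = "storage_"
    def prefixOf(key: String): String = s"$STORAGE_PREFIX$key"
    // Derived values match the removed literals one-for-one.
    assert(prefixOf("db") == "storage_db")
    assert(prefixOf("snapshot_id") == "storage_snapshot_id")
    assert(prefixOf("orderByKey") == "storage_orderByKey")
  }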
@@ -87,11 +90,12 @@ object StorageMeta {
       .getOrElse(default)
 }
 
-trait WriteConfiguration {
-  val writeConfiguration: Map[String, String]
+/** All properties start with the 'storage_' prefix. */
+trait StorageConfigProvider {
+  val storageConf: Map[String, String]
 }
 
-trait TablePropertiesReader extends WriteConfiguration {
+trait TablePropertiesReader extends StorageConfigProvider {
 
   def configuration: Map[String, String]
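Every implementer of the renamed trait now supplies storageConf instead of writeConfiguration. TablePropertiesReader derives its map from table properties (next hunk below); a hypothetical minimal implementer could simply filter a plain map (illustrative only):

  case class MapBackedProvider(all: Map[String, String]) extends StorageConfigProvider {
    // Keep only the 'storage_'-prefixed entries, per the trait's documented contract.
    lazy val storageConf: Map[String, String] =
      all.filter { case (key, _) => key.startsWith("storage_") } // i.e. StorageMeta.STORAGE_PREFIX
  }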

@@ -175,7 +179,7 @@ trait TablePropertiesReader extends WriteConfiguration {
     }
   }
 
-  lazy val writeConfiguration: Map[String, String] = {
+  lazy val storageConf: Map[String, String] = {
     val (orderByKey0, primaryKey0) = StorageMeta.genOrderByAndPrimaryKeyStr(
       orderByKeyOption,
       primaryKeyOption
File 3 of 3

@@ -26,7 +26,7 @@ import org.apache.gluten.utils.ConfigUtil
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter}
-import org.apache.spark.sql.execution.datasources.mergetree.{MetaSerializer, PartSerializer, StorageMeta, WriteConfiguration}
+import org.apache.spark.sql.execution.datasources.mergetree.{MetaSerializer, PartSerializer, StorageConfigProvider, StorageMeta}
 import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
 import org.apache.spark.sql.types.StructType
 
@@ -42,12 +42,12 @@ import scala.collection.JavaConverters._
 
 case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])
 
-case class HadoopConfReader(conf: Configuration) extends WriteConfiguration {
-  lazy val writeConfiguration: Map[String, String] = {
+case class HadoopConfReader(conf: Configuration) extends StorageConfigProvider {
+  lazy val storageConf: Map[String, String] = {
     conf
       .iterator()
       .asScala
-      .filter(_.getKey.startsWith("storage_"))
+      .filter(_.getKey.startsWith(StorageMeta.STORAGE_PREFIX))
       .map(entry => entry.getKey -> entry.getValue)
       .toMap
   }
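To make the filter above concrete, here is a hedged round-trip sketch: keys written into a Hadoop Configuration under the 'storage_' prefix come back out of the provider, while everything else is dropped (HadoopConfDemo and its values are assumptions, not repository code):

  import org.apache.hadoop.conf.Configuration
  import scala.collection.JavaConverters._

  object HadoopConfDemo extends App {
    val conf = new Configuration(false) // skip loading default resources
    conf.set("storage_db", "default")
    conf.set("storage_table", "events")
    conf.set("mapreduce.job.name", "ignored") // no 'storage_' prefix, filtered out

    val storageOnly: Map[String, String] = conf
      .iterator()
      .asScala
      .filter(_.getKey.startsWith("storage_"))
      .map(entry => entry.getKey -> entry.getValue)
      .toMap

    assert(storageOnly == Map("storage_db" -> "default", "storage_table" -> "events"))
  }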
@@ -62,7 +62,7 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
   }
 
   override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = {
-    val conf = HadoopConfReader(context.getConfiguration).writeConfiguration
+    val conf = HadoopConfReader(context.getConfiguration).storageConf
     Write
       .newBuilder()
       .setCommon(Write.Common.newBuilder().setFormat(formatName).build())
@@ -92,8 +92,8 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
       nativeConf: ju.Map[String, String]): OutputWriter = {
 
     val storage = HadoopConfReader(context.getConfiguration)
-    val database = storage.writeConfiguration(StorageMeta.DB)
-    val tableName = storage.writeConfiguration(StorageMeta.TABLE)
+    val database = storage.storageConf(StorageMeta.DB)
+    val tableName = storage.storageConf(StorageMeta.TABLE)
 
     val datasourceJniWrapper = new CHDatasourceJniWrapper(
       context.getTaskAttemptID.getTaskID.getId.toString,
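One caveat in the hunk above: storage.storageConf(StorageMeta.DB) uses Map.apply, which throws NoSuchElementException when the key is absent, so these lookups rely on the configuration having been populated (for example by DeltaMetaReader in the first file). A defensive variant, purely as an illustrative sketch (requiredKey is not repository code):

  def requiredKey(conf: Map[String, String], key: String): String =
    conf.getOrElse(key, throw new IllegalStateException(s"missing required storage property: $key"))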
