Commit 1f440dd: WriteConfiguration

baibaichen committed Oct 9, 2024
1 parent 78b0096 commit 1f440dd

Showing 3 changed files with 52 additions and 32 deletions.
File 1 of 3: trait ClickHouseTableV2Base

@@ -20,7 +20,6 @@ import org.apache.gluten.expression.ConverterUtils.normalizeColName
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.delta.Snapshot
-import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
 import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader}
 
@@ -38,7 +37,8 @@ trait ClickHouseTableV2Base extends TablePropertiesReader {
 
   def configuration: Map[String, String] = deltaProperties
 
-  def metadata: Metadata = deltaSnapshot.metadata
+  override lazy val partitionColumns: Seq[String] =
+    deltaSnapshot.metadata.partitionColumns.map(normalizeColName)
 
   lazy val dataBaseName: String = deltaCatalog
     .map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE))
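This hunk drops the Metadata indirection: ClickHouseTableV2Base now satisfies the trait's abstract partitionColumns member directly from the snapshot. A minimal, self-contained sketch of that pattern (all types below, and the lower-casing normalizeColName, are stand-ins, not the real Gluten or Delta classes):

object PartitionColumnsSketch {
  // Stand-ins for Delta's Snapshot/Metadata; normalizeColName is assumed
  // here to lower-case identifiers.
  case class FakeMetadata(partitionColumns: Seq[String])
  case class FakeSnapshot(metadata: FakeMetadata)
  def normalizeColName(name: String): String = name.toLowerCase

  trait TablePropertiesReaderLike {
    val partitionColumns: Seq[String] // abstract, as in the new trait
  }

  class TableBase(deltaSnapshot: FakeSnapshot) extends TablePropertiesReaderLike {
    // Mirrors the added override in ClickHouseTableV2Base.
    override lazy val partitionColumns: Seq[String] =
      deltaSnapshot.metadata.partitionColumns.map(normalizeColName)
  }

  def main(args: Array[String]): Unit = {
    val table = new TableBase(FakeSnapshot(FakeMetadata(Seq("Year", "Month"))))
    println(table.partitionColumns) // List(year, month)
  }
}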
File 2 of 3: DeltaMetaReader

@@ -16,21 +16,28 @@
  */
 package org.apache.spark.sql.execution.datasources.mergetree
 
 import org.apache.gluten.expression.ConverterUtils.normalizeColName
 
 import org.apache.spark.sql.delta.actions.Metadata
 
-class DeltaMetaReader(
-    override val metadata: Metadata,
-    override val configuration: Map[String, String])
-  extends TablePropertiesReader {
+import org.apache.hadoop.conf.Configuration
+
+case class DeltaMetaReader(metadata: Metadata) extends TablePropertiesReader {
 
-  def storageDB: String = configuration(StorageMeta.STORAGE_DB)
-  def storageTable: String = configuration(StorageMeta.STORAGE_TABLE)
-  def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID)
+  def storageDB: String = configuration(StorageMeta.DB)
+  def storageTable: String = configuration(StorageMeta.TABLE)
+  def storageSnapshotId: String = configuration(StorageMeta.SNAPSHOT_ID)
   def storagePath: String = configuration(StorageMeta.STORAGE_PATH)
-}
 
-object DeltaMetaReader {
-  def apply(metadata: Metadata): DeltaMetaReader = {
-    new DeltaMetaReader(metadata, metadata.configuration)
+  def updateToHadoopConf(conf: Configuration): Unit = {
+    conf.set(StorageMeta.DB, storageDB)
+    conf.set(StorageMeta.TABLE, storageTable)
+    conf.set(StorageMeta.SNAPSHOT_ID, storageSnapshotId)
+    writeConfiguration.foreach { case (k, v) => conf.set(k, v) }
   }
+
+  override lazy val partitionColumns: Seq[String] =
+    metadata.partitionColumns.map(normalizeColName)
+
+  override lazy val configuration: Map[String, String] = metadata.configuration
 }
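The new updateToHadoopConf copies the per-table storage keys plus the whole writeConfiguration map into a Hadoop Configuration, so write-side code can pick the settings up from the job conf. A hedged, self-contained sketch of that flow (the reader below is a stand-in; only the key strings and the Hadoop Configuration API are the real ones):

import org.apache.hadoop.conf.Configuration

object HadoopConfSketch {
  val DB = "storage_db"
  val TABLE = "storage_table"
  val SNAPSHOT_ID = "storage_snapshot_id"

  // Stand-in for DeltaMetaReader: holds the table's configuration map and a
  // writeConfiguration map, and pushes both into a Hadoop Configuration.
  case class FakeReader(
      configuration: Map[String, String],
      writeConfiguration: Map[String, String]) {
    def updateToHadoopConf(conf: Configuration): Unit = {
      conf.set(DB, configuration(DB))
      conf.set(TABLE, configuration(TABLE))
      conf.set(SNAPSHOT_ID, configuration(SNAPSHOT_ID))
      writeConfiguration.foreach { case (k, v) => conf.set(k, v) }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    FakeReader(
      Map(DB -> "default", TABLE -> "lineitem", SNAPSHOT_ID -> "42"),
      Map("storage_policy" -> "default", "storage_orderByKey" -> "l_orderkey")
    ).updateToHadoopConf(conf)
    println(conf.get("storage_orderByKey")) // l_orderkey
  }
}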
File 3 of 3: object StorageMeta and trait TablePropertiesReader

@@ -30,10 +30,17 @@ object StorageMeta {
   // Storage properties
   val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db"
   val DEFAULT_CREATE_TABLE_DATABASE: String = "default"
-  val STORAGE_DB: String = "storage_db"
-  val STORAGE_TABLE: String = "storage_table"
-  val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id"
+  val DB: String = "storage_db"
+  val TABLE: String = "storage_table"
+  val SNAPSHOT_ID: String = "storage_snapshot_id"
   val STORAGE_PATH: String = "storage_path"
+  val POLICY: String = "storage_policy"
+  val ORDER_BY_KEY: String = "storage_orderByKey"
+  val LOW_CARD_KEY: String = "storage_lowCardKey"
+  val MINMAX_INDEX_KEY: String = "storage_minmaxIndexKey"
+  val BF_INDEX_KEY: String = "storage_bfIndexKey"
+  val SET_INDEX_KEY: String = "storage_setIndexKey"
+  val PRIMARY_KEY: String = "storage_primaryKey"
   val SERIALIZER_HEADER: String = "MergeTree;"
 
   def withMoreStorageInfo(
@@ -43,9 +50,9 @@ object StorageMeta {
       database: String,
       tableName: String): Metadata = {
     val moreOptions = Seq(
-      STORAGE_DB -> database,
-      STORAGE_SNAPSHOT_ID -> snapshotId,
-      STORAGE_TABLE -> tableName,
+      DB -> database,
+      SNAPSHOT_ID -> snapshotId,
+      TABLE -> tableName,
       STORAGE_PATH -> deltaPath.toString)
     withMoreOptions(metadata, moreOptions)
   }
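withMoreStorageInfo stamps the four storage_* identity keys onto the table's Delta metadata. A sketch of the effect on the configuration map (plain Maps stand in for the real Metadata and Path types):

object WithMoreStorageInfoSketch {
  // Same key strings as StorageMeta.DB / SNAPSHOT_ID / TABLE / STORAGE_PATH.
  def withMoreStorageInfo(
      configuration: Map[String, String],
      snapshotId: String,
      deltaPath: String,
      database: String,
      tableName: String): Map[String, String] =
    configuration ++ Seq(
      "storage_db" -> database,
      "storage_snapshot_id" -> snapshotId,
      "storage_table" -> tableName,
      "storage_path" -> deltaPath)

  def main(args: Array[String]): Unit = {
    println(withMoreStorageInfo(Map.empty, "5", "/tmp/delta/lineitem", "default", "lineitem"))
    // Map(storage_db -> default, storage_snapshot_id -> 5,
    //     storage_table -> lineitem, storage_path -> /tmp/delta/lineitem)
  }
}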
@@ -55,12 +62,15 @@ object StorageMeta {
   }
 }
 
-trait TablePropertiesReader {
+trait WriteConfiguration {
+  val writeConfiguration: Map[String, String]
+}
+
+trait TablePropertiesReader extends WriteConfiguration {
 
   def configuration: Map[String, String]
 
-  /** delta */
-  def metadata: Metadata
+  val partitionColumns: Seq[String]
 
   private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = {
     configuration.get(keyName).map {
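Splitting WriteConfiguration out of TablePropertiesReader lets write-path code depend on just the settings map. A sketch of that narrowed dependency (stand-in traits; in the real code writeConfiguration is derived from the table's configuration):

object TraitSplitSketch {
  trait WriteConfiguration {
    val writeConfiguration: Map[String, String]
  }

  trait TablePropertiesReaderLike extends WriteConfiguration {
    def configuration: Map[String, String]
    val partitionColumns: Seq[String]
  }

  // A writer only needs WriteConfiguration, not the full reader.
  def applySettings(wc: WriteConfiguration): Unit =
    wc.writeConfiguration.foreach { case (k, v) => println(s"$k=$v") }

  def main(args: Array[String]): Unit = {
    val reader = new TablePropertiesReaderLike {
      def configuration: Map[String, String] = Map("storage_policy" -> "default")
      val partitionColumns: Seq[String] = Seq("l_shipdate")
      val writeConfiguration: Map[String, String] = configuration
    }
    applySettings(reader) // storage_policy=default
  }
}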
@@ -107,9 +117,6 @@ trait TablePropertiesReader extends WriteConfiguration {
     getCommaSeparatedColumns("setIndexKey")
   }
 
-  lazy val partitionColumns: Seq[String] =
-    metadata.partitionColumns.map(normalizeColName)
-
   lazy val orderByKeyOption: Option[Seq[String]] = {
     val orderByKeys =
       if (bucketOption.exists(_.sortColumnNames.nonEmpty)) {
@@ -149,15 +156,21 @@ trait TablePropertiesReader extends WriteConfiguration {
       primaryKeyOption
     )
     Map(
-      "storage_policy" -> configuration.getOrElse("storage_policy", "default"),
-      "storage_orderByKey" -> orderByKey0,
-      "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_minmaxIndexKey" -> minmaxIndexKeyOption
+      StorageMeta.POLICY -> configuration.getOrElse(StorageMeta.POLICY, "default"),
+      StorageMeta.ORDER_BY_KEY -> orderByKey0,
+      StorageMeta.LOW_CARD_KEY -> lowCardKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      StorageMeta.MINMAX_INDEX_KEY -> minmaxIndexKeyOption
         .map(MergeTreeDeltaUtil.columnsToStr)
         .getOrElse(""),
-      "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_primaryKey" -> primaryKey0
+      StorageMeta.BF_INDEX_KEY -> bfIndexKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      StorageMeta.SET_INDEX_KEY -> setIndexKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      StorageMeta.PRIMARY_KEY -> primaryKey0
     )
   }
 }
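For illustration, an assumed instance of the map this block builds, for a table that sets only storage_policy and lowCardKey in its configuration and whose order-by and primary keys both resolve to l_orderkey (values are hypothetical, not from the commit):

Map(
  "storage_policy"         -> "s3_main",      // taken from configuration
  "storage_orderByKey"     -> "l_orderkey",   // orderByKey0
  "storage_lowCardKey"     -> "l_returnflag",
  "storage_minmaxIndexKey" -> "",             // unset options collapse to ""
  "storage_bfIndexKey"     -> "",
  "storage_setIndexKey"    -> "",
  "storage_primaryKey"     -> "l_orderkey"    // primaryKey0
)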
