From 1f440dd47b7181a9e3ba73b2910968ef1ed1e06c Mon Sep 17 00:00:00 2001
From: Chang Chen
Date: Tue, 8 Oct 2024 15:49:39 +0800
Subject: [PATCH] WriteConfiguration

---
 .../delta/catalog/ClickHouseTableV2Base.scala |  4 +-
 .../mergetree/DeltaMetaReader.scala           | 29 +++++++----
 .../datasources/mergetree/StorageMeta.scala   | 51 ++++++++++++-------
 3 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 062e969622971..7246b1a47d0c1 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.expression.ConverterUtils.normalizeColName
 
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.delta.Snapshot
-import org.apache.spark.sql.delta.actions.Metadata
 import org.apache.spark.sql.execution.datasources.clickhouse.utils.MergeTreeDeltaUtil
 import org.apache.spark.sql.execution.datasources.mergetree.{StorageMeta, TablePropertiesReader}
 
@@ -38,7 +37,8 @@ trait ClickHouseTableV2Base extends TablePropertiesReader {
 
   def configuration: Map[String, String] = deltaProperties
 
-  def metadata: Metadata = deltaSnapshot.metadata
+  override lazy val partitionColumns: Seq[String] =
+    deltaSnapshot.metadata.partitionColumns.map(normalizeColName)
 
   lazy val dataBaseName: String = deltaCatalog
     .map(_.identifier.database.getOrElse(StorageMeta.DEFAULT_CREATE_TABLE_DATABASE))
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
index de322b65dd8ee..07416cc575fd8 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
@@ -16,21 +16,28 @@
  */
 package org.apache.spark.sql.execution.datasources.mergetree
 
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
 import org.apache.spark.sql.delta.actions.Metadata
 
-class DeltaMetaReader(
-    override val metadata: Metadata,
-    override val configuration: Map[String, String])
-  extends TablePropertiesReader {
+import org.apache.hadoop.conf.Configuration
+
+case class DeltaMetaReader(metadata: Metadata) extends TablePropertiesReader {
 
-  def storageDB: String = configuration(StorageMeta.STORAGE_DB)
-  def storageTable: String = configuration(StorageMeta.STORAGE_TABLE)
-  def storageSnapshotId: String = configuration(StorageMeta.STORAGE_SNAPSHOT_ID)
+  def storageDB: String = configuration(StorageMeta.DB)
+  def storageTable: String = configuration(StorageMeta.TABLE)
+  def storageSnapshotId: String = configuration(StorageMeta.SNAPSHOT_ID)
   def storagePath: String = configuration(StorageMeta.STORAGE_PATH)
-}
 
-object DeltaMetaReader {
-  def apply(metadata: Metadata): DeltaMetaReader = {
-    new DeltaMetaReader(metadata, metadata.configuration)
+  def updateToHadoopConf(conf: Configuration): Unit = {
+    conf.set(StorageMeta.DB, storageDB)
+    conf.set(StorageMeta.TABLE, storageTable)
+    conf.set(StorageMeta.SNAPSHOT_ID, storageSnapshotId)
+    writeConfiguration.foreach { case (k, v) => conf.set(k, v) }
   }
+
+  override lazy val partitionColumns: Seq[String] =
+    metadata.partitionColumns.map(normalizeColName)
+
+  override lazy val configuration: Map[String, String] = metadata.configuration
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
index 6a7ebc3c39d2e..58ed1106acb6b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
@@ -30,10 +30,17 @@ object StorageMeta {
   // Storage properties
   val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db"
   val DEFAULT_CREATE_TABLE_DATABASE: String = "default"
-  val STORAGE_DB: String = "storage_db"
-  val STORAGE_TABLE: String = "storage_table"
-  val STORAGE_SNAPSHOT_ID: String = "storage_snapshot_id"
+  val DB: String = "storage_db"
+  val TABLE: String = "storage_table"
+  val SNAPSHOT_ID: String = "storage_snapshot_id"
   val STORAGE_PATH: String = "storage_path"
+  val POLICY: String = "storage_policy"
+  val ORDER_BY_KEY: String = "storage_orderByKey"
+  val LOW_CARD_KEY: String = "storage_lowCardKey"
+  val MINMAX_INDEX_KEY: String = "storage_minmaxIndexKey"
+  val BF_INDEX_KEY: String = "storage_bfIndexKey"
+  val SET_INDEX_KEY: String = "storage_setIndexKey"
+  val PRIMARY_KEY: String = "storage_primaryKey"
   val SERIALIZER_HEADER: String = "MergeTree;"
 
   def withMoreStorageInfo(
@@ -43,9 +50,9 @@
       database: String,
       tableName: String): Metadata = {
     val moreOptions = Seq(
-      STORAGE_DB -> database,
-      STORAGE_SNAPSHOT_ID -> snapshotId,
-      STORAGE_TABLE -> tableName,
+      DB -> database,
+      SNAPSHOT_ID -> snapshotId,
+      TABLE -> tableName,
       STORAGE_PATH -> deltaPath.toString)
     withMoreOptions(metadata, moreOptions)
   }
@@ -55,12 +62,15 @@
   }
 }
 
-trait TablePropertiesReader {
+trait WriteConfiguration {
+  val writeConfiguration: Map[String, String]
+}
+
+trait TablePropertiesReader extends WriteConfiguration {
 
   def configuration: Map[String, String]
 
-  /** delta */
-  def metadata: Metadata
+  val partitionColumns: Seq[String]
 
   private def getCommaSeparatedColumns(keyName: String): Option[Seq[String]] = {
     configuration.get(keyName).map {
@@ -107,9 +117,6 @@ trait TablePropertiesReader {
     getCommaSeparatedColumns("setIndexKey")
   }
 
-  lazy val partitionColumns: Seq[String] =
-    metadata.partitionColumns.map(normalizeColName)
-
   lazy val orderByKeyOption: Option[Seq[String]] = {
     val orderByKeys =
       if (bucketOption.exists(_.sortColumnNames.nonEmpty)) {
@@ -149,15 +156,21 @@ trait TablePropertiesReader {
       primaryKeyOption
     )
     Map(
-      "storage_policy" -> configuration.getOrElse("storage_policy", "default"),
-      "storage_orderByKey" -> orderByKey0,
-      "storage_lowCardKey" -> lowCardKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_minmaxIndexKey" -> minmaxIndexKeyOption
+      StorageMeta.POLICY -> configuration.getOrElse(StorageMeta.POLICY, "default"),
+      StorageMeta.ORDER_BY_KEY -> orderByKey0,
+      StorageMeta.LOW_CARD_KEY -> lowCardKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      StorageMeta.MINMAX_INDEX_KEY -> minmaxIndexKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      StorageMeta.BF_INDEX_KEY -> bfIndexKeyOption
+        .map(MergeTreeDeltaUtil.columnsToStr)
+        .getOrElse(""),
+      StorageMeta.SET_INDEX_KEY -> setIndexKeyOption
         .map(MergeTreeDeltaUtil.columnsToStr)
         .getOrElse(""),
-      "storage_bfIndexKey" -> bfIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_setIndexKey" -> setIndexKeyOption.map(MergeTreeDeltaUtil.columnsToStr).getOrElse(""),
-      "storage_primaryKey" -> primaryKey0
+      StorageMeta.PRIMARY_KEY -> primaryKey0
    )
  }
}
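
A minimal usage sketch of the API this patch introduces (not part of the diff): DeltaMetaReader is now a case class built from Delta Metadata alone, and updateToHadoopConf copies both the storage identity keys and the writeConfiguration map into a Hadoop Configuration. The `snapshot` value below is hypothetical, and its metadata is assumed to already carry the storage_* keys written by StorageMeta.withMoreStorageInfo:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.execution.datasources.mergetree.DeltaMetaReader

    // Case-class constructor: configuration and partitionColumns are
    // derived from the Delta metadata inside the reader itself.
    val reader = DeltaMetaReader(snapshot.metadata)

    // Propagates storage_db / storage_table / storage_snapshot_id plus the
    // write-side settings (storage_policy, storage_orderByKey,
    // storage_lowCardKey, storage_minmaxIndexKey, storage_bfIndexKey,
    // storage_setIndexKey, storage_primaryKey) into the Hadoop conf.
    val hadoopConf = new Configuration()
    reader.updateToHadoopConf(hadoopConf)

Note that storageDB, storageTable, and storageSnapshotId use Map.apply lookups, so calling updateToHadoopConf on metadata that never went through withMoreStorageInfo throws a NoSuchElementException; storage_path stays in the metadata configuration and is not copied.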