From 17bb0495f0a672f7bfcc8f35752a654d29ad12d1 Mon Sep 17 00:00:00 2001
From: Chang Chen
Date: Wed, 9 Oct 2024 11:22:00 +0800
Subject: [PATCH] WriteConfiguration => StorageConfigProvider

---
 .../mergetree/DeltaMetaReader.scala           |  2 +-
 .../datasources/mergetree/StorageMeta.scala   | 34 +++++++++++--------
 .../v1/CHMergeTreeWriterInjects.scala         | 14 ++++----
 3 files changed, 27 insertions(+), 23 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
index c4aa5af102e97..35404c98cbaf5 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/DeltaMetaReader.scala
@@ -32,7 +32,7 @@ case class DeltaMetaReader(metadata: Metadata) extends TablePropertiesReader {
     conf.set(StorageMeta.DB, storageDB)
     conf.set(StorageMeta.TABLE, storageTable)
     conf.set(StorageMeta.SNAPSHOT_ID, storageSnapshotId)
-    writeConfiguration.foreach { case (k, v) => conf.set(k, v) }
+    storageConf.foreach { case (k, v) => conf.set(k, v) }
   }
 
   override lazy val partitionColumns: Seq[String] =
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
index 550c72aa05199..d56cad3a92538 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/mergetree/StorageMeta.scala
@@ -32,19 +32,22 @@ object StorageMeta {
   val DEFAULT_PATH_BASED_DATABASE: String = "clickhouse_db"
   val DEFAULT_CREATE_TABLE_DATABASE: String = "default"
   val DEFAULT_ORDER_BY_KEY = "tuple()"
-  val DB: String = "storage_db"
-  val TABLE: String = "storage_table"
-  val SNAPSHOT_ID: String = "storage_snapshot_id"
-  val STORAGE_PATH: String = "storage_path"
-  val POLICY: String = "storage_policy"
-  val ORDER_BY_KEY: String = "storage_orderByKey"
-  val LOW_CARD_KEY: String = "storage_lowCardKey"
-  val MINMAX_INDEX_KEY: String = "storage_minmaxIndexKey"
-  val BF_INDEX_KEY: String = "storage_bfIndexKey"
-  val SET_INDEX_KEY: String = "storage_setIndexKey"
-  val PRIMARY_KEY: String = "storage_primaryKey"
+  val STORAGE_PREFIX: String = "storage_"
+  val DB: String = prefixOf("db")
+  val TABLE: String = prefixOf("table")
+  val SNAPSHOT_ID: String = prefixOf("snapshot_id")
+  val STORAGE_PATH: String = prefixOf("path")
+  val POLICY: String = prefixOf("policy")
+  val ORDER_BY_KEY: String = prefixOf("orderByKey")
+  val LOW_CARD_KEY: String = prefixOf("lowCardKey")
+  val MINMAX_INDEX_KEY: String = prefixOf("minmaxIndexKey")
+  val BF_INDEX_KEY: String = prefixOf("bfIndexKey")
+  val SET_INDEX_KEY: String = prefixOf("setIndexKey")
+  val PRIMARY_KEY: String = prefixOf("primaryKey")
   val SERIALIZER_HEADER: String = "MergeTree;"
 
+  private def prefixOf(key: String): String = s"$STORAGE_PREFIX$key"
+
   def withMoreStorageInfo(
       metadata: Metadata,
       snapshotId: String,
@@ -87,11 +90,12 @@ object StorageMeta {
       .getOrElse(default)
 }
 
-trait WriteConfiguration {
-  val writeConfiguration: Map[String, String]
+/** All properties start with 'storage_'. */
+trait StorageConfigProvider {
+  val storageConf: Map[String, String]
 }
 
-trait TablePropertiesReader extends WriteConfiguration {
+trait TablePropertiesReader extends StorageConfigProvider {
 
   def configuration: Map[String, String]
 
@@ -175,7 +179,7 @@ trait TablePropertiesReader extends WriteConfiguration {
     }
   }
 
-  lazy val writeConfiguration: Map[String, String] = {
+  lazy val storageConf: Map[String, String] = {
     val (orderByKey0, primaryKey0) = StorageMeta.genOrderByAndPrimaryKeyStr(
       orderByKeyOption,
       primaryKeyOption
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index 81961cf352f19..d25758b0f7033 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -26,7 +26,7 @@ import org.apache.gluten.utils.ConfigUtil
 
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.{CHDatasourceJniWrapper, OutputWriter}
-import org.apache.spark.sql.execution.datasources.mergetree.{MetaSerializer, PartSerializer, StorageMeta, WriteConfiguration}
+import org.apache.spark.sql.execution.datasources.mergetree.{MetaSerializer, PartSerializer, StorageConfigProvider, StorageMeta}
 import org.apache.spark.sql.execution.datasources.v1.clickhouse.MergeTreeOutputWriter
 import org.apache.spark.sql.types.StructType
 
@@ -42,12 +42,12 @@ import scala.collection.JavaConverters._
 
 case class PlanWithSplitInfo(plan: Array[Byte], splitInfo: Array[Byte])
 
-case class HadoopConfReader(conf: Configuration) extends WriteConfiguration {
-  lazy val writeConfiguration: Map[String, String] = {
+case class HadoopConfReader(conf: Configuration) extends StorageConfigProvider {
+  lazy val storageConf: Map[String, String] = {
     conf
       .iterator()
       .asScala
-      .filter(_.getKey.startsWith("storage_"))
+      .filter(_.getKey.startsWith(StorageMeta.STORAGE_PREFIX))
       .map(entry => entry.getKey -> entry.getValue)
       .toMap
   }
@@ -62,7 +62,7 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
   }
 
   override def createNativeWrite(outputPath: String, context: TaskAttemptContext): Write = {
-    val conf = HadoopConfReader(context.getConfiguration).writeConfiguration
+    val conf = HadoopConfReader(context.getConfiguration).storageConf
     Write
       .newBuilder()
       .setCommon(Write.Common.newBuilder().setFormat(formatName).build())
@@ -92,8 +92,8 @@ class CHMergeTreeWriterInjects extends CHFormatWriterInjects {
     nativeConf: ju.Map[String, String]): OutputWriter = {
 
     val storage = HadoopConfReader(context.getConfiguration)
-    val database = storage.writeConfiguration(StorageMeta.DB)
-    val tableName = storage.writeConfiguration(StorageMeta.TABLE)
+    val database = storage.storageConf(StorageMeta.DB)
+    val tableName = storage.storageConf(StorageMeta.TABLE)
     val datasourceJniWrapper = new CHDatasourceJniWrapper(
       context.getTaskAttemptID.getTaskID.getId.toString,