From ffe9fbae1375ead446893a29a4f6283f11d5dcb5 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Wed, 24 Jan 2024 18:05:15 +0100
Subject: [PATCH] WIP: aliasing

---
 .../sql/catalyst/analysis/CheckViews.scala    |  8 +++-
 .../sql/catalyst/analysis/ResolveViews.scala  | 28 +++++++++++++
 .../analysis/RewriteViewCommands.scala        | 13 ++++--
 .../logical/views/CreateIcebergView.scala     |  4 +-
 .../datasources/v2/CreateV2ViewExec.scala     | 42 +++----------------
 .../v2/ExtendedDataSourceV2Strategy.scala     |  4 +-
 6 files changed, 54 insertions(+), 45 deletions(-)

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
index e2bfe3a637a4..123dc163dd3a 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala
@@ -26,14 +26,18 @@ import org.apache.spark.sql.catalyst.plans.logical.View
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SchemaUtils
 
 object CheckViews extends (LogicalPlan => Unit) {
 
   override def apply(plan: LogicalPlan): Unit = {
     plan foreach {
-      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, _, _, _, _) =>
+      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
+        queryColumnNames, _, _, _, _, _) =>
         verifyAmountOfColumns(ident, columnAliases, query)
         verifyTemporaryObjectsDontExist(ident, query)
+        SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
 
       case _ => // OK
     }
@@ -98,4 +102,4 @@ object CheckViews extends (LogicalPlan => Unit) {
 
     collectTempViews(child)
   }
-}
\ No newline at end of file
+}
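Note: the new CheckViews validation runs SchemaUtils.checkColumnNameDuplication against
queryColumnNames, i.e. the output names of the underlying query rather than the
user-specified aliases, so a view body that produces duplicate names is rejected even
when the aliases would disambiguate them. A rough sketch of the expected behavior (the
table name `t` is illustrative):

    // expected to fail analysis: the view body projects `id` twice,
    // even though the aliases `a` and `b` are unique
    spark.sql("CREATE VIEW v (a, b) AS SELECT id, id FROM t")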
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a4ae3dc99def..ce20e68767e2 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.types.MetadataBuilder
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
@@ -59,6 +61,32 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
         .getOrElse(u)
+
+    case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _,
+      rewritten)
+      if query.resolved && !rewritten =>
+      val rewrittenQuery = rewriteIdentifiers(query, ident.asMultipartIdentifier)
+      val aliasedPlan = aliasPlan(rewrittenQuery, columnAliases, columnComments)
+      c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
+  }
+
+  private def aliasPlan(
+      analyzedPlan: LogicalPlan,
+      columnAliases: Seq[String],
+      columnComments: Seq[Option[String]]): LogicalPlan = {
+    if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zipWithIndex.map { case (attr, pos) =>
+        if (columnComments.apply(pos).isDefined) {
+          val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
+          Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
+        } else {
+          Alias(attr, columnAliases.apply(pos))()
+        }
+      }
+      Project(projectList, analyzedPlan)
+    }
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 826501157052..7b4883494d96 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -45,10 +45,15 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
 
     case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
       Some(queryText), query, allowExisting, replace) =>
-      val columnAliases = userSpecifiedColumns.map(_._1)
-      val columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty))
-      CreateIcebergView(resolved, queryText, query, columnAliases, columnComments,
-        comment, properties, allowExisting, replace)
+      CreateIcebergView(child = resolved,
+        queryText = queryText,
+        query = query,
+        columnAliases = userSpecifiedColumns.map(_._1),
+        columnComments = userSpecifiedColumns.map(_._2),
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
index 8ef96586d355..9366d5efe163 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -28,10 +28,12 @@ case class CreateIcebergView(
     query: LogicalPlan,
     columnAliases: Seq[String],
     columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String] = Seq.empty,
     comment: Option[String],
     properties: Map[String, String],
     allowExisting: Boolean,
-    replace: Boolean) extends BinaryCommand {
+    replace: Boolean,
+    rewritten: Boolean = false) extends BinaryCommand {
 
   override def left: LogicalPlan = child
 
   override def right: LogicalPlan = query
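Taken together, these three changes move column aliasing from execution into analysis:
RewriteViewCommands forwards the raw aliases and comments, ResolveViews applies them
exactly once (the new `rewritten` flag keeps the rule from re-firing on its own output),
and CreateIcebergView carries both the aliased query and the pre-alias queryColumnNames.
A sketch of the intended effect (identifiers are illustrative):

    spark.sql("CREATE VIEW v (a COMMENT 'renamed id', b) AS SELECT id, data FROM t")

Here aliasPlan should wrap the analyzed body in a Project of Alias(id, "a") (with the
comment attached as metadata) and Alias(data, "b"), while queryColumnNames records the
original names `id` and `data` for the catalog.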
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index 6efd7c0e2efc..2d0fcc69114e 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -19,21 +19,13 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
-import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
-import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.CommandExecutionMode
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.MetadataBuilder
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.util.SchemaUtils
 
 import scala.collection.JavaConverters._
 
@@ -44,19 +36,16 @@ case class CreateV2ViewExec(
     viewSchema: StructType,
     columnAliases: Seq[String],
     columnComments: Seq[Option[String]],
+    queryColumnNames: Seq[String],
     comment: Option[String],
     properties: Map[String, String],
     allowExisting: Boolean,
-    replace: Boolean) extends LeafV2CommandExec {
+    replace: Boolean,
+    query: LogicalPlan) extends LeafV2CommandExec {
 
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
-//    val analyzedPlan = session.sessionState.executePlan(query, CommandExecutionMode.SKIP).analyzed
-//
-    val queryColumnNames = viewSchema.fieldNames
-    SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
-
     val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
     val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
     val currentNamespace = session.sessionState.catalogManager.currentNamespace
@@ -79,7 +68,7 @@ case class CreateV2ViewExec(
         currentCatalog,
         currentNamespace,
         viewSchema,
-        queryColumnNames,
+        queryColumnNames.toArray,
         columnAliases.toArray,
         columnComments.map(c => c.orNull).toArray,
         newProperties.asJava)
@@ -92,7 +81,7 @@ case class CreateV2ViewExec(
         currentCatalog,
         currentNamespace,
         viewSchema,
-        queryColumnNames,
+        queryColumnNames.toArray,
         columnAliases.toArray,
         columnComments.map(c => c.orNull).toArray,
         newProperties.asJava)
@@ -107,25 +96,4 @@ case class CreateV2ViewExec(
   override def simpleString(maxFields: Int): String = {
     s"CreateV2ViewExec: ${ident}"
   }
-
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
-   * else return the analyzed plan directly.
-   */
-  private def aliasPlan(
-      analyzedPlan: LogicalPlan,
-      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      val projectedPlan = Project(projectList, analyzedPlan)
-      session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
-    }
-  }
 }
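With the analyzer now responsible for aliasing, CreateV2ViewExec shrinks to a plain
executor: the duplicate-name check lives in CheckViews, the old aliasPlan helper is
gone, and queryColumnNames arrives precomputed instead of being derived from viewSchema.
The .toArray conversions are needed because the ViewCatalog createView/replaceView API
takes arrays. Continuing the example above, the values handed to the catalog would be
roughly:

    queryColumnNames = Array("id", "data")  // pre-alias output of the view body
    columnAliases    = Array("a", "b")      // user-specified names
    // viewSchema is query.schema after aliasing, so its fields are already named a/b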
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 8951d8208494..e6148dcca4a3 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -109,13 +109,15 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
     case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
-      columnAliases, columnComments, comment, properties, allowExisting, replace) =>
+      columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
       CreateV2ViewExec(
         catalog = viewCatalog,
         ident = ident,
         queryText = queryText,
         columnAliases = columnAliases,
         columnComments = columnComments,
+        queryColumnNames = queryColumnNames,
+        query = query,
         viewSchema = query.schema,
         comment = comment,
         properties = properties,
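End to end, the strategy threads the analyzed query into the exec node and takes
viewSchema from query.schema after ResolveViews has rewritten it, so the schema stored
through the ViewCatalog reflects the user-specified names and comments. A minimal usage
sketch, assuming a catalog that implements ViewCatalog (catalog/table names are
illustrative):

    spark.sql("CREATE VIEW cat.db.v (a COMMENT 'renamed', b) AS SELECT id, data FROM cat.db.t")
    spark.sql("DESCRIBE cat.db.v")  // expected: columns a and b, with the comment on a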