diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 696dff96bfb0..2766cfbd3510 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -25,13 +25,10 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.UpCast
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
 import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
-import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View
-import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -42,11 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.command.ViewHelper
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.MetadataBuilder
-import org.apache.spark.sql.util.SchemaUtils
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
 
@@ -64,19 +57,13 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       .map(createViewRelation(parts, _))
       .getOrElse(u)
 
-    case CreateIcebergView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
-      properties, originalText, child, allowExisting, replace) =>
-      convertCreateView(
-        catalog = asViewCatalog(catalog),
-        viewInfo = ViewInfo(
-          ident = ident,
-          userSpecifiedColumns = userSpecifiedColumns,
-          comment = comment,
-          properties = properties,
-          originalText = originalText),
-        child = child,
-        allowExisting = allowExisting,
-        replace = replace)
+    case c@CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment,
+      properties, Some(originalText), query, allowExisting, replace) =>
+      val identifier = Spark3Util.toV1TableIdentifier(ident)
+      ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty)
+      ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier)
+      c
+
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -167,90 +154,4 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   private def isBuiltinFunction(name: String): Boolean = {
     catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
-
-  def asViewCatalog(plugin: CatalogPlugin): ViewCatalog = plugin match {
-    case viewCatalog: ViewCatalog =>
-      viewCatalog
-    case _ =>
-      throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
-  }
-
-  private case class ViewInfo(ident: Identifier,
-    userSpecifiedColumns: Seq[(String, Option[String])],
-    comment: Option[String],
-    properties: Map[String, String],
-    originalText: Option[String])
-
-  /**
-   * Convert [[CreateView]] or [[AlterViewAs]] to logical plan [[CreateV2View]].
-   */
-  private def convertCreateView(
-      catalog: ViewCatalog,
-      viewInfo: ViewInfo,
-      child: LogicalPlan,
-      allowExisting: Boolean,
-      replace: Boolean): LogicalPlan = {
-    val qe = spark.sessionState.executePlan(child, CommandExecutionMode.SKIP)
-    qe.assertAnalyzed()
-    val analyzedPlan = qe.analyzed
-
-    val identifier = Spark3Util.toV1TableIdentifier(viewInfo.ident)
-
-    if (viewInfo.userSpecifiedColumns.nonEmpty) {
-      if (viewInfo.userSpecifiedColumns.length > analyzedPlan.output.length) {
-        throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
-          identifier, viewInfo.userSpecifiedColumns.map(_._1), analyzedPlan)
-      } else if (viewInfo.userSpecifiedColumns.length < analyzedPlan.output.length) {
-        throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
-          identifier, viewInfo.userSpecifiedColumns.map(_._1), analyzedPlan)
-      }
-    }
-
-    ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, child, Seq.empty)
-    ViewHelper.verifyAutoGeneratedAliasesNotExists(child, false, identifier)
-
-    val queryOutput = analyzedPlan.schema.fieldNames
-    SchemaUtils.checkColumnNameDuplication(queryOutput, SQLConf.get.resolver)
-
-    val sql = viewInfo.originalText.getOrElse {
-      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
-    }
-
-    val viewSchema = aliasPlan(analyzedPlan, viewInfo.userSpecifiedColumns).schema
-    val columnAliases = viewInfo.userSpecifiedColumns.map(_._1).toArray
-    val columnComments = viewInfo.userSpecifiedColumns.map(_._2.getOrElse("")).toArray
-
-    CreateV2View(
-      view = viewInfo.ident.asMultipartIdentifier,
-      sql = sql,
-      comment = None,
-      viewSchema = viewSchema,
-      queryOutput,
-      columnAliases = columnAliases,
-      columnComments = columnComments,
-      properties = viewInfo.properties,
-      allowExisting = allowExisting,
-      replace = replace)
-  }
-
-  /**
-   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
-   * else return the analyzed plan directly.
-   */
-  private def aliasPlan(
-      analyzedPlan: LogicalPlan,
-      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
-    if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, None)) => Alias(attr, colName)()
-        case (attr, (colName, Some(colComment))) =>
-          val meta = new MetadataBuilder().putString("comment", colComment).build()
-          Alias(attr, colName)(explicitMetadata = Some(meta))
-      }
-      val projectedPlan = Project(projectList, analyzedPlan)
-      spark.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
-    }
-  }
 }
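Note: the rewritten rule above now only validates a `CREATE VIEW` statement during analysis (no temporary objects, no auto-generated aliases) and returns the `CreateIcebergView` node unchanged; building the view is deferred to the physical `CreateV2ViewExec` node below. A minimal sketch of what the `ViewHelper.verifyTemporaryObjectsNotExists` call rejects, assuming a session with the Iceberg SQL extensions and a view-enabled catalog named `demo` (all object names here are hypothetical):

```scala
// A temporary view is visible only to this SparkSession.
spark.sql("CREATE TEMPORARY VIEW tmp_events AS SELECT 1 AS id, 'a' AS name")

// Fails analysis: a persisted Iceberg view must not capture a temporary view,
// because the stored SQL text would dangle once the session ends.
spark.sql("CREATE VIEW demo.db.event_names AS SELECT name FROM tmp_events")
```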
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala
deleted file mode 100644
index 5e907c1cf8ff..000000000000
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.views
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical.LeafCommand
-import org.apache.spark.sql.types.StructType
-
-case class CreateV2View(
-    view: Seq[String],
-    sql: String,
-    comment: Option[String],
-    viewSchema: StructType,
-    queryColumnNames: Array[String],
-    columnAliases: Array[String],
-    columnComments: Array[String],
-    properties: Map[String, String],
-    allowExisting: Boolean,
-    replace: Boolean) extends LeafCommand {
-
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-  override lazy val output: Seq[Attribute] = Nil
-
-  override def simpleString(maxFields: Int): String = {
-    s"CreateV2View: ${view.quoted}"
-  }
-}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
index c4dbf3c9d536..4db76601385a 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -19,26 +19,30 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.ViewCatalog
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
 import scala.collection.JavaConverters._
 
 case class CreateV2ViewExec(
     catalog: ViewCatalog,
     ident: Identifier,
-    sql: String,
-    currentCatalog: String,
-    currentNamespace: Array[String],
+    originalText: String,
+    query: LogicalPlan,
+    userSpecifiedColumns: Seq[(String, Option[String])],
     comment: Option[String],
-    viewSchema: StructType,
-    queryColumnNames: Array[String],
-    columnAliases: Array[String],
-    columnComments: Array[String],
     properties: Map[String, String],
     allowExisting: Boolean,
     replace: Boolean) extends LeafV2CommandExec {
 
@@ -46,6 +50,32 @@ case class CreateV2ViewExec(
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
+    val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
+
+    val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+    if (userSpecifiedColumns.nonEmpty) {
+      if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+          identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+      } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+          identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+      }
+    }
+
+    val queryColumnNames = analyzedPlan.schema.fieldNames
+    SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
+
+    val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
+    val columnAliases = userSpecifiedColumns.map(_._1).toArray
+    val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray
+
+    val currentCatalog = session.sessionState.catalogManager.currentCatalog.name
+    val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
     val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
     val createEngineVersion = Some(engineVersion)
     val newProperties = properties ++
@@ -61,7 +91,7 @@ case class CreateV2ViewExec(
       // FIXME: replaceView API doesn't exist in Spark 3.5
       catalog.createView(
         ident,
-        sql,
+        originalText,
         currentCatalog,
         currentNamespace,
         viewSchema,
@@ -74,7 +104,7 @@ case class CreateV2ViewExec(
       // CREATE VIEW [IF NOT EXISTS]
       catalog.createView(
         ident,
-        sql,
+        originalText,
         currentCatalog,
         currentNamespace,
         viewSchema,
@@ -93,4 +123,25 @@ case class CreateV2ViewExec(
   override def simpleString(maxFields: Int): String = {
     s"CreateV2ViewExec: ${ident}"
   }
+
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
+   */
+  private def aliasPlan(
+      analyzedPlan: LogicalPlan,
+      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      val projectedPlan = Project(projectList, analyzedPlan)
+      session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
+    }
+  }
 }
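The `aliasPlan` helper (moved here from `ResolveViews`) wraps the analyzed query in a `Project` of `Alias` expressions whenever the statement declares a column list, so the schema stored in the view catalog carries the user-facing names and comments rather than the query's output names. A sketch of the two paths through the column checks above, again with hypothetical identifiers:

```scala
// The declared column list renames the output and attaches a comment; the
// view's stored schema becomes (event_id, event_name).
spark.sql(
  """CREATE VIEW demo.db.v (event_id COMMENT 'surrogate key', event_name)
    |AS SELECT id, name FROM demo.db.events""".stripMargin)

// Fails with cannotCreateViewTooManyColumnsError: the query produces two
// columns but the column list declares only one.
spark.sql("CREATE VIEW demo.db.bad (event_id) AS SELECT id, name FROM demo.db.events")
```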
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index aded692b2894..6ae6c931ff78 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
-import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -99,12 +99,18 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
-    case CreateV2View(
-      IcebergViewCatalogAndIdentifier(catalog, ident), sql, comment, viewSchema, queryColumnNames,
-      columnAliases, columnComments, properties, allowExisting, replace) =>
-      CreateV2ViewExec(catalog, ident, sql, catalogManager.currentCatalog.name,
-        catalogManager.currentNamespace, comment, viewSchema, queryColumnNames,
-        columnAliases, columnComments, properties, allowExisting, replace) :: Nil
+    case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment,
+      properties, Some(originalText), query, allowExisting, replace) =>
+      CreateV2ViewExec(
+        catalog = viewCatalog,
+        ident = ident,
+        originalText = originalText,
+        query = query,
+        userSpecifiedColumns = userSpecifiedColumns,
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace) :: Nil
 
     case _ => Nil
 
@@ -130,16 +136,4 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       }
     }
   }
-
-  private object IcebergViewCatalogAndIdentifier {
-    def unapply(identifier: Seq[String]): Option[(ViewCatalog, Identifier)] = {
-      val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, identifier.asJava)
-      catalogAndIdentifier.catalog match {
-        case icebergCatalog: SparkCatalog =>
-          Some((icebergCatalog, catalogAndIdentifier.identifier))
-        case _ =>
-          None
-      }
-    }
-  }
 }
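With the strategy matching `CreateIcebergView` against its `ResolvedIdentifier` directly, the `IcebergViewCatalogAndIdentifier` extractor (which re-resolved an identifier the analyzer had already resolved) becomes dead code and is removed. The resulting flow is: parser to `CreateIcebergView`, validation in `ResolveViews`, then planning into `CreateV2ViewExec`. An end-to-end sketch of the statements that take this path, assuming the same hypothetical `demo` catalog:

```scala
// Plain create: throws ViewAlreadyExistsException if the view exists.
spark.sql("CREATE VIEW demo.db.event_names AS SELECT name FROM demo.db.events")

// allowExisting = true: intended as a no-op when the view already exists.
spark.sql("CREATE VIEW IF NOT EXISTS demo.db.event_names AS SELECT name FROM demo.db.events")

// replace = true: per the FIXME above, Spark 3.5's ViewCatalog has no
// replaceView API, so the exec node issues createView here as well.
spark.sql("CREATE OR REPLACE VIEW demo.db.event_names AS SELECT name FROM demo.db.events")
```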