move view creation logic into CreateV2ViewExec
nastra committed Jan 8, 2024
1 parent fb2b1ec commit f128c2a
Showing 4 changed files with 81 additions and 180 deletions.
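For orientation: this commit touches Apache Iceberg's Spark view support. Instead of converting CreateIcebergView into a separate CreateV2View logical plan during analysis, the analyzer now only validates the statement, and the actual view creation happens at execution time inside CreateV2ViewExec. Below is a minimal sketch of the kind of statement that exercises this path; the catalog name "local" and its backing configuration are illustrative assumptions, not part of this commit.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical session setup: the catalog name "local" and its REST backing
// are assumptions for illustration only.
val spark = SparkSession.builder()
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.local.type", "rest")
  .config("spark.sql.catalog.local.uri", "http://localhost:8181")
  .getOrCreate()

// Parsed into CreateIcebergView, validated by ResolveViews, and planned
// into CreateV2ViewExec by ExtendedDataSourceV2Strategy (see the diffs below).
spark.sql("CREATE VIEW local.db.event_view AS SELECT id, data FROM local.db.events")
```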
@@ -25,13 +25,10 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.UpCast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
@@ -42,11 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.execution.command.ViewHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.sql.util.SchemaUtils

case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {

@@ -64,19 +57,13 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
.map(createViewRelation(parts, _))
.getOrElse(u)

case CreateIcebergView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
properties, originalText, child, allowExisting, replace) =>
convertCreateView(
catalog = asViewCatalog(catalog),
viewInfo = ViewInfo(
ident = ident,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
originalText = originalText),
child = child,
allowExisting = allowExisting,
replace = replace)
case c@CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment,
properties, Some(originalText), query, allowExisting, replace) =>
val identifier = Spark3Util.toV1TableIdentifier(ident)
ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty)
ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier)
c

}

def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -167,90 +154,4 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
private def isBuiltinFunction(name: String): Boolean = {
catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
}

def asViewCatalog(plugin: CatalogPlugin): ViewCatalog = plugin match {
case viewCatalog: ViewCatalog =>
viewCatalog
case _ =>
throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
}

private case class ViewInfo(ident: Identifier,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
properties: Map[String, String],
originalText: Option[String])

/**
* Convert [[CreateView]] or [[AlterViewAs]] to logical plan [[CreateV2View]].
*/
private def convertCreateView(
catalog: ViewCatalog,
viewInfo: ViewInfo,
child: LogicalPlan,
allowExisting: Boolean,
replace: Boolean): LogicalPlan = {
val qe = spark.sessionState.executePlan(child, CommandExecutionMode.SKIP)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed

val identifier = Spark3Util.toV1TableIdentifier(viewInfo.ident)

if (viewInfo.userSpecifiedColumns.nonEmpty) {
if (viewInfo.userSpecifiedColumns.length > analyzedPlan.output.length) {
throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
identifier, viewInfo.userSpecifiedColumns.map(_._1), analyzedPlan)
} else if (viewInfo.userSpecifiedColumns.length < analyzedPlan.output.length) {
throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
identifier, viewInfo.userSpecifiedColumns.map(_._1), analyzedPlan)
}
}

ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, child, Seq.empty)
ViewHelper.verifyAutoGeneratedAliasesNotExists(child, false, identifier)

val queryOutput = analyzedPlan.schema.fieldNames
SchemaUtils.checkColumnNameDuplication(queryOutput, SQLConf.get.resolver)

val sql = viewInfo.originalText.getOrElse {
throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
}

val viewSchema = aliasPlan(analyzedPlan, viewInfo.userSpecifiedColumns).schema
val columnAliases = viewInfo.userSpecifiedColumns.map(_._1).toArray
val columnComments = viewInfo.userSpecifiedColumns.map(_._2.getOrElse("")).toArray

CreateV2View(
view = viewInfo.ident.asMultipartIdentifier,
sql = sql,
comment = None,
viewSchema = viewSchema,
queryOutput,
columnAliases = columnAliases,
columnComments = columnComments,
properties = viewInfo.properties,
allowExisting = allowExisting,
replace = replace)
}

/**
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
*/
private def aliasPlan(
analyzedPlan: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
val projectedPlan = Project(projectList, analyzedPlan)
spark.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
}
}
}
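Net effect on ResolveViews: the rule keeps only the checks that must run during analysis (no temporary objects, no auto-generated aliases, both via ViewHelper) and returns the CreateIcebergView node unchanged for the planner. A hedged sketch of what those checks reject, reusing the illustrative session sketched above:

```scala
// A persistent view must not capture session-scoped state, so referencing
// a temporary view fails during analysis.
spark.sql("CREATE TEMPORARY VIEW temp_events AS SELECT 1 AS id, 'a' AS data")

// Rejected by ViewHelper.verifyTemporaryObjectsNotExists with an AnalysisException.
spark.sql("CREATE VIEW local.db.bad_view AS SELECT * FROM temp_events")
```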

This file was deleted. (Most likely the definition of the CreateV2View logical plan, whose imports are removed in the surrounding files.)

@@ -19,33 +19,63 @@

package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.MetadataBuilder
import org.apache.spark.sql.util.SchemaUtils
import scala.collection.JavaConverters._


case class CreateV2ViewExec(
catalog: ViewCatalog,
ident: Identifier,
sql: String,
currentCatalog: String,
currentNamespace: Array[String],
originalText: String,
query: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])],
comment: Option[String],
viewSchema: StructType,
queryColumnNames: Array[String],
columnAliases: Array[String],
columnComments: Array[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed

val identifier = Spark3Util.toV1TableIdentifier(ident)

if (userSpecifiedColumns.nonEmpty) {
if (userSpecifiedColumns.length > analyzedPlan.output.length) {
throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
} else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
}
}

val queryColumnNames = analyzedPlan.schema.fieldNames
SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)

val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
val columnAliases = userSpecifiedColumns.map(_._1).toArray
val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray

val currentCatalog = session.sessionState.catalogManager.currentCatalog.name
val currentNamespace = session.sessionState.catalogManager.currentNamespace

val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
val createEngineVersion = Some(engineVersion)
val newProperties = properties ++
@@ -61,7 +91,7 @@ case class CreateV2ViewExec(
// FIXME: replaceView API doesn't exist in Spark 3.5
catalog.createView(
ident,
sql,
originalText,
currentCatalog,
currentNamespace,
viewSchema,
@@ -74,7 +104,7 @@
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
sql,
originalText,
currentCatalog,
currentNamespace,
viewSchema,
@@ -93,4 +123,25 @@
override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}

/**
* If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
* else return the analyzed plan directly.
*/
private def aliasPlan(
analyzedPlan: LogicalPlan,
userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
if (userSpecifiedColumns.isEmpty) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
case (attr, (colName, None)) => Alias(attr, colName)()
case (attr, (colName, Some(colComment))) =>
val meta = new MetadataBuilder().putString("comment", colComment).build()
Alias(attr, colName)(explicitMetadata = Some(meta))
}
val projectedPlan = Project(projectList, analyzedPlan)
session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
}
}
}
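The aliasPlan helper above wraps the analyzed query in a Project whose aliases carry the user-specified column names and comments. A standalone sketch of the catalyst construct it builds; the attribute name and comment text are placeholders:

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.{MetadataBuilder, StringType}

// Stand-in for one output attribute of the analyzed view query.
val attr = AttributeReference("data", StringType)()

// A user-specified column ("payload", Some("event payload")) becomes an Alias
// whose explicit metadata carries the column comment, as aliasPlan does above.
val meta = new MetadataBuilder().putString("comment", "event payload").build()
val aliased = Alias(attr, "payload")(explicitMetadata = Some(meta))
```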
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.Identifier
@@ -99,12 +99,18 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil

case CreateV2View(
IcebergViewCatalogAndIdentifier(catalog, ident), sql, comment, viewSchema, queryColumnNames,
columnAliases, columnComments, properties, allowExisting, replace) =>
CreateV2ViewExec(catalog, ident, sql, catalogManager.currentCatalog.name,
catalogManager.currentNamespace, comment, viewSchema, queryColumnNames,
columnAliases, columnComments, properties, allowExisting, replace) :: Nil
case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment,
properties, Some(originalText), query, allowExisting, replace) =>
CreateV2ViewExec(
catalog = viewCatalog,
ident = ident,
originalText = originalText,
query = query,
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace) :: Nil

case _ => Nil
}
@@ -130,16 +136,4 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
}
}
}

private object IcebergViewCatalogAndIdentifier {
def unapply(identifier: Seq[String]): Option[(ViewCatalog, Identifier)] = {
val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, identifier.asJava)
catalogAndIdentifier.catalog match {
case icebergCatalog: SparkCatalog =>
Some((icebergCatalog, catalogAndIdentifier.identifier))
case _ =>
None
}
}
}
}
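With this wiring, the flags on CreateIcebergView map directly onto the branches of CreateV2ViewExec.run: replace takes the branch under the FIXME (Spark 3.5 has no replaceView API), while allowExisting presumably tolerates an existing view instead of failing with ViewAlreadyExistsException. A hedged sketch of DDL exercising those paths, again with illustrative names:

```scala
// Column aliases and comments flow into columnAliases / columnComments and,
// via aliasPlan, into the view schema; OR REPLACE sets replace = true.
spark.sql("""
  CREATE OR REPLACE VIEW local.db.event_view
    (event_id COMMENT 'primary key', payload)
  AS SELECT id, data FROM local.db.events
""")

// IF NOT EXISTS sets allowExisting, so a pre-existing view is left in place
// rather than surfacing ViewAlreadyExistsException.
spark.sql("CREATE VIEW IF NOT EXISTS local.db.event_view AS SELECT id FROM local.db.events")
```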
