Skip to content

Commit

Permalink
Spark: Support creating views via SQL (apache#9423)
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored and adnanhemani committed Jan 30, 2024
1 parent 796b951 commit f4a1409
Show file tree
Hide file tree
Showing 10 changed files with 609 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iceberg.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.CheckViews
import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
import org.apache.spark.sql.catalyst.analysis.ResolveViews
Expand All @@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectCheckRule(_ => CheckViews)

// optimizer extensions
extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.SchemaUtils

object CheckViews extends (LogicalPlan => Unit) {

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
_, _, _, _, _, _) =>
verifyColumnCount(ident, columnAliases, query)
SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)

case _ => // OK
}
}

private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
if (columns.nonEmpty) {
if (columns.length > query.output.length) {
throw new AnalysisException(
errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
messageParameters = Map(
"viewName" -> ident.toString,
"viewColumns" -> columns.mkString(", "),
"dataColumns" -> query.output.map(c => c.name).mkString(", ")))
} else if (columns.length < query.output.length) {
throw new AnalysisException(
errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
messageParameters = Map(
"viewName" -> ident.toString,
"viewColumns" -> columns.mkString(", "),
"dataColumns" -> query.output.map(c => c.name).mkString(", ")))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
Expand All @@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.MetadataBuilder

case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {

Expand All @@ -59,6 +61,30 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
loadView(catalog, ident)
.map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
.getOrElse(u)

case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _)
if query.resolved && !c.rewritten =>
val aliased = aliasColumns(query, columnAliases, columnComments)
c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true)
}

private def aliasColumns(
plan: LogicalPlan,
columnAliases: Seq[String],
columnComments: Seq[Option[String]]): LogicalPlan = {
if (columnAliases.isEmpty || columnAliases.length != plan.output.length) {
plan
} else {
val projectList = plan.output.zipWithIndex.map { case (attr, pos) =>
if (columnComments.apply(pos).isDefined) {
val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
} else {
Alias(attr, columnAliases.apply(pos))()
}
}
Project(projectList, plan)
}
}

def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
Expand Down Expand Up @@ -151,7 +177,7 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
}


implicit class ViewHelper(plugin: CatalogPlugin) {
implicit class IcebergViewHelper(plugin: CatalogPlugin) {
def asViewCatalog: ViewCatalog = plugin match {
case viewCatalog: ViewCatalog =>
viewCatalog
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,19 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.View
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.ViewCatalog

Expand All @@ -40,6 +46,20 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropView(ResolvedView(resolved), ifExists) =>
DropIcebergView(resolved, ifExists)

case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
Some(queryText), query, allowExisting, replace) =>
val q = CTESubstitution.apply(query)
verifyTemporaryObjectsDontExist(resolved.identifier, q)
CreateIcebergView(child = resolved,
queryText = queryText,
query = q,
columnAliases = userSpecifiedColumns.map(_._1),
columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
Expand All @@ -62,4 +82,45 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
None
}
}

/**
* Permanent views are not allowed to reference temp objects
*/
private def verifyTemporaryObjectsDontExist(
name: Identifier,
child: LogicalPlan): Unit = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val tempViews = collectTemporaryViews(child)
tempViews.foreach { nameParts =>
throw new AnalysisException(
errorClass = "INVALID_TEMP_OBJ_REFERENCE",
messageParameters = Map(
"obj" -> "VIEW",
"objName" -> name.name(),
"tempObj" -> "VIEW",
"tempObjName" -> nameParts.quoted))
}

// TODO: check for temp function names
}

/**
* Collect all temporary views and return the identifiers separately
*/
private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
child.flatMap {
case unresolved: UnresolvedRelation if isTempView(unresolved.multipartIdentifier) =>
Seq(unresolved.multipartIdentifier)
case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
case plan => plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempViews(e.plan)
case _ => Seq.empty
})
}.distinct
}

collectTempViews(child)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class CreateIcebergView(
child: LogicalPlan,
queryText: String,
query: LogicalPlan,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String] = Seq.empty,
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
rewritten: Boolean = false) extends BinaryCommand {
override def left: LogicalPlan = child

override def right: LogicalPlan = query

override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._


case class CreateV2ViewExec(
catalog: ViewCatalog,
ident: Identifier,
queryText: String,
viewSchema: StructType,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String],
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
val currentNamespace = session.sessionState.catalogManager.currentNamespace

val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
val newProperties = properties ++
comment.map(ViewCatalog.PROP_COMMENT -> _) +
(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)

if (replace) {
// CREATE OR REPLACE VIEW
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}
// FIXME: replaceView API doesn't exist in Spark 3.5
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -107,6 +108,21 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil

case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
CreateV2ViewExec(
catalog = viewCatalog,
ident = ident,
queryText = queryText,
columnAliases = columnAliases,
columnComments = columnComments,
queryColumnNames = queryColumnNames,
viewSchema = query.schema,
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace) :: Nil

case _ => Nil
}

Expand Down
Loading

0 comments on commit f4a1409

Please sign in to comment.