Spark: Support creating views via SQL #9423

Merged: 7 commits, Jan 26, 2024
Changes from 4 commits
IcebergSparkSessionExtensions.scala
@@ -20,6 +20,7 @@
package org.apache.iceberg.spark.extensions

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.analysis.CheckViews
import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
import org.apache.spark.sql.catalyst.analysis.ResolveViews
@@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
extensions.injectResolutionRule { spark => ResolveViews(spark) }
extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
extensions.injectCheckRule(_ => CheckViews)

// optimizer extensions
extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
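With these rules injected, CREATE VIEW statements against an Iceberg view catalog flow through ResolveViews, CheckViews, and the command rewrites added below. A minimal sketch of the statement shape this enables (catalog, namespace, and table names here are hypothetical):

-- hypothetical names; the statement is analyzed by the injected rules
CREATE VIEW my_catalog.db.event_counts AS
SELECT event_type, count(*) AS cnt
FROM my_catalog.db.events
GROUP BY event_type;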
CheckViews.scala (new file)
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.View
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.SchemaUtils

object CheckViews extends (LogicalPlan => Unit) {

override def apply(plan: LogicalPlan): Unit = {
plan foreach {
case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _,
queryColumnNames, _, _, _, _, _) =>
verifyAmountOfColumns(ident, columnAliases, query)
verifyTemporaryObjectsDontExist(ident, query)
Review comment (Contributor): I think this has to be done in RewriteViewCommands, because by the time the check rules run, the analyzer will already have substituted temporary views into the logical query plan. Since we're already rewriting unresolved relations in the plan, we can keep track of the temporary view names at the same time and throw if there are any.

Review comment (Contributor): Actually, I think your implementation here looks great. We just need to run it earlier, before temp views might be substituted into the plan in place of UnresolvedRelation.

SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)

case _ => // OK
}
}

private def verifyAmountOfColumns(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = {
if (columns.nonEmpty) {
if (columns.length > query.output.length) {
throw new AnalysisException(
errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
messageParameters = Map(
"viewName" -> ident.toString,
"viewColumns" -> columns.mkString(", "),
"dataColumns" -> query.output.map(c => c.name).mkString(", ")))
} else if (columns.length < query.output.length) {
throw new AnalysisException(
errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
messageParameters = Map(
"viewName" -> ident.toString,
"viewColumns" -> columns.mkString(", "),
"dataColumns" -> query.output.map(c => c.name).mkString(", ")))
}
}
}

/**
* Permanent views are not allowed to reference temp objects
*/
private def verifyTemporaryObjectsDontExist(
name: Identifier,
child: LogicalPlan): Unit = {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

val tempViews = collectTemporaryViews(child)
tempViews.foreach { nameParts =>
throw new AnalysisException(
errorClass = "INVALID_TEMP_OBJ_REFERENCE",
messageParameters = Map(
"obj" -> "VIEW",
"objName" -> name.name(),
"tempObj" -> "VIEW",
"tempObjName" -> nameParts.quoted))
}

// TODO: check for temp function names
}


/**
* Collect all temporary views and return the identifiers separately
*/
private def collectTemporaryViews(child: LogicalPlan): Seq[Seq[String]] = {
def collectTempViews(child: LogicalPlan): Seq[Seq[String]] = {
child.flatMap {
case view: View if view.isTempView => Seq(view.desc.identifier.nameParts)
case plan => plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempViews(e.plan)
Review comment (Contributor): @nastra, this is what makes me wonder whether we support subqueries properly in the code that rewrites UnresolvedRelation in views (qualifyTableIdentifiers). We may need to support UnresolvedRelation within SubqueryExpression like this does.

Review comment (Contributor): By the way, this doesn't need to be fixed for view resolution in this PR. The relevant part for this PR is that we catch temporary views that are nested in subquery expressions. Here's an example of a subquery expression in a view:

CREATE VIEW subset_of_table AS
SELECT * FROM table WHERE id = (SELECT id FROM temporary_view);

Review comment (Contributor, author): I've added a test that makes sure using a temp view in a subquery expression fails. I'll also follow up separately and look into supporting subqueries properly for views.

case _ => Seq.empty
})
}.distinct
}

collectTempViews(child)
}
}
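For intuition, statements like the following (names hypothetical) would trip the two checks above: the first declares more view columns than the query produces, raising the CREATE_VIEW_COLUMN_ARITY_MISMATCH error, while the second makes a permanent view reference a temporary view, raising INVALID_TEMP_OBJ_REFERENCE:

-- rejected by verifyAmountOfColumns: three aliases, two data columns
CREATE VIEW my_catalog.db.v (a, b, c) AS SELECT id, data FROM my_catalog.db.tbl;

-- rejected by verifyTemporaryObjectsDontExist: permanent view over a temp view
CREATE TEMPORARY VIEW tmp AS SELECT id FROM my_catalog.db.tbl;
CREATE VIEW my_catalog.db.v2 AS SELECT * FROM tmp;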
ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
@@ -38,6 +39,7 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.View
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.MetadataBuilder

case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {

@@ -59,6 +61,32 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
loadView(catalog, ident)
.map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
.getOrElse(u)

case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _,
rewritten)
if query.resolved && !rewritten =>
val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier)
Review comment (rdblue, Contributor, Jan 25, 2024): @nastra, this can't be done here. It needs to be done before any of Spark's rules run. Otherwise, a temporary view may already have been substituted into the plan, which could allow the checks to pass even though the view is invalid.

This rule is only responsible for applying the column aliases and comments. Rewriting the identifiers should be done in RewriteViewCommands.

val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments)
c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true)
}

private def aliasPlan(
analyzedPlan: LogicalPlan,
columnAliases: Seq[String],
columnComments: Seq[Option[String]]): LogicalPlan = {
if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) {
analyzedPlan
} else {
val projectList = analyzedPlan.output.zipWithIndex.map { case (attr, pos) =>
if (columnComments.apply(pos).isDefined) {
val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build()
Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta))
} else {
Alias(attr, columnAliases.apply(pos))()
}
}
Project(projectList, analyzedPlan)
}
}

def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -151,7 +179,7 @@
}


implicit class ViewHelper(plugin: CatalogPlugin) {
implicit class IcebergViewHelper(plugin: CatalogPlugin) {
def asViewCatalog: ViewCatalog = plugin match {
case viewCatalog: ViewCatalog =>
viewCatalog
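aliasPlan wraps the analyzed query in a Project only when the number of aliases matches the query output exactly, attaching each column comment as Alias metadata. A sketch (hypothetical names) of the syntax that exercises it:

-- renamed_id carries a comment; renamed_data does not
CREATE VIEW my_catalog.db.v (renamed_id COMMENT 'primary identifier', renamed_data) AS
SELECT id, data FROM my_catalog.db.tbl;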
RewriteViewCommands.scala
@@ -20,8 +20,10 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -40,6 +42,18 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case DropView(ResolvedView(resolved), ifExists) =>
DropIcebergView(resolved, ifExists)

case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties,
Some(queryText), query, allowExisting, replace) =>
CreateIcebergView(child = resolved,
queryText = queryText,
query = query,
columnAliases = userSpecifiedColumns.map(_._1),
columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)),
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
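Each clause of Spark's CreateView plan maps directly onto a CreateIcebergView field: the user-specified column list becomes columnAliases and columnComments, COMMENT becomes comment, and TBLPROPERTIES becomes properties. A sketch with hypothetical names:

CREATE OR REPLACE VIEW my_catalog.db.v (a COMMENT 'column a', b)  -- columnAliases / columnComments
COMMENT 'an example view'                                         -- comment
TBLPROPERTIES ('key' = 'value')                                   -- properties
AS SELECT id, data FROM my_catalog.db.tbl;                        -- queryText / query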
CreateIcebergView.scala (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

case class CreateIcebergView(
child: LogicalPlan,
queryText: String,
query: LogicalPlan,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String] = Seq.empty,
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
rewritten: Boolean = false) extends BinaryCommand {
override def left: LogicalPlan = child

override def right: LogicalPlan = query

override protected def withNewChildrenInternal(
newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
copy(child = newLeft, query = newRight)
}
CreateV2ViewExec.scala (new file)
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._


case class CreateV2ViewExec(
catalog: ViewCatalog,
ident: Identifier,
queryText: String,
viewSchema: StructType,
columnAliases: Seq[String],
columnComments: Seq[Option[String]],
queryColumnNames: Seq[String],
comment: Option[String],
properties: Map[String, String],
allowExisting: Boolean,
replace: Boolean,
query: LogicalPlan) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
val currentCatalogName = session.sessionState.catalogManager.currentCatalog.name
val currentCatalog = if (!catalog.name().equals(currentCatalogName)) currentCatalogName else null
val currentNamespace = session.sessionState.catalogManager.currentNamespace

val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
val newProperties = properties ++
comment.map(ViewCatalog.PROP_COMMENT -> _) +
(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> engineVersion,
ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)

if (replace) {
// CREATE OR REPLACE VIEW
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}
// FIXME: replaceView API doesn't exist in Spark 3.5
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
}

Nil
}

override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}
}
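Because Spark 3.5's ViewCatalog API has no replaceView (per the FIXME above), run() emulates CREATE OR REPLACE VIEW with a non-atomic dropView/createView pair, while CREATE VIEW IF NOT EXISTS simply swallows ViewAlreadyExistsException. The two statement shapes this executor serves (names hypothetical):

-- replace = true: dropView then createView (non-atomic)
CREATE OR REPLACE VIEW my_catalog.db.v AS SELECT id FROM my_catalog.db.tbl;

-- allowExisting = true: ViewAlreadyExistsException is ignored
CREATE VIEW IF NOT EXISTS my_catalog.db.v AS SELECT id FROM my_catalog.db.tbl;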
ExtendedDataSourceV2Strategy.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
@@ -107,6 +108,22 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil

case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), queryText, query,
columnAliases, columnComments, queryColumnNames, comment, properties, allowExisting, replace, _) =>
CreateV2ViewExec(
catalog = viewCatalog,
ident = ident,
queryText = queryText,
columnAliases = columnAliases,
columnComments = columnComments,
queryColumnNames = queryColumnNames,
Review comment (Contributor): I think I'd also be fine with not tracking this and passing an empty array through. That's not correct for Spark, but it would work for us since Iceberg doesn't store these.

Review comment (Contributor, author): Technically we don't need to track it in this PR, but we'll eventually need it when we show the properties of views and such. I'd probably keep it here, but let me know if you'd like me to remove it and introduce it in an upcoming PR.

Review comment (Contributor): It seems reasonable to include it. Let's just leave it as-is since you even have a test for it.

query = query,
viewSchema = query.schema,
comment = comment,
properties = properties,
allowExisting = allowExisting,
replace = replace) :: Nil

case _ => Nil
}
