From 528ee2459621e1a235a00d4b181a1adb316838ae Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Thu, 18 Jan 2024 12:10:14 +0100
Subject: [PATCH 1/2] Spark 3.4: Support renaming views

---
 .../sql/catalyst/analysis/ResolveViews.scala  |  15 ++
 .../plans/logical/views/ResolvedV2View.scala  |  31 ++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |  12 ++
 .../datasources/v2/RenameV2ViewExec.scala     |  45 +++++
 .../iceberg/spark/extensions/TestViews.java   | 154 ++++++++++++++++++
 .../apache/iceberg/spark/SparkCatalog.java    |  14 +-
 6 files changed, 269 insertions(+), 2 deletions(-)
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala

diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a978b94f49ac..2afbdb4411f7 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -53,6 +54,11 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(createViewRelation(parts, _))
         .getOrElse(u)
+
+    case u@UnresolvedTableOrView(CatalogAndIdentifier(catalog, ident), _, _) =>
+      loadView(catalog, ident)
+        .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
+        .getOrElse(u)
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -143,4 +149,13 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   private def isBuiltinFunction(name: String): Boolean = {
     spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
+
+  implicit class ViewHelper(plugin: CatalogPlugin) {
+    def asViewCatalog: ViewCatalog = plugin match {
+      case viewCatalog: ViewCatalog =>
+        viewCatalog
+      case _ =>
+        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
+    }
+  }
 }
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
new file mode 100644
index 000000000000..b9c05ff0061d
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.analysis.LeafNodeWithoutStats
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+case class ResolvedV2View(
+    catalog: ViewCatalog,
+    identifier: Identifier) extends LeafNodeWithoutStats {
+  override def output: Seq[Attribute] = Nil
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 3af5ddd131e9..651e1d014fbb 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.iceberg.spark.Spark3Util
 import org.apache.iceberg.spark.SparkCatalog
 import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
@@ -40,14 +41,17 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows
 import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
 import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
+import org.apache.spark.sql.catalyst.plans.logical.RenameTable
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.execution.OrderAwareCoalesceExec
 import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
@@ -117,6 +121,14 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case OrderAwareCoalesce(numPartitions, coalescer, child) =>
       OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
 
+    case RenameTable(ResolvedV2View(oldCatalog: ViewCatalog, oldIdent), newName, isView@true) =>
+      val newIdent = Spark3Util.catalogAndIdentifier(spark, newName.toList.asJava)
+      if (oldCatalog.name != newIdent.catalog().name()) {
+        throw new AnalysisException(
+          s"Cannot move view between catalogs: from=${oldCatalog.name} and to=${newIdent.catalog().name()}")
+      }
+      RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil
+
     case _ => Nil
   }
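Reviewer note on the case above: with this strategy rule in place, the statements below are what get planned into RenameV2ViewExec. The catalog and view names are made up for illustration; the only requirement is a catalog plugin that implements ViewCatalog.

  // Renames within one catalog plan through RenameTable -> RenameV2ViewExec:
  spark.sql("ALTER VIEW my_catalog.db.event_summary RENAME TO my_catalog.db.event_summary_v2")

  // A target in a different catalog trips the guard above and fails analysis with
  // org.apache.spark.sql.AnalysisException: Cannot move view between catalogs: ...
  spark.sql("ALTER VIEW my_catalog.db.event_summary_v2 RENAME TO other_catalog.db.event_summary")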
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
new file mode 100644
index 000000000000..61d362044c3c
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
+case class RenameV2ViewExec(
+    catalog: ViewCatalog,
+    oldIdent: Identifier,
+    newIdent: Identifier) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.renameView(oldIdent, newIdent)
+
+    Seq.empty
+  }
+
+
+  override def simpleString(maxFields: Int): String = {
+    s"RenameV2View ${oldIdent} to ${newIdent}"
+  }
+}
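The exec node is a thin wrapper: all rename semantics live in the connector-level ViewCatalog. A minimal sketch of the call run() performs, assuming some ViewCatalog instance named `viewCatalog` (a placeholder, not part of the patch):

  import org.apache.spark.sql.connector.catalog.Identifier

  val from = Identifier.of(Array("db"), "event_summary")
  val to = Identifier.of(Array("db"), "event_summary_v2")
  // Throws NoSuchViewException if `from` is missing and ViewAlreadyExistsException
  // if `to` is taken; RenameV2ViewExec surfaces those to the query as-is.
  viewCatalog.renameView(from, to)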
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 153119dd75de..7f9b691f777c 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -42,6 +44,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -635,6 +638,157 @@ private Catalog tableCatalog() {
     return Spark3Util.loadIcebergCatalog(spark, catalogName);
   }
 
+  @Test
+  public void renameView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+
+    List<Object[]> expected =
+        IntStream.rangeClosed(1, 10).mapToObj(this::row).collect(Collectors.toList());
+    assertThat(sql("SELECT * FROM %s", renamedView))
+        .hasSize(10)
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  @Test
+  public void renameViewHiddenByTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewName, tableName);
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    // renames the TEMP VIEW
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+    assertThat(sql("SELECT * FROM %s", renamedView))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(
+            IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()));
+
+    // original view still exists with its name
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, viewName))).isTrue();
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, renamedView))).isFalse();
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(
+            IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList()));
+
+    // will rename the Iceberg view
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, renamedView))).isTrue();
+  }
+
+  @Test
+  public void renameViewToDifferentTargetCatalog() {
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s", viewName, renamedView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Cannot move view between catalogs: from=spark_with_views and to=spark_catalog");
+  }
+
+  @Test
+  public void renameNonExistingView() {
+    assertThatThrownBy(() -> sql("ALTER VIEW non_existing RENAME TO target"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be found");
+  }
+
+  @Test
+  public void renameViewTargetAlreadyExistsAsView() {
+    String viewName = viewName("renameViewSource");
+    String target = viewName("renameViewTarget");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, target))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format("Cannot create view default.%s because it already exists", target));
+  }
+
+  @Test
+  public void renameViewTargetAlreadyExistsAsTable() {
+    String viewName = viewName("renameViewSource");
+    String target = viewName("renameViewTarget");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    sql("CREATE TABLE %s (id INT, data STRING)", target);
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format("Cannot create view default.%s because it already exists", target));
+  }
+
+  private String viewName(String viewName) {
+    return viewName + new Random().nextInt(1000000);
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 6cbca2cb9412..ec99c8b097a5 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -588,8 +588,18 @@ public boolean dropView(Identifier ident) {
   @Override
   public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
       throws NoSuchViewException, ViewAlreadyExistsException {
-    throw new UnsupportedOperationException(
-        "Renaming a view is not supported by catalog: " + catalogName);
+    if (null != asViewCatalog) {
+      try {
+        asViewCatalog.renameView(buildIdentifier(fromIdentifier), buildIdentifier(toIdentifier));
+      } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+        throw new NoSuchViewException(fromIdentifier);
+      } catch (org.apache.iceberg.exceptions.AlreadyExistsException e) {
+        throw new ViewAlreadyExistsException(toIdentifier);
+      }
+    } else {
+      throw new UnsupportedOperationException(
+          "Renaming a view is not supported by catalog: " + catalogName);
+    }
   }
 
   @Override
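For context between the two commits: the tests above run against a SparkCatalog wired up roughly as follows. The catalog name and the REST backend are assumptions for illustration only; any Iceberg catalog implementation that supports views behaves the same way.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .config("spark.sql.extensions",
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.my_catalog.type", "rest") // assumption: REST catalog with view support
    .config("spark.sql.catalog.my_catalog.uri", "http://localhost:8181")
    .getOrCreate()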
From 19be63e6b58c47db08075cf3b4f7aee97a472673 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Thu, 18 Jan 2024 12:18:27 +0100
Subject: [PATCH 2/2] Spark 3.4: Support dropping views

---
 .../sql/catalyst/analysis/ResolveViews.scala  |   8 +-
 .../analysis/RewriteViewCommands.scala        |  65 +++++++++
 .../IcebergSparkSqlExtensionsParser.scala     |   3 +-
 .../plans/logical/views/DropIcebergView.scala |  30 +++++
 .../datasources/v2/DropV2ViewExec.scala       |  48 ++++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |   5 +
 .../iceberg/spark/extensions/TestViews.java   | 105 +++++++++++++++++-
 .../apache/iceberg/spark/SparkCatalog.java    |   7 +-
 8 files changed, 260 insertions(+), 11 deletions(-)
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
 create mode 100644 spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala

diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 2afbdb4411f7..169327042969 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -117,8 +117,8 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
   private def qualifyFunctionIdentifiers(
-    plan: LogicalPlan,
-    catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
+      plan: LogicalPlan,
+      catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions {
     case u@UnresolvedFunction(Seq(name), _, _, _, _) =>
       if (!isBuiltinFunction(name)) {
         u.copy(nameParts = catalogAndNamespace :+ name)
@@ -143,11 +143,11 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   }
 
   private def isCatalog(name: String): Boolean = {
-    spark.sessionState.catalogManager.isCatalogRegistered(name)
+    catalogManager.isCatalogRegistered(name)
   }
 
   private def isBuiltinFunction(name: String): Boolean = {
-    spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
+    catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
 
   implicit class ViewHelper(plugin: CatalogPlugin) {
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
new file mode 100644
index 000000000000..2b35db33c0c5
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.DropView
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.LookupCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+/**
+ * ResolveSessionCatalog exits early for some v2 View commands,
+ * thus they are pre-substituted here and then handled in ResolveViews
+ */
+case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case DropView(ResolvedView(resolved), ifExists) =>
+      DropIcebergView(resolved, ifExists)
+  }
+
+  private def isTempView(nameParts: Seq[String]): Boolean = {
+    catalogManager.v1SessionCatalog.isTempView(nameParts)
+  }
+
+  private def isViewCatalog(catalog: CatalogPlugin): Boolean = {
+    catalog.isInstanceOf[ViewCatalog]
+  }
+
+  object ResolvedView {
+    def unapply(unresolved: UnresolvedIdentifier): Option[ResolvedIdentifier] = unresolved match {
+      case UnresolvedIdentifier(nameParts, true) if isTempView(nameParts) =>
+        None
+
+      case UnresolvedIdentifier(CatalogAndIdentifier(catalog, ident), _) if isViewCatalog(catalog) =>
+        Some(ResolvedIdentifier(catalog, ident))
+
+      case _ =>
+        None
+    }
+  }
+}
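The ResolvedView extractor above encodes the precedence the drop tests rely on: a temporary view of the same name wins, so only names that don't hit a temp view and that resolve to a ViewCatalog get rewritten. A sketch with illustrative names:

  spark.sql("CREATE TEMPORARY VIEW v AS SELECT 1 AS id")
  spark.sql("DROP VIEW v")               // temp view: not rewritten, handled by the session catalog
  spark.sql("DROP VIEW my_catalog.db.v") // resolves to a ViewCatalog: rewritten to DropIcebergView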
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index d059952c7d24..1f74f2f811c8 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.analysis.RewriteViewCommands
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -128,7 +129,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
     } else {
-      val parsedPlan = delegate.parsePlan(sqlText)
+      val parsedPlan = RewriteViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
       parsedPlan match {
         case e: ExplainCommand =>
           e.copy(logicalPlan = replaceRowLevelCommands(e.logicalPlan))
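Effect of the parser hook, sketched for review; the plan shape shown in the comment is inferred from the rewrite rule above, so treat it as an assumption rather than guaranteed output:

  val plan = spark.sessionState.sqlParser.parsePlan("DROP VIEW my_catalog.db.v")
  // expected roughly: DropIcebergView(UnresolvedIdentifier([my_catalog, db, v], true), ifExists = false)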
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
new file mode 100644
index 000000000000..275dba6fbf5e
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/DropIcebergView.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.UnaryCommand
+
+case class DropIcebergView(
+    child: LogicalPlan,
+    ifExists: Boolean) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): DropIcebergView =
+    copy(child = newChild)
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
new file mode 100644
index 000000000000..c35af1486fc7
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
+case class DropV2ViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val dropped = catalog.dropView(ident)
+    if (!dropped && !ifExists) {
+      throw new NoSuchViewException(ident)
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"DropV2View: ${ident}"
+  }
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 651e1d014fbb..ea228be0b80e 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ResolvedIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -48,6 +49,7 @@ import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -129,6 +131,9 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       }
       RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil
 
+    case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
+      DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
+
     case _ => Nil
   }
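Behavior of the new physical node, sketched with placeholder catalog and view names:

  spark.sql("DROP VIEW IF EXISTS my_catalog.db.missing_view") // dropView returns false; IF EXISTS swallows it
  spark.sql("DROP VIEW my_catalog.db.missing_view")           // throws NoSuchViewException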
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 7f9b691f777c..6725c9497499 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -593,10 +593,7 @@ public void fullFunctionIdentifier() {
   @Test
   public void fullFunctionIdentifierNotRewrittenLoadFailure() {
     String viewName = "fullFunctionIdentifierNotRewrittenLoadFailure";
-    String sql =
-        String.format(
-            "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 'a' AS value",
-            catalogName);
+    String sql = "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 'a' AS value";
 
     // avoid namespace failures
     sql("USE spark_catalog");
@@ -785,6 +782,106 @@ public void renameViewTargetAlreadyExistsAsTable() {
             String.format("Cannot create view default.%s because it already exists", target));
   }
 
+  @Test
+  public void dropView() {
+    String viewName = "viewToBeDropped";
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName);
+    viewCatalog
+        .buildView(identifier)
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThat(viewCatalog.viewExists(identifier)).isTrue();
+
+    sql("DROP VIEW %s", viewName);
+    assertThat(viewCatalog.viewExists(identifier)).isFalse();
+  }
+
+  @Test
+  public void dropNonExistingView() {
+    assertThatThrownBy(() -> sql("DROP VIEW non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The view %s.%s cannot be found", NAMESPACE, "non_existing");
+  }
+
+  @Test
+  public void dropViewIfExists() {
+    String viewName = "viewToBeDropped";
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName);
+    viewCatalog
+        .buildView(identifier)
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThat(viewCatalog.viewExists(identifier)).isTrue();
+
+    sql("DROP VIEW IF EXISTS %s", viewName);
+    assertThat(viewCatalog.viewExists(identifier)).isFalse();
+
+    assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS %s", viewName));
+  }
+
+  /** The purpose of this test is mainly to make sure that normal view deletion isn't messed up */
+  @Test
+  public void dropGlobalTempView() {
+    String globalTempView = "globalViewToBeDropped";
+    sql("CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s", globalTempView, tableName);
+    assertThat(v1SessionCatalog().getGlobalTempView(globalTempView).isDefined()).isTrue();
+
+    sql("DROP VIEW global_temp.%s", globalTempView);
+    assertThat(v1SessionCatalog().getGlobalTempView(globalTempView).isDefined()).isFalse();
+  }
+
+  /** The purpose of this test is mainly to make sure that normal view deletion isn't messed up */
+  @Test
+  public void dropTempView() {
+    String tempView = "tempViewToBeDropped";
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s", tempView, tableName);
+    assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isTrue();
+
+    sql("DROP VIEW %s", tempView);
+    assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isFalse();
+  }
+
+  /** The purpose of this test is mainly to make sure that normal view deletion isn't messed up */
+  @Test
+  public void dropV1View() {
+    String v1View = "v1ViewToBeDropped";
+    sql("USE spark_catalog");
+    sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
+    sql("CREATE TABLE %s (id INT, data STRING)", tableName);
+    sql("CREATE VIEW %s AS SELECT id FROM %s", v1View, tableName);
+    sql("USE %s", catalogName);
+    assertThat(
+            v1SessionCatalog()
+                .tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
+        .isTrue();
+
+    sql("DROP VIEW spark_catalog.%s.%s", NAMESPACE, v1View);
+    assertThat(
+            v1SessionCatalog()
+                .tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
+        .isFalse();
+  }
+
+  private SessionCatalog v1SessionCatalog() {
+    return spark.sessionState().catalogManager().v1SessionCatalog();
+  }
+
   private String viewName(String viewName) {
     return viewName + new Random().nextInt(1000000);
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index ec99c8b097a5..23a62858f488 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -581,8 +581,11 @@ public View alterView(Identifier ident, ViewChange... changes)
 
   @Override
   public boolean dropView(Identifier ident) {
-    throw new UnsupportedOperationException(
-        "Dropping a view is not supported by catalog: " + catalogName);
+    if (null != asViewCatalog) {
+      return asViewCatalog.dropView(buildIdentifier(ident));
+    }
+
+    return false;
   }
 
   @Override
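Taken together, the two commits give the following round trip against a view-enabled Iceberg catalog. The view itself is still created through the catalog API, mirroring the tests (SQL CREATE VIEW is not part of this series); the identifiers and the `viewCatalog`/`viewSchema` values are placeholders:

  import org.apache.iceberg.catalog.Namespace
  import org.apache.iceberg.catalog.TableIdentifier

  val ns = Namespace.of("db")
  viewCatalog                                   // an org.apache.iceberg.catalog.ViewCatalog
    .buildView(TableIdentifier.of(ns, "v"))
    .withQuery("spark", "SELECT id FROM db.t")
    .withDefaultNamespace(ns)
    .withSchema(viewSchema)                     // schema of the SELECT, as in the tests
    .create()

  spark.sql("ALTER VIEW db.v RENAME TO db.v2")  // commit 1
  spark.sql("DROP VIEW db.v2")                  // commit 2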