diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index a978b94f49ac..2afbdb4411f7 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -53,6 +54,11 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(createViewRelation(parts, _))
         .getOrElse(u)
+
+    case u @ UnresolvedTableOrView(CatalogAndIdentifier(catalog, ident), _, _) =>
+      loadView(catalog, ident)
+        .map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
+        .getOrElse(u)
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -143,4 +149,13 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   private def isBuiltinFunction(name: String): Boolean = {
     spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
+
+  implicit class ViewHelper(plugin: CatalogPlugin) {
+    def asViewCatalog: ViewCatalog = plugin match {
+      case viewCatalog: ViewCatalog =>
+        viewCatalog
+      case _ =>
+        throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
+    }
+  }
 }
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
new file mode 100644
index 000000000000..b9c05ff0061d
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/ResolvedV2View.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.analysis.LeafNodeWithoutStats
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
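+/**
+ * A leaf node signaling that an identifier was resolved to a view in a v2 view catalog.
+ * It exposes no output attributes; analysis rules and planner strategies match on it to
+ * recover the catalog and identifier of the view being operated on.
+ */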
+case class ResolvedV2View(
+    catalog: ViewCatalog,
+    identifier: Identifier) extends LeafNodeWithoutStats {
+  override def output: Seq[Attribute] = Nil
+}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 3af5ddd131e9..651e1d014fbb 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -22,6 +22,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.iceberg.spark.Spark3Util
 import org.apache.iceberg.spark.SparkCatalog
 import org.apache.iceberg.spark.SparkSessionCatalog
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.InternalRow
@@ -40,14 +41,17 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.MergeRows
 import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
 import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
+import org.apache.spark.sql.catalyst.plans.logical.RenameTable
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
 import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
+import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.execution.OrderAwareCoalesceExec
 import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
@@ -117,6 +121,14 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case OrderAwareCoalesce(numPartitions, coalescer, child) =>
       OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil
 
+    case RenameTable(ResolvedV2View(oldCatalog: ViewCatalog, oldIdent), newName, isView @ true) =>
+      val newIdent = Spark3Util.catalogAndIdentifier(spark, newName.toList.asJava)
+      if (oldCatalog.name != newIdent.catalog().name()) {
+        throw new AnalysisException(
+          s"Cannot move view between catalogs: from=${oldCatalog.name} and to=${newIdent.catalog().name()}")
+      }
+      RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil
+
     case _ =>
       Nil
   }
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
new file mode 100644
index 000000000000..61d362044c3c
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RenameV2ViewExec.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+
+
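+/**
+ * Physical command that renames a view by delegating to the v2 view catalog, which is
+ * responsible for failing when the source view is missing or the target already exists.
+ */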
+case class RenameV2ViewExec(
+    catalog: ViewCatalog,
+    oldIdent: Identifier,
+    newIdent: Identifier) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.renameView(oldIdent, newIdent)
+
+    Seq.empty
+  }
+
+
+  override def simpleString(maxFields: Int): String = {
+    s"RenameV2View ${oldIdent} to ${newIdent}"
+  }
+}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 153119dd75de..c59eaa67e71b 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -635,6 +636,158 @@ private Catalog tableCatalog() {
     return Spark3Util.loadIcebergCatalog(spark, catalogName);
   }
 
+  @Test
+  public void renameView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+
+    List<Object[]> expected =
+        IntStream.rangeClosed(1, 10).mapToObj(this::row).collect(Collectors.toList());
+    assertThat(sql("SELECT * FROM %s", renamedView))
+        .hasSize(10)
+        .containsExactlyInAnyOrderElementsOf(expected);
+  }
+
+  @Test
+  public void renameViewHiddenByTempView() throws NoSuchTableException {
+    insertRows(10);
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewName, tableName);
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    // renames the TEMP VIEW
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+    assertThat(sql("SELECT * FROM %s", renamedView))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(
+            IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()));
+
+    // original view still exists with its name
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, viewName))).isTrue();
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, renamedView))).isFalse();
+    assertThat(sql("SELECT * FROM %s", viewName))
+        .hasSize(5)
+        .containsExactlyInAnyOrderElementsOf(
+            IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList()));
+
+    // will rename the Iceberg view
+    sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
+    assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, renamedView))).isTrue();
+  }
+
+  @Test
+  public void renameViewToDifferentTargetCatalog() {
+    String viewName = viewName("originalView");
+    String renamedView = viewName("renamedView");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s", viewName, renamedView))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            "Cannot move view between catalogs: from=spark_with_views and to=spark_catalog");
+  }
+
+  @Test
+  public void renameNonExistingView() {
+    assertThatThrownBy(() -> sql("ALTER VIEW non_existing RENAME TO target"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be found");
+  }
+
+  @Test
+  public void renameViewTargetAlreadyExistsAsView() {
+    String viewName = viewName("renameViewSource");
+    String target = viewName("renameViewTarget");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, target))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format("Cannot create view default.%s because it already exists", target));
+  }
+
+  @Test
+  public void renameViewTargetAlreadyExistsAsTable() {
+    String viewName = viewName("renameViewSource");
+    String target = viewName("renameViewTarget");
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    viewCatalog
+        .buildView(TableIdentifier.of(NAMESPACE, viewName))
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    sql("CREATE TABLE %s (id INT, data STRING)", target);
+    assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format("Cannot create view default.%s because it already exists", target));
+  }
+
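+  // Appends a random suffix so view names stay unique across test cases.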
STRING)", target); + assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + String.format("Cannot create view default.%s because it already exists", target)); + } + + private String viewName(String viewName) { + return viewName + new Random().nextInt(1000000); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index 6cbca2cb9412..ec99c8b097a5 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -588,8 +588,18 @@ public boolean dropView(Identifier ident) { @Override public void renameView(Identifier fromIdentifier, Identifier toIdentifier) throws NoSuchViewException, ViewAlreadyExistsException { - throw new UnsupportedOperationException( - "Renaming a view is not supported by catalog: " + catalogName); + if (null != asViewCatalog) { + try { + asViewCatalog.renameView(buildIdentifier(fromIdentifier), buildIdentifier(toIdentifier)); + } catch (org.apache.iceberg.exceptions.NoSuchViewException e) { + throw new NoSuchViewException(fromIdentifier); + } catch (org.apache.iceberg.exceptions.AlreadyExistsException e) { + throw new ViewAlreadyExistsException(toIdentifier); + } + } else { + throw new UnsupportedOperationException( + "Renaming a view is not supported by catalog: " + catalogName); + } } @Override