Skip to content

Commit

Permalink
Spark 3.4: Support renaming views
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Jan 18, 2024
1 parent 2446cee commit 882981d
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
Expand All @@ -53,6 +54,11 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
loadView(catalog, ident)
.map(createViewRelation(parts, _))
.getOrElse(u)

case u@UnresolvedTableOrView(CatalogAndIdentifier(catalog, ident), _, _) =>
loadView(catalog, ident)
.map(_ => ResolvedV2View(catalog.asViewCatalog, ident))
.getOrElse(u)
}

def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
Expand Down Expand Up @@ -143,4 +149,13 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
private def isBuiltinFunction(name: String): Boolean = {
spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
}

implicit class ViewHelper(plugin: CatalogPlugin) {
def asViewCatalog: ViewCatalog = plugin match {
case viewCatalog: ViewCatalog =>
viewCatalog
case _ =>
throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.views

import org.apache.spark.sql.catalyst.analysis.LeafNodeWithoutStats
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog

case class ResolvedV2View(
catalog: ViewCatalog,
identifier: Identifier) extends LeafNodeWithoutStats {
override def output: Seq[Attribute] = Nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.iceberg.spark.Spark3Util
import org.apache.iceberg.spark.SparkCatalog
import org.apache.iceberg.spark.SparkSessionCatalog
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.InternalRow
Expand All @@ -40,14 +41,17 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.MergeRows
import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode
import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
import org.apache.spark.sql.catalyst.plans.logical.RenameTable
import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
import org.apache.spark.sql.catalyst.plans.logical.UpdateRows
import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.connector.catalog.ViewCatalog
import org.apache.spark.sql.execution.OrderAwareCoalesceExec
import org.apache.spark.sql.execution.SparkPlan
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -117,6 +121,14 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case OrderAwareCoalesce(numPartitions, coalescer, child) =>
OrderAwareCoalesceExec(numPartitions, coalescer, planLater(child)) :: Nil

case RenameTable(ResolvedV2View(oldCatalog: ViewCatalog, oldIdent), newName, isView@true) =>
val newIdent = Spark3Util.catalogAndIdentifier(spark, newName.toList.asJava)
if (oldCatalog.name != newIdent.catalog().name()) {
throw new AnalysisException(
s"Cannot move view between catalogs: from=${oldCatalog.name} and to=${newIdent.catalog().name()}")
}
RenameV2ViewExec(oldCatalog, oldIdent, newIdent.identifier()) :: Nil

case _ => Nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.catalog.ViewCatalog


case class RenameV2ViewExec(
catalog: ViewCatalog,
oldIdent: Identifier,
newIdent: Identifier) extends LeafV2CommandExec {

override lazy val output: Seq[Attribute] = Nil

override protected def run(): Seq[InternalRow] = {
catalog.renameView(oldIdent, newIdent)

Seq.empty
}


override def simpleString(maxFields: Int): String = {
s"RenameV2View ${oldIdent} to {newIdent}"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -42,6 +44,7 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -635,6 +638,157 @@ private Catalog tableCatalog() {
return Spark3Util.loadIcebergCatalog(spark, catalogName);
}

@Test
public void renameView() throws NoSuchTableException {
insertRows(10);
String viewName = viewName("originalView");
String renamedView = viewName("renamedView");
String sql = String.format("SELECT id FROM %s", tableName);

ViewCatalog viewCatalog = viewCatalog();

viewCatalog
.buildView(TableIdentifier.of(NAMESPACE, viewName))
.withQuery("spark", sql)
.withDefaultNamespace(NAMESPACE)
.withDefaultCatalog(catalogName)
.withSchema(schema(sql))
.create();

sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);

List<Object[]> expected =
IntStream.rangeClosed(1, 10).mapToObj(this::row).collect(Collectors.toList());
assertThat(sql("SELECT * FROM %s", renamedView))
.hasSize(10)
.containsExactlyInAnyOrderElementsOf(expected);
}

@Test
public void renameViewHiddenByTempView() throws NoSuchTableException {
insertRows(10);
String viewName = viewName("originalView");
String renamedView = viewName("renamedView");
String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName);

ViewCatalog viewCatalog = viewCatalog();

sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewName, tableName);

viewCatalog
.buildView(TableIdentifier.of(NAMESPACE, viewName))
.withQuery("spark", sql)
.withDefaultNamespace(NAMESPACE)
.withDefaultCatalog(catalogName)
.withSchema(schema(sql))
.create();

// renames the TEMP VIEW
sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
assertThat(sql("SELECT * FROM %s", renamedView))
.hasSize(5)
.containsExactlyInAnyOrderElementsOf(
IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()));

// original view still exists with its name
assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, viewName))).isTrue();
assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, renamedView))).isFalse();
assertThat(sql("SELECT * FROM %s", viewName))
.hasSize(5)
.containsExactlyInAnyOrderElementsOf(
IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList()));

// will rename the Iceberg view
sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
assertThat(viewCatalog.viewExists(TableIdentifier.of(NAMESPACE, renamedView))).isTrue();
}

@Test
public void renameViewToDifferentTargetCatalog() {
String viewName = viewName("originalView");
String renamedView = viewName("renamedView");
String sql = String.format("SELECT id FROM %s", tableName);

ViewCatalog viewCatalog = viewCatalog();

viewCatalog
.buildView(TableIdentifier.of(NAMESPACE, viewName))
.withQuery("spark", sql)
.withDefaultNamespace(NAMESPACE)
.withDefaultCatalog(catalogName)
.withSchema(schema(sql))
.create();

assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO spark_catalog.%s", viewName, renamedView))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"Cannot move view between catalogs: from=spark_with_views and to=spark_catalog");
}

@Test
public void renameNonExistingView() {
assertThatThrownBy(() -> sql("ALTER VIEW non_existing RENAME TO target"))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("The table or view `non_existing` cannot be found");
}

@Test
public void renameViewTargetAlreadyExistsAsView() {
String viewName = viewName("renameViewSource");
String target = viewName("renameViewTarget");
String sql = String.format("SELECT id FROM %s", tableName);

ViewCatalog viewCatalog = viewCatalog();

viewCatalog
.buildView(TableIdentifier.of(NAMESPACE, viewName))
.withQuery("spark", sql)
.withDefaultNamespace(NAMESPACE)
.withDefaultCatalog(catalogName)
.withSchema(schema(sql))
.create();

viewCatalog
.buildView(TableIdentifier.of(NAMESPACE, target))
.withQuery("spark", sql)
.withDefaultNamespace(NAMESPACE)
.withDefaultCatalog(catalogName)
.withSchema(schema(sql))
.create();

assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
String.format("Cannot create view default.%s because it already exists", target));
}

@Test
public void renameViewTargetAlreadyExistsAsTable() {
String viewName = viewName("renameViewSource");
String target = viewName("renameViewTarget");
String sql = String.format("SELECT id FROM %s", tableName);

ViewCatalog viewCatalog = viewCatalog();

viewCatalog
.buildView(TableIdentifier.of(NAMESPACE, viewName))
.withQuery("spark", sql)
.withDefaultNamespace(NAMESPACE)
.withDefaultCatalog(catalogName)
.withSchema(schema(sql))
.create();

sql("CREATE TABLE %s (id INT, data STRING)", target);
assertThatThrownBy(() -> sql("ALTER VIEW %s RENAME TO %s", viewName, target))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
String.format("Cannot create view default.%s because it already exists", target));
}

private String viewName(String viewName) {
return viewName + new Random().nextInt(1000000);
}

private void insertRows(int numRows) throws NoSuchTableException {
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
for (int i = 1; i <= numRows; i++) {
Expand Down

0 comments on commit 882981d

Please sign in to comment.