Skip to content

Commit

Permalink
Spark: Support altering views
Browse files Browse the repository at this point in the history
This adds support for `ALTER VIEW <viewName> AS <query>` as defined in https://spark.apache.org/docs/latest/sql-ref-syntax-ddl-alter-view.html:
  • Loading branch information
nastra committed Jan 29, 2024
1 parent a14db5c commit 6dfa2dd
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.UnsetViewProperties
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
Expand All @@ -45,6 +46,9 @@ object CheckViews extends (LogicalPlan => Unit) {
}
}

case AlterViewAs(ResolvedV2View(_, _), _, query) =>
SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver)

case _ => // OK
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.ViewUtil.IcebergViewHelper
import org.apache.spark.sql.catalyst.analysis.ViewUtil.isViewCatalog
import org.apache.spark.sql.catalyst.analysis.ViewUtil.loadView
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs
import org.apache.spark.sql.catalyst.plans.logical.CreateView
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -67,6 +68,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi
loadView(resolved.catalog, resolved.identifier)
.map(_ => ResolvedV2View(resolved.catalog.asViewCatalog, resolved.identifier))
.getOrElse(u)

case a@AlterViewAs(ResolvedV2View(_, ident), _, query) =>
val q = CTESubstitution.apply(query)
verifyTemporaryObjectsDontExist(ident, q)
a.copy(query = q)
}

private def isTempView(nameParts: Seq[String]): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField
import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs
import org.apache.spark.sql.catalyst.plans.logical.Call
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch
import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag
Expand Down Expand Up @@ -135,6 +136,20 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
case UnsetViewProperties(ResolvedV2View(catalog, ident), propertyKeys, _) =>
AlterV2ViewExec(catalog, ident, propertyKeys.map(ViewChange.removeProperty)) :: Nil

case AlterViewAs(ResolvedV2View(catalog, ident), queryText, query) =>
CreateV2ViewExec(
catalog = catalog,
ident = ident,
queryText = queryText,
columnAliases = Seq.empty,
columnComments = Seq.empty,
queryColumnNames = query.schema.fieldNames,
viewSchema = query.schema,
comment = None,
properties = Map.empty,
allowExisting = false,
replace = true) :: Nil

case _ => Nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.iceberg.IcebergBuild;
Expand Down Expand Up @@ -1206,10 +1205,286 @@ public void alterViewUnsetUnknownProperty() {
() -> sql("ALTER VIEW %s UNSET TBLPROPERTIES IF EXISTS ('unknown-key')", viewName));
}

@Test
public void alterView() throws NoSuchTableException {
insertRows(6);
String viewName = "alteredView";

sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName);

assertThat(sql("SELECT id FROM %s", viewName))
.hasSize(3)
.containsExactlyInAnyOrder(row(1), row(2), row(3));

sql("ALTER VIEW %s AS SELECT id FROM %s WHERE id > 3", viewName, tableName);

assertThat(sql("SELECT * FROM %s", viewName))
.hasSize(3)
.containsExactlyInAnyOrder(row(4), row(5), row(6));
}

@Test
public void alterNonExistingView() {
// this is a bit tricky because the view can't be found in the view catalog and
// resolution bubbles up to check whether it's a V1 view but then fails, indicating that the
// catalog doesn't support views
assertThatThrownBy(
() -> sql("ALTER VIEW non_existing AS SELECT id FROM %s WHERE id > 3", tableName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
String.format(
"The feature is not supported: Catalog `%s` does not support views", catalogName));
}

@Test
public void alterViewWithInvalidSQL() {
String viewName = "alteredViewWithInvalidSQL";

sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName);
assertThatThrownBy(() -> sql("ALTER VIEW %s AS invalid SQL", viewName, tableName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Syntax error");
}

@Test
public void alterViewReferencingTempView() throws NoSuchTableException {
insertRows(10);
String tempView = "temporaryViewBeingReferencedInView";
String viewReferencingTempView = "alteredViewReferencingTemporaryView";

sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewReferencingTempView, tableName);

// altering a view that references a TEMP VIEW shouldn't be possible
assertThatThrownBy(
() -> sql("ALTER VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Cannot create the persistent object")
.hasMessageContaining(viewReferencingTempView)
.hasMessageContaining("of the type VIEW because it references to the temporary object")
.hasMessageContaining(tempView);
}

@Test
public void alterViewReferencingGlobalTempView() throws NoSuchTableException {
insertRows(10);
String globalTempView = "globalTemporaryViewBeingReferencedInView";
String viewReferencingTempView = "alteredViewReferencingGlobalTemporaryView";

sql(
"CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
globalTempView, tableName);

sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewReferencingTempView, tableName);

// altering a view that references a GLOBAL TEMP VIEW shouldn't be possible
assertThatThrownBy(
() ->
sql(
"ALTER VIEW %s AS SELECT id FROM global_temp.%s",
viewReferencingTempView, globalTempView))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Cannot create the persistent object")
.hasMessageContaining(viewReferencingTempView)
.hasMessageContaining("of the type VIEW because it references to the temporary object")
.hasMessageContaining(globalTempView);
}

@Test
public void alterViewWithUpdatedQueryColumns() throws NoSuchTableException {
insertRows(6);
String viewName = viewName("viewWithQueryColumns");

sql(
"CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
viewName, tableName);

View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
assertThat(view.properties()).containsEntry("queryColumnNames", "id,data");

assertThat(view.schema().columns()).hasSize(2);
Types.NestedField first = view.schema().columns().get(0);
assertThat(first.name()).isEqualTo("new_id");
assertThat(first.doc()).isEqualTo("ID");

Types.NestedField second = view.schema().columns().get(1);
assertThat(second.name()).isEqualTo("new_data");
assertThat(second.doc()).isEqualTo("DATA");

assertThat(sql("SELECT new_id FROM %s", viewName))
.hasSize(3)
.containsExactlyInAnyOrder(row(1), row(2), row(3));

// query is updated to only contain id
sql("ALTER VIEW %s AS SELECT id from %s WHERE id > 3", viewName, tableName);

// querying new_id / new_data shouldn't work anymore
assertThatThrownBy(() -> sql("SELECT new_id FROM %s", viewName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"A column or function parameter with name `new_id` cannot be resolved");

assertThatThrownBy(() -> sql("SELECT new_data FROM %s", viewName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"A column or function parameter with name `new_data` cannot be resolved");

assertThat(sql("SELECT id FROM %s", viewName))
.hasSize(3)
.containsExactlyInAnyOrder(row(4), row(5), row(6));

view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName));
assertThat(view.properties()).containsEntry("queryColumnNames", "id");

assertThat(view.schema().columns()).hasSize(1);
Types.NestedField field = view.schema().columns().get(0);
assertThat(field.name()).isEqualTo("id");
assertThat(field.doc()).isNull();
}

@Test
public void alterViewWithDuplicateQueryColumnNames() {
String viewName = viewName("viewWithDuplicateQueryColumnNames");
sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName);

// having duplicate columns in the underlying query should fail
assertThatThrownBy(
() -> sql("ALTER VIEW %s AS SELECT id, id FROM %s WHERE id <= 3", viewName, tableName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("The column `id` already exists");
}

@Test
public void alterViewWithNonExistingTable() {
String viewName = "alteredViewUsingNonExistingTable";

sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName);
assertThatThrownBy(() -> sql("ALTER VIEW %s AS SELECT id from non_existing", viewName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("The table or view `non_existing` cannot be found");
}

@Test
public void alterViewWithNonExistingQueryColumn() {
String viewName = "alteredViewUsingNonExistingQueryColumn";

sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName);
assertThatThrownBy(
() -> sql("ALTER VIEW %s AS SELECT non_existing from %s", viewName, tableName))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(
"A column or function parameter with name `non_existing` cannot be resolved");
}

/** The purpose of this test is mainly to make sure that altering normal view isn't messed up */
@Test
public void alterViewHiddenByTempView() throws NoSuchTableException {
insertRows(6);
String viewName = "alteredViewHiddenByTempView";
String sql = String.format("SELECT id, data FROM %s WHERE id > 3", tableName);

sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName);
sql("CREATE VIEW %s AS %s", viewName, sql);

ViewCatalog viewCatalog = viewCatalog();
View view = viewCatalog.loadView(TableIdentifier.of(NAMESPACE, viewName));
assertThat(view.sqlFor("spark").sql()).isEqualTo(sql);

// results from TEMP VIEW are returned
assertThat(sql("SELECT id FROM %s", viewName))
.hasSize(3)
.containsExactlyInAnyOrder(row(1), row(2), row(3));

// TEMP VIEW will be updated
sql("ALTER VIEW %s AS SELECT id FROM %s WHERE id >= 3 AND id < 4", viewName, tableName);

// results from TEMP VIEW are returned
assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(3));

// the only way to ALTER the Iceberg view would be to rename the TEMP VIEW and then do an ALTER
assertThat(view.sqlFor("spark").sql()).isEqualTo(sql);
}

@Test
public void alterViewWithCTE() throws NoSuchTableException {
insertRows(10);
String viewName = viewName("simpleViewWithCTE");
String sql =
String.format(
"WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+ "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
tableName);

sql("CREATE VIEW %s AS SELECT * FROM %s", viewName, tableName);
sql("ALTER VIEW %s AS %s", viewName, sql);

assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
}

@Test
public void alterViewWithCTEReferencingTempView() {
String viewName = viewName("viewWithCTEReferencingTempView");
String tempViewInCTE = viewName("tempViewInCTE");
String sql =
String.format(
"WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+ "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
tempViewInCTE);

sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName);
sql("CREATE VIEW %s AS SELECT * FROM %s", viewName, tableName);

assertThatThrownBy(() -> sql("ALTER VIEW %s AS %s", viewName, sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Cannot create the persistent object")
.hasMessageContaining(viewName)
.hasMessageContaining("of the type VIEW because it references to the temporary object")
.hasMessageContaining(tempViewInCTE);
}

@Test
public void alterViewWithSubqueryExpressionUsingTempView() {
String viewName = viewName("viewWithSubqueryExpression");
String tempView = viewName("simpleTempView");
String sql =
String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView);

sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName);
sql("CREATE VIEW %s AS SELECT * from %s", viewName, tableName);

assertThatThrownBy(() -> sql("ALTER VIEW %s AS %s", viewName, sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
.hasMessageContaining(
String.format("because it references to the temporary object %s", tempView));
}

@Test
public void alterViewWithSubqueryExpressionUsingGlobalTempView() {
String viewName = viewName("simpleViewWithSubqueryExpression");
String globalTempView = viewName("simpleGlobalTempView");
String sql =
String.format(
"SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)",
tableName, globalTempView);

sql(
"CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5",
globalTempView, tableName);
sql("CREATE VIEW %s AS SELECT * from %s", viewName, tableName);

assertThatThrownBy(() -> sql("ALTER VIEW %s AS %s", viewName, sql))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining(String.format("Cannot create the persistent object %s", viewName))
.hasMessageContaining(
String.format(
"because it references to the temporary object global_temp.%s", globalTempView));
}

private void insertRows(int numRows) throws NoSuchTableException {
List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
for (int i = 1; i <= numRows; i++) {
records.add(new SimpleRecord(i, UUID.randomUUID().toString()));
records.add(new SimpleRecord(i, Integer.toString(i * 2)));
}

Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
Expand Down

0 comments on commit 6dfa2dd

Please sign in to comment.