diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala index 4682ef5e0bb0..a65aadceb475 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.UnsetViewProperties import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView @@ -45,6 +46,9 @@ object CheckViews extends (LogicalPlan => Unit) { } } + case AlterViewAs(ResolvedV2View(_, _), _, query) => + SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver) + case _ => // OK } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 37e849c71d3a..0b4ba0fad502 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.ViewUtil.IcebergViewHelper import org.apache.spark.sql.catalyst.analysis.ViewUtil.isViewCatalog import org.apache.spark.sql.catalyst.analysis.ViewUtil.loadView import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs import org.apache.spark.sql.catalyst.plans.logical.CreateView 
import org.apache.spark.sql.catalyst.plans.logical.DropView import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -67,6 +68,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi loadView(resolved.catalog, resolved.identifier) .map(_ => ResolvedV2View(resolved.catalog.asViewCatalog, resolved.identifier)) .getOrElse(u) + + case a@AlterViewAs(ResolvedV2View(_, ident), _, query) => + val q = CTESubstitution.apply(query) + verifyTemporaryObjectsDontExist(ident, q) + a.copy(query = q) } private def isTempView(nameParts: Seq[String]): Boolean = { diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index 2fcfdf2294ff..0fbc7686786b 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.expressions.GenericInternalRow import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.catalyst.plans.logical.AddPartitionField +import org.apache.spark.sql.catalyst.plans.logical.AlterViewAs import org.apache.spark.sql.catalyst.plans.logical.Call import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceBranch import org.apache.spark.sql.catalyst.plans.logical.CreateOrReplaceTag @@ -135,6 +136,20 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi case UnsetViewProperties(ResolvedV2View(catalog, ident), propertyKeys, _) => AlterV2ViewExec(catalog, ident, propertyKeys.map(ViewChange.removeProperty)) :: Nil + case 
AlterViewAs(ResolvedV2View(catalog, ident), queryText, query) => + CreateV2ViewExec( + catalog = catalog, + ident = ident, + queryText = queryText, + columnAliases = Seq.empty, + columnComments = Seq.empty, + queryColumnNames = query.schema.fieldNames, + viewSchema = query.schema, + comment = None, + properties = Map.empty, + allowExisting = false, + replace = true) :: Nil + case _ => Nil } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 48c3d0ba6c6c..13ae560d4451 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.IcebergBuild; @@ -1206,10 +1205,286 @@ public void alterViewUnsetUnknownProperty() { () -> sql("ALTER VIEW %s UNSET TBLPROPERTIES IF EXISTS ('unknown-key')", viewName)); } + @Test + public void alterView() throws NoSuchTableException { + insertRows(6); + String viewName = "alteredView"; + + sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName); + + assertThat(sql("SELECT id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + + sql("ALTER VIEW %s AS SELECT id FROM %s WHERE id > 3", viewName, tableName); + + assertThat(sql("SELECT * FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(4), row(5), row(6)); + } + + @Test + public void alterNonExistingView() { + // this is a bit tricky because the view can't be found in the view catalog and + // resolution bubbles up to check whether it's a V1 view but then fails, indicating that the + // catalog doesn't support 
views + assertThatThrownBy( + () -> sql("ALTER VIEW non_existing AS SELECT id FROM %s WHERE id > 3", tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + String.format( + "The feature is not supported: Catalog `%s` does not support views", catalogName)); + } + + @Test + public void alterViewWithInvalidSQL() { + String viewName = "alteredViewWithInvalidSQL"; + + sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName); + assertThatThrownBy(() -> sql("ALTER VIEW %s AS invalid SQL", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Syntax error"); + } + + @Test + public void alterViewReferencingTempView() throws NoSuchTableException { + insertRows(10); + String tempView = "temporaryViewBeingReferencedInView"; + String viewReferencingTempView = "alteredViewReferencingTemporaryView"; + + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); + sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewReferencingTempView, tableName); + + // altering a view that references a TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> sql("ALTER VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempView); + } + + @Test + public void alterViewReferencingGlobalTempView() throws NoSuchTableException { + insertRows(10); + String globalTempView = "globalTemporaryViewBeingReferencedInView"; + String viewReferencingTempView = "alteredViewReferencingGlobalTemporaryView"; + + sql( + "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", + globalTempView, tableName); + + sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewReferencingTempView, 
tableName); + + // altering a view that references a GLOBAL TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> + sql( + "ALTER VIEW %s AS SELECT id FROM global_temp.%s", + viewReferencingTempView, globalTempView)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(globalTempView); + } + + @Test + public void alterViewWithUpdatedQueryColumns() throws NoSuchTableException { + insertRows(6); + String viewName = viewName("viewWithQueryColumns"); + + sql( + "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3", + viewName, tableName); + + View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)); + assertThat(view.properties()).containsEntry("queryColumnNames", "id,data"); + + assertThat(view.schema().columns()).hasSize(2); + Types.NestedField first = view.schema().columns().get(0); + assertThat(first.name()).isEqualTo("new_id"); + assertThat(first.doc()).isEqualTo("ID"); + + Types.NestedField second = view.schema().columns().get(1); + assertThat(second.name()).isEqualTo("new_data"); + assertThat(second.doc()).isEqualTo("DATA"); + + assertThat(sql("SELECT new_id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + + // query is updated to only contain id + sql("ALTER VIEW %s AS SELECT id from %s WHERE id > 3", viewName, tableName); + + // querying new_id / new_data shouldn't work anymore + assertThatThrownBy(() -> sql("SELECT new_id FROM %s", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "A column or function parameter with name `new_id` cannot be resolved"); + + assertThatThrownBy(() -> sql("SELECT new_data FROM %s", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "A column or 
function parameter with name `new_data` cannot be resolved"); + + assertThat(sql("SELECT id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(4), row(5), row(6)); + + view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)); + assertThat(view.properties()).containsEntry("queryColumnNames", "id"); + + assertThat(view.schema().columns()).hasSize(1); + Types.NestedField field = view.schema().columns().get(0); + assertThat(field.name()).isEqualTo("id"); + assertThat(field.doc()).isNull(); + } + + @Test + public void alterViewWithDuplicateQueryColumnNames() { + String viewName = viewName("viewWithDuplicateQueryColumnNames"); + sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName); + + // having duplicate columns in the underlying query should fail + assertThatThrownBy( + () -> sql("ALTER VIEW %s AS SELECT id, id FROM %s WHERE id <= 3", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The column `id` already exists"); + } + + @Test + public void alterViewWithNonExistingTable() { + String viewName = "alteredViewUsingNonExistingTable"; + + sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName); + assertThatThrownBy(() -> sql("ALTER VIEW %s AS SELECT id from non_existing", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The table or view `non_existing` cannot be found"); + } + + @Test + public void alterViewWithNonExistingQueryColumn() { + String viewName = "alteredViewUsingNonExistingQueryColumn"; + + sql("CREATE VIEW %s AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName); + assertThatThrownBy( + () -> sql("ALTER VIEW %s AS SELECT non_existing from %s", viewName, tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "A column or function parameter with name `non_existing` cannot be resolved"); + } + + /** The purpose of this test is mainly to make sure that altering a normal view isn't messed up */ + 
@Test + public void alterViewHiddenByTempView() throws NoSuchTableException { + insertRows(6); + String viewName = "alteredViewHiddenByTempView"; + String sql = String.format("SELECT id, data FROM %s WHERE id > 3", tableName); + + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName); + sql("CREATE VIEW %s AS %s", viewName, sql); + + TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName); + View view = viewCatalog().loadView(identifier); + assertThat(view.sqlFor("spark").sql()).isEqualTo(sql); + + // results from TEMP VIEW are returned + assertThat(sql("SELECT id FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + + // TEMP VIEW will be updated + sql("ALTER VIEW %s AS SELECT id FROM %s WHERE id >= 3 AND id < 4", viewName, tableName); + + // results from TEMP VIEW are returned + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(3)); + + // the only way to ALTER the Iceberg view would be to rename the TEMP VIEW and then do an ALTER + assertThat(viewCatalog().loadView(identifier).sqlFor("spark").sql()).isEqualTo(sql); + } + + @Test + public void alterViewWithCTE() throws NoSuchTableException { + insertRows(10); + String viewName = viewName("simpleViewWithCTE"); + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tableName); + + sql("CREATE VIEW %s AS SELECT * FROM %s", viewName, tableName); + sql("ALTER VIEW %s AS %s", viewName, sql); + + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); + } + + @Test + public void alterViewWithCTEReferencingTempView() { + String viewName = viewName("viewWithCTEReferencingTempView"); + String tempViewInCTE = viewName("tempViewInCTE"); + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tempViewInCTE); + 
sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName); + sql("CREATE VIEW %s AS SELECT * FROM %s", viewName, tableName); + + assertThatThrownBy(() -> sql("ALTER VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewName) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempViewInCTE); + } + + @Test + public void alterViewWithSubqueryExpressionUsingTempView() { + String viewName = viewName("viewWithSubqueryExpression"); + String tempView = viewName("simpleTempView"); + String sql = + String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView); + + sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName); + sql("CREATE VIEW %s AS SELECT * from %s", viewName, tableName); + + assertThatThrownBy(() -> sql("ALTER VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName)) + .hasMessageContaining( + String.format("because it references to the temporary object %s", tempView)); + } + + @Test + public void alterViewWithSubqueryExpressionUsingGlobalTempView() { + String viewName = viewName("simpleViewWithSubqueryExpression"); + String globalTempView = viewName("simpleGlobalTempView"); + String sql = + String.format( + "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)", + tableName, globalTempView); + + sql( + "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", + globalTempView, tableName); + sql("CREATE VIEW %s AS SELECT * from %s", viewName, tableName); + + assertThatThrownBy(() -> sql("ALTER VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName)) + .hasMessageContaining( + 
String.format( + "because it references to the temporary object global_temp.%s", globalTempView)); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { - records.add(new SimpleRecord(i, UUID.randomUUID().toString())); + records.add(new SimpleRecord(i, Integer.toString(i * 2))); } Dataset df = spark.createDataFrame(records, SimpleRecord.class);