diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala index 123dc163dd3a..2d1645b95ee9 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala @@ -35,7 +35,7 @@ object CheckViews extends (LogicalPlan => Unit) { plan foreach { case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, queryColumnNames, _, _, _, _, _) => - verifyAmountOfColumns(ident, columnAliases, query) + verifyColumnCount(ident, columnAliases, query) verifyTemporaryObjectsDontExist(ident, query) SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) @@ -43,7 +43,7 @@ object CheckViews extends (LogicalPlan => Unit) { } } - private def verifyAmountOfColumns(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = { + private def verifyColumnCount(ident: Identifier, columns: Seq[String], query: LogicalPlan): Unit = { if (columns.nonEmpty) { if (columns.length > query.output.length) { throw new AnalysisException( diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index ce20e68767e2..49a6f3df2884 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -62,9 +62,8 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) - case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, - rewritten) - if query.resolved && !rewritten => + case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _) + if query.resolved && !c.rewritten => val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier) val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments) c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true) @@ -77,12 +76,14 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) { analyzedPlan } else { - val projectList = analyzedPlan.output.zipWithIndex.map { case (attr, pos) => + val projectList = analyzedPlan.output.zipWithIndex.map { case (_, pos) => + val column = GetColumnByOrdinal(pos, analyzedPlan.schema.fields.apply(pos).dataType) + if (columnComments.apply(pos).isDefined) { val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build() - Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta)) + Alias(column, columnAliases.apply(pos))(explicitMetadata = Some(meta)) } else { - Alias(attr, columnAliases.apply(pos))() + Alias(column, columnAliases.apply(pos))() } } Project(projectList, analyzedPlan) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala index 2d0fcc69114e..892e1eb857e4 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala @@ -40,8 +40,7 @@ case class CreateV2ViewExec( comment: Option[String], properties: Map[String, String], allowExisting: Boolean, - replace: Boolean, - query: LogicalPlan) extends LeafV2CommandExec { + replace: Boolean) extends LeafV2CommandExec { override lazy val output: Seq[Attribute] = Nil diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala index e6148dcca4a3..0505fe4e3030 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala @@ -117,7 +117,6 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi columnAliases = columnAliases, columnComments = columnComments, queryColumnNames = queryColumnNames, - query = query, viewSchema = query.schema, comment = comment, properties = properties, diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index 65d2c39d437b..d212cc15aaa3 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -350,21 +350,39 @@ public void readFromViewReferencingAnotherView() throws NoSuchTableException { } @Test - public void createViewReferencingTempView() throws NoSuchTableException { + public void readFromViewReferencingTempView() throws NoSuchTableException { insertRows(10); String tempView = "tempViewBeingReferencedInAnotherView"; String viewReferencingTempView = "viewReferencingTempView"; + String sql = String.format("SELECT id FROM %s", tempView); + + ViewCatalog viewCatalog = viewCatalog(); sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); - // creating a view that references a TEMP VIEW shouldn't be possible - assertThatThrownBy( - () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView)) + // it wouldn't be possible to reference a TEMP VIEW if the view had been created via SQL, + // but this can't be prevented when using the API directly + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView)) + .withQuery("spark", sql) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema(sql)) + .create(); + + List expected = + IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()); + + assertThat(sql("SELECT * FROM %s", tempView)) + .hasSize(5) + .containsExactlyInAnyOrderElementsOf(expected); + + // reading from a view that references a TEMP VIEW shouldn't be possible + assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView)) .isInstanceOf(AnalysisException.class) - .hasMessageContaining("Cannot create the persistent object") - .hasMessageContaining(viewReferencingTempView) - .hasMessageContaining("of the type VIEW because it references to the temporary object") - .hasMessageContaining(tempView); + .hasMessageContaining("The table or view") + .hasMessageContaining(tempView) + .hasMessageContaining("cannot be found"); } @Test @@ -416,26 +434,41 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTa } @Test - public void createViewReferencingGlobalTempView() throws NoSuchTableException { + public void readFromViewReferencingGlobalTempView() throws NoSuchTableException { insertRows(10); String globalTempView = "globalTempViewBeingReferenced"; String viewReferencingTempView = "viewReferencingGlobalTempView"; + ViewCatalog viewCatalog = viewCatalog(); + Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); + sql( "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", globalTempView, tableName); - // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible - assertThatThrownBy( - () -> - sql( - "CREATE VIEW %s AS SELECT id FROM global_temp.%s", - viewReferencingTempView, globalTempView)) + // it wouldn't be possible to reference a GLOBAL TEMP VIEW if the view had been created via SQL, + // but this can't be prevented when using the API directly + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView)) + .withQuery("spark", String.format("SELECT id FROM global_temp.%s", globalTempView)) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + List expected = + IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()); + + assertThat(sql("SELECT * FROM global_temp.%s", globalTempView)) + .hasSize(5) + .containsExactlyInAnyOrderElementsOf(expected); + + // reading from a view that references a GLOBAL TEMP VIEW shouldn't be possible + assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView)) .isInstanceOf(AnalysisException.class) - .hasMessageContaining("Cannot create the persistent object") - .hasMessageContaining(viewReferencingTempView) - .hasMessageContaining("of the type VIEW because it references to the temporary object") - .hasMessageContaining(globalTempView); + .hasMessageContaining("The table or view") + .hasMessageContaining(globalTempView) + .hasMessageContaining("cannot be found"); } @Test @@ -870,16 +903,65 @@ public void createViewIfNotExists() { () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName)); } + @Test + public void createViewWithInvalidSQL() { + assertThatThrownBy(() -> sql("CREATE VIEW simpleViewWithInvalidSQL AS invalid SQL")) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Syntax error"); + } + + @Test + public void createViewReferencingTempView() throws NoSuchTableException { + insertRows(10); + String tempView = "temporaryViewBeingReferencedInAnotherView"; + String viewReferencingTempView = "viewReferencingTemporaryView"; + + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName); + + // creating a view that references a TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempView); + } + + @Test + public void createViewReferencingGlobalTempView() throws NoSuchTableException { + insertRows(10); + String globalTempView = "globalTemporaryViewBeingReferenced"; + String viewReferencingTempView = "viewReferencingGlobalTemporaryView"; + + sql( + "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", + globalTempView, tableName); + + // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible + assertThatThrownBy( + () -> + sql( + "CREATE VIEW %s AS SELECT id FROM global_temp.%s", + viewReferencingTempView, globalTempView)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewReferencingTempView) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(globalTempView); + } + @Test public void createViewUsingNonExistingTable() { - assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", "viewName", "non_existing")) + assertThatThrownBy( + () -> sql("CREATE VIEW viewWithNonExistingTable AS SELECT id FROM non_existing")) .isInstanceOf(AnalysisException.class) .hasMessageContaining("The table or view `non_existing` cannot be found"); } @Test - public void createViewColumnMismatch() { - String viewName = "viewWithMismatchedColumns"; + public void createViewWithMismatchedColumnCounts() { + String viewName = "viewWithMismatchedColumnCounts"; assertThatThrownBy( () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName)) @@ -917,7 +999,7 @@ public void createViewWithColumnAliases() throws NoSuchTableException { sql("DROP VIEW %s", viewName); sql( - "CREATE VIEW %s (new_id, new_data) AS SELECT id, data FROM %s WHERE id <= 3", + "CREATE VIEW %s (new_data, new_id) AS SELECT data, id FROM %s WHERE id <= 3", viewName, tableName); assertThat(sql("SELECT new_id FROM %s", viewName)) @@ -927,31 +1009,79 @@ public void createViewWithColumnAliases() throws NoSuchTableException { @Test public void createViewWithDuplicateQueryColumnNames() { - String viewName = "viewWithDuplicateQueryColumnNames"; - assertThatThrownBy( () -> sql( - "CREATE VIEW %s (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3", - viewName, tableName)) + "CREATE VIEW viewWithDuplicateQueryColumnNames (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3", + tableName)) .isInstanceOf(AnalysisException.class) .hasMessageContaining("The column `id` already exists"); } @Test - public void createViewWithNonExistingQueryColumn() { - String viewName = "viewWithNonExistingQueryColumn"; + public void createViewWithCTE() throws NoSuchTableException { + insertRows(10); + String viewName = "simpleViewWithCTE"; + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tableName); + + sql("CREATE VIEW %s AS %s", viewName, sql); + + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); + } + @Test + public void createViewWithNonExistingQueryColumn() { assertThatThrownBy( () -> sql( - "CREATE VIEW %s AS SELECT non_existing FROM %s WHERE id <= 3", - viewName, tableName)) + "CREATE VIEW viewWithNonExistingQueryColumn AS SELECT non_existing FROM %s WHERE id <= 3", + tableName)) .isInstanceOf(AnalysisException.class) .hasMessageContaining( "A column or function parameter with name `non_existing` cannot be resolved"); } + @Test + public void createViewWithSubqueryExpressionUsingTempView() { + String viewName = "viewWithSubqueryExpression"; + String tempView = "simpleTempView"; + String sql = + String.format("SELECT * FROM %s WHERE id = (SELECT id FROM %s)", tableName, tempView); + + sql("CREATE TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", tempView, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName)) + .hasMessageContaining( + String.format("because it references to the temporary object %s", tempView)); + } + + @Test + public void createViewWithSubqueryExpressionUsingGlobalTempView() { + String viewName = "simpleViewWithSubqueryExpression"; + String globalTempView = "simpleGlobalTempView"; + String sql = + String.format( + "SELECT * FROM %s WHERE id = (SELECT id FROM global_temp.%s)", + tableName, globalTempView); + + sql( + "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id from %s WHERE id = 5", + globalTempView, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining(String.format("Cannot create the persistent object %s", viewName)) + .hasMessageContaining( + String.format( + "because it references to the temporary object global_temp.%s", globalTempView)); + } + private void insertRows(int numRows) throws NoSuchTableException { List records = Lists.newArrayListWithCapacity(numRows); for (int i = 1; i <= numRows; i++) { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java index 62d300b36a1d..d7a13562c408 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java @@ -112,11 +112,6 @@ public Map properties() { propsBuilder.put("provider", "iceberg"); propsBuilder.put("location", icebergView.location()); - if (icebergView.properties().containsKey(QUERY_COLUMN_NAMES)) { - String queryColumnNames = - icebergView.properties().get(QUERY_COLUMN_NAMES).replace("[", "").replace("]", ""); - propsBuilder.put(QUERY_COLUMN_NAMES, queryColumnNames); - } if (icebergView instanceof BaseView) { ViewOperations ops = ((BaseView) icebergView).operations();