From f348f8b05e40bf0908120f6e1458bc2a0d171e99 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Fri, 26 Jan 2024 08:05:04 +0100 Subject: [PATCH] review feedback --- .../sql/catalyst/analysis/CheckViews.scala | 4 +- .../sql/catalyst/analysis/ResolveViews.scala | 25 +++---- .../analysis/RewriteViewCommands.scala | 5 +- .../iceberg/spark/extensions/TestViews.java | 75 ++++++++++++++++++- .../apache/iceberg/spark/SparkCatalog.java | 2 +- .../iceberg/spark/source/SparkView.java | 2 +- 6 files changed, 89 insertions(+), 24 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala index 95f54ccaf724..4a1736764d0d 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckViews.scala @@ -32,9 +32,9 @@ object CheckViews extends (LogicalPlan => Unit) { override def apply(plan: LogicalPlan): Unit = { plan foreach { case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, query, columnAliases, _, - queryColumnNames, _, _, _, _, _) => + _, _, _, _, _, _) => verifyColumnCount(ident, columnAliases, query) - SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver) + SchemaUtils.checkColumnNameDuplication(query.schema.fieldNames, SQLConf.get.resolver) case _ => // OK } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 49a6f3df2884..5616f6f70be6 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -62,31 +62,28 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look .map(_ => ResolvedV2View(catalog.asViewCatalog, ident)) .getOrElse(u) - case c@CreateIcebergView(ResolvedIdentifier(_, ident), _, query, columnAliases, columnComments, _, _, _, _, _, _) + case c@CreateIcebergView(ResolvedIdentifier(_, _), _, query, columnAliases, columnComments, _, _, _, _, _, _) if query.resolved && !c.rewritten => - val rewritten = rewriteIdentifiers(query, ident.asMultipartIdentifier) - val aliasedPlan = aliasPlan(rewritten, columnAliases, columnComments) - c.copy(query = aliasedPlan, queryColumnNames = query.schema.fieldNames, rewritten = true) + val aliased = aliasColumns(query, columnAliases, columnComments) + c.copy(query = aliased, queryColumnNames = query.schema.fieldNames, rewritten = true) } - private def aliasPlan( - analyzedPlan: LogicalPlan, + private def aliasColumns( + plan: LogicalPlan, columnAliases: Seq[String], columnComments: Seq[Option[String]]): LogicalPlan = { - if (columnAliases.isEmpty || columnAliases.length != analyzedPlan.output.length) { - analyzedPlan + if (columnAliases.isEmpty || columnAliases.length != plan.output.length) { + plan } else { - val projectList = analyzedPlan.output.zipWithIndex.map { case (_, pos) => - val column = GetColumnByOrdinal(pos, analyzedPlan.schema.fields.apply(pos).dataType) - + val projectList = plan.output.zipWithIndex.map { case (attr, pos) => if (columnComments.apply(pos).isDefined) { val meta = new MetadataBuilder().putString("comment", columnComments.apply(pos).get).build() - Alias(column, columnAliases.apply(pos))(explicitMetadata = Some(meta)) + Alias(attr, columnAliases.apply(pos))(explicitMetadata = Some(meta)) } else { - Alias(column, columnAliases.apply(pos))() + Alias(attr, columnAliases.apply(pos))() } } - Project(projectList, analyzedPlan) + Project(projectList, plan) } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala index 884f6c9f774f..066ba59394d7 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala @@ -49,10 +49,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wi case CreateView(ResolvedView(resolved), userSpecifiedColumns, comment, properties, Some(queryText), query, allowExisting, replace) => - verifyTemporaryObjectsDontExist(resolved.identifier, query) + val q = CTESubstitution.apply(query) + verifyTemporaryObjectsDontExist(resolved.identifier, q) CreateIcebergView(child = resolved, queryText = queryText, - query = query, + query = q, columnAliases = userSpecifiedColumns.map(_._1), columnComments = userSpecifiedColumns.map(_._2.orElse(Option.empty)), comment = comment, diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index d212cc15aaa3..bf6509afee77 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -40,6 +40,7 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.source.SimpleRecord; import org.apache.iceberg.types.Types; +import org.apache.iceberg.view.View; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -989,8 +990,17 @@ public void createViewWithColumnAliases() throws NoSuchTableException { "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3", viewName, tableName); - assertThat(viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)).properties()) - .containsEntry("queryColumnNames", "id, data"); + View view = viewCatalog().loadView(TableIdentifier.of(NAMESPACE, viewName)); + assertThat(view.properties()).containsEntry("queryColumnNames", "id,data"); + + assertThat(view.schema().columns()).hasSize(2); + Types.NestedField first = view.schema().columns().get(0); + assertThat(first.name()).isEqualTo("new_id"); + assertThat(first.doc()).isEqualTo("ID"); + + Types.NestedField second = view.schema().columns().get(1); + assertThat(second.name()).isEqualTo("new_data"); + assertThat(second.doc()).isEqualTo("DATA"); assertThat(sql("SELECT new_id FROM %s", viewName)) .hasSize(3) @@ -1008,14 +1018,32 @@ public void createViewWithColumnAliases() throws NoSuchTableException { } @Test - public void createViewWithDuplicateQueryColumnNames() { + public void createViewWithDuplicateColumnNames() { assertThatThrownBy( () -> sql( - "CREATE VIEW viewWithDuplicateQueryColumnNames (new_id , new_data) AS SELECT id, id FROM %s WHERE id <= 3", + "CREATE VIEW viewWithDuplicateColumnNames (new_id, new_id) AS SELECT id, id FROM %s WHERE id <= 3", tableName)) .isInstanceOf(AnalysisException.class) + .hasMessageContaining("The column `new_id` already exists"); + } + + @Test + public void createViewWithDuplicateQueryColumnNames() throws NoSuchTableException { + insertRows(3); + String viewName = "viewWithDuplicateQueryColumnNames"; + String sql = String.format("SELECT id, id FROM %s WHERE id <= 3", tableName); + + // not specifying column aliases in the view should fail + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) .hasMessageContaining("The column `id` already exists"); + + sql("CREATE VIEW %s (id_one, id_two) AS %s", viewName, sql); + + assertThat(sql("SELECT * FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1, 1), row(2, 2), row(3, 3)); } @Test @@ -1033,6 +1061,45 @@ public void createViewWithCTE() throws NoSuchTableException { assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); } + @Test + public void createViewWithConflictingNamesForCTEAndTempView() throws NoSuchTableException { + insertRows(10); + String viewName = "viewWithConflictingNamesForCTEAndTempView"; + String cteName = "cteName"; + String sql = + String.format( + "WITH %s AS (SELECT max(id) as max FROM %s) " + + "(SELECT max, count(1) AS count FROM %s GROUP BY max)", + cteName, tableName, cteName); + + // create a CTE and a TEMP VIEW with the same name + sql("CREATE TEMPORARY VIEW %s AS SELECT * from %s", cteName, tableName); + sql("CREATE VIEW %s AS %s", viewName, sql); + + // CTE should take precedence over the TEMP VIEW when data is read + assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L)); + } + + @Test + public void createViewWithCTEReferencingTempView() { + String viewName = "viewWithCTEReferencingTempView"; + String tempViewInCTE = "tempViewInCTE"; + String sql = + String.format( + "WITH max_by_data AS (SELECT max(id) as max FROM %s) " + + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max", + tempViewInCTE); + + sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE ID <= 5", tempViewInCTE, tableName); + + assertThatThrownBy(() -> sql("CREATE VIEW %s AS %s", viewName, sql)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot create the persistent object") + .hasMessageContaining(viewName) + .hasMessageContaining("of the type VIEW because it references to the temporary object") + .hasMessageContaining(tempViewInCTE); + } + @Test public void createViewWithNonExistingQueryColumn() { assertThatThrownBy( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java index db8829261472..37e7387d696d 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java @@ -572,7 +572,7 @@ public View createView( if (null != asViewCatalog) { Schema icebergSchema = SparkSchemaUtil.convert(schema); - StringJoiner joiner = new StringJoiner(", "); + StringJoiner joiner = new StringJoiner(","); Arrays.stream(queryColumnNames).forEach(joiner::add); try { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java index d7a13562c408..5391d75476ce 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java @@ -88,7 +88,7 @@ public StructType schema() { @Override public String[] queryColumnNames() { return icebergView.properties().containsKey(QUERY_COLUMN_NAMES) - ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(", ") + ? icebergView.properties().get(QUERY_COLUMN_NAMES).split(",") : new String[0]; }