From fb2b1ecf13adb08ea9843f5ac1d76d1c1bd2e836 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Thu, 21 Dec 2023 16:00:01 +0100
Subject: [PATCH] Spark: Support creating views via SQL

---
 .../analysis/HijackViewCommands.scala         |   8 +
 .../sql/catalyst/analysis/ResolveViews.scala  | 110 ++++++++++++
 .../logical/views/CreateIcebergView.scala     |  41 +++++
 .../plans/logical/views/CreateV2View.scala    |  45 +++++
 .../datasources/v2/CreateV2ViewExec.scala     |  96 +++++++++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |  22 +++
 .../iceberg/spark/extensions/TestViews.java   | 158 ++++++++++--------
 .../apache/iceberg/spark/SparkCatalog.java    |  27 +++
 .../iceberg/spark/source/SparkView.java       |  12 +-
 9 files changed, 449 insertions(+), 70 deletions(-)
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala
index a878f912c65b..10ffdaf2489e 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala
@@ -20,8 +20,10 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -41,6 +43,12 @@ case class HijackViewCommands(spark: SparkSession) extends Rule[LogicalPlan] wit
     case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
         if isViewCatalog(catalogManager.currentCatalog) && !isTempView(nameParts) =>
       DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
+
+    case CreateView(UnresolvedIdentifier(nameParts, allowTemp), userSpecifiedColumns,
+        comment, properties, originalText, query, allowExisting, replace)
+        if isViewCatalog(catalogManager.currentCatalog) =>
+      CreateIcebergView(UnresolvedIdentifier(nameParts, allowTemp), userSpecifiedColumns,
+        comment, properties, originalText, query, allowExisting, replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
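
Note: the hijack rule above reroutes CREATE VIEW before Spark's built-in view code path can
claim it, but only when the current catalog is a view catalog. A rough sketch of the flow the
rest of this patch implements, using a hypothetical catalog name `ice` (not part of the patch):

    // Assumes `ice` is configured as an Iceberg SparkCatalog with view support.
    spark.sql("USE ice")
    spark.sql("CREATE VIEW simple_v AS SELECT 1 AS id")
    // CreateView --(HijackViewCommands)--> CreateIcebergView
    //   --(ResolveViews.convertCreateView)--> CreateV2View
    //   --(ExtendedDataSourceV2Strategy)--> CreateV2ViewExec.run()
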
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
index 28f051e2b02d..696dff96bfb0 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala
@@ -19,14 +19,19 @@
 package org.apache.spark.sql.catalyst.analysis
 
+import org.apache.iceberg.spark.Spark3Util
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.expressions.UpCast
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin
 import org.apache.spark.sql.catalyst.trees.Origin
@@ -37,6 +42,11 @@ import org.apache.spark.sql.connector.catalog.LookupCatalog
 import org.apache.spark.sql.connector.catalog.View
 import org.apache.spark.sql.connector.catalog.ViewCatalog
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.execution.command.ViewHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
 
 case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
@@ -53,6 +63,20 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
       loadView(catalog, ident)
         .map(createViewRelation(parts, _))
         .getOrElse(u)
+
+    case CreateIcebergView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
+        properties, originalText, child, allowExisting, replace) =>
+      convertCreateView(
+        catalog = asViewCatalog(catalog),
+        viewInfo = ViewInfo(
+          ident = ident,
+          userSpecifiedColumns = userSpecifiedColumns,
+          comment = comment,
+          properties = properties,
+          originalText = originalText),
+        child = child,
+        allowExisting = allowExisting,
+        replace = replace)
   }
 
   def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
@@ -143,4 +167,90 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
   private def isBuiltinFunction(name: String): Boolean = {
     catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))
   }
+
+  def asViewCatalog(plugin: CatalogPlugin): ViewCatalog = plugin match {
+    case viewCatalog: ViewCatalog =>
+      viewCatalog
+    case _ =>
+      throw QueryCompilationErrors.missingCatalogAbilityError(plugin, "views")
+  }
+
+  private case class ViewInfo(
+      ident: Identifier,
+      userSpecifiedColumns: Seq[(String, Option[String])],
+      comment: Option[String],
+      properties: Map[String, String],
+      originalText: Option[String])
+
+  /**
+   * Convert [[CreateView]] or [[AlterViewAs]] to the logical plan [[CreateV2View]].
+   */
+  private def convertCreateView(
+      catalog: ViewCatalog,
+      viewInfo: ViewInfo,
+      child: LogicalPlan,
+      allowExisting: Boolean,
+      replace: Boolean): LogicalPlan = {
+    val qe = spark.sessionState.executePlan(child, CommandExecutionMode.SKIP)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
+
+    val identifier = Spark3Util.toV1TableIdentifier(viewInfo.ident)
+
+    if (viewInfo.userSpecifiedColumns.nonEmpty) {
+      if (viewInfo.userSpecifiedColumns.length > analyzedPlan.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+          identifier, viewInfo.userSpecifiedColumns.map(_._1), analyzedPlan)
+      } else if (viewInfo.userSpecifiedColumns.length < analyzedPlan.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+          identifier, viewInfo.userSpecifiedColumns.map(_._1), analyzedPlan)
+      }
+    }
+
+    ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, child, Seq.empty)
+    ViewHelper.verifyAutoGeneratedAliasesNotExists(child, false, identifier)
+
+    val queryOutput = analyzedPlan.schema.fieldNames
+    SchemaUtils.checkColumnNameDuplication(queryOutput, SQLConf.get.resolver)
+
+    val sql = viewInfo.originalText.getOrElse {
+      throw QueryCompilationErrors.createPersistedViewFromDatasetAPINotAllowedError()
+    }
+
+    val viewSchema = aliasPlan(analyzedPlan, viewInfo.userSpecifiedColumns).schema
+    val columnAliases = viewInfo.userSpecifiedColumns.map(_._1).toArray
+    val columnComments = viewInfo.userSpecifiedColumns.map(_._2.getOrElse("")).toArray
+
+    CreateV2View(
+      view = viewInfo.ident.asMultipartIdentifier,
+      sql = sql,
+      comment = None,
+      viewSchema = viewSchema,
+      queryColumnNames = queryOutput,
+      columnAliases = columnAliases,
+      columnComments = columnComments,
+      properties = viewInfo.properties,
+      allowExisting = allowExisting,
+      replace = replace)
+  }
+
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user-specified columns;
+   * otherwise return the analyzed plan directly.
+   */
+  private def aliasPlan(
+      analyzedPlan: LogicalPlan,
+      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      val projectedPlan = Project(projectList, analyzedPlan)
+      spark.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
+    }
+  }
 }
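
Note: `aliasPlan` is what makes `CREATE VIEW v (new_id COMMENT 'ID', ...)` carry the user's
column names and comments into the stored view schema. A self-contained sketch of the same
effect at the DataFrame level (illustrative only, assumes a running SparkSession):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.MetadataBuilder

    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b")).toDF("id", "data")
    // Mirrors aliasPlan: each output attribute is re-aliased, with an optional
    // COMMENT attached as "comment" metadata on the field.
    val aliased = df.select(
      df("id").as("new_id", new MetadataBuilder().putString("comment", "ID").build()),
      df("data").as("new_data"))
    aliased.schema.foreach(f => println(s"${f.name}: ${f.metadata}"))
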
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
new file mode 100644
index 000000000000..036ec8ce2b95
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+case class CreateIcebergView(
+    child: LogicalPlan,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    properties: Map[String, String],
+    originalText: Option[String],
+    query: LogicalPlan,
+    allowExisting: Boolean,
+    replace: Boolean) extends BinaryCommand {
+  override def left: LogicalPlan = child
+
+  override def right: LogicalPlan = query
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
+    copy(child = newLeft, query = newRight)
+}
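
Note: modeling the node as a BinaryCommand keeps both the identifier plan (`child`) and the
view's query plan visible to the analyzer, so each is resolved by the usual rules before
ResolveViews matches on the resolved shape. Rough illustration with hypothetical names (not
from the patch):

    import org.apache.spark.sql.catalyst.analysis.{UnresolvedIdentifier, UnresolvedRelation}
    import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView

    val node = CreateIcebergView(
      child = UnresolvedIdentifier(Seq("db", "v")),
      userSpecifiedColumns = Seq("new_id" -> Some("ID")),
      comment = None,
      properties = Map.empty,
      originalText = Some("SELECT id FROM tbl"),
      query = UnresolvedRelation(Seq("tbl")),
      allowExisting = false,
      replace = false)
    // Analysis rewrites child to ResolvedIdentifier and query to a resolved plan
    // via withNewChildrenInternal, without rebuilding the node by hand.
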
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala
new file mode 100644
index 000000000000..5e907c1cf8ff
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateV2View.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LeafCommand
+import org.apache.spark.sql.types.StructType
+
+case class CreateV2View(
+    view: Seq[String],
+    sql: String,
+    comment: Option[String],
+    viewSchema: StructType,
+    queryColumnNames: Array[String],
+    columnAliases: Array[String],
+    columnComments: Array[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean) extends LeafCommand {
+
+  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateV2View: ${view.quoted}"
+  }
+}
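
Note: CreateV2View is a LeafCommand on purpose: by this point the view body is carried as the
original SQL text plus a precomputed schema, so there is no child plan left to resolve. The
`view.quoted` in simpleString comes from CatalogV2Implicits; a quick illustration with made-up
name parts:

    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

    val parts = Seq("ice", "default", "event agg")
    println(parts.quoted)  // ice.default.`event agg` -- back-quoted only when needed
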
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
new file mode 100644
index 000000000000..c4dbf3c9d536
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.types.StructType
+import scala.collection.JavaConverters._
+
+case class CreateV2ViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    sql: String,
+    currentCatalog: String,
+    currentNamespace: Array[String],
+    comment: Option[String],
+    viewSchema: StructType,
+    queryColumnNames: Array[String],
+    columnAliases: Array[String],
+    columnComments: Array[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+    val createEngineVersion = Some(engineVersion)
+    val newProperties = properties ++
+      comment.map(ViewCatalog.PROP_COMMENT -> _) ++
+      createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) +
+      (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+
+    if (replace) {
+      // CREATE OR REPLACE VIEW
+      if (catalog.viewExists(ident)) {
+        catalog.dropView(ident)
+      }
+      // FIXME: replaceView API doesn't exist in Spark 3.5
+      catalog.createView(
+        ident,
+        sql,
+        currentCatalog,
+        currentNamespace,
+        viewSchema,
+        queryColumnNames,
+        columnAliases,
+        columnComments,
+        newProperties.asJava)
+    } else {
+      try {
+        // CREATE VIEW [IF NOT EXISTS]
+        catalog.createView(
+          ident,
+          sql,
+          currentCatalog,
+          currentNamespace,
+          viewSchema,
+          queryColumnNames,
+          columnAliases,
+          columnComments,
+          newProperties.asJava)
+      } catch {
+        case _: ViewAlreadyExistsException if allowExisting => // Ignore
+      }
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateV2ViewExec: ${ident}"
+  }
+}
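
Note: the property merge in run() stacks Option-valued pairs onto the user's properties, so
absent values simply contribute nothing. A tiny check of the shape it produces, with the
ViewCatalog.PROP_* constants inlined as plain strings (assumed stand-ins, for illustration):

    val properties = Map("owner" -> "someone")
    val comment = Option("daily rollup")
    val engineVersion = "Spark 3.5.0"
    val merged = properties ++
      comment.map("comment" -> _) ++
      Some(engineVersion).map("create-engine-version" -> _) +
      ("engine-version" -> engineVersion)
    // Map(owner -> someone, comment -> daily rollup,
    //     create-engine-version -> Spark 3.5.0, engine-version -> Spark 3.5.0)
    println(merged)
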
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index a28fda80010d..aded692b2894 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -42,7 +42,9 @@ import org.apache.spark.sql.catalyst.plans.logical.OrderAwareCoalesce
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateV2View
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
+import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.connector.catalog.ViewCatalog
@@ -51,6 +53,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import scala.jdk.CollectionConverters._
 
 case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy with PredicateHelper {
+  val catalogManager: CatalogManager = spark.sessionState.catalogManager
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case c @ Call(procedure, args) =>
@@ -96,6 +99,13 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
     case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
+    case CreateV2View(
+        IcebergViewCatalogAndIdentifier(catalog, ident), sql, comment, viewSchema,
+        queryColumnNames, columnAliases, columnComments, properties, allowExisting, replace) =>
+      CreateV2ViewExec(catalog, ident, sql, catalogManager.currentCatalog.name,
+        catalogManager.currentNamespace, comment, viewSchema, queryColumnNames,
+        columnAliases, columnComments, properties, allowExisting, replace) :: Nil
+
     case _ => Nil
   }
 
@@ -120,4 +130,16 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy wi
       }
     }
   }
+
+  private object IcebergViewCatalogAndIdentifier {
+    def unapply(identifier: Seq[String]): Option[(ViewCatalog, Identifier)] = {
+      val catalogAndIdentifier = Spark3Util.catalogAndIdentifier(spark, identifier.asJava)
+      catalogAndIdentifier.catalog match {
+        case icebergCatalog: SparkCatalog =>
+          Some((icebergCatalog, catalogAndIdentifier.identifier))
+        case _ =>
+          None
+      }
+    }
+  }
 }
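
Note: the custom extractor plans CreateV2View only for names that resolve to an Iceberg
SparkCatalog; anything else returns None and falls through. A sketch of what it computes,
given a SparkSession `spark` and illustrative names:

    import org.apache.iceberg.spark.Spark3Util
    import scala.jdk.CollectionConverters._

    val parts = Seq("ice", "default", "event_agg")
    val resolved = Spark3Util.catalogAndIdentifier(spark, parts.asJava)
    resolved.catalog match {
      case c: org.apache.iceberg.spark.SparkCatalog =>
        println(s"Iceberg catalog ${c.name}, view ${resolved.identifier}")
      case other =>
        println(s"not an Iceberg catalog: ${other.name}")  // extractor yields None
    }
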
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index c986fbcde76c..d68498085a98 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -138,24 +138,9 @@ public void readFromMultipleViews() throws NoSuchTableException {
     insertRows(6);
     String viewName = "firstView";
     String secondView = "secondView";
-    String viewSQL = String.format("SELECT id FROM %s WHERE id <= 3", tableName);
-    String secondViewSQL = String.format("SELECT id FROM %s WHERE id > 3", tableName);
-    ViewCatalog viewCatalog = viewCatalog();
-
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, viewName))
-        .withQuery("spark", viewSQL)
-        .withDefaultNamespace(NAMESPACE)
-        .withSchema(schema(viewSQL))
-        .create();
-
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, secondView))
-        .withQuery("spark", secondViewSQL)
-        .withDefaultNamespace(NAMESPACE)
-        .withSchema(schema(secondViewSQL))
-        .create();
+    sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName);
+    sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id > 3", secondView, tableName);
 
     assertThat(sql("SELECT * FROM %s", viewName))
         .hasSize(3)
@@ -349,39 +334,22 @@ public void readFromViewReferencingAnotherView() throws NoSuchTableException {
   }
 
   @Test
-  public void readFromViewReferencingTempView() throws NoSuchTableException {
+  public void createViewReferencingTempView() throws NoSuchTableException {
     insertRows(10);
     String tempView = "tempViewBeingReferencedInAnotherView";
     String viewReferencingTempView = "viewReferencingTempView";
-    String sql = String.format("SELECT id FROM %s", tempView);
-    ViewCatalog viewCatalog = viewCatalog();
 
     sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
 
-    // it wouldn't be possible to reference a TEMP VIEW if the view had been created via SQL,
-    // but this can't be prevented when using the API directly
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
-        .withQuery("spark", sql)
-        .withDefaultNamespace(NAMESPACE)
-        .withDefaultCatalog(catalogName)
-        .withSchema(schema(sql))
-        .create();
-
-    List<Object[]> expected =
-        IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
-
-    assertThat(sql("SELECT * FROM %s", tempView))
-        .hasSize(5)
-        .containsExactlyInAnyOrderElementsOf(expected);
-
-    // reading from a view that references a TEMP VIEW shouldn't be possible
-    assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
+    // creating a view that references a TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
         .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("The table or view")
-        .hasMessageContaining(tempView)
-        .hasMessageContaining("cannot be found");
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempView);
   }
 
   @Test
@@ -433,41 +401,26 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTa
   }
 
   @Test
-  public void readFromViewReferencingGlobalTempView() throws NoSuchTableException {
+  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
     insertRows(10);
     String globalTempView = "globalTempViewBeingReferenced";
     String viewReferencingTempView = "viewReferencingGlobalTempView";
 
-    ViewCatalog viewCatalog = viewCatalog();
-    Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema();
-
     sql(
         "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
         globalTempView, tableName);
 
-    // it wouldn't be possible to reference a GLOBAL TEMP VIEW if the view had been created via SQL,
-    // but this can't be prevented when using the API directly
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
-        .withQuery("spark", String.format("SELECT id FROM global_temp.%s", globalTempView))
-        .withDefaultNamespace(NAMESPACE)
-        .withDefaultCatalog(catalogName)
-        .withSchema(schema)
-        .create();
-
-    List<Object[]> expected =
-        IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
-
-    assertThat(sql("SELECT * FROM global_temp.%s", globalTempView))
-        .hasSize(5)
-        .containsExactlyInAnyOrderElementsOf(expected);
-
-    // reading from a view that references a GLOBAL TEMP VIEW shouldn't be possible
-    assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
+    // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
+                    viewReferencingTempView, globalTempView))
         .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("The table or view")
-        .hasMessageContaining(globalTempView)
-        .hasMessageContaining("cannot be found");
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(globalTempView);
   }
 
   @Test
@@ -733,6 +686,75 @@ private SessionCatalog v1SessionCatalog() {
     return spark.sessionState().catalogManager().v1SessionCatalog();
   }
 
+  @Test
+  public void createViewIfNotExists() {
+    String viewName = "viewThatAlreadyExists";
+    sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot create view %s.%s because it already exists", NAMESPACE, viewName));
+
+    // using IF NOT EXISTS should work
+    assertThatNoException()
+        .isThrownBy(
+            () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName));
+  }
+
+  @Test
+  public void createViewUsingNonExistingTable() {
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", "viewName", "non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be found");
+  }
+
+  @Test
+  public void createViewColumnMismatch() {
+    String viewName = "viewWithMismatchedColumns";
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName))
+        .hasMessageContaining("not enough data columns")
+        .hasMessageContaining("View columns: `id`, `data`")
+        .hasMessageContaining("Data columns: `id`");
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName))
+        .hasMessageContaining("too many data columns")
+        .hasMessageContaining("View columns: `id`")
+        .hasMessageContaining("Data columns: `id`, `data`");
+  }
+
+  @Test
+  public void createViewWithColumnAliases() throws NoSuchTableException {
+    insertRows(6);
+    String viewName = "viewWithColumnAliases";
+
+    sql(
+        "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+
+    sql("DROP VIEW %s", viewName);
+
+    sql(
+        "CREATE VIEW %s (new_id, new_data) AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
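
Note: taken together, the tests pin down the SQL surface this patch enables: plain CREATE VIEW,
IF NOT EXISTS, OR REPLACE, and column lists with comments. A compact smoke run against an
Iceberg-backed catalog might look like this (catalog/table names are illustrative, assuming a
SparkSession `spark`):

    Seq(
      "CREATE VIEW ice.db.v1 AS SELECT id FROM ice.db.tbl WHERE id <= 3",
      "CREATE VIEW IF NOT EXISTS ice.db.v1 AS SELECT id FROM ice.db.tbl",
      "CREATE OR REPLACE VIEW ice.db.v1 (vid COMMENT 'ID') AS SELECT id FROM ice.db.tbl"
    ).foreach(stmt => spark.sql(stmt))
    spark.sql("SELECT * FROM ice.db.v1").show()
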
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 14c3d9a6b47c..2bed1e06bdf4 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -568,6 +568,33 @@ public View createView(
       String[] columnComments,
       Map<String, String> properties)
       throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    if (null != asViewCatalog) {
+      Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+      try {
+        Map<String, String> props =
+            ImmutableMap.<String, String>builder()
+                .putAll(Spark3Util.rebuildCreateProperties(properties))
+                .put("queryColumnNames", Arrays.toString(queryColumnNames))
+                .build();
+        org.apache.iceberg.view.View view =
+            asViewCatalog
+                .buildView(buildIdentifier(ident))
+                .withDefaultCatalog(currentCatalog)
+                .withDefaultNamespace(Namespace.of(currentNamespace))
+                .withQuery("spark", sql)
+                .withSchema(icebergSchema)
+                .withLocation(properties.get("location"))
+                .withProperties(props)
+                .create();
+        return new SparkView(catalogName, view);
+      } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+        throw new NoSuchNamespaceException(currentNamespace);
+      } catch (AlreadyExistsException e) {
+        throw new ViewAlreadyExistsException(ident);
+      }
+    }
+
     throw new UnsupportedOperationException(
         "Creating a view is not supported by catalog: " + catalogName);
   }
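
Note: the query column names are persisted as a single view property via a string round trip:
Arrays.toString on the write path here, bracket-stripping and splitting in SparkView on the
read path. A quick check of that encoding (illustration only; note it is lossy for column
names that themselves contain ", "):

    // Mirrors java.util.Arrays.toString's "[a, b]" rendering used on the write path.
    val queryColumnNames = Array("id", "data")
    val encoded = queryColumnNames.mkString("[", ", ", "]")           // "[id, data]"
    val decoded = encoded.replace("[", "").replace("]", "").split(", ")
    assert(decoded.sameElements(queryColumnNames))
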
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index 424519623e4d..3a4a92ce390b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,8 +35,9 @@ public class SparkView implements org.apache.spark.sql.connector.catalog.View {
 
+  private static final String QUERY_COLUMN_NAMES = "queryColumnNames";
   private static final Set<String> RESERVED_PROPERTIES =
-      ImmutableSet.of("provider", "location", FORMAT_VERSION);
+      ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES);
 
   private final View icebergView;
   private final String catalogName;
@@ -86,7 +87,9 @@ public StructType schema() {
 
   @Override
   public String[] queryColumnNames() {
-    return new String[0];
+    return properties().containsKey(QUERY_COLUMN_NAMES)
+        ? properties().get(QUERY_COLUMN_NAMES).split(", ")
+        : new String[0];
   }
 
   @Override
@@ -109,6 +112,11 @@ public Map<String, String> properties() {
     propsBuilder.put("provider", "iceberg");
     propsBuilder.put("location", icebergView.location());
 
+    if (icebergView.properties().containsKey(QUERY_COLUMN_NAMES)) {
+      String queryColumnNames =
+          icebergView.properties().get(QUERY_COLUMN_NAMES).replace("[", "").replace("]", "");
+      propsBuilder.put(QUERY_COLUMN_NAMES, queryColumnNames);
+    }
+
     if (icebergView instanceof BaseView) {
       ViewOperations ops = ((BaseView) icebergView).operations();
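
Note: end to end, the round-tripped column names and engine-version properties become visible
through the connector View API. A quick probe, assuming the ViewCatalog interface used by these
extensions exposes loadView as the DSv2 view API proposes (catalog/view names are hypothetical):

    import org.apache.spark.sql.connector.catalog.{Identifier, ViewCatalog}

    val viewCatalog = spark.sessionState.catalogManager
      .catalog("ice").asInstanceOf[ViewCatalog]          // `ice` is illustrative
    val v = viewCatalog.loadView(Identifier.of(Array("default"), "event_agg"))
    println(v.queryColumnNames.mkString(", "))           // e.g. "id, data"
    println(v.properties.get("create-engine-version"))   // e.g. "Spark 3.5.0"
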