From c7410ca94634dc2dfd3c37b050609fef316d15f7 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Thu, 21 Dec 2023 16:00:01 +0100
Subject: [PATCH] Spark: Support creating views via SQL

---
 .../IcebergSparkSessionExtensions.scala       |   2 +
 .../analysis/RewriteViewCommands.scala        |   7 +
 .../sql/catalyst/analysis/ViewCheck.scala     |  39 +++++
 .../logical/views/CreateIcebergView.scala     |  41 +++++
 .../datasources/v2/CreateV2ViewExec.scala     | 147 ++++++++++++++++
 .../v2/ExtendedDataSourceV2Strategy.scala     |  14 ++
 .../iceberg/spark/extensions/TestViews.java   | 158 ++++++++++--------
 .../apache/iceberg/spark/SparkCatalog.java    |  27 +++
 .../iceberg/spark/source/SparkView.java       |  12 +-
 9 files changed, 377 insertions(+), 70 deletions(-)
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
index ad9df3994fc0..beea0c10f209 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.SparkSessionExtensions
 import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion
 import org.apache.spark.sql.catalyst.analysis.ResolveProcedures
 import org.apache.spark.sql.catalyst.analysis.ResolveViews
+import org.apache.spark.sql.catalyst.analysis.ViewCheck
 import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser
 import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy
@@ -37,6 +38,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
     extensions.injectResolutionRule { spark => ResolveProcedures(spark) }
     extensions.injectResolutionRule { spark => ResolveViews(spark) }
     extensions.injectResolutionRule { _ => ProcedureArgumentCoercion }
+    extensions.injectCheckRule(_ => ViewCheck)
 
     // optimizer extensions
     extensions.injectOptimizerRule { _ => ReplaceStaticInvoke }
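For orientation before the analyzer changes below: check rules injected this way run after analysis and may only validate or throw, not rewrite the plan, which is why ViewCheck goes through injectCheckRule rather than injectResolutionRule. A minimal sketch of the hook (ExampleCheck and ExampleExtensions are hypothetical stand-ins, not part of this patch):

```scala
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical stand-in for ViewCheck: a pure validation function over the
// analyzed plan that would throw an AnalysisException on violations.
object ExampleCheck extends (LogicalPlan => Unit) {
  override def apply(plan: LogicalPlan): Unit = ()
}

class ExampleExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    // Unlike resolution rules, check rules cannot transform the plan.
    extensions.injectCheckRule(_ => ExampleCheck)
  }
}
```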
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
index 2b35db33c0c5..651be7409781 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteViewCommands.scala
@@ -20,8 +20,10 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.CreateView
 import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.CatalogManager
@@ -40,6 +42,11 @@ case class RewriteViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
     case DropView(ResolvedView(resolved), ifExists) =>
       DropIcebergView(resolved, ifExists)
+
+    case CreateView(r @ ResolvedView(_), userSpecifiedColumns, comment, properties,
+        originalText, query, allowExisting, replace) =>
+      CreateIcebergView(r, userSpecifiedColumns, comment, properties, originalText,
+        query, allowExisting, replace)
   }
 
   private def isTempView(nameParts: Seq[String]): Boolean = {
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala
new file mode 100644
index 000000000000..a0e3b7a6639e
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ViewCheck.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.execution.command.ViewHelper
+
+object ViewCheck extends (LogicalPlan => Unit) {
+
+  override def apply(plan: LogicalPlan): Unit = {
+    plan foreach {
+      case CreateIcebergView(ResolvedIdentifier(_: ViewCatalog, ident), _, _, _, _, query, _, _) =>
+        val identifier = Spark3Util.toV1TableIdentifier(ident)
+        ViewHelper.verifyTemporaryObjectsNotExists(false, identifier, query, Seq.empty)
+        ViewHelper.verifyAutoGeneratedAliasesNotExists(query, false, identifier)
+      case _ => // OK
+    }
+  }
+}
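Sketched from the test expectations further down, the observable effect of ViewCheck (assuming a session configured with these extensions and an Iceberg catalog that implements ViewCatalog):

```scala
// `spark` is assumed to be a SparkSession with the extensions above installed.
spark.sql("CREATE TEMPORARY VIEW tmp AS SELECT 1 AS id")

// verifyTemporaryObjectsNotExists rejects this during analysis, because a
// persistent view must not capture a temporary one:
try {
  spark.sql("CREATE VIEW persistent_view AS SELECT id FROM tmp")
} catch {
  case e: org.apache.spark.sql.AnalysisException =>
    // "Cannot create the persistent object ... because it references to the
    // temporary object tmp"
    println(e.getMessage)
}
```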
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
new file mode 100644
index 000000000000..036ec8ce2b95
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/views/CreateIcebergView.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.views
+
+import org.apache.spark.sql.catalyst.plans.logical.BinaryCommand
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+case class CreateIcebergView(
+    child: LogicalPlan,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    properties: Map[String, String],
+    originalText: Option[String],
+    query: LogicalPlan,
+    allowExisting: Boolean,
+    replace: Boolean) extends BinaryCommand {
+  override def left: LogicalPlan = child
+
+  override def right: LogicalPlan = query
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): LogicalPlan =
+    copy(child = newLeft, query = newRight)
+}
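Because CreateIcebergView is a BinaryCommand, both the resolved identifier (left) and the defining query (right) are visible to analyzer rules and tree transforms. A hedged sketch of matching the node (definingQuery is an illustrative helper, not part of the patch):

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView

// Pull the defining query out of a CreateIcebergView node, if present.
def definingQuery(plan: LogicalPlan): Option[LogicalPlan] = plan match {
  case CreateIcebergView(_, _, _, _, _, query, _, _) => Some(query)
  case _ => None
}
```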
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
new file mode 100644
index 000000000000..4db76601385a
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CreateV2ViewExec.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.MetadataBuilder
+import org.apache.spark.sql.util.SchemaUtils
+import scala.collection.JavaConverters._
+
+
+case class CreateV2ViewExec(
+    catalog: ViewCatalog,
+    ident: Identifier,
+    originalText: String,
+    query: LogicalPlan,
+    userSpecifiedColumns: Seq[(String, Option[String])],
+    comment: Option[String],
+    properties: Map[String, String],
+    allowExisting: Boolean,
+    replace: Boolean) extends LeafV2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    val qe = session.sessionState.executePlan(query, CommandExecutionMode.SKIP)
+    qe.assertAnalyzed()
+    val analyzedPlan = qe.analyzed
+
+    val identifier = Spark3Util.toV1TableIdentifier(ident)
+
+    if (userSpecifiedColumns.nonEmpty) {
+      if (userSpecifiedColumns.length > analyzedPlan.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewNotEnoughColumnsError(
+          identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+      } else if (userSpecifiedColumns.length < analyzedPlan.output.length) {
+        throw QueryCompilationErrors.cannotCreateViewTooManyColumnsError(
+          identifier, userSpecifiedColumns.map(_._1), analyzedPlan)
+      }
+    }
+
+    val queryColumnNames = analyzedPlan.schema.fieldNames
+    SchemaUtils.checkColumnNameDuplication(queryColumnNames, SQLConf.get.resolver)
+
+    val viewSchema = aliasPlan(analyzedPlan, userSpecifiedColumns).schema
+    val columnAliases = userSpecifiedColumns.map(_._1).toArray
+    val columnComments = userSpecifiedColumns.map(_._2.getOrElse("")).toArray
+
+    val currentCatalog = session.sessionState.catalogManager.currentCatalog.name
+    val currentNamespace = session.sessionState.catalogManager.currentNamespace
+
+    val engineVersion = "Spark " + org.apache.spark.SPARK_VERSION
+    val createEngineVersion = Some(engineVersion)
+    val newProperties = properties ++
+      comment.map(ViewCatalog.PROP_COMMENT -> _) ++
+      createEngineVersion.map(ViewCatalog.PROP_CREATE_ENGINE_VERSION -> _) +
+      (ViewCatalog.PROP_ENGINE_VERSION -> engineVersion)
+
+    if (replace) {
+      // CREATE OR REPLACE VIEW
+      if (catalog.viewExists(ident)) {
+        catalog.dropView(ident)
+      }
+      // FIXME: replaceView API doesn't exist in Spark 3.5
+      catalog.createView(
+        ident,
+        originalText,
+        currentCatalog,
+        currentNamespace,
+        viewSchema,
+        queryColumnNames,
+        columnAliases,
+        columnComments,
+        newProperties.asJava)
+    } else {
+      try {
+        // CREATE VIEW [IF NOT EXISTS]
+        catalog.createView(
+          ident,
+          originalText,
+          currentCatalog,
+          currentNamespace,
+          viewSchema,
+          queryColumnNames,
+          columnAliases,
+          columnComments,
+          newProperties.asJava)
+      } catch {
+        case _: ViewAlreadyExistsException if allowExisting => // Ignore
+      }
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"CreateV2ViewExec: ${ident}"
+  }
+
+  /**
+   * If `userSpecifiedColumns` is defined, alias the analyzed plan to the user specified columns,
+   * else return the analyzed plan directly.
+   */
+  private def aliasPlan(
+      analyzedPlan: LogicalPlan,
+      userSpecifiedColumns: Seq[(String, Option[String])]): LogicalPlan = {
+    if (userSpecifiedColumns.isEmpty) {
+      analyzedPlan
+    } else {
+      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
+      }
+      val projectedPlan = Project(projectList, analyzedPlan)
+      session.sessionState.executePlan(projectedPlan, CommandExecutionMode.SKIP).analyzed
+    }
+  }
+}
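A condensed sketch of what aliasPlan produces for a user-specified column with a comment; the attribute and its type are hypothetical placeholders for a real query output column:

```scala
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder}

// Stand-in for the query output column `id`.
val id = AttributeReference("id", IntegerType)()

// `CREATE VIEW v (new_id COMMENT 'ID') AS SELECT id ...` yields an alias
// whose metadata carries the column comment, as in aliasPlan above.
val meta = new MetadataBuilder().putString("comment", "ID").build()
val newId = Alias(id, "new_id")(explicitMetadata = Some(meta))
```

Note that the replace branch above drops and re-creates the view in two catalog calls, so it is not atomic; the FIXME records that Spark 3.5 offers no replaceView API to do better.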
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
index 44157fc3823c..81bb559785c7 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDataSourceV2Strategy.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.catalyst.plans.logical.RenameTable
 import org.apache.spark.sql.catalyst.plans.logical.ReplacePartitionField
 import org.apache.spark.sql.catalyst.plans.logical.SetIdentifierFields
 import org.apache.spark.sql.catalyst.plans.logical.SetWriteDistributionAndOrdering
+import org.apache.spark.sql.catalyst.plans.logical.views.CreateIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
 import org.apache.spark.sql.catalyst.plans.logical.views.ResolvedV2View
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -107,6 +108,19 @@ case class ExtendedDataSourceV2Strategy(spark: SparkSession) extends Strategy with PredicateHelper {
     case DropIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), ifExists) =>
       DropV2ViewExec(viewCatalog, ident, ifExists) :: Nil
 
+    case CreateIcebergView(ResolvedIdentifier(viewCatalog: ViewCatalog, ident), userSpecifiedColumns, comment,
+        properties, Some(originalText), query, allowExisting, replace) =>
+      CreateV2ViewExec(
+        catalog = viewCatalog,
+        ident = ident,
+        originalText = originalText,
+        query = query,
+        userSpecifiedColumns = userSpecifiedColumns,
+        comment = comment,
+        properties = properties,
+        allowExisting = allowExisting,
+        replace = replace) :: Nil
+
     case _ => Nil
   }
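With the strategy wired up, statements like the following should now plan to CreateV2ViewExec (assuming the extensions are installed and the current catalog implements ViewCatalog; catalog, table, and view names are illustrative):

```scala
spark.sql("CREATE VIEW v AS SELECT id FROM db.tbl WHERE id <= 3")
spark.sql("CREATE VIEW IF NOT EXISTS v AS SELECT id FROM db.tbl")
// OR REPLACE is handled by the `replace` flag threaded through above.
spark.sql("CREATE OR REPLACE VIEW v (new_id COMMENT 'ID') AS SELECT id FROM db.tbl")
```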
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 0eb8c96b3cc5..5733a07d9a29 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -139,24 +139,9 @@ public void readFromMultipleViews() throws NoSuchTableException {
     insertRows(6);
     String viewName = "firstView";
     String secondView = "secondView";
-    String viewSQL = String.format("SELECT id FROM %s WHERE id <= 3", tableName);
-    String secondViewSQL = String.format("SELECT id FROM %s WHERE id > 3", tableName);
-    ViewCatalog viewCatalog = viewCatalog();
-
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, viewName))
-        .withQuery("spark", viewSQL)
-        .withDefaultNamespace(NAMESPACE)
-        .withSchema(schema(viewSQL))
-        .create();
-
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, secondView))
-        .withQuery("spark", secondViewSQL)
-        .withDefaultNamespace(NAMESPACE)
-        .withSchema(schema(secondViewSQL))
-        .create();
+    sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id <= 3", viewName, tableName);
+    sql("CREATE VIEW %s AS SELECT id FROM %s WHERE id > 3", secondView, tableName);
 
     assertThat(sql("SELECT * FROM %s", viewName))
         .hasSize(3)
@@ -350,39 +335,22 @@ public void readFromViewReferencingAnotherView() throws NoSuchTableException {
   }
 
   @Test
-  public void readFromViewReferencingTempView() throws NoSuchTableException {
+  public void createViewReferencingTempView() throws NoSuchTableException {
     insertRows(10);
     String tempView = "tempViewBeingReferencedInAnotherView";
     String viewReferencingTempView = "viewReferencingTempView";
     String sql = String.format("SELECT id FROM %s", tempView);
-    ViewCatalog viewCatalog = viewCatalog();
-
     sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
 
-    // it wouldn't be possible to reference a TEMP VIEW if the view had been created via SQL,
-    // but this can't be prevented when using the API directly
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
-        .withQuery("spark", sql)
-        .withDefaultNamespace(NAMESPACE)
-        .withDefaultCatalog(catalogName)
-        .withSchema(schema(sql))
-        .create();
-
-    List<Object[]> expected =
-        IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
-
-    assertThat(sql("SELECT * FROM %s", tempView))
-        .hasSize(5)
-        .containsExactlyInAnyOrderElementsOf(expected);
-
-    // reading from a view that references a TEMP VIEW shouldn't be possible
-    assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
+    // creating a view that references a TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+        () -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewReferencingTempView, tempView))
         .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("The table or view")
-        .hasMessageContaining(tempView)
-        .hasMessageContaining("cannot be found");
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(tempView);
   }
 
   @Test
@@ -434,41 +402,26 @@ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTableException {
   }
 
   @Test
-  public void readFromViewReferencingGlobalTempView() throws NoSuchTableException {
+  public void createViewReferencingGlobalTempView() throws NoSuchTableException {
     insertRows(10);
     String globalTempView = "globalTempViewBeingReferenced";
     String viewReferencingTempView = "viewReferencingGlobalTempView";
 
-    ViewCatalog viewCatalog = viewCatalog();
-    Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema();
-
     sql(
         "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
         globalTempView, tableName);
 
-    // it wouldn't be possible to reference a GLOBAL TEMP VIEW if the view had been created via SQL,
-    // but this can't be prevented when using the API directly
-    viewCatalog
-        .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
-        .withQuery("spark", String.format("SELECT id FROM global_temp.%s", globalTempView))
-        .withDefaultNamespace(NAMESPACE)
-        .withDefaultCatalog(catalogName)
-        .withSchema(schema)
-        .create();
-
-    List<Object[]> expected =
-        IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
-
-    assertThat(sql("SELECT * FROM global_temp.%s", globalTempView))
-        .hasSize(5)
-        .containsExactlyInAnyOrderElementsOf(expected);
-
-    // reading from a view that references a GLOBAL TEMP VIEW shouldn't be possible
-    assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
+    // creating a view that references a GLOBAL TEMP VIEW shouldn't be possible
+    assertThatThrownBy(
+        () ->
+            sql(
+                "CREATE VIEW %s AS SELECT id FROM global_temp.%s",
+                viewReferencingTempView, globalTempView))
         .isInstanceOf(AnalysisException.class)
-        .hasMessageContaining("The table or view")
-        .hasMessageContaining(globalTempView)
-        .hasMessageContaining("cannot be found");
+        .hasMessageContaining("Cannot create the persistent object")
+        .hasMessageContaining(viewReferencingTempView)
+        .hasMessageContaining("of the type VIEW because it references to the temporary object")
+        .hasMessageContaining(globalTempView);
   }
 
   @Test
@@ -886,6 +839,75 @@ private String viewName(String viewName) {
     return viewName + new Random().nextInt(1000000);
   }
 
+  @Test
+  public void createViewIfNotExists() {
+    String viewName = "viewThatAlreadyExists";
+    sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName);
+
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot create view %s.%s because it already exists", NAMESPACE, viewName));
+
+    // using IF NOT EXISTS should work
+    assertThatNoException()
+        .isThrownBy(
+            () -> sql("CREATE VIEW IF NOT EXISTS %s AS SELECT id FROM %s", viewName, tableName));
+  }
+
+  @Test
+  public void createViewUsingNonExistingTable() {
+    assertThatThrownBy(() -> sql("CREATE VIEW %s AS SELECT id FROM %s", "viewName", "non_existing"))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("The table or view `non_existing` cannot be found");
+  }
+
+  @Test
+  public void createViewColumnMismatch() {
+    String viewName = "viewWithMismatchedColumns";
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id, data) AS SELECT id FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName))
+        .hasMessageContaining("not enough data columns")
+        .hasMessageContaining("View columns: `id`, `data`")
+        .hasMessageContaining("Data columns: `id`");
+
+    assertThatThrownBy(
+            () -> sql("CREATE VIEW %s (id) AS SELECT id, data FROM %s", viewName, tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining(String.format("Cannot create view `%s`.`%s`", NAMESPACE, viewName))
+        .hasMessageContaining("too many data columns")
+        .hasMessageContaining("View columns: `id`")
+        .hasMessageContaining("Data columns: `id`, `data`");
+  }
+
+  @Test
+  public void createViewWithColumnAliases() throws NoSuchTableException {
+    insertRows(6);
+    String viewName = "viewWithColumnAliases";
+
+    sql(
+        "CREATE VIEW %s (new_id COMMENT 'ID', new_data COMMENT 'DATA') AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+
+    sql("DROP VIEW %s", viewName);
+
+    sql(
+        "CREATE VIEW %s (new_id, new_data) AS SELECT id, data FROM %s WHERE id <= 3",
+        viewName, tableName);
+
+    assertThat(sql("SELECT new_id FROM %s", viewName))
+        .hasSize(3)
+        .containsExactlyInAnyOrder(row(1), row(2), row(3));
+  }
+
   private void insertRows(int numRows) throws NoSuchTableException {
     List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
     for (int i = 1; i <= numRows; i++) {
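The SparkCatalog change below also translates Iceberg's unchecked exceptions into Spark's checked ones. A hedged sketch of that mapping, where createIcebergView stands in for the buildView(...).create() chain:

```scala
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.connector.catalog.Identifier

def translateCreate(ident: Identifier)(createIcebergView: => Unit): Unit = {
  try {
    createIcebergView
  } catch {
    case _: org.apache.iceberg.exceptions.AlreadyExistsException =>
      // The checked exception is what lets the IF NOT EXISTS branch in
      // CreateV2ViewExec swallow the failure.
      throw new ViewAlreadyExistsException(ident)
  }
}
```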
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index a62f6bb95aaa..5ab86e265ae1 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -568,6 +568,33 @@ public View createView(
       String[] columnComments,
       Map<String, String> properties)
       throws ViewAlreadyExistsException, NoSuchNamespaceException {
+    if (null != asViewCatalog) {
+      Schema icebergSchema = SparkSchemaUtil.convert(schema);
+
+      try {
+        Map<String, String> props =
+            ImmutableMap.<String, String>builder()
+                .putAll(Spark3Util.rebuildCreateProperties(properties))
+                .put("queryColumnNames", Arrays.toString(queryColumnNames))
+                .build();
+        org.apache.iceberg.view.View view =
+            asViewCatalog
+                .buildView(buildIdentifier(ident))
+                .withDefaultCatalog(currentCatalog)
+                .withDefaultNamespace(Namespace.of(currentNamespace))
+                .withQuery("spark", sql)
+                .withSchema(icebergSchema)
+                .withLocation(properties.get("location"))
+                .withProperties(props)
+                .create();
+        return new SparkView(catalogName, view);
+      } catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
+        throw new NoSuchNamespaceException(currentNamespace);
+      } catch (AlreadyExistsException e) {
+        throw new ViewAlreadyExistsException(ident);
+      }
+    }
+
     throw new UnsupportedOperationException(
         "Creating a view is not supported by catalog: " + catalogName);
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
index 424519623e4d..3a4a92ce390b 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -35,8 +35,9 @@
 
 public class SparkView implements org.apache.spark.sql.connector.catalog.View {
 
+  private static final String QUERY_COLUMN_NAMES = "queryColumnNames";
   private static final Set<String> RESERVED_PROPERTIES =
-      ImmutableSet.of("provider", "location", FORMAT_VERSION);
+      ImmutableSet.of("provider", "location", FORMAT_VERSION, QUERY_COLUMN_NAMES);
 
   private final View icebergView;
   private final String catalogName;
@@ -86,7 +87,9 @@ public StructType schema() {
 
   @Override
   public String[] queryColumnNames() {
-    return new String[0];
+    return properties().containsKey(QUERY_COLUMN_NAMES)
+        ? properties().get(QUERY_COLUMN_NAMES).split(", ")
+        : new String[0];
  }
 
   @Override
@@ -109,6 +112,11 @@ public Map<String, String> properties() {
     propsBuilder.put("provider", "iceberg");
     propsBuilder.put("location", icebergView.location());
 
+    if (icebergView.properties().containsKey(QUERY_COLUMN_NAMES)) {
+      String queryColumnNames =
+          icebergView.properties().get(QUERY_COLUMN_NAMES).replace("[", "").replace("]", "");
+      propsBuilder.put(QUERY_COLUMN_NAMES, queryColumnNames);
+    }
+
     if (icebergView instanceof BaseView) {
       ViewOperations ops = ((BaseView) icebergView).operations();
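One wrinkle worth making explicit: queryColumnNames survives only as a flat string view property. A self-contained sketch of the round-trip the Java on both sides implements (Arrays.toString on write; strip the brackets and split on ", " on read), with the caveat that a column name containing ", " would break this encoding:

```scala
// Write side (SparkCatalog): Arrays.toString(new String[]{"id", "data"})
// produces "[id, data]"; mkString is the Scala equivalent.
val stored = Array("id", "data").mkString("[", ", ", "]")

// Read side (SparkView): remove the brackets, then split on ", ".
val restored = stored.replace("[", "").replace("]", "").split(", ")

assert(restored.sameElements(Array("id", "data")))
```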