From 1a9c3f78f5e2d0b831aadd4636eeb70975463e78 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Thu, 4 Jan 2024 17:38:18 +0100 Subject: [PATCH] Spark: Add support for reading Iceberg views (#9340) --- .../IcebergSparkSessionExtensions.scala | 2 + .../sql/catalyst/analysis/ResolveViews.scala | 146 ++++ .../iceberg/spark/extensions/TestViews.java | 647 ++++++++++++++++++ .../apache/iceberg/spark/SparkCatalog.java | 70 +- .../iceberg/spark/source/SparkView.java | 148 ++++ .../iceberg/spark/SparkCatalogConfig.java | 14 +- .../spark/SparkTestBaseWithCatalog.java | 2 +- .../spark/source/TestSparkCatalog.java | 4 +- 8 files changed, 1029 insertions(+), 4 deletions(-) create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala create mode 100644 spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala index 3cfda994ebb9..ad9df3994fc0 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/iceberg/spark/extensions/IcebergSparkSessionExtensions.scala @@ -22,6 +22,7 @@ package org.apache.iceberg.spark.extensions import org.apache.spark.sql.SparkSessionExtensions import org.apache.spark.sql.catalyst.analysis.ProcedureArgumentCoercion import org.apache.spark.sql.catalyst.analysis.ResolveProcedures +import org.apache.spark.sql.catalyst.analysis.ResolveViews import org.apache.spark.sql.catalyst.optimizer.ReplaceStaticInvoke import org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser import org.apache.spark.sql.execution.datasources.v2.ExtendedDataSourceV2Strategy @@ -34,6 +35,7 @@ class IcebergSparkSessionExtensions extends (SparkSessionExtensions => Unit) { // analyzer extensions extensions.injectResolutionRule { spark => ResolveProcedures(spark) } + extensions.injectResolutionRule { spark => ResolveViews(spark) } extensions.injectResolutionRule { _ => ProcedureArgumentCoercion } // optimizer extensions diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala new file mode 100644 index 000000000000..a978b94f49ac --- /dev/null +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.expressions.UpCast
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.CurrentOrigin
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.CatalogPlugin
+import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.connector.catalog.LookupCatalog
+import org.apache.spark.sql.connector.catalog.View
+import org.apache.spark.sql.connector.catalog.ViewCatalog
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {
+
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+ protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case u@UnresolvedRelation(nameParts, _, _)
+ if catalogManager.v1SessionCatalog.isTempView(nameParts) =>
+ u
+
+ case u@UnresolvedRelation(parts@CatalogAndIdentifier(catalog, ident), _, _) =>
+ loadView(catalog, ident)
+ .map(createViewRelation(parts, _))
+ .getOrElse(u)
+ }
+
+ def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
+ case viewCatalog: ViewCatalog =>
+ try {
+ Option(viewCatalog.loadView(ident))
+ } catch {
+ case _: NoSuchViewException => None
+ }
+ case _ => None
+ }
+
+ private def createViewRelation(nameParts: Seq[String], view: View): LogicalPlan = {
+ val parsed = parseViewText(nameParts.quoted, view.query)
+
+ // Apply any necessary rewrites to preserve correct resolution
+ val viewCatalogAndNamespace: Seq[String] = view.currentCatalog +: view.currentNamespace.toSeq
+ val rewritten = rewriteIdentifiers(parsed, viewCatalogAndNamespace)
+
+ // Apply the field aliases and column comments
+ // This logic differs from how Spark handles views in SessionCatalog.fromCatalogTable.
+ // This is more strict because it doesn't allow resolution by field name.
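+ // Each output column is matched by ordinal via GetColumnByOrdinal and upcast to the type
+ // recorded in the view schema, so a dropped or renamed column fails analysis instead of
+ // silently resolving to a different field.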
+ val aliases = view.schema.fields.zipWithIndex.map { case (expected, pos) => + val attr = GetColumnByOrdinal(pos, expected.dataType) + Alias(UpCast(attr, expected.dataType), expected.name)(explicitMetadata = Some(expected.metadata)) + } + + SubqueryAlias(nameParts, Project(aliases, rewritten)) + } + + private def parseViewText(name: String, viewText: String): LogicalPlan = { + val origin = Origin( + objectType = Some("VIEW"), + objectName = Some(name) + ) + + try { + CurrentOrigin.withOrigin(origin) { + spark.sessionState.sqlParser.parseQuery(viewText) + } + } catch { + case _: ParseException => + throw QueryCompilationErrors.invalidViewText(viewText, name) + } + } + + private def rewriteIdentifiers( + plan: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = { + // Substitute CTEs within the view, then rewrite unresolved functions and relations + qualifyTableIdentifiers( + qualifyFunctionIdentifiers( + CTESubstitution.apply(plan), + catalogAndNamespace), + catalogAndNamespace) + } + + private def qualifyFunctionIdentifiers( + plan: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = plan transformExpressions { + case u@UnresolvedFunction(Seq(name), _, _, _, _) => + if (!isBuiltinFunction(name)) { + u.copy(nameParts = catalogAndNamespace :+ name) + } else { + u + } + case u@UnresolvedFunction(parts, _, _, _, _) if !isCatalog(parts.head) => + u.copy(nameParts = catalogAndNamespace.head +: parts) + } + + /** + * Qualify table identifiers with default catalog and namespace if necessary. + */ + private def qualifyTableIdentifiers( + child: LogicalPlan, + catalogAndNamespace: Seq[String]): LogicalPlan = + child transform { + case u@UnresolvedRelation(Seq(table), _, _) => + u.copy(multipartIdentifier = catalogAndNamespace :+ table) + case u@UnresolvedRelation(parts, _, _) if !isCatalog(parts.head) => + u.copy(multipartIdentifier = catalogAndNamespace.head +: parts) + } + + private def isCatalog(name: String): Boolean = { + spark.sessionState.catalogManager.isCatalogRegistered(name) + } + + private def isBuiltinFunction(name: String): Boolean = { + spark.sessionState.catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name)) + } +} diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java new file mode 100644 index 000000000000..94e86e5ee3cc --- /dev/null +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.iceberg.IcebergBuild;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runners.Parameterized;
+
+public class TestViews extends SparkExtensionsTestBase {
+ private static final Namespace NAMESPACE = Namespace.of("default");
+ private final String tableName = "table";
+
+ @Before
+ public void before() {
+ spark.conf().set("spark.sql.defaultCatalog", catalogName);
+ sql("USE %s", catalogName);
+ sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
+ sql("CREATE TABLE %s (id INT, data STRING)", tableName);
+ }
+
+ @After
+ public void removeTable() {
+ sql("USE %s", catalogName);
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Parameterized.Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.SPARK_WITH_VIEWS.catalogName(),
+ SparkCatalogConfig.SPARK_WITH_VIEWS.implementation(),
+ SparkCatalogConfig.SPARK_WITH_VIEWS.properties()
+ }
+ };
+ }
+
+ public TestViews(String catalog, String implementation, Map<String, String> properties) {
+ super(catalog, implementation, properties);
+ }
+
+ @Test
+ public void readFromView() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = "simpleView";
+ String sql = String.format("SELECT id FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ // use non-existing column name to make sure only the SQL definition for spark is loaded
+ .withQuery("trino", String.format("SELECT non_existing FROM %s", tableName))
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ List<Object[]> expected =
+ IntStream.rangeClosed(1, 10).mapToObj(this::row).collect(Collectors.toList());
+
+ assertThat(sql("SELECT * FROM %s", viewName))
+ .hasSize(10)
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ public void readFromTrinoView() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = "trinoView";
+ String sql = String.format("SELECT id FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("trino", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ List<Object[]> expected =
+ IntStream.rangeClosed(1, 
10).mapToObj(this::row).collect(Collectors.toList()); + + // there's no explicit view defined for spark, so it will fall back to the defined trino view + assertThat(sql("SELECT * FROM %s", viewName)) + .hasSize(10) + .containsExactlyInAnyOrderElementsOf(expected); + } + + @Test + public void readFromMultipleViews() throws NoSuchTableException { + insertRows(6); + String viewName = "firstView"; + String secondView = "secondView"; + String viewSQL = String.format("SELECT id FROM %s WHERE id <= 3", tableName); + String secondViewSQL = String.format("SELECT id FROM %s WHERE id > 3", tableName); + + ViewCatalog viewCatalog = viewCatalog(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", viewSQL) + .withDefaultNamespace(NAMESPACE) + .withSchema(schema(viewSQL)) + .create(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, secondView)) + .withQuery("spark", secondViewSQL) + .withDefaultNamespace(NAMESPACE) + .withSchema(schema(secondViewSQL)) + .create(); + + assertThat(sql("SELECT * FROM %s", viewName)) + .hasSize(3) + .containsExactlyInAnyOrder(row(1), row(2), row(3)); + + assertThat(sql("SELECT * FROM %s", secondView)) + .hasSize(3) + .containsExactlyInAnyOrder(row(4), row(5), row(6)); + } + + @Test + public void readFromViewUsingNonExistingTable() throws NoSuchTableException { + insertRows(10); + String viewName = "viewWithNonExistingTable"; + + ViewCatalog viewCatalog = viewCatalog(); + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get())); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", "SELECT id FROM non_existing") + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + String.format( + "The table or view `%s`.`%s`.`non_existing` cannot be found", + catalogName, NAMESPACE)); + } + + @Test + public void readFromViewUsingNonExistingTableColumn() throws NoSuchTableException { + insertRows(10); + String viewName = "viewWithNonExistingColumn"; + + ViewCatalog viewCatalog = viewCatalog(); + Schema schema = new Schema(Types.NestedField.required(1, "non_existing", Types.LongType.get())); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", String.format("SELECT non_existing FROM %s", tableName)) + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + "A column or function parameter with name `non_existing` cannot be resolved"); + } + + @Test + public void readFromViewUsingInvalidSQL() throws NoSuchTableException { + insertRows(10); + String viewName = "viewWithInvalidSQL"; + + ViewCatalog viewCatalog = viewCatalog(); + Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema(); + + viewCatalog + .buildView(TableIdentifier.of(NAMESPACE, viewName)) + .withQuery("spark", "invalid SQL") + .withDefaultNamespace(NAMESPACE) + .withDefaultCatalog(catalogName) + .withSchema(schema) + .create(); + + assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining( + String.format("The view `%s` cannot be displayed due to invalid view text", viewName)); + } + + @Test + public void 
readFromViewWithStaleSchema() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = "staleView";
+ String sql = String.format("SELECT id, data FROM %s", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ // drop a column the view depends on
+ // note that this tests `data` because it has an invalid ordinal
+ sql("ALTER TABLE %s DROP COLUMN data", tableName);
+
+ // reading from the view should now fail
+ assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("A column or function parameter with name `data` cannot be resolved");
+ }
+
+ @Test
+ public void readFromViewHiddenByTempView() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = "viewHiddenByTempView";
+
+ ViewCatalog viewCatalog = viewCatalog();
+ Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema();
+
+ sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewName, tableName);
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", String.format("SELECT id FROM %s WHERE id > 5", tableName))
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema)
+ .create();
+
+ List<Object[]> expected =
+ IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
+
+ // returns the results from the TEMP VIEW
+ assertThat(sql("SELECT * FROM %s", viewName))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ public void readFromViewWithGlobalTempView() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = "viewWithGlobalTempView";
+ String sql = String.format("SELECT id FROM %s WHERE id > 5", tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ sql("CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", viewName, tableName);
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ // GLOBAL TEMP VIEWS are stored in a global_temp namespace
+ assertThat(sql("SELECT * FROM global_temp.%s", viewName))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(
+ IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList()));
+
+ assertThat(sql("SELECT * FROM %s", viewName))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(
+ IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList()));
+ }
+
+ @Test
+ public void readFromViewReferencingAnotherView() throws NoSuchTableException {
+ insertRows(10);
+ String firstView = "viewBeingReferencedInAnotherView";
+ String viewReferencingOtherView = "viewReferencingOtherView";
+ String firstSQL = String.format("SELECT id FROM %s WHERE id <= 5", tableName);
+ String secondSQL = String.format("SELECT id FROM %s WHERE id > 4", firstView);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, firstView))
+ .withQuery("spark", firstSQL)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(firstSQL))
+ .create();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewReferencingOtherView))
+ .withQuery("spark", secondSQL)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(secondSQL))
+ .create();
+
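+ // only id 5 satisfies both view predicates (id <= 5 from the first view, id > 4 from the second)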
+ assertThat(sql("SELECT * FROM %s", viewReferencingOtherView))
+ .hasSize(1)
+ .containsExactly(row(5));
+ }
+
+ @Test
+ public void readFromViewReferencingTempView() throws NoSuchTableException {
+ insertRows(10);
+ String tempView = "tempViewBeingReferencedInAnotherView";
+ String viewReferencingTempView = "viewReferencingTempView";
+ String sql = String.format("SELECT id FROM %s", tempView);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", tempView, tableName);
+
+ // it wouldn't be possible to reference a TEMP VIEW if the view had been created via SQL,
+ // but this can't be prevented when using the API directly
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ List<Object[]> expected =
+ IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
+
+ assertThat(sql("SELECT * FROM %s", tempView))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(expected);
+
+ // reading from a view that references a TEMP VIEW shouldn't be possible
+ assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("The table or view")
+ .hasMessageContaining(tempView)
+ .hasMessageContaining("cannot be found");
+ }
+
+ @Test
+ public void readFromViewReferencingAnotherViewHiddenByTempView() throws NoSuchTableException {
+ insertRows(10);
+ String innerViewName = "inner_view";
+ String outerViewName = "outer_view";
+ String innerViewSQL = String.format("SELECT * FROM %s WHERE id > 5", tableName);
+ String outerViewSQL = String.format("SELECT id FROM %s", innerViewName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, innerViewName))
+ .withQuery("spark", innerViewSQL)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(innerViewSQL))
+ .create();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, outerViewName))
+ .withQuery("spark", outerViewSQL)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(outerViewSQL))
+ .create();
+
+ // create a temporary view that conflicts with the inner view to verify the inner name is
+ // resolved using the catalog and namespace defaults from the outer view
+ sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5", innerViewName, tableName);
+
+ // ensure that the inner view resolution uses the view namespace and catalog
+ sql("USE spark_catalog");
+
+ List<Object[]> tempViewRows =
+ IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
+
+ assertThat(sql("SELECT * FROM %s", innerViewName))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(tempViewRows);
+
+ List<Object[]> expectedViewRows =
+ IntStream.rangeClosed(6, 10).mapToObj(this::row).collect(Collectors.toList());
+
+ assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, outerViewName))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(expectedViewRows);
+ }
+
+ @Test
+ public void readFromViewReferencingGlobalTempView() throws NoSuchTableException {
+ insertRows(10);
+ String globalTempView = "globalTempViewBeingReferenced";
+ String viewReferencingTempView = "viewReferencingGlobalTempView";
+
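+ // like the TEMP VIEW case above, a reference to global_temp must not resolve from a catalog view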
+ ViewCatalog viewCatalog = viewCatalog();
+ Schema schema = tableCatalog().loadTable(TableIdentifier.of(NAMESPACE, tableName)).schema();
+
+ sql(
+ "CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s WHERE id <= 5",
+ globalTempView, tableName);
+
+ // it wouldn't be possible to reference a GLOBAL TEMP VIEW if the view had been created via SQL,
+ // but this can't be prevented when using the API directly
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewReferencingTempView))
+ .withQuery("spark", String.format("SELECT id FROM global_temp.%s", globalTempView))
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema)
+ .create();
+
+ List<Object[]> expected =
+ IntStream.rangeClosed(1, 5).mapToObj(this::row).collect(Collectors.toList());
+
+ assertThat(sql("SELECT * FROM global_temp.%s", globalTempView))
+ .hasSize(5)
+ .containsExactlyInAnyOrderElementsOf(expected);
+
+ // reading from a view that references a GLOBAL TEMP VIEW shouldn't be possible
+ assertThatThrownBy(() -> sql("SELECT * FROM %s", viewReferencingTempView))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("The table or view")
+ .hasMessageContaining(globalTempView)
+ .hasMessageContaining("cannot be found");
+ }
+
+ @Test
+ public void readFromViewWithCTE() throws NoSuchTableException {
+ insertRows(10);
+ String viewName = "viewWithCTE";
+ String sql =
+ String.format(
+ "WITH max_by_data AS (SELECT max(id) as max FROM %s) "
+ + "SELECT max, count(1) AS count FROM max_by_data GROUP BY max",
+ tableName);
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(NAMESPACE)
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row(10, 1L));
+ }
+
+ @Test
+ public void rewriteFunctionIdentifier() {
+ String viewName = "rewriteFunctionIdentifier";
+ String sql = "SELECT iceberg_version() AS version";
+
+ assertThatThrownBy(() -> sql(sql))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot resolve function")
+ .hasMessageContaining("iceberg_version");
+
+ ViewCatalog viewCatalog = viewCatalog();
+ Schema schema = new Schema(Types.NestedField.required(1, "version", Types.StringType.get()));
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(Namespace.of("system"))
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema)
+ .create();
+
+ assertThat(sql("SELECT * FROM %s", viewName))
+ .hasSize(1)
+ .containsExactly(row(IcebergBuild.version()));
+ }
+
+ @Test
+ public void builtinFunctionIdentifierNotRewritten() {
+ String viewName = "builtinFunctionIdentifierNotRewritten";
+ String sql = "SELECT trim(' abc ') AS result";
+
+ ViewCatalog viewCatalog = viewCatalog();
+ Schema schema = new Schema(Types.NestedField.required(1, "result", Types.StringType.get()));
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(Namespace.of("system"))
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema)
+ .create();
+
+ assertThat(sql("SELECT * FROM %s", viewName)).hasSize(1).containsExactly(row("abc"));
+ }
+
+ @Test
+ public void rewriteFunctionIdentifierWithNamespace() {
+ String viewName = "rewriteFunctionIdentifierWithNamespace";
+ String sql = "SELECT system.bucket(100, 'a') AS bucket_result, 'a' AS value";
+
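+ // `system.bucket` carries a namespace but no catalog, so ResolveViews prepends the
+ // view's default catalog instead of resolving against the session catalog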
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(Namespace.of("system"))
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ sql("USE spark_catalog");
+
+ assertThatThrownBy(() -> sql(sql))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Cannot resolve function")
+ .hasMessageContaining("`system`.`bucket`");
+
+ assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
+ .hasSize(1)
+ .containsExactly(row(50, "a"));
+ }
+
+ @Test
+ public void fullFunctionIdentifier() {
+ String viewName = "fullFunctionIdentifier";
+ String sql =
+ String.format(
+ "SELECT %s.system.bucket(100, 'a') AS bucket_result, 'a' AS value", catalogName);
+
+ sql("USE spark_catalog");
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(Namespace.of("system"))
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema(sql))
+ .create();
+
+ assertThat(sql("SELECT * FROM %s.%s.%s", catalogName, NAMESPACE, viewName))
+ .hasSize(1)
+ .containsExactly(row(50, "a"));
+ }
+
+ @Test
+ public void fullFunctionIdentifierNotRewrittenLoadFailure() {
+ String viewName = "fullFunctionIdentifierNotRewrittenLoadFailure";
+ String sql = "SELECT spark_catalog.system.bucket(100, 'a') AS bucket_result, 'a' AS value";
+
+ // avoid namespace failures
+ sql("USE spark_catalog");
+ sql("CREATE NAMESPACE IF NOT EXISTS system");
+ sql("USE %s", catalogName);
+
+ Schema schema =
+ new Schema(
+ Types.NestedField.required(1, "bucket_result", Types.IntegerType.get()),
+ Types.NestedField.required(2, "value", Types.StringType.get()));
+
+ ViewCatalog viewCatalog = viewCatalog();
+
+ viewCatalog
+ .buildView(TableIdentifier.of(NAMESPACE, viewName))
+ .withQuery("spark", sql)
+ .withDefaultNamespace(Namespace.of("system"))
+ .withDefaultCatalog(catalogName)
+ .withSchema(schema)
+ .create();
+
+ // verify the v1 error message
+ assertThatThrownBy(() -> sql("SELECT * FROM %s", viewName))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("The function `system`.`bucket` cannot be found");
+ }
+
+ private Schema schema(String sql) {
+ return SparkSchemaUtil.convert(spark.sql(sql).schema());
+ }
+
+ private ViewCatalog viewCatalog() {
+ Catalog icebergCatalog = Spark3Util.loadIcebergCatalog(spark, catalogName);
+ assertThat(icebergCatalog).isInstanceOf(ViewCatalog.class);
+ return (ViewCatalog) icebergCatalog;
+ }
+
+ private Catalog tableCatalog() {
+ return Spark3Util.loadIcebergCatalog(spark, catalogName);
+ }
+
+ private void insertRows(int numRows) throws NoSuchTableException {
+ List<SimpleRecord> records = Lists.newArrayListWithCapacity(numRows);
+ for (int i = 1; i <= numRows; i++) {
+ records.add(new SimpleRecord(i, UUID.randomUUID().toString()));
+ }
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ df.writeTo(tableName).append();
+ }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index eef0f0703bc3..01804e010834 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -46,6 +46,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import 
org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.catalog.ViewCatalog; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HadoopCatalog; @@ -61,6 +62,7 @@ import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.spark.source.SparkChangelogTable; import org.apache.iceberg.spark.source.SparkTable; +import org.apache.iceberg.spark.source.SparkView; import org.apache.iceberg.spark.source.StagedSparkTable; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PropertyUtil; @@ -69,7 +71,9 @@ import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; import org.apache.spark.sql.connector.catalog.Identifier; import org.apache.spark.sql.connector.catalog.NamespaceChange; import org.apache.spark.sql.connector.catalog.StagedTable; @@ -79,6 +83,8 @@ import org.apache.spark.sql.connector.catalog.TableChange.ColumnChange; import org.apache.spark.sql.connector.catalog.TableChange.RemoveProperty; import org.apache.spark.sql.connector.catalog.TableChange.SetProperty; +import org.apache.spark.sql.connector.catalog.View; +import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -112,7 +118,8 @@ * *

*/
-public class SparkCatalog extends BaseCatalog {
+public class SparkCatalog extends BaseCatalog
+ implements org.apache.spark.sql.connector.catalog.ViewCatalog {
 private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER);
 private static final Splitter COMMA = Splitter.on(",");
 private static final Pattern AT_TIMESTAMP = Pattern.compile("at_timestamp_(\\d+)");
@@ -124,6 +131,7 @@ public class SparkCatalog {
 private Catalog icebergCatalog = null;
 private boolean cacheEnabled = CatalogProperties.CACHE_ENABLED_DEFAULT;
 private SupportsNamespaces asNamespaceCatalog = null;
+ private ViewCatalog asViewCatalog = null;
 private String[] defaultNamespace = null;
 private HadoopTables tables;
@@ -528,6 +536,62 @@ public boolean dropNamespace(String[] namespace, boolean cascade)
 return false;
 }
+
+ @Override
+ public Identifier[] listViews(String... namespace) {
+ throw new UnsupportedOperationException(
+ "Listing views is not supported by catalog: " + catalogName);
+ }
+
+ @Override
+ public View loadView(Identifier ident) throws NoSuchViewException {
+ if (null != asViewCatalog) {
+ try {
+ org.apache.iceberg.view.View view = asViewCatalog.loadView(buildIdentifier(ident));
+ return new SparkView(catalogName, view);
+ } catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
+ throw new NoSuchViewException(ident);
+ }
+ }
+
+ throw new NoSuchViewException(ident);
+ }
+
+ @Override
+ public View createView(
+ Identifier ident,
+ String sql,
+ String currentCatalog,
+ String[] currentNamespace,
+ StructType schema,
+ String[] queryColumnNames,
+ String[] columnAliases,
+ String[] columnComments,
+ Map<String, String> properties)
+ throws ViewAlreadyExistsException, NoSuchNamespaceException {
+ throw new UnsupportedOperationException(
+ "Creating a view is not supported by catalog: " + catalogName);
+ }
+
+ @Override
+ public View alterView(Identifier ident, ViewChange... changes)
+ throws NoSuchViewException, IllegalArgumentException {
+ throw new UnsupportedOperationException(
+ "Altering a view is not supported by catalog: " + catalogName);
+ }
+
+ @Override
+ public boolean dropView(Identifier ident) {
+ throw new UnsupportedOperationException(
+ "Dropping a view is not supported by catalog: " + catalogName);
+ }
+
+ @Override
+ public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
+ throws NoSuchViewException, ViewAlreadyExistsException {
+ throw new UnsupportedOperationException(
+ "Renaming a view is not supported by catalog: " + catalogName);
+ }
+
 @Override
 public final void initialize(String name, CaseInsensitiveStringMap options) {
 this.cacheEnabled =
@@ -570,6 +634,10 @@ public final void initialize(String name, CaseInsensitiveStringMap options) {
 }
 }
+
+ if (catalog instanceof ViewCatalog) {
+ this.asViewCatalog = (ViewCatalog) catalog;
+ }
+
 EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "spark");
 EnvironmentContext.put(
 EnvironmentContext.ENGINE_VERSION, sparkSession.sparkContext().version());
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
new file mode 100644
index 000000000000..424519623e4d
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkView.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.view.BaseView;
+import org.apache.iceberg.view.SQLViewRepresentation;
+import org.apache.iceberg.view.View;
+import org.apache.iceberg.view.ViewOperations;
+import org.apache.spark.sql.types.StructType;
+
+public class SparkView implements org.apache.spark.sql.connector.catalog.View {
+
+ private static final Set<String> RESERVED_PROPERTIES =
+ ImmutableSet.of("provider", "location", FORMAT_VERSION);
+
+ private final View icebergView;
+ private final String catalogName;
+ private StructType lazySchema = null;
+
+ public SparkView(String catalogName, View icebergView) {
+ this.catalogName = catalogName;
+ this.icebergView = icebergView;
+ }
+
+ public View view() {
+ return icebergView;
+ }
+
+ @Override
+ public String name() {
+ return icebergView.name();
+ }
+
+ @Override
+ public String query() {
+ SQLViewRepresentation sqlRepr = icebergView.sqlFor("spark");
+ Preconditions.checkState(sqlRepr != null, "Cannot load SQL for view %s", name());
+ return sqlRepr.sql();
+ }
+
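+ // prefer the default catalog recorded in the current view version and fall back to the
+ // catalog that loaded this view when none was set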
+ @Override
+ public String currentCatalog() {
+ return icebergView.currentVersion().defaultCatalog() != null
+ ? icebergView.currentVersion().defaultCatalog()
+ : catalogName;
+ }
+
+ @Override
+ public String[] currentNamespace() {
+ return icebergView.currentVersion().defaultNamespace().levels();
+ }
+
+ @Override
+ public StructType schema() {
+ if (null == lazySchema) {
+ this.lazySchema = SparkSchemaUtil.convert(icebergView.schema());
+ }
+
+ return lazySchema;
+ }
+
+ @Override
+ public String[] queryColumnNames() {
+ return new String[0];
+ }
+
+ @Override
+ public String[] columnAliases() {
+ return icebergView.schema().columns().stream()
+ .map(Types.NestedField::name)
+ .toArray(String[]::new);
+ }
+
+ @Override
+ public String[] columnComments() {
+ return icebergView.schema().columns().stream()
+ .map(Types.NestedField::doc)
+ .toArray(String[]::new);
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ ImmutableMap.Builder<String, String> propsBuilder = ImmutableMap.builder();
+
+ propsBuilder.put("provider", "iceberg");
+ propsBuilder.put("location", icebergView.location());
+
+ if (icebergView instanceof BaseView) {
+ ViewOperations ops = ((BaseView) icebergView).operations();
+ propsBuilder.put(FORMAT_VERSION, String.valueOf(ops.current().formatVersion()));
+ }
+
+ icebergView.properties().entrySet().stream()
+ .filter(entry -> !RESERVED_PROPERTIES.contains(entry.getKey()))
+ .forEach(propsBuilder::put);
+
+ return propsBuilder.build();
+ }
+
+ @Override
+ public String toString() {
+ return icebergView.toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ } else if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ // use only name in order to correctly invalidate Spark cache
+ SparkView that = (SparkView) other;
+ return icebergView.name().equals(that.icebergView.name());
+ }
+
+ @Override
+ public int hashCode() {
+ // use only name in order to correctly invalidate Spark cache
+ return icebergView.name().hashCode();
+ }
+}
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
index fc18ed3bb174..abfd7da0c7bd 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkCatalogConfig.java
@@ -19,6 +19,8 @@
 package org.apache.iceberg.spark;
 import java.util.Map;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 public enum SparkCatalogConfig {
@@ -41,7 +43,17 @@ public enum SparkCatalogConfig {
 "parquet-enabled", "true",
 "cache-enabled", "false" // Spark will delete tables using v1, leaving the cache out of sync
- ));
+ )),
+ SPARK_WITH_VIEWS(
+ "spark_with_views",
+ SparkCatalog.class.getName(),
+ ImmutableMap.of(
+ CatalogProperties.CATALOG_IMPL,
+ InMemoryCatalog.class.getName(),
+ "default-namespace",
+ "default",
+ "cache-enabled",
+ "false"));
 private final String catalogName;
 private final String implementation;
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
index 658159894543..d5708c9e575e 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/SparkTestBaseWithCatalog.java
@@ -87,7 +87,7 @@ public SparkTestBaseWithCatalog(
 config.forEach(
 (key, value) -> spark.conf().set("spark.sql.catalog." + catalogName + "." + key, value));
- if (config.get("type").equalsIgnoreCase("hadoop")) {
+ if ("hadoop".equalsIgnoreCase(config.get("type"))) {
 spark.conf().set("spark.sql.catalog." + catalogName + ".warehouse", "file:" + warehouse);
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 0c6cad7f369c..02a74e5803be 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -31,8 +31,10 @@
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
-public class TestSparkCatalog<T extends TableCatalog & FunctionCatalog & SupportsNamespaces>
+public class TestSparkCatalog<
+ T extends TableCatalog & FunctionCatalog & SupportsNamespaces & ViewCatalog>
 extends SparkSessionCatalog<T> {
 private static final Map<Identifier, Table> tableMap = Maps.newHashMap();
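---
Usage sketch (illustrative, not part of the patch): reading an Iceberg view end to end. The
catalog name `spark_with_views` and the InMemoryCatalog backing mirror the SPARK_WITH_VIEWS
test configuration above; any Iceberg catalog that implements ViewCatalog behaves the same way.

    // configure a SparkCatalog whose underlying Iceberg catalog supports views
    SparkSession spark =
        SparkSession.builder()
            .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName())
            .config("spark.sql.catalog.spark_with_views", SparkCatalog.class.getName())
            .config("spark.sql.catalog.spark_with_views.catalog-impl", InMemoryCatalog.class.getName())
            .config("spark.sql.catalog.spark_with_views.default-namespace", "default")
            .getOrCreate();

    // a view created through the ViewCatalog API (as in TestViews#readFromView) is now
    // readable with plain SQL; ResolveViews loads it and resolves its stored query text
    spark.sql("SELECT * FROM spark_with_views.default.simpleView").show();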