From b8c18200b9a3e51d754bcc2248e14397dd82a021 Mon Sep 17 00:00:00 2001
From: Eduard Tudenhoefner
Date: Sun, 7 Jan 2024 11:07:54 +0100
Subject: [PATCH] Simplify DROP VIEW handling in the Spark 3.5 extensions

---
 .../analysis/HijackViewCommands.scala         | 37 +++++++++++++
 .../IcebergSparkSqlExtensionsParser.scala     | 29 +---------
 .../datasources/v2/DropV2ViewExec.scala       |  5 +-
 .../iceberg/spark/extensions/TestViews.java   | 55 +++++++++++++++++++
 4 files changed, 96 insertions(+), 30 deletions(-)
 create mode 100644 spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala

diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala
new file mode 100644
index 000000000000..5f7cecbf6537
--- /dev/null
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.DropView
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
+import org.apache.spark.sql.catalyst.rules.Rule
+
+/**
+ * ResolveSessionCatalog exits early for some v2 View commands,
+ * thus they are pre-substituted here and then handled in ResolveViews
+ */
+object HijackViewCommands extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+    case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) =>
+      DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
+  }
+}
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
index c37d7c0ace25..3b4f4e5298bb 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala
@@ -35,17 +35,13 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
+import org.apache.spark.sql.catalyst.analysis.HijackViewCommands
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.NonReservedContext
 import org.apache.spark.sql.catalyst.parser.extensions.IcebergSqlExtensionsParser.QuotedIdentifierContext
-import org.apache.spark.sql.catalyst.plans.logical.DropView
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
-import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.trees.Origin
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.catalog.TableCatalog
@@ -62,7 +58,6 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
   private lazy val substitutor = substitutorCtor.newInstance(SQLConf.get)
   private lazy val astBuilder = new IcebergSqlExtensionsAstBuilder(delegate)
-  private lazy val maxIterations = SQLConf.get.analyzerMaxIterations
 
   /**
    * Parse a string to a DataType.
   */
@@ -128,27 +123,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
     if (isIcebergCommand(sqlTextAfterSubstitution)) {
       parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
     } else {
-      ViewSubstitutionExecutor.execute(delegate.parsePlan(sqlText))
-    }
-  }
-
-  private object ViewSubstitutionExecutor extends RuleExecutor[LogicalPlan] {
-    private val fixedPoint = FixedPoint(
-      maxIterations,
-      errorOnExceed = true,
-      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
-
-    override protected def batches: Seq[Batch] = Seq(Batch("pre-substitution", fixedPoint, V2ViewSubstitution))
-  }
-
-  /**
-   * ResolveSessionCatalog exits early for some v2 View commands,
-   * thus they are pre-substituted here and then handled in ResolveViews
-   */
-  private object V2ViewSubstitution extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
-      case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) =>
-        DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
+      HijackViewCommands.apply(delegate.parsePlan(sqlText))
     }
   }
 
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
index 4f4c665f5f02..c35af1486fc7 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropV2ViewExec.scala
@@ -34,9 +34,8 @@ case class DropV2ViewExec(
   override lazy val output: Seq[Attribute] = Nil
 
   override protected def run(): Seq[InternalRow] = {
-    if (catalog.viewExists(ident)) {
-      catalog.dropView(ident)
-    } else if (!ifExists) {
+    val dropped = catalog.dropView(ident)
+    if (!dropped && !ifExists) {
       throw new NoSuchViewException(ident)
     }
 
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
index 445cebb7bfe2..c67c9bc49c5b 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java
@@ -43,6 +43,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -664,18 +665,72 @@ public void dropNonExistingView() {
     assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS non_existing"));
   }
 
+  @Test
+  public void dropViewIfExists() {
+    String viewName = "viewToBeDropped";
+    String sql = String.format("SELECT id FROM %s", tableName);
+
+    ViewCatalog viewCatalog = viewCatalog();
+
+    TableIdentifier identifier = TableIdentifier.of(NAMESPACE, viewName);
+    viewCatalog
+        .buildView(identifier)
+        .withQuery("spark", sql)
+        .withDefaultNamespace(NAMESPACE)
+        .withDefaultCatalog(catalogName)
+        .withSchema(schema(sql))
+        .create();
+
+    assertThat(viewCatalog.viewExists(identifier)).isTrue();
+
+    sql("DROP VIEW IF EXISTS %s", viewName);
+    assertThat(viewCatalog.viewExists(identifier)).isFalse();
+  }
+
+  /** Verifies that dropping a global temp view through the session catalog still works as expected. */
   @Test
   public void dropGlobalTempView() {
     String globalTempView = "globalViewToBeDropped";
     sql("CREATE GLOBAL TEMPORARY VIEW %s AS SELECT id FROM %s", globalTempView, tableName);
+    assertThat(v1SessionCatalog().getGlobalTempView(globalTempView).isDefined()).isTrue();
+
     sql("DROP VIEW global_temp.%s", globalTempView);
+    assertThat(v1SessionCatalog().getGlobalTempView(globalTempView).isDefined()).isFalse();
   }
 
+  /** Verifies that dropping a local temp view through the session catalog still works as expected. */
   @Test
   public void dropTempView() {
     String tempView = "tempViewToBeDropped";
     sql("CREATE TEMPORARY VIEW %s AS SELECT id FROM %s", tempView, tableName);
+    assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isTrue();
+
     sql("DROP VIEW %s", tempView);
+    assertThat(v1SessionCatalog().getTempView(tempView).isDefined()).isFalse();
   }
+
+  /** Verifies that dropping a v1 view in the session catalog still works as expected. */
+  @Test
+  public void dropV1View() {
+    String v1View = "v1ViewToBeDropped";
+    sql("USE spark_catalog");
+    sql("CREATE NAMESPACE IF NOT EXISTS %s", NAMESPACE);
+    sql("CREATE TABLE %s (id INT, data STRING)", tableName);
+    sql("CREATE VIEW %s AS SELECT id FROM %s", v1View, tableName);
+    assertThat(
+            v1SessionCatalog()
+                .tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
+        .isTrue();
+
+    sql("DROP VIEW %s", v1View);
+    assertThat(
+            v1SessionCatalog()
+                .tableExists(new org.apache.spark.sql.catalyst.TableIdentifier(v1View)))
+        .isFalse();
+  }
+
+  private SessionCatalog v1SessionCatalog() {
+    return spark.sessionState().catalogManager().v1SessionCatalog();
+  }
 
   private void insertRows(int numRows) throws NoSuchTableException {
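Note on the rule extraction: after this patch the parser hands any non-Iceberg statement
to Spark's own parser and then applies HijackViewCommands exactly once, instead of driving
a single-rule RuleExecutor to a fixed point. A fixed point is unnecessary because the
rewrite produces a node (DropIcebergView) that the rule no longer matches. A minimal
sketch of what the substitution does (illustrative only: it assumes Spark 3.5 with the
Iceberg spark-extensions jar on the classpath, and the object name, master URL, and
"db.v" identifier below are made up for the example):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.analysis.HijackViewCommands

    object HijackViewCommandsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").getOrCreate()

        // Spark parses this into DropView(UnresolvedIdentifier(Seq("db", "v"), ...), ifExists = true)
        val parsed = spark.sessionState.sqlParser.parsePlan("DROP VIEW IF EXISTS db.v")

        // A single bottom-up pass rewrites DropView into DropIcebergView, so the
        // command is later handled by ResolveViews instead of being short-circuited
        // by ResolveSessionCatalog.
        val substituted = HijackViewCommands.apply(parsed)
        println(substituted)
      }
    }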
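Note on the DropV2ViewExec change: the rewritten run() relies on the boolean returned by
catalog.dropView(ident) instead of a separate viewExists check. That saves one catalog
round trip and removes the check-then-act window in which a concurrent client could drop
the view between the two calls, while DROP VIEW without IF EXISTS still surfaces
NoSuchViewException when nothing was actually dropped.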