From 0b932673b06194787a5ca79b66d3534f4d4a7442 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Mon, 8 Jan 2024 11:02:39 +0100 Subject: [PATCH] Spark 3.5: Move DROP VIEW catalog/temp-view guard from ResolveViews into HijackViewCommands --- .../analysis/HijackViewCommands.scala | 20 +++++++++++++++++-- .../sql/catalyst/analysis/ResolveViews.scala | 17 ++-------------- .../IcebergSparkSqlExtensionsParser.scala | 2 +- .../iceberg/spark/extensions/TestViews.java | 4 ++-- 4 files changed, 23 insertions(+), 20 deletions(-) diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala index 5f7cecbf6537..a878f912c65b 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/HijackViewCommands.scala @@ -19,19 +19,35 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.DropView import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogManager +import org.apache.spark.sql.connector.catalog.CatalogPlugin +import org.apache.spark.sql.connector.catalog.LookupCatalog +import org.apache.spark.sql.connector.catalog.ViewCatalog /** * ResolveSessionCatalog exits early for some v2 View commands, * thus they are pre-substituted here and then handled in ResolveViews */ -object HijackViewCommands extends Rule[LogicalPlan] { +case class HijackViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog { + + protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUp { - case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) => + case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) + if isViewCatalog(catalogManager.currentCatalog) && !isTempView(nameParts) => DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists) } + + private def isTempView(nameParts: Seq[String]): Boolean = { + catalogManager.v1SessionCatalog.isTempView(nameParts) + } + + private def isViewCatalog(catalog: CatalogPlugin): Boolean = { + catalog.isInstanceOf[ViewCatalog] + } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala index 28f4e7dafdf5..28f051e2b02d 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveViews.scala @@ -24,11 +24,9 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.expressions.UpCast import org.apache.spark.sql.catalyst.parser.ParseException -import org.apache.spark.sql.catalyst.plans.logical.DropView import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias -import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.CurrentOrigin import org.apache.spark.sql.catalyst.trees.Origin @@ -47,17 +45,14 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators { - case u@UnresolvedRelation(nameParts, _, _) if isTempView(nameParts) => + case u@UnresolvedRelation(nameParts, _, _) + if catalogManager.v1SessionCatalog.isTempView(nameParts) => u case u@UnresolvedRelation(parts@CatalogAndIdentifier(catalog, ident), _, _) => loadView(catalog, ident) .map(createViewRelation(parts, _)) .getOrElse(u) - - case DropIcebergView(r@ResolvedIdentifier(catalog, ident), ifExists) - if isTempView(ident.asMultipartIdentifier) || !isViewCatalog(catalog) => - DropView(r, ifExists) } def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match { @@ -148,12 +143,4 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look private def isBuiltinFunction(name: String): Boolean = { catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name)) } - - private def isTempView(nameParts: Seq[String]): Boolean = { - catalogManager.v1SessionCatalog.isTempView(nameParts) - } - - private def isViewCatalog(catalog: CatalogPlugin): Boolean = { - catalog.isInstanceOf[ViewCatalog] - } } diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 3b4f4e5298bb..db1193cc1aac 100644 --- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -123,7 +123,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI if (isIcebergCommand(sqlTextAfterSubstitution)) { parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan] } else { - HijackViewCommands.apply(delegate.parsePlan(sqlText)) 
+ HijackViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText)) } } diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java index c67c9bc49c5b..c986fbcde76c 100644 --- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java +++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestViews.java @@ -661,8 +661,6 @@ public void dropNonExistingView() { assertThatThrownBy(() -> sql("DROP VIEW non_existing")) .isInstanceOf(AnalysisException.class) .hasMessageContaining("The view %s.%s cannot be found", NAMESPACE, "non_existing"); - - assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS non_existing")); } @Test @@ -685,6 +683,8 @@ public void dropViewIfExists() { sql("DROP VIEW IF EXISTS %s", viewName); assertThat(viewCatalog.viewExists(identifier)).isFalse(); + + assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS %s", viewName)); } /** The purpose of this test is mainly to make sure that normal view deletion isn't messed up */