Skip to content

Commit

Permalink
improvements: only hijack DROP VIEW commands for Iceberg view catalogs and non-temporary views
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Jan 8, 2024
1 parent b8c1820 commit 0b93267
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,35 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.connector.catalog.LookupCatalog
import org.apache.spark.sql.connector.catalog.ViewCatalog

/**
 * ResolveSessionCatalog exits early for some v2 View commands,
 * thus they are pre-substituted here and then handled in ResolveViews.
 *
 * Only DROP VIEW statements that target a view catalog and do not refer to a
 * temporary view are rewritten; everything else is left for Spark to resolve.
 */
case class HijackViewCommands(spark: SparkSession) extends Rule[LogicalPlan] with LookupCatalog {

  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    // Substitute DropView with DropIcebergView only when the current catalog
    // supports views and the identifier is not a temp view (temp views must
    // keep being handled by Spark's session catalog).
    case DropView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
        if isViewCatalog(catalogManager.currentCatalog) && !isTempView(nameParts) =>
      DropIcebergView(UnresolvedIdentifier(nameParts, allowTemp), ifExists)
  }

  // True if the multi-part name refers to a temporary view in the v1 session catalog.
  private def isTempView(nameParts: Seq[String]): Boolean = {
    catalogManager.v1SessionCatalog.isTempView(nameParts)
  }

  // True if the given catalog plugin implements the ViewCatalog API.
  private def isViewCatalog(catalog: CatalogPlugin): Boolean = {
    catalog.isInstanceOf[ViewCatalog]
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.expressions.UpCast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.DropView
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
import org.apache.spark.sql.catalyst.plans.logical.views.DropIcebergView
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.trees.Origin
Expand All @@ -47,17 +45,14 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
  // Temporary views are resolved by Spark's v1 session catalog; pass them through untouched.
  case u @ UnresolvedRelation(nameParts, _, _)
      if catalogManager.v1SessionCatalog.isTempView(nameParts) =>
    u

  // Try to load the relation as a view from its catalog; if the catalog has no
  // such view, keep the unresolved relation so later rules can handle it.
  case u @ UnresolvedRelation(parts @ CatalogAndIdentifier(catalog, ident), _, _) =>
    loadView(catalog, ident)
      .map(createViewRelation(parts, _))
      .getOrElse(u)
}

def loadView(catalog: CatalogPlugin, ident: Identifier): Option[View] = catalog match {
Expand Down Expand Up @@ -148,12 +143,4 @@ case class ResolveViews(spark: SparkSession) extends Rule[LogicalPlan] with Look
// True if `name` resolves to one of Spark's built-in functions
// in the v1 session catalog.
private def isBuiltinFunction(name: String): Boolean =
  catalogManager.v1SessionCatalog.isBuiltinFunction(FunctionIdentifier(name))

// True if the multi-part name refers to a temporary view
// registered in the v1 session catalog.
private def isTempView(nameParts: Seq[String]): Boolean =
  catalogManager.v1SessionCatalog.isTempView(nameParts)

// True if the given catalog plugin implements the ViewCatalog API.
private def isViewCatalog(catalog: CatalogPlugin): Boolean =
  catalog match {
    case _: ViewCatalog => true
    case _ => false
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI
if (isIcebergCommand(sqlTextAfterSubstitution)) {
parse(sqlTextAfterSubstitution) { parser => astBuilder.visit(parser.singleStatement()) }.asInstanceOf[LogicalPlan]
} else {
HijackViewCommands.apply(delegate.parsePlan(sqlText))
HijackViewCommands(SparkSession.active).apply(delegate.parsePlan(sqlText))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,6 @@ public void dropNonExistingView() {
assertThatThrownBy(() -> sql("DROP VIEW non_existing"))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("The view %s.%s cannot be found", NAMESPACE, "non_existing");

assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS non_existing"));
}

@Test
Expand All @@ -685,6 +683,8 @@ public void dropViewIfExists() {

sql("DROP VIEW IF EXISTS %s", viewName);
assertThat(viewCatalog.viewExists(identifier)).isFalse();

assertThatNoException().isThrownBy(() -> sql("DROP VIEW IF EXISTS %s", viewName));
}

/** The purpose of this test is mainly to make sure that normal view deletion isn't messed up */
Expand Down

0 comments on commit 0b93267

Please sign in to comment.