Skip to content

Commit

Permalink
Spark: Handle concurrently dropped view during CREATE OR REPLACE (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra authored and devangjhabakh committed Apr 22, 2024
1 parent 5321ffb commit eadd59c
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.SupportsReplaceView
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -59,45 +60,24 @@ case class CreateV2ViewExec(
// CREATE OR REPLACE VIEW
catalog match {
case c: SupportsReplaceView =>
c.replaceView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
try {
replaceView(c, currentCatalog, currentNamespace, newProperties)
} catch {
// view might have been concurrently dropped during replace
case _: NoSuchViewException =>
replaceView(c, currentCatalog, currentNamespace, newProperties)
}
case _ =>
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}

catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
createView(currentCatalog, currentNamespace, newProperties)
}
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
createView(currentCatalog, currentNamespace, newProperties)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
Expand All @@ -106,6 +86,39 @@ case class CreateV2ViewExec(
Nil
}

private def replaceView(
supportsReplaceView: SupportsReplaceView,
currentCatalog: String,
currentNamespace: Array[String],
newProperties: Map[String, String]) = {
supportsReplaceView.replaceView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
}

private def createView(
currentCatalog: String,
currentNamespace: Array[String],
newProperties: Map[String, String]) = {
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
}

override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public View replaceView(
String[] columnAliases,
String[] columnComments,
Map<String, String> properties)
throws NoSuchNamespaceException {
throws NoSuchNamespaceException, NoSuchViewException {
if (null != asViewCatalog) {
Schema icebergSchema = SparkSchemaUtil.convert(schema);

Expand All @@ -643,6 +643,8 @@ public View replaceView(
return new SparkView(catalogName, view);
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
throw new NoSuchNamespaceException(currentNamespace);
} catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
throw new NoSuchViewException(ident);
}
}

Expand Down

0 comments on commit eadd59c

Please sign in to comment.