Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark: Handle concurrently dropped view during CREATE OR REPLACE #9623

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.iceberg.spark.SupportsReplaceView
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.NoSuchViewException
import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.Identifier
Expand Down Expand Up @@ -59,45 +60,24 @@ case class CreateV2ViewExec(
// CREATE OR REPLACE VIEW
catalog match {
case c: SupportsReplaceView =>
c.replaceView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
try {
replaceView(c, currentCatalog, currentNamespace, newProperties)
} catch {
// view might have been concurrently dropped during replace
case _: NoSuchViewException =>
replaceView(c, currentCatalog, currentNamespace, newProperties)
}
case _ =>
if (catalog.viewExists(ident)) {
catalog.dropView(ident)
}

catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
createView(currentCatalog, currentNamespace, newProperties)
}
} else {
try {
// CREATE VIEW [IF NOT EXISTS]
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
createView(currentCatalog, currentNamespace, newProperties)
} catch {
case _: ViewAlreadyExistsException if allowExisting => // Ignore
}
Expand All @@ -106,6 +86,39 @@ case class CreateV2ViewExec(
Nil
}

private def replaceView(
supportsReplaceView: SupportsReplaceView,
currentCatalog: String,
currentNamespace: Array[String],
newProperties: Map[String, String]) = {
supportsReplaceView.replaceView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
}

private def createView(
currentCatalog: String,
currentNamespace: Array[String],
newProperties: Map[String, String]) = {
catalog.createView(
ident,
queryText,
currentCatalog,
currentNamespace,
viewSchema,
queryColumnNames.toArray,
columnAliases.toArray,
columnComments.map(c => c.orNull).toArray,
newProperties.asJava)
}

override def simpleString(maxFields: Int): String = {
s"CreateV2ViewExec: ${ident}"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ public View replaceView(
String[] columnAliases,
String[] columnComments,
Map<String, String> properties)
throws NoSuchNamespaceException {
throws NoSuchNamespaceException, NoSuchViewException {
if (null != asViewCatalog) {
Schema icebergSchema = SparkSchemaUtil.convert(schema);

Expand All @@ -643,6 +643,8 @@ public View replaceView(
return new SparkView(catalogName, view);
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
throw new NoSuchNamespaceException(currentNamespace);
} catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
throw new NoSuchViewException(ident);
}
}

Expand Down