Spark: Bypass Spark's ViewCatalog API when replacing a view #9596
Conversation
 * @throws NoSuchViewException If the view doesn't exist or is a table
 * @throws NoSuchNamespaceException If the identifier namespace does not exist (optional)
 */
View replaceView(
I don't think we need to have `createOrReplace()` and `replace()` here. Spark's API for these semantics looks a bit different and is defined here, so I think just having `replaceView()` should be ok.
Also, that reminds me to change the return type of Spark's `replaceView(ViewInfo viewInfo, boolean orCreate)` from `void` to `View`.
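For reference, a rough, non-authoritative sketch of the interface shape being discussed; `View`, `ViewInfo`, and `NoSuchViewException` below are stand-in stubs so the sketch compiles on its own, not the actual Spark or Iceberg types:

```java
// Stand-in stubs for illustration only; the real types live in Spark/Iceberg.
interface View {}

final class ViewInfo {}

class NoSuchViewException extends Exception {}

// Sketch of the single replaceView() entry point suggested above, returning
// the new View rather than void.
interface SupportsReplacingViews {
  /**
   * Replace an existing view, or create it when orCreate is true.
   *
   * @throws NoSuchViewException if the view doesn't exist and orCreate is false
   */
  View replaceView(ViewInfo viewInfo, boolean orCreate) throws NoSuchViewException;
}
```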
Force-pushed from 87e3247 to 45732fe
.withDefaultCatalog(currentCatalog)
.withDefaultNamespace(Namespace.of(currentNamespace))
.withQuery("spark", sql)
.withSchema(icebergSchema)
This works, but we may want to handle field IDs differently in the future. Because there are no data files, it doesn't really matter how we assign IDs, but it is nice to have consistent IDs across schema versions because it could be confusing otherwise.
The table implementation keeps schemas consistent for replace table operations by passing the previous schema in to assign the new IDs, like this:
Schema freshSchema =
TypeUtil.assignFreshIds(updatedSchema, schema(), newLastColumnId::incrementAndGet);
This would be a separate PR if we wanted to handle it in `ViewMetadata`.
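For illustration, a minimal self-contained example of how `TypeUtil.assignFreshIds` reuses IDs from a base schema; the schemas and field names here are made up:

```java
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class AssignFreshIdsExample {
  public static void main(String[] args) {
    // Previous view schema: id -> 1, data -> 2
    Schema current =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Schema derived from the new view SQL; its IDs are throwaway values
    Schema updated =
        new Schema(
            Types.NestedField.required(100, "id", Types.LongType.get()),
            Types.NestedField.optional(101, "data", Types.StringType.get()),
            Types.NestedField.optional(102, "extra", Types.DoubleType.get()));

    // Fields that also exist in the base schema keep their old IDs; only
    // genuinely new fields get fresh IDs after the previous highest ID
    AtomicInteger lastColumnId = new AtomicInteger(current.highestFieldId());
    Schema fresh =
        TypeUtil.assignFreshIds(updated, current, lastColumnId::incrementAndGet);

    System.out.println(fresh); // id keeps 1, data keeps 2, extra gets 3
  }
}
```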
I see what you mean, and this makes sense. I'll take a closer look at this outside of this PR.
I've opened #10253 to address this.
} catch (org.apache.iceberg.exceptions.NoSuchNamespaceException e) {
  throw new NoSuchNamespaceException(currentNamespace);
} catch (org.apache.iceberg.exceptions.NoSuchViewException e) {
  throw new NoSuchViewException(ident);
If this is thrown, then I think the Spark exec node should catch it and create the view. The `CREATE OR REPLACE` operation is supposed to be idempotent, so it should not fail if the view is concurrently dropped.
This probably isn't a big deal, but it would be nice to handle it.
This only exists due to the limited default implementation (which does a drop + create) in Spark's `replace()`, as can be seen here. That being said, I don't think we need to handle `NoSuchViewException` in our implementation, since ours is idempotent. I went ahead and removed catching this here.
Why don't we need to handle the exception here? Translating to the right Spark exception seems like something we should definitely do.
I realize that we already check whether the view exists, but if the view is dropped concurrently, it could still fail. It's an edge case, but the right thing to do is to catch and issue the create, right? `CREATE OR REPLACE` should never fail because the view doesn't exist.
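A minimal sketch of the catch-and-create fallback being described, using stand-in types rather than the real Spark exec node or catalog API:

```java
// Stand-ins for illustration; the real implementation would live in Spark's
// CREATE OR REPLACE exec node and use the actual catalog API.
interface ViewOps {
  void replaceView(String name) throws NoSuchViewException;

  void createView(String name);
}

class NoSuchViewException extends Exception {
  NoSuchViewException(String name) {
    super("View not found: " + name);
  }
}

class CreateOrReplaceViewExec {
  static void createOrReplace(ViewOps ops, String name) {
    try {
      ops.replaceView(name);
    } catch (NoSuchViewException e) {
      // The view was dropped concurrently between the existence check and
      // the replace; CREATE OR REPLACE must still succeed, so create it
      ops.createView(name);
    }
  }
}
```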
This isn't a blocker, so I'll merge this PR. We can follow up on this later.
I'll follow up on this.
Looks great! There are some minor things, but nothing that is a big blocker.
Spark's `ViewCatalog` API doesn't have a `replace()` in 3.5, as it was only introduced later. Therefore we're bypassing Spark's `ViewCatalog` so that we can keep the view's history after executing a `CREATE OR REPLACE`.
Force-pushed from 45732fe to dd20550
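To make the commit message concrete, here is a hedged sketch (the identifier, namespace, and catalog name are made up) of going through Iceberg's `ViewBuilder` directly, where `createOrReplace()` records a new view version instead of dropping and recreating the view:

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.ViewCatalog;
import org.apache.iceberg.view.View;

class ReplaceViewDirectly {
  static View createOrReplace(ViewCatalog catalog, String sql, Schema schema) {
    return catalog
        .buildView(TableIdentifier.of("db", "event_summary"))
        .withDefaultCatalog("spark_catalog")
        .withDefaultNamespace(Namespace.of("db"))
        .withQuery("spark", sql)
        .withSchema(schema)
        // records a new version in the view metadata, preserving history,
        // unlike Spark's default drop + create fallback
        .createOrReplace();
  }
}
```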