Spark 3.5: Implement RewriteTablePath #11555
Conversation
Force-pushed from 4e18bc3 to be5d2f5.
* Path to a comma-separated list of source and target paths for all files added to the table
* between startVersion and endVersion, including original data files and metadata files
* rewritten to staging.

* Result file list location. This file contains a 'copy plan', a comma-separated list of all
This still feels a little ambiguous.
Maybe?
A file containing a listing of both original file names and file names under the new prefix, comma separated.
we could give an example of the format to make it clear like this:
sourcepath/datafile1.parquet targetpath/datafile1.parquet,
sourcepath/datafile2.parquet targetpath/datafile2.parquet,
...
Put the example in the suggestion and reworded.
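To make the discussed format concrete, here is a minimal, hypothetical sketch of writing and parsing such a copy plan (the class name and helper methods are illustrative, not the actual action's code):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CopyPlan {
  // Serialize (source, target) path pairs into the comma-separated plan string.
  public static String write(List<Map.Entry<String, String>> pairs) {
    return pairs.stream()
        .map(p -> p.getKey() + " " + p.getValue())
        .collect(Collectors.joining(","));
  }

  // Parse the plan string back into (source, target) path pairs.
  public static List<Map.Entry<String, String>> read(String plan) {
    return Arrays.stream(plan.split(","))
        .map(String::trim)
        .map(entry -> entry.split(" ", 2))
        .map(parts -> Map.entry(parts[0], parts[1]))
        .collect(Collectors.toList());
  }
}
```

For example, a plan built from two data files round-trips back to the same pairs after `read(write(pairs))`.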
Thanks @szehon-ho for working on it. Left some comments. The major concern is performance in the case of multiple delete files in a manifest file.
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
We will need to rewrite statistics file paths as well, but I'm OK to support it in a follow-up PR.
Force-pushed from 9bfd8d0 to 6880510.
@flyrain thanks for the review! I spent some time cleaning it up. The comments should be addressed; let me know if I missed any.
Force-pushed from 5445f00 to 36b3365.
 * @param targetPrefix target prefix which will replace it
 * @return metrics for the new delete file entry
 */
public static Metrics replacePathBounds(
This new logic rewrites the position delete entry's file_path bounds, which are used in the delete index. Now that we moved the position delete rewrite to a separate Spark job instead of running it inline with writing the delete entry, we need to fix the bounds for the metadata here.
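As an illustration of what such a bound rewrite involves, here is a self-contained sketch of replacing the source prefix in a file_path bound value (names are hypothetical; the real method operates on Iceberg's Metrics and delete file metadata):

```java
public class PathBounds {
  // Rewrite a file_path bound value from the source prefix to the target
  // prefix; a bound that doesn't start with the source prefix is rejected.
  public static String replacePrefix(String bound, String sourcePrefix, String targetPrefix) {
    if (!bound.startsWith(sourcePrefix)) {
      throw new IllegalArgumentException("Bound does not start with source prefix: " + bound);
    }
    return targetPrefix + bound.substring(sourcePrefix.length());
  }
}
```

If the bounds were left pointing at the source prefix, the delete index would filter against stale paths after the copy.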
Thanks @szehon-ho for working on it. LGTM overall. Left questions and minor comments.
  return metricsWithoutPathBounds(deleteFile);
}

if (lowerPathBound.equals(upperPathBound)) {
I might be missing the context. Do we have to handle the case where the lower path bound doesn't equal the upper path bound? Or are they always the same for the file path in a pos delete file? How does filtering work in that case?
Yeah, if I understand it correctly, the existing logic doesn't do filtering unless the bounds are equal; see just above this method:
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L81
I believe the reasoning is that it's not even worth it (as paths are so random). So for simplicity I just remove these metrics if the two bounds are not the same, what do you think? Or let me know if you prefer I just change both paths.
Another bit of context: I think having the same upper/lower bound is related to file-scoped delete files, which seem likely to become the default soon (ref: https://lists.apache.org/thread/2mpmdp4ns66gp595c9b3clstgskbslsp), hence my thought that it's not worth copying the metrics for the other case (which doesn't even attempt filtering).
I think it's worth having a comment to clarify the behavior. It isn't a blocker to me though.
Done
Thank you @szehon-ho for your great work! I left some nitpicks but overall LGTM.
Force-pushed from 33d53ce to e33d1a5.
Rebased and addressed review comments. Thanks a lot @flyrain and @dramaticlly for reviewing this big change.
  }
}

public static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
Note: this extra override class is added because the Spark encoder didn't like a class (RewriteResult) that has a type parameter T, hence the need for a class with a concrete type parameter.
It also needed some extra methods to cleanly aggregate RewriteResult instances of different content file types, now that the change in #11555 (comment) makes the data file entry logic also return RewriteResult.
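A minimal sketch of this pattern (the shape is hypothetical; the actual classes live in RewriteTablePathUtil): reflection-based encoders need a fully resolved class, so a concrete subclass pins the type parameter and adds an aggregation helper for mixed element types:

```java
import java.util.ArrayList;
import java.util.List;

// Generic result type; an open type parameter like this is what
// reflection-based encoders cannot handle directly.
class RewriteResult<T> {
  final List<T> toRewrite = new ArrayList<>();

  RewriteResult<T> append(RewriteResult<T> other) {
    toRewrite.addAll(other.toRewrite);
    return this;
  }
}

// Concrete type parameter so the encoder sees no open generics.
class RewriteContentFileResult extends RewriteResult<Object> {
  // Extra helper to fold in results of any narrower element type.
  RewriteContentFileResult appendAny(RewriteResult<?> other) {
    toRewrite.addAll(other.toRewrite);
    return this;
  }
}
```

The subclass carries no new state; its only job is to present a fully concrete type while still accepting results built for different content-file types.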
@@ -43,4 +43,15 @@ public static Schema pathPosSchema() {
  public static Schema posDeleteSchema(Schema rowSchema) {
    return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema);
  }

  public static Schema posDeleteReadSchema(Schema rowSchema) {
Somehow after the rebase this is needed for the position delete rewrite (there must be some intervening change related to delete readers). Previously this used the method above, pathPosSchema(rowSchema), for the read schema, which has 'row' as required. This would fail saying 'row' is required but not found in the delete file, as 'row' is usually not set.
Note that Spark and all readers actually don't include the 'row' field in the read schema: https://github.com/apache/iceberg/blob/main/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java#L70.
But here, I do want to read the 'row' field and preserve it if it is set by some engine.
So I am taking the strategy of RewritePositionDelete and actually reading this field, but as optional, to avoid the assertion error if it is not found: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java#L118 (the reader there is derived from the schema of the PositionDeletesTable metadata table).
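The required-versus-optional distinction can be illustrated with a toy projection (plain Java, not Iceberg's Schema API): an optional column simply reads as null when the file lacks it, while a required one fails the read:

```java
import java.util.Map;

public class ReadSchemaCheck {
  // Project one column out of a file record; a missing required column is an
  // error, a missing optional column just yields null.
  public static Object project(Map<String, Object> fileRecord, String column, boolean required) {
    if (!fileRecord.containsKey(column)) {
      if (required) {
        throw new IllegalStateException("Required column not found: " + column);
      }
      return null;
    }
    return fileRecord.get(column);
  }
}
```

This mirrors why reading 'row' as optional tolerates delete files that never wrote it, while still preserving the value when an engine did set it.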
This might not be directly related to this PR, but it seems like the column row should NOT be marked as required anywhere.
Thanks @szehon-ho ! LGTM.
Thanks Szehon for all the changes and for putting together the RewriteResult interface and RewriteContentFileResult!
@flyrain addressed comments, and also added a unit test for the case with the 'row' column set on a position delete file, ensuring that the value is carried over if set.
Thanks @szehon-ho for working on this. Feel free to merge it.
Thanks a lot @flyrain and @dramaticlly for the review; we can continue improving this in follow-up PRs.
This is the implementation for #10920 (an action to prepare the metadata of an Iceberg table for a DR copy).
This has been used in production for a while in our setup, although support for rewriting V2 position deletes is new. I performed the following cleanups while contributing it.