
Spark 3.5: Implement RewriteTablePath #11555

Merged
merged 8 commits into apache:main on Jan 8, 2025

Conversation

szehon-ho
Collaborator

This is the implementation for #10920 (an action to prepare metadata for an Iceberg table for DR copy)

This has been used in production for a while in our setup, although support for rewriting V2 position deletes is new. I performed the following cleanups while contributing it.

  • Made RewriteTableSparkAction code more functional (avoiding member variables on the action to track state)
  • Moved some RewriteTableSparkAction code to core Util classes to avoid having to make some classes public, as was previously done.

@szehon-ho szehon-ho force-pushed the rewrite_table_path branch 2 times, most recently from 4e18bc3 to be5d2f5 Compare November 16, 2024 09:53
* Path to a comma-separated list of source and target paths for all files added to the table
* between startVersion and endVersion, including original data files and metadata files
* rewritten to staging.
* Result file list location. This file contains a 'copy plan', a comma-separated list of all
Member

This still feels a little ambiguous.

Maybe?

A file containing a listing of both original file names and file names under the new prefix, comma separated.

Contributor

we could give an example of the format to make it clear like this:

sourcepath/datafile1.parquet targetpath/datafile1.parquet,
sourcepath/datafile2.parquet targetpath/datafile2.parquet,
...
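
A hypothetical sketch of consuming a copy plan in that format: each comma-separated entry holds a source path and a target path separated by whitespace. The class and method names here (`CopyPlanSketch`, `parseCopyPlan`) are illustrative only, not part of the action's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CopyPlanSketch {
  // Parses "source target,source target,..." into an ordered source-to-target map.
  static Map<String, String> parseCopyPlan(String plan) {
    Map<String, String> sourceToTarget = new LinkedHashMap<>();
    for (String entry : plan.split(",")) {
      String trimmed = entry.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      String[] pair = trimmed.split("\\s+");
      // pair[0] = original file path, pair[1] = file path under the new prefix
      sourceToTarget.put(pair[0], pair[1]);
    }
    return sourceToTarget;
  }

  public static void main(String[] args) {
    String plan =
        "sourcepath/datafile1.parquet targetpath/datafile1.parquet,"
            + "sourcepath/datafile2.parquet targetpath/datafile2.parquet";
    Map<String, String> copies = parseCopyPlan(plan);
    System.out.println(copies.size()); // 2
  }
}
```

A DR copy tool could iterate this map and issue one copy per entry, which is why a flat, order-preserving listing is a convenient output format.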

Collaborator Author

Put the example in the suggestion and reworded.

Contributor

@flyrain left a comment

Thanks @szehon-ho for working on it. Left some comments. The major concern is the perf in case of multiple delete files in a manifest file.


Comment on lines 63 to 64
metadata.statisticsFiles(),
metadata.partitionStatisticsFiles(),
Contributor

We will need to rewrite statistic file path as well, but I'm OK to support it in a follow-up PR.

@szehon-ho szehon-ho force-pushed the rewrite_table_path branch 3 times, most recently from 9bfd8d0 to 6880510 Compare December 7, 2024 01:49
@szehon-ho
Copy link
Collaborator Author

szehon-ho commented Dec 7, 2024

@flyrain thanks for review! I spent some time cleaning it up. The comments should be addressed, let me know if I missed any.

  1. More things are moved to RewriteTablePathsUtil, removing the need to make ManifestLists public.
  2. The rewrite of position deletes is now a Spark job for better performance. It took me a bit of time to realize that I need to rewrite the delete entry's bounds as well in the rewrite-manifest job (before, because I rewrote the position delete file inline, the bounds were correctly populated; now I have to fix them because the delete file is rewritten separately).

* @param targetPrefix target prefix which will replace it
* @return metrics for the new delete file entry
*/
public static Metrics replacePathBounds(
Collaborator Author

This new logic rewrites the position delete entry's file_path bounds, which are used in the delete index. Now that we moved the position delete rewrite to a separate Spark job instead of doing it inline with writing the delete entry, we need to fix the bounds for the metadata here.
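
A minimal sketch of the idea (not the actual core implementation): when the lower and upper file_path bounds are the same single path, the source prefix is swapped for the target prefix; the case of differing bounds is discussed further below in this thread.

```java
public class PathBoundsSketch {
  // Rewrites a single-path file_path bound from the source prefix to the
  // target prefix. Returns null when the bounds differ, signalling that the
  // caller should drop the path bounds rather than rewrite them.
  static String replacePathBound(
      String lower, String upper, String sourcePrefix, String targetPrefix) {
    if (!lower.equals(upper)) {
      return null; // differing bounds: not rewritten in this sketch
    }
    if (lower.startsWith(sourcePrefix)) {
      return targetPrefix + lower.substring(sourcePrefix.length());
    }
    return lower; // path is outside the source prefix; leave it untouched
  }

  public static void main(String[] args) {
    String rewritten =
        replacePathBound(
            "s3://old/db/data/f1.parquet",
            "s3://old/db/data/f1.parquet",
            "s3://old",
            "s3://new");
    System.out.println(rewritten); // s3://new/db/data/f1.parquet
  }
}
```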

Contributor

@flyrain left a comment

Thanks @szehon-ho for working on it. LGTM overall. Left questions and minor comments.

return metricsWithoutPathBounds(deleteFile);
}

if (lowerPathBound.equals(upperPathBound)) {
Contributor

I might be missing the context. Do we have to handle cases where the lower path bound doesn't equal the upper path bound? Or are they always the same for the file path in a position delete file? How does filtering work in that case?

Collaborator Author

@szehon-ho Jan 4, 2025

Yes, if I understand it correctly, the existing logic doesn't do filtering unless the bounds are equal; see just above this method:
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java#L81

I believe the reasoning is that it's not even worth it, as paths are effectively random. So for simplicity I just remove these metrics if the two bounds are not the same; what do you think? Or let me know if you would prefer I just change both paths.

Collaborator Author

@szehon-ho Jan 4, 2025

More context: I think having the same upper/lower bound is related to file-scoped delete files, which it seems will become the default soon (ref: https://lists.apache.org/thread/2mpmdp4ns66gp595c9b3clstgskbslsp), hence my thought that it's not worth copying the metrics for the other case (which doesn't even attempt filtering).

Contributor

I think it's worth having a comment to clarify the behavior. It isn't a blocker to me, though.

Collaborator Author

Done

Contributor

@dramaticlly left a comment

Thank you @szehon-ho for your great work! I left some nitpicks, but overall LGTM.

@szehon-ho
Collaborator Author

Rebased and addressed review comments, thanks a lot @flyrain and @dramaticlly for reviewing this big change.

}
}

public static class RewriteContentFileResult extends RewriteResult<ContentFile<?>> {
Collaborator Author

@szehon-ho Jan 5, 2025

Note: this extra subclass is added because the Spark encoder didn't like a class (RewriteResult) with a type parameter T, hence the need for a class with a concrete type parameter.

It also then needed some extra methods to be able to cleanly aggregate RewriteResults of different content file types, now that the change in #11555 (comment) makes the data file entry logic also return a RewriteResult.
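
A hedged sketch of the pattern described above: encoders that infer a schema by reflection cannot handle an open type parameter, so a thin subclass pins the parameter to a concrete type. The class names echo the PR, but the bodies are illustrative only (String stands in for a concrete content file type).

```java
import java.util.ArrayList;
import java.util.List;

public class EncoderPatternSketch {
  // Generic result: a schema cannot be inferred for the open parameter T.
  static class RewriteResult<T> {
    final List<T> toRewrite = new ArrayList<>();
  }

  // Concrete type parameter, so reflection sees a fully closed type.
  static class ContentFileResult extends RewriteResult<String> {
    // Extra method so results from different rewrite steps can be
    // aggregated into one concrete result object.
    ContentFileResult appendAll(RewriteResult<String> other) {
      this.toRewrite.addAll(other.toRewrite);
      return this;
    }
  }

  // Aggregates a data-file result and a delete-file result into one.
  static int demo() {
    ContentFileResult dataFiles = new ContentFileResult();
    dataFiles.toRewrite.add("data-file");
    ContentFileResult deleteFiles = new ContentFileResult();
    deleteFiles.toRewrite.add("delete-file");
    return dataFiles.appendAll(deleteFiles).toRewrite.size();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // 2
  }
}
```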

@@ -43,4 +43,15 @@ public static Schema pathPosSchema() {
public static Schema posDeleteSchema(Schema rowSchema) {
return rowSchema == null ? pathPosSchema() : pathPosSchema(rowSchema);
}

public static Schema posDeleteReadSchema(Schema rowSchema) {
Collaborator Author

@szehon-ho Jan 5, 2025

Somehow after the rebase this is needed for the position delete rewrite (there must be some intervening change related to delete readers). Previously this used the method above, pathPosSchema(rowSchema), for the read schema, which has 'row' as required. This would fail, saying 'row' is required but not found in the delete file, as 'row' is usually not set.

Note that Spark and all readers actually don't include the 'row' field in the read schema: https://github.com/apache/iceberg/blob/main/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java#L70.

But here, I do want to read the 'row' field and preserve it if it is set by some engine. So I am taking the strategy of RewritePositionDelete and actually reading this field, but as optional, to avoid the assert error if it is not found: https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/PositionDeletesTable.java#L118 (the reader there is derived from the schema of the metadata table PositionDeletesTable).
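
A minimal sketch of the idea (a hypothetical field model, not the Iceberg Schema API): reading a required field that is absent from the file fails, while an optional field simply reads as null, so the read schema marks 'row' optional.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PosDeleteReadSchemaSketch {
  // Maps each field name to whether it is required in the read schema.
  static Map<String, Boolean> posDeleteReadSchema(boolean includeRow) {
    Map<String, Boolean> fields = new LinkedHashMap<>();
    fields.put("file_path", true);
    fields.put("pos", true);
    if (includeRow) {
      // 'row' is read as optional so delete files that never wrote it still
      // load, while engines that did set it have the value preserved.
      fields.put("row", false);
    }
    return fields;
  }

  public static void main(String[] args) {
    Map<String, Boolean> schema = posDeleteReadSchema(true);
    System.out.println(schema.get("row")); // false => optional
  }
}
```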

Contributor

This might not be directly related to this PR, but it seems like the column 'row' should NOT be marked as required anywhere.

@szehon-ho szehon-ho closed this Jan 6, 2025
@szehon-ho szehon-ho reopened this Jan 6, 2025
Contributor

@flyrain left a comment

Thanks @szehon-ho ! LGTM.



Contributor

@dramaticlly left a comment

Thanks Szehon for all the changes and for putting together the RewriteResult interface and RewriteContentFileResult!

@szehon-ho
Collaborator Author

szehon-ho commented Jan 7, 2025

@flyrain addressed comments, and also added a unit test for the case where the 'row' column is set on a position delete file, ensuring that the value is carried over if set.

@flyrain
Contributor

flyrain commented Jan 7, 2025

Thanks @szehon-ho for working on this. Feel free to merge it.

@szehon-ho szehon-ho merged commit 39a4cfd into apache:main Jan 8, 2025
50 checks passed
@szehon-ho
Collaborator Author

Thanks a lot @flyrain and @dramaticlly for the review; we can continue improving this in follow-up PRs.
