Spark 3.5: Parallelize reading files in add_files procedure #9274
Conversation
Thanks @manuzhang. I'm good with having an option to parallelize file listing, but I have concerns about some public API compatibility breakages (these util APIs have existed for a while and people are already using them, so changing them makes it harder for folks to upgrade).
@amogh-jahagirdar @singhpk234 please check again. I've restored the public util APIs.
Apologies for not catching this earlier @manuzhang. I took a look at the procedure, and I don't think this is really parallelizing the listing. What we're actually doing is parallelizing the file reads. The listing happens here: https://github.com/apache/iceberg/blob/main/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java#L126. As you can see, there is no parallelism there (other than what the file system does internally).
The parallelism comes into play in the Task, which reads the files after the listing: https://github.com/apache/iceberg/blob/main/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java#L146
I think we should rename the parameter, and any other references (comments, method parameters), to just parallelism.
The docs can just say parallelism controls the parallelism when reading files.
Let me know what you think, or if I missed something.
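To illustrate the distinction being discussed, here is a minimal sketch of the pattern: listing produces the file paths sequentially, and the parallelism only applies to reading each listed file afterwards. The class name, `readFooter` helper, and return values are illustrative, not the actual TableMigrationUtil code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class ParallelReadSketch {
  // Hypothetical stand-in for reading a file's footer to build metadata.
  static String readFooter(String path) {
    return "metrics(" + path + ")";
  }

  // The paths are assumed to have been produced by a single-threaded listing;
  // only the per-file reads fan out across the pool.
  static List<String> readFiles(List<String> listedPaths, int parallelism) {
    if (parallelism <= 1) {
      // Sequential path, matching the single-threaded default.
      return listedPaths.stream()
          .map(ParallelReadSketch::readFooter)
          .collect(Collectors.toList());
    }
    ExecutorService pool = Executors.newFixedThreadPool(parallelism);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String path : listedPaths) {
        futures.add(pool.submit(() -> readFooter(path)));
      }
      List<String> results = new ArrayList<>(futures.size());
      for (Future<String> f : futures) {
        try {
          // Collect in submission order so results stay deterministic.
          results.add(f.get());
        } catch (InterruptedException | ExecutionException e) {
          throw new RuntimeException(e);
        }
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<String> paths = List.of("part-0.parquet", "part-1.parquet");
    System.out.println(readFiles(paths, 2));
  }
}
```

Because the futures are collected in submission order, the output order matches the listing order regardless of the thread pool size.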
@amogh-jahagirdar since there are other usages of "parallelism", I updated all references to the more specific "readFilesParallelism".
@manuzhang where are the other references to parallelism? Are you referring to the existing one in SparkTableUtil? https://github.com/apache/iceberg/blob/main/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java#L545 is the actual listing parallelism, and I'd just rename that variable in the method if that's what you're referring to. readFilesParallelism seems more verbose than needed.
@amogh-jahagirdar updated. Thanks for the review. Merry Christmas 🎄
Thanks for working through the iterations @manuzhang, I think it looks really good now. Merry Christmas to you too!
I just noticed one of the importSparkTable methods still breaks API compatibility. We should address that before merging. Some level of duplication is OK, but we should avoid breaking APIs (this means adding new parameters, removing parameters, changing type signatures, etc.). In this case, I think we should be able to define another importSparkTable that takes a parallelism argument; the old importSparkTable API can call the new API with a parallelism of 1.
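The overload-delegation pattern suggested above can be sketched as follows. The class and method names are simplified stand-ins, not the actual SparkTableUtil signatures.

```java
public class TableUtilSketch {
  // New API: caller chooses the read parallelism.
  public static String importTable(String source, String target, int parallelism) {
    return "imported " + source + " -> " + target + " with parallelism " + parallelism;
  }

  // Old API keeps its exact signature for binary/source compatibility
  // and simply delegates with a parallelism of 1.
  public static String importTable(String source, String target) {
    return importTable(source, target, 1);
  }

  public static void main(String[] args) {
    // Existing callers compile unchanged and get the old single-threaded behavior.
    System.out.println(importTable("db.src", "db.dst"));
  }
}
```

Existing callers keep compiling against the two-argument overload, while new callers opt into parallelism explicitly.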
Thanks for the quick turnaround @manuzhang, looks good now. Merging. Thanks @singhpk234 for the reviews!
Hi, may I ask if there are any plans to port this to Spark 3.3?
I will submit a PR to back-port it to Spark 3.3 and 3.4 soon.
…he#11043): back-port of apache#9274 and apache#10037.
Currently, only one thread is used to list files when importing a Spark table in the add_files procedure, which can be very slow for a table or a partition with many files. This PR adds a listing_parallelism argument to the add_files procedure so that multiple threads can be used to list files.