Core: Avro writers use BlockingBinaryEncoder to enable array/map size calculations. #8625
Conversation
@Fokko can you take a look at this too?
When writing Avro files, Iceberg often writes arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() from org.apache.avro.io.EncoderFactory does not write the byte length of arrays or maps, since those encoders do not buffer their output and therefore cannot calculate a length. Knowing the length of an array or map is useful to clients decoding the Avro file, since they can skip decoding the entire array or map when it is not needed. This PR changes all Avro writers to use blockingBinaryEncoder(); this encoder does not "block" in the concurrency sense, but it does buffer the output of objects so that the lengths of arrays and maps can be calculated. See https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder) for details on the differences between the Avro encoders.
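For context, a minimal sketch of the two encoder choices being compared, using only the standard org.apache.avro.io.EncoderFactory API (the 1 MB block size is just the guess discussed below, not a value taken from this PR):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EncoderChoiceSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();

    // Unbuffered: array/map blocks are written without a byte length,
    // so readers must decode every item even when they do not need it.
    BinaryEncoder direct = EncoderFactory.get().directBinaryEncoder(out, null);

    // Buffered per block: each array/map block is held in memory (up to the
    // configured block size) so its byte length can be written before the
    // items, letting readers skip the whole block.
    BinaryEncoder blocking =
        EncoderFactory.get()
            .configureBlockSize(1024 * 1024) // assumed 1 MB buffer, see the discussion below
            .blockingBinaryEncoder(out, null);
  }
}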
Thanks @rustyconover for raising this PR. I'm going to dig into the details tomorrow.
cc @aokolnychyi is probably also interested in this
@Fokko The 1 MB was really just a guess. Thinking about my use cases, this is how I arrived at it: the largest maps I commonly encounter are roughly 8 bytes (field_id) * 256 bytes (value length) * 200 (column count) = 409,600 bytes. I'm happy to hear your thoughts, but 1 MB seems like a reasonable first guess until we make it a table property. Do we want to make it a table property?
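As a back-of-the-envelope check of that estimate (illustrative only; the factors are the ones quoted above, not measured values):

public class BufferSizeEstimate {
  public static void main(String[] args) {
    long fieldIdBytes = 8;   // per-entry field id size quoted above
    long valueBytes = 256;   // assumed value length per entry
    long columnCount = 200;  // assumed column count
    long largestMapBytes = fieldIdBytes * valueBytes * columnCount;
    System.out.println(largestMapBytes); // 409600 bytes, comfortably under a 1 MB block buffer
  }
}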
@rustyconover Got it, the name is a bit confusing.
I did some testing using #8637 and unfortunately, I see some performance regression:
>>> main = [
... 25.859337458,
... 25.627092459,
... 25.630964416,
... 25.936710541,
... 26.189108917
... ]
>>> this_pr = [
... 27.194048667,
... 27.005648417,
... 26.84380025,
... 26.773129833,
... 27.269416167
... ]
>>> sum(main) / 5
25.848642758199997
>>> sum(this_pr) / 5
27.017208666799995
This is the number of seconds per operation. The implementation is different from what I expected: https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java
What it does is write the data and then move the data over to write the size in front of it.
Also, I noticed that it extends BufferedBinaryEncoder. I think we don't want to introduce another buffer here, because I believe the object store libraries also do buffering. I'm wondering if we can't just change the DirectBinaryEncoder to write to memory, like you mentioned.
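To make the buffering behaviour concrete, here is a rough sketch of the idea: serialize the block into scratch memory first, then write the item count, the byte length, and the buffered bytes. This is not the actual BlockingBinaryEncoder or Iceberg code, and the string-to-string map layout is only an assumption:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class LengthPrefixedMapSketch {
  // Serialize the map entries into a scratch buffer, then emit the count,
  // the byte length, and the buffered bytes on the real encoder.
  static void writeStringMap(BinaryEncoder out, Map<String, String> map) throws IOException {
    ByteArrayOutputStream scratch = new ByteArrayOutputStream();
    BinaryEncoder inner = EncoderFactory.get().directBinaryEncoder(scratch, null);
    for (Map.Entry<String, String> entry : map.entrySet()) {
      inner.writeString(entry.getKey());
      inner.writeString(entry.getValue());
    }
    inner.flush();

    byte[] block = scratch.toByteArray();
    if (!map.isEmpty()) {
      // Per the Avro spec, a negative item count signals that the block's
      // byte size follows, which is what lets readers skip the block.
      out.writeLong(-map.size());
      out.writeLong(block.length);
      out.writeFixed(block);
    }
    out.writeLong(0); // zero-count block terminates the map
  }
}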
@@ -54,7 +54,8 @@ public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
    dataOut.writeUTF(avroSchema.toString());

    // Encode the datum with avro schema.
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
This should also be a directBinaryEncoder since we directly write to memory.
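A sketch of what that suggestion could look like for an in-memory encode helper. The method shape is hypothetical and simplified from the hunk above (the real method also writes the schema string first):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EncodeToMemory {
  // Since the destination is an in-memory stream, directBinaryEncoder avoids
  // stacking another buffer on top of the ByteArrayOutputStream.
  static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(out, null);
    new GenericDatumWriter<T>(avroSchema).write(datum, encoder);
    encoder.flush();
    return out.toByteArray();
  }
}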
Also took a stab at it: apache/avro#2521. Still a bit slower due to buffering and copying:
>>> without_buffer = [
... 27.026061125,
... 26.702181417,
... 26.625919333,
... 26.615276875,
... 26.583737958
...]
>>> sum(without_buffer) / 5
26.7106353416
Removed another copy and introduced ThreadLocal, and we're close to the original:
➜ iceberg git:(fd-add-benchmark) ✗ python3
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> with_threadlocale = [
... 26.181581084,
... 26.2257085,
... 26.292216292,
... 25.931295708,
... 25.773379833
... ]
>>> sum(with_threadlocale) / 5
26.080836283399996
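A hypothetical illustration of the ThreadLocal reuse idea mentioned above (the names and structure are assumptions, not the code from apache/avro#2521):

import java.io.ByteArrayOutputStream;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class ReusableEncoderSketch {
  // Keep one scratch buffer per thread so repeated writes do not allocate a
  // fresh buffer and copy for every record.
  private static final ThreadLocal<ByteArrayOutputStream> SCRATCH =
      ThreadLocal.withInitial(ByteArrayOutputStream::new);

  static BinaryEncoder encoderFor(BinaryEncoder reuse) {
    ByteArrayOutputStream scratch = SCRATCH.get();
    scratch.reset(); // reuse the per-thread buffer instead of creating a new one
    return EncoderFactory.get().directBinaryEncoder(scratch, reuse);
  }
}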
@Fokko Those times seem close enough to me. I'm sorry this is a bit complicated to get working.
I just realized that this would also speed up operations like snapshot expiration, because we do need to access the manifest files but don't need to use the metrics.
Yes, it would!
I'd love to take a look early next week.
@rustyconover @Fokko, do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to show that the read improvement is what we anticipate? Here is what we have:
Question. Aren't we using
Also, nice work on a new encoder in Avro, @Fokko! Do you know when that will be available?
Since we use the decoders from Avro itself, we don't need any changes. The relevant code is here: https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java#L398-L424 It will speed up the reading tremendously when we don't need to read in the
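To illustrate the reader side (an assumption-laden sketch, not PyIceberg or Iceberg code): Avro's Decoder.skipMap() can jump over an entire block when the writer recorded its byte size, and otherwise returns the number of entries that still have to be skipped one by one.

import java.io.IOException;
import org.apache.avro.io.Decoder;

public class SkipMetricsSketch {
  // Assumed int-key / bytes-value layout, as in the manifest metrics maps.
  static void skipMetricsMap(Decoder decoder) throws IOException {
    // skipMap() skips whole blocks when their byte size is known and returns
    // the count of entries in a block that could not be skipped that way.
    for (long i = decoder.skipMap(); i != 0; i = decoder.skipMap()) {
      for (long j = 0; j < i; j++) {
        decoder.readInt();   // discard the field id key
        decoder.skipBytes(); // discard the serialized value
      }
    }
  }
}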
This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?
Thanks! I can check in with the Avro community to see if we can do a release.
@rustyconover @Fokko, I was wondering whether there were any updates. It would be great to have this in.
@aokolnychyi I think we can start a release sometime soon, but I need to align this with the Avro community. I also wanted to include nanosecond timestamps in there: apache/avro#2608
Hello @aokolnychyi and @Fokko,
I am particularly interested in the explicit inclusion of block sizes in manifest files. Currently, PyIceberg has to deserialize three maps (high, low, and null counts) even when the query planner may not need that information.

If block sizes are explicitly written, I plan to modify the PyIceberg Avro decoder to take a "lazy" approach: copy arrays or maps as byte buffers and defer decoding until they are accessed by client code. Consequently, if the code never accesses the map, the deserialization code never runs. This optimization could significantly reduce scan planning time, especially for tables with many files across multiple columns. For instance, in a table with 1,250,000 files, copying bytes in memory for later decoding is much faster than deserializing the Avro structures.

Currently the PyIceberg Avro code isn't doing lazy decoding; it just decodes everything. That is because the Java code serializes Avro directly without buffering, so the byte length counts are not included.

I can prepare a PR to do this.

Rusty
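The change described above would land in PyIceberg, but the lazy idea itself is language-agnostic; here is a hypothetical sketch (all names invented for illustration) of holding the raw block bytes and decoding only on first access:

import java.util.function.Function;

public class LazyBlock<T> {
  private final byte[] rawBlock;          // bytes copied straight from the Avro stream
  private final Function<byte[], T> decoder;
  private T decoded;

  LazyBlock(byte[] rawBlock, Function<byte[], T> decoder) {
    this.rawBlock = rawBlock;
    this.decoder = decoder;
  }

  T get() {
    if (decoded == null) {
      decoded = decoder.apply(rawBlock); // decode only when client code actually asks for it
    }
    return decoded;
  }
}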
My point in the earlier message is that I am not sure this PR would actually have an effect, because these changes are not going to be used by our write path in Java. Am I missing anything here?
Hi @aokolnychyi, can we change it to be a buffered binary writer? That way we would get the length counts written.
@aokolnychyi This is about the Iceberg metadata, not about the Datafiles themselves. It might also be interesting for the Datafiles, but then we should analyze the access patterns first.
@Fokko, aren't we using
@aokolnychyi Hmm, I did some quick checks and that seems to be correct. I'm pretty sure that it was using the code because I was seeing exceptions and differences in the benchmarks. Let me dig into this a bit more. I would love to solve this on the Iceberg side.
Any updates on this?
Curious if there were any updates as well.
I could re-test it. It would take me a day or two.
Yes, this is still top of mind! I'm going to see what's needed and make sure that it will be included in the next Avro release!
Here we go: apache/avro#2874
This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that's incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@iceberg.apache.org list. Thank you for your contributions.
@Fokko should we keep this alive?
Would love to have this.
@rustyconover Sorry, I was out on leave. I think the correct way to set it was done in #10973. This will write the offsets for the metadata, if that's what you're looking for.
When writing Avro files, Iceberg often writes arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() from org.apache.avro.io.EncoderFactory does not write the length of arrays or maps, since those encoders do not buffer the output to calculate a length.

Knowing the length of an array or map is useful to clients decoding the Avro file, since they can skip decoding the entire array or map if it is not needed when reading the file. This PR changes all Avro writers to use blockingBinaryEncoder(); this encoder does not "block" in the concurrency sense, but it does buffer the output of objects so that the lengths of arrays and maps can be calculated.

Having the byte lengths of maps and arrays written will speed up the Python decoding of Avro files significantly for tables that have many columns.

See https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder) for details on the differences between the Avro encoders.