
Core: Avro writers use BlockingBinaryEncoder to enable array/map size calculations. #8625

Open · wants to merge 2 commits into base: main
Conversation

rustyconover
Contributor

When writing Avro files, Iceberg is often writing arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() from org.apache.avro.io.EncoderFactory does not write the byte lengths of arrays or maps, since neither encoder buffers its output to calculate a length.

Knowing the length of an array or map is useful to clients decoding the Avro file, since they can skip decoding an entire array or map that is not needed when reading the file. This PR changes all Avro writers to use blockingBinaryEncoder(). This encoder does not "block" in the concurrency sense, but it does buffer the output of objects so that the lengths of arrays and maps can be calculated.

Having the byte lengths of maps and arrays written will speed up the Python decoding of Avro files significantly for tables that have many columns.

For details on the differences between the Avro encoders, see:

https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder)
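In Avro's binary format an array or map is written as a sequence of blocks; when the writer knows a block's byte size, it writes the item count as a negative number followed by the size in bytes, and that size is what lets a reader jump over the whole collection. A minimal sketch of that wire format (stdlib only, simplified to a single block of varint-encoded longs; this is an illustration of the encoding, not Iceberg's actual writer):

```python
import io

def write_long(out, n):
    # Avro zigzag varint encoding for longs.
    n = (n << 1) ^ (n >> 63)
    while (n & ~0x7F) != 0:
        out.write(bytes([(n & 0x7F) | 0x80]))
        n >>= 7
    out.write(bytes([n]))

def read_long(inp):
    shift, acc = 0, 0
    while True:
        b = inp.read(1)[0]
        acc |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)

def write_blocked_array(out, items):
    # Encode the items to memory first so the block's byte size is known.
    buf = io.BytesIO()
    for item in items:
        write_long(buf, item)
    payload = buf.getvalue()
    if items:
        write_long(out, -len(items))   # negative count => a byte size follows
        write_long(out, len(payload))  # block byte size, enables skipping
        out.write(payload)
    write_long(out, 0)                 # end-of-array marker

def skip_array(inp):
    # A reader that skips the array without decoding its items.
    while True:
        count = read_long(inp)
        if count == 0:
            return
        if count < 0:
            size = read_long(inp)
            inp.seek(size, 1)          # jump over the whole block at once
        else:
            for _ in range(count):     # no size written: decode each item
                read_long(inp)
```

Without the size prefix, `skip_array` has to fall through to the slow branch and decode every element, which is the cost this PR is trying to avoid.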

@github-actions github-actions bot added the core label Sep 23, 2023
@rustyconover
Contributor Author

@Fokko can you take a look at this too?

@rustyconover rustyconover force-pushed the fix_avro_buffering_writes branch 5 times, most recently from aa83edc to 3523c13 Compare September 24, 2023 14:29
@rustyconover rustyconover force-pushed the fix_avro_buffering_writes branch from 3523c13 to d4fa26d Compare September 24, 2023 14:34
Contributor

@Fokko Fokko left a comment


Thanks @rustyconover for raising this PR. I'm going to dig into the details tomorrow.

cc @aokolnychyi is probably also interested in this

@rustyconover
Contributor Author

@Fokko The 1MB was really just a guess.

I think that configureBlockSize() sets the largest block of a map/array that will be buffered in memory before being written; of course, an array or map can consist of multiple blocks.

Thinking about my use cases, this is how I arrived at my guess that 1 MB is a reasonable size.

The largest maps I commonly encounter map the field_id to the highest or lowest value for a column in a particular file. The highest or lowest value is a byte array of variable length; let's bound those values at 256 bytes. The field_id also won't be longer than 8 bytes (commonly it will be shorter due to zigzag encoding). So for a table of 200 columns, let's try:

8 bytes (field_id) * 256 bytes (value length) * 200 (column count) = 409,600 bytes.

I'm happy to hear your thoughts on this, but 1 MB seems like a reasonable first guess until we make it a table property. Do we want to make it a table property?
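Spelling the estimate out (a back-of-envelope check using the assumed byte bounds from the comment above; note that the quoted figure multiplies the two byte bounds, while summing them per map entry, which is closer to how a map entry is actually laid out, gives an even smaller number — either way well under 1 MB):

```python
# Assumed worst-case bounds from the discussion above, not measured figures.
FIELD_ID_BYTES = 8    # assumed upper bound on an encoded field_id
VALUE_BYTES = 256     # assumed bound on a column's lower/upper byte array
COLUMNS = 200

# The figure quoted above multiplies the two per-entry bounds:
quoted = FIELD_ID_BYTES * VALUE_BYTES * COLUMNS            # 409,600 bytes
# Summing the bounds per map entry (key bytes + value bytes) is tighter:
per_entry_sum = (FIELD_ID_BYTES + VALUE_BYTES) * COLUMNS   # 52,800 bytes

ONE_MB = 1024 * 1024  # the proposed block size comfortably covers both
```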

Contributor

@Fokko Fokko left a comment


@rustyconover Got it, the name is a bit confusing.

I did some testing using #8637 and unfortunately, I see some performance regression:

>>> main = [
...     25.859337458,
...     25.627092459,
...     25.630964416,
...     25.936710541,
...     26.189108917
... ]
>>> this_pr = [
...     27.194048667,
...     27.005648417,
...     26.84380025,
...     26.773129833,
...     27.269416167
... ]
>>> sum(main) / 5
25.848642758199997
>>> sum(this_pr) / 5
27.017208666799995

This is the number of seconds per operation. The implementation is different from what I expected: https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/io/BlockingBinaryEncoder.java

What it does is write the data first, and then shift all of it to make room for the size in front.

Also, I noticed that it extends BufferedBinaryEncoder. I don't think we want to introduce another buffer here, because I believe the object store libraries also do buffering. I'm wondering if we can just change the DirectBinaryEncoder to write to memory, like you mentioned.
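The idea of keeping a direct encoder but buffering only while inside a collection (rather than buffering the whole stream) can be illustrated like this. This is a hypothetical sketch of that design, not the actual Avro patch; counts and sizes are written as single raw bytes rather than Avro varints for brevity:

```python
import io

class DirectishEncoder:
    """Streams straight to the sink, except inside an open array/map,
    where output is staged in memory so a byte-size prefix can be written."""

    def __init__(self, out):
        self.out = out
        self.stack = []  # one in-memory buffer per currently open collection

    def _sink(self):
        # Innermost open collection buffer, or the real output stream.
        return self.stack[-1] if self.stack else self.out

    def write_bytes(self, b):
        self._sink().write(b)

    def start_array(self):
        self.stack.append(io.BytesIO())

    def end_array(self, item_count):
        payload = self.stack.pop().getvalue()
        sink = self._sink()
        sink.write(bytes([item_count]))    # simplified count (no zigzag)
        sink.write(bytes([len(payload)]))  # simplified byte-size prefix
        sink.write(payload)
        sink.write(b"\x00")                # end-of-array marker
```

Scalars outside any collection go straight through with no extra copy, so only arrays and maps pay for the buffering.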

@@ -54,7 +54,8 @@ public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
dataOut.writeUTF(avroSchema.toString());

// Encode the datum with avro schema.
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);

This should also be a directBinaryEncoder, since we write directly to memory.

@Fokko
Contributor

Fokko commented Sep 25, 2023

Also took a stab at it: apache/avro#2521

Still a bit slower due to buffering and copying:

>>> without_buffer = [
...        27.026061125,
...        26.702181417,
...        26.625919333,
...        26.615276875,
...        26.583737958
...]
>>> sum(without_buffer) / 5
26.7106353416

@Fokko
Contributor

Fokko commented Sep 25, 2023

Removed another copy and introduced a ThreadLocal, and we're close to the original:

iceberg git:(fd-add-benchmark) ✗ python3
Python 3.11.5 (main, Aug 24 2023, 15:09:45) [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> with_threadlocale = [
...     26.181581084,
...     26.2257085,
...     26.292216292,
...     25.931295708,
...     25.773379833
... ]
>>> sum(with_threadlocale) / 5
26.080836283399996

@rustyconover
Contributor Author

@Fokko Those times seem close enough to me.

I'm sorry this is a bit complicated to get working.

@Fokko
Contributor

Fokko commented Oct 17, 2023

I just realized that this would also speed up operations like snapshot expiration, because we do need to access the manifest files but don't need the metrics.

@rustyconover
Contributor Author

Yes it would!

@aokolnychyi
Contributor

I'd love to take a look early next week.

@aokolnychyi
Contributor

aokolnychyi commented Nov 15, 2023

@rustyconover @Fokko, do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to show that the read improvement is what we anticipate?

Here is what we have:

ManifestReadBenchmark (may need to be extended?)
PlanningBenchmark (Spark specific, can ignore distributed cases)

@aokolnychyi
Contributor

aokolnychyi commented Nov 15, 2023

Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how is this PR affecting it? Won't we still use direct encoders there?

https://github.com/apache/avro/blob/85ddfcdaba60d1cb18a16c9908d737bb32369df7/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java#L245

@aokolnychyi
Contributor

Also, nice work on the new encoder in Avro, @Fokko! Do you know when that will be available?

@Fokko
Contributor

Fokko commented Nov 15, 2023

do we need any changes in readers to benefit from this? If not, can we run some existing benchmarks to showcase the read improvement is as we anticipate?

Since we use the decoders from Avro itself, we don't need any changes. The relevant code is here: https://github.com/apache/avro/blob/main/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java#L398-L424

It will speed up reading tremendously when we don't need to read the map[int, bytes] that we use to store statistics. This way you can jump right over it without having to skip each key-value pair individually.

Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how is this PR affecting it? Won't we still use direct encoders there?

This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?

Also, nice work on a new encoder in Avro, @Fokko! Do you know when will that be available?

Thanks! I can check in with the Avro community to see if we can do a release.

@aokolnychyi
Contributor

@rustyconover @Fokko, I was wondering whether there were any updates. It would be great to have this in.

@Fokko
Contributor

Fokko commented Dec 4, 2023

@aokolnychyi I think we can start a release somewhere soon, but I need to align this with the Avro community. I also wanted to include nanosecond timestamp in there: apache/avro#2608

@rustyconover
Contributor Author

Hello @aokolnychyi and @Fokko,

Question. Aren't we using DataFileWriter from Avro in our AvroFileAppender? If so, how is this PR affecting it? Won't we still use direct encoders there?

This is a good question. The goal of this PR is to write the block sizes for the manifests. @rustyconover any thoughts on this?

I am particularly interested in the explicit inclusion of block sizes in manifest files. Currently, PyIceberg requires deserialization of three maps (high, low, and null counts) even when the query planner may not necessarily need that information. If block sizes are explicitly written, I plan to modify the PyIceberg Avro decoder to adopt a "lazy" approach. It would involve copying arrays or maps as byte buffers, deferring the decoding process until they are accessed by client code. Consequently, if the code never accesses the map, the deserialization code would not execute.

This optimization could significantly reduce scan planning time, especially for tables with numerous files across multiple columns. For instance, in a table with 1,250,000 files, copying bytes in memory for later decoding proves much faster than the deserialization of Avro structures.

Currently the PyIceberg Avro code isn't performing lazy decoding; it just decodes everything. That's because the Java code serializes Avro directly without buffering, so the byte lengths can't be included. I can prepare a PR to do this.

Rusty
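The lazy approach described in the comment above could look roughly like this (a hypothetical helper, not PyIceberg's actual decoder): the raw block bytes are stashed as-is, and the expensive decode runs only if client code ever asks for the value.

```python
class LazyMap:
    """Holds an undecoded Avro block and decodes it at most once, on demand."""

    def __init__(self, raw, decode):
        self._raw = raw          # raw bytes copied from the Avro block
        self._decode = decode    # function: bytes -> dict, supplied by caller
        self._value = None

    def get(self):
        if self._value is None:  # decode lazily, only on first access
            self._value = self._decode(self._raw)
        return self._value
```

If the planner never calls `get()`, the deserialization cost is never paid, which is exactly why the explicit block sizes matter: they are what allow the reader to capture the raw bytes without walking every key-value pair.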

@aokolnychyi
Contributor

My point in the earlier message is that I am not sure this PR would actually have an effect, because the changes are not going to be used by our write path in Java. Am I missing anything here?

@rustyconover
Contributor Author

Hi @aokolnychyi, can we change it to use a buffered binary writer so that the length counts are written?

@Fokko
Contributor

Fokko commented Feb 12, 2024

@aokolnychyi This is about the Iceberg metadata, not about the data files themselves. It might also be interesting for the data files, but then we should analyze the access patterns first.

@aokolnychyi
Contributor

aokolnychyi commented Feb 22, 2024

@Fokko, aren't we using DataFileWriter from Avro for Iceberg metadata? Yeah, I fully support the idea; it's just that my preliminary analysis showed it would have no effect on the metadata currently produced by Java (as the PR stands today).

@Fokko
Contributor

Fokko commented Feb 22, 2024

@aokolnychyi Hmm, I did some quick checks and that seems to be correct. I'm pretty sure that it was using the code because I was seeing exceptions and differences in the benchmarks. Let me dig into this a bit more. I would love to solve this on the Iceberg side.

@rustyconover
Contributor Author

Any updates on this?

@aokolnychyi
Contributor

Curious if there were any updates as well.

@rustyconover
Contributor Author

I could re-test it. It would take me a day or two.

@Fokko
Contributor

Fokko commented Apr 25, 2024

Yes, this is still top of mind! I'm going to see what's needed and make sure that it will be included in the next Avro release!

@Fokko
Contributor

Fokko commented Apr 26, 2024

Here we go: apache/avro#2874


This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Sep 20, 2024
@rustyconover
Contributor Author

@Fokko should we keep this alive?

@github-actions github-actions bot removed the stale label Sep 22, 2024
@namrathamyske
Contributor

Would love to have this

@Fokko
Contributor

Fokko commented Oct 30, 2024

@rustyconover Sorry, I was out on leave. I think the correct way to set it was done in #10973. That will write the offsets for the metadata, if that's what you're looking for.
