-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AVRO-3871: Add blocking direct binary encoder #2521
Conversation
6cf27d9
to
8e4bf2d
Compare
8e4bf2d
to
5d58080
Compare
d2cd82a
to
cb392cc
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions, proposal and change request :)
* @see Decoder | ||
*/ | ||
public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { | ||
private static final ThreadLocal<BufferOutputStream> BUFFER = ThreadLocal.withInitial(BufferOutputStream::new); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why here a "static ThreadLocal" and not a simple field member.
BinaryEncoder are not robust to multi-thread, but you can have 2 encoder on one thread, that are writing data alternatively.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh that's an excellent question. The idea was to re-use the buffer since it can grow quite a bit (in the case of the Apache Iceberg metadata), but a local variable is indeed a better plan to avoid race conditions.
|
||
private void startBlock() { | ||
if (inBlock) { | ||
throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just put BUFFER as a stack of outputStream, and it would become possible; but not mandatory :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I was thinking of that as well, but this would potentially allocate quite some buffers. For Apache Iceberg we don't use any nested structures, so I went with the simplest approach, but if you think this should be in, I'm happy to add it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let's go step by step; this is good for a first version, moreover if it's aims to be used for specific software like iceberg :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I do like about this, is the exception thrown: it's in the right place to start looking if you want to lift this limitation.
|
||
e.writeArrayStart(); | ||
e.setItemCount(0); | ||
e.writeArrayEnd(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you test this 2 last byte array with a binary decoder to ensure it works with this new encoder. (if it can't, create a specific decoder class)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've added this test-case in TestBlockingDirectBinaryEncoder
// This is correct | ||
// 0: 0 elements in the block | ||
assertArrayEquals(baos.toByteArray(), new byte[] { 0 }); | ||
baos.reset(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a test where an array (or map) is skiped; if i understood well, by calling setItemCount(0) before end array ? (or i missed the purpose of this)
e.writeArrayStart();
e.setItemCount(1);
e.startItem();
e.writeInt(1);
e.setItemCount(0); // here, to skip the array ??
e.writeArrayEnd();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used at read time. It can be used for skipping over the Array/Maps when they are not part of your read schema. Instead of reading all the fields, you can just skip over the whole Array/Map at once since the length is encoded in the Avro file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me check if I can add a test for it 👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a test for it. LMKWYT
} else | ||
a0.clear(); | ||
SpecificData.Array<java.lang.String> ga0 = (a0 instanceof SpecificData.Array | ||
? (SpecificData.Array<java.lang.String>) a0 |
Check warning
Code scanning / CodeQL
Cast from abstract to concrete collection Warning test
List
Array
} else | ||
a0.clear(); | ||
SpecificData.Array<java.lang.String> ga0 = (a0 instanceof SpecificData.Array | ||
? (SpecificData.Array<java.lang.String>) a0 |
Check warning
Code scanning / CodeQL
Cast from abstract to concrete collection Warning test
List
Array
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just i didn't check why CI is failing; fix it before merging
|
||
private void startBlock() { | ||
if (inBlock) { | ||
throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let's go step by step; this is good for a first version, moreover if it's aims to be used for specific software like iceberg :)
…ache/avro into fd-add-blocking-direct-binary-encoder
@opwvhk : Could you have a look at this PR please ?; to have a second eyes on it before i merge it ? |
* Java: Add blocking direct binary encoder * Optimize * Comments and more tests * Comments and more tests * Fix rat check
* Java: Add blocking direct binary encoder * Optimize * Comments and more tests * Comments and more tests * Fix rat check
* Java: Add blocking direct binary encoder * Optimize * Comments and more tests * Comments and more tests * Fix rat check
* AVRO-3983: Allow setting a custom encoder in DataFileWriter (#2874) * AVRO-3871: Add blocking direct binary encoder (#2521) * Java: Add blocking direct binary encoder * Optimize * Comments and more tests * Comments and more tests * Fix rat check * AVRO-3871: Support nested lists/maps in BlockingDirectBinaryEncoder (#2732) * Support nested lists/maps * Add some tests
What is the purpose of the change
Adds an encoder to buffer the arrays/maps and adds the bytes to allow the reader to skip the whole map/array.
Verifying this change
This change added tests and can be verified as follows:
Documentation