Skip to content

Commit

Permalink
Core: Avro writers use BlockingBinaryEncoder to enable array/map sizes
Browse files Browse the repository at this point in the history
When writing Avro files often Iceberg is writing arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() of org.apache.avro.io.EncoderFactory do not write the length of the arrays or maps to Avro since the encoder does not buffer the output to calculate a length.

Knowing the length of an array or map is useful to clients decoding the Avro file since they can skip decoding the entire array or map if it is not needed when reading the file.  This PR changes all Avro writers to use blockingBinaryEncoder(), this encoder does not "block" in the concurrency sense but it does buffer the output of objects such that the lengths of arrays and maps will be calculated.

See:

https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder)

For details between the different Avro encoders.
  • Loading branch information
rustyconover committed Sep 24, 2023
1 parent 4f22dd8 commit d4fa26d
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static <T> byte[] encode(T datum, Schema avroSchema) throws IOException {
dataOut.writeUTF(avroSchema.toString());

// Encode the datum with avro schema.
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
BinaryEncoder encoder =
new EncoderFactory().configureBlockSize(1024 * 1024).blockingBinaryEncoder(out, null);
DatumWriter<T> writer = new GenericAvroWriter<>(avroSchema);
writer.write(datum, encoder);
encoder.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ public ByteBuffer encode(D datum) throws IOException {

@Override
public void encode(D datum, OutputStream stream) throws IOException {
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get());
BinaryEncoder encoder =
new EncoderFactory()
.configureBlockSize(1024 * 1024)
.blockingBinaryEncoder(stream, ENCODER.get());
ENCODER.set(encoder);
writer.write(datum, encoder);
encoder.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public ByteBuffer encode(KeyMetadata datum) throws IOException {

@Override
public void encode(KeyMetadata datum, OutputStream stream) throws IOException {
BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get());
BinaryEncoder encoder =
new EncoderFactory()
.configureBlockSize(1024 * 1024)
.blockingBinaryEncoder(stream, ENCODER.get());
ENCODER.set(encoder);
writer.write(datum, encoder);
encoder.flush();
Expand Down

0 comments on commit d4fa26d

Please sign in to comment.