From 3523c139fd886066bb47f33b20ef1cc8914dec65 Mon Sep 17 00:00:00 2001 From: Rusty Conover Date: Sat, 23 Sep 2023 15:48:28 -0400 Subject: [PATCH] Core: Avro writers use BlockingBinaryEncoder to enable array/map sizes When writing Avro files often Iceberg is writing arrays and maps. The current use of binaryEncoder() and directBinaryEncoder() of org.apache.avro.io.EncoderFactory do not write the length of the arrays or maps to Avro since the encoder does not buffer the output to calculate a length. Knowing the length of an array or map is useful to clients decoding the Avro file since they can skip decoding the entire array or map if it is not needed when reading the file. This PR changes all Avro writers to use blockingBinaryEncoder(), this encoder does not "block" in the concurrency sense but it does buffer the output of objects such that the lengths of arrays and maps will be calculated. See: https://avro.apache.org/docs/1.5.1/api/java/org/apache/avro/io/EncoderFactory.html#blockingBinaryEncoder(java.io.OutputStream,%20org.apache.avro.io.BinaryEncoder) For details between the different Avro encoders. --- .../src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java | 3 ++- .../main/java/org/apache/iceberg/data/avro/IcebergEncoder.java | 3 ++- .../java/org/apache/iceberg/encryption/KeyMetadataEncoder.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java index ba3c6fece7f9..bb46ae72ea71 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroEncoderUtil.java @@ -54,7 +54,8 @@ public static byte[] encode(T datum, Schema avroSchema) throws IOException { dataOut.writeUTF(avroSchema.toString()); // Encode the datum with avro schema. - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + BinaryEncoder encoder = + EncoderFactory.configureBlockSize(1024 * 1024).blockingBinaryEncoder(out, null); DatumWriter writer = new GenericAvroWriter<>(avroSchema); writer.write(datum, encoder); encoder.flush(); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java b/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java index a70e79aac2c5..8f59bd193632 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/IcebergEncoder.java @@ -98,7 +98,8 @@ public ByteBuffer encode(D datum) throws IOException { @Override public void encode(D datum, OutputStream stream) throws IOException { - BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get()); + BinaryEncoder encoder = + EncoderFactory.configureBlockSize(1024 * 1024).blockingBinaryEncoder(stream, ENCODER.get()); ENCODER.set(encoder); writer.write(datum, encoder); encoder.flush(); diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java index faab6a47c814..48135b496510 100644 --- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java +++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataEncoder.java @@ -90,7 +90,8 @@ public ByteBuffer encode(KeyMetadata datum) throws IOException { @Override public void encode(KeyMetadata datum, OutputStream stream) throws IOException { - BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(stream, ENCODER.get()); + BinaryEncoder encoder = + EncoderFactory.configureBlockSize(1024 * 1024).blockingBinaryEncoder(stream, ENCODER.get()); ENCODER.set(encoder); writer.write(datum, encoder); encoder.flush();