From cc9b6687029f3a97095d7e3edff943f89fedd5da Mon Sep 17 00:00:00 2001
From: Fokko Driesprong
Date: Sat, 21 Oct 2023 22:12:24 +0200
Subject: [PATCH] AVRO-3871: Add blocking direct binary encoder (#2521)
* Java: Add blocking direct binary encoder
* Optimize
* Comments and more tests
* Comments and more tests
* Fix rat check
---
lang/java/avro/pom.xml | 5 +
.../avro/io/BlockingDirectBinaryEncoder.java | 135 +++++
.../apache/avro/io/DirectBinaryEncoder.java | 12 +-
.../org/apache/avro/io/EncoderFactory.java | 43 ++
.../avro/io/TestBinaryEncoderFidelity.java | 42 ++
.../io/TestBlockingDirectBinaryEncoder.java | 99 ++++
.../java/org/apache/avro/io/TestEncoders.java | 54 +-
.../specific/TestRecordWithLogicalTypes.java | 18 +-
.../specific/TestRecordWithMapsAndArrays.java | 535 ++++++++++++++++++
.../TestRecordWithMapsAndArrays.avsc | 23 +
10 files changed, 937 insertions(+), 29 deletions(-)
create mode 100644 lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java
create mode 100644 lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java
create mode 100644 lang/java/avro/src/test/java/org/apache/avro/specific/TestRecordWithMapsAndArrays.java
create mode 100644 lang/java/avro/src/test/resources/TestRecordWithMapsAndArrays.avsc
diff --git a/lang/java/avro/pom.xml b/lang/java/avro/pom.xml
index b96673d1851..672bbbd105d 100644
--- a/lang/java/avro/pom.xml
+++ b/lang/java/avro/pom.xml
@@ -250,5 +250,10 @@
hamcrest-librarytest
+
+ org.mockito
+ mockito-core
+ test
+
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java
new file mode 100644
index 00000000000..b029034d0fb
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * An {@link Encoder} for Avro's binary encoding that does not buffer output.
+ *
+ * This encoder does not buffer writes in contrast to
+ * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when:
+ * The buffering in BufferedBinaryEncoder is not desired because you buffer a
+ * different level or the Encoder is very short-lived.
+ *
+ * The BlockingDirectBinaryEncoder will encode the number of bytes of the Map
+ * and Array blocks. This will allow to postpone the decoding, or skip over it
+ * at all.
+ *
+ * To construct, use
+ * {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)}
+ *
+ * {@link BlockingDirectBinaryEncoder} instances returned by this method are not
+ * thread-safe
+ *
+ * @see BinaryEncoder
+ * @see EncoderFactory
+ * @see Encoder
+ * @see Decoder
+ */
+public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
+ private final BufferOutputStream buffer;
+
+ private OutputStream originalStream;
+
+ private boolean inBlock = false;
+
+ private long blockItemCount;
+
+ /**
+ * Create a writer that sends its output to the underlying stream
+ * out.
+ *
+ * @param out The Outputstream to write to
+ */
+ public BlockingDirectBinaryEncoder(OutputStream out) {
+ super(out);
+ this.buffer = new BufferOutputStream();
+ }
+
+ private void startBlock() {
+ if (inBlock) {
+ throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder");
+ }
+ originalStream = out;
+ buffer.reset();
+ out = buffer;
+ inBlock = true;
+ }
+
+ private void endBlock() {
+ if (!inBlock) {
+ throw new RuntimeException("Called endBlock, while not buffering a block");
+ }
+ out = originalStream;
+ if (blockItemCount > 0) {
+ try {
+ // Make it negative, so the reader knows that the number of bytes is coming
+ writeLong(-blockItemCount);
+ writeLong(buffer.size());
+ writeFixed(buffer.toBufferWithoutCopy());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ inBlock = false;
+ buffer.reset();
+ }
+
+ @Override
+ public void setItemCount(long itemCount) throws IOException {
+ blockItemCount = itemCount;
+ }
+
+ @Override
+ public void writeArrayStart() throws IOException {
+ startBlock();
+ }
+
+ @Override
+ public void writeArrayEnd() throws IOException {
+ endBlock();
+ // Writes another zero to indicate that this is the last block
+ super.writeArrayEnd();
+ }
+
+ @Override
+ public void writeMapStart() throws IOException {
+ startBlock();
+ }
+
+ @Override
+ public void writeMapEnd() throws IOException {
+ endBlock();
+ // Writes another zero to indicate that this is the last block
+ super.writeMapEnd();
+ }
+
+ private static class BufferOutputStream extends ByteArrayOutputStream {
+ BufferOutputStream() {
+ }
+
+ ByteBuffer toBufferWithoutCopy() {
+ return ByteBuffer.wrap(buf, 0, count);
+ }
+
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
index 62b2a482627..8d8172bc2f5 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
@@ -27,20 +27,20 @@
* This encoder does not buffer writes, and as a result is slower than
* {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when
* the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is
- * very short lived.
+ * very short-lived.
*
* To construct, use
* {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)}
*
* DirectBinaryEncoder is not thread-safe
- *
+ *
* @see BinaryEncoder
* @see EncoderFactory
* @see Encoder
* @see Decoder
*/
public class DirectBinaryEncoder extends BinaryEncoder {
- private OutputStream out;
+ protected OutputStream out;
// the buffer is used for writing floats, doubles, and large longs.
private final byte[] buf = new byte[12];
@@ -48,7 +48,7 @@ public class DirectBinaryEncoder extends BinaryEncoder {
* Create a writer that sends its output to the underlying stream
* out.
**/
- DirectBinaryEncoder(OutputStream out) {
+ public DirectBinaryEncoder(OutputStream out) {
configure(out);
}
@@ -69,8 +69,8 @@ public void writeBoolean(boolean b) throws IOException {
}
/*
- * buffering is slower for ints that encode to just 1 or two bytes, and and
- * faster for large ones. (Sun JRE 1.6u22, x64 -server)
+ * buffering is slower for ints that encode to just 1 or two bytes, and faster
+ * for large ones. (Sun JRE 1.6u22, x64 -server)
*/
@Override
public void writeInt(int n) throws IOException {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
index 055ef9541d9..2039f30097a 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
@@ -217,6 +217,49 @@ public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse)
}
}
+ /**
+ * Creates or reinitializes a {@link BlockingDirectBinaryEncoder} with the
+ * OutputStream provided as the destination for written data. If reuse is
+ * provided, an attempt will be made to reconfigure reuse rather than
+ * construct a new instance, but this is not guaranteed, a new instance may be
+ * returned.
+ *
+ * The {@link BinaryEncoder} implementation returned does not buffer its output,
+ * calling {@link Encoder#flush()} will simply cause the wrapped OutputStream to
+ * be flushed.
+ *
+ * The {@link BlockingDirectBinaryEncoder} will write the block sizes for the
+ * arrays and maps so efficient skipping can be done.
+ *
+ * Performance of unbuffered writes can be significantly slower than buffered
+ * writes. {@link #binaryEncoder(OutputStream, BinaryEncoder)} returns
+ * BinaryEncoder instances that are tuned for performance but may buffer output.
+ * The unbuffered, 'direct' encoder may be desired when buffering semantics are
+ * problematic, or if the lifetime of the encoder is so short that the buffer
+ * would not be useful.
+ *
+ * {@link BinaryEncoder} instances returned by this method are not thread-safe.
+ *
+ * @param out The OutputStream to initialize to. Cannot be null.
+ * @param reuse The BinaryEncoder to attempt to reuse given the factory
+ * configuration. A BinaryEncoder implementation may not be
+ * compatible with reuse, causing a new instance to be returned. If
+ * null, a new instance is returned.
+ * @return A BinaryEncoder that uses out as its data output. If
+ * reuse is null, this will be a new instance. If reuse is
+ * not null, then the returned instance may be a new instance or
+ * reuse reconfigured to use out.
+ * @see DirectBinaryEncoder
+ * @see Encoder
+ */
+ public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder reuse) {
+ if (null == reuse || !reuse.getClass().equals(BlockingDirectBinaryEncoder.class)) {
+ return new BlockingDirectBinaryEncoder(out);
+ } else {
+ return ((DirectBinaryEncoder) reuse).configure(out);
+ }
+ }
+
/**
* Creates or reinitializes a {@link BinaryEncoder} with the OutputStream
* provided as the destination for written data. If reuse is provided, an
diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
index 1d9009aacb1..1f699ea8266 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
@@ -181,6 +181,48 @@ void directBinaryEncoder() throws IOException {
assertArrayEquals(complexdata, result2);
}
+ @Test
+ void blockingDirectBinaryEncoder() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryEncoder e = factory.blockingDirectBinaryEncoder(baos, null);
+ generateData(e, true);
+
+ byte[] result = baos.toByteArray();
+ assertEquals(legacydata.length, result.length);
+ assertArrayEquals(legacydata, result);
+ baos.reset();
+
+ generateComplexData(e);
+ byte[] result2 = baos.toByteArray();
+ // blocking will cause different length, should be two bytes larger
+ assertEquals(complexdata.length + 2, result2.length);
+ // the first byte is the array start, with the count of items negative
+ assertEquals(complexdata[0] >>> 1, result2[0]);
+ baos.reset();
+
+ e.writeArrayStart();
+ e.setItemCount(1);
+ e.startItem();
+ e.writeInt(1);
+ e.writeArrayEnd();
+
+ // 1: 1 element in the array
+ // 2: 1 byte for the int
+ // 3: zigzag encoded int
+ // 4: 0 elements in the next block
+ assertArrayEquals(baos.toByteArray(), new byte[] { 1, 2, 2, 0 });
+ baos.reset();
+
+ e.writeArrayStart();
+ e.setItemCount(0);
+ e.writeArrayEnd();
+
+ // This is correct
+ // 0: 0 elements in the block
+ assertArrayEquals(baos.toByteArray(), new byte[] { 0 });
+ baos.reset();
+ }
+
@Test
void blockingBinaryEncoder() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java
new file mode 100644
index 00000000000..27d23916968
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBlockingDirectBinaryEncoder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.io;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaNormalization;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.message.BinaryMessageDecoder;
+import org.apache.avro.specific.TestRecordWithMapsAndArrays;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Map;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.*;
+
+public class TestBlockingDirectBinaryEncoder {
+
+ @Test
+ void blockingDirectBinaryEncoder() throws IOException, NoSuchAlgorithmException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryEncoder encoder = EncoderFactory.get().blockingDirectBinaryEncoder(baos, null);
+
+ // This is needed because there is no BlockingDirectBinaryEncoder
+ // BinaryMessageWriter
+ // available out of the box
+ encoder.writeFixed(new byte[] { (byte) 0xC3, (byte) 0x01 });
+ encoder.writeFixed(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", TestRecordWithMapsAndArrays.SCHEMA$));
+
+ int len = 5;
+
+ encoder.writeArrayStart();
+ encoder.setItemCount(len);
+ for (int i = 0; i < len; i++) {
+ encoder.startItem();
+ encoder.writeString(Integer.toString(i));
+ }
+ encoder.writeArrayEnd();
+
+ encoder.writeMapStart();
+ encoder.setItemCount(len);
+ for (long i = 0; i < len; i++) {
+ encoder.startItem();
+ encoder.writeString(Long.toString(i));
+ encoder.writeLong(i);
+ }
+ encoder.writeMapEnd();
+ encoder.flush();
+
+ BinaryMessageDecoder decoder = TestRecordWithMapsAndArrays.getDecoder();
+ TestRecordWithMapsAndArrays r = decoder.decode(baos.toByteArray());
+
+ assertThat(r.getArr(), is(Arrays.asList("0", "1", "2", "3", "4")));
+ Map map = r.getMap();
+ assertThat(map.size(), is(5));
+ for (long i = 0; i < len; i++) {
+ assertThat(map.get(Long.toString(i)), is(i));
+ }
+ }
+
+ @Test
+ void testSkippingUsingBlocks() throws IOException, NoSuchAlgorithmException {
+ // Create an empty schema for read, so we skip over all the fields
+ Schema emptySchema = new Schema.Parser().parse(
+ "{\"type\":\"record\",\"name\":\"TestRecordWithMapsAndArrays\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[]}");
+
+ GenericDatumReader> in = new GenericDatumReader<>(TestRecordWithMapsAndArrays.SCHEMA$, emptySchema);
+ Decoder mockDecoder = mock(BinaryDecoder.class);
+
+ for (long i = 0; i < 1; i++) {
+ in.read(null, mockDecoder);
+ }
+
+ verify(mockDecoder, times(1)).skipMap();
+ verify(mockDecoder, times(1)).skipArray();
+ verify(mockDecoder, times(0)).readString();
+ verify(mockDecoder, times(0)).readLong();
+ }
+}
diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
index 2995bf56709..51ef375e307 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
@@ -54,7 +54,7 @@ public class TestEncoders {
private static final int ENCODER_BUFFER_SIZE = 32;
private static final int EXAMPLE_DATA_SIZE = 17;
- private static EncoderFactory factory = EncoderFactory.get();
+ private static final EncoderFactory FACTORY = EncoderFactory.get();
@TempDir
public Path dataDir;
@@ -62,14 +62,14 @@ public class TestEncoders {
@Test
void binaryEncoderInit() throws IOException {
OutputStream out = new ByteArrayOutputStream();
- BinaryEncoder enc = factory.binaryEncoder(out, null);
- assertSame(enc, factory.binaryEncoder(out, enc));
+ BinaryEncoder enc = FACTORY.binaryEncoder(out, null);
+ assertSame(enc, FACTORY.binaryEncoder(out, enc));
}
@Test
void badBinaryEncoderInit() {
assertThrows(NullPointerException.class, () -> {
- factory.binaryEncoder(null, null);
+ FACTORY.binaryEncoder(null, null);
});
}
@@ -77,29 +77,43 @@ void badBinaryEncoderInit() {
void blockingBinaryEncoderInit() throws IOException {
OutputStream out = new ByteArrayOutputStream();
BinaryEncoder reuse = null;
- reuse = factory.blockingBinaryEncoder(out, reuse);
- assertSame(reuse, factory.blockingBinaryEncoder(out, reuse));
+ reuse = FACTORY.blockingBinaryEncoder(out, reuse);
+ assertSame(reuse, FACTORY.blockingBinaryEncoder(out, reuse));
// comparison
}
@Test
void badBlockintBinaryEncoderInit() {
assertThrows(NullPointerException.class, () -> {
- factory.binaryEncoder(null, null);
+ FACTORY.binaryEncoder(null, null);
});
}
@Test
void directBinaryEncoderInit() throws IOException {
OutputStream out = new ByteArrayOutputStream();
- BinaryEncoder enc = factory.directBinaryEncoder(out, null);
- assertSame(enc, factory.directBinaryEncoder(out, enc));
+ BinaryEncoder enc = FACTORY.directBinaryEncoder(out, null);
+ assertSame(enc, FACTORY.directBinaryEncoder(out, enc));
}
@Test
void badDirectBinaryEncoderInit() {
assertThrows(NullPointerException.class, () -> {
- factory.directBinaryEncoder(null, null);
+ FACTORY.directBinaryEncoder(null, null);
+ });
+ }
+
+ @Test
+ void blockingDirectBinaryEncoderInit() throws IOException {
+ OutputStream out = new ByteArrayOutputStream();
+ BinaryEncoder enc = FACTORY.blockingDirectBinaryEncoder(out, null);
+ assertSame(enc, FACTORY.blockingDirectBinaryEncoder(out, enc));
+ }
+
+ @Test
+ void badBlockingDirectBinaryEncoderInit() {
+ assertThrows(NullPointerException.class, () -> {
+ FACTORY.blockingDirectBinaryEncoder(null, null);
});
}
@@ -107,22 +121,22 @@ void badDirectBinaryEncoderInit() {
void jsonEncoderInit() throws IOException {
Schema s = new Schema.Parser().parse("\"int\"");
OutputStream out = new ByteArrayOutputStream();
- factory.jsonEncoder(s, out);
- JsonEncoder enc = factory.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8));
+ FACTORY.jsonEncoder(s, out);
+ JsonEncoder enc = FACTORY.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8));
enc.configure(out);
}
@Test
void badJsonEncoderInitOS() throws IOException {
assertThrows(NullPointerException.class, () -> {
- factory.jsonEncoder(Schema.create(Type.INT), (OutputStream) null);
+ FACTORY.jsonEncoder(Schema.create(Type.INT), (OutputStream) null);
});
}
@Test
void badJsonEncoderInit() throws IOException {
assertThrows(NullPointerException.class, () -> {
- factory.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null);
+ FACTORY.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null);
});
}
@@ -130,7 +144,7 @@ void badJsonEncoderInit() throws IOException {
void jsonEncoderNewlineDelimited() throws IOException {
OutputStream out = new ByteArrayOutputStream();
Schema ints = Schema.create(Type.INT);
- Encoder e = factory.jsonEncoder(ints, out);
+ Encoder e = FACTORY.jsonEncoder(ints, out);
String separator = System.getProperty("line.separator");
GenericDatumWriter writer = new GenericDatumWriter<>(ints);
writer.write(1, e);
@@ -169,8 +183,8 @@ void jsonEncoderWhenIncludeNamespaceOptionIsTrue() throws IOException {
void validatingEncoderInit() throws IOException {
Schema s = new Schema.Parser().parse("\"int\"");
OutputStream out = new ByteArrayOutputStream();
- Encoder e = factory.directBinaryEncoder(out, null);
- factory.validatingEncoder(s, e).configure(e);
+ Encoder e = FACTORY.directBinaryEncoder(out, null);
+ FACTORY.validatingEncoder(s, e).configure(e);
}
@Test
@@ -324,7 +338,7 @@ private String fromAvroToJson(byte[] avroBytes, Schema schema, boolean includeNa
DatumWriter