diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java index aaea657c3e6..fa6e9ddf289 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java @@ -20,6 +20,7 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; /** * An {@link Encoder} for Avro's binary encoding that does not buffer output. @@ -30,9 +31,9 @@ * very short-lived. *

* To construct, use - * {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)} + * {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)} *

- * DirectBinaryEncoder is not thread-safe + * BlockingDirectBinaryEncoder is not thread-safe * * @see BinaryEncoder * @see EncoderFactory @@ -40,9 +41,9 @@ * @see Decoder */ public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { - private OutputStream originalStream; + private static final ThreadLocal BUFFER = ThreadLocal.withInitial(BufferOutputStream::new); - private final ByteArrayOutputStream buffer; + private OutputStream originalStream; private boolean inBlock = false; @@ -56,7 +57,6 @@ public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { */ public BlockingDirectBinaryEncoder(OutputStream out) { super(out); - buffer = new ByteArrayOutputStream(); } private void startBlock() { @@ -64,7 +64,9 @@ private void startBlock() { throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder"); } originalStream = out; - out = buffer; + BufferOutputStream buf = BUFFER.get(); + buf.reset(); + out = buf; inBlock = true; } @@ -72,19 +74,20 @@ private void endBlock() { if (!inBlock) { throw new RuntimeException("Called endBlock, while not buffering a block"); } + BufferOutputStream buf = (BufferOutputStream) out; out = originalStream; if (blockItemCount > 0) { try { // Make it negative, so the reader knows that the number of bytes is coming writeLong(-blockItemCount); - writeLong(buffer.size()); - writeBytes(buffer.toByteArray()); + writeLong(buf.size()); + writeFixed(buf.toBufferWithoutCopy()); } catch (IOException e) { throw new RuntimeException(e); } } inBlock = false; - buffer.reset(); + buf.reset(); } @Override @@ -115,4 +118,14 @@ public void writeMapEnd() throws IOException { // Writes another zero to indicate that this is the last block super.writeMapEnd(); } + + private static class BufferOutputStream extends ByteArrayOutputStream { + BufferOutputStream() { + } + + ByteBuffer toBufferWithoutCopy() { + return ByteBuffer.wrap(buf, 0, count); + } + + } }