Skip to content

Commit

Permalink
Optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko committed Sep 25, 2023
1 parent 5d58080 commit d2cd82a
Showing 1 changed file with 22 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* An {@link Encoder} for Avro's binary encoding that does not buffer output.
Expand All @@ -30,19 +31,19 @@
* very short-lived.
* <p/>
* To construct, use
* {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)}
* {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)}
* <p/>
* DirectBinaryEncoder is not thread-safe
* BlockingDirectBinaryEncoder is not thread-safe
*
* @see BinaryEncoder
* @see EncoderFactory
* @see Encoder
* @see Decoder
*/
public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
private OutputStream originalStream;
private static final ThreadLocal<BufferOutputStream> BUFFER = ThreadLocal.withInitial(BufferOutputStream::new);

private final ByteArrayOutputStream buffer;
private OutputStream originalStream;

private boolean inBlock = false;

Expand All @@ -56,35 +57,37 @@ public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
*/
public BlockingDirectBinaryEncoder(OutputStream out) {
super(out);
buffer = new ByteArrayOutputStream();
}

private void startBlock() {
if (inBlock) {
throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder");
}
originalStream = out;
out = buffer;
BufferOutputStream buf = BUFFER.get();
buf.reset();
out = buf;
inBlock = true;
}

private void endBlock() {
if (!inBlock) {
throw new RuntimeException("Called endBlock, while not buffering a block");
}
BufferOutputStream buf = (BufferOutputStream) out;
out = originalStream;
if (blockItemCount > 0) {
try {
// Make it negative, so the reader knows that the number of bytes is coming
writeLong(-blockItemCount);
writeLong(buffer.size());
writeBytes(buffer.toByteArray());
writeLong(buf.size());
writeFixed(buf.toBufferWithoutCopy());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
inBlock = false;
buffer.reset();
buf.reset();
}

@Override
Expand Down Expand Up @@ -115,4 +118,14 @@ public void writeMapEnd() throws IOException {
// Writes another zero to indicate that this is the last block
super.writeMapEnd();
}

private static class BufferOutputStream extends ByteArrayOutputStream {
BufferOutputStream() {
}

ByteBuffer toBufferWithoutCopy() {
return ByteBuffer.wrap(buf, 0, count);
}

}
}

0 comments on commit d2cd82a

Please sign in to comment.