diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java index 22d0326165c..aacb83b88f4 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java @@ -48,7 +48,7 @@ public void writeString(Utf8 utf8) throws IOException { @Override public void writeString(String string) throws IOException { - if (0 == string.length()) { + if (string.isEmpty()) { writeZero(); return; } diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java new file mode 100644 index 00000000000..aaea657c3e6 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/io/BlockingDirectBinaryEncoder.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.avro.io; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * An {@link Encoder} for Avro's binary encoding that does not buffer output. + *
+ * This encoder does not buffer writes, and as a result is slower than + * {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when + * the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is + * very short-lived. + * + * To construct, use + * {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)} + * + * DirectBinaryEncoder is not thread-safe + * + * @see BinaryEncoder + * @see EncoderFactory + * @see Encoder + * @see Decoder + */ +public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { + private OutputStream originalStream; + + private final ByteArrayOutputStream buffer; + + private boolean inBlock = false; + + private long blockItemCount; + + /** + * Create a writer that sends its output to the underlying stream + *out
.
+ *
+ * @param out The Outputstream to write to
+ */
+ public BlockingDirectBinaryEncoder(OutputStream out) {
+ super(out);
+ buffer = new ByteArrayOutputStream();
+ }
+
+ private void startBlock() {
+ if (inBlock) {
+ throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder");
+ }
+ originalStream = out;
+ out = buffer;
+ inBlock = true;
+ }
+
+ private void endBlock() {
+ if (!inBlock) {
+ throw new RuntimeException("Called endBlock, while not buffering a block");
+ }
+ out = originalStream;
+ if (blockItemCount > 0) {
+ try {
+ // Make it negative, so the reader knows that the number of bytes is coming
+ writeLong(-blockItemCount);
+ writeLong(buffer.size());
+ writeBytes(buffer.toByteArray());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ inBlock = false;
+ buffer.reset();
+ }
+
+ @Override
+ public void setItemCount(long itemCount) throws IOException {
+ blockItemCount = itemCount;
+ }
+
+ @Override
+ public void writeArrayStart() throws IOException {
+ startBlock();
+ }
+
+ @Override
+ public void writeArrayEnd() throws IOException {
+ endBlock();
+ // Writes another zero to indicate that this is the last block
+ super.writeArrayEnd();
+ }
+
+ @Override
+ public void writeMapStart() throws IOException {
+ startBlock();
+ }
+
+ @Override
+ public void writeMapEnd() throws IOException {
+ endBlock();
+ // Writes another zero to indicate that this is the last block
+ super.writeMapEnd();
+ }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
index 62b2a482627..8d8172bc2f5 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryEncoder.java
@@ -27,20 +27,20 @@
* This encoder does not buffer writes, and as a result is slower than
* {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when
* the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is
- * very short lived.
+ * very short-lived.
*
* To construct, use
* {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)}
*
* DirectBinaryEncoder is not thread-safe
- *
+ *
* @see BinaryEncoder
* @see EncoderFactory
* @see Encoder
* @see Decoder
*/
public class DirectBinaryEncoder extends BinaryEncoder {
- private OutputStream out;
+ protected OutputStream out;
// the buffer is used for writing floats, doubles, and large longs.
private final byte[] buf = new byte[12];
@@ -48,7 +48,7 @@ public class DirectBinaryEncoder extends BinaryEncoder {
* Create a writer that sends its output to the underlying stream
* out
.
**/
- DirectBinaryEncoder(OutputStream out) {
+ public DirectBinaryEncoder(OutputStream out) {
configure(out);
}
@@ -69,8 +69,8 @@ public void writeBoolean(boolean b) throws IOException {
}
/*
- * buffering is slower for ints that encode to just 1 or two bytes, and and
- * faster for large ones. (Sun JRE 1.6u22, x64 -server)
+ * buffering is slower for ints that encode to just 1 or two bytes, and faster
+ * for large ones. (Sun JRE 1.6u22, x64 -server)
*/
@Override
public void writeInt(int n) throws IOException {
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
index 055ef9541d9..2039f30097a 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/EncoderFactory.java
@@ -217,6 +217,49 @@ public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse)
}
}
+ /**
+ * Creates or reinitializes a {@link BlockingDirectBinaryEncoder} with the
+ * OutputStream provided as the destination for written data. If reuse is
+ * provided, an attempt will be made to reconfigure reuse rather than
+ * construct a new instance, but this is not guaranteed, a new instance may be
+ * returned.
+ *
+ * The {@link BinaryEncoder} implementation returned does not buffer its output,
+ * calling {@link Encoder#flush()} will simply cause the wrapped OutputStream to
+ * be flushed.
+ *
+ * The {@link BlockingDirectBinaryEncoder} will write the block sizes for the
+ * arrays and maps so efficient skipping can be done.
+ *
+ * Performance of unbuffered writes can be significantly slower than buffered
+ * writes. {@link #binaryEncoder(OutputStream, BinaryEncoder)} returns
+ * BinaryEncoder instances that are tuned for performance but may buffer output.
+ * The unbuffered, 'direct' encoder may be desired when buffering semantics are
+ * problematic, or if the lifetime of the encoder is so short that the buffer
+ * would not be useful.
+ *
+ * {@link BinaryEncoder} instances returned by this method are not thread-safe.
+ *
+ * @param out The OutputStream to initialize to. Cannot be null.
+ * @param reuse The BinaryEncoder to attempt to reuse given the factory
+ * configuration. A BinaryEncoder implementation may not be
+ * compatible with reuse, causing a new instance to be returned. If
+ * null, a new instance is returned.
+ * @return A BinaryEncoder that uses out as its data output. If
+ * reuse is null, this will be a new instance. If reuse is
+ * not null, then the returned instance may be a new instance or
+ * reuse reconfigured to use out.
+ * @see DirectBinaryEncoder
+ * @see Encoder
+ */
+ public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder reuse) {
+ if (null == reuse || !reuse.getClass().equals(BlockingDirectBinaryEncoder.class)) {
+ return new BlockingDirectBinaryEncoder(out);
+ } else {
+ return ((DirectBinaryEncoder) reuse).configure(out);
+ }
+ }
+
/**
* Creates or reinitializes a {@link BinaryEncoder} with the OutputStream
* provided as the destination for written data. If reuse is provided, an
diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
index 1d9009aacb1..939cee63c9e 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryEncoderFidelity.java
@@ -181,6 +181,50 @@ void directBinaryEncoder() throws IOException {
assertArrayEquals(complexdata, result2);
}
+ @Test
+ void blockingDirectBinaryEncoder() throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryEncoder e = factory.blockingDirectBinaryEncoder(baos, null);
+ generateData(e, true);
+
+ byte[] result = baos.toByteArray();
+ assertEquals(legacydata.length, result.length);
+ assertArrayEquals(legacydata, result);
+ baos.reset();
+
+ generateComplexData(e);
+ byte[] result2 = baos.toByteArray();
+ // blocking will cause different length, should be four bytes larger
+ assertEquals(complexdata.length + 4, result2.length);
+ // the first byte is the array start, with the count of items negative
+ assertEquals(complexdata[0] >>> 1, result2[0]);
+ baos.reset();
+
+ e.writeArrayStart();
+ e.setItemCount(1);
+ e.startItem();
+ e.writeInt(1);
+ e.writeArrayEnd();
+
+ // 1: 1 element in the array
+ // 2: 2 bytes for the int
+ // 2-3: data
+ // 4: 0 elements in the next block
+ assertArrayEquals(baos.toByteArray(), new byte[] { 1, 2, 2, 2, 0 });
+ baos.reset();
+
+ e.writeArrayStart();
+ e.setItemCount(0);
+ e.writeArrayEnd();
+
+ // This is correct
+ // 0: 0 elements in the block
+ assertArrayEquals(baos.toByteArray(), new byte[] { 0 });
+ baos.reset();
+
+ baos.reset();
+ }
+
@Test
void blockingBinaryEncoder() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
diff --git a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
index dbed64d6a18..14930ebce7b 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestEncoders.java
@@ -54,7 +54,7 @@ public class TestEncoders {
private static final int ENCODER_BUFFER_SIZE = 32;
private static final int EXAMPLE_DATA_SIZE = 17;
- private static EncoderFactory factory = EncoderFactory.get();
+ private static final EncoderFactory FACTORY = EncoderFactory.get();
@TempDir
public File DIR;
@@ -62,14 +62,14 @@ public class TestEncoders {
@Test
void binaryEncoderInit() throws IOException {
OutputStream out = new ByteArrayOutputStream();
- BinaryEncoder enc = factory.binaryEncoder(out, null);
- assertSame(enc, factory.binaryEncoder(out, enc));
+ BinaryEncoder enc = FACTORY.binaryEncoder(out, null);
+ assertSame(enc, FACTORY.binaryEncoder(out, enc));
}
@Test
void badBinaryEncoderInit() {
assertThrows(NullPointerException.class, () -> {
- factory.binaryEncoder(null, null);
+ FACTORY.binaryEncoder(null, null);
});
}
@@ -77,29 +77,43 @@ void badBinaryEncoderInit() {
void blockingBinaryEncoderInit() throws IOException {
OutputStream out = new ByteArrayOutputStream();
BinaryEncoder reuse = null;
- reuse = factory.blockingBinaryEncoder(out, reuse);
- assertSame(reuse, factory.blockingBinaryEncoder(out, reuse));
+ reuse = FACTORY.blockingBinaryEncoder(out, reuse);
+ assertSame(reuse, FACTORY.blockingBinaryEncoder(out, reuse));
// comparison
}
@Test
void badBlockintBinaryEncoderInit() {
assertThrows(NullPointerException.class, () -> {
- factory.binaryEncoder(null, null);
+ FACTORY.binaryEncoder(null, null);
});
}
@Test
void directBinaryEncoderInit() throws IOException {
OutputStream out = new ByteArrayOutputStream();
- BinaryEncoder enc = factory.directBinaryEncoder(out, null);
- assertSame(enc, factory.directBinaryEncoder(out, enc));
+ BinaryEncoder enc = FACTORY.directBinaryEncoder(out, null);
+ assertSame(enc, FACTORY.directBinaryEncoder(out, enc));
}
@Test
void badDirectBinaryEncoderInit() {
assertThrows(NullPointerException.class, () -> {
- factory.directBinaryEncoder(null, null);
+ FACTORY.directBinaryEncoder(null, null);
+ });
+ }
+
+ @Test
+ void blockingDirectBinaryEncoderInit() throws IOException {
+ OutputStream out = new ByteArrayOutputStream();
+ BinaryEncoder enc = FACTORY.blockingDirectBinaryEncoder(out, null);
+ assertSame(enc, FACTORY.blockingDirectBinaryEncoder(out, enc));
+ }
+
+ @Test
+ void badBlockingDirectBinaryEncoderInit() {
+ assertThrows(NullPointerException.class, () -> {
+ FACTORY.blockingDirectBinaryEncoder(null, null);
});
}
@@ -107,22 +121,22 @@ void badDirectBinaryEncoderInit() {
void jsonEncoderInit() throws IOException {
Schema s = new Schema.Parser().parse("\"int\"");
OutputStream out = new ByteArrayOutputStream();
- factory.jsonEncoder(s, out);
- JsonEncoder enc = factory.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8));
+ FACTORY.jsonEncoder(s, out);
+ JsonEncoder enc = FACTORY.jsonEncoder(s, new JsonFactory().createGenerator(out, JsonEncoding.UTF8));
enc.configure(out);
}
@Test
void badJsonEncoderInitOS() throws IOException {
assertThrows(NullPointerException.class, () -> {
- factory.jsonEncoder(Schema.create(Type.INT), (OutputStream) null);
+ FACTORY.jsonEncoder(Schema.create(Type.INT), (OutputStream) null);
});
}
@Test
void badJsonEncoderInit() throws IOException {
assertThrows(NullPointerException.class, () -> {
- factory.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null);
+ FACTORY.jsonEncoder(Schema.create(Type.INT), (JsonGenerator) null);
});
}
@@ -130,7 +144,7 @@ void badJsonEncoderInit() throws IOException {
void jsonEncoderNewlineDelimited() throws IOException {
OutputStream out = new ByteArrayOutputStream();
Schema ints = Schema.create(Type.INT);
- Encoder e = factory.jsonEncoder(ints, out);
+ Encoder e = FACTORY.jsonEncoder(ints, out);
String separator = System.getProperty("line.separator");
GenericDatumWriter