AVRO-3871: Add blocking direct binary encoder #2521

Merged · 6 commits · Oct 21, 2023
5 changes: 5 additions & 0 deletions lang/java/avro/pom.xml
@@ -250,5 +250,10 @@
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.io;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

/**
* An {@link Encoder} for Avro's binary encoding that does not buffer output.
* <p/>
* In contrast to {@link BufferedBinaryEncoder}, this encoder does not buffer
* its writes. It is lighter-weight and useful when the buffering of
* BufferedBinaryEncoder is not desired, for example because buffering already
* happens at a different level, or when the Encoder is very short-lived.
* <p/>
* The BlockingDirectBinaryEncoder encodes the number of bytes of each Map and
* Array block. This allows decoding of a block to be postponed, or the block
* to be skipped entirely.
* <p/>
* To construct, use
* {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)}
* <p/>
* {@link BlockingDirectBinaryEncoder} instances returned by this method are not
* thread-safe.
*
* @see BinaryEncoder
* @see EncoderFactory
* @see Encoder
* @see Decoder
*/
public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder {
private final BufferOutputStream buffer;

private OutputStream originalStream;

private boolean inBlock = false;

private long blockItemCount;

/**
* Create a writer that sends its output to the underlying stream
* <code>out</code>.
*
* @param out The OutputStream to write to
*/
public BlockingDirectBinaryEncoder(OutputStream out) {
super(out);
this.buffer = new BufferOutputStream();
}

private void startBlock() {
if (inBlock) {
throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder");
Contributor: Just put BUFFER as a stack of OutputStreams, and it would become possible; but not mandatory :).

Contributor Author: Yes, I was thinking of that as well, but this would potentially allocate quite some buffers. For Apache Iceberg we don't use any nested structures, so I went with the simplest approach, but if you think this should be in, I'm happy to add it.

Contributor: Ok, let's go step by step; this is good for a first version, moreover if it aims to be used for specific software like Iceberg :)

Contributor: What I do like about this is the exception thrown: it's in the right place to start looking if you want to lift this limitation.

}
originalStream = out;
buffer.reset();
out = buffer;
inBlock = true;
}

private void endBlock() {
if (!inBlock) {
throw new RuntimeException("Called endBlock, while not buffering a block");
}
out = originalStream;
if (blockItemCount > 0) {
try {
// Make it negative, so the reader knows that the number of bytes is coming
writeLong(-blockItemCount);
writeLong(buffer.size());
writeFixed(buffer.toBufferWithoutCopy());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
inBlock = false;
buffer.reset();
}

@Override
public void setItemCount(long itemCount) throws IOException {
blockItemCount = itemCount;
}

@Override
public void writeArrayStart() throws IOException {
startBlock();
}

@Override
public void writeArrayEnd() throws IOException {
endBlock();
// Writes another zero to indicate that this is the last block
super.writeArrayEnd();
}

@Override
public void writeMapStart() throws IOException {
startBlock();
}

@Override
public void writeMapEnd() throws IOException {
endBlock();
// Writes another zero to indicate that this is the last block
super.writeMapEnd();
}

private static class BufferOutputStream extends ByteArrayOutputStream {
BufferOutputStream() {
}

ByteBuffer toBufferWithoutCopy() {
return ByteBuffer.wrap(buf, 0, count);
}

}
}
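
For orientation, here is a minimal usage sketch (editorial, not part of the PR) of the encoder above, writing a two-item int array as a single block; the byte values follow from the zig-zag encoding used by writeLong:

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().blockingDirectBinaryEncoder(out, null);

    encoder.writeArrayStart();   // start buffering the block
    encoder.setItemCount(2);     // the item count must be set before the items are written
    encoder.startItem();
    encoder.writeInt(1);
    encoder.startItem();
    encoder.writeInt(2);
    encoder.writeArrayEnd();     // writes -2 (item count), 2 (block byte length), the buffered items, then 0
    encoder.flush();

    // out.toByteArray() is { 3, 4, 2, 4, 0 }:
    // zig-zag(-2) = 3, zig-zag(2) = 4, the two items (2 and 4), end-of-array marker 0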
@@ -27,28 +27,28 @@
* This encoder does not buffer writes, and as a result is slower than
* {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when
* the buffering in BufferedBinaryEncoder is not desired and/or the Encoder is
- * very short lived.
+ * very short-lived.
* <p/>
* To construct, use
* {@link EncoderFactory#directBinaryEncoder(OutputStream, BinaryEncoder)}
* <p/>
* DirectBinaryEncoder is not thread-safe
*
*
* @see BinaryEncoder
* @see EncoderFactory
* @see Encoder
* @see Decoder
*/
public class DirectBinaryEncoder extends BinaryEncoder {
- private OutputStream out;
+ protected OutputStream out;
// the buffer is used for writing floats, doubles, and large longs.
private final byte[] buf = new byte[12];

/**
* Create a writer that sends its output to the underlying stream
* <code>out</code>.
**/
- DirectBinaryEncoder(OutputStream out) {
+ public DirectBinaryEncoder(OutputStream out) {
configure(out);
}

@@ -69,8 +69,8 @@ public void writeBoolean(boolean b) throws IOException {
}

/*
- * buffering is slower for ints that encode to just 1 or two bytes, and and
- * faster for large ones. (Sun JRE 1.6u22, x64 -server)
+ * buffering is slower for ints that encode to just 1 or two bytes, and faster
+ * for large ones. (Sun JRE 1.6u22, x64 -server)
*/
@Override
public void writeInt(int n) throws IOException {
@@ -217,6 +217,49 @@ public BinaryEncoder directBinaryEncoder(OutputStream out, BinaryEncoder reuse)
}
}

/**
* Creates or reinitializes a {@link BlockingDirectBinaryEncoder} with the
* OutputStream provided as the destination for written data. If <i>reuse</i> is
* provided, an attempt will be made to reconfigure <i>reuse</i> rather than
* construct a new instance, but this is not guaranteed; a new instance may be
* returned.
* <p/>
* The {@link BinaryEncoder} implementation returned does not buffer its output;
* calling {@link Encoder#flush()} will simply cause the wrapped OutputStream to
* be flushed.
* <p/>
* The {@link BlockingDirectBinaryEncoder} will write the block sizes for the
* arrays and maps so that efficient skipping can be done.
* <p/>
* Performance of unbuffered writes can be significantly slower than buffered
* writes. {@link #binaryEncoder(OutputStream, BinaryEncoder)} returns
* BinaryEncoder instances that are tuned for performance but may buffer output.
* The unbuffered, 'direct' encoder may be desired when buffering semantics are
* problematic, or if the lifetime of the encoder is so short that the buffer
* would not be useful.
* <p/>
* {@link BinaryEncoder} instances returned by this method are not thread-safe.
*
* @param out The OutputStream to initialize to. Cannot be null.
* @param reuse The BinaryEncoder to <i>attempt</i> to reuse given the factory
* configuration. A BinaryEncoder implementation may not be
* compatible with reuse, causing a new instance to be returned. If
* null, a new instance is returned.
* @return A BinaryEncoder that uses <i>out</i> as its data output. If
* <i>reuse</i> is null, this will be a new instance. If <i>reuse</i> is
* not null, then the returned instance may be a new instance or
* <i>reuse</i> reconfigured to use <i>out</i>.
* @see DirectBinaryEncoder
* @see Encoder
*/
public BinaryEncoder blockingDirectBinaryEncoder(OutputStream out, BinaryEncoder reuse) {
if (null == reuse || !reuse.getClass().equals(BlockingDirectBinaryEncoder.class)) {
return new BlockingDirectBinaryEncoder(out);
} else {
return ((DirectBinaryEncoder) reuse).configure(out);
}
}
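
A short editorial sketch of the reuse contract described in the Javadoc above (stream1 and stream2 are illustrative placeholders, not part of the PR):

    ByteArrayOutputStream stream1 = new ByteArrayOutputStream();
    ByteArrayOutputStream stream2 = new ByteArrayOutputStream();

    // reuse is null, so a new BlockingDirectBinaryEncoder is created
    BinaryEncoder first = EncoderFactory.get().blockingDirectBinaryEncoder(stream1, null);

    // first is a compatible instance, so it may simply be reconfigured to write to stream2
    BinaryEncoder second = EncoderFactory.get().blockingDirectBinaryEncoder(stream2, first);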

/**
* Creates or reinitializes a {@link BinaryEncoder} with the OutputStream
* provided as the destination for written data. If <i>reuse</i> is provided, an
@@ -181,6 +181,48 @@ void directBinaryEncoder() throws IOException {
assertArrayEquals(complexdata, result2);
}

@Test
void blockingDirectBinaryEncoder() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryEncoder e = factory.blockingDirectBinaryEncoder(baos, null);
generateData(e, true);

byte[] result = baos.toByteArray();
assertEquals(legacydata.length, result.length);
assertArrayEquals(legacydata, result);
baos.reset();

generateComplexData(e);
byte[] result2 = baos.toByteArray();
// blocking will cause different length, should be two bytes larger
assertEquals(complexdata.length + 2, result2.length);
// the first byte is the array start, with the count of items negative
assertEquals(complexdata[0] >>> 1, result2[0]);
baos.reset();

e.writeArrayStart();
e.setItemCount(1);
e.startItem();
e.writeInt(1);
e.writeArrayEnd();

// 1: 1 element in the array
// 2: 1 byte for the int
// 3: zigzag encoded int
// 4: 0 elements in the next block
assertArrayEquals(baos.toByteArray(), new byte[] { 1, 2, 2, 0 });
baos.reset();

e.writeArrayStart();
e.setItemCount(0);
e.writeArrayEnd();
Contributor: Could you test these last 2 byte arrays with a binary decoder to ensure they work with this new encoder? (If it can't, create a specific decoder class.)

Contributor Author: I've added this test case in TestBlockingDirectBinaryEncoder.

// This is correct
// 0: 0 elements in the block
assertArrayEquals(baos.toByteArray(), new byte[] { 0 });
baos.reset();
Contributor: Could you add a test where an array (or map) is skipped; if I understood well, by calling setItemCount(0) before the array end? (Or have I missed the purpose of this?)

    e.writeArrayStart();
    e.setItemCount(1);
    e.startItem();
    e.writeInt(1);
    e.setItemCount(0);  // here, to skip the array ??
    e.writeArrayEnd();

Contributor Author: It is used at read time. It can be used for skipping over Arrays/Maps when they are not part of your read schema. Instead of reading all the fields, you can just skip over the whole Array/Map at once, since the length is encoded in the Avro file.
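
For illustration (editorial, not part of the PR): the four bytes { 1, 2, 2, 0 } asserted in the test above encode a one-item block prefixed with its byte length, so a plain BinaryDecoder can skip the whole array without decoding the item:

    // { 1, 2, 2, 0 }: zig-zag(-1) = 1 (one-item block, byte length follows),
    // zig-zag(1) = 2 (block is 1 byte long), the item itself (int 1), end-of-array marker 0
    byte[] encoded = new byte[] { 1, 2, 2, 0 };
    BinaryDecoder in = DecoderFactory.get().binaryDecoder(encoded, null);
    long remaining = in.skipArray(); // jumps over the block using the encoded byte length
    // remaining is 0: nothing is left to be skipped item by item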

Contributor Author: Let me check if I can add a test for it 👍

Contributor Author: Added a test for it. LMKWYT

}

@Test
void blockingBinaryEncoder() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.io;

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.specific.TestRecordWithMapsAndArrays;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.*;

public class TestBlockingDirectBinaryEncoder {

@Test
void blockingDirectBinaryEncoder() throws IOException, NoSuchAlgorithmException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().blockingDirectBinaryEncoder(baos, null);

// This is needed because there is no BlockingDirectBinaryEncoder
// BinaryMessageWriter
// available out of the box
encoder.writeFixed(new byte[] { (byte) 0xC3, (byte) 0x01 });
encoder.writeFixed(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", TestRecordWithMapsAndArrays.SCHEMA$));

int len = 5;

encoder.writeArrayStart();
encoder.setItemCount(len);
for (int i = 0; i < len; i++) {
encoder.startItem();
encoder.writeString(Integer.toString(i));
}
encoder.writeArrayEnd();

encoder.writeMapStart();
encoder.setItemCount(len);
for (long i = 0; i < len; i++) {
encoder.startItem();
encoder.writeString(Long.toString(i));
encoder.writeLong(i);
}
encoder.writeMapEnd();
encoder.flush();

BinaryMessageDecoder<TestRecordWithMapsAndArrays> decoder = TestRecordWithMapsAndArrays.getDecoder();
TestRecordWithMapsAndArrays r = decoder.decode(baos.toByteArray());

assertThat(r.getArr(), is(Arrays.asList("0", "1", "2", "3", "4")));
Map<String, Long> map = r.getMap();
assertThat(map.size(), is(5));
for (long i = 0; i < len; i++) {
assertThat(map.get(Long.toString(i)), is(i));
}
}

@Test
void testSkippingUsingBlocks() throws IOException, NoSuchAlgorithmException {
// Create an empty schema for read, so we skip over all the fields
Schema emptySchema = new Schema.Parser().parse(
"{\"type\":\"record\",\"name\":\"TestRecordWithMapsAndArrays\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[]}");

GenericDatumReader<?> in = new GenericDatumReader<>(TestRecordWithMapsAndArrays.SCHEMA$, emptySchema);
Decoder mockDecoder = mock(BinaryDecoder.class);

for (long i = 0; i < 1; i++) {
in.read(null, mockDecoder);
}

verify(mockDecoder, times(1)).skipMap();
verify(mockDecoder, times(1)).skipArray();
verify(mockDecoder, times(0)).readString();
verify(mockDecoder, times(0)).readLong();
}
}