-
Notifications
You must be signed in to change notification settings - Fork 1.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
AVRO-3871: Add blocking direct binary encoder #2521
Changes from all commits
5d58080
cb392cc
95982d7
834bc64
8927d97
5e42791
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.avro.io; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.io.OutputStream; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* An {@link Encoder} for Avro's binary encoding that does not buffer output. | ||
* <p/> | ||
* This encoder does not buffer writes in contrast to | ||
* {@link BufferedBinaryEncoder}. However, it is lighter-weight and useful when: | ||
* The buffering in BufferedBinaryEncoder is not desired because you buffer a | ||
* different level or the Encoder is very short-lived. | ||
* </p> | ||
* The BlockingDirectBinaryEncoder will encode the number of bytes of the Map | ||
* and Array blocks. This will allow to postpone the decoding, or skip over it | ||
* at all. | ||
* <p/> | ||
* To construct, use | ||
* {@link EncoderFactory#blockingDirectBinaryEncoder(OutputStream, BinaryEncoder)} | ||
* <p/> | ||
* {@link BlockingDirectBinaryEncoder} instances returned by this method are not | ||
* thread-safe | ||
* | ||
* @see BinaryEncoder | ||
* @see EncoderFactory | ||
* @see Encoder | ||
* @see Decoder | ||
*/ | ||
public class BlockingDirectBinaryEncoder extends DirectBinaryEncoder { | ||
private final BufferOutputStream buffer; | ||
|
||
private OutputStream originalStream; | ||
|
||
private boolean inBlock = false; | ||
|
||
private long blockItemCount; | ||
|
||
/** | ||
* Create a writer that sends its output to the underlying stream | ||
* <code>out</code>. | ||
* | ||
* @param out The Outputstream to write to | ||
*/ | ||
public BlockingDirectBinaryEncoder(OutputStream out) { | ||
super(out); | ||
this.buffer = new BufferOutputStream(); | ||
} | ||
|
||
private void startBlock() { | ||
if (inBlock) { | ||
throw new RuntimeException("Nested Maps/Arrays are not supported by the BlockingDirectBinaryEncoder"); | ||
} | ||
originalStream = out; | ||
buffer.reset(); | ||
out = buffer; | ||
inBlock = true; | ||
} | ||
|
||
private void endBlock() { | ||
if (!inBlock) { | ||
throw new RuntimeException("Called endBlock, while not buffering a block"); | ||
} | ||
out = originalStream; | ||
if (blockItemCount > 0) { | ||
try { | ||
// Make it negative, so the reader knows that the number of bytes is coming | ||
writeLong(-blockItemCount); | ||
writeLong(buffer.size()); | ||
writeFixed(buffer.toBufferWithoutCopy()); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
inBlock = false; | ||
buffer.reset(); | ||
} | ||
|
||
@Override | ||
public void setItemCount(long itemCount) throws IOException { | ||
blockItemCount = itemCount; | ||
} | ||
|
||
@Override | ||
public void writeArrayStart() throws IOException { | ||
startBlock(); | ||
} | ||
|
||
@Override | ||
public void writeArrayEnd() throws IOException { | ||
endBlock(); | ||
// Writes another zero to indicate that this is the last block | ||
super.writeArrayEnd(); | ||
} | ||
|
||
@Override | ||
public void writeMapStart() throws IOException { | ||
startBlock(); | ||
} | ||
|
||
@Override | ||
public void writeMapEnd() throws IOException { | ||
endBlock(); | ||
// Writes another zero to indicate that this is the last block | ||
super.writeMapEnd(); | ||
} | ||
|
||
private static class BufferOutputStream extends ByteArrayOutputStream { | ||
BufferOutputStream() { | ||
} | ||
|
||
ByteBuffer toBufferWithoutCopy() { | ||
return ByteBuffer.wrap(buf, 0, count); | ||
} | ||
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -181,6 +181,48 @@ void directBinaryEncoder() throws IOException { | |
assertArrayEquals(complexdata, result2); | ||
} | ||
|
||
@Test | ||
void blockingDirectBinaryEncoder() throws IOException { | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
BinaryEncoder e = factory.blockingDirectBinaryEncoder(baos, null); | ||
generateData(e, true); | ||
|
||
byte[] result = baos.toByteArray(); | ||
assertEquals(legacydata.length, result.length); | ||
assertArrayEquals(legacydata, result); | ||
baos.reset(); | ||
|
||
generateComplexData(e); | ||
byte[] result2 = baos.toByteArray(); | ||
// blocking will cause different length, should be two bytes larger | ||
assertEquals(complexdata.length + 2, result2.length); | ||
// the first byte is the array start, with the count of items negative | ||
assertEquals(complexdata[0] >>> 1, result2[0]); | ||
baos.reset(); | ||
|
||
e.writeArrayStart(); | ||
e.setItemCount(1); | ||
e.startItem(); | ||
e.writeInt(1); | ||
e.writeArrayEnd(); | ||
|
||
// 1: 1 element in the array | ||
// 2: 1 byte for the int | ||
// 3: zigzag encoded int | ||
// 4: 0 elements in the next block | ||
assertArrayEquals(baos.toByteArray(), new byte[] { 1, 2, 2, 0 }); | ||
baos.reset(); | ||
|
||
e.writeArrayStart(); | ||
e.setItemCount(0); | ||
e.writeArrayEnd(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you test this 2 last byte array with a binary decoder to ensure it works with this new encoder. (if it can't, create a specific decoder class) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added this test-case in |
||
|
||
// This is correct | ||
// 0: 0 elements in the block | ||
assertArrayEquals(baos.toByteArray(), new byte[] { 0 }); | ||
baos.reset(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you add a test where an array (or map) is skiped; if i understood well, by calling setItemCount(0) before end array ? (or i missed the purpose of this) e.writeArrayStart();
e.setItemCount(1);
e.startItem();
e.writeInt(1);
e.setItemCount(0); // here, to skip the array ??
e.writeArrayEnd(); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is used at read time. It can be used for skipping over the Array/Maps when they are not part of your read schema. Instead of reading all the fields, you can just skip over the whole Array/Map at once since the length is encoded in the Avro file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me check if I can add a test for it 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a test for it. LMKWYT |
||
} | ||
|
||
@Test | ||
void blockingBinaryEncoder() throws IOException { | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.avro.io; | ||
|
||
import org.apache.avro.Schema; | ||
import org.apache.avro.SchemaNormalization; | ||
import org.apache.avro.generic.GenericDatumReader; | ||
import org.apache.avro.message.BinaryMessageDecoder; | ||
import org.apache.avro.specific.TestRecordWithMapsAndArrays; | ||
import org.junit.jupiter.api.Test; | ||
|
||
import java.io.ByteArrayOutputStream; | ||
import java.io.IOException; | ||
import java.security.NoSuchAlgorithmException; | ||
import java.util.Arrays; | ||
import java.util.Map; | ||
|
||
import static org.hamcrest.MatcherAssert.assertThat; | ||
import static org.hamcrest.Matchers.is; | ||
import static org.mockito.Mockito.*; | ||
|
||
public class TestBlockingDirectBinaryEncoder { | ||
|
||
@Test | ||
void blockingDirectBinaryEncoder() throws IOException, NoSuchAlgorithmException { | ||
ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
BinaryEncoder encoder = EncoderFactory.get().blockingDirectBinaryEncoder(baos, null); | ||
|
||
// This is needed because there is no BlockingDirectBinaryEncoder | ||
// BinaryMessageWriter | ||
// available out of the box | ||
encoder.writeFixed(new byte[] { (byte) 0xC3, (byte) 0x01 }); | ||
encoder.writeFixed(SchemaNormalization.parsingFingerprint("CRC-64-AVRO", TestRecordWithMapsAndArrays.SCHEMA$)); | ||
|
||
int len = 5; | ||
|
||
encoder.writeArrayStart(); | ||
encoder.setItemCount(len); | ||
for (int i = 0; i < len; i++) { | ||
encoder.startItem(); | ||
encoder.writeString(Integer.toString(i)); | ||
} | ||
encoder.writeArrayEnd(); | ||
|
||
encoder.writeMapStart(); | ||
encoder.setItemCount(len); | ||
for (long i = 0; i < len; i++) { | ||
encoder.startItem(); | ||
encoder.writeString(Long.toString(i)); | ||
encoder.writeLong(i); | ||
} | ||
encoder.writeMapEnd(); | ||
encoder.flush(); | ||
|
||
BinaryMessageDecoder<TestRecordWithMapsAndArrays> decoder = TestRecordWithMapsAndArrays.getDecoder(); | ||
TestRecordWithMapsAndArrays r = decoder.decode(baos.toByteArray()); | ||
|
||
assertThat(r.getArr(), is(Arrays.asList("0", "1", "2", "3", "4"))); | ||
Map<String, Long> map = r.getMap(); | ||
assertThat(map.size(), is(5)); | ||
for (long i = 0; i < len; i++) { | ||
assertThat(map.get(Long.toString(i)), is(i)); | ||
} | ||
} | ||
|
||
@Test | ||
void testSkippingUsingBlocks() throws IOException, NoSuchAlgorithmException { | ||
// Create an empty schema for read, so we skip over all the fields | ||
Schema emptySchema = new Schema.Parser().parse( | ||
"{\"type\":\"record\",\"name\":\"TestRecordWithMapsAndArrays\",\"namespace\":\"org.apache.avro.specific\",\"fields\":[]}"); | ||
|
||
GenericDatumReader<?> in = new GenericDatumReader<>(TestRecordWithMapsAndArrays.SCHEMA$, emptySchema); | ||
Decoder mockDecoder = mock(BinaryDecoder.class); | ||
|
||
for (long i = 0; i < 1; i++) { | ||
in.read(null, mockDecoder); | ||
} | ||
|
||
verify(mockDecoder, times(1)).skipMap(); | ||
verify(mockDecoder, times(1)).skipArray(); | ||
verify(mockDecoder, times(0)).readString(); | ||
verify(mockDecoder, times(0)).readLong(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just put BUFFER as a stack of outputStream, and it would become possible; but not mandatory :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I was thinking of that as well, but this would potentially allocate quite some buffers. For Apache Iceberg we don't use any nested structures, so I went with the simplest approach, but if you think this should be in, I'm happy to add it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, let's go step by step; this is good for a first version, moreover if it's aims to be used for specific software like iceberg :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I do like about this, is the exception thrown: it's in the right place to start looking if you want to lift this limitation.