Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aihuaxu committed Dec 20, 2024
1 parent 3bb9330 commit 615cdfc
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ class SerializedArray extends Variants.SerializedValue implements VariantArray {
private static final int OFFSET_SIZE_SHIFT = 2;
private static final int IS_LARGE = 0b10000;


static SerializedArray from(Variant variant) {
return from(SerializedMetadata.from(variant.getMetadata()), variant.getValue());
}

@VisibleForTesting
static SerializedArray from(SerializedMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
Expand Down Expand Up @@ -61,8 +66,8 @@ private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int heade
this.array = new VariantValue[numElements];
}

@VisibleForTesting
int numElements() {
@Override
public int numElements() {
return array.length;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class SerializedObject extends Variants.SerializedValue implements VariantObject
private static final int FIELD_ID_SIZE_SHIFT = 4;
private static final int IS_LARGE = 0b1000000;

static SerializedObject from(Variant variant) {
return from(SerializedMetadata.from(variant.getMetadata()), variant.getValue());
}

static SerializedObject from(SerializedMetadata metadata, byte[] bytes) {
return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class SerializedPrimitive extends Variants.SerializedValue implements VariantPri
private static final int PRIMITIVE_TYPE_SHIFT = 2;
private static final int PRIMITIVE_OFFSET = Variants.HEADER_SIZE;

static SerializedPrimitive from(Variant variant) {
return from(variant.getValue());
}

static SerializedPrimitive from(byte[] bytes) {
return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ class SerializedShortString extends Variants.SerializedValue implements VariantP
private static final int LENGTH_MASK = 0b11111100;
private static final int LENGTH_SHIFT = 2;

static SerializedShortString from(byte[] bytes) {
static SerializedShortString from(Variant variant) {
return from(variant.getValue());
}

static SerializedShortString from(byte[] bytes) {
return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]);
}

Expand Down
36 changes: 13 additions & 23 deletions core/src/main/java/org/apache/iceberg/variants/Variant.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,27 @@
*/
package org.apache.iceberg.variants;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public final class Variant {
private final byte[] value;
private final byte[] metadata;
// The variant value doesn't use the whole `value` binary, but starts from its `pos` index and
// spans a size of `valueSize(value, pos)`. This design avoids frequent copies of the value binary
// when reading a sub-variant in the array/object element.
private final int pos;

public Variant(byte[] value, byte[] metadata) {
this(value, metadata, 0);
}
Preconditions.checkArgument(metadata != null && metadata.length >= 1,
"Metadata must not be null or empty.");
Preconditions.checkArgument(value != null && value.length >= 1,
"Value must not be null or empty.");

Variant(byte[] value, byte[] metadata, int pos) {
this.value = value;
this.metadata = metadata;
this.pos = pos;
// There is currently only one allowed version.
if (metadata.length < 1
|| (metadata[0] & VariantConstants.VERSION_MASK) != VariantConstants.VERSION) {
throw new IllegalStateException();
}
// Don't attempt to use a Variant larger than 16 MiB. We'll never produce one, and it risks
// memory instability.
if (metadata.length > VariantConstants.SIZE_LIMIT
|| value.length > VariantConstants.SIZE_LIMIT) {
Preconditions.checkArgument((metadata[0] & VariantConstants.VERSION_MASK) == VariantConstants.VERSION,
"Unsupported metadata version.");

if (value.length > VariantConstants.SIZE_LIMIT || metadata.length > VariantConstants.SIZE_LIMIT) {
throw new VariantSizeLimitException();
}

this.value = value;
this.metadata = metadata;
}

public byte[] getMetadata() {
Expand All @@ -54,8 +48,4 @@ public byte[] getMetadata() {
public byte[] getValue() {
return value;
}

public int getPos() {
return pos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

/** An variant array value. */
public interface VariantArray extends VariantValue {
int numElements();

/** Returns the {@link VariantValue} at {@code index} in this array. */
VariantValue get(int index);

Expand Down
24 changes: 11 additions & 13 deletions core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -63,7 +62,7 @@ public static Variant parseJson(String json) throws IOException {
VariantBuilder builder = new VariantBuilder();
builder.buildJson(parser);

return builder.result();
return builder.build();
}
}

Expand All @@ -72,7 +71,7 @@ public static Variant parseJson(String json) throws IOException {
*
* @return The constructed Variant object.
*/
private Variant result() {
public Variant build() {
int numKeys = dictionaryKeys.size();

// Calculate total size of dictionary strings
Expand Down Expand Up @@ -110,7 +109,7 @@ private Variant result() {
metadataBuffer.writeLittleEndianUnsigned(numStringBytes, offsetSize);

// Write dictionary strings
dictionaryKeys.stream().forEach(metadataBuffer::addBytes);
dictionaryKeys.forEach(metadataBuffer::addBytes);

return new Variant(buffer.toByteArray(), metadataBuffer.toByteArray());
}
Expand Down Expand Up @@ -174,20 +173,19 @@ private void appendObject(JsonParser parser) throws IOException {
buildJson(parser);
}

finishWritingObject(startPos, fields);
endObject(startPos, fields);
}

private void appendArray(JsonParser parser) throws IOException {
List<Integer> offsets = Lists.newArrayList();
int start = buffer.pos;

parser.nextToken();
while (parser.nextToken() != JsonToken.END_ARRAY) {
offsets.add(buffer.pos - start);
buildJson(parser);
}

finishWritingArray(start, offsets);
endArray(start, offsets);
}

private void appendInteger(JsonParser parser) throws IOException {
Expand Down Expand Up @@ -232,19 +230,19 @@ public void appendBoolean(boolean value) {
*/
public void appendNumeric(long value) {
if (value == (byte) value) {
// INT8: Requires 1 byte for value + 1 byte for header
// INT8: Requires 1 byte for header + 1 byte for value
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT8));
buffer.writeLittleEndianUnsigned(value, 1);
} else if (value == (short) value) {
// INT16: Requires 2 bytes for value + 1 byte for header
// INT16: Requires 1 byte for header + 2 bytes for value
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT16));
buffer.writeLittleEndianUnsigned(value, 2);
} else if (value == (int) value) {
// INT32: Requires 4 bytes for value + 1 byte for header
// INT32: Requires 1 byte for header + 4 bytes for value
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT32));
buffer.writeLittleEndianUnsigned(value, 4);
} else {
// INT64: Requires 8 bytes for value + 1 byte for header
// INT64: Requires 1 byte for header + 8 bytes for value
buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT64));
buffer.writeLittleEndianUnsigned(value, 8);
}
Expand Down Expand Up @@ -324,7 +322,7 @@ public void appendBinary(byte[] value) throws VariantSizeLimitException {
* @param startPos The starting position of the object data in the buffer.
* @param fields The list of field entries (key, ID, offset).
*/
private void finishWritingObject(int startPos, List<FieldEntry> fields) {
private void endObject(int startPos, List<FieldEntry> fields) {
int numElements = fields.size();

// Sort fields by key and ensure no duplicate keys
Expand Down Expand Up @@ -376,7 +374,7 @@ private void finishWritingObject(int startPos, List<FieldEntry> fields) {
* @param startPos The starting position of the array values in the buffer.
* @param offsets The offsets for each array value.
*/
private void finishWritingArray(int startPos, List<Integer> offsets) {
private void endArray(int startPos, List<Integer> offsets) {
int dataSize = buffer.pos - startPos; // Total byte size of the array values
int numElements = offsets.size();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static byte metadataHeader(int version, int offsetSize) {
}

static byte primitiveHeader(int primitiveType) {
return (byte) (primitiveType << Variants.Primitives. PRIMITIVE_TYPE_SHIFT);
return (byte) (primitiveType << Variants.Primitives.PRIMITIVE_TYPE_SHIFT);
}

public static byte shortStrHeader(int size) {
Expand Down
Loading

0 comments on commit 615cdfc

Please sign in to comment.