diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java b/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java index 774553cbb4a3..eaaecedcc578 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedArray.java @@ -28,6 +28,11 @@ class SerializedArray extends Variants.SerializedValue implements VariantArray { private static final int OFFSET_SIZE_SHIFT = 2; private static final int IS_LARGE = 0b10000; + + static SerializedArray from(Variant variant) { + return from(SerializedMetadata.from(variant.getMetadata()), variant.getValue()); + } + @VisibleForTesting static SerializedArray from(SerializedMetadata metadata, byte[] bytes) { return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); @@ -61,8 +66,8 @@ private SerializedArray(SerializedMetadata metadata, ByteBuffer value, int heade this.array = new VariantValue[numElements]; } - @VisibleForTesting - int numElements() { + @Override + public int numElements() { return array.length; } diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java index 6eb103959420..55cf26ddf1ea 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java @@ -36,6 +36,10 @@ class SerializedObject extends Variants.SerializedValue implements VariantObject private static final int FIELD_ID_SIZE_SHIFT = 4; private static final int IS_LARGE = 0b1000000; + static SerializedObject from(Variant variant) { + return from(SerializedMetadata.from(variant.getMetadata()), variant.getValue()); + } + static SerializedObject from(SerializedMetadata metadata, byte[] bytes) { return from(metadata, ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); } diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java b/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java index 1a6bd37a4ff3..eee62bcd37a5 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedPrimitive.java @@ -28,6 +28,10 @@ class SerializedPrimitive extends Variants.SerializedValue implements VariantPri private static final int PRIMITIVE_TYPE_SHIFT = 2; private static final int PRIMITIVE_OFFSET = Variants.HEADER_SIZE; + static SerializedPrimitive from(Variant variant) { + return from(variant.getValue()); + } + static SerializedPrimitive from(byte[] bytes) { return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); } diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java b/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java index 3004a075def1..8d66ac2093e3 100644 --- a/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java +++ b/core/src/main/java/org/apache/iceberg/variants/SerializedShortString.java @@ -26,7 +26,11 @@ class SerializedShortString extends Variants.SerializedValue implements VariantP private static final int LENGTH_MASK = 0b11111100; private static final int LENGTH_SHIFT = 2; - static SerializedShortString from(byte[] bytes) { + static SerializedShortString from(Variant variant) { + return from(variant.getValue()); + } + + static SerializedShortString from(byte[] bytes) { return from(ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN), bytes[0]); } diff --git a/core/src/main/java/org/apache/iceberg/variants/Variant.java b/core/src/main/java/org/apache/iceberg/variants/Variant.java index 02027c2ffd63..0a01f02d1541 100644 --- a/core/src/main/java/org/apache/iceberg/variants/Variant.java +++ b/core/src/main/java/org/apache/iceberg/variants/Variant.java @@ -18,33 +18,27 @@ */ package org.apache.iceberg.variants; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + public final class Variant { private final byte[] value; private final byte[] metadata; - // The variant value doesn't use the whole `value` binary, but starts from its `pos` index and - // spans a size of `valueSize(value, pos)`. This design avoids frequent copies of the value binary - // when reading a sub-variant in the array/object element. - private final int pos; public Variant(byte[] value, byte[] metadata) { - this(value, metadata, 0); - } + Preconditions.checkArgument(metadata != null && metadata.length >= 1, + "Metadata must not be null or empty."); + Preconditions.checkArgument(value != null && value.length >= 1, + "Value must not be null or empty."); - Variant(byte[] value, byte[] metadata, int pos) { - this.value = value; - this.metadata = metadata; - this.pos = pos; - // There is currently only one allowed version. - if (metadata.length < 1 - || (metadata[0] & VariantConstants.VERSION_MASK) != VariantConstants.VERSION) { - throw new IllegalStateException(); - } - // Don't attempt to use a Variant larger than 16 MiB. We'll never produce one, and it risks - // memory instability. - if (metadata.length > VariantConstants.SIZE_LIMIT - || value.length > VariantConstants.SIZE_LIMIT) { + Preconditions.checkArgument((metadata[0] & VariantConstants.VERSION_MASK) == VariantConstants.VERSION, + "Unsupported metadata version."); + + if (value.length > VariantConstants.SIZE_LIMIT || metadata.length > VariantConstants.SIZE_LIMIT) { throw new VariantSizeLimitException(); } + + this.value = value; + this.metadata = metadata; } public byte[] getMetadata() { @@ -54,8 +48,4 @@ public byte[] getMetadata() { public byte[] getValue() { return value; } - - public int getPos() { - return pos; - } } diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantArray.java b/core/src/main/java/org/apache/iceberg/variants/VariantArray.java index 55dbc071f15b..a245d5cb3614 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantArray.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantArray.java @@ -20,6 +20,8 @@ /** An variant array value. */ public interface VariantArray extends VariantValue { + int numElements(); + /** Returns the {@link VariantValue} at {@code index} in this array. */ VariantValue get(int index); diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java b/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java index e472420e6382..bb47acd9621e 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantBuilder.java @@ -29,7 +29,6 @@ import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -63,7 +62,7 @@ public static Variant parseJson(String json) throws IOException { VariantBuilder builder = new VariantBuilder(); builder.buildJson(parser); - return builder.result(); + return builder.build(); } } @@ -72,7 +71,7 @@ public static Variant parseJson(String json) throws IOException { * * @return The constructed Variant object. */ - private Variant result() { + public Variant build() { int numKeys = dictionaryKeys.size(); // Calculate total size of dictionary strings @@ -110,7 +109,7 @@ private Variant result() { metadataBuffer.writeLittleEndianUnsigned(numStringBytes, offsetSize); // Write dictionary strings - dictionaryKeys.stream().forEach(metadataBuffer::addBytes); + dictionaryKeys.forEach(metadataBuffer::addBytes); return new Variant(buffer.toByteArray(), metadataBuffer.toByteArray()); } @@ -174,20 +173,19 @@ private void appendObject(JsonParser parser) throws IOException { buildJson(parser); } - finishWritingObject(startPos, fields); + endObject(startPos, fields); } private void appendArray(JsonParser parser) throws IOException { List offsets = Lists.newArrayList(); int start = buffer.pos; - parser.nextToken(); while (parser.nextToken() != JsonToken.END_ARRAY) { offsets.add(buffer.pos - start); buildJson(parser); } - finishWritingArray(start, offsets); + endArray(start, offsets); } private void appendInteger(JsonParser parser) throws IOException { @@ -232,19 +230,19 @@ public void appendBoolean(boolean value) { */ public void appendNumeric(long value) { if (value == (byte) value) { - // INT8: Requires 1 byte for value + 1 byte for header + // INT8: Requires 1 byte for header + 1 byte for value buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT8)); buffer.writeLittleEndianUnsigned(value, 1); } else if (value == (short) value) { - // INT16: Requires 2 bytes for value + 1 byte for header + // INT16: Requires 1 byte for header + 2 bytes for value buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT16)); buffer.writeLittleEndianUnsigned(value, 2); } else if (value == (int) value) { - // INT32: Requires 4 bytes for value + 1 byte for header + // INT32: Requires 1 byte for header + 4 bytes for value buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT32)); buffer.writeLittleEndianUnsigned(value, 4); } else { - // INT64: Requires 8 bytes for value + 1 byte for header + // INT64: Requires 1 byte for header + 8 bytes for value buffer.addByte(VariantUtil.primitiveHeader(Variants.Primitives.TYPE_INT64)); buffer.writeLittleEndianUnsigned(value, 8); } @@ -324,7 +322,7 @@ public void appendBinary(byte[] value) throws VariantSizeLimitException { * @param startPos The starting position of the object data in the buffer. * @param fields The list of field entries (key, ID, offset). */ - private void finishWritingObject(int startPos, List fields) { + private void endObject(int startPos, List fields) { int numElements = fields.size(); // Sort fields by key and ensure no duplicate keys @@ -376,7 +374,7 @@ private void finishWritingObject(int startPos, List fields) { * @param startPos The starting position of the array values in the buffer. * @param offsets The offsets for each array value. */ - private void finishWritingArray(int startPos, List offsets) { + private void endArray(int startPos, List offsets) { int dataSize = buffer.pos - startPos; // Total byte size of the array values int numElements = offsets.size(); diff --git a/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java b/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java index e37d0c2e0afb..85cde9d11ae9 100644 --- a/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java +++ b/core/src/main/java/org/apache/iceberg/variants/VariantUtil.java @@ -169,7 +169,7 @@ public static byte metadataHeader(int version, int offsetSize) { } static byte primitiveHeader(int primitiveType) { - return (byte) (primitiveType << Variants.Primitives. PRIMITIVE_TYPE_SHIFT); + return (byte) (primitiveType << Variants.Primitives.PRIMITIVE_TYPE_SHIFT); } public static byte shortStrHeader(int size) { diff --git a/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilder.java b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilder.java new file mode 100644 index 000000000000..d77272f0b35d --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/variants/TestVariantBuilder.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.variants; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.stream.Stream; +import net.minidev.json.JSONArray; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class TestVariantBuilder { + @ParameterizedTest + @MethodSource("primitiveInputs") + public void testPrimitive(String input, Variants.PhysicalType expectedType, Object expectedValue) throws IOException { + Variant variant = VariantBuilder.parseJson(input); + + SerializedPrimitive primitive = SerializedPrimitive.from(variant); + + assertThat(primitive.type()).isEqualTo(expectedType); + assertThat(primitive.get()).isEqualTo(expectedValue); + } + + private static Stream primitiveInputs() { + return Stream.of( + Arguments.of("null", Variants.PhysicalType.NULL, null), + Arguments.of("true", Variants.PhysicalType.BOOLEAN_TRUE, true), + Arguments.of("false", Variants.PhysicalType.BOOLEAN_FALSE, false), + Arguments.of("34", Variants.PhysicalType.INT8, (byte)34), + Arguments.of("1234", Variants.PhysicalType.INT16, (short)1234), + Arguments.of("1234567890", Variants.PhysicalType.INT32, 1234567890), + Arguments.of("1234567890987654321", Variants.PhysicalType.INT64, 1234567890987654321L), + Arguments.of("1234e-2", Variants.PhysicalType.DOUBLE, 12.34), + Arguments.of("123456.789", Variants.PhysicalType.DECIMAL4, new BigDecimal("123456.789")), + Arguments.of("123456789.987654321", Variants.PhysicalType.DECIMAL8, new BigDecimal("123456789.987654321")), + Arguments.of("12345678901234567890.987654321", Variants.PhysicalType.DECIMAL16, new BigDecimal("12345678901234567890.987654321")), + Arguments.of("\"This test string is used to generate a primitive string type of variant\"", Variants.PhysicalType.STRING, "This test string is used to generate a primitive string type of variant") + + ); + } + + @Test + public void testPrimitiveFloat() { + VariantBuilder builder = new VariantBuilder(); + builder.appendFloat(12.34f); + Variant variant = builder.build(); + SerializedPrimitive primitive = SerializedPrimitive.from(variant); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.FLOAT); + assertThat(primitive.get()).isEqualTo(12.34f); + } + + @Test + public void testPrimitiveDate() { + String dateString = "2017-08-18"; + int daysSinceEpoch = DateTimeUtil.isoDateToDays(dateString); + + VariantBuilder builder = new VariantBuilder(); + builder.appendDate(daysSinceEpoch); + Variant variant = builder.build(); + SerializedPrimitive primitive = SerializedPrimitive.from(variant); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.DATE); + assertThat(DateTimeUtil.daysToIsoDate((int)primitive.get())).isEqualTo(dateString); + } + + @Test + public void testPrimitiveTimestampTZ() { + String tzString = "2017-08-18T14:21:01.919+00:00"; + long microsSinceEpoch = DateTimeUtil.isoTimestamptzToMicros(tzString); + + VariantBuilder builder = new VariantBuilder(); + builder.appendTimestampTz(microsSinceEpoch); + Variant variant = builder.build(); + SerializedPrimitive primitive = SerializedPrimitive.from(variant); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.TIMESTAMPTZ); + assertThat(DateTimeUtil.microsToIsoTimestamptz((long)primitive.get())).isEqualTo(tzString); + } + + @Test + public void testPrimitiveTimestampNTZ() { + String ntzString = "2017-08-18T14:21:01.919"; + long microsSinceEpoch = DateTimeUtil.isoTimestampToMicros(ntzString); + + VariantBuilder builder = new VariantBuilder(); + builder.appendTimestampNtz(microsSinceEpoch); + Variant variant = builder.build(); + SerializedPrimitive primitive = SerializedPrimitive.from(variant); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.TIMESTAMPNTZ); + assertThat(DateTimeUtil.microsToIsoTimestamp((long)primitive.get())).isEqualTo(ntzString); + } + + @Test + public void testPrimitiveBinary() { + VariantBuilder builder = new VariantBuilder(); + builder.appendBinary("iceberg".getBytes()); + Variant variant = builder.build(); + SerializedPrimitive primitive = SerializedPrimitive.from(variant); + + assertThat(primitive.type()).isEqualTo(Variants.PhysicalType.BINARY); + assertThat(primitive.get()).isEqualTo(ByteBuffer.wrap("iceberg".getBytes())); + } + + @Test + public void testShortString() throws IOException { + Variant variant = VariantBuilder.parseJson("\"iceberg\""); + SerializedShortString shortString = SerializedShortString.from(variant); + + assertThat(shortString.type()).isEqualTo(Variants.PhysicalType.STRING); + assertThat(shortString.get()).isEqualTo("iceberg"); + } + + @Test + public void testArray() throws IOException { + List input = List.of("Ford", "BMW", "Fiat"); + Variant variant = VariantBuilder.parseJson(JSONArray.toJSONString(input)); + SerializedArray arr = SerializedArray.from(variant); + + assertThat(arr.type()).isEqualTo(Variants.PhysicalType.ARRAY); + for (int i = 0; i < arr.numElements(); i++) { + assertThat(arr.get(i).asPrimitive().get()).isEqualTo(input.get(i)); + } + } + + @Test + public void testEmptyObject() throws IOException { + Variant variant = VariantBuilder.parseJson("{}"); + SerializedObject object = SerializedObject.from(variant); + + assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(0); + } + + @Test + public void testObject() throws IOException { + Variant variant = VariantBuilder.parseJson("{ \"id\": 1234, \"firstName\": \"Joe\", \"lastName\": \"Smith\", \"phones\":[\"123-456-7890\", \"789-123-4560\"] }"); + SerializedObject object = SerializedObject.from(variant); + + assertThat(object.type()).isEqualTo(Variants.PhysicalType.OBJECT); + assertThat(object.numElements()).isEqualTo(4); + + assertThat(object.get("id").asPrimitive().get()).isEqualTo((short)1234); + assertThat(object.get("firstName").asPrimitive().get()).isEqualTo("Joe"); + assertThat(object.get("lastName").asPrimitive().get()).isEqualTo("Smith"); + + VariantArray phones = object.get("phones").asArray(); + assertThat(phones.numElements()).isEqualTo(2); + assertThat(phones.get(0).asPrimitive().get()).isEqualTo("123-456-7890"); + assertThat(phones.get(1).asPrimitive().get()).isEqualTo("789-123-4560"); + } +}