From cfb73041fd80d95bee57d7ef5e411f0879c62e31 Mon Sep 17 00:00:00 2001
From: Ryan Blue
Date: Fri, 20 Dec 2024 10:22:05 -0800
Subject: [PATCH] Fix unsorted values.

---
NOTE(review): this patch was re-flowed from a whitespace-collapsed copy. Line
breaks and indentation were rebuilt so that every hunk matches the line counts
in its @@ header and the diffstat below. Text inside angle brackets had been
stripped from that copy:
  * the author address on the From: line is missing and was not guessed;
  * the type arguments on the added offsetToLength / sortedOffsets
    declarations were restored as <Integer, Integer> / <Integer>, which the
    int unboxing in the added code requires;
  * the "Map data" context line in TestSerializedObject.java presumably lost
    its type arguments too -- restore it from the target file before applying.
NOTE(review): initOffsetsAndLengths collects the offsets plus the trailing data
length as map keys and then reads sortedOffsets.get(index + 1) for numElements
iterations. If any of those values repeat, there are fewer than
numElements + 1 keys and that read goes out of range. Confirm whether inputs
can contain repeated offsets.

 .../iceberg/variants/SerializedObject.java    | 61 ++++++++++++++-----
 .../variants/TestSerializedObject.java        | 39 ++++++++++++
 .../iceberg/variants/TestShreddedObject.java  |  6 +-
 3 files changed, 89 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java
index f941b62731d7..6eb103959420 100644
--- a/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java
+++ b/core/src/main/java/org/apache/iceberg/variants/SerializedObject.java
@@ -21,8 +21,12 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Pair;
 
 class SerializedObject extends Variants.SerializedValue implements VariantObject {
@@ -52,6 +56,8 @@ static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int
   private final Integer[] fieldIds;
   private final int offsetSize;
   private final int offsetListOffset;
+  private final int[] offsets;
+  private final int[] lengths;
   private final int dataOffset;
   private final VariantValue[] values;
 
@@ -66,8 +72,44 @@ private SerializedObject(SerializedMetadata metadata, ByteBuffer value, int head
     this.fieldIdListOffset = Variants.HEADER_SIZE + numElementsSize;
     this.fieldIds = new Integer[numElements];
     this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
+    this.offsets = new int[numElements];
+    this.lengths = new int[numElements];
     this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
     this.values = new VariantValue[numElements];
+
+    if (numElements > 0) {
+      initOffsetsAndLengths(numElements);
+    }
+  }
+
+  private void initOffsetsAndLengths(int numElements) {
+    // populate offsets list
+    Map<Integer, Integer> offsetToLength = Maps.newHashMap();
+    for (int index = 0; index < numElements; index += 1) {
+      offsets[index] =
+          VariantUtil.readLittleEndianUnsigned(
+              value, offsetListOffset + (index * offsetSize), offsetSize);
+
+      offsetToLength.put(offsets[index], 0);
+    }
+
+    int dataLength =
+        VariantUtil.readLittleEndianUnsigned(
+            value, offsetListOffset + (numElements * offsetSize), offsetSize);
+    offsetToLength.put(dataLength, 0);
+
+    // populate lengths list by sorting offsets
+    List<Integer> sortedOffsets =
+        offsetToLength.keySet().stream().sorted().collect(Collectors.toList());
+    for (int index = 0; index < numElements; index += 1) {
+      int offset = sortedOffsets.get(index);
+      int length = sortedOffsets.get(index + 1) - offset;
+      offsetToLength.put(offset, length);
+    }
+
+    for (int index = 0; index < lengths.length; index += 1) {
+      lengths[index] = offsetToLength.get(offsets[index]);
+    }
   }
 
   @VisibleForTesting
@@ -123,6 +165,7 @@ private int id(int index) {
           VariantUtil.readLittleEndianUnsigned(
               value, fieldIdListOffset + (index * fieldIdSize), fieldIdSize);
     }
+
     return fieldIds[index];
   }
 
@@ -136,14 +179,9 @@ public VariantValue get(String name) {
     }
 
     if (null == values[index]) {
-      int offset =
-          VariantUtil.readLittleEndianUnsigned(
-              value, offsetListOffset + (index * offsetSize), offsetSize);
-      int next =
-          VariantUtil.readLittleEndianUnsigned(
-              value, offsetListOffset + ((1 + index) * offsetSize), offsetSize);
       values[index] =
-          Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
+          Variants.from(
+              metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
     }
 
     return values[index];
@@ -176,14 +214,7 @@ ByteBuffer sliceValue(int index) {
       return ((Variants.Serialized) values[index]).buffer();
     }
 
-    int offset =
-        VariantUtil.readLittleEndianUnsigned(
-            value, offsetListOffset + (index * offsetSize), offsetSize);
-    int next =
-        VariantUtil.readLittleEndianUnsigned(
-            value, offsetListOffset + ((1 + index) * offsetSize), offsetSize);
-
-    return VariantUtil.slice(value, dataOffset + offset, next - offset);
+    return VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]);
   }
 
   @Override
diff --git a/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java
index 3c5fb808d835..9fe71ed0e92a 100644
--- a/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java
+++ b/core/src/test/java/org/apache/iceberg/variants/TestSerializedObject.java
@@ -44,6 +44,24 @@ public class TestSerializedObject {
   private static final SerializedPrimitive TRUE = SerializedPrimitive.from(new byte[] {0b100});
   private static final SerializedPrimitive DATE =
       SerializedPrimitive.from(new byte[] {0b101100, (byte) 0xF4, 0x43, 0x00, 0x00});
+  private static final byte[] UNSORTED_VALUES =
+      new byte[] {
+        0b10,
+        0x03, // 3 item object
+        0x00,
+        0x01,
+        0x02, // ascending key IDs (a, b, c)
+        0x02,
+        0x04,
+        0x00,
+        0x06, // values at offsets (2, 4, 0)
+        0b1100,
+        0x03, // c = 3 (int8)
+        0b1100,
+        0x01, // a = 1 (int8)
+        0b1100,
+        0x02 // b = 2 (int8)
+      };
 
   private final Random random = new Random(198725);
 
@@ -86,6 +104,27 @@ public void testSimpleObject() {
     assertThat(object.get("d")).isEqualTo(null);
   }
 
+  @Test
+  public void testUnsortedValues() {
+    ByteBuffer meta =
+        VariantTestUtil.createMetadata(Sets.newHashSet("a", "b", "c"), true /* sort names */);
+
+    SerializedMetadata metadata = SerializedMetadata.from(meta);
+    SerializedObject object = SerializedObject.from(metadata, UNSORTED_VALUES);
+
+    assertThat(object.type()).isEqualTo(PhysicalType.OBJECT);
+    assertThat(object.numElements()).isEqualTo(3);
+
+    assertThat(object.get("a").type()).isEqualTo(PhysicalType.INT8);
+    assertThat(object.get("a").asPrimitive().get()).isEqualTo((byte) 1);
+    assertThat(object.get("b").type()).isEqualTo(PhysicalType.INT8);
+    assertThat(object.get("b").asPrimitive().get()).isEqualTo((byte) 2);
+    assertThat(object.get("c").type()).isEqualTo(PhysicalType.INT8);
+    assertThat(object.get("c").asPrimitive().get()).isEqualTo((byte) 3);
+
+    assertThat(object.get("d")).isEqualTo(null);
+  }
+
   @Test
   public void testOutOfOrderKeys() {
     Map data = ImmutableMap.of("b", I2, "a", I1, "c", I3);
diff --git a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
index 75606b602dd0..0a2c2b26b004 100644
--- a/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
+++ b/core/src/test/java/org/apache/iceberg/variants/TestShreddedObject.java
@@ -395,7 +395,8 @@ public void testThreeByteFieldIds(boolean sortFieldNames) {
     assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo(new BigDecimal("12.21"));
   }
 
-  private static VariantValue roundTripMinimalBuffer(ShreddedObject object, SerializedMetadata metadata) {
+  private static VariantValue roundTripMinimalBuffer(
+      ShreddedObject object, SerializedMetadata metadata) {
     ByteBuffer serialized =
         ByteBuffer.allocate(object.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
     object.writeTo(serialized, 0);
@@ -403,7 +404,8 @@ private static VariantValue roundTripMinimalBuffer(ShreddedObject object, Serial
     return Variants.from(metadata, serialized);
   }
 
-  private static VariantValue roundTripLargeBuffer(ShreddedObject object, SerializedMetadata metadata) {
+  private static VariantValue roundTripLargeBuffer(
+      ShreddedObject object, SerializedMetadata metadata) {
     ByteBuffer serialized =
         ByteBuffer.allocate(1000 + object.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
     object.writeTo(serialized, 300);