diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java index e4cba174f0f2..a8bf0f839e49 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java @@ -108,4 +108,21 @@ boolean isEmpty() { return keyFrequency().isEmpty(); } } + + boolean isValid() { + if (type == StatisticsType.Sketch) { + if (null == keySamples) { + return false; + } + } else { + if (null == keyFrequency()) { + return false; + } + if (keyFrequency().values().contains(null)) { + return false; + } + } + + return true; + } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java index 7f55188e7f8c..3659ce217c58 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java @@ -48,6 +48,18 @@ class CompletedStatisticsSerializer extends TypeSerializer this.keySamplesSerializer = new ListSerializer<>(sortKeySerializer); } + public void changeSortKeySerializerVersion(int version) { + if (sortKeySerializer instanceof SortKeySerializer) { + ((SortKeySerializer) sortKeySerializer).setVersion(version); + } + } + + public void changeSortKeySerializerVersionLatest() { + if (sortKeySerializer instanceof SortKeySerializer) { + ((SortKeySerializer) sortKeySerializer).restoreToLatestVersion(); + } + } + @Override public boolean isImmutableType() { return false; diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java index e2a282efd82e..4f2afd60fed1 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java @@ -370,7 +370,8 @@ public void resetToCheckpoint(long checkpointId, byte[] checkpointData) { "Restoring data statistic coordinator {} from checkpoint {}", operatorName, checkpointId); this.completedStatistics = StatisticsUtil.deserializeCompletedStatistics( - checkpointData, completedStatisticsSerializer); + checkpointData, (CompletedStatisticsSerializer) completedStatisticsSerializer); + // recompute global statistics in case downstream parallelism changed this.globalStatistics = globalStatistics( diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java index 61c6973463ef..9947e2b78e3a 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java @@ -53,9 +53,12 @@ class SortKeySerializer extends TypeSerializer { private final int size; private final Types.NestedField[] transformedFields; + private int version; + private transient SortKey sortKey; - SortKeySerializer(Schema schema, SortOrder sortOrder) { + 
SortKeySerializer(Schema schema, SortOrder sortOrder, int version) { + this.version = version; this.schema = schema; this.sortOrder = sortOrder; this.size = sortOrder.fields().size(); @@ -76,6 +79,10 @@ class SortKeySerializer extends TypeSerializer { } } + SortKeySerializer(Schema schema, SortOrder sortOrder) { + this(schema, sortOrder, SortKeySerializerSnapshot.CURRENT_VERSION); + } + private SortKey lazySortKey() { if (sortKey == null) { this.sortKey = new SortKey(schema, sortOrder); @@ -84,6 +91,18 @@ private SortKey lazySortKey() { return sortKey; } + public int getLatestVersion() { + return snapshotConfiguration().getCurrentVersion(); + } + + public void restoreToLatestVersion() { + this.version = snapshotConfiguration().getCurrentVersion(); + } + + public void setVersion(int version) { + this.version = version; + } + @Override public boolean isImmutableType() { return false; @@ -125,6 +144,16 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + if (version > 1) { + Object value = record.get(i, Object.class); + if (value == null) { + target.writeBoolean(true); + continue; + } else { + target.writeBoolean(false); + } + } + switch (typeId) { case BOOLEAN: target.writeBoolean(record.get(i, Boolean.class)); @@ -193,6 +222,14 @@ public SortKey deserialize(SortKey reuse, DataInputView source) throws IOExcepti reuse.size(), size); for (int i = 0; i < size; ++i) { + if (version > 1) { + boolean isNull = source.readBoolean(); + if (isNull) { + reuse.set(i, null); + continue; + } + } + int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); switch (typeId) { @@ -277,11 +314,13 @@ public TypeSerializerSnapshot snapshotConfiguration() { } public static class SortKeySerializerSnapshot implements TypeSerializerSnapshot { - private static final int CURRENT_VERSION = 1; + private static final int CURRENT_VERSION = 2; private Schema schema; private SortOrder sortOrder; + private int version = CURRENT_VERSION; + /** Constructor for read instantiation. 
*/ @SuppressWarnings({"unused", "checkstyle:RedundantModifier"}) public SortKeySerializerSnapshot() { @@ -311,10 +350,16 @@ public void writeSnapshot(DataOutputView out) throws IOException { @Override public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException { - if (readVersion == 1) { - readV1(in); - } else { - throw new IllegalArgumentException("Unknown read version: " + readVersion); + switch (readVersion) { + case 1: + read(in); + this.version = 1; + break; + case 2: + read(in); + break; + default: + throw new IllegalArgumentException("Unknown read version: " + readVersion); } } @@ -325,9 +370,13 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( return TypeSerializerSchemaCompatibility.incompatible(); } - // Sort order should be identical SortKeySerializerSnapshot newSnapshot = (SortKeySerializerSnapshot) newSerializer.snapshotConfiguration(); + if (newSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) { + return TypeSerializerSchemaCompatibility.compatibleAfterMigration(); + } + + // Sort order should be identical if (!sortOrder.sameOrder(newSnapshot.sortOrder)) { return TypeSerializerSchemaCompatibility.incompatible(); } @@ -351,10 +400,10 @@ public TypeSerializerSchemaCompatibility resolveSchemaCompatibility( public TypeSerializer restoreSerializer() { Preconditions.checkState(schema != null, "Invalid schema: null"); Preconditions.checkState(sortOrder != null, "Invalid sort order: null"); - return new SortKeySerializer(schema, sortOrder); + return new SortKeySerializer(schema, sortOrder, version); } - private void readV1(DataInputView in) throws IOException { + private void read(DataInputView in) throws IOException { String schemaJson = StringUtils.readString(in); String sortOrderJson = StringUtils.readString(in); this.schema = SchemaParser.fromJson(schemaJson); diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java index 5d48ec57ca49..f2efc7fa9834 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java @@ -73,12 +73,29 @@ static byte[] serializeCompletedStatistics( } static CompletedStatistics deserializeCompletedStatistics( - byte[] bytes, TypeSerializer statisticsSerializer) { + byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) { try { DataInputDeserializer input = new DataInputDeserializer(bytes); - return statisticsSerializer.deserialize(input); - } catch (IOException e) { - throw new UncheckedIOException("Fail to deserialize aggregated statistics", e); + CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input); + if (!completedStatistics.isValid()) { + throw new RuntimeException("Fail to deserialize aggregated statistics,change to v1"); + } + + return completedStatistics; + } catch (Exception e) { + try { + // If we restore from a lower version, the new version of SortKeySerializer cannot correctly + // parse the checkpointData, so we need to first switch the version to v1. Once the state + // data is successfully parsed, we need to switch the serialization version to the latest + // version to parse the subsequent data passed from the TM. 
+ statisticsSerializer.changeSortKeySerializerVersion(1); + DataInputDeserializer input = new DataInputDeserializer(bytes); + CompletedStatistics deserialize = statisticsSerializer.deserialize(input); + statisticsSerializer.changeSortKeySerializerVersionLatest(); + return deserialize; + } catch (IOException ioException) { + throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException); + } } } diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java index df8c3c79d3e3..aa9a0291b38f 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java @@ -46,6 +46,7 @@ import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.sink.shuffle.StatisticsType; import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -252,6 +253,44 @@ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); } + @TestTemplate + public void testRangeDistributionWithNullValue() throws Exception { + assumeThat(partitioned).isTrue(); + + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + + int numOfCheckpoints = 6; + List> charRows = createCharRows(numOfCheckpoints, 10); + charRows.add(ImmutableList.of(Row.of(1, null))); + DataStream dataStream = + env.addSource(createRangeDistributionBoundedSource(charRows), ROW_TYPE_INFO); + FlinkSink.Builder builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism); + + // sort based on partition columns + builder.append(); + env.execute(getClass().getSimpleName()); + + table.refresh(); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. 
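An aside on the recovery path above: the isValid() guard matters because v1 bytes misread with the v2 layout do not always fail with an IOException; a map-type statistic can deserialize "successfully" into garbage such as null frequency values. The guard added to CompletedStatistics earlier in this diff boils down to the following equivalent form (same accessors, compacted here purely for illustration):

```java
boolean isValid() {
  if (type == StatisticsType.Sketch) {
    return keySamples != null;
  }

  // Misparsed v1 bytes typically surface here as a null map or null counts.
  return keyFrequency() != null && !keyFrequency().containsValue(null);
}
```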
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); + } + @TestTemplate public void testRangeDistributionWithSortOrder() throws Exception { table diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java index 4ee9888934a8..1975d7e8d654 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java @@ -19,11 +19,15 @@ package org.apache.iceberg.flink.sink.shuffle; import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.iceberg.SortKey; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; public class TestCompletedStatisticsSerializer extends SerializerTestBase { @@ -51,4 +55,49 @@ protected CompletedStatistics[] getTestData() { CompletedStatistics.fromKeySamples(2L, new SortKey[] {CHAR_KEYS.get("a"), CHAR_KEYS.get("b")}) }; } + + @Test + public void testSerializer() throws Exception { + TypeSerializer completedStatisticsTypeSerializer = createSerializer(); + CompletedStatistics[] data = getTestData(); + DataOutputSerializer output = new DataOutputSerializer(1024); + completedStatisticsTypeSerializer.serialize(data[0], output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + DataInputDeserializer input = new DataInputDeserializer(serializedBytes); + CompletedStatistics deserialized = completedStatisticsTypeSerializer.deserialize(input); + assertThat(deserialized).isEqualTo(data[0]); + } + + @Test + public void testRestoreOldVersionSerializer() throws Exception { + CompletedStatisticsSerializer completedStatisticsTypeSerializer = + (CompletedStatisticsSerializer) createSerializer(); + completedStatisticsTypeSerializer.changeSortKeySerializerVersion(1); + CompletedStatistics[] data = getTestData(); + DataOutputSerializer output = new DataOutputSerializer(1024); + completedStatisticsTypeSerializer.serialize(data[0], output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + completedStatisticsTypeSerializer.changeSortKeySerializerVersionLatest(); + CompletedStatistics completedStatistics = + StatisticsUtil.deserializeCompletedStatistics( + serializedBytes, completedStatisticsTypeSerializer); + assertThat(completedStatistics).isEqualTo(data[0]); + } + + @Test + public void testRestoreNewSerializer() throws Exception { + CompletedStatisticsSerializer completedStatisticsTypeSerializer = + (CompletedStatisticsSerializer) createSerializer(); + CompletedStatistics[] data = getTestData(); + DataOutputSerializer output = new DataOutputSerializer(1024); + completedStatisticsTypeSerializer.serialize(data[0], output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + CompletedStatistics completedStatistics = + StatisticsUtil.deserializeCompletedStatistics( + serializedBytes, completedStatisticsTypeSerializer); + assertThat(completedStatistics).isEqualTo(data[0]); + } } diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java index a08a8a73e80c..acecc5b60af1 100644 --- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java @@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio } } + @ParameterizedTest + @EnumSource(StatisticsType.class) + public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception { + try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) { + dataStatisticsCoordinator.start(); + tasksReady(dataStatisticsCoordinator); + + SortKey nullSortKey = Fixtures.SORT_KEY.copy(); + nullSortKey.set(0, null); + + StatisticsEvent checkpoint1Subtask0DataStatisticEvent = + Fixtures.createStatisticsEvent( + type, + Fixtures.TASK_STATISTICS_SERIALIZER, + 1L, + nullSortKey, + CHAR_KEYS.get("b"), + CHAR_KEYS.get("b"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c")); + StatisticsEvent checkpoint1Subtask1DataStatisticEvent = + Fixtures.createStatisticsEvent( + type, + Fixtures.TASK_STATISTICS_SERIALIZER, + 1L, + nullSortKey, + CHAR_KEYS.get("b"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c")); + // Handle events from operators for checkpoint 1 + dataStatisticsCoordinator.handleEventFromOperator( + 0, 0, checkpoint1Subtask0DataStatisticEvent); + dataStatisticsCoordinator.handleEventFromOperator( + 1, 0, checkpoint1Subtask1DataStatisticEvent); + + waitForCoordinatorToProcessActions(dataStatisticsCoordinator); + + Map keyFrequency = + ImmutableMap.of(nullSortKey, 2L, CHAR_KEYS.get("b"), 3L, CHAR_KEYS.get("c"), 5L); + MapAssignment mapAssignment = + MapAssignment.fromKeyFrequency(NUM_SUBTASKS, keyFrequency, 0.0d, SORT_ORDER_COMPARTOR); + + CompletedStatistics completedStatistics = dataStatisticsCoordinator.completedStatistics(); + assertThat(completedStatistics.checkpointId()).isEqualTo(1L); + assertThat(completedStatistics.type()).isEqualTo(StatisticsUtil.collectType(type)); + if (StatisticsUtil.collectType(type) == StatisticsType.Map) { + assertThat(completedStatistics.keyFrequency()).isEqualTo(keyFrequency); + } else { + assertThat(completedStatistics.keySamples()) + .containsExactly( + nullSortKey, + nullSortKey, + CHAR_KEYS.get("b"), + CHAR_KEYS.get("b"), + CHAR_KEYS.get("b"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c")); + } + + GlobalStatistics globalStatistics = dataStatisticsCoordinator.globalStatistics(); + assertThat(globalStatistics.checkpointId()).isEqualTo(1L); + assertThat(globalStatistics.type()).isEqualTo(StatisticsUtil.collectType(type)); + if (StatisticsUtil.collectType(type) == StatisticsType.Map) { + assertThat(globalStatistics.mapAssignment()).isEqualTo(mapAssignment); + } else { + assertThat(globalStatistics.rangeBounds()).containsExactly(CHAR_KEYS.get("b")); + } + } + } + @Test public void testRequestGlobalStatisticsEventHandling() throws Exception { try (DataStatisticsCoordinator dataStatisticsCoordinator = diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 
c760f1ba96d3..70837f5ef480 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -34,6 +34,8 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
@@ -135,6 +137,34 @@ public void testProcessElement(StatisticsType type) throws Exception {
     }
   }

+  @ParameterizedTest
+  @EnumSource(StatisticsType.class)
+  public void testProcessElementWithNull(StatisticsType type) throws Exception {
+    DataStatisticsOperator operator = createOperator(type, Fixtures.NUM_SUBTASKS);
+    try (OneInputStreamOperatorTestHarness<RowData, StatisticsOrRecord> testHarness =
+        createHarness(operator)) {
+      StateInitializationContext stateContext = getStateContext();
+      operator.initializeState(stateContext);
+      operator.processElement(new StreamRecord<>(GenericRowData.of(null, 5)));
+      operator.processElement(new StreamRecord<>(GenericRowData.of(StringData.fromString("a"), 3)));
+
+      DataStatistics localStatistics = operator.localStatistics();
+      SortKeySerializer sortKeySerializer =
+          new SortKeySerializer(Fixtures.SCHEMA, Fixtures.SORT_ORDER);
+      DataStatisticsSerializer taskStatisticsSerializer =
+          new DataStatisticsSerializer(sortKeySerializer);
+      DataOutputSerializer outputView = new DataOutputSerializer(1024);
+
+      taskStatisticsSerializer.serialize(localStatistics, outputView);
+      DataInputDeserializer inputView = new DataInputDeserializer(outputView.getCopyOfBuffer());
+      DataStatistics dataStatistics = taskStatisticsSerializer.deserialize(inputView);
+
+      testHarness.endInput();
+
+      assertThat(localStatistics).isEqualTo(dataStatistics);
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(StatisticsType.class)
   public void testOperatorOutput(StatisticsType type) throws Exception {
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
index 54cceae6e55b..ac2e2784e681 100644
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
+++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
@@ -80,8 +80,8 @@ public void testSerializationSize() throws Exception {
     byte[] serializedBytes = output.getCopyOfBuffer();
     assertThat(serializedBytes.length)
         .as(
-            "Serialized bytes for sort key should be 38 bytes (34 UUID text + 4 byte integer of string length")
-        .isEqualTo(38);
+            "Serialized bytes for sort key should be 39 bytes (34 UUID text + 4 byte integer of string length + 1 byte of is-null flag)")
+        .isEqualTo(39);

     DataInputDeserializer input = new DataInputDeserializer(serializedBytes);
     SortKey deserialized = serializer.deserialize(input);
diff --git a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java
index 012654603b04..2359f4b32c35 100644
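The one-byte growth asserted above (38 to 39 bytes) falls directly out of the v2 record layout: every sort-key field is now preceded by a one-byte null flag, and value bytes follow only when the flag is false, so the test key costs 1 (flag) + 4 (length prefix) + 34 (key text) = 39 bytes. A standalone sketch of that layout using Flink's memory views; writeUTF here stands in for the real string encoding, which uses the 4-byte length prefix:

```java
import java.io.IOException;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class NullFlagLayoutSketch {
  // v2 layout per sort-key field: 1-byte null flag, then the value iff present.
  static void writeNullableString(String value, DataOutputSerializer out) throws IOException {
    out.writeBoolean(value == null); // true => null, no value bytes follow
    if (value != null) {
      out.writeUTF(value);
    }
  }

  static String readNullableString(DataInputDeserializer in) throws IOException {
    return in.readBoolean() ? null : in.readUTF();
  }

  public static void main(String[] args) throws IOException {
    DataOutputSerializer out = new DataOutputSerializer(64);
    writeNullableString(null, out);  // 1 byte: the flag alone
    writeNullableString("abc", out); // 1 flag byte + 2-byte length + 3 text bytes
    DataInputDeserializer in = new DataInputDeserializer(out.getCopyOfBuffer());
    System.out.println(readNullableString(in)); // null
    System.out.println(readNullableString(in)); // abc
  }
}
```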
--- a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java +++ b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java @@ -73,6 +73,28 @@ public void testRestoredSerializer() throws Exception { assertThat(deserialized).isEqualTo(sortKey); } + @Test + public void testRestoredOldSerializer() throws Exception { + RowData rowData = GenericRowData.of(StringData.fromString("str"), 1); + RowDataWrapper rowDataWrapper = new RowDataWrapper(ROW_TYPE, SCHEMA.asStruct()); + StructLike struct = rowDataWrapper.wrap(rowData); + SortKey sortKey = SORT_KEY.copy(); + sortKey.wrap(struct); + + SortKeySerializer originalSerializer = new SortKeySerializer(SCHEMA, SORT_ORDER, 1); + TypeSerializerSnapshot snapshot = + roundTrip(originalSerializer.snapshotConfiguration()); + TypeSerializer restoredSerializer = snapshot.restoreSerializer(); + ((SortKeySerializer) restoredSerializer).setVersion(1); + DataOutputSerializer output = new DataOutputSerializer(1024); + originalSerializer.serialize(sortKey, output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + DataInputDeserializer input = new DataInputDeserializer(serializedBytes); + SortKey deserialized = restoredSerializer.deserialize(input); + assertThat(deserialized).isEqualTo(sortKey); + } + @Test public void testSnapshotIsCompatibleWithSameSortOrder() throws Exception { SortKeySerializer oldSerializer = new SortKeySerializer(schema, sortOrder); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java index e4cba174f0f2..a8bf0f839e49 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatistics.java @@ -108,4 +108,21 @@ boolean isEmpty() { return keyFrequency().isEmpty(); } } + + boolean isValid() { + if (type == StatisticsType.Sketch) { + if (null == keySamples) { + return false; + } + } else { + if (null == keyFrequency()) { + return false; + } + if (keyFrequency().values().contains(null)) { + return false; + } + } + + return true; + } } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java index 1ac0e386a011..48c85a9bd91e 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/CompletedStatisticsSerializer.java @@ -48,6 +48,18 @@ class CompletedStatisticsSerializer extends TypeSerializer this.keySamplesSerializer = new ListSerializer<>(sortKeySerializer); } + public void changeSortKeySerializerVersion(int version) { + if (sortKeySerializer instanceof SortKeySerializer) { + ((SortKeySerializer) sortKeySerializer).setVersion(version); + } + } + + public void changeSortKeySerializerVersionLatest() { + if (sortKeySerializer instanceof SortKeySerializer) { + ((SortKeySerializer) sortKeySerializer).restoreToLatestVersion(); + } + } + @Override public boolean isImmutableType() { return false; diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java 
b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java index e2a282efd82e..4f2afd60fed1 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.java @@ -370,7 +370,8 @@ public void resetToCheckpoint(long checkpointId, byte[] checkpointData) { "Restoring data statistic coordinator {} from checkpoint {}", operatorName, checkpointId); this.completedStatistics = StatisticsUtil.deserializeCompletedStatistics( - checkpointData, completedStatisticsSerializer); + checkpointData, (CompletedStatisticsSerializer) completedStatisticsSerializer); + // recompute global statistics in case downstream parallelism changed this.globalStatistics = globalStatistics( diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java index b3e536bdde52..5b979e546d51 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java @@ -52,9 +52,12 @@ class SortKeySerializer extends TypeSerializer { private final int size; private final Types.NestedField[] transformedFields; + private int version; + private transient SortKey sortKey; - SortKeySerializer(Schema schema, SortOrder sortOrder) { + SortKeySerializer(Schema schema, SortOrder sortOrder, int version) { + this.version = version; this.schema = schema; this.sortOrder = sortOrder; this.size = sortOrder.fields().size(); @@ -75,6 +78,10 @@ class SortKeySerializer extends TypeSerializer { } } + SortKeySerializer(Schema schema, SortOrder sortOrder) { + this(schema, sortOrder, SortKeySerializerSnapshot.CURRENT_VERSION); + } + private SortKey lazySortKey() { if (sortKey == null) { this.sortKey = new SortKey(schema, sortOrder); @@ -83,6 +90,18 @@ private SortKey lazySortKey() { return sortKey; } + public int getLatestVersion() { + return snapshotConfiguration().getCurrentVersion(); + } + + public void restoreToLatestVersion() { + this.version = snapshotConfiguration().getCurrentVersion(); + } + + public void setVersion(int version) { + this.version = version; + } + @Override public boolean isImmutableType() { return false; @@ -124,6 +143,16 @@ public void serialize(SortKey record, DataOutputView target) throws IOException for (int i = 0; i < size; ++i) { int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); + if (version > 1) { + Object value = record.get(i, Object.class); + if (value == null) { + target.writeBoolean(true); + continue; + } else { + target.writeBoolean(false); + } + } + switch (typeId) { case BOOLEAN: target.writeBoolean(record.get(i, Boolean.class)); @@ -192,6 +221,14 @@ public SortKey deserialize(SortKey reuse, DataInputView source) throws IOExcepti reuse.size(), size); for (int i = 0; i < size; ++i) { + if (version > 1) { + boolean isNull = source.readBoolean(); + if (isNull) { + reuse.set(i, null); + continue; + } + } + int fieldId = transformedFields[i].fieldId(); Type.TypeID typeId = transformedFields[i].type().typeId(); switch (typeId) { @@ -276,11 +313,13 @@ public TypeSerializerSnapshot snapshotConfiguration() { } public static class SortKeySerializerSnapshot implements TypeSerializerSnapshot { - private static final int 
CURRENT_VERSION = 1;
+    private static final int CURRENT_VERSION = 2;

     private Schema schema;
     private SortOrder sortOrder;

+    private int version = CURRENT_VERSION;
+
     /** Constructor for read instantiation. */
     @SuppressWarnings({"unused", "checkstyle:RedundantModifier"})
     public SortKeySerializerSnapshot() {
@@ -310,10 +349,16 @@ public void writeSnapshot(DataOutputView out) throws IOException {
     @Override
     public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
         throws IOException {
-      if (readVersion == 1) {
-        readV1(in);
-      } else {
-        throw new IllegalArgumentException("Unknown read version: " + readVersion);
+      switch (readVersion) {
+        case 1:
+          read(in);
+          this.version = 1;
+          break;
+        case 2:
+          read(in);
+          break;
+        default:
+          throw new IllegalArgumentException("Unknown read version: " + readVersion);
       }
     }
@@ -324,6 +369,10 @@ public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
       return TypeSerializerSchemaCompatibility.incompatible();
     }

+    if (oldSerializerSnapshot.getCurrentVersion() == 1 && this.getCurrentVersion() == 2) {
+      return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
+    }
+
     // Sort order should be identical
     SortKeySerializerSnapshot oldSnapshot = (SortKeySerializerSnapshot) oldSerializerSnapshot;
     if (!sortOrder.sameOrder(oldSnapshot.sortOrder)) {
       return TypeSerializerSchemaCompatibility.incompatible();
     }
@@ -349,10 +398,10 @@ public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
     public TypeSerializer<SortKey> restoreSerializer() {
       Preconditions.checkState(schema != null, "Invalid schema: null");
       Preconditions.checkState(sortOrder != null, "Invalid sort order: null");
-      return new SortKeySerializer(schema, sortOrder);
+      return new SortKeySerializer(schema, sortOrder, version);
     }

-    private void readV1(DataInputView in) throws IOException {
+    private void read(DataInputView in) throws IOException {
       String schemaJson = StringUtils.readString(in);
       String sortOrderJson = StringUtils.readString(in);
       this.schema = SchemaParser.fromJson(schemaJson);
diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java
index 5d48ec57ca49..f2efc7fa9834 100644
--- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java
+++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/StatisticsUtil.java
@@ -73,12 +73,29 @@ static byte[] serializeCompletedStatistics(
   }

   static CompletedStatistics deserializeCompletedStatistics(
-      byte[] bytes, TypeSerializer<CompletedStatistics> statisticsSerializer) {
+      byte[] bytes, CompletedStatisticsSerializer statisticsSerializer) {
     try {
       DataInputDeserializer input = new DataInputDeserializer(bytes);
-      return statisticsSerializer.deserialize(input);
-    } catch (IOException e) {
-      throw new UncheckedIOException("Fail to deserialize aggregated statistics", e);
+      CompletedStatistics completedStatistics = statisticsSerializer.deserialize(input);
+      if (!completedStatistics.isValid()) {
+        throw new RuntimeException("Failed to deserialize aggregated statistics, retrying with v1");
+      }
+
+      return completedStatistics;
+    } catch (Exception e) {
+      try {
+        // When restoring from a checkpoint written by an older version, the latest
+        // SortKeySerializer cannot parse the checkpoint data, so we first switch the
+        // serializer to v1. Once the state is parsed successfully, we switch back to the
+        // latest version so that subsequent data sent by the TMs is deserialized correctly.
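Note the direction flip between the two modules' compatibility hooks, which is why the v1.18 and v1.19 copies of resolveSchemaCompatibility differ: in Flink 1.18 the restored (old) snapshot is asked about the new serializer, while from Flink 1.19 on the new serializer's snapshot is asked about the restored snapshot. A signature-level sketch of the two shapes, with placeholder bodies rather than the real logic:

```java
// Flink 1.18: invoked on the snapshot restored from state.
public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
    TypeSerializer<SortKey> newSerializer) {
  return TypeSerializerSchemaCompatibility.compatibleAsIs(); // placeholder body
}

// Flink 1.19: invoked on the new serializer's snapshot.
public TypeSerializerSchemaCompatibility<SortKey> resolveSchemaCompatibility(
    TypeSerializerSnapshot<SortKey> oldSerializerSnapshot) {
  return TypeSerializerSchemaCompatibility.compatibleAsIs(); // placeholder body
}
```

This is also why the version check reads `newSnapshot.getCurrentVersion() == 1` in the v1.18 file but `oldSerializerSnapshot.getCurrentVersion() == 1` in the v1.19 file above.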
+ statisticsSerializer.changeSortKeySerializerVersion(1); + DataInputDeserializer input = new DataInputDeserializer(bytes); + CompletedStatistics deserialize = statisticsSerializer.deserialize(input); + statisticsSerializer.changeSortKeySerializerVersionLatest(); + return deserialize; + } catch (IOException ioException) { + throw new UncheckedIOException("Fail to deserialize aggregated statistics", ioException); + } } } diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java index df8c3c79d3e3..aa9a0291b38f 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkDistributionMode.java @@ -46,6 +46,7 @@ import org.apache.iceberg.flink.TestFixtures; import org.apache.iceberg.flink.sink.shuffle.StatisticsType; import org.apache.iceberg.flink.source.BoundedTestSource; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -252,6 +253,44 @@ public void testRangeDistributionWithoutSortOrderPartitioned() throws Exception assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); } + @TestTemplate + public void testRangeDistributionWithNullValue() throws Exception { + assumeThat(partitioned).isTrue(); + + table + .updateProperties() + .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName()) + .commit(); + + int numOfCheckpoints = 6; + List> charRows = createCharRows(numOfCheckpoints, 10); + charRows.add(ImmutableList.of(Row.of(1, null))); + DataStream dataStream = + env.addSource(createRangeDistributionBoundedSource(charRows), ROW_TYPE_INFO); + FlinkSink.Builder builder = + FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA) + .table(table) + .tableLoader(tableLoader) + .writeParallelism(parallelism); + + // sort based on partition columns + builder.append(); + env.execute(getClass().getSimpleName()); + + table.refresh(); + // ordered in reverse timeline from the newest snapshot to the oldest snapshot + List snapshots = Lists.newArrayList(table.snapshots().iterator()); + // only keep the snapshots with added data files + snapshots = + snapshots.stream() + .filter(snapshot -> snapshot.addedDataFiles(table.io()).iterator().hasNext()) + .collect(Collectors.toList()); + + // Sometimes we will have more checkpoints than the bounded source if we pass the + // auto checkpoint interval. Thus producing multiple snapshots. 
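For anyone reproducing this scenario outside the test harness: range distribution is driven entirely by a table property, exactly as the test sets it above. A minimal helper (class and method names hypothetical):

```java
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class RangeDistributionHelper {
  // Switch an existing table's writes to range distribution; with no explicit
  // sort order, the sink falls back to sorting by the partition columns.
  static void enableRangeDistribution(Table table) {
    table
        .updateProperties()
        .set(TableProperties.WRITE_DISTRIBUTION_MODE, DistributionMode.RANGE.modeName())
        .commit();
  }
}
```

Rows whose sort column is null then exercise the v2 null-flag path end to end, which is what this test guards.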
+ assertThat(snapshots).hasSizeGreaterThanOrEqualTo(numOfCheckpoints); + } + @TestTemplate public void testRangeDistributionWithSortOrder() throws Exception { table diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java index 4ee9888934a8..1975d7e8d654 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestCompletedStatisticsSerializer.java @@ -19,11 +19,15 @@ package org.apache.iceberg.flink.sink.shuffle; import static org.apache.iceberg.flink.sink.shuffle.Fixtures.CHAR_KEYS; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; import org.apache.iceberg.SortKey; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.Test; public class TestCompletedStatisticsSerializer extends SerializerTestBase { @@ -51,4 +55,49 @@ protected CompletedStatistics[] getTestData() { CompletedStatistics.fromKeySamples(2L, new SortKey[] {CHAR_KEYS.get("a"), CHAR_KEYS.get("b")}) }; } + + @Test + public void testSerializer() throws Exception { + TypeSerializer completedStatisticsTypeSerializer = createSerializer(); + CompletedStatistics[] data = getTestData(); + DataOutputSerializer output = new DataOutputSerializer(1024); + completedStatisticsTypeSerializer.serialize(data[0], output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + DataInputDeserializer input = new DataInputDeserializer(serializedBytes); + CompletedStatistics deserialized = completedStatisticsTypeSerializer.deserialize(input); + assertThat(deserialized).isEqualTo(data[0]); + } + + @Test + public void testRestoreOldVersionSerializer() throws Exception { + CompletedStatisticsSerializer completedStatisticsTypeSerializer = + (CompletedStatisticsSerializer) createSerializer(); + completedStatisticsTypeSerializer.changeSortKeySerializerVersion(1); + CompletedStatistics[] data = getTestData(); + DataOutputSerializer output = new DataOutputSerializer(1024); + completedStatisticsTypeSerializer.serialize(data[0], output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + completedStatisticsTypeSerializer.changeSortKeySerializerVersionLatest(); + CompletedStatistics completedStatistics = + StatisticsUtil.deserializeCompletedStatistics( + serializedBytes, completedStatisticsTypeSerializer); + assertThat(completedStatistics).isEqualTo(data[0]); + } + + @Test + public void testRestoreNewSerializer() throws Exception { + CompletedStatisticsSerializer completedStatisticsTypeSerializer = + (CompletedStatisticsSerializer) createSerializer(); + CompletedStatistics[] data = getTestData(); + DataOutputSerializer output = new DataOutputSerializer(1024); + completedStatisticsTypeSerializer.serialize(data[0], output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + CompletedStatistics completedStatistics = + StatisticsUtil.deserializeCompletedStatistics( + serializedBytes, completedStatisticsTypeSerializer); + assertThat(completedStatistics).isEqualTo(data[0]); + } } diff --git 
a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java index a08a8a73e80c..acecc5b60af1 100644 --- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsCoordinator.java @@ -152,6 +152,80 @@ public void testDataStatisticsEventHandling(StatisticsType type) throws Exceptio } } + @ParameterizedTest + @EnumSource(StatisticsType.class) + public void testDataStatisticsEventHandlingWithNullValue(StatisticsType type) throws Exception { + try (DataStatisticsCoordinator dataStatisticsCoordinator = createCoordinator(type)) { + dataStatisticsCoordinator.start(); + tasksReady(dataStatisticsCoordinator); + + SortKey nullSortKey = Fixtures.SORT_KEY.copy(); + nullSortKey.set(0, null); + + StatisticsEvent checkpoint1Subtask0DataStatisticEvent = + Fixtures.createStatisticsEvent( + type, + Fixtures.TASK_STATISTICS_SERIALIZER, + 1L, + nullSortKey, + CHAR_KEYS.get("b"), + CHAR_KEYS.get("b"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c")); + StatisticsEvent checkpoint1Subtask1DataStatisticEvent = + Fixtures.createStatisticsEvent( + type, + Fixtures.TASK_STATISTICS_SERIALIZER, + 1L, + nullSortKey, + CHAR_KEYS.get("b"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c")); + // Handle events from operators for checkpoint 1 + dataStatisticsCoordinator.handleEventFromOperator( + 0, 0, checkpoint1Subtask0DataStatisticEvent); + dataStatisticsCoordinator.handleEventFromOperator( + 1, 0, checkpoint1Subtask1DataStatisticEvent); + + waitForCoordinatorToProcessActions(dataStatisticsCoordinator); + + Map keyFrequency = + ImmutableMap.of(nullSortKey, 2L, CHAR_KEYS.get("b"), 3L, CHAR_KEYS.get("c"), 5L); + MapAssignment mapAssignment = + MapAssignment.fromKeyFrequency(NUM_SUBTASKS, keyFrequency, 0.0d, SORT_ORDER_COMPARTOR); + + CompletedStatistics completedStatistics = dataStatisticsCoordinator.completedStatistics(); + assertThat(completedStatistics.checkpointId()).isEqualTo(1L); + assertThat(completedStatistics.type()).isEqualTo(StatisticsUtil.collectType(type)); + if (StatisticsUtil.collectType(type) == StatisticsType.Map) { + assertThat(completedStatistics.keyFrequency()).isEqualTo(keyFrequency); + } else { + assertThat(completedStatistics.keySamples()) + .containsExactly( + nullSortKey, + nullSortKey, + CHAR_KEYS.get("b"), + CHAR_KEYS.get("b"), + CHAR_KEYS.get("b"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c"), + CHAR_KEYS.get("c")); + } + + GlobalStatistics globalStatistics = dataStatisticsCoordinator.globalStatistics(); + assertThat(globalStatistics.checkpointId()).isEqualTo(1L); + assertThat(globalStatistics.type()).isEqualTo(StatisticsUtil.collectType(type)); + if (StatisticsUtil.collectType(type) == StatisticsType.Map) { + assertThat(globalStatistics.mapAssignment()).isEqualTo(mapAssignment); + } else { + assertThat(globalStatistics.rangeBounds()).containsExactly(CHAR_KEYS.get("b")); + } + } + } + @Test public void testRequestGlobalStatisticsEventHandling() throws Exception { try (DataStatisticsCoordinator dataStatisticsCoordinator = diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java index 
bc248b778184..f7a7a147e73a 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -34,6 +34,8 @@ import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
@@ -136,6 +138,34 @@ public void testProcessElement(StatisticsType type) throws Exception {
     }
   }

+  @ParameterizedTest
+  @EnumSource(StatisticsType.class)
+  public void testProcessElementWithNull(StatisticsType type) throws Exception {
+    DataStatisticsOperator operator = createOperator(type, Fixtures.NUM_SUBTASKS);
+    try (OneInputStreamOperatorTestHarness<RowData, StatisticsOrRecord> testHarness =
+        createHarness(operator)) {
+      StateInitializationContext stateContext = getStateContext();
+      operator.initializeState(stateContext);
+      operator.processElement(new StreamRecord<>(GenericRowData.of(null, 5)));
+      operator.processElement(new StreamRecord<>(GenericRowData.of(StringData.fromString("a"), 3)));
+
+      DataStatistics localStatistics = operator.localStatistics();
+      SortKeySerializer sortKeySerializer =
+          new SortKeySerializer(Fixtures.SCHEMA, Fixtures.SORT_ORDER);
+      DataStatisticsSerializer taskStatisticsSerializer =
+          new DataStatisticsSerializer(sortKeySerializer);
+      DataOutputSerializer outputView = new DataOutputSerializer(1024);
+
+      taskStatisticsSerializer.serialize(localStatistics, outputView);
+      DataInputDeserializer inputView = new DataInputDeserializer(outputView.getCopyOfBuffer());
+      DataStatistics dataStatistics = taskStatisticsSerializer.deserialize(inputView);
+
+      testHarness.endInput();
+
+      assertThat(localStatistics).isEqualTo(dataStatistics);
+    }
+  }
+
   @ParameterizedTest
   @EnumSource(StatisticsType.class)
   public void testOperatorOutput(StatisticsType type) throws Exception {
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
index 54cceae6e55b..ac2e2784e681 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
+++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
@@ -80,8 +80,8 @@ public void testSerializationSize() throws Exception {
     byte[] serializedBytes = output.getCopyOfBuffer();
     assertThat(serializedBytes.length)
         .as(
-            "Serialized bytes for sort key should be 38 bytes (34 UUID text + 4 byte integer of string length")
-        .isEqualTo(38);
+            "Serialized bytes for sort key should be 39 bytes (34 UUID text + 4 byte integer of string length + 1 byte of is-null flag)")
+        .isEqualTo(39);

     DataInputDeserializer input = new DataInputDeserializer(serializedBytes);
     SortKey deserialized = serializer.deserialize(input);
diff --git a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java
index c0f688f2589e..2d87b089cecb 100644
--- a/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java +++ b/flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerSnapshot.java @@ -73,6 +73,28 @@ public void testRestoredSerializer() throws Exception { assertThat(deserialized).isEqualTo(sortKey); } + @Test + public void testRestoredOldSerializer() throws Exception { + RowData rowData = GenericRowData.of(StringData.fromString("str"), 1); + RowDataWrapper rowDataWrapper = new RowDataWrapper(ROW_TYPE, SCHEMA.asStruct()); + StructLike struct = rowDataWrapper.wrap(rowData); + SortKey sortKey = SORT_KEY.copy(); + sortKey.wrap(struct); + + SortKeySerializer originalSerializer = new SortKeySerializer(SCHEMA, SORT_ORDER, 1); + TypeSerializerSnapshot snapshot = + roundTrip(originalSerializer.snapshotConfiguration()); + TypeSerializer restoredSerializer = snapshot.restoreSerializer(); + ((SortKeySerializer) restoredSerializer).setVersion(1); + DataOutputSerializer output = new DataOutputSerializer(1024); + originalSerializer.serialize(sortKey, output); + byte[] serializedBytes = output.getCopyOfBuffer(); + + DataInputDeserializer input = new DataInputDeserializer(serializedBytes); + SortKey deserialized = restoredSerializer.deserialize(input); + assertThat(deserialized).isEqualTo(sortKey); + } + @Test public void testSnapshotIsCompatibleWithSameSortOrder() throws Exception { SortKeySerializer.SortKeySerializerSnapshot oldSnapshot =