From f742a3ec2429e460fd16f3e2cbb934f9976a307e Mon Sep 17 00:00:00 2001 From: Steven Wu Date: Tue, 6 Jun 2023 21:20:16 -0700 Subject: [PATCH] Flink: implement range partitioner that leverages traffic distribution statistics --- .../shuffle/MapRangePartitionerBenchmark.java | 197 +++++++ .../sink/shuffle/MapRangePartitioner.java | 288 ++++++++++ .../sink/shuffle/TestMapRangePartitioner.java | 509 ++++++++++++++++++ jmh.gradle | 5 + 4 files changed, 999 insertions(+) create mode 100644 flink/v1.17/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java create mode 100644 flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java create mode 100644 flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java diff --git a/flink/v1.17/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java b/flink/v1.17/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java new file mode 100644 index 000000000000..a9d909f6adb5 --- /dev/null +++ b/flink/v1.17/flink/src/jmh/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitionerBenchmark.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class MapRangePartitionerBenchmark {
+  private static final String CHARS =
+      "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.!?";
+  private static final int SAMPLE_SIZE = 100_000;
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "name2", Types.StringType.get()),
+          Types.NestedField.required(3, "name3", Types.StringType.get()),
+          Types.NestedField.required(4, "name4", Types.StringType.get()),
+          Types.NestedField.required(5, "name5", Types.StringType.get()),
+          Types.NestedField.required(6, "name6", Types.StringType.get()),
+          Types.NestedField.required(7, "name7", Types.StringType.get()),
+          Types.NestedField.required(8, "name8", Types.StringType.get()),
+          Types.NestedField.required(9, "name9", Types.StringType.get()));
+
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+  private static final SortKey SORT_KEY = new SortKey(SCHEMA, SORT_ORDER);
+
+  private MapRangePartitioner partitioner;
+  private RowData[] rows;
+
+  @Setup
+  public void setupBenchmark() {
+    NavigableMap<Integer, Long> weights = longTailDistribution(100_000, 24, 240, 100, 2.0);
+    Map<SortKey, Long> mapStatistics = Maps.newHashMapWithExpectedSize(weights.size());
+    weights.forEach(
+        (id, weight) -> {
+          SortKey sortKey = SORT_KEY.copy();
+          sortKey.set(0, id);
+          mapStatistics.put(sortKey, weight);
+        });
+
+    MapDataStatistics dataStatistics = new MapDataStatistics(mapStatistics);
+    this.partitioner = new MapRangePartitioner(SCHEMA, SORT_ORDER, dataStatistics, 2.0);
+
+    List<Integer> keys = weights.keySet().stream().collect(Collectors.toList());
+    long[] weightsCDF = new long[keys.size()];
+    long totalWeight = 0;
+    for (int i = 0; i < keys.size(); ++i) {
+      totalWeight += weights.get(keys.get(i));
+      weightsCDF[i] = totalWeight;
+    }
+
+    // pre-calculate the samples for the benchmark run
+    this.rows = new GenericRowData[SAMPLE_SIZE];
+    for (int i = 0; i < SAMPLE_SIZE; ++i) {
+      long weight = ThreadLocalRandom.current().nextLong(totalWeight);
+      int index = binarySearchIndex(weightsCDF, weight);
+      rows[i] =
+          GenericRowData.of(
+              keys.get(index),
+              randomString("name2-"),
+              randomString("name3-"),
+              randomString("name4-"),
+ randomString("name5-"), + randomString("name6-"), + randomString("name7-"), + randomString("name8-"), + randomString("name9-")); + } + } + + @TearDown + public void tearDownBenchmark() {} + + @Benchmark + @Threads(1) + public void eventHourLongTail(Blackhole blackhole) { + for (int i = 0; i < SAMPLE_SIZE; ++i) { + blackhole.consume(partitioner.partition(rows[i], 128)); + } + } + + private static String randomString(String prefix) { + int length = ThreadLocalRandom.current().nextInt(200); + byte[] buffer = new byte[length]; + + for (int i = 0; i < length; i += 1) { + buffer[i] = (byte) CHARS.charAt(ThreadLocalRandom.current().nextInt(CHARS.length())); + } + + return prefix + new String(buffer); + } + + /** find the index where weightsUDF[index] < weight && weightsUDF[index+1] >= weight */ + private static int binarySearchIndex(long[] weightsUDF, long target) { + Preconditions.checkArgument( + target < weightsUDF[weightsUDF.length - 1], + "weight is out of range: total weight = %s, search target = %s", + weightsUDF[weightsUDF.length - 1], + target); + int start = 0; + int end = weightsUDF.length - 1; + while (start < end) { + int mid = (start + end) / 2; + if (weightsUDF[mid] < target && weightsUDF[mid + 1] >= target) { + return mid; + } + + if (weightsUDF[mid] >= target) { + end = mid - 1; + } else if (weightsUDF[mid + 1] < target) { + start = mid + 1; + } + } + return start; + } + + /** Key is the id string and value is the weight in long value. */ + private static NavigableMap longTailDistribution( + long startingWeight, + int longTailStartingIndex, + int longTailLength, + long longTailBaseWeight, + double weightRandomJitterPercentage) { + + NavigableMap weights = Maps.newTreeMap(); + + // first part just decays the weight by half + long currentWeight = startingWeight; + for (int index = 0; index < longTailStartingIndex; ++index) { + double jitter = ThreadLocalRandom.current().nextDouble(weightRandomJitterPercentage / 100); + long weight = (long) (currentWeight * (1.0 + jitter)); + weights.put(index, weight); + if (currentWeight > longTailBaseWeight) { + currentWeight = currentWeight / 2; + } + } + + // long tail part + for (int index = longTailStartingIndex; + index < longTailStartingIndex + longTailLength; + ++index) { + long longTaileight = + (long) + (longTailBaseWeight + * ThreadLocalRandom.current().nextDouble(weightRandomJitterPercentage)); + weights.put(index, longTaileight); + } + + return weights; + } +} diff --git a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java new file mode 100644 index 000000000000..424b3942080e --- /dev/null +++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.shuffle;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortKey;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.SortOrderComparators;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.RowDataWrapper;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.util.ArrayUtil;
+import org.apache.iceberg.util.Pair;
+
+/**
+ * Internal partitioner implementation that supports MapDataStatistics, which is typically used for
+ * low-cardinality use cases. While MapDataStatistics can keep accurate counters, it can't be used
+ * for high-cardinality use cases where the memory footprint would be too high.
+ */
+class MapRangePartitioner implements Partitioner<RowData> {
+  private final RowDataWrapper rowDataWrapper;
+  private final SortKey sortKey;
+  private final Comparator<StructLike> comparator;
+  private final Map<SortKey, Long> mapStatistics;
+  private final double closeFileCostInWeightPercentage;
+
+  // lazily computed because the number of partitions is only known at partition time
+  private Map<SortKey, KeyAssignment> assignment;
+  private NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost;
+
+  MapRangePartitioner(
+      Schema schema,
+      SortOrder sortOrder,
+      MapDataStatistics dataStatistics,
+      double closeFileCostInWeightPercentage) {
+    this.rowDataWrapper = new RowDataWrapper(FlinkSchemaUtil.convert(schema), schema.asStruct());
+    this.sortKey = new SortKey(schema, sortOrder);
+    this.comparator = SortOrderComparators.forSchema(schema, sortOrder);
+    this.mapStatistics = dataStatistics.statistics();
+    this.closeFileCostInWeightPercentage = closeFileCostInWeightPercentage;
+  }
+
+  @Override
+  public int partition(RowData row, int numPartitions) {
+    // the assignment table can only be built lazily when first referenced here,
+    // because the number of partitions (downstream subtasks) is needed
+    Map<SortKey, KeyAssignment> assignmentMap = assignment(numPartitions);
+    // reuse the sortKey and rowDataWrapper
+    sortKey.wrap(rowDataWrapper.wrap(row));
+    KeyAssignment keyAssignment = assignmentMap.get(sortKey);
+    if (keyAssignment == null) {
+      // haven't learned about the key before. fall back to random selection.
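+      // (e.g., a key first observed after the traffic statistics snapshot was taken).
+      // random selection spreads such unknown keys across subtasks instead of
+      // concentrating them on a single subtask.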
+      return ThreadLocalRandom.current().nextInt(numPartitions);
+    }
+
+    return keyAssignment.select();
+  }
+
+  @VisibleForTesting
+  Map<SortKey, KeyAssignment> assignment(int numPartitions) {
+    if (assignment == null) {
+      long totalWeight = mapStatistics.values().stream().mapToLong(l -> l).sum();
+      double targetWeightPerSubtask = ((double) totalWeight) / numPartitions;
+      long closeFileCostInWeight =
+          (long) Math.ceil(targetWeightPerSubtask * closeFileCostInWeightPercentage / 100);
+
+      // add one close file cost for each estimated split, because a key with large weight
+      // may be assigned to multiple subtasks and each of those subtasks closes its own file
+      this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator);
+      mapStatistics.forEach(
+          (k, v) -> {
+            int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask);
+            long estimatedCloseFileCost = closeFileCostInWeight * estimatedSplits;
+            sortedStatsWithCloseFileCost.put(k, v + estimatedCloseFileCost);
+          });
+
+      long totalWeightWithCloseFileCost =
+          sortedStatsWithCloseFileCost.values().stream().mapToLong(l -> l).sum();
+      long targetWeightPerSubtaskWithCloseFileCost =
+          (long) Math.ceil(((double) totalWeightWithCloseFileCost) / numPartitions);
+      this.assignment =
+          buildAssignment(
+              numPartitions, sortedStatsWithCloseFileCost, targetWeightPerSubtaskWithCloseFileCost);
+    }
+
+    return assignment;
+  }
+
+  @VisibleForTesting
+  NavigableMap<SortKey, Long> sortedStatsWithCloseFileCost() {
+    return sortedStatsWithCloseFileCost;
+  }
+
+  /**
+   * @return assignment summary for every subtask. Key is the subtask id. The value pair is (weight
+   *     assigned to the subtask, number of keys assigned to the subtask).
+   */
+  Map<Integer, Pair<Long, Integer>> assignmentInfo() {
+    Map<Integer, Pair<Long, Integer>> assignmentInfo = Maps.newTreeMap();
+    assignment.forEach(
+        (key, keyAssignment) -> {
+          for (int i = 0; i < keyAssignment.assignedSubtasks.length; ++i) {
+            int subtaskId = keyAssignment.assignedSubtasks[i];
+            long subtaskWeight = keyAssignment.subtaskWeights[i];
+            Pair<Long, Integer> oldValue = assignmentInfo.getOrDefault(subtaskId, Pair.of(0L, 0));
+            assignmentInfo.put(
+                subtaskId, Pair.of(oldValue.first() + subtaskWeight, oldValue.second() + 1));
+          }
+        });
+
+    return assignmentInfo;
+  }
+
+  private Map<SortKey, KeyAssignment> buildAssignment(
+      int numPartitions,
+      NavigableMap<SortKey, Long> sortedStatistics,
+      long targetWeightPerSubtask) {
+    Map<SortKey, KeyAssignment> assignmentMap =
+        Maps.newHashMapWithExpectedSize(sortedStatistics.size());
+    Iterator<SortKey> mapKeyIterator = sortedStatistics.keySet().iterator();
+    int subtaskId = 0;
+    SortKey currentKey = null;
+    long keyRemainingWeight = 0L;
+    long subtaskRemainingWeight = targetWeightPerSubtask;
+    List<Integer> assignedSubtasks = Lists.newArrayList();
+    List<Long> subtaskWeights = Lists.newArrayList();
+    while (mapKeyIterator.hasNext() && subtaskId < numPartitions) {
+      if (currentKey == null) {
+        currentKey = mapKeyIterator.next();
+        keyRemainingWeight = sortedStatistics.get(currentKey);
+      }
+
+      assignedSubtasks.add(subtaskId);
+      // assign the remaining weight of the key to the current subtask if it is the last subtask
+      // or if the subtask has more capacity than the remaining key weight
+      if (subtaskId == numPartitions - 1 || keyRemainingWeight < subtaskRemainingWeight) {
+        subtaskWeights.add(keyRemainingWeight);
+        subtaskRemainingWeight -= keyRemainingWeight;
+        keyRemainingWeight = 0;
+      } else {
+        // filled up the current subtask
+        subtaskWeights.add(subtaskRemainingWeight);
+        keyRemainingWeight -= subtaskRemainingWeight;
+        // move on to the next subtask
+        subtaskId += 1;
+        subtaskRemainingWeight = targetWeightPerSubtask;
+      }
+
+      if (keyRemainingWeight <= 0) {
+        // finishing up the assignment for the current key
+        Preconditions.checkState(
+            assignedSubtasks.size() == subtaskWeights.size(),
+            "List size mismatch: assigned subtasks = %s, subtask weights = %s",
+            assignedSubtasks,
+            subtaskWeights);
+        KeyAssignment keyAssignment =
+            new KeyAssignment(
+                ArrayUtil.toIntArray(assignedSubtasks), ArrayUtil.toLongArray(subtaskWeights));
+        assignmentMap.put(currentKey, keyAssignment);
+        assignedSubtasks.clear();
+        subtaskWeights.clear();
+        currentKey = null;
+        keyRemainingWeight = 0L;
+      }
+    }
+
+    return assignmentMap;
+  }
+
+  /** Subtask assignment for a key */
+  @VisibleForTesting
+  static class KeyAssignment {
+    private final int[] assignedSubtasks;
+    private final long[] subtaskWeights;
+    private final long keyWeight;
+    private final long[] cumulativeWeights;
+
+    /**
+     * @param assignedSubtasks assigned subtasks for this key. It could be a single subtask. It
+     *     could also be multiple subtasks if the key has heavy weight that should be handled by
+     *     multiple subtasks.
+     * @param subtaskWeights assigned weight for each subtask. E.g., if the keyWeight is 27 and the
+     *     key is assigned to 3 subtasks, subtaskWeights could contain values as [10, 10, 7] for
+     *     target weight of 10 per subtask.
+     */
+    KeyAssignment(int[] assignedSubtasks, long[] subtaskWeights) {
+      Preconditions.checkArgument(
+          assignedSubtasks != null, "Invalid assigned subtasks array: null");
+      Preconditions.checkArgument(
+          assignedSubtasks.length > 0, "Invalid assigned subtasks array: empty");
+      Preconditions.checkArgument(subtaskWeights != null, "Invalid subtask weights array: null");
+      Preconditions.checkArgument(
+          assignedSubtasks.length == subtaskWeights.length,
+          "Invalid assigned subtask weights array: length mismatch (expected = %s, length = %s)",
+          assignedSubtasks.length,
+          subtaskWeights.length);
+
+      this.assignedSubtasks = assignedSubtasks;
+      this.subtaskWeights = subtaskWeights;
+      this.keyWeight = Arrays.stream(subtaskWeights).sum();
+      this.cumulativeWeights = new long[subtaskWeights.length];
+      long cumulativeWeight = 0;
+      for (int i = 0; i < subtaskWeights.length; ++i) {
+        cumulativeWeight += subtaskWeights[i];
+        cumulativeWeights[i] = cumulativeWeight;
+      }
+    }
+
+    /** @return subtask id */
+    int select() {
+      if (assignedSubtasks.length == 1) {
+        // only choice. no need to run the random number generator.
+        return assignedSubtasks[0];
+      } else {
+        long randomNumber = ThreadLocalRandom.current().nextLong(keyWeight);
+        int index = Arrays.binarySearch(cumulativeWeights, randomNumber);
+        // choose the subtask where randomNumber < cumulativeWeights[position].
+        // this works regardless of whether index is negative or not. e.g., for
+        // cumulativeWeights [10, 20, 27], randomNumber 5 is not found, binarySearch
+        // returns -1 and position is 0; randomNumber 10 is an exact match at index 0,
+        // so position is 1, which is correct because 10 belongs to the second bucket.
+        int position = Math.abs(index + 1);
+        Preconditions.checkState(
+            position < assignedSubtasks.length,
+            "Invalid selected position: out of range. key weight = %s, random number = %s, cumulative weights array = %s",
+            keyWeight,
+            randomNumber,
+            cumulativeWeights);
+        return assignedSubtasks[position];
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return 31 * Arrays.hashCode(assignedSubtasks) + Arrays.hashCode(subtaskWeights);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      KeyAssignment that = (KeyAssignment) o;
+      return Arrays.equals(assignedSubtasks, that.assignedSubtasks)
+          && Arrays.equals(subtaskWeights, that.subtaskWeights);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("assignedSubtasks", assignedSubtasks)
+          .add("subtaskWeights", subtaskWeights)
+          .toString();
+    }
+  }
+}
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java
new file mode 100644
index 000000000000..168e632e477a
--- /dev/null
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestMapRangePartitioner.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.flink.sink.shuffle; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.SortKey; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataWrapper; +import org.apache.iceberg.flink.TestFixtures; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Pair; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestMapRangePartitioner { + private static final SortOrder SORT_ORDER = + SortOrder.builderFor(TestFixtures.SCHEMA).asc("data").build(); + + private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, SORT_ORDER); + private static final RowType ROW_TYPE = FlinkSchemaUtil.convert(TestFixtures.SCHEMA); + + private static final RowDataWrapper KEY_0_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_0 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_1_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_1 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_2_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_2 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_3_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_3 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_4_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_4 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_5_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_5 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_6_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_6 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_7_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_7 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_8_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_8 = SORT_KEY.copy(); + private static final RowDataWrapper KEY_9_WRAPPER = + new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct()); + private static final SortKey SORT_KEY_9 = SORT_KEY.copy(); + + static { + SORT_KEY_0.wrap( + KEY_0_WRAPPER.wrap( + GenericRowData.of( + StringData.fromString("k0"), 1, StringData.fromString("2023-06-20")))); + + SORT_KEY_1.wrap( + KEY_1_WRAPPER.wrap( + GenericRowData.of( + StringData.fromString("k1"), 1, StringData.fromString("2023-06-20")))); + + SORT_KEY_2.wrap( + KEY_2_WRAPPER.wrap( + GenericRowData.of( + StringData.fromString("k2"), 1, StringData.fromString("2023-06-20")))); + + 
SORT_KEY_3.wrap(
+        KEY_3_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k3"), 1, StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_4.wrap(
+        KEY_4_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k4"), 1, StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_5.wrap(
+        KEY_5_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k5"), 1, StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_6.wrap(
+        KEY_6_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k6"), 1, StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_7.wrap(
+        KEY_7_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k7"), 1, StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_8.wrap(
+        KEY_8_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k8"), 1, StringData.fromString("2023-06-20"))));
+
+    SORT_KEY_9.wrap(
+        KEY_9_WRAPPER.wrap(
+            GenericRowData.of(
+                StringData.fromString("k9"), 1, StringData.fromString("2023-06-20"))));
+  }
+
+  // Total weight is 80
+  private final MapDataStatistics mapDataStatistics =
+      new MapDataStatistics(
+          ImmutableMap.of(
+              SORT_KEY_0, 35L,
+              SORT_KEY_1, 23L,
+              SORT_KEY_2, 12L,
+              SORT_KEY_3, 4L,
+              SORT_KEY_4, 1L,
+              SORT_KEY_5, 1L,
+              SORT_KEY_6, 1L,
+              SORT_KEY_7, 1L,
+              SORT_KEY_8, 1L,
+              SORT_KEY_9, 1L));
+
+  @Test
+  public void testEvenlyDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, mapDataStatistics, 0.0);
+    int numPartitions = 8;
+
+    // each task should get a target weight of 10 (= 80 / 8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {0, 1, 2, 3}, new long[] {10L, 10L, 10L, 5L}),
+            SORT_KEY_1,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {3, 4, 5}, new long[] {5L, 10L, 8L}),
+            SORT_KEY_2,
+                new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] {2L, 10L}),
+            SORT_KEY_3, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {4L}),
+            SORT_KEY_4, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
+            SORT_KEY_5, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
+            SORT_KEY_6, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
+            SORT_KEY_7, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
+            SORT_KEY_8, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
+            SORT_KEY_9, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0, Pair.of(10L, 1),
+            1, Pair.of(10L, 1),
+            2, Pair.of(10L, 1),
+            3, Pair.of(10L, 2),
+            4, Pair.of(10L, 1),
+            5, Pair.of(10L, 2),
+            6, Pair.of(10L, 1),
+            7, Pair.of(10L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = partitioner.assignmentInfo();
+    Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testEvenlyDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, mapDataStatistics, 20.0);
+    int numPartitions = 8;
+
+    // target subtask weight is 10 before the close file cost is factored in.
+    // close file cost is 2 (= 20% * 10) per file.
+    // key weights before and after the close file cost is factored in
+    // before: 35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost: 8, 6, 4, 2, 2, 2, 2, 2, 2, 2
+    // after: 43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target weight per subtask is 14 (= 112 / 8)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {0, 1, 2, 3}, new long[] {14L, 14L, 14L, 1L}),
+            SORT_KEY_1,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {3, 4, 5}, new long[] {13L, 14L, 2L}),
+            SORT_KEY_2,
+                new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] {12L, 4L}),
+            SORT_KEY_3, new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] {6L}),
+            SORT_KEY_4, new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] {3L}),
+            SORT_KEY_5,
+                new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] {1L, 2L}),
+            SORT_KEY_6, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
+            SORT_KEY_7, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
+            SORT_KEY_8, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
+            SORT_KEY_9, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0, Pair.of(14L, 1),
+            1, Pair.of(14L, 1),
+            2, Pair.of(14L, 1),
+            3, Pair.of(14L, 2),
+            4, Pair.of(14L, 1),
+            5, Pair.of(14L, 2),
+            6, Pair.of(14L, 4),
+            7, Pair.of(14L, 5));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = partitioner.assignmentInfo();
+    Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividableNoClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, mapDataStatistics, 0.0);
+    int numPartitions = 9;
+
+    // each task should get a target weight of 9 (= ceiling of 80 / 9)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {0, 1, 2, 3}, new long[] {9L, 9L, 9L, 8L}),
+            SORT_KEY_1,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {3, 4, 5, 6}, new long[] {1L, 9L, 9L, 4L}),
+            SORT_KEY_2,
+                new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] {5L, 7L}),
+            SORT_KEY_3,
+                new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] {2L, 2L}),
+            SORT_KEY_4, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
+            SORT_KEY_5, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
+            SORT_KEY_6, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
+            SORT_KEY_7, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
+            SORT_KEY_8, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
+            SORT_KEY_9, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight, second is the number of assigned keys
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0, Pair.of(9L, 1),
+            1, Pair.of(9L, 1),
+            2, Pair.of(9L, 1),
+            3, Pair.of(9L, 2),
+            4, Pair.of(9L, 1),
+            5, Pair.of(9L, 1),
+            6, Pair.of(9L, 2),
+            7, Pair.of(9L, 2),
+            8, Pair.of(8L, 7));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = partitioner.assignmentInfo();
+    Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  @Test
+  public void testNonDividable20PercentClosingFileCost() {
+    MapRangePartitioner partitioner =
+        new MapRangePartitioner(TestFixtures.SCHEMA, SORT_ORDER, mapDataStatistics, 20.0);
+    int numPartitions = 9;
+
+    // target subtask weight is 9 before the close file cost is factored in.
+    // close file cost is 2 (= 20% * 9) per file.
+    // key weights before and after the close file cost is factored in
+    // before: 35, 23, 12, 4, 1, 1, 1, 1, 1, 1
+    // close-cost: 8, 6, 4, 2, 2, 2, 2, 2, 2, 2
+    // after: 43, 29, 16, 6, 3, 3, 3, 3, 3, 3
+    // target weight per subtask is 13 (= ceiling of 112 / 9)
+    Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
+        ImmutableMap.of(
+            SORT_KEY_0,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {0, 1, 2, 3}, new long[] {13L, 13L, 13L, 4L}),
+            SORT_KEY_1,
+                new MapRangePartitioner.KeyAssignment(
+                    new int[] {3, 4, 5}, new long[] {9L, 13L, 7L}),
+            SORT_KEY_2,
+                new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] {6L, 10L}),
+            SORT_KEY_3,
+                new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] {3L, 3L}),
+            SORT_KEY_4, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
+            SORT_KEY_5, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
+            SORT_KEY_6, new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
+            SORT_KEY_7,
+                new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] {1L, 2L}),
+            SORT_KEY_8, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {3L}),
+            SORT_KEY_9, new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {3L}));
+    Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
+        partitioner.assignment(numPartitions);
+    Assertions.assertThat(actualAssignment).isEqualTo(expectedAssignment);
+
+    // key: subtask id
+    // value pair: first is the assigned weight for the subtask, second is the number of keys
+    // assigned to the subtask
+    Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo =
+        ImmutableMap.of(
+            0, Pair.of(13L, 1),
+            1, Pair.of(13L, 1),
+            2, Pair.of(13L, 1),
+            3, Pair.of(13L, 2),
+            4, Pair.of(13L, 1),
+            5, Pair.of(13L, 2),
+            6, Pair.of(13L, 2),
+            7, Pair.of(13L, 5),
+            8, Pair.of(8L, 3));
+    Map<Integer, Pair<Long, Integer>> actualAssignmentInfo = partitioner.assignmentInfo();
+    Assertions.assertThat(actualAssignmentInfo).isEqualTo(expectedAssignmentInfo);
+
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults =
+        runPartitioner(partitioner, numPartitions);
+    validatePartitionResults(expectedAssignmentInfo, partitionResults);
+  }
+
+  private static Map<Integer, Pair<AtomicLong, Set<RowData>>> runPartitioner(
+      MapRangePartitioner partitioner, int numPartitions) {
+    // The map key is the subtaskId.
+    // For the map value pair, the first element is the count of rows assigned
+    // to the subtask and the second element is the set of assigned rows.
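+    // For example, in testEvenlyDividableNoClosingFileCost, SORT_KEY_0 (weight 35) is
+    // emitted 3500 times and should land on its assigned subtasks 0-3 in roughly a
+    // 10/10/10/5 split, which validatePartitionResults verifies statistically.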
+    Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults = Maps.newHashMap();
+    partitioner
+        .sortedStatsWithCloseFileCost()
+        .forEach(
+            (sortKey, weight) -> {
+              String key = sortKey.get(0, String.class);
+              // run 100x the weight
+              long iterations = weight * 100;
+              for (int i = 0; i < iterations; ++i) {
+                RowData rowData =
+                    GenericRowData.of(
+                        StringData.fromString(key), 1, StringData.fromString("2023-06-20"));
+                int subtaskId = partitioner.partition(rowData, numPartitions);
+                partitionResults.computeIfAbsent(
+                    subtaskId, k -> Pair.of(new AtomicLong(0), Sets.newHashSet()));
+                Pair<AtomicLong, Set<RowData>> pair = partitionResults.get(subtaskId);
+                pair.first().incrementAndGet();
+                pair.second().add(rowData);
+              }
+            });
+    return partitionResults;
+  }
+
+  private void validatePartitionResults(
+      Map<Integer, Pair<Long, Integer>> expectedAssignmentInfo,
+      Map<Integer, Pair<AtomicLong, Set<RowData>>> partitionResults) {
+
+    Assertions.assertThat(partitionResults.size()).isEqualTo(expectedAssignmentInfo.size());
+
+    List<Integer> expectedAssignedKeyCounts =
+        Lists.newArrayListWithExpectedSize(expectedAssignmentInfo.size());
+    List<Integer> actualAssignedKeyCounts =
+        Lists.newArrayListWithExpectedSize(partitionResults.size());
+    List<Double> expectedNormalizedWeights =
+        Lists.newArrayListWithExpectedSize(expectedAssignmentInfo.size());
+    List<Double> actualNormalizedWeights =
+        Lists.newArrayListWithExpectedSize(partitionResults.size());
+
+    long expectedTotalWeight =
+        expectedAssignmentInfo.values().stream().mapToLong(Pair::first).sum();
+    expectedAssignmentInfo.forEach(
+        (subtaskId, pair) -> {
+          expectedAssignedKeyCounts.add(pair.second());
+          expectedNormalizedWeights.add(pair.first().doubleValue() / expectedTotalWeight);
+        });
+
+    long actualTotalWeight =
+        partitionResults.values().stream().mapToLong(pair -> pair.first().longValue()).sum();
+    partitionResults.forEach(
+        (subtaskId, pair) -> {
+          actualAssignedKeyCounts.add(pair.second().size());
+          actualNormalizedWeights.add(pair.first().doubleValue() / actualTotalWeight);
+        });
+
+    // number of assigned keys should match exactly
+    Assertions.assertThat(actualAssignedKeyCounts)
+        .as("the number of assigned keys should match for every subtask")
+        .isEqualTo(expectedAssignedKeyCounts);
+
+    // weight for every subtask shouldn't differ by more than 10% relative to the expected weight
+    for (int subtaskId = 0; subtaskId < expectedNormalizedWeights.size(); ++subtaskId) {
+      double expectedWeight = expectedNormalizedWeights.get(subtaskId);
+      double maxDriftPercentage = 10.0;
+      double min = expectedWeight * (1 - maxDriftPercentage / 100);
+      double max = expectedWeight * (1 + maxDriftPercentage / 100);
+      Assertions.assertThat(actualNormalizedWeights.get(subtaskId))
+          .as(
+              "Subtask %d weight should be within %.1f percent of the expected weight %s",
+              subtaskId, maxDriftPercentage, expectedWeight)
+          .isBetween(min, max);
+    }
+  }
+}
diff --git a/jmh.gradle b/jmh.gradle
index 1a28ee0083e1..42875b71aaeb 100644
--- a/jmh.gradle
+++ b/jmh.gradle
@@ -21,10 +21,15 @@ if (jdkVersion != '8' && jdkVersion != '11' && jdkVersion != '17') {
   throw new GradleException("The JMH benchmarks must be run with JDK 8 or JDK 11 or JDK 17")
 }
 
+def flinkVersions = (System.getProperty("flinkVersions") != null ? System.getProperty("flinkVersions") : System.getProperty("defaultFlinkVersions")).split(",")
 def sparkVersions = (System.getProperty("sparkVersions") != null ?
System.getProperty("sparkVersions") : System.getProperty("defaultSparkVersions")).split(",") def scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion") def jmhProjects = [project(":iceberg-core")] +if (flinkVersions.contains("1.17")) { + jmhProjects.add(project(":iceberg-flink:iceberg-flink-1.17")) +} + if (sparkVersions.contains("3.3")) { jmhProjects.add(project(":iceberg-spark:iceberg-spark-3.3_${scalaVersion}")) jmhProjects.add(project(":iceberg-spark:iceberg-spark-extensions-3.3_${scalaVersion}"))