Skip to content

Commit

Permalink
update test code based on Peter's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenzwu committed Jan 18, 2024
1 parent 1264354 commit 82d4854
Showing 1 changed file with 64 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,113 +45,45 @@ public class TestMapRangePartitioner {

private static final SortKey SORT_KEY = new SortKey(TestFixtures.SCHEMA, SORT_ORDER);
private static final RowType ROW_TYPE = FlinkSchemaUtil.convert(TestFixtures.SCHEMA);

private static final RowDataWrapper KEY_0_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_0 = SORT_KEY.copy();
private static final RowDataWrapper KEY_1_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_1 = SORT_KEY.copy();
private static final RowDataWrapper KEY_2_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_2 = SORT_KEY.copy();
private static final RowDataWrapper KEY_3_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_3 = SORT_KEY.copy();
private static final RowDataWrapper KEY_4_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_4 = SORT_KEY.copy();
private static final RowDataWrapper KEY_5_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_5 = SORT_KEY.copy();
private static final RowDataWrapper KEY_6_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_6 = SORT_KEY.copy();
private static final RowDataWrapper KEY_7_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_7 = SORT_KEY.copy();
private static final RowDataWrapper KEY_8_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_8 = SORT_KEY.copy();
private static final RowDataWrapper KEY_9_WRAPPER =
new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
private static final SortKey SORT_KEY_9 = SORT_KEY.copy();

static {
SORT_KEY_0.wrap(
KEY_0_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k0"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_1.wrap(
KEY_1_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k1"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_2.wrap(
KEY_2_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k2"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_3.wrap(
KEY_3_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k3"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_4.wrap(
KEY_4_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k4"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_5.wrap(
KEY_5_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k5"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_6.wrap(
KEY_6_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k6"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_7.wrap(
KEY_7_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k7"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_8.wrap(
KEY_8_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k8"), 1, StringData.fromString("2023-06-20"))));

SORT_KEY_9.wrap(
KEY_9_WRAPPER.wrap(
GenericRowData.of(
StringData.fromString("k9"), 1, StringData.fromString("2023-06-20"))));
private static final SortKey[] SORT_KEYS = initSortKeys();

private static SortKey[] initSortKeys() {
SortKey[] sortKeys = new SortKey[10];
for (int i = 0; i < 10; ++i) {
RowData rowData = GenericRowData.of(
StringData.fromString("k" + i), i, StringData.fromString("2023-06-20"));
RowDataWrapper keyWrapper = new RowDataWrapper(ROW_TYPE, TestFixtures.SCHEMA.asStruct());
keyWrapper.wrap(rowData);
SortKey sortKey = SORT_KEY.copy();
sortKey.wrap(keyWrapper);
sortKeys[i] = sortKey;
}
return sortKeys;
}

// Total weight is 80
private final MapDataStatistics mapDataStatistics =
new MapDataStatistics(
ImmutableMap.of(
SORT_KEY_0,
SORT_KEYS[0],
35L,
SORT_KEY_1,
SORT_KEYS[1],
23L,
SORT_KEY_2,
SORT_KEYS[2],
12L,
SORT_KEY_3,
SORT_KEYS[3],
4L,
SORT_KEY_4,
SORT_KEYS[4],
1L,
SORT_KEY_5,
SORT_KEYS[5],
1L,
SORT_KEY_6,
SORT_KEYS[6],
1L,
SORT_KEY_7,
SORT_KEYS[7],
1L,
SORT_KEY_8,
SORT_KEYS[8],
1L,
SORT_KEY_9,
SORT_KEYS[9],
1L));

@Test
Expand All @@ -163,26 +95,26 @@ public void testEvenlyDividableNoClosingFileCost() {
// each task should get targeted weight of 10 (=80/8)
Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
ImmutableMap.of(
SORT_KEY_0,
SORT_KEYS[0],
new MapRangePartitioner.KeyAssignment(
new int[] {0, 1, 2, 3}, new long[] {10L, 10L, 10L, 5L}),
SORT_KEY_1,
SORT_KEYS[1],
new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new long[] {5L, 10L, 8L}),
SORT_KEY_2,
SORT_KEYS[2],
new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] {2L, 10L}),
SORT_KEY_3,
SORT_KEYS[3],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {4L}),
SORT_KEY_4,
SORT_KEYS[4],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
SORT_KEY_5,
SORT_KEYS[5],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
SORT_KEY_6,
SORT_KEYS[6],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
SORT_KEY_7,
SORT_KEYS[7],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
SORT_KEY_8,
SORT_KEYS[8],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}),
SORT_KEY_9,
SORT_KEYS[9],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {1L}));
Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
partitioner.assignment(numPartitions);
Expand Down Expand Up @@ -231,26 +163,26 @@ public void testEvenlyDividable20PercentClosingFileCost() {
// target subtask weight per subtask is 14 (112/8)
Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
ImmutableMap.of(
SORT_KEY_0,
SORT_KEYS[0],
new MapRangePartitioner.KeyAssignment(
new int[] {0, 1, 2, 3}, new long[] {14L, 14L, 14L, 1L}),
SORT_KEY_1,
SORT_KEYS[1],
new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new long[] {13L, 14L, 2L}),
SORT_KEY_2,
SORT_KEYS[2],
new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] {12L, 4L}),
SORT_KEY_3,
SORT_KEYS[3],
new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] {6L}),
SORT_KEY_4,
SORT_KEYS[4],
new MapRangePartitioner.KeyAssignment(new int[] {6}, new long[] {3L}),
SORT_KEY_5,
SORT_KEYS[5],
new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] {1L, 2L}),
SORT_KEY_6,
SORT_KEYS[6],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
SORT_KEY_7,
SORT_KEYS[7],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
SORT_KEY_8,
SORT_KEYS[8],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
SORT_KEY_9,
SORT_KEYS[9],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}));
Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
partitioner.assignment(numPartitions);
Expand Down Expand Up @@ -294,27 +226,27 @@ public void testNonDividableNoClosingFileCost() {
// each task should get targeted weight of 9 = ceiling(80/9)
Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
ImmutableMap.of(
SORT_KEY_0,
SORT_KEYS[0],
new MapRangePartitioner.KeyAssignment(
new int[] {0, 1, 2, 3}, new long[] {9L, 9L, 9L, 8L}),
SORT_KEY_1,
SORT_KEYS[1],
new MapRangePartitioner.KeyAssignment(
new int[] {3, 4, 5, 6}, new long[] {1L, 9L, 9L, 4L}),
SORT_KEY_2,
SORT_KEYS[2],
new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] {5L, 7L}),
SORT_KEY_3,
SORT_KEYS[3],
new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] {2L, 2L}),
SORT_KEY_4,
SORT_KEYS[4],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
SORT_KEY_5,
SORT_KEYS[5],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
SORT_KEY_6,
SORT_KEYS[6],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
SORT_KEY_7,
SORT_KEYS[7],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
SORT_KEY_8,
SORT_KEYS[8],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}),
SORT_KEY_9,
SORT_KEYS[9],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {1L}));
Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
partitioner.assignment(numPartitions);
Expand Down Expand Up @@ -365,26 +297,26 @@ public void testNonDividable20PercentClosingFileCost() {
// target subtask weight per subtask is 13 (112/9)
Map<SortKey, MapRangePartitioner.KeyAssignment> expectedAssignment =
ImmutableMap.of(
SORT_KEY_0,
SORT_KEYS[0],
new MapRangePartitioner.KeyAssignment(
new int[] {0, 1, 2, 3}, new long[] {13L, 13L, 13L, 4L}),
SORT_KEY_1,
SORT_KEYS[1],
new MapRangePartitioner.KeyAssignment(new int[] {3, 4, 5}, new long[] {9L, 13L, 7L}),
SORT_KEY_2,
SORT_KEYS[2],
new MapRangePartitioner.KeyAssignment(new int[] {5, 6}, new long[] {6L, 10L}),
SORT_KEY_3,
SORT_KEYS[3],
new MapRangePartitioner.KeyAssignment(new int[] {6, 7}, new long[] {3L, 3L}),
SORT_KEY_4,
SORT_KEYS[4],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
SORT_KEY_5,
SORT_KEYS[5],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
SORT_KEY_6,
SORT_KEYS[6],
new MapRangePartitioner.KeyAssignment(new int[] {7}, new long[] {3L}),
SORT_KEY_7,
SORT_KEYS[7],
new MapRangePartitioner.KeyAssignment(new int[] {7, 8}, new long[] {1L, 2L}),
SORT_KEY_8,
SORT_KEYS[8],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {3L}),
SORT_KEY_9,
SORT_KEYS[9],
new MapRangePartitioner.KeyAssignment(new int[] {8}, new long[] {3L}));
Map<SortKey, MapRangePartitioner.KeyAssignment> actualAssignment =
partitioner.assignment(numPartitions);
Expand Down Expand Up @@ -487,20 +419,12 @@ private void validatePartitionResults(
.as("the number of assigned keys should match for every subtask")
.isEqualTo(expectedAssignedKeyCounts);

System.out.println("------------------------------------");
// weight for every subtask shouldn't differ for more than 10% relative to the expected weight
for (int subtaskId = 0; subtaskId < expectedNormalizedWeights.size(); ++subtaskId) {
double expectedWeight = expectedNormalizedWeights.get(subtaskId);
double maxDriftPercentage = 10.0;
double min = expectedWeight * (1 - maxDriftPercentage / 100);
double max = expectedWeight * (1 + maxDriftPercentage / 100);
System.out.println(
"subtaskId: "
+ subtaskId
+ ", expectedWeight = "
+ expectedWeight
+ ", actualWeight = "
+ actualNormalizedWeights.get(subtaskId));
Assertions.assertThat(actualNormalizedWeights.get(subtaskId))
.as(
"Subtask %d weight should within %.1f percent of the expected range %s",
Expand Down

0 comments on commit 82d4854

Please sign in to comment.