
Flink: implement range partitioner for map data statistics #9321

Merged 3 commits into apache:main on Mar 27, 2024

Conversation

@stevenzwu (Contributor) commented Dec 17, 2023

No description provided.

@stevenzwu force-pushed the range-partitioner branch 5 times, most recently from 66d6976 to f742a3e on December 18, 2023 05:07
@stevenzwu changed the title from "Flink: implement range partitioner that leverages traffic distributio…" to "Flink: implement range partitioner for map data statistics" on Dec 18, 2023
@stevenzwu requested a review from pvary on December 18, 2023 05:09
assignedSubtasks.add(subtaskId);
// assign the remaining weight of key to the current subtask if it is the last subtask
// or if the subtask has more capacity than the remaining key weight
if (subtaskId == numPartitions - 1 || keyRemainingWeight < subtaskRemainingWeight) {
Contributor:

If I understand correctly, keyRemainingWeight < subtaskRemainingWeight should always be true when subtaskId == numPartitions - 1.
How confident are you in the algorithm above? (I did not find any issue, but...) Would it be worth at least logging a message if something is off and we put keys on the last subtask only because of a calculation error?

Contributor Author:
Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly keyRemainingWeight < subtaskRemainingWeight should always be true for subtaskId == numPartitions - 1.

Not sure I fully understand the comment. This is an OR condition: fully assign the remaining key weight to the subtask

  1. if it is the last subtask
  2. (or) if the remaining key weight is less than the subtask's remaining capacity

(A sketch of the surrounding assignment loop follows below.)
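
A minimal sketch of the kind of greedy loop this condition sits in. Only the variable names from the quoted snippet come from the PR; the surrounding structure and the assignKey signature are assumptions, not the actual implementation.

import java.util.List;

// Sketch only: greedily spill one key's weight across subtasks, starting at startSubtask.
static void assignKey(
    long keyWeight,
    int startSubtask,
    int numPartitions,
    long targetWeightPerSubtask,
    long[] assignedWeights,
    List<Integer> assignedSubtasks) {
  long keyRemainingWeight = keyWeight;
  int subtaskId = startSubtask;
  while (keyRemainingWeight > 0 && subtaskId < numPartitions) {
    long subtaskRemainingWeight = targetWeightPerSubtask - assignedWeights[subtaskId];
    assignedSubtasks.add(subtaskId);
    // assign the remaining weight of the key to the current subtask if it is the last subtask
    // or if the subtask has more capacity than the remaining key weight
    if (subtaskId == numPartitions - 1 || keyRemainingWeight < subtaskRemainingWeight) {
      assignedWeights[subtaskId] += keyRemainingWeight;
      keyRemainingWeight = 0;
    } else {
      // otherwise fill the current subtask to its target and spill the rest to the next one
      assignedWeights[subtaskId] += subtaskRemainingWeight;
      keyRemainingWeight -= subtaskRemainingWeight;
      subtaskId++;
    }
  }
}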

Contributor:
Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we did the calculation correctly, then even for the last subtask the assigned size should be smaller than subtaskRemainingWeight.
If we ever depend on the subtaskId == numPartitions - 1 part of the if clause, then we have a wrong distribution.

Contributor Author:
Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We compute the target weight using a ceiling function, so the last subtask should only get less than or equal to the fair share.

long targetWeightPerSubtaskWithCloseFileCost =
          (long) Math.ceil(((double) totalWeightWithCloseFileCost) / numPartitions);
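
For illustration, a quick worked example with made-up numbers showing why the ceiling keeps the last subtask at or below its fair share:

long totalWeightWithCloseFileCost = 100L;
int numPartitions = 3;
long targetWeightPerSubtaskWithCloseFileCost =
    (long) Math.ceil(((double) totalWeightWithCloseFileCost) / numPartitions); // ceil(33.3...) = 34
// Subtasks 0 and 1 are filled up to the target (34 + 34 = 68),
// so the last subtask only receives the remainder: 100 - 68 = 32, which is <= 34.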

I agree that we shouldn't need subtaskId == numPartitions - 1 in theory; it was added for extra safety. Please let me know your opinion on the current behavior (option 1) vs the alternatives below.

Option 2: log an error while maintaining the permissive behavior

if (subtaskId == numPartitions - 1 && keyRemainingWeight > subtaskRemainingWeight) {
    LOG.error("Invalid assignment: last subtask is assigned more weight than target");
}

Option 3: throw an exception

if (subtaskId == numPartitions - 1 && keyRemainingWeight > subtaskRemainingWeight) {
    throw new IllegalStateException("Invalid assignment: last subtask is assigned more weight than target");
}

Contributor:

I would go for option 2, and maybe add a metric?

With option 3 we just restart the job. The first checkpoint will run without a known distribution, and the job would probably continue to run without an issue.

Both options 2 and 3 require conscious monitoring from the job owners, and option 2 is better in many ways.

Contributor Author (@stevenzwu, Jan 17, 2024):

I am leaning toward option 3 (failure). I agree with your assessment that the first checkpoint would still succeed and the job would restart after it, but at least the job would keep restarting so that the algorithm error can be surfaced.

Contributor:

the job is constantly restarting

Do we store the distribution in the state? Is that the reason why the job would fail again after a restart?

jmh.gradle (conversation resolved)
this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator);
mapStatistics.forEach(
    (k, v) -> {
      int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask);
Contributor:

We should not forget that this is only an estimation.
The number is correct for the first key, but could be off for subsequent keys, since we are filling up the remaining slots.

Example:

  • targetWeightPerSubtask = 10
  • SORT_KEY_0 = 5, SORT_KEY_1 = 20

In this case we estimate 2 splits for SORT_KEY_1, but it will definitely be distributed across 3 splits.
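
A short illustration of that example, assuming the greedy fill described earlier (all numbers come from the comment above):

long targetWeightPerSubtask = 10L;
// SORT_KEY_0 (weight 5): estimate ceil(5 / 10) = 1 split; it lands in subtask 0
// and leaves only 5 units of capacity there.
// SORT_KEY_1 (weight 20): estimate ceil(20 / 10) = 2 splits, but the greedy fill
// spreads it over 5 (rest of subtask 0) + 10 (subtask 1) + 5 (subtask 2) = 3 splits.
int estimatedSplits = (int) Math.ceil(20.0 / targetWeightPerSubtask); // 2
int actualSplits = 3; // under the fill order above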

@stevenzwu force-pushed the range-partitioner branch 2 times, most recently from 82d4854 to 46745b3 on January 18, 2024 00:35
@stevenzwu force-pushed the range-partitioner branch 4 times, most recently from 25152b9 to c6c7837 on March 6, 2024 22:35
return assignmentInfo;
}

private Map<SortKey, KeyAssignment> buildAssignment(
Contributor Author:

@pvary @yegangy0718 this is the main/tricky part.

@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
public class MapRangePartitionerBenchmark {
Contributor Author:

The benchmark shows that the cost of partitioner.partition(row, numPartitions) is about 0.1 µs per call.

[Screenshot of the benchmark results for 100K calls omitted.]
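
For reference, a JMH method along these lines would exercise the measured path. This is a sketch only (the field names partitioner, row, and numPartitions are assumed), not the benchmark code from the PR:

@Benchmark
public void testPartition(Blackhole blackhole) {
  // measures a single partition(row, numPartitions) call;
  // the Blackhole keeps the JIT from eliminating the result
  blackhole.consume(partitioner.partition(row, numPartitions));
}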

@stevenzwu force-pushed the range-partitioner branch from c6c7837 to 66c60c2 on March 7, 2024 00:20
@stevenzwu force-pushed the range-partitioner branch from 6c2dcc2 to 36cd418 on March 8, 2024 15:15
// If the assigned weight is less than the close file cost, pad it up with the close file cost.
// This might cause the subtask's assigned weight to go over the target weight,
// but by no more than one close file cost. Small skew is acceptable.
if (assignedWeight <= closeFileCostInWeight) {
Contributor (@pvary, Mar 14, 2024):

So, instead of not assigning the key when there is not enough weight left on the current subtask, we push a bit more onto it to warrant opening a new file?

How do we handle the case where the current key doesn't have enough weight left for the next subtask to warrant opening a new file?

Contributor:

Oh... I see, we ignore those

Contributor Author (@stevenzwu, Mar 14, 2024):

This might lead to some inaccuracy in the weight calculation. E.g., assume the key weight is 2 and the close file cost is 2, so the key weight with close cost is 4. Let's assume the previous task has a weight of 3 available. Then a weight of 3 for this key is assigned to the task and the residual weight of 1 is dropped. The routing weight for this key becomes 1 (after subtracting the close file cost), which is inaccurate, as the accurate weight should be 2.

I thought about adding the residual weight back to the previous assignment, but that is also not always accurate. E.g., the key weight is 11 and the target task weight before close cost is 10, so this key should be split into 2 files. Assuming a close file cost of 1, the key weight with close cost would be 13 (11 + 1x2). Let's say the target task weight with close cost is 12. With add-back, the task would be assigned a weight of 13, and the routing weight would be 12 (13 - 1 close file cost). That is also inaccurate.

With this simple greedy heuristic there is always some inaccuracy one way or the other, but the inaccuracy should be small and shouldn't skew the traffic distribution much.
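
Restating the two scenarios above in code form (a sketch; every number is taken from the comment, and the variable names are illustrative only):

// Scenario 1: drop the residual.
long keyWeight = 2;
long closeFileCost = 2;
long keyWeightWithCloseCost = keyWeight + closeFileCost;         // 4
long previousTaskAvailable = 3;
long assignedToPreviousTask = previousTaskAvailable;              // 3 assigned, residual 1 dropped
long routingWeight = assignedToPreviousTask - closeFileCost;      // 1, while the accurate weight is 2

// Scenario 2: add the residual back to the previous assignment.
long keyWeight2 = 11;                                             // splits into 2 files (target before close cost is 10)
long closeFileCost2 = 1;
long keyWeightWithCloseCost2 = keyWeight2 + 2 * closeFileCost2;   // 13
long targetTaskWeightWithCloseCost = 12;
long assignedWithAddBack = keyWeightWithCloseCost2;               // 13, exceeds the target of 12
long routingWeight2 = assignedWithAddBack - closeFileCost2;       // 12, while the accurate weight is 11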

Contributor:

3 things to consider:

  • If we merge, we might prefer merging files instead of splitting them out (to help future readers)
  • If the key weight is very small, we might end up removing it altogether
  • We might prefer your simpler algorithm for handling all of the edge cases mentioned above

Will try to take another, more serious look at this soon.

@pvary (Contributor) left a comment:

Thanks @stevenzwu!
LGTM

@stevenzwu merged commit 81b62c7 into apache:main on Mar 27, 2024
41 checks passed
@stevenzwu (Contributor Author):

thanks @pvary and @yegangy0718 for the code review

stevenzwu added a commit to stevenzwu/iceberg that referenced this pull request Mar 28, 2024
nk1506 pushed a commit to nk1506/iceberg that referenced this pull request Apr 2, 2024
sasankpagolu pushed a commit to sasankpagolu/iceberg that referenced this pull request Oct 27, 2024
sasankpagolu pushed a commit to sasankpagolu/iceberg that referenced this pull request Oct 27, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
zachdisc pushed a commit to zachdisc/iceberg that referenced this pull request Dec 23, 2024
3 participants