Flink: implement range partitioner for map data statistics #9321
Conversation
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/MapRangePartitioner.java
assignedSubtasks.add(subtaskId);
// assign the remaining weight of key to the current subtask if it is the last subtask
// or if the subtask has more capacity than the remaining key weight
if (subtaskId == numPartitions - 1 || keyRemainingWeight < subtaskRemainingWeight) {
If I understand correctly, keyRemainingWeight < subtaskRemainingWeight should always be true when subtaskId == numPartitions - 1.
How confident are you in the algorithm above? (I did not find any issue, but...) Would it be worth logging a message at least, if something is off and we put keys into the last subtask only because we made a mistake during the calculation?
> If I understand correctly keyRemainingWeight < subtaskRemainingWeight should always be true for subtaskId == numPartitions - 1.

not sure I fully understand the comment. this is an or condition. fully assign the remaining key weight to the subtask
- if it is the last subtask
- (or) if the remaining key weight is less than the subtask's remaining capacity

(a standalone sketch of this rule follows below)
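A rough standalone sketch of that rule (hypothetical names, none of the actual MapRangePartitioner bookkeeping; illustration only, not the PR's code):

import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical illustration of the assignment rule discussed above.
class GreedyKeyAssignmentSketch {
  // Spread one key's weight across subtasks; subtaskRemaining holds each subtask's leftover capacity.
  static List<Integer> assignKeyWeight(long keyWeight, long[] subtaskRemaining) {
    int numPartitions = subtaskRemaining.length;
    List<Integer> assignedSubtasks = new ArrayList<>();
    long keyRemainingWeight = keyWeight;
    for (int subtaskId = 0; subtaskId < numPartitions && keyRemainingWeight > 0; subtaskId++) {
      long subtaskRemainingWeight = subtaskRemaining[subtaskId];
      if (subtaskRemainingWeight <= 0) {
        continue; // this subtask is already full
      }
      assignedSubtasks.add(subtaskId);
      if (subtaskId == numPartitions - 1 || keyRemainingWeight < subtaskRemainingWeight) {
        // last subtask, or the remaining key weight fits: assign all of it here
        subtaskRemaining[subtaskId] -= keyRemainingWeight;
        keyRemainingWeight = 0;
      } else {
        // fill this subtask completely and carry the rest of the key to the next one
        keyRemainingWeight -= subtaskRemainingWeight;
        subtaskRemaining[subtaskId] = 0;
      }
    }
    return assignedSubtasks;
  }
}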
If we did the calculation correctly, then even for the last subtask the assigned weight should be smaller than subtaskRemainingWeight. If we ever rely on the subtaskId == numPartitions - 1 part of the if clause, then we have a wrong distribution.
we compute the target weight using the ceiling function. so the last subtask should only get less than or equal to the fair share.

long targetWeightPerSubtaskWithCloseFileCost =
    (long) Math.ceil(((double) totalWeightWithCloseFileCost) / numPartitions);
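To make that concrete, a worked example with invented numbers (not from the PR):

// Worked example (invented numbers): the ceiling target bounds the last subtask's share.
public class CeilingTargetExample {
  public static void main(String[] args) {
    long totalWeightWithCloseFileCost = 101L;
    int numPartitions = 10;
    long target = (long) Math.ceil(((double) totalWeightWithCloseFileCost) / numPartitions); // 11
    // If the first numPartitions - 1 subtasks are filled up to the target, whatever is
    // left over for the last subtask can never exceed the target itself.
    long lastSubtask = totalWeightWithCloseFileCost - (long) (numPartitions - 1) * target; // 101 - 99 = 2
    System.out.println(lastSubtask <= target); // true
  }
}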
I agree that we shouldn't need subtaskId == numPartitions - 1 in theory. it was added for extra safety. Please let me know your opinion on the current behavior (option 1) vs the alternatives below.
Option 2: log an error while maintaining the permissive behavior
if (subtaskId == numPartitions - 1 && keyRemainingWeight > subtaskRemainingWeight) {
LOG.error("Invalid assignment: last subtask is assigned more weight than target");
}
Option 3: throw an exception
if (subtaskId == numPartitions - 1 && keyRemainingWeight > subtaskRemainingWeight) {
throw new IllegalStateException("Invalid assignment: last subtask is assigned more weight than target");
}
I would go for option 2, and maybe a metric?
With option 3 we just restart the job. The first checkpoint will run without a known distribution, and the job would probably continue to run without an issue.
Both option 2 and option 3 require conscious monitoring from the job owners, and option 2 is better in many ways.
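A rough sketch of option 2 plus a counter metric (editorial illustration; the class, the metric name, and how the partitioner would obtain a Flink MetricGroup are all assumptions, not part of this PR):

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical helper: log and count the "should never happen" case instead of failing the job.
class AssignmentSanityCheck {
  private static final Logger LOG = LoggerFactory.getLogger(AssignmentSanityCheck.class);
  private final Counter invalidAssignments;

  AssignmentSanityCheck(MetricGroup metricGroup) {
    // "invalidRangeAssignments" is a made-up metric name for illustration
    this.invalidAssignments = metricGroup.counter("invalidRangeAssignments");
  }

  void check(int subtaskId, int numPartitions, long keyRemainingWeight, long subtaskRemainingWeight) {
    if (subtaskId == numPartitions - 1 && keyRemainingWeight > subtaskRemainingWeight) {
      LOG.error(
          "Invalid assignment: last subtask {} is assigned more weight than target ({} > {})",
          subtaskId,
          keyRemainingWeight,
          subtaskRemainingWeight);
      invalidAssignments.inc();
    }
  }
}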
I am leaning toward option 3 (failure). I agree with your assessment that the first checkpoint will still succeed and the job will restart after it. but at least the job would be constantly restarting, so the algorithm error can be surfaced.
> the job is constantly restarting

Do we store the distribution in the state? Is that the reason why the job would fail again after a restart?
this.sortedStatsWithCloseFileCost = Maps.newTreeMap(comparator);
mapStatistics.forEach(
    (k, v) -> {
      int estimatedSplits = (int) Math.ceil(v / targetWeightPerSubtask);
We should not forget that this is an estimation only.
The number is correct for the first key, but could be off for subsequent keys, as we are filling up the remaining places.
Example: targetWeightPerSubtask = 10, SORT_KEY_0 = 5, SORT_KEY_1 = 20.
In this case we estimate 2 splits for SORT_KEY_1, but it will definitely be distributed to 3 splits.
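Replaying that example with a plain greedy fill (sketch only; not the PR's actual code):

// Replays the example above: targetWeightPerSubtask = 10, SORT_KEY_0 = 5, SORT_KEY_1 = 20.
// ceil(20 / 10) = 2, yet the greedy fill touches 3 subtasks for SORT_KEY_1 (5 + 10 + 5).
public class SplitEstimateExample {
  public static void main(String[] args) {
    long targetWeightPerSubtask = 10L;
    long[] keyWeights = {5L, 20L}; // SORT_KEY_0, SORT_KEY_1
    long remainingInSubtask = targetWeightPerSubtask;
    for (int key = 0; key < keyWeights.length; key++) {
      long remainingKey = keyWeights[key];
      int splits = 0;
      while (remainingKey > 0) {
        long assigned = Math.min(remainingKey, remainingInSubtask);
        remainingKey -= assigned;
        remainingInSubtask -= assigned;
        splits++;
        if (remainingInSubtask == 0) {
          remainingInSubtask = targetWeightPerSubtask; // move on to the next subtask
        }
      }
      System.out.println("key " + key + " spans " + splits + " subtask(s)");
    }
    // prints: key 0 spans 1 subtask(s), key 1 spans 3 subtask(s)
  }
}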
  return assignmentInfo;
}

private Map<SortKey, KeyAssignment> buildAssignment(
@pvary @yegangy0718 this is the main/tricky part.
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.SingleShotTime)
public class MapRangePartitionerBenchmark {
// If the assigned weight is less than the close file cost, pad it up with the close file cost.
// This might push the subtask's assigned weight over the target weight,
// but by no more than one close file cost. Small skew is acceptable.
if (assignedWeight <= closeFileCostInWeight) {
So, instead of not assigning the key when there is not enough weight left in the current subtask, we push a bit more there to warrant opening a new file?
How do we handle the case where the current key doesn't have enough weight left for the next subtask to warrant opening a new file?
Oh... I see, we ignore those
this might lead to some inaccuracy in the weight calculation. E.g., assume the key weight is 2 and the close file cost is 2, so the key weight with close cost is 4. Let's assume the previous subtask has a weight of 3 available. A weight of 3 for this key is assigned to that subtask and the residual weight of 1 is dropped. The routing weight for this key then becomes 1 (after subtracting the close file cost), which is inaccurate, as the accurate weight should be 2.

I thought about adding the residual weight back to the previous assignment, but that is also not always accurate. E.g., the key weight is 11 and the target subtask weight before close cost is 10, so this key should be split into 2 files. Assuming a close file cost of 1, the key weight with close cost would be 13 (11 + 1x2). Let's say the target subtask weight with close cost is 12. With add-back, the subtask would be assigned a weight of 13, and the routing weight would be 12 (13 - 1 close file cost). That is also inaccurate.

With this simple greedy heuristic, there is always some inaccuracy one way or the other, but the inaccuracy should be small and shouldn't skew the traffic distribution much.
3 things to consider:
- If we merge, we might prefer merging files instead of splitting them out (to help future readers)
- If the key weight is very small, we might end up removing it altogether
- We might prefer a simpler algorithm to handle all of the edge cases mentioned above

Will try to take another, more serious look at this soon.
Thanks @stevenzwu!
LGTM
thanks @pvary and @yegangy0718 for the code review