Enable push partial aggregation through join #23812
/**
 * Indicates whether aggregation is potentially reducing rows that are propagated through exchange operator.
 */
private final Optional<Boolean> exchangeInputAggregation;
This seems like an abstraction leakage. The aggregation node shouldn't have any knowledge about whether it feeds into an exchange.
What's the concept this is trying to capture? Let's think of a name that's more descriptive of such concept without tying it to the physicality of an exchange.
The problem being solved here: PushPartialAggregationThroughJoin may or may not want to keep an intermediate aggregation above the join. This is governed by:
// Keep intermediate aggregation below remote exchange to reduce network traffic.
// Intermediate aggregation can be skipped if pushed aggregation has subset of grouping
// symbols as join is not expanding.
if (aggregation.isExchangeInputAggregation() && !ImmutableSet.copyOf(aggregation.getGroupingKeys()).containsAll(pushedAggregation.getGroupingKeys())) {
    result = toIntermediateAggregation(aggregation, result, context);
}
This is based on empirical observation and tuning:
a. We don't want to always keep intermediate aggregations above the join, as that would lead to multiple intermediate aggregations when many joins are stacked on top of each other. This causes significant regressions.
b. We also want to special-case the PA before a data shuffle, because even if the PA is pushed below the join, the CBO rule could still make the wrong decision, which we want to contain.
@martint do you have a better idea for a name?
I can do another experiment with:
if (context.getStatsProvider().getStats(aggregation).getOutputRowCount() * 1.1 >= context.getStatsProvider().getStats(pushedAggregation).getOutputRowCount()) {
    return result;
}
// if aggregation is reducing data, keep it
return toIntermediateAggregation(aggregation, result, context);
to see if we can improve the rule further and remove the intermediate aggregation before the exchange.
The thing is that for queries like
select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year
the PA on date_id will be pushed below the join, but keeping the intermediate aggregation on year still makes sense.
I would still lean towards keeping the intermediate aggregation before the exchange, just in case.
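To make the fact/date_dim case concrete, here is a minimal sketch in plain Java collections. It is a toy model, not Trino code: the class name, the sample rows, and both helper methods are hypothetical and exist only for illustration. It shows the PA on date_id shrinking the fact rows below the join, and the intermediate aggregation on year shrinking them again above it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model (not Trino code): rows are plain arrays and maps.
final class PushedAggregationExample
{
    private PushedAggregationExample() {}

    // Made-up fact rows as {date_id, sales}.
    static final long[][] FACT = {
            {1, 10}, {1, 20}, {2, 5}, {2, 5}, {3, 7}, {3, 3},
    };

    // Made-up date_dim as date_id -> year. date_id is unique here,
    // so the join does not expand rows.
    static final Map<Long, Long> DATE_DIM = Map.of(1L, 2023L, 2L, 2023L, 3L, 2024L);

    // Partial aggregation pushed below the join: sum(sales) grouped by date_id.
    static Map<Long, Long> partialByDateId(long[][] fact)
    {
        Map<Long, Long> sums = new LinkedHashMap<>();
        for (long[] row : fact) {
            sums.merge(row[0], row[1], Long::sum);
        }
        return sums;
    }

    // Inner join with date_dim, then intermediate aggregation grouped by year.
    static Map<Long, Long> intermediateByYear(Map<Long, Long> partial, Map<Long, Long> dateDim)
    {
        Map<Long, Long> sums = new LinkedHashMap<>();
        for (Map.Entry<Long, Long> entry : partial.entrySet()) {
            Long year = dateDim.get(entry.getKey());
            if (year != null) { // unmatched date_id values are dropped
                sums.merge(year, entry.getValue(), Long::sum);
            }
        }
        return sums;
    }
}
```

With this sample data, six fact rows become three partial groups below the join, and the aggregation on year reduces those three rows to two before they would reach the exchange.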
I see. That's brittle. There are no guarantees that the node will remain the input of an exchange if, for instance, other rules push things below the join afterwards.
If we can model it solely based on properties of the aggregation (e.g., whether it reduces the size of its input, as you suggested) that would be better.
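For reference, the stats-based variant floated above can be reduced to a standalone predicate. This is only a sketch: the class and method names are hypothetical, the row counts are passed in as plain doubles rather than read from a stats provider, and the 1.1 factor is simply the one from the experiment snippet.

```java
// Hypothetical standalone version of the row-count comparison (not Trino code).
final class ReductionHeuristic
{
    private ReductionHeuristic() {}

    // Factor taken from the experiment snippet in this thread.
    static final double TOLERANCE = 1.1;

    // Returns true when the intermediate aggregation should be kept, that is,
    // when its estimated output is meaningfully smaller than the pushed
    // aggregation's estimated output.
    static boolean keepIntermediate(double aggregationRows, double pushedAggregationRows)
    {
        if (aggregationRows * TOLERANCE >= pushedAggregationRows) {
            return false; // not reducing enough, skip the intermediate step
        }
        // Note: any comparison against NaN is false in Java, so a NaN
        // estimate falls through to here and the aggregation is kept.
        return true;
    }
}
```

One property worth noticing in this shape of the check: if either estimate is NaN, the comparison is false and the intermediate aggregation is kept, so missing statistics would default to the conservative choice.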
I've benchmarked "Relax intermediate aggregation constraint", but I've seen some pretty big regressions, such as q47 (170%), versus the initial approach. I suggest keeping the initial approach.
We don't want to always keep intermediate aggregations above join as it would lead to multiple intermediate aggregations if there are many joins on top of each other. This is causing significant regressions.
Ideally, shouldn't adaptive partial aggregation avoid big regressions in this case? Is this happening because partial/intermediate aggregation has a significant cost even in the "disabled" state?
How about we rename this to isInputReducingAggregation?
Yeah, that would be aligned with what I was suggesting above, but what needs to change to address the regression @sopel39 described?
Also, what would be the definition of “input reducing aggregation”?
As I understood it, the optimizer might be wrong about the decision to push down partial aggregation through the join, so we want to retain a partial aggregation on top of the join while pushing a partial aggregation through. The simple approach would be to leave an intermediate aggregation on top of every join after pushdown. But when we have multiple joins on top of each other in a fragment, this leads to many intermediate aggregations. Ideally, these should adaptively turn off at runtime, but this is still not free and leads to significant regressions. Hence the extra field in AggregationNode to retain the intermediate aggregation only on the join before the remote data exchange. There are no regressions when benchmarking with this approach.
I would define isInputReducingAggregation as an auxiliary aggregation step introduced as a hedge against a potentially non-optimal decision to push down partial aggregation more aggressively.
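A rough sketch of the two policies being compared, using plain sets of symbol names. Everything here is hypothetical (class, method, and parameter names), and it assumes a simplified fragment where the same grouping keys apply at every join level and only the aggregation above the topmost join feeds the exchange. The predicate itself follows the condition quoted earlier in the thread.

```java
import java.util.Set;

// Hypothetical toy model of the keep/skip decision (not Trino code).
final class IntermediateAggregationPolicy
{
    private IntermediateAggregationPolicy() {}

    // Same shape as the rule's condition: keep the intermediate aggregation only
    // if it is flagged and the pushed grouping keys are not a subset of its keys.
    static boolean keepIntermediate(boolean flagged, Set<String> aggregationKeys, Set<String> pushedKeys)
    {
        return flagged && !aggregationKeys.containsAll(pushedKeys);
    }

    // Counts intermediate aggregations left in a fragment with stacked joins.
    static int countKept(int stackedJoins, boolean keepAboveEveryJoin, Set<String> aggregationKeys, Set<String> pushedKeys)
    {
        int kept = 0;
        for (int level = 0; level < stackedJoins; level++) {
            // Assumption: level 0 is the topmost join, directly below the exchange.
            boolean flagged = keepAboveEveryJoin || level == 0;
            if (keepIntermediate(flagged, aggregationKeys, pushedKeys)) {
                kept++;
            }
        }
        return kept;
    }
}
```

Under these assumptions, with aggregation keys {year} and pushed keys {date_id} over three stacked joins, the keep-everywhere policy leaves three intermediate aggregations, while the flagged policy leaves one.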
Yeah, the q47 regression is pretty big with the alternative approach.
ab1d317 to dc9f171
/**
 * Indicates whether this is an auxiliary aggregation step introduced as a hedge against
 * potentially non-optimal decision to push down partial aggregation more aggressively.
 */
This should be described in terms of what the aggregation is expected to do, or what kind of input would result in this aggregation "reducing input". For instance, one could argue that almost every aggregation reduces inputs, since it collapses multiple rows into a single scalar value.
dc9f171 to 1cc38ba
Make push partial aggregation CBO based. Enable it for cases where the pushed aggregation has the same grouping keys. Additionally, for queries like
select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year
partial aggregation on date_dim.year can be pushed below the join with a grouping key of "date_id", which can greatly reduce the number of rows before the join operator.
1cc38ba to b9f0374
Description
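For queries like
select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year
partial aggregation on date_dim.year can be pushed below the join with a grouping key of "date_id", which can greatly reduce the number of rows before the join operator.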
Additional context and related issues
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: