
Enable push partial aggregation through join #23812

Merged
merged 2 commits into from
Nov 21, 2024


raunaqmorarka
Member

@raunaqmorarka raunaqmorarka commented Oct 17, 2024

Description

For queries like

select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year

partial aggregation on date_dim.year can be pushed below the join with a grouping key of "date_id", which can greatly reduce the number of rows before the join operator.
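To illustrate why this rewrite preserves results and why it helps, here is a minimal, self-contained Java sketch over in-memory data. It is not Trino code: the `Fact`/`DateDim` records, the sample rows, and the class name are hypothetical, and `sum` is used because it splits cleanly into partial and final steps. Both plans produce the same per-year totals, but with the partial aggregation on `date_id` pushed below the join, only one row per distinct `date_id` reaches the join.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PushPartialAggregationSketch {
    // Hypothetical row types standing in for the fact and date_dim tables.
    record Fact(int dateId, long sales) {}
    record DateDim(int dateId, int year) {}

    // Plan 1: join every fact row with date_dim, then aggregate by year.
    static Map<Integer, Long> joinThenAggregate(List<Fact> fact, Map<Integer, Integer> yearByDateId) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Fact f : fact) {
            Integer year = yearByDateId.get(f.dateId());
            if (year != null) { // inner join: drop fact rows without a matching date
                result.merge(year, f.sales(), Long::sum);
            }
        }
        return result;
    }

    // Plan 2: partial aggregation on the join key (date_id) below the join,
    // then join the smaller partial result and finish aggregating by year.
    static Map<Integer, Long> pushPartialThenJoin(List<Fact> fact, Map<Integer, Integer> yearByDateId) {
        Map<Integer, Long> partialByDateId = new TreeMap<>();
        for (Fact f : fact) {
            partialByDateId.merge(f.dateId(), f.sales(), Long::sum);
        }
        System.out.println("rows entering join: " + fact.size() + " -> " + partialByDateId.size());
        Map<Integer, Long> result = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : partialByDateId.entrySet()) {
            Integer year = yearByDateId.get(e.getKey());
            if (year != null) {
                result.merge(year, e.getValue(), Long::sum);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<DateDim> dateDim = List.of(new DateDim(1, 2023), new DateDim(2, 2023), new DateDim(3, 2024));
        Map<Integer, Integer> yearByDateId = new TreeMap<>();
        for (DateDim d : dateDim) {
            yearByDateId.put(d.dateId(), d.year());
        }
        List<Fact> fact = List.of(
                new Fact(1, 10), new Fact(1, 5), new Fact(2, 7),
                new Fact(3, 1), new Fact(3, 2), new Fact(3, 3));
        Map<Integer, Long> expected = joinThenAggregate(fact, yearByDateId);
        Map<Integer, Long> actual = pushPartialThenJoin(fact, yearByDateId);
        System.out.println(expected + " " + actual);
    }
}
```

Running `main` prints `rows entering join: 6 -> 3` followed by `{2023=22, 2024=6} {2023=22, 2024=6}`, i.e. both plans agree while the join sees half as many rows.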

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text:

## General
* Improve performance of queries with grouping on joins. ({issue}`23812`)

/**
* Indicates whether aggregation is potentially reducing rows that are propagated through an exchange operator.
*/
private final Optional<Boolean> exchangeInputAggregation;
Member

This seems like an abstraction leakage. The aggregation node shouldn't have any knowledge about whether it feeds into an exchange.

What's the concept this is trying to capture? Let's think of a name that's more descriptive of such concept without tying it to the physicality of an exchange.

Member

@sopel39 sopel39 Oct 21, 2024

The problem that is solved here:
PushPartialAggregationThroughJoin might or might not want to keep an intermediate aggregation above the join. This is governed by:

        // Keep intermediate aggregation below remote exchange to reduce network traffic.
        // Intermediate aggregation can be skipped if pushed aggregation has subset of grouping
        // symbols as join is not expanding.
        if (aggregation.isExchangeInputAggregation() && !ImmutableSet.copyOf(aggregation.getGroupingKeys()).containsAll(pushedAggregation.getGroupingKeys())) {
            result = toIntermediateAggregation(aggregation, result, context);
        }

This is based on empirical observation and tuning:
a. We don't want to always keep intermediate aggregations above the join, as it would lead to multiple intermediate aggregations if there are many joins on top of each other. This is causing significant regressions.
b. We also want to make a special case for PA before a data shuffle because, even if PA is pushed below the join, the CBO rule could make a wrong decision, which we want to contain.
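The condition in the snippet quoted above boils down to a flag check plus a set-containment check on grouping keys. The following is a minimal Java sketch of just that decision, not the rule itself: the `Agg` record is a hypothetical stand-in for Trino's `AggregationNode`, and `java.util.Set.copyOf` replaces Guava's `ImmutableSet.copyOf`. It is applied to the `date_id`/`year` example from the PR description.

```java
import java.util.List;
import java.util.Set;

public class IntermediateAggregationDecision {
    // Hypothetical, simplified stand-in for the relevant parts of an aggregation node.
    record Agg(boolean exchangeInputAggregation, List<String> groupingKeys) {}

    // Mirrors the quoted condition: keep an intermediate aggregation above the join only when
    // the original aggregation feeds a remote exchange AND its grouping keys do not contain all
    // of the pushed aggregation's keys. When the pushed keys are a subset of the original ones,
    // the quoted comment says the join is not expanding and the intermediate step can be skipped.
    static boolean keepIntermediate(Agg aggregation, Agg pushedAggregation) {
        return aggregation.exchangeInputAggregation()
                && !Set.copyOf(aggregation.groupingKeys()).containsAll(pushedAggregation.groupingKeys());
    }

    public static void main(String[] args) {
        // GROUP BY date_dim.year, with partial aggregation pushed below the join on date_id.
        Agg byYear = new Agg(true, List.of("year"));
        Agg pushedByDateId = new Agg(false, List.of("date_id"));
        System.out.println(keepIntermediate(byYear, pushedByDateId)); // true

        // Pushed aggregation with the same grouping keys: no intermediate step is kept.
        Agg byDateId = new Agg(true, List.of("date_id"));
        System.out.println(keepIntermediate(byDateId, pushedByDateId)); // false
    }
}
```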

@martint do you have some better idea for a name?

I can do another experiment with:

        if (context.getStatsProvider().getStats(aggregation).getOutputRowCount() * 1.1 >= context.getStatsProvider().getStats(pushedAggregation).getOutputRowCount()) {
            return result;
        }

        // if aggregation is reducing data, keep it
        return toIntermediateAggregation(aggregation, result, context);

to see if we can improve the rule further and remove the intermediate aggregation before the exchange.

The thing is that for queries like

select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year

PA on date_id will be pushed below the join, but keeping an intermediate aggregation on year still makes sense.

I would still lean towards keeping the intermediate aggregation before the exchange, just in case.
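The stats-based experiment quoted above compares two row-count estimates. Below is a hedged Java sketch of only that comparison: plain `double` values stand in for the stats provider's `getOutputRowCount()` results, and the class name and sample numbers are made up for illustration.

```java
public class StatsBasedIntermediateDecision {
    // Mirrors the quoted experiment: if the original aggregation's estimated output, inflated
    // by 10%, is at least the pushed aggregation's estimated output, an intermediate step above
    // the join is not expected to reduce data much, so it is dropped. Otherwise it is kept.
    // Written as !(a >= b) rather than a < b on purpose: a NaN estimate makes the comparison
    // false, so the sketch (like the quoted snippet) falls through to keeping the aggregation.
    static boolean keepIntermediate(double aggregationOutputRows, double pushedAggregationOutputRows) {
        return !(aggregationOutputRows * 1.1 >= pushedAggregationOutputRows);
    }

    public static void main(String[] args) {
        // Few distinct groups above the join vs many distinct join keys below it.
        System.out.println(keepIntermediate(10, 3650));
        // Original aggregation barely smaller than the pushed one.
        System.out.println(keepIntermediate(1000, 1050));
        // Unknown estimate.
        System.out.println(keepIntermediate(Double.NaN, 1050));
    }
}
```

With these inputs the three calls print `true`, `false`, and `true`.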

Member

I see. That's brittle. There are no guarantees that the node will remain the input of an exchange if, for instance, other rules push things below the join afterwards.

If we can model it solely based on properties of the aggregation (e.g., whether it reduces the size of its input, as you suggested) that would be better.

Member

I've benchmarked Relax intermediate aggregation constraint, but I've seen some pretty big regressions, like q47 (170%), compared to the initial approach. I suggest keeping the initial approach.

Member Author

We don't want to always keep intermediate aggregations above join as it would lead to multiple intermediate aggregations if there are many joins on top of each other. This is causing significant regressions.

Ideally, shouldn't adaptive partial aggregation avoid big regressions in this case? Is this happening because partial/intermediate aggregation has a significant cost even in the "disabled" state?

Member Author

How about we rename this to isInputReducingAggregation?

Member

Yeah, that would be aligned with what I was suggesting above, but what needs to change to address the regression @sopel39 described?

Also, what would be the definition of “input reducing aggregation”?

Member Author

As I understood it, the optimizer might be wrong about the decision to push down partial aggregation through the join, so we want to retain a partial aggregation on top of the join while pushing a partial aggregation through. The simple approach would be to leave an intermediate aggregation on top of every join after pushdown. But when we have multiple joins on top of each other in a fragment, this leads to many intermediate aggregations. Ideally, these should turn off adaptively at runtime, but this is still not free and leads to significant regressions. Hence the extra field in AggregationNode, to retain the intermediate aggregation only on the join before the remote data exchange. There are no regressions when benchmarking with this approach.
I would define isInputReducingAggregation as an auxiliary aggregation step introduced as a hedge against a potentially non-optimal decision to push down partial aggregation more aggressively.
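The trade-off described in this comment can be illustrated with a toy model of a single fragment that has several stacked joins and ends in a remote exchange. This is a simplified, hypothetical sketch: node names are plain strings rather than Trino plan nodes, and partial aggregation is assumed to have been pushed below every join. Leaving an intermediate aggregation above every join yields one per join, while keeping it only above the join that feeds the remote exchange yields exactly one.

```java
import java.util.ArrayList;
import java.util.List;

public class IntermediateAggregationPlacement {
    // Builds a bottom-up list of node names for a fragment with `joins` stacked joins
    // after partial aggregation has been pushed below all of them (simplifying assumption).
    static List<String> plan(int joins, boolean onlyBeforeExchange) {
        List<String> nodes = new ArrayList<>();
        nodes.add("PartialAggregation"); // pushed below the lowest join
        for (int i = 1; i <= joins; i++) {
            nodes.add("Join" + i);
            boolean feedsRemoteExchange = i == joins; // topmost join in the fragment
            if (!onlyBeforeExchange || feedsRemoteExchange) {
                nodes.add("IntermediateAggregation");
            }
        }
        nodes.add("RemoteExchange");
        return nodes;
    }

    static long countIntermediate(List<String> nodes) {
        return nodes.stream().filter("IntermediateAggregation"::equals).count();
    }

    public static void main(String[] args) {
        List<String> everyJoin = plan(3, false);
        List<String> beforeExchangeOnly = plan(3, true);
        System.out.println(everyJoin + " -> " + countIntermediate(everyJoin));
        System.out.println(beforeExchangeOnly + " -> " + countIntermediate(beforeExchangeOnly));
    }
}
```

In this model the first strategy leaves three intermediate aggregations for three joins and the second leaves one; the runtime cost of each extra step, even when adaptively disabled, is what the comment reports as the source of the regressions.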

@sopel39
Member

sopel39 commented Oct 18, 2024

fyi: benchmark results:
[Screenshot 2024-10-18 at 15.13.36: benchmark results]
20-25% gain for sf10000 part iceberg!

@sopel39
Member

sopel39 commented Oct 22, 2024

I've benchmarked Relax intermediate aggregation constraint, but I've seen some pretty big regressions, like q47 (170%), compared to the initial approach. I suggest keeping the initial approach.

@raunaqmorarka
Member Author

raunaqmorarka commented Oct 23, 2024

@sopel39
Member

sopel39 commented Oct 23, 2024

yeah, the q47 regression is pretty big with the alternative approach

@raunaqmorarka raunaqmorarka force-pushed the ks/agg-pr branch 3 times, most recently from ab1d317 to dc9f171, on November 20, 2024 06:55
Comment on lines 56 to 59
/**
* Indicates whether this is an auxiliary aggregation step introduced as a hedge against
* a potentially non-optimal decision to push down partial aggregation more aggressively.
*/
Member

This should be described in terms of what the aggregation is expected to do, or what kind of input would result in this aggregation "reducing input". For instance, one could argue that almost every aggregation reduces inputs, since it collapses multiple rows into a single scalar value.

Make push partial aggregation CBO based.
Enable it for cases where pushed aggregation has the same grouping keys.
Additionally, for queries like

select sum(sales) from fact, date_dim where fact.date_id = date_dim.date_id group by date_dim.year

partial aggregation on date_dim.year can be pushed
below join with grouping key of "date_id", which
can greatly reduce the number of rows before the join operator.
@raunaqmorarka raunaqmorarka merged commit ef267fc into trinodb:master Nov 21, 2024
91 checks passed
@raunaqmorarka raunaqmorarka deleted the ks/agg-pr branch November 21, 2024 09:23
@github-actions github-actions bot added this to the 466 milestone Nov 21, 2024
3 participants