Skip to content

Commit

Permalink
plan: remove dedicated average distribution optimizer (#477)
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored Aug 2, 2024
1 parent 075688d commit 103e6f3
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 75 deletions.
1 change: 0 additions & 1 deletion engine/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type DistributedEngine struct {
func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) *DistributedEngine {
opts.LogicalOptimizers = []logicalplan.Optimizer{
logicalplan.PassthroughOptimizer{Endpoints: endpoints},
logicalplan.DistributeAvgOptimizer{},
logicalplan.DistributedExecutionOptimizer{Endpoints: endpoints},
}

Expand Down
34 changes: 32 additions & 2 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ import (

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/util/annotations"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/query"
Expand Down Expand Up @@ -185,7 +184,38 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
return false
})

// Preprocess rewrite distributable averages as sum/count
var warns = annotations.New()
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
if !(isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(current)) {
return true
}
// If the current node is avg(), distribute the operation and
// stop the traversal.
if aggr, ok := (*current).(*Aggregation); ok {
if aggr.Op != parser.AVG {
return true
}

sum := *(*current).(*Aggregation)
sum.Op = parser.SUM
count := *(*current).(*Aggregation)
count.Op = parser.COUNT
*current = &Binary{
Op: parser.DIV,
LHS: &sum,
RHS: &count,
VectorMatching: &parser.VectorMatching{
Include: aggr.Grouping,
MatchingLabels: aggr.Grouping,
On: true,
},
}
return true
}
return !(isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(parent))
})

TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
// If the current operation is not distributive, stop the traversal.
if !isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) {
Expand Down
62 changes: 0 additions & 62 deletions logicalplan/distribute_avg.go

This file was deleted.

28 changes: 18 additions & 10 deletions logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,26 @@ max by (location) (dedup(
expectWarn: true,
},
{
name: "label replace to external label before an avg",
expr: `avg by (location) (label_replace(http_requests_total, "region", "$1", "location", "(.*)"))`,
expected: `
sum by (location) (label_replace(dedup(remote(http_requests_total), remote(http_requests_total)), "region", "$1", "location", "(.*)"))
/ on (location)
count by (location) (label_replace(dedup(remote(http_requests_total), remote(http_requests_total)), "region", "$1", "location", "(.*)")
)`,
name: "label replace to external label before an avg",
expr: `avg by (location) (label_replace(http_requests_total, "region", "$1", "location", "(.*)"))`,
expected: `avg by (location) (label_replace(dedup(remote(http_requests_total), remote(http_requests_total)), "region", "$1", "location", "(.*)"))`,
expectWarn: true,
},
{
name: "label replace to internal label before an avg",
expr: `avg by (location) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))`,
expected: `
sum by (location) (
dedup(
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
remote(sum by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
/ on (location)
sum by (location) (
dedup(
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)"))),
remote(count by (location, region) (label_replace(http_requests_total, "zone", "$1", "location", "(.*)")))))
`,
},
{
name: "label replace after an aggregation",
expr: `label_replace(max by (location) (http_requests_total), "region", "$1", "location", "(.*)")`,
Expand Down Expand Up @@ -359,7 +370,6 @@ sum_over_time(max(dedup(
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "west")}),
}
optimizers := []Optimizer{
DistributeAvgOptimizer{},
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
}

Expand Down Expand Up @@ -511,7 +521,6 @@ dedup(
newEngineMock(tcase.secondEngineOpts.mint(), tcase.secondEngineOpts.maxt(), []labels.Labels{labels.FromStrings("region", "east")}),
}
optimizers := []Optimizer{
DistributeAvgOptimizer{},
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
}

Expand Down Expand Up @@ -579,7 +588,6 @@ sum(
newEngineMock(secondEngineOpts.mint(), secondEngineOpts.maxt(), []labels.Labels{labels.FromStrings("region", "east")}),
}
optimizers := []Optimizer{
DistributeAvgOptimizer{},
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
}

Expand Down
10 changes: 10 additions & 0 deletions logicalplan/logical_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,3 +560,13 @@ func shallowCloneSlice[T any](s []T) []T {
copy(clone, s)
return clone
}

func isAvgAggregation(expr *Node) bool {
if expr == nil {
return false
}
if aggr, ok := (*expr).(*Aggregation); ok {
return aggr.Op == parser.AVG
}
return false
}

0 comments on commit 103e6f3

Please sign in to comment.