Skip to content

Commit

Permalink
Make binary pushdown optional (#358)
Browse files Browse the repository at this point in the history
Pushing down binary operations with no vector matching makes sense
for Thanos and we should do have it enabled by default. However, we use
the engine to query other metric sources and here we see issues where
matching series between two metrics are not present in the same engine.

To provide an escape hatch while we figure out a more generic solution,
this commit makes the binary pushdown optional and enabled by default,
but can be disabled by setting a flag.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored Dec 12, 2023
1 parent af3a9a7 commit 82aec09
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 10 deletions.
11 changes: 6 additions & 5 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ var distributiveAggregations = map[parser.ItemType]struct{}{
// DistributedExecutionOptimizer produces a logical plan suitable for
// distributed Query execution.
type DistributedExecutionOptimizer struct {
Endpoints api.RemoteEndpoints
Endpoints api.RemoteEndpoints
SkipBinaryPushdown bool
}

func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
Expand Down Expand Up @@ -176,7 +177,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Op

TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
// If the current operation is not distributive, stop the traversal.
if !isDistributive(current) {
if !isDistributive(current, m.SkipBinaryPushdown) {
return true
}

Expand All @@ -202,7 +203,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Op
}

// If the parent operation is distributive, continue the traversal.
if isDistributive(parent) {
if isDistributive(parent, m.SkipBinaryPushdown) {
return false
}

Expand Down Expand Up @@ -422,14 +423,14 @@ func numSteps(start, end time.Time, step time.Duration) int64 {
return (end.UnixMilli()-start.UnixMilli())/step.Milliseconds() + 1
}

func isDistributive(expr *parser.Expr) bool {
func isDistributive(expr *parser.Expr, skipBinaryPushdown bool) bool {
if expr == nil {
return false
}

switch aggr := (*expr).(type) {
case *parser.BinaryExpr:
return isBinaryExpressionWithOneConstantSide(aggr) || isBinaryExpressionWithDistributableMatching(aggr)
return isBinaryExpressionWithOneConstantSide(aggr) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(aggr))
case *parser.AggregateExpr:
// Certain aggregations are currently not supported.
if _, ok := distributiveAggregations[aggr.Op]; !ok {
Expand Down
12 changes: 7 additions & 5 deletions logicalplan/distribute_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (

// DistributeAvgOptimizer rewrites an AVG aggregation into a SUM/COUNT aggregation so that
// it can be executed in a distributed manner.
type DistributeAvgOptimizer struct{}
type DistributeAvgOptimizer struct {
SkipBinaryPushdown bool
}

func (r DistributeAvgOptimizer) Optimize(plan parser.Expr, _ *query.Options) (parser.Expr, annotations.Annotations) {
TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
if !isDistributiveOrAverage(current) {
if !isDistributiveOrAverage(current, r.SkipBinaryPushdown) {
return true
}
// If the current node is avg(), distribute the operation and
Expand All @@ -39,18 +41,18 @@ func (r DistributeAvgOptimizer) Optimize(plan parser.Expr, _ *query.Options) (pa
}
return true
}
return !isDistributiveOrAverage(parent)
return !isDistributiveOrAverage(parent, r.SkipBinaryPushdown)
})
return plan, nil
}

func isDistributiveOrAverage(expr *parser.Expr) bool {
func isDistributiveOrAverage(expr *parser.Expr, skipBinaryPushdown bool) bool {
if expr == nil {
return false
}
var isAvg bool
if aggr, ok := (*expr).(*parser.AggregateExpr); ok {
isAvg = aggr.Op == parser.AVG
}
return isDistributive(expr) || isAvg
return isDistributive(expr, skipBinaryPushdown) || isAvg
}

0 comments on commit 82aec09

Please sign in to comment.