Skip to content

Commit

Permalink
plan, engine: fix push down of binary expressions in distributed optimizer (#343)
Browse files Browse the repository at this point in the history

* plan, engine: fix value type of logical distributed nodes

Signed-off-by: Michael Hoffmann <[email protected]>

* plan: fix distributing of constant expressions

Signed-off-by: Michael Hoffmann <[email protected]>

---------

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann authored Dec 4, 2023
1 parent a00149f commit 5d3d703
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 60 deletions.
81 changes: 35 additions & 46 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
)

type partition struct {
Expand Down Expand Up @@ -220,6 +219,8 @@ func TestDistributedAggregations(t *testing.T) {
{name: "aggregation with function operand", query: `sum by (pod) (rate(bar[1m]))`},
{name: "binary expression with constant operand", query: `sum by (region) (bar * 60)`},
{name: "binary aggregation", query: `sum by (region) (bar) / sum by (pod) (bar)`},
{name: "binary nested with constants", query: `(1 + 2) + (1 atan2 (-1 % -1))`},
{name: "binary nested with functions", query: `(1 + exp(vector(1))) + (1 atan2 (-1 % -1))`},
{name: "filtered selector interaction", query: `sum by (region) (bar{region="east"}) / sum by (region) (bar)`},
{name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true},
{name: "absent_over_time for non-existing metric", query: `absent_over_time(foo[2m])`},
Expand All @@ -228,12 +229,6 @@ func TestDistributedAggregations(t *testing.T) {
{name: "absent for existing metric", query: `absent(bar{pod="nginx-1"})`},
}

optimizersOpts := map[string][]logicalplan.Optimizer{
"none": logicalplan.NoOptimizers,
"default": logicalplan.DefaultOptimizers,
"all": logicalplan.AllOptimizers,
}

lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}
allQueryOpts := []promql.QueryOpts{nil}
for _, l := range lookbackDeltas {
Expand Down Expand Up @@ -272,50 +267,44 @@ func TestDistributedAggregations(t *testing.T) {
ctx := context.Background()
for _, query := range queries {
t.Run(query.name, func(t *testing.T) {
for o, optimizers := range optimizersOpts {
t.Run(fmt.Sprintf("withOptimizers=%s", o), func(t *testing.T) {
localOpts.LogicalOptimizers = optimizers
distOpts := localOpts
distOpts := localOpts
distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)

distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)

distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)

testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
}
testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
}

t.Run("range", func(t *testing.T) {
if test.rangeEnd == (time.Time{}) {
test.rangeEnd = rangeEnd
}
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
t.Run("range", func(t *testing.T) {
if test.rangeEnd == (time.Time{}) {
test.rangeEnd = rangeEnd
}
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)

distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)
distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)

testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
})
}
testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
})
}
})
Expand Down
35 changes: 21 additions & 14 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type RemoteExecution struct {
Engine api.RemoteEngine
Query string
QueryRangeStart time.Time

valueType parser.ValueType
}

func (r RemoteExecution) String() string {
Expand All @@ -90,7 +92,7 @@ func (r RemoteExecution) Pretty(level int) string { return r.String() }

// PositionRange returns the zero-value posrange.PositionRange.
// NOTE(review): presumably because this node is synthesized by the optimizer
// and has no location in the original query text — confirm.
func (r RemoteExecution) PositionRange() posrange.PositionRange { return posrange.PositionRange{} }

func (r RemoteExecution) Type() parser.ValueType { return parser.ValueTypeMatrix }
// Type returns the value type stored in the valueType field instead of a
// hard-coded constant. In the constructors shown in this diff, valueType is
// set from Type() of the expression that is being distributed.
func (r RemoteExecution) Type() parser.ValueType { return r.valueType }

// PromQLExpr is an empty method with no behavior.
// NOTE(review): presumably a marker so RemoteExecution satisfies the
// parser.Expr interface — confirm against the parser package.
func (r RemoteExecution) PromQLExpr() {}

Expand All @@ -107,7 +109,7 @@ func (r Deduplicate) Pretty(level int) string { return r.String() }

// PositionRange returns the zero-value posrange.PositionRange.
// NOTE(review): presumably because this node is synthesized by the optimizer
// and has no location in the original query text — confirm.
func (r Deduplicate) PositionRange() posrange.PositionRange { return posrange.PositionRange{} }

func (r Deduplicate) Type() parser.ValueType { return parser.ValueTypeMatrix }
// Type reports the value type of the first entry in Expressions.
// NOTE(review): Expressions[0] is indexed without a length check, so this
// panics if Expressions is empty. Presumably a Deduplicate is only ever built
// with at least one expression — confirm at the construction site.
func (r Deduplicate) Type() parser.ValueType { return r.Expressions[0].Type() }

// PromQLExpr is an empty method with no behavior.
// NOTE(review): presumably a marker so Deduplicate satisfies the
// parser.Expr interface — confirm against the parser package.
func (r Deduplicate) PromQLExpr() {}

Expand All @@ -119,7 +121,7 @@ func (r Noop) Pretty(level int) string { return r.String() }

// PositionRange returns the zero-value posrange.PositionRange.
// NOTE(review): presumably because this node is synthesized by the optimizer
// and has no location in the original query text — confirm.
func (r Noop) PositionRange() posrange.PositionRange { return posrange.PositionRange{} }

func (r Noop) Type() parser.ValueType { return parser.ValueTypeMatrix }
// Type always reports parser.ValueTypeVector for a Noop node.
func (r Noop) Type() parser.ValueType { return parser.ValueTypeVector }

// PromQLExpr is an empty method with no behavior.
// NOTE(review): presumably a marker so Noop satisfies the parser.Expr
// interface — confirm against the parser package.
func (r Noop) PromQLExpr() {}

Expand Down Expand Up @@ -268,6 +270,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engine
Engine: e,
Query: (*expr).String(),
QueryRangeStart: start,
valueType: (*expr).Type(),
})
}

Expand All @@ -287,6 +290,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr parser.Expr, engine
Engine: engines[i],
Query: expr.String(),
QueryRangeStart: opts.Start,
valueType: expr.Type(),
})
}

Expand Down Expand Up @@ -399,9 +403,9 @@ func isDistributive(expr *parser.Expr) bool {
// data set. This is why we cannot push down aggregations where
// the operand is a binary expression.
// The only exception currently is pushing down binary expressions with a constant operand.
lhsConstant := isNumberLiteral(aggr.LHS)
rhsConstant := isNumberLiteral(aggr.RHS)
return lhsConstant || rhsConstant
lhsConstant := isConstantExpr(aggr.LHS)
rhsConstant := isConstantExpr(aggr.RHS)
return (lhsConstant || rhsConstant)
case *parser.AggregateExpr:
// Certain aggregations are currently not supported.
if _, ok := distributiveAggregations[aggr.Op]; !ok {
Expand Down Expand Up @@ -455,17 +459,20 @@ func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) b
return true
}

func isNumberLiteral(expr parser.Expr) bool {
if _, ok := expr.(*parser.NumberLiteral); ok {
func isConstantExpr(expr parser.Expr) bool {
// TODO: there are more possibilities for constant expressions
switch texpr := expr.(type) {
case *parser.NumberLiteral:
return true
}

stepInvariant, ok := expr.(*parser.StepInvariantExpr)
if !ok {
case *parser.StepInvariantExpr:
return isConstantExpr(texpr.Expr)
case *parser.ParenExpr:
return isConstantExpr(texpr.Expr)
case *parser.BinaryExpr:
return isConstantExpr(texpr.LHS) && isConstantExpr(texpr.RHS)
default:
return false
}

return isNumberLiteral(stepInvariant.Expr)
}

func maxTime(a, b time.Time) time.Time {
Expand Down

0 comments on commit 5d3d703

Please sign in to comment.