Skip to content

Commit

Permalink
plan: fix distributing of constant expressions
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Dec 4, 2023
1 parent 1d84691 commit 1e927c6
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 58 deletions.
80 changes: 34 additions & 46 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

"github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/engine"
"github.com/thanos-io/promql-engine/logicalplan"
)

type partition struct {
Expand Down Expand Up @@ -221,6 +220,7 @@ func TestDistributedAggregations(t *testing.T) {
{name: "binary expression with constant operand", query: `sum by (region) (bar * 60)`},
{name: "binary aggregation", query: `sum by (region) (bar) / sum by (pod) (bar)`},
{name: "binary nested with constants", query: `(1 + 2) + (1 atan2 (-1 % -1))`},
{name: "binary nested with functions", query: `(1 + exp(vector(1))) + (1 atan2 (-1 % -1))`},
{name: "filtered selector interaction", query: `sum by (region) (bar{region="east"}) / sum by (region) (bar)`},
{name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true},
{name: "absent_over_time for non-existing metric", query: `absent_over_time(foo[2m])`},
Expand All @@ -229,12 +229,6 @@ func TestDistributedAggregations(t *testing.T) {
{name: "absent for existing metric", query: `absent(bar{pod="nginx-1"})`},
}

optimizersOpts := map[string][]logicalplan.Optimizer{
"none": logicalplan.NoOptimizers,
"default": logicalplan.DefaultOptimizers,
"all": logicalplan.AllOptimizers,
}

lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute}
allQueryOpts := []promql.QueryOpts{nil}
for _, l := range lookbackDeltas {
Expand Down Expand Up @@ -273,50 +267,44 @@ func TestDistributedAggregations(t *testing.T) {
ctx := context.Background()
for _, query := range queries {
t.Run(query.name, func(t *testing.T) {
for o, optimizers := range optimizersOpts {
t.Run(fmt.Sprintf("withOptimizers=%s", o), func(t *testing.T) {
localOpts.LogicalOptimizers = optimizers
distOpts := localOpts
distOpts := localOpts
distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)

distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)

distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)

testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
}
testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
}

t.Run("range", func(t *testing.T) {
if test.rangeEnd == (time.Time{}) {
test.rangeEnd = rangeEnd
}
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
t.Run("range", func(t *testing.T) {
if test.rangeEnd == (time.Time{}) {
test.rangeEnd = rangeEnd
}
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)

distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)
distResult := distQry.Exec(ctx)
promEngine := promql.NewEngine(localOpts.EngineOpts)
promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)
promResult := promQry.Exec(ctx)

testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
})
}
testutil.WithGoCmp(comparer).Equals(t, promResult, distResult)
})
})
}
})
Expand Down
27 changes: 15 additions & 12 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r Noop) Pretty(level int) string { return r.String() }

func (r Noop) PositionRange() posrange.PositionRange { return posrange.PositionRange{} }

func (r Noop) Type() parser.ValueType { return parser.ValueTypeNone }
func (r Noop) Type() parser.ValueType { return parser.ValueTypeVector }

func (r Noop) PromQLExpr() {}

Expand Down Expand Up @@ -403,9 +403,9 @@ func isDistributive(expr *parser.Expr) bool {
// data set. This is why we cannot push down aggregations where
// the operand is a binary expression.
// The only exception currently is pushing down binary expressions with a constant operand.
lhsConstant := isNumberLiteral(aggr.LHS)
rhsConstant := isNumberLiteral(aggr.RHS)
return lhsConstant || rhsConstant
lhsConstant := isConstantExpr(aggr.LHS)
rhsConstant := isConstantExpr(aggr.RHS)
return (lhsConstant || rhsConstant)
case *parser.AggregateExpr:
// Certain aggregations are currently not supported.
if _, ok := distributiveAggregations[aggr.Op]; !ok {
Expand Down Expand Up @@ -459,17 +459,20 @@ func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) b
return true
}

func isNumberLiteral(expr parser.Expr) bool {
if _, ok := expr.(*parser.NumberLiteral); ok {
func isConstantExpr(expr parser.Expr) bool {
// TODO: there are more possibilities for constant expressions
switch texpr := expr.(type) {
case *parser.NumberLiteral:
return true
}

stepInvariant, ok := expr.(*parser.StepInvariantExpr)
if !ok {
case *parser.StepInvariantExpr:
return isConstantExpr(texpr.Expr)
case *parser.ParenExpr:
return isConstantExpr(texpr.Expr)
case *parser.BinaryExpr:
return isConstantExpr(texpr.LHS) && isConstantExpr(texpr.RHS)
default:
return false
}

return isNumberLiteral(stepInvariant.Expr)
}

func maxTime(a, b time.Time) time.Time {
Expand Down

0 comments on commit 1e927c6

Please sign in to comment.