From 5d3d703b98ad7a49d948a5517032079d0028ae02 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Mon, 4 Dec 2023 16:31:00 +0100 Subject: [PATCH] plan, engine: fix push down of binary expressions in distributed optimizer (#343) * plan, engine: fix value type of logical distributed nodes Signed-off-by: Michael Hoffmann * plan: fix distributing of constant expressions Signed-off-by: Michael Hoffmann --------- Signed-off-by: Michael Hoffmann --- engine/distributed_test.go | 81 ++++++++++++++++---------------------- logicalplan/distribute.go | 35 +++++++++------- 2 files changed, 56 insertions(+), 60 deletions(-) diff --git a/engine/distributed_test.go b/engine/distributed_test.go index 430d1647..00ca049f 100644 --- a/engine/distributed_test.go +++ b/engine/distributed_test.go @@ -19,7 +19,6 @@ import ( "github.com/thanos-io/promql-engine/api" "github.com/thanos-io/promql-engine/engine" - "github.com/thanos-io/promql-engine/logicalplan" ) type partition struct { @@ -220,6 +219,8 @@ func TestDistributedAggregations(t *testing.T) { {name: "aggregation with function operand", query: `sum by (pod) (rate(bar[1m]))`}, {name: "binary expression with constant operand", query: `sum by (region) (bar * 60)`}, {name: "binary aggregation", query: `sum by (region) (bar) / sum by (pod) (bar)`}, + {name: "binary nested with constants", query: `(1 + 2) + (1 atan2 (-1 % -1))`}, + {name: "binary nested with functions", query: `(1 + exp(vector(1))) + (1 atan2 (-1 % -1))`}, {name: "filtered selector interaction", query: `sum by (region) (bar{region="east"}) / sum by (region) (bar)`}, {name: "unsupported aggregation", query: `count_values("pod", bar)`, expectFallback: true}, {name: "absent_over_time for non-existing metric", query: `absent_over_time(foo[2m])`}, @@ -228,12 +229,6 @@ func TestDistributedAggregations(t *testing.T) { {name: "absent for existing metric", query: `absent(bar{pod="nginx-1"})`}, } - optimizersOpts := map[string][]logicalplan.Optimizer{ - "none": logicalplan.NoOptimizers, - "default": logicalplan.DefaultOptimizers, - "all": logicalplan.AllOptimizers, - } - lookbackDeltas := []time.Duration{0, 30 * time.Second, 5 * time.Minute} allQueryOpts := []promql.QueryOpts{nil} for _, l := range lookbackDeltas { @@ -272,50 +267,44 @@ func TestDistributedAggregations(t *testing.T) { ctx := context.Background() for _, query := range queries { t.Run(query.name, func(t *testing.T) { - for o, optimizers := range optimizersOpts { - t.Run(fmt.Sprintf("withOptimizers=%s", o), func(t *testing.T) { - localOpts.LogicalOptimizers = optimizers - distOpts := localOpts + distOpts := localOpts + distOpts.DisableFallback = !query.expectFallback + for _, instantTS := range instantTSs { + t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) { + distEngine := engine.NewDistributedEngine(distOpts, + api.NewStaticEndpoints(remoteEngines), + ) + distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS) + testutil.Ok(t, err) - distOpts.DisableFallback = !query.expectFallback - for _, instantTS := range instantTSs { - t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) { - distEngine := engine.NewDistributedEngine(distOpts, - api.NewStaticEndpoints(remoteEngines), - ) - distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS) - testutil.Ok(t, err) + distResult := distQry.Exec(ctx) + promEngine := promql.NewEngine(localOpts.EngineOpts) + promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS) + testutil.Ok(t, err) + promResult := promQry.Exec(ctx) - distResult := distQry.Exec(ctx) - promEngine := promql.NewEngine(localOpts.EngineOpts) - promQry, err := promEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS) - testutil.Ok(t, err) - promResult := promQry.Exec(ctx) - - testutil.WithGoCmp(comparer).Equals(t, promResult, distResult) - }) - } + testutil.WithGoCmp(comparer).Equals(t, promResult, distResult) + }) + } - t.Run("range", func(t *testing.T) { - if test.rangeEnd == (time.Time{}) { - test.rangeEnd = rangeEnd - } - distEngine := engine.NewDistributedEngine(distOpts, - api.NewStaticEndpoints(remoteEngines), - ) - distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep) - testutil.Ok(t, err) + t.Run("range", func(t *testing.T) { + if test.rangeEnd == (time.Time{}) { + test.rangeEnd = rangeEnd + } + distEngine := engine.NewDistributedEngine(distOpts, + api.NewStaticEndpoints(remoteEngines), + ) + distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep) + testutil.Ok(t, err) - distResult := distQry.Exec(ctx) - promEngine := promql.NewEngine(localOpts.EngineOpts) - promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep) - testutil.Ok(t, err) - promResult := promQry.Exec(ctx) + distResult := distQry.Exec(ctx) + promEngine := promql.NewEngine(localOpts.EngineOpts) + promQry, err := promEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep) + testutil.Ok(t, err) + promResult := promQry.Exec(ctx) - testutil.WithGoCmp(comparer).Equals(t, promResult, distResult) - }) - }) - } + testutil.WithGoCmp(comparer).Equals(t, promResult, distResult) + }) }) } }) diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 541357d6..b67fe31c 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -77,6 +77,8 @@ type RemoteExecution struct { Engine api.RemoteEngine Query string QueryRangeStart time.Time + + valueType parser.ValueType } func (r RemoteExecution) String() string { @@ -90,7 +92,7 @@ func (r RemoteExecution) Pretty(level int) string { return r.String() } func (r RemoteExecution) PositionRange() posrange.PositionRange { return posrange.PositionRange{} } -func (r RemoteExecution) Type() parser.ValueType { return parser.ValueTypeMatrix } +func (r RemoteExecution) Type() parser.ValueType { return r.valueType } func (r RemoteExecution) PromQLExpr() {} @@ -107,7 +109,7 @@ func (r Deduplicate) Pretty(level int) string { return r.String() } func (r Deduplicate) PositionRange() posrange.PositionRange { return posrange.PositionRange{} } -func (r Deduplicate) Type() parser.ValueType { return parser.ValueTypeMatrix } +func (r Deduplicate) Type() parser.ValueType { return r.Expressions[0].Type() } func (r Deduplicate) PromQLExpr() {} @@ -119,7 +121,7 @@ func (r Noop) Pretty(level int) string { return r.String() } func (r Noop) PositionRange() posrange.PositionRange { return posrange.PositionRange{} } -func (r Noop) Type() parser.ValueType { return parser.ValueTypeMatrix } +func (r Noop) Type() parser.ValueType { return parser.ValueTypeVector } func (r Noop) PromQLExpr() {} @@ -268,6 +270,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engine Engine: e, Query: (*expr).String(), QueryRangeStart: start, + valueType: (*expr).Type(), }) } @@ -287,6 +290,7 @@ func (m DistributedExecutionOptimizer) distributeAbsent(expr parser.Expr, engine Engine: engines[i], Query: expr.String(), QueryRangeStart: opts.Start, + valueType: expr.Type(), }) } @@ -399,9 +403,9 @@ func isDistributive(expr *parser.Expr) bool { // data set. This is why we cannot push down aggregations where // the operand is a binary expression. // The only exception currently is pushing down binary expressions with a constant operand. - lhsConstant := isNumberLiteral(aggr.LHS) - rhsConstant := isNumberLiteral(aggr.RHS) - return lhsConstant || rhsConstant + lhsConstant := isConstantExpr(aggr.LHS) + rhsConstant := isConstantExpr(aggr.RHS) + return (lhsConstant || rhsConstant) case *parser.AggregateExpr: // Certain aggregations are currently not supported. if _, ok := distributiveAggregations[aggr.Op]; !ok { @@ -455,17 +459,20 @@ func matchesExternalLabels(ms []*labels.Matcher, externalLabels labels.Labels) b return true } -func isNumberLiteral(expr parser.Expr) bool { - if _, ok := expr.(*parser.NumberLiteral); ok { +func isConstantExpr(expr parser.Expr) bool { + // TODO: there are more possibilities for constant expressions + switch texpr := expr.(type) { + case *parser.NumberLiteral: return true - } - - stepInvariant, ok := expr.(*parser.StepInvariantExpr) - if !ok { + case *parser.StepInvariantExpr: + return isConstantExpr(texpr.Expr) + case *parser.ParenExpr: + return isConstantExpr(texpr.Expr) + case *parser.BinaryExpr: + return isConstantExpr(texpr.LHS) && isConstantExpr(texpr.RHS) + default: return false } - - return isNumberLiteral(stepInvariant.Expr) } func maxTime(a, b time.Time) time.Time {