Skip to content

Commit

Permalink
Distribute label_replace (#357)
Browse files Browse the repository at this point in the history
* Distribute label_replace

The label_replace function is not distributed because we don't handle
string literals in the traversal function.

This commit fixes that.

Signed-off-by: Filip Petkovski <[email protected]>

* Warn when relabeling external label

Signed-off-by: Filip Petkovski <[email protected]>

* Use internal traversal function

Signed-off-by: Filip Petkovski <[email protected]>

* Merge warnings from context

Signed-off-by: Filip Petkovski <[email protected]>

* Drop unused opts

Signed-off-by: Filip Petkovski <[email protected]>

---------

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored Dec 12, 2023
1 parent d2c8f42 commit af3a9a7
Show file tree
Hide file tree
Showing 16 changed files with 174 additions and 98 deletions.
74 changes: 36 additions & 38 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ func TestDistributedAggregations(t *testing.T) {
{name: "avg", query: `avg(bar)`},
{name: "avg by __name__", query: `avg by (__name__) ({__name__=~".+"})`},
{name: "avg with grouping", query: `avg by (pod) (bar)`},
{name: "label_replace", query: `max by (instance) (label_replace(bar, "instance", "$1", "pod", "(.*)"))`},
{name: "count", query: `count by (pod) (bar)`},
{name: "count by __name__", query: `count by (__name__) ({__name__=~".+"})`},
{name: "group", query: `group by (pod) (bar)`},
Expand Down Expand Up @@ -239,45 +240,44 @@ func TestDistributedAggregations(t *testing.T) {
allQueryOpts = append(allQueryOpts, promql.NewPrometheusQueryOpts(false, l))
}

for _, test := range tests {
for _, lookbackDelta := range lookbackDeltas {
localOpts.LookbackDelta = lookbackDelta
for _, queryOpts := range allQueryOpts {
for _, query := range queries {
t.Run(query.name, func(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var allSeries []*mockSeries
remoteEngines := make([]api.RemoteEngine, 0, len(test.seriesSets)+1)
for _, s := range test.seriesSets {
remoteEngines = append(remoteEngines, engine.NewRemoteEngine(
localOpts,
storageWithMockSeries(s.series...),
s.mint(),
s.maxt(),
s.extLset,
))
allSeries = append(allSeries, s.series...)
}
if len(test.timeOverlap.series) > 0 {
remoteEngines = append(remoteEngines, engine.NewRemoteEngine(
localOpts,
storageWithMockSeries(test.timeOverlap.series...),
test.timeOverlap.mint(),
test.timeOverlap.maxt(),
test.timeOverlap.extLset,
))
allSeries = append(allSeries, test.timeOverlap.series...)
}
completeSeriesSet := storageWithSeries(mergeWithSampleDedup(allSeries)...)
for _, lookbackDelta := range lookbackDeltas {
localOpts.LookbackDelta = lookbackDelta
for _, queryOpts := range allQueryOpts {
var allSeries []*mockSeries
remoteEngines := make([]api.RemoteEngine, 0, len(test.seriesSets)+1)
for _, s := range test.seriesSets {
remoteEngines = append(remoteEngines, engine.NewRemoteEngine(
localOpts,
storageWithMockSeries(s.series...),
s.mint(),
s.maxt(),
s.extLset,
))
allSeries = append(allSeries, s.series...)
}
if len(test.timeOverlap.series) > 0 {
remoteEngines = append(remoteEngines, engine.NewRemoteEngine(
localOpts,
storageWithMockSeries(test.timeOverlap.series...),
test.timeOverlap.mint(),
test.timeOverlap.maxt(),
test.timeOverlap.extLset,
))
allSeries = append(allSeries, test.timeOverlap.series...)
}
completeSeriesSet := storageWithSeries(mergeWithSampleDedup(allSeries)...)

ctx := context.Background()

ctx := context.Background()
for _, query := range queries {
t.Run(query.name, func(t *testing.T) {
distOpts := localOpts
distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)

Expand All @@ -295,9 +295,7 @@ func TestDistributedAggregations(t *testing.T) {
if test.rangeEnd == (time.Time{}) {
test.rangeEnd = rangeEnd
}
distEngine := engine.NewDistributedEngine(distOpts,
api.NewStaticEndpoints(remoteEngines),
)
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)

Expand All @@ -309,11 +307,11 @@ func TestDistributedAggregations(t *testing.T) {

testutil.WithGoCmp(comparer).Equals(t, promResult, distResult, queryExplanation(distQry))
})
})
}
}
})
}
}
})
}
}

Expand Down
25 changes: 14 additions & 11 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/stats"
v1 "github.com/prometheus/prometheus/web/api/v1"

Expand Down Expand Up @@ -216,7 +217,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
}

lplan := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers)
lplan, warns := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers)
exec, err := execution.New(lplan.Expr(), q, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
Expand All @@ -232,6 +233,7 @@ func (e *compatibilityEngine) NewInstantQuery(ctx context.Context, q storage.Que
engine: e,
expr: expr,
ts: ts,
warns: warns,
t: InstantQuery,
resultSort: resultSort,
}, nil
Expand Down Expand Up @@ -268,7 +270,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
}

lplan := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers)
lplan, warns := logicalplan.New(expr, qOpts).Optimize(e.logicalOptimizers)
exec, err := execution.New(lplan.Expr(), q, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
Expand All @@ -283,6 +285,7 @@ func (e *compatibilityEngine) NewRangeQuery(ctx context.Context, q storage.Query
Query: &Query{exec: exec, opts: opts},
engine: e,
expr: expr,
warns: warns,
t: RangeQuery,
}, nil
}
Expand All @@ -307,21 +310,20 @@ func (q *Query) Analyze() *AnalyzeOutputNode {

type compatibilityQuery struct {
*Query
engine *compatibilityEngine
expr parser.Expr
ts time.Time // Empty for range queries.
engine *compatibilityEngine
expr parser.Expr
ts time.Time // Empty for range queries.
warns annotations.Annotations

t QueryType
resultSort resultSorter

cancel context.CancelFunc
cancel context.CancelFunc
}

func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
ctx = warnings.NewContext(ctx)
defer func() {
if warns := warnings.FromContext(ctx); len(warns) > 0 {
ret.Warnings = warns
}
ret.Warnings = ret.Warnings.Merge(warnings.FromContext(ctx))
}()

// Handle case with strings early on as this does not need us to process samples.
Expand All @@ -330,7 +332,8 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
return &promql.Result{Value: promql.String{V: e.Val, T: q.ts.UnixMilli()}}
}
ret = &promql.Result{
Value: promql.Vector{},
Value: promql.Vector{},
Warnings: q.warns,
}
defer recoverEngine(q.engine.logger, q.expr, &ret.Err)

Expand Down
9 changes: 5 additions & 4 deletions engine/user_defined_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/annotations"

"github.com/prometheus/prometheus/promql/parser"

Expand Down Expand Up @@ -60,8 +61,8 @@ load 30s

type injectVectorSelector struct{}

func (i injectVectorSelector) Optimize(expr parser.Expr, _ *query.Options) parser.Expr {
logicalplan.TraverseBottomUp(nil, &expr, func(_, current *parser.Expr) bool {
func (i injectVectorSelector) Optimize(plan parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
logicalplan.TraverseBottomUp(nil, &plan, func(_, current *parser.Expr) bool {
switch (*current).(type) {
case *parser.VectorSelector:
*current = &logicalVectorSelector{
Expand All @@ -70,14 +71,14 @@ func (i injectVectorSelector) Optimize(expr parser.Expr, _ *query.Options) parse
}
return false
})
return expr
return plan, nil
}

type logicalVectorSelector struct {
*parser.VectorSelector
}

func (c logicalVectorSelector) MakeExecutionOperator(vectors *model.VectorPool, selectors *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
func (c logicalVectorSelector) MakeExecutionOperator(vectors *model.VectorPool, _ *engstore.SelectorPool, opts *query.Options, hints storage.SelectHints) (model.VectorOperator, error) {
return &vectorSelectorOperator{
stepsBatch: opts.StepsBatch,
vectors: vectors,
Expand Down
34 changes: 32 additions & 2 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"strings"
"time"

"github.com/efficientgo/core/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/util/annotations"

"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
Expand All @@ -19,6 +21,10 @@ import (
"github.com/thanos-io/promql-engine/query"
)

var (
RewrittenExternalLabelWarning = errors.Newf("%s: rewriting an external label with label_replace could lead to unpredictable results", annotations.PromQLWarning.Error())
)

type timeRange struct {
start time.Time
end time.Time
Expand Down Expand Up @@ -143,23 +149,30 @@ type DistributedExecutionOptimizer struct {
Endpoints api.RemoteEndpoints
}

func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Options) parser.Expr {
func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Options) (parser.Expr, annotations.Annotations) {
engines := m.Endpoints.Engines()
sort.Slice(engines, func(i, j int) bool {
return engines[i].MinT() < engines[j].MinT()
})

labelRanges := make(labelSetRanges)
engineLabels := make(map[string]struct{})
for _, e := range engines {
for _, lset := range e.LabelSets() {
lsetKey := lset.String()
labelRanges.addRange(lsetKey, timeRange{
start: time.UnixMilli(e.MinT()),
end: time.UnixMilli(e.MaxT()),
})
lset.Range(func(lbl labels.Label) {
engineLabels[lbl.Name] = struct{}{}
})
}
}
minEngineOverlap := labelRanges.minOverlap()
if rewritesEngineLabels(plan, engineLabels) {
return plan, annotations.New().Add(RewrittenExternalLabelWarning)
}

TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
// If the current operation is not distributive, stop the traversal.
Expand Down Expand Up @@ -197,7 +210,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr, opts *query.Op
return true
})

return plan
return plan, nil
}

func newRemoteAggregation(rootAggregation *parser.AggregateExpr, engines []api.RemoteEngine) parser.Expr {
Expand Down Expand Up @@ -502,6 +515,23 @@ func isConstantExpr(expr parser.Expr) bool {
}
}

func rewritesEngineLabels(e parser.Expr, engineLabels map[string]struct{}) bool {
var result bool
TraverseBottomUp(nil, &e, func(parent *parser.Expr, node *parser.Expr) bool {
call, ok := (*node).(*parser.Call)
if !ok || call.Func.Name != "label_replace" {
return false
}
targetLabel := call.Args[1].(*parser.StringLiteral).Val
if _, ok := engineLabels[targetLabel]; ok {
result = true
return true
}
return false
})
return result
}

func maxTime(a, b time.Time) time.Time {
if a.After(b) {
return a
Expand Down
5 changes: 3 additions & 2 deletions logicalplan/distribute_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logicalplan

import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/annotations"

"github.com/thanos-io/promql-engine/query"
)
Expand All @@ -10,7 +11,7 @@ import (
// it can be executed in a distributed manner.
type DistributeAvgOptimizer struct{}

func (r DistributeAvgOptimizer) Optimize(plan parser.Expr, _ *query.Options) parser.Expr {
func (r DistributeAvgOptimizer) Optimize(plan parser.Expr, _ *query.Options) (parser.Expr, annotations.Annotations) {
TraverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) {
if !isDistributiveOrAverage(current) {
return true
Expand Down Expand Up @@ -40,7 +41,7 @@ func (r DistributeAvgOptimizer) Optimize(plan parser.Expr, _ *query.Options) par
}
return !isDistributiveOrAverage(parent)
})
return plan
return plan, nil
}

func isDistributiveOrAverage(expr *parser.Expr) bool {
Expand Down
Loading

0 comments on commit af3a9a7

Please sign in to comment.