Skip to content

Commit

Permalink
Prune remote engines by time (#340)
Browse files Browse the repository at this point in the history
The distributed optimizer generates one query per remote engine
without taking into account the range of the engine. This reduces
resiliency since an engine with an older timestamp could be unavailable,
but it still gets queried for recent data.

This commit adds time-based pruning to the optimizer which excludes
endpoints which do not match the query range.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski authored Nov 28, 2023
1 parent 257543a commit fede49a
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 3 deletions.
8 changes: 7 additions & 1 deletion logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engine
}

startOffset := calculateStartOffset(expr, opts.LookbackDelta)
if allowedStartOffset < maxDuration(opts.LookbackDelta, startOffset) {
if allowedStartOffset < startOffset {
return *expr
}

Expand All @@ -253,6 +253,12 @@ func (m DistributedExecutionOptimizer) distributeQuery(expr *parser.Expr, engine
if !matchesExternalLabelSet(*expr, e.LabelSets()) {
continue
}
if e.MinT() > opts.End.UnixMilli() {
continue
}
if e.MaxT() < opts.Start.UnixMilli()-startOffset.Milliseconds() {
continue
}

start, keep := getStartTimeForEngine(e, opts, startOffset, globalMinT)
if !keep {
Expand Down
78 changes: 76 additions & 2 deletions logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ remote(sum by (pod, region) (rate(http_requests_total[2m]) * 60))))`,
}

engines := []api.RemoteEngine{
newEngineMock(math.MinInt64, math.MinInt64, []labels.Labels{labels.FromStrings("region", "east"), labels.FromStrings("region", "south")}),
newEngineMock(math.MinInt64, math.MinInt64, []labels.Labels{labels.FromStrings("region", "west")}),
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "east"), labels.FromStrings("region", "south")}),
newEngineMock(math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("region", "west")}),
}
optimizers := []Optimizer{
DistributeAvgOptimizer{},
Expand Down Expand Up @@ -363,6 +363,80 @@ dedup(
}
}

func TestDistributedExecutionPruningByTime(t *testing.T) {
replacements := map[string]*regexp.Regexp{
" ": spaces,
"(": openParenthesis,
")": closedParenthesis,
}

firstEngineOpts := engineOpts{
minTime: time.Unix(0, 0),
maxTime: time.Unix(0, 0).Add(6 * time.Hour),
}
secondEngineOpts := engineOpts{
minTime: time.Unix(0, 0).Add(4 * time.Hour),
maxTime: time.Unix(0, 0).Add(8 * time.Hour),
}

cases := []struct {
name string
expr string
expected string
queryStart time.Time
queryEnd time.Time
}{
{
name: "1 hour query at the end of the range prunes the first engine",
expr: `sum(metric)`,
queryStart: time.Unix(0, 0).Add(7 * time.Hour),
queryEnd: time.Unix(0, 0).Add(8 * time.Hour),
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 07:00:00 +0000 UTC]))`,
},
{
name: "1 hour range query at the start of the range prunes the second engine",
expr: `sum(metric)`,
queryStart: time.Unix(0, 0).Add(1 * time.Hour),
queryEnd: time.Unix(0, 0).Add(2 * time.Hour),
expected: `sum(dedup(remote(sum by (region) (metric)) [1970-01-01 01:00:00 +0000 UTC]))`,
},
{
name: "instant query in the overlapping range queries both engines",
expr: `sum(metric)`,
queryStart: time.Unix(0, 0).Add(6 * time.Hour),
queryEnd: time.Unix(0, 0).Add(6 * time.Hour),
expected: `
sum(
dedup(
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC],
remote(sum by (region) (metric)) [1970-01-01 06:00:00 +0000 UTC]
)
)`,
},
}

for _, tcase := range cases {
t.Run(tcase.name, func(t *testing.T) {
engines := []api.RemoteEngine{
newEngineMock(firstEngineOpts.mint(), firstEngineOpts.maxt(), []labels.Labels{labels.FromStrings("region", "east")}),
newEngineMock(secondEngineOpts.mint(), secondEngineOpts.maxt(), []labels.Labels{labels.FromStrings("region", "east")}),
}
optimizers := []Optimizer{
DistributeAvgOptimizer{},
DistributedExecutionOptimizer{Endpoints: api.NewStaticEndpoints(engines)},
}

expr, err := parser.ParseExpr(tcase.expr)
testutil.Ok(t, err)

plan := New(expr, &query.Options{Start: tcase.queryStart, End: tcase.queryEnd, Step: time.Minute})
optimizedPlan := plan.Optimize(optimizers)
expectedPlan := cleanUp(replacements, tcase.expected)
testutil.Equals(t, expectedPlan, optimizedPlan.Expr().String())
})
}
}

type engineMock struct {
api.RemoteEngine
minT int64
Expand Down

0 comments on commit fede49a

Please sign in to comment.