Skip to content

Commit

Permalink
plan: distribute more binary joins
Browse files Browse the repository at this point in the history
* distribute joins that can be executed within a remote engine

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Aug 5, 2024
1 parent 103e6f3 commit ee3311b
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 18 deletions.
6 changes: 3 additions & 3 deletions engine/distributed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestDistributedAggregations(t *testing.T) {
distOpts.DisableFallback = !query.expectFallback
for _, instantTS := range instantTSs {
t.Run(fmt.Sprintf("instant/ts=%d", instantTS.Unix()), func(t *testing.T) {
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines), []string{"zone"})

Check failure on line 306 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run tests

too many arguments in call to engine.NewDistributedEngine

Check failure on line 306 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run fuzz

too many arguments in call to engine.NewDistributedEngine

Check failure on line 306 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 306 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 306 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run tests --tags=stringlabels

too many arguments in call to engine.NewDistributedEngine
distQry, err := distEngine.NewInstantQuery(ctx, completeSeriesSet, queryOpts, query.query, instantTS)
testutil.Ok(t, err)

Expand All @@ -324,7 +324,7 @@ func TestDistributedAggregations(t *testing.T) {
if test.rangeEnd == (time.Time{}) {
test.rangeEnd = rangeEnd
}
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines))
distEngine := engine.NewDistributedEngine(distOpts, api.NewStaticEndpoints(remoteEngines), []string{"zone"})

Check failure on line 327 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run tests

too many arguments in call to engine.NewDistributedEngine

Check failure on line 327 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run fuzz

too many arguments in call to engine.NewDistributedEngine

Check failure on line 327 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 327 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 327 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run tests --tags=stringlabels

too many arguments in call to engine.NewDistributedEngine
distQry, err := distEngine.NewRangeQuery(ctx, completeSeriesSet, queryOpts, query.query, query.rangeStart, test.rangeEnd, rangeStep)
testutil.Ok(t, err)

Expand Down Expand Up @@ -361,7 +361,7 @@ func TestDistributedEngineWarnings(t *testing.T) {
},
}
remote := engine.NewRemoteEngine(opts, querier, math.MinInt64, math.MaxInt64, nil)
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remote}))
ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remote}), []string{"zone"})

Check failure on line 364 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run tests

too many arguments in call to engine.NewDistributedEngine

Check failure on line 364 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run fuzz

too many arguments in call to engine.NewDistributedEngine

Check failure on line 364 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 364 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 364 in engine/distributed_test.go

View workflow job for this annotation

GitHub Actions / Run tests --tags=stringlabels

too many arguments in call to engine.NewDistributedEngine
var (
start = time.UnixMilli(0)
end = time.UnixMilli(600)
Expand Down
4 changes: 2 additions & 2 deletions engine/enginefuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func FuzzDistributedEnginePromQLSmithRangeQuery(f *testing.F) {
)
remoteEngines = append(remoteEngines, e)
}
distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines))
distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines), []string{"zone"})

Check failure on line 269 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Run tests

too many arguments in call to engine.NewDistributedEngine

Check failure on line 269 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Run fuzz

too many arguments in call to engine.NewDistributedEngine

Check failure on line 269 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 269 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 269 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Run tests --tags=stringlabels

too many arguments in call to engine.NewDistributedEngine
oldEngine := promql.NewEngine(opts)

mergeStore := storage.NewFanout(nil, storage1, storage2)
Expand Down Expand Up @@ -369,7 +369,7 @@ func FuzzDistributedEnginePromQLSmithInstantQuery(f *testing.F) {
)
remoteEngines = append(remoteEngines, e)
}
distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines))
distEngine := engine.NewDistributedEngine(engineOpts, api.NewStaticEndpoints(remoteEngines), []string{"zone"})

Check failure on line 372 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Run tests

too many arguments in call to engine.NewDistributedEngine

Check failure on line 372 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Run fuzz

too many arguments in call to engine.NewDistributedEngine

Check failure on line 372 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 372 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

too many arguments in call to engine.NewDistributedEngine

Check failure on line 372 in engine/enginefuzz_test.go

View workflow job for this annotation

GitHub Actions / Run tests --tags=stringlabels

too many arguments in call to engine.NewDistributedEngine
oldEngine := promql.NewEngine(opts)

mergeStore := storage.NewFanout(nil, storage1, storage2)
Expand Down
42 changes: 30 additions & 12 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package logicalplan
import (
"fmt"
"math"
"slices"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -184,10 +185,10 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
return false
})

// Preprocess rewrite distributable averages as sum/count
var warns = annotations.New()
// Preprocess rewrite highest distributable average as sum/count
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
if !(isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(current)) {
if !(m.isDistributive(current, warns, engineLabels) || isAvgAggregation(current)) {
return true
}
// If the current node is avg(), distribute the operation and
Expand All @@ -213,12 +214,12 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
}
return true
}
return !(isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(parent))
return !(m.isDistributive(parent, warns, engineLabels) || isAvgAggregation(parent))
})

TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
// If the current operation is not distributive, stop the traversal.
if !isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) {
if !m.isDistributive(current, warns, engineLabels) {
return true
}

Expand Down Expand Up @@ -247,7 +248,7 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
}

// If the parent operation is distributive, continue the traversal.
if isDistributive(parent, m.SkipBinaryPushdown, engineLabels, warns) {
if m.isDistributive(parent, warns, engineLabels) {
return false
}

Expand Down Expand Up @@ -484,7 +485,7 @@ func numSteps(start, end time.Time, step time.Duration) int64 {
return (end.UnixMilli()-start.UnixMilli())/step.Milliseconds() + 1
}

func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string]struct{}, warns *annotations.Annotations) bool {
func (m DistributedExecutionOptimizer) isDistributive(expr *Node, warns *annotations.Annotations, engineLabels map[string]struct{}) bool {
if expr == nil {
return false
}
Expand All @@ -493,7 +494,7 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string
case Deduplicate, RemoteExecution:
return false
case *Binary:
return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e))
return m.isBinaryExpressionWithOneScalarSide(e) || (!m.SkipBinaryPushdown && m.isBinaryExpressionWithDistributableMatching(e, engineLabels))
case *Aggregation:
// Certain aggregations are currently not supported.
if _, ok := distributiveAggregations[e.Op]; !ok {
Expand All @@ -512,20 +513,37 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string
return true
}

func isBinaryExpressionWithOneScalarSide(expr *Binary) bool {
func (m DistributedExecutionOptimizer) isBinaryExpressionWithOneScalarSide(expr *Binary) bool {
lhsConstant := IsConstantScalarExpr(expr.LHS)
rhsConstant := IsConstantScalarExpr(expr.RHS)
return lhsConstant || rhsConstant
}

func isBinaryExpressionWithDistributableMatching(expr *Binary) bool {
func (m DistributedExecutionOptimizer) isBinaryExpressionWithDistributableMatching(expr *Binary, engineLabels map[string]struct{}) bool {
if expr.VectorMatching == nil {
return false
}
// TODO: think about "or" but for safety we dont push it down for now.
if expr.Op == parser.LOR {
return false
}

// we can distribute if the vector matching contains the external labels so that
// all potential matching partners are contained in one engine
return !expr.VectorMatching.On && len(expr.VectorMatching.MatchingLabels) == 0
if expr.VectorMatching.On {
// on (...) - if ... contains all partition labels we can distribute
for lbl := range engineLabels {
if !slices.Contains(expr.VectorMatching.MatchingLabels, lbl) {
return false
}
}
return true
}
// ignoring (...) - if ... does contain any engine labels we cannot distribute
for lbl := range engineLabels {
if slices.Contains(expr.VectorMatching.MatchingLabels, lbl) {
return false
}
}
return true
}

// matchesExternalLabels returns false if given matchers are not matching external labels.
Expand Down
21 changes: 20 additions & 1 deletion logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"

"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/api"
Expand Down Expand Up @@ -363,6 +362,26 @@ sum_over_time(max(dedup(
expr: `sum by (pod) (rate(http_requests_total{region="south"}[2m]))`,
expected: `sum by (pod) (dedup(remote(sum by (pod, region) (rate(http_requests_total{region="south"}[2m])))))`,
},
{
name: "binary matching where hash contains partitioning label with on",
expr: `X * on (region) Y`,
expected: `dedup(remote(X * on (region) Y), remote(X * on (region) Y))`,
},
{
name: "binary matching where hash contains partitioning label with ignoring",
expr: `X * ignoring (foo) Y`,
expected: `dedup(remote(X * ignoring (foo) Y), remote(X * ignoring (foo) Y))`,
},
{
name: "binary matching where hash doesnt contain partitioning label with ignoring",
expr: `X * ignoring (region) Y`,
expected: `dedup(remote(X), remote(X)) * ignoring (region) dedup(remote(Y), remote(Y))`,
},
{
name: "binary matching where hash doesnt contain partitioning label with on",
expr: `X * on (foo) Y`,
expected: `dedup(remote(X), remote(X)) * on (foo) dedup(remote(Y), remote(Y))`,
},
}

engines := []api.RemoteEngine{
Expand Down

0 comments on commit ee3311b

Please sign in to comment.