Skip to content

Commit

Permalink
plan: distribute more binary joins
Browse files Browse the repository at this point in the history
* distribute joins that can be executed within a remote engine

Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Aug 5, 2024
1 parent 103e6f3 commit fd90edb
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
30 changes: 24 additions & 6 deletions logicalplan/distribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package logicalplan
import (
"fmt"
"math"
"slices"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -184,8 +185,8 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options)
return false
})

// Preprocess rewrite distributable averages as sum/count
var warns = annotations.New()
// Preprocess: rewrite the highest distributable average as sum/count
TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) {
if !(isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(current)) {
return true
Expand Down Expand Up @@ -493,7 +494,7 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string
case Deduplicate, RemoteExecution:
return false
case *Binary:
return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e))
return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e, engineLabels))
case *Aggregation:
// Certain aggregations are currently not supported.
if _, ok := distributiveAggregations[e.Op]; !ok {
Expand All @@ -518,14 +519,31 @@ func isBinaryExpressionWithOneScalarSide(expr *Binary) bool {
return lhsConstant || rhsConstant
}

func isBinaryExpressionWithDistributableMatching(expr *Binary) bool {
func isBinaryExpressionWithDistributableMatching(expr *Binary, engineLabels map[string]struct{}) bool {
if expr.VectorMatching == nil {
return false
}
// TODO: think about "or", but for safety we don't push it down for now.
if expr.Op == parser.LOR {
return false
}

// we can distribute if the vector matching contains the external labels so that
// all potential matching partners are contained in one engine
return !expr.VectorMatching.On && len(expr.VectorMatching.MatchingLabels) == 0
if expr.VectorMatching.On {
// on (...) - if ... contains all partition labels we can distribute
for lbl := range engineLabels {
if !slices.Contains(expr.VectorMatching.MatchingLabels, lbl) {
return false
}
}
return true
}
// ignoring (...) - if ... contains any engine label we cannot distribute
for lbl := range engineLabels {
if slices.Contains(expr.VectorMatching.MatchingLabels, lbl) {
return false
}
}
return true
}

// matchesExternalLabels returns false if given matchers are not matching external labels.
Expand Down
21 changes: 20 additions & 1 deletion logicalplan/distribute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/efficientgo/core/testutil"
"github.com/prometheus/prometheus/model/labels"

"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/api"
Expand Down Expand Up @@ -363,6 +362,26 @@ sum_over_time(max(dedup(
expr: `sum by (pod) (rate(http_requests_total{region="south"}[2m]))`,
expected: `sum by (pod) (dedup(remote(sum by (pod, region) (rate(http_requests_total{region="south"}[2m])))))`,
},
{
name: "binary matching where hash contains partitioning label with on",
expr: `X * on (region) Y`,
expected: `dedup(remote(X * on (region) Y), remote(X * on (region) Y))`,
},
{
name: "binary matching where hash contains partitioning label with ignoring",
expr: `X * ignoring (foo) Y`,
expected: `dedup(remote(X * ignoring (foo) Y), remote(X * ignoring (foo) Y))`,
},
{
name: "binary matching where hash doesnt contain partitioning label with ignoring",
expr: `X * ignoring (region) Y`,
expected: `dedup(remote(X), remote(X)) * ignoring (region) dedup(remote(Y), remote(Y))`,
},
{
name: "binary matching where hash doesnt contain partitioning label with on",
expr: `X * on (foo) Y`,
expected: `dedup(remote(X), remote(X)) * on (foo) dedup(remote(Y), remote(Y))`,
},
}

engines := []api.RemoteEngine{
Expand Down

0 comments on commit fd90edb

Please sign in to comment.