From fd90edb634fd99a151219bda25c7e0098eadda40 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Mon, 29 Jul 2024 13:39:07 +0200 Subject: [PATCH] plan: distribute more binary joins * distribute joins that can be executed within a remote engine Signed-off-by: Michael Hoffmann --- logicalplan/distribute.go | 30 ++++++++++++++++++++++++------ logicalplan/distribute_test.go | 21 ++++++++++++++++++++- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index 0823822d..542577eb 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -6,6 +6,7 @@ package logicalplan import ( "fmt" "math" + "slices" "sort" "strings" "time" @@ -184,8 +185,8 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) return false }) - // Preprocess rewrite distributable averages as sum/count var warns = annotations.New() + // Preprocess rewrite highest distributable average as sum/count TraverseBottomUp(nil, &plan, func(parent, current *Node) (stop bool) { if !(isDistributive(current, m.SkipBinaryPushdown, engineLabels, warns) || isAvgAggregation(current)) { return true @@ -493,7 +494,7 @@ func isDistributive(expr *Node, skipBinaryPushdown bool, engineLabels map[string case Deduplicate, RemoteExecution: return false case *Binary: - return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e)) + return isBinaryExpressionWithOneScalarSide(e) || (!skipBinaryPushdown && isBinaryExpressionWithDistributableMatching(e, engineLabels)) case *Aggregation: // Certain aggregations are currently not supported. if _, ok := distributiveAggregations[e.Op]; !ok { @@ -518,14 +519,31 @@ func isBinaryExpressionWithOneScalarSide(expr *Binary) bool { return lhsConstant || rhsConstant } -func isBinaryExpressionWithDistributableMatching(expr *Binary) bool { +func isBinaryExpressionWithDistributableMatching(expr *Binary, engineLabels map[string]struct{}) bool { if expr.VectorMatching == nil { return false } + // TODO: think about "or" but for safety we dont push it down for now. + if expr.Op == parser.LOR { + return false + } - // we can distribute if the vector matching contains the external labels so that - // all potential matching partners are contained in one engine - return !expr.VectorMatching.On && len(expr.VectorMatching.MatchingLabels) == 0 + if expr.VectorMatching.On { + // on (...) - if ... contains all partition labels we can distribute + for lbl := range engineLabels { + if !slices.Contains(expr.VectorMatching.MatchingLabels, lbl) { + return false + } + } + return true + } + // ignoring (...) - if ... does contain any engine labels we cannot distribute + for lbl := range engineLabels { + if slices.Contains(expr.VectorMatching.MatchingLabels, lbl) { + return false + } + } + return true } // matchesExternalLabels returns false if given matchers are not matching external labels. diff --git a/logicalplan/distribute_test.go b/logicalplan/distribute_test.go index 3d2f93a2..1973c446 100644 --- a/logicalplan/distribute_test.go +++ b/logicalplan/distribute_test.go @@ -11,7 +11,6 @@ import ( "github.com/efficientgo/core/testutil" "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/promql-engine/api" @@ -363,6 +362,26 @@ sum_over_time(max(dedup( expr: `sum by (pod) (rate(http_requests_total{region="south"}[2m]))`, expected: `sum by (pod) (dedup(remote(sum by (pod, region) (rate(http_requests_total{region="south"}[2m])))))`, }, + { + name: "binary matching where hash contains partitioning label with on", + expr: `X * on (region) Y`, + expected: `dedup(remote(X * on (region) Y), remote(X * on (region) Y))`, + }, + { + name: "binary matching where hash contains partitioning label with ignoring", + expr: `X * ignoring (foo) Y`, + expected: `dedup(remote(X * ignoring (foo) Y), remote(X * ignoring (foo) Y))`, + }, + { + name: "binary matching where hash doesnt contain partitioning label with ignoring", + expr: `X * ignoring (region) Y`, + expected: `dedup(remote(X), remote(X)) * ignoring (region) dedup(remote(Y), remote(Y))`, + }, + { + name: "binary matching where hash doesnt contain partitioning label with on", + expr: `X * on (foo) Y`, + expected: `dedup(remote(X), remote(X)) * on (foo) dedup(remote(Y), remote(Y))`, + }, } engines := []api.RemoteEngine{