From 337b02b0ba83990cc3f8e214d8bbaa0877287e0d Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Mon, 25 Sep 2023 19:57:41 +0200 Subject: [PATCH] execution: rework binary operators, add set operatos Signed-off-by: Michael Hoffmann --- engine/engine.go | 15 +- engine/engine_test.go | 60 ++- execution/binary/index.go | 35 -- execution/binary/{table.go => utils.go} | 140 +----- execution/binary/vector.go | 635 +++++++++++++++--------- extlabels/labels.go | 19 - go.mod | 1 + go.sum | 2 + 8 files changed, 510 insertions(+), 397 deletions(-) delete mode 100644 execution/binary/index.go rename execution/binary/{table.go => utils.go} (57%) diff --git a/engine/engine.go b/engine/engine.go index a05998c05..93a33137a 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -577,9 +577,6 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { if err != nil { return newErrResult(ret, err) } - if extlabels.ContainsDuplicateLabelSet(resultSeries) { - return newErrResult(ret, extlabels.ErrDuplicateLabelSet) - } series := make([]promql.Series, len(resultSeries)) for i := 0; i < len(resultSeries); i++ { @@ -640,6 +637,9 @@ loop: resultMatrix = append(resultMatrix, s) } sort.Sort(resultMatrix) + if resultMatrix.ContainsSameLabelset() { + return newErrResult(ret, extlabels.ErrDuplicateLabelSet) + } ret.Value = resultMatrix if q.debugWriter != nil { analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "") @@ -650,7 +650,11 @@ loop: var result parser.Value switch q.expr.Type() { case parser.ValueTypeMatrix: - result = promql.Matrix(series) + matrix := promql.Matrix(series) + if matrix.ContainsSameLabelset() { + return newErrResult(ret, extlabels.ErrDuplicateLabelSet) + } + result = matrix case parser.ValueTypeVector: // Convert matrix with one value per series into vector. vector := make(promql.Vector, 0, len(resultSeries)) @@ -675,6 +679,9 @@ loop: } } sort.Slice(vector, q.resultSort.comparer(&vector)) + if vector.ContainsSameLabelset() { + return newErrResult(ret, extlabels.ErrDuplicateLabelSet) + } result = vector case parser.ValueTypeScalar: v := math.NaN() diff --git a/engine/engine_test.go b/engine/engine_test.go index 12705eb1f..7d47ddb85 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -3359,6 +3359,64 @@ func TestInstantQuery(t *testing.T) { bar{method="get", code="404"} 1+1x30`, query: `sum(foo) by (method) % sum(bar) by (method)`, }, + { + name: "vector binary op and 1", + load: `load 30s + foo{method="get", code="500"} 1+2x40 + bar{method="get", code="404"} 1+1x30`, + query: `sum(foo) by (method) and sum(bar) by (method)`, + }, + { + name: "vector binary op and 2", + load: `load 30s + foo{method="get", code="500"} 1+2x40 + bar{method="get", code="404"} 1+1x30`, + query: `sum(foo) by (code) and sum(bar) by (code)`, + }, + { + name: "vector binary op unless 1", + load: `load 30s + foo{method="get", code="500"} 1+2x40 + bar{method="get", code="404"} 1+1x30`, + query: `sum(foo) by (method) unless sum(bar) by (method)`, + }, + { + name: "vector binary op unless 2", + load: `load 30s + foo{method="get", code="500"} 1+2x40 + bar{method="get", code="404"} 1+1x30`, + query: `sum(foo) by (code) unless sum(bar) by (code)`, + }, + { + name: "vector binary op unless 3", + load: `load 30s + foo{method="get", code="500"} 1+2x40`, + query: `sum(foo) by (code) unless nonexistent`, + }, + { + name: "vector binary op or 1", + load: `load 30s + foo{A="1"} 1+1x40 + foo{A="2"} 2+2x40`, + query: `sinh(foo or exp(foo))`, + }, + { + name: "vector binary op one-to-one left multiple matches", + load: `load 30s + foo{method="get", code="500"} 1 + foo{method="get", code="200"} 1 + bar{method="get", code="200"} 1`, + query: `foo / ignoring (code) bar`, + }, + { + name: "vector binary operation with many-to-many matching rhs high card", + load: `load 30s + foo{code="200", method="get"} 1+1x20 + foo{code="200", method="post"} 1+1x20 + bar{code="200", method="get"} 1+1x20 + bar{code="200", method="post"} 1+1x20`, + query: `foo + on(code) group_right bar`, + }, { name: "vector binary op > scalar", load: `load 30s @@ -3774,7 +3832,7 @@ func TestInstantQuery(t *testing.T) { if hasNaNs(oldResult) { t.Log("Applying comparison with NaN equality.") equalsWithNaNs(t, oldResult, newResult) - } else if oldResult.Err != nil { + } else if oldResult.Err != nil && newResult.Err != nil { testutil.Equals(t, oldResult.Err.Error(), newResult.Err.Error()) } else { testutil.Equals(t, oldResult, newResult) diff --git a/execution/binary/index.go b/execution/binary/index.go deleted file mode 100644 index 4fb4c7a3e..000000000 --- a/execution/binary/index.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright (c) The Thanos Community Authors. -// Licensed under the Apache License 2.0. - -package binary - -type outputIndex interface { - outputSamples(inputSampleID uint64) []uint64 -} - -type highCardinalityIndex struct { - result []uint64 - index []*uint64 -} - -func newHighCardIndex(index []*uint64) *highCardinalityIndex { - return &highCardinalityIndex{ - result: make([]uint64, 1), - index: index, - } -} - -func (h *highCardinalityIndex) outputSamples(inputSampleID uint64) []uint64 { - outputSampleID := h.index[inputSampleID] - if outputSampleID == nil { - return nil - } - h.result[0] = *outputSampleID - return h.result -} - -type lowCardinalityIndex [][]uint64 - -func (l lowCardinalityIndex) outputSamples(inputSampleID uint64) []uint64 { - return l[inputSampleID] -} diff --git a/execution/binary/table.go b/execution/binary/utils.go similarity index 57% rename from execution/binary/table.go rename to execution/binary/utils.go index 4fef184ea..be4bc7f78 100644 --- a/execution/binary/table.go +++ b/execution/binary/utils.go @@ -4,13 +4,13 @@ package binary import ( + "fmt" "math" "github.com/prometheus/prometheus/model/histogram" - + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" - "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/execution/parse" ) @@ -22,122 +22,26 @@ const ( ) type errManyToManyMatch struct { - sampleID uint64 - duplicateSampleID uint64 - side binOpSide + matching *parser.VectorMatching + side binOpSide + + original, duplicate labels.Labels } -func newManyToManyMatchError(sampleID, duplicateSampleID uint64, side binOpSide) *errManyToManyMatch { +func newManyToManyMatchError(matching *parser.VectorMatching, original, duplicate labels.Labels, side binOpSide) *errManyToManyMatch { return &errManyToManyMatch{ - sampleID: sampleID, - duplicateSampleID: duplicateSampleID, - side: side, + original: original, + duplicate: duplicate, + matching: matching, + side: side, } } -type outputSample struct { - lhT int64 - rhT int64 - lhSampleID uint64 - rhSampleID uint64 - v float64 -} - -type table struct { - pool *model.VectorPool - - operation operation - card parser.VectorMatchCardinality - - outputValues []outputSample - // highCardOutputIndex is a mapping from series ID of the high cardinality - // operator to an output series ID. - // During joins, each high cardinality series that has a matching - // low cardinality series will map to exactly one output series. - highCardOutputIndex outputIndex - // lowCardOutputIndex is a mapping from series ID of the low cardinality - // operator to an output series ID. - // Each series from the low cardinality operator can join with many - // series of the high cardinality operator. - lowCardOutputIndex outputIndex -} - -func newTable( - pool *model.VectorPool, - card parser.VectorMatchCardinality, - operation operation, - outputValues []outputSample, - highCardOutputCache outputIndex, - lowCardOutputCache outputIndex, -) *table { - for i := range outputValues { - outputValues[i].lhT = -1 - outputValues[i].rhT = -1 - } - return &table{ - pool: pool, - card: card, - - operation: operation, - outputValues: outputValues, - highCardOutputIndex: highCardOutputCache, - lowCardOutputIndex: lowCardOutputCache, - } -} - -func (t *table) execBinaryOperation(lhs model.StepVector, rhs model.StepVector, returnBool bool) (model.StepVector, *errManyToManyMatch) { - ts := lhs.T - step := t.pool.GetStepVector(ts) - - lhsIndex, rhsIndex := t.highCardOutputIndex, t.lowCardOutputIndex - if t.card == parser.CardOneToMany { - lhsIndex, rhsIndex = rhsIndex, lhsIndex - } - - for i, sampleID := range lhs.SampleIDs { - lhsVal := lhs.Samples[i] - outputSampleIDs := lhsIndex.outputSamples(sampleID) - for _, outputSampleID := range outputSampleIDs { - if t.card != parser.CardManyToOne && t.outputValues[outputSampleID].lhT == ts { - prevSampleID := t.outputValues[outputSampleID].lhSampleID - return model.StepVector{}, newManyToManyMatchError(prevSampleID, sampleID, lhBinOpSide) - } - - t.outputValues[outputSampleID].lhSampleID = sampleID - t.outputValues[outputSampleID].lhT = lhs.T - t.outputValues[outputSampleID].v = lhsVal - } - } - - for i, sampleID := range rhs.SampleIDs { - rhVal := rhs.Samples[i] - outputSampleIDs := rhsIndex.outputSamples(sampleID) - for _, outputSampleID := range outputSampleIDs { - outputSample := t.outputValues[outputSampleID] - if rhs.T != outputSample.lhT { - continue - } - if t.card != parser.CardOneToMany && outputSample.rhT == rhs.T { - prevSampleID := t.outputValues[outputSampleID].rhSampleID - return model.StepVector{}, newManyToManyMatchError(prevSampleID, sampleID, rhBinOpSide) - } - t.outputValues[outputSampleID].rhSampleID = sampleID - t.outputValues[outputSampleID].rhT = rhs.T - - outputVal, keep := t.operation([2]float64{outputSample.v, rhVal}, 0) - if returnBool { - outputVal = 0 - if keep { - outputVal = 1 - } - } else if !keep { - continue - } - step.AppendSample(t.pool, outputSampleID, outputVal) - } - } - - return step, nil +func (e *errManyToManyMatch) Error() string { + group := e.original.MatchLabels(e.matching.On, e.matching.MatchingLabels...) + msg := "found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]" + + ";many-to-many matching not allowed: matching labels must be unique on one side" + return fmt.Sprintf(msg, group, e.side, e.original.String(), e.duplicate.String()) } // operands is a length 2 array which contains lhs and rhs. @@ -188,6 +92,10 @@ var vectorBinaryOperations = map[string]operation{ func newOperation(expr parser.ItemType, vectorBinOp bool) (operation, error) { t := parser.ItemTypeStr[expr] + if expr.IsSetOperator() && vectorBinOp { + // handled in the operator + return nil, nil + } if expr.IsComparisonOperator() && vectorBinOp { if o, ok := vectorBinaryOperations[t]; ok { return o, nil @@ -245,10 +153,10 @@ func btof(b bool) float64 { } func shouldDropMetricName(op parser.ItemType, returnBool bool) bool { - switch op.String() { - case "+", "-", "*", "/", "%", "^", "atan2": + switch op { + case parser.ADD, parser.SUB, parser.MUL, parser.DIV, parser.MOD, parser.POW, parser.ATAN2: return true + default: + return op.IsComparisonOperator() && returnBool } - - return op.IsComparisonOperator() && returnBool } diff --git a/execution/binary/vector.go b/execution/binary/vector.go index d6338e9f9..74e74e214 100644 --- a/execution/binary/vector.go +++ b/execution/binary/vector.go @@ -6,44 +6,54 @@ package binary import ( "context" "fmt" + "math" "sync" - "github.com/efficientgo/core/errors" - "github.com/prometheus/prometheus/model/labels" "golang.org/x/exp/slices" + "github.com/cespare/xxhash/v2" + "github.com/efficientgo/core/errors" + "github.com/prometheus/prometheus/model/histogram" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" + "github.com/zhangyunhao116/umap" "github.com/thanos-io/promql-engine/execution/model" "github.com/thanos-io/promql-engine/query" ) +type joinBucket struct { + ats, bts int64 + sid uint64 + val float64 +} + // vectorOperator evaluates an expression between two step vectors. type vectorOperator struct { pool *model.VectorPool once sync.Once - lhs model.VectorOperator - rhs model.VectorOperator - matching *parser.VectorMatching - groupingLabels []string - operation operation - opType parser.ItemType - - lhSampleIDs []labels.Labels - rhSampleIDs []labels.Labels - - // series contains the output series of the operator - series []labels.Labels - // The outputCache is an internal cache used to calculate - // the binary operation of the lhs and rhs operator. - outputCache []outputSample - // table is used to calculate the binary operation of two step vectors between - // the lhs and rhs operator. - table *table + lhs model.VectorOperator + rhs model.VectorOperator + lhsSampleIDs []labels.Labels + rhsSampleIDs []labels.Labels + series []labels.Labels + + // join signature + sigFunc func(labels.Labels) uint64 + + // join helpers + joinPairToSeriesIds *umap.Uint64Map + + lcJoinBuckets []*joinBucket + hcJoinBuckets []*joinBucket + + matching *parser.VectorMatching + opType parser.ItemType // If true then 1/0 needs to be returned instead of the value. returnBool bool + model.OperatorTelemetry } @@ -52,31 +62,20 @@ func NewVectorOperator( lhs model.VectorOperator, rhs model.VectorOperator, matching *parser.VectorMatching, - operation parser.ItemType, + opType parser.ItemType, returnBool bool, opts *query.Options, ) (model.VectorOperator, error) { - op, err := newOperation(operation, true) - if err != nil { - return nil, err - } - - // Make a copy of MatchingLabels to avoid potential side-effects - // in some downstream operation. - groupings := make([]string, len(matching.MatchingLabels)) - copy(groupings, matching.MatchingLabels) - slices.Sort(groupings) - o := &vectorOperator{ - pool: pool, - lhs: lhs, - rhs: rhs, - matching: matching, - groupingLabels: groupings, - operation: op, - opType: operation, - returnBool: returnBool, + pool: pool, + lhs: lhs, + rhs: rhs, + matching: matching, + opType: opType, + returnBool: returnBool, + sigFunc: signatureFunc(matching.On, matching.MatchingLabels...), } + o.OperatorTelemetry = &model.NoopTelemetry{} if opts.EnableAnalysis { o.OperatorTelemetry = &model.TrackedTelemetry{} @@ -104,77 +103,12 @@ func (o *vectorOperator) Explain() (me string, next []model.VectorOperator) { } func (o *vectorOperator) Series(ctx context.Context) ([]labels.Labels, error) { - var err error - o.once.Do(func() { err = o.initOutputs(ctx) }) - if err != nil { + if err := o.initOnce(ctx); err != nil { return nil, err } - return o.series, nil } -func (o *vectorOperator) initOutputs(ctx context.Context) error { - var highCardSide []labels.Labels - var errChan = make(chan error, 1) - go func() { - var err error - highCardSide, err = o.lhs.Series(ctx) - if err != nil { - errChan <- err - } - close(errChan) - }() - - lowCardSide, err := o.rhs.Series(ctx) - if err != nil { - return err - } - if err := <-errChan; err != nil { - return err - } - - o.lhSampleIDs = highCardSide - o.rhSampleIDs = lowCardSide - - if o.matching.Card == parser.CardOneToMany { - highCardSide, lowCardSide = lowCardSide, highCardSide - } - - buf := make([]byte, 1024) - var includeLabels []string - if len(o.matching.Include) > 0 { - includeLabels = o.matching.Include - } - keepLabels := o.matching.Card != parser.CardOneToOne - keepName := !shouldDropMetricName(o.opType, o.returnBool) - highCardHashes, highCardInputMap := o.hashSeries(highCardSide, keepLabels, keepName, buf) - lowCardHashes, lowCardInputMap := o.hashSeries(lowCardSide, keepLabels, keepName, buf) - output, highCardOutputIndex, lowCardOutputIndex := o.join(highCardHashes, highCardInputMap, lowCardHashes, lowCardInputMap, includeLabels) - - series := make([]labels.Labels, len(output)) - for _, s := range output { - series[s.ID] = s.Metric - } - o.series = series - - o.outputCache = make([]outputSample, len(series)) - for i := range o.outputCache { - o.outputCache[i].lhT = -1 - } - o.pool.SetStepSize(len(highCardSide)) - - o.table = newTable( - o.pool, - o.matching.Card, - o.operation, - o.outputCache, - newHighCardIndex(highCardOutputIndex), - lowCardinalityIndex(lowCardOutputIndex), - ) - - return nil -} - func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) { select { case <-ctx.Done(): @@ -182,6 +116,11 @@ func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) { default: } + // some operators do not call Series of all their children + if err := o.initOnce(ctx); err != nil { + return nil, err + } + var lhs []model.StepVector var lerrChan = make(chan error, 1) go func() { @@ -209,35 +148,15 @@ func (o *vectorOperator) Next(ctx context.Context) ([]model.StepVector, error) { return nil, nil } - var err error - o.once.Do(func() { err = o.initOutputs(ctx) }) - if err != nil { - return nil, err - } - batch := o.pool.GetVectorBatch() for i, vector := range lhs { if i < len(rhs) { - step, err := o.table.execBinaryOperation(lhs[i], rhs[i], o.returnBool) - if err == nil { - batch = append(batch, step) - o.rhs.GetPool().PutStepVector(rhs[i]) - continue + step, err := o.execBinaryOperation(lhs[i], rhs[i]) + if err != nil { + return nil, err } - - var sampleID, duplicateSampleID labels.Labels - switch err.side { - case lhBinOpSide: - sampleID = o.lhSampleIDs[err.sampleID] - duplicateSampleID = o.lhSampleIDs[err.duplicateSampleID] - case rhBinOpSide: - sampleID = o.rhSampleIDs[err.sampleID] - duplicateSampleID = o.rhSampleIDs[err.duplicateSampleID] - } - group := sampleID.MatchLabels(o.matching.On, o.matching.MatchingLabels...) - msg := "found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]" + - ";many-to-many matching not allowed: matching labels must be unique on one side" - return nil, errors.Newf(msg, group, err.side, sampleID.String(), duplicateSampleID.String()) + batch = append(batch, step) + o.rhs.GetPool().PutStepVector(rhs[i]) } o.lhs.GetPool().PutStepVector(vector) } @@ -251,125 +170,397 @@ func (o *vectorOperator) GetPool() *model.VectorPool { return o.pool } -// hashSeries calculates the hash of each series from an input operator. -// Since series from the high cardinality operator can map to multiple output series, -// hashSeries returns an index from hash to a slice of resulting series, and -// a map from input series ID to output series ID. -// The latter can be used to build an array backed index from input model.Series to output model.Series, -// avoiding expensive hashmap lookups. -func (o *vectorOperator) hashSeries(series []labels.Labels, keepLabels, keepName bool, buf []byte) (map[uint64][]model.Series, map[uint64][]uint64) { - hashes := make(map[uint64][]model.Series) - inputIndex := make(map[uint64][]uint64) - for i, s := range series { - sig, lbls := signature(s, !o.matching.On, o.groupingLabels, keepLabels, keepName, buf) - if _, ok := hashes[sig]; !ok { - hashes[sig] = make([]model.Series, 0, 1) - inputIndex[sig] = make([]uint64, 0, 1) +func (o *vectorOperator) initOnce(ctx context.Context) error { + var err error + o.once.Do(func() { err = o.init(ctx) }) + return err +} + +func (o *vectorOperator) init(ctx context.Context) error { + var highCardSide []labels.Labels + var errChan = make(chan error, 1) + go func() { + var err error + highCardSide, err = o.lhs.Series(ctx) + if err != nil { + errChan <- err } - hashes[sig] = append(hashes[sig], model.Series{ - ID: uint64(i), - Metric: lbls, - }) - inputIndex[sig] = append(inputIndex[sig], uint64(i)) + close(errChan) + }() + + lowCardSide, err := o.rhs.Series(ctx) + if err != nil { + return err } + if err := <-errChan; err != nil { + return err + } + o.lhsSampleIDs = highCardSide + o.rhsSampleIDs = lowCardSide - return hashes, inputIndex + if o.matching.Card == parser.CardOneToMany { + highCardSide, lowCardSide = lowCardSide, highCardSide + } + + o.initJoinTables(highCardSide, lowCardSide) + + return nil } -// join performs a join between series from the high cardinality and low cardinality operators. -// It does that by using hash maps which point from series hash to the output series. -// It also returns array backed indices for the high cardinality and low cardinality operators, -// pointing from input model.Series ID to output model.Series ID. -// The high cardinality operator can fail to join, which is why its index contains nullable values. -// The low cardinality operator can join to multiple high cardinality series, which is why its index -// points to an array of output series. -func (o *vectorOperator) join( - highCardHashes map[uint64][]model.Series, - highCardInputIndex map[uint64][]uint64, - lowCardHashes map[uint64][]model.Series, - lowCardInputIndex map[uint64][]uint64, - includeLabels []string, -) ([]model.Series, []*uint64, [][]uint64) { - // Output index points from output series ID - // to the actual series. - outputIndex := make([]model.Series, 0) - - // Prune high cardinality series which do not have a - // matching low cardinality series. - outputSize := 0 - for hash, series := range highCardHashes { - outputSize += len(series) - if _, ok := lowCardHashes[hash]; !ok { - delete(highCardHashes, hash) - continue +func (o *vectorOperator) execBinaryOperation(lhs, rhs model.StepVector) (model.StepVector, error) { + switch o.opType { + case parser.LAND: + return o.execBinaryAnd(lhs, rhs) + case parser.LOR: + return o.execBinaryOr(lhs, rhs) + case parser.LUNLESS: + return o.execBinaryUnless(lhs, rhs) + default: + return o.execBinaryArithmetic(lhs, rhs) + } +} + +func (o *vectorOperator) execBinaryAnd(lhs, rhs model.StepVector) (model.StepVector, error) { + ts := lhs.T + step := o.pool.GetStepVector(ts) + + for _, sampleID := range rhs.SampleIDs { + jp := o.lcJoinBuckets[sampleID] + jp.sid = sampleID + jp.ats = ts + } + for i, sampleID := range lhs.SampleIDs { + if jp := o.hcJoinBuckets[sampleID]; jp.ats == ts { + sid, _ := o.joinPairToSeriesIds.Load(cantorPairing(sampleID+1, jp.sid+1)) + step.AppendSample(o.pool, sid, lhs.Samples[i]) } } - lowCardOutputSize := 0 - for _, lowCardOutputs := range lowCardInputIndex { - lowCardOutputSize += len(lowCardOutputs) + return step, nil +} + +func (o *vectorOperator) execBinaryOr(lhs, rhs model.StepVector) (model.StepVector, error) { + ts := lhs.T + step := o.pool.GetStepVector(ts) + + for i, sampleID := range lhs.SampleIDs { + jp := o.hcJoinBuckets[sampleID] + jp.ats = ts + sid, _ := o.joinPairToSeriesIds.Load(cantorPairing(sampleID+1, 0)) + step.AppendSample(o.pool, sid, lhs.Samples[i]) + } + for i, sampleID := range rhs.SampleIDs { + if jp := o.lcJoinBuckets[sampleID]; jp.ats != ts { + sid, _ := o.joinPairToSeriesIds.Load(cantorPairing(0, sampleID+1)) + step.AppendSample(o.pool, sid, rhs.Samples[i]) + } } + return step, nil +} + +func (o *vectorOperator) execBinaryUnless(lhs, rhs model.StepVector) (model.StepVector, error) { + ts := lhs.T + step := o.pool.GetStepVector(ts) - highCardOutputIndex := make([]*uint64, outputSize) - lowCardOutputIndex := make([][]uint64, lowCardOutputSize) - for hash, highCardSeries := range highCardHashes { - for _, lowCardSeriesID := range lowCardInputIndex[hash] { - // Each low cardinality series can map to multiple output series. - lowCardOutputIndex[lowCardSeriesID] = make([]uint64, 0, len(highCardSeries)) + for _, sampleID := range rhs.SampleIDs { + jp := o.lcJoinBuckets[sampleID] + jp.ats = ts + } + for i, sampleID := range lhs.SampleIDs { + if jp := o.hcJoinBuckets[sampleID]; jp.ats != ts { + sid, _ := o.joinPairToSeriesIds.Load(cantorPairing(sampleID+1, 0)) + step.AppendSample(o.pool, sid, lhs.Samples[i]) + } + } + return step, nil +} + +// TODO: add support for histogram +func (o *vectorOperator) computeBinaryPairing(hval, lval float64) (float64, bool) { + // operand is not commutative so we need to address potential swapping + if o.matching.Card == parser.CardOneToMany { + v, _, keep := vectorElemBinop(o.opType, lval, hval, nil, nil) + return v, keep + } + v, _, keep := vectorElemBinop(o.opType, hval, lval, nil, nil) + return v, keep +} + +func (o *vectorOperator) execBinaryArithmetic(lhs, rhs model.StepVector) (model.StepVector, error) { + ts := lhs.T + step := o.pool.GetStepVector(ts) + + var ( + hcs, lcs model.StepVector + ) + + switch o.matching.Card { + case parser.CardManyToOne, parser.CardOneToOne: + hcs, lcs = lhs, rhs + case parser.CardOneToMany: + hcs, lcs = rhs, lhs + default: + return step, errors.Newf("Unexpected matching cardinality: %s", o.matching.Card.String()) + } + + for i, sampleID := range lcs.SampleIDs { + jp := o.lcJoinBuckets[sampleID] + // hash collisions on the low-card-side would imply a many-to-many relation + if jp.ats == ts { + return model.StepVector{}, o.newManyToManyMatchErrorOnLowCardSide(jp.sid, sampleID) + } + jp.sid = sampleID + jp.val = lcs.Samples[i] + jp.ats = ts + } + + for i, sampleID := range hcs.SampleIDs { + jp := o.hcJoinBuckets[sampleID] + if jp.ats != ts { + continue + } + // hash collisions on the high card side are expected except if a one-to-one + // matching was requested and we have an implicite many-to-one match instead + if jp.bts == ts && o.matching.Card == parser.CardOneToOne { + return model.StepVector{}, o.newImplicitManyToOneError() + } + jp.bts = ts + + val, keep := o.computeBinaryPairing(hcs.Samples[i], jp.val) + if o.returnBool { + val = 0 + if keep { + val = 1 + } + } else if !keep { + continue } + sid, _ := o.joinPairToSeriesIds.Load(cantorPairing(sampleID+1, jp.sid+1)) + step.AppendSample(o.pool, sid, val) + } + return step, nil +} +func (o *vectorOperator) newManyToManyMatchErrorOnLowCardSide(originalSampleId, duplicateSampleId uint64) error { + side := rhBinOpSide + labels := o.rhsSampleIDs - lowCardSeries := lowCardHashes[hash][0] - for i, output := range highCardSeries { - outputSeries := buildOutputSeries(uint64(len(outputIndex)), output, lowCardSeries, includeLabels) - outputIndex = append(outputIndex, outputSeries) + if o.matching.Card == parser.CardOneToMany { + side = lhBinOpSide + labels = o.lhsSampleIDs + } + return newManyToManyMatchError(o.matching, labels[duplicateSampleId], labels[originalSampleId], side) +} - highCardSeriesID := highCardInputIndex[hash][i] - highCardOutputIndex[highCardSeriesID] = &outputSeries.ID +func (o *vectorOperator) newImplicitManyToOneError() error { + return errors.New("multiple matches for labels: many-to-one matching must be explicit (group_left/group_right)") +} + +func (o *vectorOperator) initJoinTables(highCardSide, lowCardSide []labels.Labels) { + var ( + series = make([]labels.Labels, 0) + joinPairToSeriesIds = umap.New64(0) + joinBucketsByHash = make(map[uint64]*joinBucket) + lcJoinBuckets = make([]*joinBucket, len(lowCardSide)) + hcJoinBuckets = make([]*joinBucket, len(highCardSide)) + lowCardHashToSeriesIDs = make(map[uint64][]uint64, len(lowCardSide)) + highCardHashToSeriesIDs = make(map[uint64][]uint64, len(highCardSide)) + lowCardSampleIdToSignature = make(map[int]uint64, len(lowCardSide)) + highCardSampleIdToSignature = make(map[int]uint64, len(highCardSide)) + ) + + // initialize join bucket mappings + for i := range lowCardSide { + sig := o.sigFunc(lowCardSide[i]) + lowCardSampleIdToSignature[i] = sig + lowCardHashToSeriesIDs[sig] = append(lowCardHashToSeriesIDs[sig], uint64(i)) + if jb, ok := joinBucketsByHash[sig]; ok { + lcJoinBuckets[i] = jb + } else { + jb := joinBucket{ats: -1, bts: -1} + joinBucketsByHash[sig] = &jb + lcJoinBuckets[i] = &jb + } + } + for i := range highCardSide { + sig := o.sigFunc(highCardSide[i]) + highCardSampleIdToSignature[i] = sig + highCardHashToSeriesIDs[sig] = append(highCardHashToSeriesIDs[sig], uint64(i)) + if jb, ok := joinBucketsByHash[sig]; ok { + hcJoinBuckets[i] = jb + } else { + jb := joinBucket{ats: -1, bts: -1} + joinBucketsByHash[sig] = &jb + hcJoinBuckets[i] = &jb + } + } - for _, lowCardSeriesID := range lowCardInputIndex[hash] { - lowCardOutputIndex[lowCardSeriesID] = append(lowCardOutputIndex[lowCardSeriesID], outputSeries.ID) + // initialize series + switch o.opType { + case parser.LAND: + series = highCardSide + for i := range highCardSide { + sig := highCardSampleIdToSignature[i] + lcs, ok := lowCardHashToSeriesIDs[sig] + if !ok { + continue } + for _, lc := range lcs { + joinPairToSeriesIds.Store(cantorPairing(uint64(i)+1, lc+1), uint64(i)) + } + } + case parser.LOR: + h := &joinHelper{seen: make(map[uint64]int)} + for i := range highCardSide { + joinPairToSeriesIds.Store(cantorPairing(uint64(i)+1, 0), uint64(h.append(highCardSide[i]))) + } + for i := range lowCardSide { + joinPairToSeriesIds.Store(cantorPairing(0, uint64(i)+1), uint64(h.append(lowCardSide[i]))) } + series = h.ls + case parser.LUNLESS: + series = highCardSide + for i := range highCardSide { + joinPairToSeriesIds.Store(cantorPairing(uint64(i)+1, 0), uint64(i)) + } + default: + h := &joinHelper{seen: make(map[uint64]int)} + b := labels.NewBuilder(labels.EmptyLabels()) + for i := range highCardSide { + sig := highCardSampleIdToSignature[i] + lcs, ok := lowCardHashToSeriesIDs[sig] + if !ok { + continue + } + for _, lc := range lcs { + joinPairToSeriesIds.Store(cantorPairing(uint64(i)+1, lc+1), uint64(h.append(o.resultMetric(b, highCardSide[i], lowCardSide[lc])))) + } + } + series = h.ls } + o.series = series + o.joinPairToSeriesIds = joinPairToSeriesIds + o.lcJoinBuckets = lcJoinBuckets + o.hcJoinBuckets = hcJoinBuckets +} - return outputIndex, highCardOutputIndex, lowCardOutputIndex +type joinHelper struct { + seen map[uint64]int + ls []labels.Labels + n int } -func signature(metric labels.Labels, without bool, grouping []string, keepOriginalLabels, keepName bool, buf []byte) (uint64, labels.Labels) { - buf = buf[:0] - lb := labels.NewBuilder(metric) - if !keepName { - lb = lb.Del(labels.MetricName) +func (h *joinHelper) append(ls labels.Labels) int { + hash := ls.Hash() + if n, ok := h.seen[hash]; ok { + return n } - if without { - dropLabels := grouping - if !keepName { - dropLabels = append(grouping, labels.MetricName) + h.ls = append(h.ls, ls) + h.seen[hash] = h.n + h.n++ + + return h.n - 1 +} + +func (o *vectorOperator) resultMetric(b *labels.Builder, highCard, lowCard labels.Labels) labels.Labels { + b.Reset(highCard) + + if shouldDropMetricName(o.opType, o.returnBool) { + b.Del(labels.MetricName) + } + + if o.matching.Card == parser.CardOneToOne { + if o.matching.On { + b.Keep(o.matching.MatchingLabels...) + } else { + b.Del(o.matching.MatchingLabels...) } - key, _ := metric.HashWithoutLabels(buf, dropLabels...) - if !keepOriginalLabels { - lb.Del(dropLabels...) + } + for _, ln := range o.matching.Include { + if v := lowCard.Get(ln); v != "" { + b.Set(ln, v) + } else { + b.Del(ln) } - return key, lb.Labels() } + if o.returnBool { + b.Del(labels.MetricName) + } + return b.Labels() +} - if !keepOriginalLabels { - lb.Keep(grouping...) +func signatureFunc(on bool, names ...string) func(labels.Labels) uint64 { + b := make([]byte, 256) + if on { + slices.Sort(names) + return func(lset labels.Labels) uint64 { + return xxhash.Sum64(lset.BytesWithLabels(b, names...)) + } } - if len(grouping) == 0 { - return 0, lb.Labels() + names = append([]string{labels.MetricName}, names...) + slices.Sort(names) + return func(lset labels.Labels) uint64 { + return xxhash.Sum64(lset.BytesWithoutLabels(b, names...)) } +} - key, _ := metric.HashForLabels(buf, grouping...) - return key, lb.Labels() +func cantorPairing(h, l uint64) uint64 { + return (h+l)*(h+l+1)/2 + l } -func buildOutputSeries(seriesID uint64, highCardSeries, lowCardSeries model.Series, includeLabels []string) model.Series { - metricBuilder := labels.NewBuilder(highCardSeries.Metric) - if len(includeLabels) > 0 { - labels.NewBuilder(lowCardSeries.Metric). - Keep(includeLabels...). - Labels(). - Range(func(l labels.Label) { metricBuilder.Set(l.Name, l.Value) }) +// Lifted from: https://github.com/prometheus/prometheus/blob/a38179c4e183d9b50b271167bf90050eda8ec3d1/promql/engine.go#L2430 +// TODO: call with histogram values in followup PR +func vectorElemBinop(op parser.ItemType, lhs, rhs float64, hlhs, hrhs *histogram.FloatHistogram) (float64, *histogram.FloatHistogram, bool) { + switch op { + case parser.ADD: + if hlhs != nil && hrhs != nil { + // The histogram being added must have the larger schema + // code (i.e. the higher resolution). + if hrhs.Schema >= hlhs.Schema { + return 0, hlhs.Copy().Add(hrhs).Compact(0), true + } + return 0, hrhs.Copy().Add(hlhs).Compact(0), true + } + return lhs + rhs, nil, true + case parser.SUB: + if hlhs != nil && hrhs != nil { + // The histogram being subtracted must have the larger schema + // code (i.e. the higher resolution). + if hrhs.Schema >= hlhs.Schema { + return 0, hlhs.Copy().Sub(hrhs).Compact(0), true + } + return 0, hrhs.Copy().Mul(-1).Add(hlhs).Compact(0), true + } + return lhs - rhs, nil, true + case parser.MUL: + if hlhs != nil && hrhs == nil { + return 0, hlhs.Copy().Mul(rhs), true + } + if hlhs == nil && hrhs != nil { + return 0, hrhs.Copy().Mul(lhs), true + } + return lhs * rhs, nil, true + case parser.DIV: + if hlhs != nil && hrhs == nil { + return 0, hlhs.Copy().Div(rhs), true + } + return lhs / rhs, nil, true + case parser.POW: + return math.Pow(lhs, rhs), nil, true + case parser.MOD: + return math.Mod(lhs, rhs), nil, true + case parser.EQLC: + return lhs, nil, lhs == rhs + case parser.NEQ: + return lhs, nil, lhs != rhs + case parser.GTR: + return lhs, nil, lhs > rhs + case parser.LSS: + return lhs, nil, lhs < rhs + case parser.GTE: + return lhs, nil, lhs >= rhs + case parser.LTE: + return lhs, nil, lhs <= rhs + case parser.ATAN2: + return math.Atan2(lhs, rhs), nil, true } - return model.Series{ID: seriesID, Metric: metricBuilder.Labels()} + panic(fmt.Errorf("operator %q not allowed for operations between Vectors", op)) } diff --git a/extlabels/labels.go b/extlabels/labels.go index f8be0c1fb..aec3e722a 100644 --- a/extlabels/labels.go +++ b/extlabels/labels.go @@ -13,25 +13,6 @@ var ( ErrDuplicateLabelSet = errors.New("vector cannot contain metrics with the same labelset") ) -func ContainsDuplicateLabelSet(series []labels.Labels) bool { - var ( - buf = make([]byte, 0, 256) - seen = make(map[uint64]struct{}, len(series)) - ) - - for _, s := range series { - buf = buf[:0] - - buf = s.Bytes(buf) - h := xxhash.Sum64(s.Bytes(buf)) - if _, ok := seen[h]; ok { - return true - } - seen[h] = struct{}{} - } - return false -} - func ContainsDuplicateLabelSetAfterDroppingName(series []labels.Labels) bool { var ( buf = make([]byte, 0, 256) diff --git a/go.mod b/go.mod index 95760a05d..305db8ce6 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/prometheus/common v0.44.0 github.com/prometheus/prometheus v0.46.1-0.20230818184859-4d8e380269da github.com/stretchr/testify v1.8.4 + github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39 go.uber.org/goleak v1.2.1 golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1 gonum.org/v1/gonum v0.12.0 diff --git a/go.sum b/go.sum index 10c343435..290461395 100644 --- a/go.sum +++ b/go.sum @@ -359,6 +359,8 @@ github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7Jul github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39 h1:D3ltj0b2c2FgUacKrB1pWGgwrUyCESY9W8XYYQ5sqY8= +github.com/zhangyunhao116/umap v0.0.0-20221211160557-cb7705fafa39/go.mod h1:r86X1CnsDRrOeLtJlqRWdELPWpkcf933GTlojQlifQw= go.mongodb.org/mongo-driver v1.7.3/go.mod h1:NqaYOwnXWr5Pm7AOpO5QFxKJ503nbMse/R79oO62zWg= go.mongodb.org/mongo-driver v1.7.5/go.mod h1:VXEWRZ6URJIkUq2SCAyapmhH0ZLRBP+FT4xhp5Zvxng= go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8=