Skip to content

Commit

Permalink
execution: rework binary operators, add set operatos
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann committed Sep 30, 2023
1 parent 9dcb22a commit 337b02b
Show file tree
Hide file tree
Showing 8 changed files with 510 additions and 397 deletions.
15 changes: 11 additions & 4 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,9 +577,6 @@ func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
if err != nil {
return newErrResult(ret, err)
}
if extlabels.ContainsDuplicateLabelSet(resultSeries) {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
}

series := make([]promql.Series, len(resultSeries))
for i := 0; i < len(resultSeries); i++ {
Expand Down Expand Up @@ -640,6 +637,9 @@ loop:
resultMatrix = append(resultMatrix, s)
}
sort.Sort(resultMatrix)
if resultMatrix.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
}
ret.Value = resultMatrix
if q.debugWriter != nil {
analyze(q.debugWriter, q.exec.(model.ObservableVectorOperator), "", "")
Expand All @@ -650,7 +650,11 @@ loop:
var result parser.Value
switch q.expr.Type() {
case parser.ValueTypeMatrix:
result = promql.Matrix(series)
matrix := promql.Matrix(series)
if matrix.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
}
result = matrix
case parser.ValueTypeVector:
// Convert matrix with one value per series into vector.
vector := make(promql.Vector, 0, len(resultSeries))
Expand All @@ -675,6 +679,9 @@ loop:
}
}
sort.Slice(vector, q.resultSort.comparer(&vector))
if vector.ContainsSameLabelset() {
return newErrResult(ret, extlabels.ErrDuplicateLabelSet)
}
result = vector
case parser.ValueTypeScalar:
v := math.NaN()
Expand Down
60 changes: 59 additions & 1 deletion engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3359,6 +3359,64 @@ func TestInstantQuery(t *testing.T) {
bar{method="get", code="404"} 1+1x30`,
query: `sum(foo) by (method) % sum(bar) by (method)`,
},
{
name: "vector binary op and 1",
load: `load 30s
foo{method="get", code="500"} 1+2x40
bar{method="get", code="404"} 1+1x30`,
query: `sum(foo) by (method) and sum(bar) by (method)`,
},
{
name: "vector binary op and 2",
load: `load 30s
foo{method="get", code="500"} 1+2x40
bar{method="get", code="404"} 1+1x30`,
query: `sum(foo) by (code) and sum(bar) by (code)`,
},
{
name: "vector binary op unless 1",
load: `load 30s
foo{method="get", code="500"} 1+2x40
bar{method="get", code="404"} 1+1x30`,
query: `sum(foo) by (method) unless sum(bar) by (method)`,
},
{
name: "vector binary op unless 2",
load: `load 30s
foo{method="get", code="500"} 1+2x40
bar{method="get", code="404"} 1+1x30`,
query: `sum(foo) by (code) unless sum(bar) by (code)`,
},
{
name: "vector binary op unless 3",
load: `load 30s
foo{method="get", code="500"} 1+2x40`,
query: `sum(foo) by (code) unless nonexistent`,
},
{
name: "vector binary op or 1",
load: `load 30s
foo{A="1"} 1+1x40
foo{A="2"} 2+2x40`,
query: `sinh(foo or exp(foo))`,
},
{
name: "vector binary op one-to-one left multiple matches",
load: `load 30s
foo{method="get", code="500"} 1
foo{method="get", code="200"} 1
bar{method="get", code="200"} 1`,
query: `foo / ignoring (code) bar`,
},
{
name: "vector binary operation with many-to-many matching rhs high card",
load: `load 30s
foo{code="200", method="get"} 1+1x20
foo{code="200", method="post"} 1+1x20
bar{code="200", method="get"} 1+1x20
bar{code="200", method="post"} 1+1x20`,
query: `foo + on(code) group_right bar`,
},
{
name: "vector binary op > scalar",
load: `load 30s
Expand Down Expand Up @@ -3774,7 +3832,7 @@ func TestInstantQuery(t *testing.T) {
if hasNaNs(oldResult) {
t.Log("Applying comparison with NaN equality.")
equalsWithNaNs(t, oldResult, newResult)
} else if oldResult.Err != nil {
} else if oldResult.Err != nil && newResult.Err != nil {
testutil.Equals(t, oldResult.Err.Error(), newResult.Err.Error())
} else {
testutil.Equals(t, oldResult, newResult)
Expand Down
35 changes: 0 additions & 35 deletions execution/binary/index.go

This file was deleted.

140 changes: 24 additions & 116 deletions execution/binary/table.go → execution/binary/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
package binary

import (
"fmt"
"math"

"github.com/prometheus/prometheus/model/histogram"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"

"github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/execution/parse"
)

Expand All @@ -22,122 +22,26 @@ const (
)

type errManyToManyMatch struct {
sampleID uint64
duplicateSampleID uint64
side binOpSide
matching *parser.VectorMatching
side binOpSide

original, duplicate labels.Labels
}

func newManyToManyMatchError(sampleID, duplicateSampleID uint64, side binOpSide) *errManyToManyMatch {
func newManyToManyMatchError(matching *parser.VectorMatching, original, duplicate labels.Labels, side binOpSide) *errManyToManyMatch {
return &errManyToManyMatch{
sampleID: sampleID,
duplicateSampleID: duplicateSampleID,
side: side,
original: original,
duplicate: duplicate,
matching: matching,
side: side,
}
}

type outputSample struct {
lhT int64
rhT int64
lhSampleID uint64
rhSampleID uint64
v float64
}

type table struct {
pool *model.VectorPool

operation operation
card parser.VectorMatchCardinality

outputValues []outputSample
// highCardOutputIndex is a mapping from series ID of the high cardinality
// operator to an output series ID.
// During joins, each high cardinality series that has a matching
// low cardinality series will map to exactly one output series.
highCardOutputIndex outputIndex
// lowCardOutputIndex is a mapping from series ID of the low cardinality
// operator to an output series ID.
// Each series from the low cardinality operator can join with many
// series of the high cardinality operator.
lowCardOutputIndex outputIndex
}

func newTable(
pool *model.VectorPool,
card parser.VectorMatchCardinality,
operation operation,
outputValues []outputSample,
highCardOutputCache outputIndex,
lowCardOutputCache outputIndex,
) *table {
for i := range outputValues {
outputValues[i].lhT = -1
outputValues[i].rhT = -1
}
return &table{
pool: pool,
card: card,

operation: operation,
outputValues: outputValues,
highCardOutputIndex: highCardOutputCache,
lowCardOutputIndex: lowCardOutputCache,
}
}

func (t *table) execBinaryOperation(lhs model.StepVector, rhs model.StepVector, returnBool bool) (model.StepVector, *errManyToManyMatch) {
ts := lhs.T
step := t.pool.GetStepVector(ts)

lhsIndex, rhsIndex := t.highCardOutputIndex, t.lowCardOutputIndex
if t.card == parser.CardOneToMany {
lhsIndex, rhsIndex = rhsIndex, lhsIndex
}

for i, sampleID := range lhs.SampleIDs {
lhsVal := lhs.Samples[i]
outputSampleIDs := lhsIndex.outputSamples(sampleID)
for _, outputSampleID := range outputSampleIDs {
if t.card != parser.CardManyToOne && t.outputValues[outputSampleID].lhT == ts {
prevSampleID := t.outputValues[outputSampleID].lhSampleID
return model.StepVector{}, newManyToManyMatchError(prevSampleID, sampleID, lhBinOpSide)
}

t.outputValues[outputSampleID].lhSampleID = sampleID
t.outputValues[outputSampleID].lhT = lhs.T
t.outputValues[outputSampleID].v = lhsVal
}
}

for i, sampleID := range rhs.SampleIDs {
rhVal := rhs.Samples[i]
outputSampleIDs := rhsIndex.outputSamples(sampleID)
for _, outputSampleID := range outputSampleIDs {
outputSample := t.outputValues[outputSampleID]
if rhs.T != outputSample.lhT {
continue
}
if t.card != parser.CardOneToMany && outputSample.rhT == rhs.T {
prevSampleID := t.outputValues[outputSampleID].rhSampleID
return model.StepVector{}, newManyToManyMatchError(prevSampleID, sampleID, rhBinOpSide)
}
t.outputValues[outputSampleID].rhSampleID = sampleID
t.outputValues[outputSampleID].rhT = rhs.T

outputVal, keep := t.operation([2]float64{outputSample.v, rhVal}, 0)
if returnBool {
outputVal = 0
if keep {
outputVal = 1
}
} else if !keep {
continue
}
step.AppendSample(t.pool, outputSampleID, outputVal)
}
}

return step, nil
func (e *errManyToManyMatch) Error() string {
group := e.original.MatchLabels(e.matching.On, e.matching.MatchingLabels...)
msg := "found duplicate series for the match group %s on the %s hand-side of the operation: [%s, %s]" +
";many-to-many matching not allowed: matching labels must be unique on one side"
return fmt.Sprintf(msg, group, e.side, e.original.String(), e.duplicate.String())
}

// operands is a length 2 array which contains lhs and rhs.
Expand Down Expand Up @@ -188,6 +92,10 @@ var vectorBinaryOperations = map[string]operation{

func newOperation(expr parser.ItemType, vectorBinOp bool) (operation, error) {
t := parser.ItemTypeStr[expr]
if expr.IsSetOperator() && vectorBinOp {
// handled in the operator
return nil, nil
}
if expr.IsComparisonOperator() && vectorBinOp {
if o, ok := vectorBinaryOperations[t]; ok {
return o, nil
Expand Down Expand Up @@ -245,10 +153,10 @@ func btof(b bool) float64 {
}

func shouldDropMetricName(op parser.ItemType, returnBool bool) bool {
switch op.String() {
case "+", "-", "*", "/", "%", "^", "atan2":
switch op {
case parser.ADD, parser.SUB, parser.MUL, parser.DIV, parser.MOD, parser.POW, parser.ATAN2:
return true
default:
return op.IsComparisonOperator() && returnBool
}

return op.IsComparisonOperator() && returnBool
}
Loading

0 comments on commit 337b02b

Please sign in to comment.