Skip to content

Commit

Permalink
MQE: fix issues with vector/vector binary comparsion operations (#10235)
Browse files Browse the repository at this point in the history
* Add early filtering test case for group_right

* Fix issue where comparison operations without the bool modifier would return incorrect results if the left side contained series with different metric names

* Avoid expensive `labels.Labels.String()` call

* Fix issue where comparison operations between two vectors incorrectly fail with a conflict if multiple left series match the same right series and only one left point remains after applying the comparison

* Add comparison operator benchmark

* Improve performance for case where left side is smaller than right

* Extract some methods
  • Loading branch information
charleskorn authored Jan 10, 2025
1 parent e7d8d36 commit 56c9edd
Show file tree
Hide file tree
Showing 7 changed files with 318 additions and 197 deletions.
3 changes: 3 additions & 0 deletions pkg/streamingpromql/benchmarks/benchmarks.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ func TestCases(metricSizes []int) []BenchCase {
{
Expr: "nh_X / a_X",
},
{
Expr: "a_X == b_X",
},
{
Expr: "2 * a_X",
},
Expand Down
39 changes: 39 additions & 0 deletions pkg/streamingpromql/operators/binops/binary_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ package binops
import (
"fmt"
"slices"
"time"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
Expand Down Expand Up @@ -82,6 +84,43 @@ func groupLabelsFunc(vectorMatching parser.VectorMatching, op parser.ItemType, r
}
}

func formatConflictError(
firstConflictingSeriesIndex int,
secondConflictingSeriesIndex int,
description string,
ts int64,
sourceSeriesMetadata []types.SeriesMetadata,
side string,
vectorMatching parser.VectorMatching,
op parser.ItemType,
returnBool bool,
) error {
firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels
groupLabels := groupLabelsFunc(vectorMatching, op, returnBool)(firstConflictingSeriesLabels)

if secondConflictingSeriesIndex == -1 {
return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
)
}

secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels

return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
firstConflictingSeriesLabels,
secondConflictingSeriesLabels,
)
}

// filterSeries returns data filtered based on the mask provided.
//
// mask is expected to contain one value for each time step in the query time range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ import (
"fmt"
"slices"
"sort"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/promql/parser/posrange"
"github.com/prometheus/prometheus/util/annotations"
Expand Down Expand Up @@ -618,13 +616,13 @@ func (g *GroupedVectorVectorBinaryOperation) updateOneSidePresence(side *oneSide

for _, p := range seriesData.Floats {
if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 {
return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness())
return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool)
}
}

for _, p := range seriesData.Histograms {
if otherSeriesIdx := matchGroup.updatePresence(g.timeRange.PointIndex(p.T), seriesIdx); otherSeriesIdx != -1 {
return g.formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness())
return formatConflictError(otherSeriesIdx, seriesIdx, "duplicate series", p.T, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool)
}
}
}
Expand All @@ -646,7 +644,8 @@ func (g *GroupedVectorVectorBinaryOperation) mergeOneSide(data []types.InstantVe
}

if conflict != nil {
return types.InstantVectorSeriesData{}, g.formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness())
err := formatConflictError(conflict.FirstConflictingSeriesIndex, conflict.SecondConflictingSeriesIndex, conflict.Description, conflict.Timestamp, g.oneSideMetadata, g.oneSideHandedness(), g.VectorMatching, g.Op, g.ReturnBool)
return types.InstantVectorSeriesData{}, err
}

return merged, nil
Expand Down Expand Up @@ -689,40 +688,6 @@ func (g *GroupedVectorVectorBinaryOperation) mergeManySide(data []types.InstantV
return merged, nil
}

func (g *GroupedVectorVectorBinaryOperation) formatConflictError(
firstConflictingSeriesIndex int,
secondConflictingSeriesIndex int,
description string,
ts int64,
sourceSeriesMetadata []types.SeriesMetadata,
side string,
) error {
firstConflictingSeriesLabels := sourceSeriesMetadata[firstConflictingSeriesIndex].Labels
groupLabels := groupLabelsFunc(g.VectorMatching, g.Op, g.ReturnBool)(firstConflictingSeriesLabels)

if secondConflictingSeriesIndex == -1 {
return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
)
}

secondConflictingSeriesLabels := sourceSeriesMetadata[secondConflictingSeriesIndex].Labels

return fmt.Errorf(
"found %s for the match group %s on the %s side of the operation at timestamp %s: %s and %s",
description,
groupLabels,
side,
timestamp.Time(ts).Format(time.RFC3339Nano),
firstConflictingSeriesLabels,
secondConflictingSeriesLabels,
)
}

func (g *GroupedVectorVectorBinaryOperation) oneSideHandedness() string {
switch g.VectorMatching.Card {
case parser.CardOneToMany:
Expand Down
Loading

0 comments on commit 56c9edd

Please sign in to comment.