diff --git a/engine/bench_test.go b/engine/bench_test.go index 7c7e6a11..00cd3332 100644 --- a/engine/bench_test.go +++ b/engine/bench_test.go @@ -134,6 +134,7 @@ func BenchmarkRangeQuery(b *testing.B) { cases := []struct { name string query string + step time.Duration storage *teststorage.TestStorage }{ { @@ -166,6 +167,12 @@ func BenchmarkRangeQuery(b *testing.B) { query: `rate(http_requests_total[1m])`, storage: sixHourDataset, }, + { + name: "rate with longer window", + query: `rate(http_requests_total[10m])`, + storage: sixHourDataset, + step: 5 * time.Minute, + }, { name: "subquery", query: `sum_over_time(rate(http_requests_total[1m])[10m:1m])`, @@ -308,6 +315,10 @@ func BenchmarkRangeQuery(b *testing.B) { } for _, tc := range cases { + testStep := step + if tc.step != 0 { + testStep = tc.step + } b.Run(tc.name, func(b *testing.B) { b.ReportAllocs() b.Run("old_engine", func(b *testing.B) { @@ -317,7 +328,7 @@ func BenchmarkRangeQuery(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step) + qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, testStep) testutil.Ok(b, err) oldResult := qry.Exec(context.Background()) @@ -329,7 +340,7 @@ func BenchmarkRangeQuery(b *testing.B) { b.ReportAllocs() for i := 0; i < b.N; i++ { - newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step, opts) + newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, testStep, opts) testutil.Ok(b, newResult.Err) } }) @@ -352,6 +363,7 @@ func BenchmarkNativeHistograms(b *testing.B) { cases := []struct { name string query string + step time.Duration }{ { name: "selector", @@ -365,6 +377,11 @@ func BenchmarkNativeHistograms(b *testing.B) { name: "rate", query: `rate(native_histogram_series[1m])`, }, + { + name: "rate with longer window", + query: `rate(native_histogram_series[10m])`, + step: 5 * time.Minute, + }, { name: "sum rate", query: `sum(rate(native_histogram_series[1m]))`, @@ -405,13 +422,17 @@ func BenchmarkNativeHistograms(b *testing.B) { } for _, tc := range cases { b.Run(tc.name, func(b *testing.B) { + testStep := step + if tc.step != 0 { + testStep = tc.step + } b.Run("old_engine", func(b *testing.B) { engine := promql.NewEngine(opts) b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - qry, err := engine.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, step) + qry, err := engine.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, testStep) testutil.Ok(b, err) oldResult := qry.Exec(context.Background()) @@ -427,7 +448,7 @@ func BenchmarkNativeHistograms(b *testing.B) { EngineOpts: opts, }) - qry, err := ng.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, step) + qry, err := ng.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, testStep) testutil.Ok(b, err) newResult := qry.Exec(context.Background()) @@ -484,6 +505,10 @@ func BenchmarkInstantQuery(b *testing.B) { name: "rate", query: `rate(http_requests_total[1m])`, }, + { + name: "rate with long window", + query: `rate(http_requests_total[1h])`, + }, { name: "sum rate", query: `sum(rate(http_requests_total[1m]))`, diff --git a/execution/scan/subquery.go b/execution/scan/subquery.go index 5867fb36..53f4c7b2 100644 --- a/execution/scan/subquery.go +++ b/execution/scan/subquery.go @@ -32,6 +32,7 @@ type subqueryOperator struct { currentStep int64 step int64 stepsBatch int + opts *query.Options funcExpr *logicalplan.FunctionCall subQuery *logicalplan.Subquery @@ -41,7 +42,7 @@ type subqueryOperator struct { lastVectors []model.StepVector lastCollected int - buffers []*ringbuffer.RingBuffer + buffers []*ringbuffer.GenericRingBuffer // params holds the function parameter for each step. params []float64 @@ -64,6 +65,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera pool: pool, funcExpr: funcExpr, subQuery: subQuery, + opts: opts, mint: opts.Start.UnixMilli(), maxt: opts.End.UnixMilli(), currentStep: opts.Start.UnixMilli(), @@ -229,14 +231,9 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error { } o.series = make([]labels.Labels, len(series)) - o.buffers = make([]*ringbuffer.RingBuffer, len(series)) + o.buffers = make([]*ringbuffer.GenericRingBuffer, len(series)) for i := range o.buffers { - o.buffers[i] = ringbuffer.New( - 8, - o.subQuery.Range.Milliseconds(), - o.subQuery.Offset.Milliseconds(), - o.call, - ) + o.buffers[i] = ringbuffer.New(8, o.subQuery.Range.Milliseconds(), o.subQuery.Offset.Milliseconds(), o.call) } var b labels.ScratchBuilder for i, s := range series { diff --git a/ringbuffer/functions.go b/ringbuffer/functions.go index 775d7382..089ffe2a 100644 --- a/ringbuffer/functions.go +++ b/ringbuffer/functions.go @@ -12,7 +12,7 @@ import ( "github.com/thanos-io/promql-engine/execution/parse" ) -type SamplesBuffer RingBuffer +type SamplesBuffer GenericRingBuffer type FunctionArgs struct { Samples []Sample @@ -166,7 +166,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ if len(f.Samples) < 2 { return 0., nil, false, nil } - v, h, err := extrapolatedRate(f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset) + v, h, err := extrapolatedRate(f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset) if err != nil { return 0, nil, false, err } @@ -176,7 +176,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ if len(f.Samples) < 2 { return 0., nil, false, nil } - v, h, err := extrapolatedRate(f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset) + v, h, err := extrapolatedRate(f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset) if err != nil { return 0, nil, false, err } @@ -186,7 +186,7 @@ var rangeVectorFuncs = map[string]FunctionCall{ if len(f.Samples) < 2 { return 0., nil, false, nil } - v, h, err := extrapolatedRate(f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset) + v, h, err := extrapolatedRate(f.Samples, len(f.Samples), true, false, f.StepTime, f.SelectRange, f.Offset) if err != nil { return 0, nil, false, err } @@ -252,7 +252,7 @@ func NewRangeVectorFunc(name string) (FunctionCall, error) { // It calculates the rate (allowing for counter resets if isCounter is true), // extrapolates if the first/last sample is close to the boundary, and returns // the result as either per-second (if isRate is true) or overall. -func extrapolatedRate(samples []Sample, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (float64, *histogram.FloatHistogram, error) { +func extrapolatedRate(samples []Sample, numSamples int, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (float64, *histogram.FloatHistogram, error) { var ( rangeStart = stepTime - (selectRange + offset) rangeEnd = stepTime - offset @@ -284,7 +284,7 @@ func extrapolatedRate(samples []Sample, isCounter, isRate bool, stepTime int64, durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000 sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000 - averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1) + averageDurationBetweenSamples := sampledInterval / float64(numSamples-1) // If the first/last samples are close to the boundaries of the range, // extrapolate the result. This is as we expect that another sample diff --git a/ringbuffer/ringbuffer.go b/ringbuffer/generic.go similarity index 77% rename from ringbuffer/ringbuffer.go rename to ringbuffer/generic.go index ed05d0d9..81a4dc59 100644 --- a/ringbuffer/ringbuffer.go +++ b/ringbuffer/generic.go @@ -19,7 +19,7 @@ type Sample struct { V Value } -type RingBuffer struct { +type GenericRingBuffer struct { items []Sample tail []Sample @@ -30,27 +30,25 @@ type RingBuffer struct { call FunctionCall } -func New(size int, selectRange, offset int64, call FunctionCall) *RingBuffer { +func New(size int, selectRange, offset int64, call FunctionCall) *GenericRingBuffer { return NewWithExtLookback(size, selectRange, offset, 0, call) } -func NewWithExtLookback(size int, selectRange, offset int64, lookback int64, call FunctionCall) *RingBuffer { - return &RingBuffer{ +func NewWithExtLookback(size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer { + return &GenericRingBuffer{ items: make([]Sample, 0, size), selectRange: selectRange, offset: offset, - extLookback: lookback, + extLookback: extLookback, call: call, } } -func (r *RingBuffer) Len() int { - return len(r.items) -} +func (r *GenericRingBuffer) Len() int { return len(r.items) } // MaxT returns the maximum timestamp of the ring buffer. // If the ring buffer is empty, it returns math.MinInt64. -func (r *RingBuffer) MaxT() int64 { +func (r *GenericRingBuffer) MaxT() int64 { if len(r.items) == 0 { return math.MinInt64 } @@ -58,12 +56,12 @@ func (r *RingBuffer) MaxT() int64 { } // ReadIntoLast reads a sample into the last slot in the buffer, replacing the existing sample. -func (r *RingBuffer) ReadIntoLast(f func(*Sample)) { +func (r *GenericRingBuffer) ReadIntoLast(f func(*Sample)) { f(&r.items[len(r.items)-1]) } // Push adds a new sample to the buffer. -func (r *RingBuffer) Push(t int64, v Value) { +func (r *GenericRingBuffer) Push(t int64, v Value) { n := len(r.items) if n < cap(r.items) { r.items = r.items[:n+1] @@ -84,9 +82,8 @@ func (r *RingBuffer) Push(t int64, v Value) { } } -func (r *RingBuffer) Reset(mint int64, evalt int64) { +func (r *GenericRingBuffer) Reset(mint int64, evalt int64) { r.currentStep = evalt - if len(r.items) == 0 || r.items[len(r.items)-1].T < mint { r.items = r.items[:0] return @@ -106,7 +103,7 @@ func (r *RingBuffer) Reset(mint int64, evalt int64) { r.items = r.items[:keep] } -func (r *RingBuffer) Eval(scalarArg float64, metricAppearedTs *int64) (float64, *histogram.FloatHistogram, bool, error) { +func (r *GenericRingBuffer) Eval(scalarArg float64, metricAppearedTs *int64) (float64, *histogram.FloatHistogram, bool, error) { return r.call(FunctionArgs{ Samples: r.items, StepTime: r.currentStep, diff --git a/ringbuffer/rate.go b/ringbuffer/rate.go new file mode 100644 index 00000000..8df86953 --- /dev/null +++ b/ringbuffer/rate.go @@ -0,0 +1,192 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package ringbuffer + +import ( + "math" + + "github.com/prometheus/prometheus/model/histogram" + + "github.com/thanos-io/promql-engine/query" +) + +type Buffer interface { + Len() int + MaxT() int64 + Push(t int64, v Value) + Reset(mint int64, evalt int64) + Eval(_ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error) + ReadIntoLast(f func(*Sample)) +} + +// RateBuffer is a Buffer which can calculate rate, increase and delta for a +// series in a streaming manner, calculating the value incrementally for each +// step where the sample is used. +type RateBuffer struct { + // stepRanges contain the bounds and number of samples for each evaluation step. + stepRanges []stepRange + // firstSamples contains the first sample for each evaluation step. + firstSamples []Sample + // resets contains all samples which are detected as a counter reset. + resets []Sample + // rateBuffer is the buffer passed to the rate function. This is a scratch buffer + // used to avoid allocating a new slice each time we need to calculate the rate. + rateBuffer []Sample + // last is the last sample in the current evaluation step. + last Sample + + selectRange int64 + step int64 + offset int64 + isCounter bool + isRate bool + + evalTs int64 +} + +type stepRange struct { + mint int64 + maxt int64 + numSamples int +} + +// NewRateBuffer creates a new RateBuffer. +func NewRateBuffer(opts query.Options, isCounter, isRate bool, selectRange, offset int64) *RateBuffer { + var ( + step = max(1, opts.Step.Milliseconds()) + numSteps = min( + selectRange/step+1, + querySteps(opts), + ) + + current = opts.Start.UnixMilli() + firstSamples = make([]Sample, 0, numSteps) + stepRanges = make([]stepRange, 0, numSteps) + ) + for i := 0; i < int(numSteps); i++ { + var ( + maxt = current - offset + mint = maxt - selectRange + ) + stepRanges = append(stepRanges, stepRange{mint: mint, maxt: maxt}) + firstSamples = append(firstSamples, Sample{T: math.MaxInt64}) + current += step + } + + return &RateBuffer{ + isCounter: isCounter, + isRate: isRate, + selectRange: selectRange, + step: step, + offset: offset, + stepRanges: stepRanges, + firstSamples: firstSamples, + last: Sample{T: math.MinInt64}, + } +} + +func (r *RateBuffer) Len() int { return r.stepRanges[0].numSamples } + +func (r *RateBuffer) MaxT() int64 { return r.last.T } + +func (r *RateBuffer) Push(t int64, v Value) { + // Detect resets and store the current and previous sample so that + // the rate is properly adjusted. + if v.H != nil && r.last.V.H != nil { + if v.H.DetectReset(r.last.V.H) { + r.resets = append(r.resets, Sample{ + T: r.last.T, + V: Value{H: r.last.V.H.Copy()}, + }) + r.resets = append(r.resets, Sample{ + T: t, + V: Value{H: v.H.Copy()}, + }) + } + } else if r.last.V.F > v.F { + r.resets = append(r.resets, Sample{T: r.last.T, V: Value{F: r.last.V.F}}) + r.resets = append(r.resets, Sample{T: t, V: Value{F: v.F}}) + } + + // Set the last sample for the current evaluation step. + r.last.T, r.last.V.F = t, v.F + if v.H != nil { + if r.last.V.H == nil { + r.last.V.H = v.H.Copy() + } else { + v.H.CopyTo(r.last.V.H) + } + } else { + r.last.V.H = nil + } + + // Set the first sample for each evaluation step where the currently read sample is used. + for i := 0; i < len(r.stepRanges) && t >= r.stepRanges[i].mint && t <= r.stepRanges[i].maxt; i++ { + r.stepRanges[i].numSamples++ + sample := &r.firstSamples[i] + if t >= sample.T { + continue + } + sample.T, sample.V.F = t, v.F + if v.H != nil { + if sample.V.H == nil { + sample.V.H = v.H.Copy() + } else { + v.H.CopyTo(sample.V.H) + } + } else { + sample.V.H = nil + } + } +} + +func (r *RateBuffer) Reset(mint int64, evalt int64) { + r.evalTs = evalt + if r.stepRanges[0].mint == mint { + return + } + dropResets := 0 + for ; dropResets < len(r.resets) && r.resets[dropResets].T < mint; dropResets++ { + } + r.resets = r.resets[dropResets:] + + last := len(r.stepRanges) - 1 + var ( + nextMint = r.stepRanges[last].mint + r.step + nextMaxt = r.stepRanges[last].maxt + r.step + ) + copy(r.stepRanges, r.stepRanges[1:]) + r.stepRanges[last] = stepRange{mint: nextMint, maxt: nextMaxt} + + nextSample := r.firstSamples[0] + copy(r.firstSamples, r.firstSamples[1:]) + r.firstSamples[last] = nextSample + r.firstSamples[last].T = math.MaxInt64 +} + +func (r *RateBuffer) Eval(_ float64, _ *int64) (float64, *histogram.FloatHistogram, bool, error) { + if r.firstSamples[0].T == math.MaxInt64 || r.firstSamples[0].T == r.last.T { + return 0, nil, false, nil + } + + r.rateBuffer = append(append( + append(r.rateBuffer[:0], r.firstSamples[0]), + r.resets...), + r.last, + ) + numSamples := r.stepRanges[0].numSamples + f, h, err := extrapolatedRate(r.rateBuffer, numSamples, r.isCounter, r.isRate, r.evalTs, r.selectRange, r.offset) + return f, h, true, err +} + +func (r *RateBuffer) ReadIntoLast(func(*Sample)) {} + +func querySteps(o query.Options) int64 { + // Instant evaluation is executed as a range evaluation with one step. + if o.Step.Milliseconds() == 0 { + return 1 + } + + return (o.End.UnixMilli()-o.Start.UnixMilli())/o.Step.Milliseconds() + 1 +} diff --git a/storage/prometheus/matrix_selector.go b/storage/prometheus/matrix_selector.go index 63f6c6e5..c86b73c7 100644 --- a/storage/prometheus/matrix_selector.go +++ b/storage/prometheus/matrix_selector.go @@ -16,8 +16,8 @@ import ( "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/thanos-io/promql-engine/execution/function" "github.com/thanos-io/promql-engine/execution/model" + "github.com/thanos-io/promql-engine/execution/parse" "github.com/thanos-io/promql-engine/extlabels" "github.com/thanos-io/promql-engine/query" "github.com/thanos-io/promql-engine/ringbuffer" @@ -27,7 +27,7 @@ type matrixScanner struct { labels labels.Labels signature uint64 - buffer *ringbuffer.RingBuffer + buffer ringbuffer.Buffer iterator chunkenc.Iterator lastSample ringbuffer.Sample metricAppearedTs *int64 @@ -46,6 +46,7 @@ type matrixSelector struct { functionName string call ringbuffer.FunctionCall fhReader *histogram.FloatHistogram + opts *query.Options numSteps int mint int64 @@ -91,11 +92,12 @@ func NewMatrixSelector( scalarArg: arg, fhReader: &histogram.FloatHistogram{}, + opts: opts, numSteps: opts.NumSteps(), mint: opts.Start.UnixMilli(), maxt: opts.End.UnixMilli(), step: opts.Step.Milliseconds(), - isExtFunction: function.IsExtFunction(functionName), + isExtFunction: parse.IsExtFunction(functionName), selectRange: selectRange.Milliseconds(), offset: offset.Milliseconds(), @@ -225,18 +227,12 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { // is reused between Select() calls? lbls, _ = extlabels.DropMetricName(lbls, b) } - var buf *ringbuffer.RingBuffer - if o.isExtFunction { - buf = ringbuffer.NewWithExtLookback(8, o.selectRange, o.offset, o.extLookbackDelta, o.call) - } else { - buf = ringbuffer.New(8, o.selectRange, o.offset, o.call) - } o.scanners[i] = matrixScanner{ labels: lbls, signature: s.Signature, iterator: s.Iterator(nil), lastSample: ringbuffer.Sample{T: math.MinInt64}, - buffer: buf, + buffer: o.newBuffer(), } o.series[i] = lbls } @@ -249,6 +245,23 @@ func (o *matrixSelector) loadSeries(ctx context.Context) error { return err } +func (o *matrixSelector) newBuffer() ringbuffer.Buffer { + switch o.functionName { + case "rate": + return ringbuffer.NewRateBuffer(*o.opts, true, true, o.selectRange, o.offset) + case "increase": + return ringbuffer.NewRateBuffer(*o.opts, true, false, o.selectRange, o.offset) + case "delta": + return ringbuffer.NewRateBuffer(*o.opts, false, false, o.selectRange, o.offset) + } + + if o.isExtFunction { + return ringbuffer.NewWithExtLookback(8, o.selectRange, o.offset, o.opts.ExtLookbackDelta.Milliseconds(), o.call) + } + return ringbuffer.New(8, o.selectRange, o.offset, o.call) + +} + func (o *matrixSelector) String() string { r := time.Duration(o.selectRange) * time.Millisecond if o.call != nil { @@ -276,6 +289,9 @@ func (m *matrixScanner) selectPoints( return nil } + if bufMaxt := m.buffer.MaxT() + 1; bufMaxt > mint { + mint = bufMaxt + } mint = maxInt64(mint, m.buffer.MaxT()+1) if m.lastSample.T >= mint { m.buffer.Push(m.lastSample.T, m.lastSample.V)