Skip to content

Commit

Permalink
Implement streaming rate buffer
Browse files Browse the repository at this point in the history
The current way we calculate rate and friends (increase and delta) is to buffer
samples for the whole windowing range and apply the rate function over
that buffer. Samples which are used in the next step are retained, and
reused for the next step.

This approach works well for short rate intervals (up to a few minutes), but it
ends up using significant memory when the rate interval is over an hour long.

This commit implements a specialized rate buffer which can calculate the rate
in a streaming manner, by keeping track of the first and last sample for each
evaluation step. In order to account for counter resets, we also store samples
that trigger the reset in a separate slice and pass them to the rate function.

The advantage of this approach is twofold:
* we can use far less memory for instant queries over long rate intervals since we
  only track the first and last sample for the evaluation step
* when using Grafana, the rate interval and step variables are set such that there
  is a very small overlap between sequential steps (e.g. rate_interval = 1m, step = 45s).
  This means that the rate buffer should only calculate the value for 2 steps at a time,
  and can reuse state from past steps for upcoming ones.

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Aug 7, 2024
1 parent 103e6f3 commit d1ed6fb
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 42 deletions.
33 changes: 29 additions & 4 deletions engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func BenchmarkRangeQuery(b *testing.B) {
cases := []struct {
name string
query string
step time.Duration
storage *teststorage.TestStorage
}{
{
Expand Down Expand Up @@ -166,6 +167,12 @@ func BenchmarkRangeQuery(b *testing.B) {
query: `rate(http_requests_total[1m])`,
storage: sixHourDataset,
},
{
name: "rate with longer window",
query: `rate(http_requests_total[10m])`,
storage: sixHourDataset,
step: 5 * time.Minute,
},
{
name: "subquery",
query: `sum_over_time(rate(http_requests_total[1m])[10m:1m])`,
Expand Down Expand Up @@ -308,6 +315,10 @@ func BenchmarkRangeQuery(b *testing.B) {
}

for _, tc := range cases {
testStep := step
if tc.step != 0 {
testStep = tc.step
}
b.Run(tc.name, func(b *testing.B) {
b.ReportAllocs()
b.Run("old_engine", func(b *testing.B) {
Expand All @@ -317,7 +328,7 @@ func BenchmarkRangeQuery(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, step)
qry, err := promEngine.NewRangeQuery(context.Background(), tc.storage, nil, tc.query, start, end, testStep)
testutil.Ok(b, err)

oldResult := qry.Exec(context.Background())
Expand All @@ -329,7 +340,7 @@ func BenchmarkRangeQuery(b *testing.B) {
b.ReportAllocs()

for i := 0; i < b.N; i++ {
newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, step, opts)
newResult := executeRangeQuery(b, tc.query, tc.storage, start, end, testStep, opts)
testutil.Ok(b, newResult.Err)
}
})
Expand All @@ -352,6 +363,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
cases := []struct {
name string
query string
step time.Duration
}{
{
name: "selector",
Expand All @@ -365,6 +377,11 @@ func BenchmarkNativeHistograms(b *testing.B) {
name: "rate",
query: `rate(native_histogram_series[1m])`,
},
{
name: "rate with longer window",
query: `rate(native_histogram_series[10m])`,
step: 5 * time.Minute,
},
{
name: "sum rate",
query: `sum(rate(native_histogram_series[1m]))`,
Expand Down Expand Up @@ -405,13 +422,17 @@ func BenchmarkNativeHistograms(b *testing.B) {
}
for _, tc := range cases {
b.Run(tc.name, func(b *testing.B) {
testStep := step
if tc.step != 0 {
testStep = tc.step
}
b.Run("old_engine", func(b *testing.B) {
engine := promql.NewEngine(opts)

b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
qry, err := engine.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, step)
qry, err := engine.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, testStep)
testutil.Ok(b, err)

oldResult := qry.Exec(context.Background())
Expand All @@ -427,7 +448,7 @@ func BenchmarkNativeHistograms(b *testing.B) {
EngineOpts: opts,
})

qry, err := ng.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, step)
qry, err := ng.NewRangeQuery(context.Background(), storage, nil, tc.query, start, end, testStep)
testutil.Ok(b, err)

newResult := qry.Exec(context.Background())
Expand Down Expand Up @@ -484,6 +505,10 @@ func BenchmarkInstantQuery(b *testing.B) {
name: "rate",
query: `rate(http_requests_total[1m])`,
},
{
name: "rate with long window",
query: `rate(http_requests_total[1h])`,
},
{
name: "sum rate",
query: `sum(rate(http_requests_total[1m]))`,
Expand Down
13 changes: 5 additions & 8 deletions execution/scan/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type subqueryOperator struct {
currentStep int64
step int64
stepsBatch int
opts *query.Options

funcExpr *logicalplan.FunctionCall
subQuery *logicalplan.Subquery
Expand All @@ -41,7 +42,7 @@ type subqueryOperator struct {

lastVectors []model.StepVector
lastCollected int
buffers []*ringbuffer.RingBuffer
buffers []*ringbuffer.GenericRingBuffer

// params holds the function parameter for each step.
params []float64
Expand All @@ -64,6 +65,7 @@ func NewSubqueryOperator(pool *model.VectorPool, next, paramOp model.VectorOpera
pool: pool,
funcExpr: funcExpr,
subQuery: subQuery,
opts: opts,
mint: opts.Start.UnixMilli(),
maxt: opts.End.UnixMilli(),
currentStep: opts.Start.UnixMilli(),
Expand Down Expand Up @@ -229,14 +231,9 @@ func (o *subqueryOperator) initSeries(ctx context.Context) error {
}

o.series = make([]labels.Labels, len(series))
o.buffers = make([]*ringbuffer.RingBuffer, len(series))
o.buffers = make([]*ringbuffer.GenericRingBuffer, len(series))
for i := range o.buffers {
o.buffers[i] = ringbuffer.New(
8,
o.subQuery.Range.Milliseconds(),
o.subQuery.Offset.Milliseconds(),
o.call,
)
o.buffers[i] = ringbuffer.New(8, o.subQuery.Range.Milliseconds(), o.subQuery.Offset.Milliseconds(), o.call)
}
var b labels.ScratchBuilder
for i, s := range series {
Expand Down
12 changes: 6 additions & 6 deletions ringbuffer/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/thanos-io/promql-engine/execution/parse"
)

type SamplesBuffer RingBuffer
type SamplesBuffer GenericRingBuffer

type FunctionArgs struct {
Samples []Sample
Expand Down Expand Up @@ -166,7 +166,7 @@ var rangeVectorFuncs = map[string]FunctionCall{
if len(f.Samples) < 2 {
return 0., nil, false, nil
}
v, h, err := extrapolatedRate(f.Samples, true, true, f.StepTime, f.SelectRange, f.Offset)
v, h, err := extrapolatedRate(f.Samples, len(f.Samples), true, true, f.StepTime, f.SelectRange, f.Offset)
if err != nil {
return 0, nil, false, err
}
Expand All @@ -176,7 +176,7 @@ var rangeVectorFuncs = map[string]FunctionCall{
if len(f.Samples) < 2 {
return 0., nil, false, nil
}
v, h, err := extrapolatedRate(f.Samples, false, false, f.StepTime, f.SelectRange, f.Offset)
v, h, err := extrapolatedRate(f.Samples, len(f.Samples), false, false, f.StepTime, f.SelectRange, f.Offset)
if err != nil {
return 0, nil, false, err
}
Expand All @@ -186,7 +186,7 @@ var rangeVectorFuncs = map[string]FunctionCall{
if len(f.Samples) < 2 {
return 0., nil, false, nil
}
v, h, err := extrapolatedRate(f.Samples, true, false, f.StepTime, f.SelectRange, f.Offset)
v, h, err := extrapolatedRate(f.Samples, len(f.Samples), true, false, f.StepTime, f.SelectRange, f.Offset)
if err != nil {
return 0, nil, false, err
}
Expand Down Expand Up @@ -252,7 +252,7 @@ func NewRangeVectorFunc(name string) (FunctionCall, error) {
// It calculates the rate (allowing for counter resets if isCounter is true),
// extrapolates if the first/last sample is close to the boundary, and returns
// the result as either per-second (if isRate is true) or overall.
func extrapolatedRate(samples []Sample, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (float64, *histogram.FloatHistogram, error) {
func extrapolatedRate(samples []Sample, numSamples int, isCounter, isRate bool, stepTime int64, selectRange int64, offset int64) (float64, *histogram.FloatHistogram, error) {
var (
rangeStart = stepTime - (selectRange + offset)
rangeEnd = stepTime - offset
Expand Down Expand Up @@ -284,7 +284,7 @@ func extrapolatedRate(samples []Sample, isCounter, isRate bool, stepTime int64,
durationToEnd := float64(rangeEnd-samples[len(samples)-1].T) / 1000

sampledInterval := float64(samples[len(samples)-1].T-samples[0].T) / 1000
averageDurationBetweenSamples := sampledInterval / float64(len(samples)-1)
averageDurationBetweenSamples := sampledInterval / float64(numSamples-1)

// If the first/last samples are close to the boundaries of the range,
// extrapolate the result. This is as we expect that another sample
Expand Down
25 changes: 11 additions & 14 deletions ringbuffer/ringbuffer.go → ringbuffer/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type Sample struct {
V Value
}

type RingBuffer struct {
type GenericRingBuffer struct {
items []Sample
tail []Sample

Expand All @@ -30,40 +30,38 @@ type RingBuffer struct {
call FunctionCall
}

func New(size int, selectRange, offset int64, call FunctionCall) *RingBuffer {
func New(size int, selectRange, offset int64, call FunctionCall) *GenericRingBuffer {
return NewWithExtLookback(size, selectRange, offset, 0, call)
}

func NewWithExtLookback(size int, selectRange, offset int64, lookback int64, call FunctionCall) *RingBuffer {
return &RingBuffer{
func NewWithExtLookback(size int, selectRange, offset, extLookback int64, call FunctionCall) *GenericRingBuffer {
return &GenericRingBuffer{
items: make([]Sample, 0, size),
selectRange: selectRange,
offset: offset,
extLookback: lookback,
extLookback: extLookback,
call: call,
}
}

func (r *RingBuffer) Len() int {
return len(r.items)
}
func (r *GenericRingBuffer) Len() int { return len(r.items) }

// MaxT returns the maximum timestamp of the ring buffer.
// If the ring buffer is empty, it returns math.MinInt64.
func (r *RingBuffer) MaxT() int64 {
func (r *GenericRingBuffer) MaxT() int64 {
if len(r.items) == 0 {
return math.MinInt64
}
return r.items[len(r.items)-1].T
}

// ReadIntoLast reads a sample into the last slot in the buffer, replacing the existing sample.
func (r *RingBuffer) ReadIntoLast(f func(*Sample)) {
func (r *GenericRingBuffer) ReadIntoLast(f func(*Sample)) {
f(&r.items[len(r.items)-1])
}

// Push adds a new sample to the buffer.
func (r *RingBuffer) Push(t int64, v Value) {
func (r *GenericRingBuffer) Push(t int64, v Value) {
n := len(r.items)
if n < cap(r.items) {
r.items = r.items[:n+1]
Expand All @@ -84,9 +82,8 @@ func (r *RingBuffer) Push(t int64, v Value) {
}
}

func (r *RingBuffer) Reset(mint int64, evalt int64) {
func (r *GenericRingBuffer) Reset(mint int64, evalt int64) {
r.currentStep = evalt

if len(r.items) == 0 || r.items[len(r.items)-1].T < mint {
r.items = r.items[:0]
return
Expand All @@ -106,7 +103,7 @@ func (r *RingBuffer) Reset(mint int64, evalt int64) {
r.items = r.items[:keep]
}

func (r *RingBuffer) Eval(scalarArg float64, metricAppearedTs *int64) (float64, *histogram.FloatHistogram, bool, error) {
func (r *GenericRingBuffer) Eval(scalarArg float64, metricAppearedTs *int64) (float64, *histogram.FloatHistogram, bool, error) {
return r.call(FunctionArgs{
Samples: r.items,
StepTime: r.currentStep,
Expand Down
Loading

0 comments on commit d1ed6fb

Please sign in to comment.