From 448b43c2c3b2755a15b1d4024dd9cdad7c6b60d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Tue, 17 Sep 2024 17:33:18 +0300 Subject: [PATCH] storage/prometheus: close querier at the end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We are trying to reuse memory and hitting a bug where the querier is closed too soon. Prometheus closes the querier at the end of Exec(). We should do that too. Signed-off-by: Giedrius Statkevičius --- engine/engine.go | 67 ++++++++++++++------ engine/engine_test.go | 88 ++++++++++++++++++++++++--- execution/remote/operator.go | 3 +- storage/interface.go | 1 + storage/prometheus/pool.go | 10 +-- storage/prometheus/scanners.go | 14 ++++- storage/prometheus/series_selector.go | 12 +--- 7 files changed, 153 insertions(+), 42 deletions(-) diff --git a/engine/engine.go b/engine/engine.go index e3594bf5..803fd643 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -273,9 +273,14 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts } lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.logicalOptimizers) + scanners, err := e.storageScanners(q, qOpts) + if err != nil { + return nil, errors.Wrap(err, "creating storage scanners") + } + ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts) + exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, qs, ts) @@ -292,6 +297,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts warns: warns, t: InstantQuery, resultSort: resultSort, + scanners: scanners, }, nil } @@ -320,7 +326,13 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts) + + scnrs, err := e.storageScanners(q, qOpts) + if err != nil { + return nil, errors.Wrap(err, "creating storage scanners") + } + + exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewInstantQuery(ctx, q, opts, root.String(), ts) @@ -339,6 +351,7 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl t: InstantQuery, // TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function. resultSort: noSortResultSort{}, + scanners: scnrs, }, nil } @@ -375,7 +388,12 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts) + scnrs, err := e.storageScanners(q, qOpts) + if err != nil { + return nil, errors.Wrap(err, "creating storage scanners") + } + + exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step) @@ -386,11 +404,12 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - plan: lplan, - warns: warns, - t: RangeQuery, + Query: &Query{exec: exec, opts: opts}, + engine: e, + plan: lplan, + warns: warns, + t: RangeQuery, + scanners: scnrs, }, nil } @@ -416,9 +435,14 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, } lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.logicalOptimizers) + scnrs, err := e.storageScanners(q, qOpts) + if err != nil { + return nil, errors.Wrap(err, "creating storage scanners") + } + ctx = warnings.NewContext(ctx) defer func() { warns.Merge(warnings.FromContext(ctx)) }() - exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts) + exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts) if e.triggerFallback(err) { e.metrics.queries.WithLabelValues("true").Inc() return e.prom.NewRangeQuery(ctx, q, opts, lplan.Root().String(), start, end, step) @@ -428,11 +452,12 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, return nil, err } return &compatibilityQuery{ - Query: &Query{exec: exec, opts: opts}, - engine: e, - plan: lplan, - warns: warns, - t: RangeQuery, + Query: &Query{exec: exec, opts: opts}, + engine: e, + plan: lplan, + warns: warns, + t: RangeQuery, + scanners: scnrs, }, nil } @@ -452,11 +477,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio return qOpts } -func (e *Engine) storageScanners(queryable storage.Queryable) engstorage.Scanners { +func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options) (engstorage.Scanners, error) { if e.scanners == nil { - return promstorage.NewPrometheusScanners(queryable) + return promstorage.NewPrometheusScanners(queryable, qOpts) } - return e.scanners + return e.scanners, nil } func (e *Engine) triggerFallback(err error) bool { @@ -495,6 +520,8 @@ type compatibilityQuery struct { t QueryType resultSort resultSorter cancel context.CancelFunc + + scanners engstorage.Scanners } func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) { @@ -671,7 +698,11 @@ func (q *compatibilityQuery) Stats() *stats.Statistics { return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: samples} } -func (q *compatibilityQuery) Close() { q.Cancel() } +func (q *compatibilityQuery) Close() { + if err := q.scanners.Close(); err != nil { + level.Warn(q.engine.logger).Log("msg", "error closing storage scanners, some memory might have leaked", "err", err) + } +} func (q *compatibilityQuery) String() string { return q.plan.Root().String() } diff --git a/engine/engine_test.go b/engine/engine_test.go index 16dc7699..4b096eb2 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -111,6 +111,71 @@ func TestVectorSelectorWithGaps(t *testing.T) { testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1)) } +type queryableCloseChecker struct { + closed bool + + storage.Queryable +} + +func (q *queryableCloseChecker) Querier(mint, maxt int64) (storage.Querier, error) { + qr, err := q.Queryable.Querier(mint, maxt) + if err != nil { + return nil, err + } + return &querierCloseChecker{Querier: qr, closed: &q.closed}, nil +} + +type querierCloseChecker struct { + storage.Querier + + closed *bool +} + +func (q *querierCloseChecker) Close() error { + *q.closed = true + return q.Querier.Close() +} + +// TestQuerierClosedAfterQueryClosed tests that the querier is only closed +// after the query is closed. +func TestQuerierClosedAfterQueryClosed(t *testing.T) { + t.Parallel() + opts := promql.EngineOpts{ + Timeout: 1 * time.Hour, + MaxSamples: 1e10, + EnableNegativeOffset: true, + EnableAtModifier: true, + } + + load := `load 30s + http_requests_total{pod="nginx-1", route="/"} 41.00+0.20x40 + http_requests_total{pod="nginx-2", route="/"} 51+21.71x40` + + storage := promqltest.LoadedStorage(t, load) + defer storage.Close() + + optimizers := logicalplan.AllOptimizers + newEngine := engine.New(engine.Opts{ + EngineOpts: opts, + DisableFallback: true, + LogicalOptimizers: optimizers, + // Set to 1 to make sure batching is tested. + SelectorBatchSize: 1, + }) + ctx := context.Background() + qr := &queryableCloseChecker{ + Queryable: storage, + } + q1, err := newEngine.NewInstantQuery(ctx, qr, nil, "sum(http_requests_total)", time.Unix(0, 0)) + testutil.Ok(t, err) + _ = q1.Exec(ctx) + + require.Equal(t, false, qr.closed) + q1.Close() + + require.Equal(t, true, qr.closed) +} + func TestQueriesAgainstOldEngine(t *testing.T) { t.Parallel() start := time.Unix(0, 0) @@ -2132,15 +2197,21 @@ type scannersWithWarns struct { promScanners *prometheus.Scanners } -func newScannersWithWarns(warn error) *scannersWithWarns { - return &scannersWithWarns{ - warn: warn, - promScanners: prometheus.NewPrometheusScanners(&storage.MockQueryable{ - MockQuerier: storage.NoopQuerier(), - }), +func newScannersWithWarns(warn error, qOpts *query.Options) (*scannersWithWarns, error) { + scanners, err := prometheus.NewPrometheusScanners(&storage.MockQueryable{ + MockQuerier: storage.NoopQuerier(), + }, qOpts) + if err != nil { + return nil, err } + return &scannersWithWarns{ + warn: warn, + promScanners: scanners, + }, nil } +func (s *scannersWithWarns) Close() error { return nil } + func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) { warnings.AddToContext(s.warn, ctx) return s.promScanners.NewVectorSelector(ctx, opts, hints, selector) @@ -2156,7 +2227,10 @@ func TestWarningsPlanCreation(t *testing.T) { opts = engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}} expectedWarn = errors.New("test warning") ) - newEngine := engine.NewWithScanners(opts, newScannersWithWarns(expectedWarn)) + + scnrs, err := newScannersWithWarns(expectedWarn, &query.Options{}) + testutil.Ok(t, err) + newEngine := engine.NewWithScanners(opts, scnrs) q1, err := newEngine.NewRangeQuery(context.Background(), nil, nil, "http_requests_total", time.UnixMilli(0), time.UnixMilli(600), 30*time.Second) testutil.Ok(t, err) diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 645b9b27..f07a0f9d 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -149,6 +149,7 @@ func (s *storageAdapter) executeQuery(ctx context.Context) { } } -func (s *storageAdapter) Close() { +func (s *storageAdapter) Close() error { s.query.Close() + return nil } diff --git a/storage/interface.go b/storage/interface.go index 4b1711e0..35f550f0 100644 --- a/storage/interface.go +++ b/storage/interface.go @@ -16,4 +16,5 @@ import ( type Scanners interface { NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error) + Close() error } diff --git a/storage/prometheus/pool.go b/storage/prometheus/pool.go index bac802fb..be28a2ca 100644 --- a/storage/prometheus/pool.go +++ b/storage/prometheus/pool.go @@ -17,20 +17,20 @@ var sep = []byte{'\xff'} type SelectorPool struct { selectors map[uint64]*seriesSelector - queryable storage.Queryable + querier storage.Querier } -func NewSelectorPool(queryable storage.Queryable) *SelectorPool { +func NewSelectorPool(querier storage.Querier) *SelectorPool { return &SelectorPool{ selectors: make(map[uint64]*seriesSelector), - queryable: queryable, + querier: querier, } } func (p *SelectorPool) GetSelector(mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) SeriesSelector { key := hashMatchers(matchers, mint, maxt, hints) if _, ok := p.selectors[key]; !ok { - p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints) + p.selectors[key] = newSeriesSelector(p.querier, mint, maxt, step, matchers, hints) } return p.selectors[key] } @@ -38,7 +38,7 @@ func (p *SelectorPool) GetSelector(mint, maxt, step int64, matchers []*labels.Ma func (p *SelectorPool) GetFilteredSelector(mint, maxt, step int64, matchers, filters []*labels.Matcher, hints storage.SelectHints) SeriesSelector { key := hashMatchers(matchers, mint, maxt, hints) if _, ok := p.selectors[key]; !ok { - p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints) + p.selectors[key] = newSeriesSelector(p.querier, mint, maxt, step, matchers, hints) } return NewFilteredSelector(p.selectors[key], NewFilter(filters)) diff --git a/storage/prometheus/scanners.go b/storage/prometheus/scanners.go index e4c718c5..eda6e226 100644 --- a/storage/prometheus/scanners.go +++ b/storage/prometheus/scanners.go @@ -23,10 +23,20 @@ import ( type Scanners struct { selectors *SelectorPool + + querier storage.Querier +} + +func (s *Scanners) Close() error { + return s.querier.Close() } -func NewPrometheusScanners(queryable storage.Queryable) *Scanners { - return &Scanners{selectors: NewSelectorPool(queryable)} +func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options) (*Scanners, error) { + querier, err := queryable.Querier(qOpts.Start.UnixMilli(), qOpts.End.UnixMilli()) + if err != nil { + return nil, errors.Wrap(err, "create querier") + } + return &Scanners{querier: querier, selectors: NewSelectorPool(querier)}, nil } func (p Scanners) NewVectorSelector( diff --git a/storage/prometheus/series_selector.go b/storage/prometheus/series_selector.go index cffad6c8..198a3959 100644 --- a/storage/prometheus/series_selector.go +++ b/storage/prometheus/series_selector.go @@ -24,7 +24,7 @@ type SignedSeries struct { } type seriesSelector struct { - storage storage.Queryable + storage storage.Querier mint int64 maxt int64 step int64 @@ -35,7 +35,7 @@ type seriesSelector struct { series []SignedSeries } -func newSeriesSelector(storage storage.Queryable, mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) *seriesSelector { +func newSeriesSelector(storage storage.Querier, mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) *seriesSelector { return &seriesSelector{ storage: storage, maxt: maxt, @@ -61,13 +61,7 @@ func (o *seriesSelector) GetSeries(ctx context.Context, shard int, numShards int } func (o *seriesSelector) loadSeries(ctx context.Context) error { - querier, err := o.storage.Querier(o.mint, o.maxt) - if err != nil { - return err - } - defer querier.Close() - - seriesSet := querier.Select(ctx, false, &o.hints, o.matchers...) + seriesSet := o.storage.Select(ctx, false, &o.hints, o.matchers...) i := 0 for seriesSet.Next() { s := seriesSet.At()