Skip to content

Commit

Permalink
storage/prometheus: close querier at the end
Browse files Browse the repository at this point in the history
We are trying to reuse memory and hitting a bug where the querier is
closed too soon. Prometheus closes the querier at the end of Exec(). We
should do that too.

Signed-off-by: Giedrius Statkevičius <[email protected]>
  • Loading branch information
GiedriusS committed Sep 17, 2024
1 parent de6d9f8 commit 448b43c
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 42 deletions.
67 changes: 49 additions & 18 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,14 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
}
lplan, warns := logicalplan.NewFromAST(expr, qOpts, planOpts).Optimize(e.logicalOptimizers)

scanners, err := e.storageScanners(q, qOpts)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)
exec, err := execution.New(ctx, lplan.Root(), scanners, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, qs, ts)
Expand All @@ -292,6 +297,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
warns: warns,
t: InstantQuery,
resultSort: resultSort,
scanners: scanners,
}, nil
}

Expand Down Expand Up @@ -320,7 +326,13 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)

scnrs, err := e.storageScanners(q, qOpts)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewInstantQuery(ctx, q, opts, root.String(), ts)
Expand All @@ -339,6 +351,7 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
t: InstantQuery,
// TODO(fpetkovski): Infer the sort order from the plan, ideally without copying the newResultSort function.
resultSort: noSortResultSort{},
scanners: scnrs,
}, nil
}

Expand Down Expand Up @@ -375,7 +388,12 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)
scnrs, err := e.storageScanners(q, qOpts)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, qs, start, end, step)
Expand All @@ -386,11 +404,12 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
}

return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
scanners: scnrs,
}, nil
}

Expand All @@ -416,9 +435,14 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
}
lplan, warns := logicalplan.New(root, qOpts, planOpts).Optimize(e.logicalOptimizers)

scnrs, err := e.storageScanners(q, qOpts)
if err != nil {
return nil, errors.Wrap(err, "creating storage scanners")
}

ctx = warnings.NewContext(ctx)
defer func() { warns.Merge(warnings.FromContext(ctx)) }()
exec, err := execution.New(ctx, lplan.Root(), e.storageScanners(q), qOpts)
exec, err := execution.New(ctx, lplan.Root(), scnrs, qOpts)
if e.triggerFallback(err) {
e.metrics.queries.WithLabelValues("true").Inc()
return e.prom.NewRangeQuery(ctx, q, opts, lplan.Root().String(), start, end, step)
Expand All @@ -428,11 +452,12 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
return nil, err
}
return &compatibilityQuery{
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
Query: &Query{exec: exec, opts: opts},
engine: e,
plan: lplan,
warns: warns,
t: RangeQuery,
scanners: scnrs,
}, nil
}

Expand All @@ -452,11 +477,11 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio
return qOpts
}

// storageScanners picks the scanners a query executes against. Scanners set
// on the engine (e.scanners) are returned as-is; when none are set,
// Prometheus-backed scanners are built from queryable.
//
// NOTE(review): this span shows both sides of the change. The one-argument
// signature and the single-value returns appear to be the pre-change form;
// the two-argument signature passes qOpts to NewPrometheusScanners and
// returns the error that constructor can produce.
func (e *Engine) storageScanners(queryable storage.Queryable) engstorage.Scanners {
func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options) (engstorage.Scanners, error) {
if e.scanners == nil {
return promstorage.NewPrometheusScanners(queryable)
return promstorage.NewPrometheusScanners(queryable, qOpts)
}
return e.scanners
return e.scanners, nil
}

func (e *Engine) triggerFallback(err error) bool {
Expand Down Expand Up @@ -495,6 +520,8 @@ type compatibilityQuery struct {
t QueryType
resultSort resultSorter
cancel context.CancelFunc

scanners engstorage.Scanners
}

func (q *compatibilityQuery) Exec(ctx context.Context) (ret *promql.Result) {
Expand Down Expand Up @@ -671,7 +698,11 @@ func (q *compatibilityQuery) Stats() *stats.Statistics {
return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: samples}
}

// Close ends the query and releases the storage scanners it holds.
//
// It calls Cancel before closing the scanners. Close has no error return, so
// a failure to close the scanners is logged at warn level instead of being
// propagated to the caller.
func (q *compatibilityQuery) Close() {
	q.Cancel()

	if err := q.scanners.Close(); err != nil {
		level.Warn(q.engine.logger).Log("msg", "error closing storage scanners, some memory might have leaked", "err", err)
	}
}

func (q *compatibilityQuery) String() string { return q.plan.Root().String() }

Expand Down
88 changes: 81 additions & 7 deletions engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,71 @@ func TestVectorSelectorWithGaps(t *testing.T) {
testutil.WithGoCmp(comparer).Equals(t, oldResult, newResult, queryExplanation(q1))
}

// queryableCloseChecker wraps a storage.Queryable and records whether a
// querier obtained through it has been closed.
type queryableCloseChecker struct {
// closed is flipped to true by querierCloseChecker.Close, which holds a
// pointer to this field.
closed bool

storage.Queryable
}

// Querier opens a querier on the wrapped Queryable and wraps it in a
// querierCloseChecker that reports its Close call back into q.closed.
// An error from the wrapped Queryable is returned unchanged with a nil querier.
func (q *queryableCloseChecker) Querier(mint, maxt int64) (storage.Querier, error) {
	inner, err := q.Queryable.Querier(mint, maxt)
	if err == nil {
		return &querierCloseChecker{Querier: inner, closed: &q.closed}, nil
	}
	return nil, err
}

// querierCloseChecker wraps a storage.Querier and marks a shared flag when
// Close is called on it.
type querierCloseChecker struct {
storage.Querier

// closed points at the flag to set on Close; queryableCloseChecker.Querier
// points it at its own closed field.
closed *bool
}

// Close records that the querier was closed and then closes the wrapped
// querier, returning its error.
func (q *querierCloseChecker) Close() error {
	flag := q.closed
	*flag = true

	inner := q.Querier
	return inner.Close()
}

// TestQuerierClosedAfterQueryClosed tests that the querier is only closed
// after the query is closed.
func TestQuerierClosedAfterQueryClosed(t *testing.T) {
t.Parallel()
opts := promql.EngineOpts{
Timeout: 1 * time.Hour,
MaxSamples: 1e10,
EnableNegativeOffset: true,
EnableAtModifier: true,
}

load := `load 30s
http_requests_total{pod="nginx-1", route="/"} 41.00+0.20x40
http_requests_total{pod="nginx-2", route="/"} 51+21.71x40`

storage := promqltest.LoadedStorage(t, load)
defer storage.Close()

optimizers := logicalplan.AllOptimizers
newEngine := engine.New(engine.Opts{
EngineOpts: opts,
DisableFallback: true,
LogicalOptimizers: optimizers,
// Set to 1 to make sure batching is tested.
SelectorBatchSize: 1,
})
ctx := context.Background()
qr := &queryableCloseChecker{
Queryable: storage,
}
q1, err := newEngine.NewInstantQuery(ctx, qr, nil, "sum(http_requests_total)", time.Unix(0, 0))
testutil.Ok(t, err)
_ = q1.Exec(ctx)

require.Equal(t, false, qr.closed)
q1.Close()

require.Equal(t, true, qr.closed)
}

func TestQueriesAgainstOldEngine(t *testing.T) {
t.Parallel()
start := time.Unix(0, 0)
Expand Down Expand Up @@ -2132,15 +2197,21 @@ type scannersWithWarns struct {
promScanners *prometheus.Scanners
}

// newScannersWithWarns builds test scanners that carry warn and delegate to
// Prometheus scanners backed by a MockQueryable whose querier is a no-op.
//
// NOTE(review): this span shows both sides of the change. The first
// definition head (warn only, no error result) appears to be the pre-change
// form; the second takes qOpts, forwards it to NewPrometheusScanners and
// returns that constructor's error.
func newScannersWithWarns(warn error) *scannersWithWarns {
return &scannersWithWarns{
warn: warn,
promScanners: prometheus.NewPrometheusScanners(&storage.MockQueryable{
MockQuerier: storage.NoopQuerier(),
}),
func newScannersWithWarns(warn error, qOpts *query.Options) (*scannersWithWarns, error) {
scanners, err := prometheus.NewPrometheusScanners(&storage.MockQueryable{
MockQuerier: storage.NoopQuerier(),
}, qOpts)
if err != nil {
return nil, err
}
return &scannersWithWarns{
warn: warn,
promScanners: scanners,
}, nil
}

// Close does nothing and always reports success.
func (s *scannersWithWarns) Close() error {
	return nil
}

func (s scannersWithWarns) NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error) {
warnings.AddToContext(s.warn, ctx)
return s.promScanners.NewVectorSelector(ctx, opts, hints, selector)
Expand All @@ -2156,7 +2227,10 @@ func TestWarningsPlanCreation(t *testing.T) {
opts = engine.Opts{EngineOpts: promql.EngineOpts{Timeout: 1 * time.Hour}}
expectedWarn = errors.New("test warning")
)
newEngine := engine.NewWithScanners(opts, newScannersWithWarns(expectedWarn))

scnrs, err := newScannersWithWarns(expectedWarn, &query.Options{})
testutil.Ok(t, err)
newEngine := engine.NewWithScanners(opts, scnrs)
q1, err := newEngine.NewRangeQuery(context.Background(), nil, nil, "http_requests_total", time.UnixMilli(0), time.UnixMilli(600), 30*time.Second)
testutil.Ok(t, err)

Expand Down
3 changes: 2 additions & 1 deletion execution/remote/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (s *storageAdapter) executeQuery(ctx context.Context) {
}
}

// Close closes the adapter's underlying query.
//
// NOTE(review): both signature lines of the change are shown. The second one
// adds an error result, which is always nil because s.query.Close() returns
// nothing to propagate.
func (s *storageAdapter) Close() {
func (s *storageAdapter) Close() error {
s.query.Close()
return nil
}
1 change: 1 addition & 0 deletions storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ import (
// Scanners creates the operators that read series data for a query and owns
// whatever resources those operators read from.
type Scanners interface {
// NewVectorSelector returns an operator for the given vector selector.
NewVectorSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.VectorSelector) (model.VectorOperator, error)
// NewMatrixSelector returns an operator for the given matrix selector and
// the function call applied to it.
NewMatrixSelector(ctx context.Context, opts *query.Options, hints storage.SelectHints, selector logicalplan.MatrixSelector, call logicalplan.FunctionCall) (model.VectorOperator, error)
// Close releases the resources held by the scanners and reports any error
// from doing so.
Close() error
}
10 changes: 5 additions & 5 deletions storage/prometheus/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,28 @@ var sep = []byte{'\xff'}
// SelectorPool caches series selectors so that requests with the same key
// share one selector.
//
// NOTE(review): both sides of the change are shown. The queryable field
// appears to be the pre-change form; the querier field replaces it so that
// every selector reads from one already-open querier.
type SelectorPool struct {
// selectors maps the hash computed by hashMatchers to its selector.
selectors map[uint64]*seriesSelector

queryable storage.Queryable
querier storage.Querier
}

// NewSelectorPool returns an empty pool whose selectors will read from the
// given storage handle.
//
// NOTE(review): both sides of the change are shown. The Queryable-based
// signature and field assignment appear to be the pre-change form; the
// Querier-based ones are the replacement.
func NewSelectorPool(queryable storage.Queryable) *SelectorPool {
func NewSelectorPool(querier storage.Querier) *SelectorPool {
return &SelectorPool{
selectors: make(map[uint64]*seriesSelector),
queryable: queryable,
querier: querier,
}
}

// GetSelector returns the pooled selector for the given matchers, time range
// and hints, creating and caching it on first use.
//
// The cache key is derived from matchers, mint, maxt and hints; step is passed
// to a newly created selector but is not part of the key.
//
// NOTE(review): the two newSeriesSelector lines are the two sides of the
// change (p.queryable appears to be pre-change, p.querier post-change).
func (p *SelectorPool) GetSelector(mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
p.selectors[key] = newSeriesSelector(p.querier, mint, maxt, step, matchers, hints)
}
return p.selectors[key]
}

func (p *SelectorPool) GetFilteredSelector(mint, maxt, step int64, matchers, filters []*labels.Matcher, hints storage.SelectHints) SeriesSelector {
key := hashMatchers(matchers, mint, maxt, hints)
if _, ok := p.selectors[key]; !ok {
p.selectors[key] = newSeriesSelector(p.queryable, mint, maxt, step, matchers, hints)
p.selectors[key] = newSeriesSelector(p.querier, mint, maxt, step, matchers, hints)
}

return NewFilteredSelector(p.selectors[key], NewFilter(filters))
Expand Down
14 changes: 12 additions & 2 deletions storage/prometheus/scanners.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,20 @@ import (

// Scanners is the Prometheus-storage-backed scanners implementation.
type Scanners struct {
// selectors is the pool of series selectors; NewPrometheusScanners builds
// it on top of querier.
selectors *SelectorPool

// querier is the querier opened by NewPrometheusScanners; Close closes it.
querier storage.Querier
}

// Close closes the querier held by the scanners and returns the error
// reported by that close.
func (s *Scanners) Close() error {
	querier := s.querier
	return querier.Close()
}

// NewPrometheusScanners builds scanners that read from queryable.
//
// NOTE(review): both sides of the change are shown. The first signature and
// its one-line body appear to be the pre-change form, which handed the
// Queryable to the selector pool. The second form opens one querier here,
// spanning qOpts.Start to qOpts.End in milliseconds, shares it with the
// selector pool, and returns a wrapped error if the querier cannot be opened.
//
// NOTE(review): the querier's range is the query's start/end, not the
// mint/maxt of individual selectors — confirm that selectors which need data
// outside [Start, End] are still served correctly.
func NewPrometheusScanners(queryable storage.Queryable) *Scanners {
return &Scanners{selectors: NewSelectorPool(queryable)}
func NewPrometheusScanners(queryable storage.Queryable, qOpts *query.Options) (*Scanners, error) {
querier, err := queryable.Querier(qOpts.Start.UnixMilli(), qOpts.End.UnixMilli())
if err != nil {
return nil, errors.Wrap(err, "create querier")
}
return &Scanners{querier: querier, selectors: NewSelectorPool(querier)}, nil
}

func (p Scanners) NewVectorSelector(
Expand Down
12 changes: 3 additions & 9 deletions storage/prometheus/series_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type SignedSeries struct {
}

type seriesSelector struct {
storage storage.Queryable
storage storage.Querier
mint int64
maxt int64
step int64
Expand All @@ -35,7 +35,7 @@ type seriesSelector struct {
series []SignedSeries
}

func newSeriesSelector(storage storage.Queryable, mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) *seriesSelector {
func newSeriesSelector(storage storage.Querier, mint, maxt, step int64, matchers []*labels.Matcher, hints storage.SelectHints) *seriesSelector {
return &seriesSelector{
storage: storage,
maxt: maxt,
Expand All @@ -61,13 +61,7 @@ func (o *seriesSelector) GetSeries(ctx context.Context, shard int, numShards int
}

func (o *seriesSelector) loadSeries(ctx context.Context) error {
querier, err := o.storage.Querier(o.mint, o.maxt)
if err != nil {
return err
}
defer querier.Close()

seriesSet := querier.Select(ctx, false, &o.hints, o.matchers...)
seriesSet := o.storage.Select(ctx, false, &o.hints, o.matchers...)
i := 0
for seriesSet.Next() {
s := seriesSet.At()
Expand Down

0 comments on commit 448b43c

Please sign in to comment.