From 2f5c05e9937a809999986723a878fa00013fda09 Mon Sep 17 00:00:00 2001 From: Michael Hoffmann Date: Mon, 12 Aug 2024 14:33:56 +0200 Subject: [PATCH] execution: partial responses in distributed engine For very distributed setups we need to be able to deal with partial failures. This commit adds an option to continue evaulation if we encounter an error in a remote engine but dont want to fail the whole query. Signed-off-by: Michael Hoffmann --- engine/distributed_test.go | 42 +++++++++++++++++++++++++++ engine/engine.go | 23 ++++++++++----- engine/engine_test.go | 10 ++++++- execution/execution.go | 2 +- execution/remote/operator.go | 14 ++++++--- logicalplan/distribute.go | 1 - query/options.go | 1 + storage/prometheus/vector_selector.go | 1 - 8 files changed, 78 insertions(+), 16 deletions(-) diff --git a/engine/distributed_test.go b/engine/distributed_test.go index b7f26a89..36e2b28d 100644 --- a/engine/distributed_test.go +++ b/engine/distributed_test.go @@ -269,6 +269,7 @@ func TestDistributedAggregations(t *testing.T) { t.Run(test.name, func(t *testing.T) { for _, lookbackDelta := range lookbackDeltas { localOpts := engine.Opts{ + EnablePartialResponses: true, EngineOpts: promql.EngineOpts{ Timeout: 1 * time.Hour, MaxSamples: 1e10, @@ -373,3 +374,44 @@ func TestDistributedEngineWarnings(t *testing.T) { res := q.Exec(context.Background()) testutil.Equals(t, 1, len(res.Warnings)) } + +func TestDistributedEnginePartialResponses(t *testing.T) { + t.Parallel() + querierErr := &storage.MockQueryable{ + MockQuerier: &storage.MockQuerier{ + SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { + return newErrorSeriesSet(errors.New("test error")) + }, + }, + } + querierOk := storageWithMockSeries(newMockSeries([]string{labels.MetricName, "foo", "zone", "west"}, []int64{0, 30, 60, 90}, []float64{0, 3, 4, 5})) + + opts := engine.Opts{ + EnablePartialResponses: true, + EngineOpts: promql.EngineOpts{ + MaxSamples: math.MaxInt64, + Timeout: 1 * time.Minute, + }, + } + + remoteErr := engine.NewRemoteEngine(opts, querierErr, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "east")}) + remoteOk := engine.NewRemoteEngine(opts, querierOk, math.MinInt64, math.MaxInt64, []labels.Labels{labels.FromStrings("zone", "west")}) + ng := engine.NewDistributedEngine(opts, api.NewStaticEndpoints([]api.RemoteEngine{remoteErr, remoteOk})) + var ( + start = time.UnixMilli(0) + end = time.UnixMilli(600 * 1000) + step = 30 * time.Second + ) + q, err := ng.NewRangeQuery(context.Background(), nil, nil, "sum by (zone) (foo)", start, end, step) + testutil.Ok(t, err) + + res := q.Exec(context.Background()) + testutil.Ok(t, res.Err) + testutil.Equals(t, 1, len(res.Warnings)) + testutil.Equals(t, `error querying [{zone="east"}]: test error`, res.Warnings.AsErrors()[0].Error()) + + m, err := res.Matrix() + testutil.Ok(t, err) + testutil.Equals(t, 1, m.Len()) + testutil.Equals(t, labels.FromStrings("zone", "west"), m[0].Metric) +} diff --git a/engine/engine.go b/engine/engine.go index e3594bf5..16399694 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -75,6 +75,9 @@ type Opts struct { // EnableAnalysis enables query analysis. EnableAnalysis bool + // EnablePartialResponses enables partial responses in distributed mode. + EnablePartialResponses bool + // SelectorBatchSize specifies the maximum number of samples to be returned by selectors in a single batch. SelectorBatchSize int64 @@ -184,14 +187,15 @@ func NewWithScanners(opts Opts, scanners engstorage.Scanners) *Engine { disableDuplicateLabelChecks: opts.DisableDuplicateLabelChecks, disableFallback: opts.DisableFallback, - logger: opts.Logger, - lookbackDelta: opts.LookbackDelta, - enablePerStepStats: opts.EnablePerStepStats, - logicalOptimizers: opts.getLogicalOptimizers(), - timeout: opts.Timeout, - metrics: metrics, - extLookbackDelta: opts.ExtLookbackDelta, - enableAnalysis: opts.EnableAnalysis, + logger: opts.Logger, + lookbackDelta: opts.LookbackDelta, + enablePerStepStats: opts.EnablePerStepStats, + logicalOptimizers: opts.getLogicalOptimizers(), + timeout: opts.Timeout, + metrics: metrics, + extLookbackDelta: opts.ExtLookbackDelta, + enableAnalysis: opts.EnableAnalysis, + enablePartialResponses: opts.EnablePartialResponses, noStepSubqueryIntervalFn: func(d time.Duration) time.Duration { return time.Duration(opts.NoStepSubqueryIntervalFn(d.Milliseconds()) * 1000000) }, @@ -225,6 +229,7 @@ type Engine struct { extLookbackDelta time.Duration decodingConcurrency int enableAnalysis bool + enablePartialResponses bool noStepSubqueryIntervalFn func(time.Duration) time.Duration } @@ -261,6 +266,7 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(), ExtLookbackDelta: e.extLookbackDelta, EnableAnalysis: e.enableAnalysis, + EnablePartialResponses: e.enablePartialResponses, NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, DecodingConcurrency: e.decodingConcurrency, } @@ -446,6 +452,7 @@ func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duratio EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(), ExtLookbackDelta: e.extLookbackDelta, EnableAnalysis: e.enableAnalysis, + EnablePartialResponses: e.enablePartialResponses, NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, DecodingConcurrency: e.decodingConcurrency, } diff --git a/engine/engine_test.go b/engine/engine_test.go index 6cfd42ff..b76c2bec 100644 --- a/engine/engine_test.go +++ b/engine/engine_test.go @@ -4909,6 +4909,7 @@ type testSeriesSet struct { i int series []storage.Series warns annotations.Annotations + err error } func newTestSeriesSet(series ...storage.Series) storage.SeriesSet { @@ -4925,9 +4926,16 @@ func newWarningsSeriesSet(warns annotations.Annotations) storage.SeriesSet { } } +func newErrorSeriesSet(err error) storage.SeriesSet { + return &testSeriesSet{ + i: -1, + err: err, + } +} + func (s *testSeriesSet) Next() bool { s.i++; return s.i < len(s.series) } func (s *testSeriesSet) At() storage.Series { return s.series[s.i] } -func (s *testSeriesSet) Err() error { return nil } +func (s *testSeriesSet) Err() error { return s.err } func (s *testSeriesSet) Warnings() annotations.Annotations { return s.warns } type slowSeries struct{} diff --git a/execution/execution.go b/execution/execution.go index 8746693b..d4654101 100644 --- a/execution/execution.go +++ b/execution/execution.go @@ -386,7 +386,7 @@ func newRemoteExecution(ctx context.Context, e logicalplan.RemoteExecution, opts // We need to set the lookback for the selector to 0 since the remote query already applies one lookback. selectorOpts := *opts selectorOpts.LookbackDelta = 0 - remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, &selectorOpts, hints) + remoteExec := remote.NewExecution(qry, model.NewVectorPool(opts.StepsBatch), e.QueryRangeStart, e.Engine.LabelSets(), &selectorOpts, hints) return exchange.NewConcurrent(remoteExec, 2, opts), nil } diff --git a/execution/remote/operator.go b/execution/remote/operator.go index 645b9b27..cf6b9645 100644 --- a/execution/remote/operator.go +++ b/execution/remote/operator.go @@ -29,8 +29,8 @@ type Execution struct { model.OperatorTelemetry } -func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, opts *query.Options, _ storage.SelectHints) *Execution { - storage := newStorageFromQuery(query, opts) +func NewExecution(query promql.Query, pool *model.VectorPool, queryRangeStart time.Time, engineLabels []labels.Labels, opts *query.Options, _ storage.SelectHints) *Execution { + storage := newStorageFromQuery(query, opts, engineLabels) oper := &Execution{ storage: storage, query: query, @@ -90,16 +90,18 @@ func (e *Execution) Samples() *stats.QuerySamples { type storageAdapter struct { query promql.Query opts *query.Options + lbls []labels.Labels once sync.Once err error series []promstorage.SignedSeries } -func newStorageFromQuery(query promql.Query, opts *query.Options) *storageAdapter { +func newStorageFromQuery(query promql.Query, opts *query.Options, lbls []labels.Labels) *storageAdapter { return &storageAdapter{ query: query, opts: opts, + lbls: lbls, } } @@ -120,7 +122,11 @@ func (s *storageAdapter) executeQuery(ctx context.Context) { warnings.AddToContext(w, ctx) } if result.Err != nil { - s.err = result.Err + if s.opts.EnablePartialResponses { + warnings.AddToContext(fmt.Errorf("error querying %s: %s", s.lbls, result.Err), ctx) + } else { + s.err = result.Err + } return } switch val := result.Value.(type) { diff --git a/logicalplan/distribute.go b/logicalplan/distribute.go index ae786a69..dd280d88 100644 --- a/logicalplan/distribute.go +++ b/logicalplan/distribute.go @@ -255,7 +255,6 @@ func (m DistributedExecutionOptimizer) Optimize(plan Node, opts *query.Options) *current = m.distributeQuery(current, engines, m.subqueryOpts(parents, current, opts), minEngineOverlap) return true }) - return plan, *warns } diff --git a/query/options.go b/query/options.go index 5dbb3c09..decdda36 100644 --- a/query/options.go +++ b/query/options.go @@ -17,6 +17,7 @@ type Options struct { ExtLookbackDelta time.Duration NoStepSubqueryIntervalFn func(time.Duration) time.Duration EnableAnalysis bool + EnablePartialResponses bool DecodingConcurrency int } diff --git a/storage/prometheus/vector_selector.go b/storage/prometheus/vector_selector.go index 082a689f..2cc88fcd 100644 --- a/storage/prometheus/vector_selector.go +++ b/storage/prometheus/vector_selector.go @@ -242,6 +242,5 @@ func selectPoint(it *storage.MemoizedSeriesIterator, ts, lookbackDelta, offset i if value.IsStaleNaN(v) || (fh != nil && value.IsStaleNaN(fh.Sum)) { return 0, 0, nil, false, nil } - return t, v, fh, true, nil }