From 65fe0c0c917c086b86b53d1a1ab659e73992c08f Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Tue, 24 Oct 2023 15:37:19 +1100
Subject: [PATCH 01/12] Add failing test.

---
 integration/ingester_test.go | 47 ++++++++++++++++++++++++++++++++++++
 1 file changed, 47 insertions(+)

diff --git a/integration/ingester_test.go b/integration/ingester_test.go
index d73fc1c523f..3f55e75c3e6 100644
--- a/integration/ingester_test.go
+++ b/integration/ingester_test.go
@@ -469,6 +469,27 @@ func TestIngesterQuerying(t *testing.T) {
 				},
 			},
 		},
+		"query that returns no results": {
+			// We have to push at least one sample to ensure that the tenant TSDB exists (otherwise the ingester takes a shortcut and returns early).
+			inSeries: []prompb.TimeSeries{
+				{
+					Labels: []prompb.Label{
+						{
+							Name:  "__name__",
+							Value: "not_foobar",
+						},
+					},
+					Samples: []prompb.Sample{
+						{
+							Timestamp: queryStart.Add(-2 * time.Second).UnixMilli(),
+							Value:     100,
+						},
+					},
+				},
+			},
+			expectedQueryResult:          model.Matrix{},
+			expectedTimestampQueryResult: model.Matrix{},
+		},
 	}
 
 	for testName, tc := range testCases {
@@ -529,6 +550,32 @@ func TestIngesterQuerying(t *testing.T) {
 				result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
 				require.NoError(t, err)
 				require.Equal(t, tc.expectedTimestampQueryResult, result)
+
+				queryRequestCount := func(status string) (float64, error) {
+					counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
+						e2e.WithLabelMatchers(
+							labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
+							labels.MustNewMatcher(labels.MatchEqual, "status_code", status),
+						),
+						e2e.WithMetricCount,
+						e2e.SkipMissingMetrics,
+					)
+
+					if err != nil {
+						return 0, err
+					}
+
+					require.Len(t, counts, 1)
+					return counts[0], nil
+				}
+
+				successfulQueryRequests, err := queryRequestCount("2xx")
+				require.NoError(t, err)
+				require.Equal(t, 2.0, successfulQueryRequests) // 1 for first query request, 1 for timestamp query request
+
+				cancelledQueryRequests, err := queryRequestCount("cancel")
+				require.NoError(t, err)
+				require.Equal(t, 0.0, cancelledQueryRequests)
 			})
 		}
 	})

From 77c191da45439fee37f77deaf8611c3e117ed86d Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Tue, 24 Oct 2023 15:40:33 +1100
Subject: [PATCH 02/12] Fix issue where query requests that stream chunks from ingesters but return no series are reported as cancelled rather than successful in traces and metrics.

---
 pkg/distributor/query.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index 096f895c510..d8e36f0cb50 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -214,6 +214,13 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 				if err := stream.CloseSend(); err != nil {
 					level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err)
 				}
+
+				// Exhaust the stream to ensure instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled".
+				for {
+					if _, err := stream.Recv(); err != nil {
+						break
+					}
+				}
 			}
 
 			cleanup()

From ae15605d94ad5c747c4590c7acdf5d1be50bf189 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Tue, 24 Oct 2023 15:45:38 +1100
Subject: [PATCH 03/12] Fix other instances of issue.
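
Extract the CloseSend-then-exhaust logic introduced in the previous commit
into a generic util.CloseAndExhaust helper, and use it everywhere a client
stream is closed: the distributor, the query-frontend scheduler worker, the
ingester chunk stream reader and the querier's store-gateway stream reader.

Callers pass the stream's response type as the type parameter. For example,
a sketch of the updated call site in pkg/distributor/query.go (as it appears
in the diff below):

    if err := util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream); err != nil {
        level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err)
    }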
---
 pkg/distributor/distributor.go               |  4 ++--
 pkg/distributor/query.go                     | 10 ++------
 pkg/frontend/v2/frontend_scheduler_worker.go |  3 ++-
 pkg/ingester/client/streaming.go             |  3 ++-
 pkg/querier/block_streaming.go               |  3 ++-
 pkg/querier/blocks_store_queryable.go        |  4 ++--
 pkg/util/grpc.go                             | 25 ++++++++++++++++++++
 7 files changed, 37 insertions(+), 15 deletions(-)
 create mode 100644 pkg/util/grpc.go

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 9c2df25a63c..302f95c1a49 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -1371,7 +1371,7 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label
 		if err != nil {
 			return nil, err
 		}
-		defer stream.CloseSend() //nolint:errcheck
+		defer util.CloseAndExhaust[*ingester_client.LabelNamesAndValuesResponse](stream) //nolint:errcheck
 		return nil, merger.collectResponses(stream)
 	})
 	if err != nil {
@@ -1528,7 +1528,7 @@ func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []m
 		if err != nil {
 			return struct{}{}, err
 		}
-		defer func() { _ = stream.CloseSend() }()
+		defer func() { _ = util.CloseAndExhaust[*ingester_client.LabelValuesCardinalityResponse](stream) }()
 		return struct{}{}, cardinalityConcurrentMap.processLabelValuesCardinalityMessages(desc.Zone, stream)
 	}, func(struct{}) {})
 
diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go
index d8e36f0cb50..81e597f4135 100644
--- a/pkg/distributor/query.go
+++ b/pkg/distributor/query.go
@@ -25,6 +25,7 @@ import (
 	ingester_client "github.com/grafana/mimir/pkg/ingester/client"
 	"github.com/grafana/mimir/pkg/mimirpb"
 	"github.com/grafana/mimir/pkg/querier/stats"
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/limiter"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 	"github.com/grafana/mimir/pkg/util/validation"
@@ -211,16 +212,9 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
 	defer func() {
 		if closeStream {
 			if stream != nil {
-				if err := stream.CloseSend(); err != nil {
+				if err := util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream); err != nil {
 					level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err)
 				}
-
-				// Exhaust the stream to ensure instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled".
-				for {
-					if _, err := stream.Recv(); err != nil {
-						break
-					}
-				}
 			}
 
 			cleanup()

diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go
index 6ed571a396b..ca9d099f1d7 100644
--- a/pkg/frontend/v2/frontend_scheduler_worker.go
+++ b/pkg/frontend/v2/frontend_scheduler_worker.go
@@ -24,6 +24,7 @@ import (
 	"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
 	"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
 	"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/servicediscovery"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 )
@@ -282,7 +283,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
 	}
 
 	loopErr = w.schedulerLoop(loop)
-	if closeErr := loop.CloseSend(); closeErr != nil {
+	if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil {
 		level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
 	}
 
diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go
index 595d0948dfb..53a1e2bb6f2 100644
--- a/pkg/ingester/client/streaming.go
+++ b/pkg/ingester/client/streaming.go
@@ -12,6 +12,7 @@ import (
 	"github.com/opentracing/opentracing-go/ext"
 	"github.com/prometheus/prometheus/model/labels"
 
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/limiter"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 	"github.com/grafana/mimir/pkg/util/validation"
@@ -59,7 +60,7 @@ func NewSeriesChunksStreamReader(client Ingester_QueryStreamClient, expectedSeri
 // Close cleans up all resources associated with this SeriesChunksStreamReader.
 // This method should only be called if StartBuffering is not called.
 func (s *SeriesChunksStreamReader) Close() {
-	if err := s.client.CloseSend(); err != nil {
+	if err := util.CloseAndExhaust[*QueryStreamResponse](s.client); err != nil {
 		level.Warn(s.log).Log("msg", "closing ingester client stream failed", "err", err)
 	}
 
diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go
index 659324583a9..df15450f6f7 100644
--- a/pkg/querier/block_streaming.go
+++ b/pkg/querier/block_streaming.go
@@ -21,6 +21,7 @@ import (
 	"github.com/grafana/mimir/pkg/storage/series"
 	"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
 	"github.com/grafana/mimir/pkg/storegateway/storepb"
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/limiter"
 	"github.com/grafana/mimir/pkg/util/spanlogger"
 	"github.com/grafana/mimir/pkg/util/validation"
@@ -151,7 +152,7 @@ func newStoreGatewayStreamReader(client storegatewaypb.StoreGateway_SeriesClient
 // Close cleans up all resources associated with this storeGatewayStreamReader.
 // This method should only be called if StartBuffering is not called.
 func (s *storeGatewayStreamReader) Close() {
-	if err := s.client.CloseSend(); err != nil {
+	if err := util.CloseAndExhaust[*storepb.SeriesResponse](s.client); err != nil {
 		level.Warn(s.log).Log("msg", "closing store-gateway client stream failed", "err", err)
 	}
 }

diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index 4162b814907..51c044a2e66 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -898,8 +898,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 	// Wait until all client requests complete.
 	if err := g.Wait(); err != nil {
 		for _, stream := range streams {
-			if err := stream.CloseSend(); err != nil {
-				level.Warn(q.logger).Log("msg", "closing storegateway client stream failed", "err", err)
+			if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil {
+				level.Warn(q.logger).Log("msg", "closing store-gateway client stream failed", "err", err)
 			}
 		}
 		return nil, nil, nil, nil, nil, err

diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go
new file mode 100644
index 00000000000..f28ced2be91
--- /dev/null
+++ b/pkg/util/grpc.go
@@ -0,0 +1,25 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package util
+
+type Stream[T any] interface {
+	CloseSend() error
+	Recv() (T, error)
+}
+
+func CloseAndExhaust[T any](stream Stream[T]) error {
+	if err := stream.CloseSend(); err != nil {
+		return err
+	}
+
+	// Exhaust the stream to ensure:
+	// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
+	// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
+	for {
+		if _, err := stream.Recv(); err != nil {
+			break
+		}
+	}
+
+	return nil
+}

From 795f704357c282ca04cdbb062d2992b578e22936 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Tue, 24 Oct 2023 16:12:56 +1100
Subject: [PATCH 04/12] Add changelog entry.

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 113bb0e6765..3b5936fbc60 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -62,6 +62,7 @@
 * [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6469
 * [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
+* [BUGFIX] Querier: correctly mark streaming requests that return no series from ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471
 
 ### Mixin
 

From 2da62cd3c5118daeca5712d0e5c797049ac8a842 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Wed, 25 Oct 2023 14:50:47 +1100
Subject: [PATCH 05/12] Address PR feedback

Co-authored-by: Oleg Zaytsev
---
 pkg/util/grpc.go | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go
index f28ced2be91..f963a6aae58 100644
--- a/pkg/util/grpc.go
+++ b/pkg/util/grpc.go
@@ -8,17 +8,16 @@ type Stream[T any] interface {
 }
 
 func CloseAndExhaust[T any](stream Stream[T]) error {
-	if err := stream.CloseSend(); err != nil {
+	err := stream.CloseSend()
+	if err != nil {
 		return err
 	}
 
 	// Exhaust the stream to ensure:
 	// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
 	// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
-	for {
-		if _, err := stream.Recv(); err != nil {
-			break
-		}
+	for err == nil {
+		_, err = stream.Recv()
 	}
 
 	return nil

From d905fe9d4b5b6865ae03fd9cef27ec89b3d97e27 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Wed, 25 Oct 2023 15:09:33 +1100
Subject: [PATCH 06/12] Add more details to assertions

---
 integration/ingester_test.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/integration/ingester_test.go b/integration/ingester_test.go
index 3f55e75c3e6..49d54bb82da 100644
--- a/integration/ingester_test.go
+++ b/integration/ingester_test.go
@@ -555,7 +555,7 @@ func TestIngesterQuerying(t *testing.T) {
 					counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
 						e2e.WithLabelMatchers(
 							labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
-							labels.MustNewMatcher(labels.MatchEqual, "status_code", status),
+							labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
 						),
 						e2e.WithMetricCount,
 						e2e.SkipMissingMetrics,
@@ -571,11 +571,17 @@ func TestIngesterQuerying(t *testing.T) {
 
 				successfulQueryRequests, err := queryRequestCount("2xx")
 				require.NoError(t, err)
-				require.Equal(t, 2.0, successfulQueryRequests) // 1 for first query request, 1 for timestamp query request
 
 				cancelledQueryRequests, err := queryRequestCount("cancel")
 				require.NoError(t, err)
-				require.Equal(t, 0.0, cancelledQueryRequests)
+
+				totalQueryRequests, err := queryRequestCount(".*")
+				require.NoError(t, err)
+
+				// We expect two query requests: the first query request and the timestamp query request
+				require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
+				require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
+				require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
 			})
 		}
 	})

From 94b88e691b25b52b95ede760f1d0597728d22f52 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 26 Oct 2023 13:50:13 +1100
Subject: [PATCH 07/12] Extract method

---
 pkg/ingester/client/streaming.go | 43 +++++++++++++++++++-------------
 1 file changed, 25 insertions(+), 18 deletions(-)

diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go
index 53a1e2bb6f2..e7cefeb4288 100644
--- a/pkg/ingester/client/streaming.go
+++ b/pkg/ingester/client/streaming.go
@@ -175,25 +175,9 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
 	}()
 
 	if len(s.seriesBatch) == 0 {
-		batch, channelOpen := <-s.seriesBatchChan
-
-		if !channelOpen {
-			// If there's an error, report it.
-			select {
-			case err, haveError := <-s.errorChan:
-				if haveError {
-					if _, ok := err.(validation.LimitError); ok {
-						return nil, err
-					}
-					return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
-				}
-			default:
-			}
-
-			return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
+		if err := s.readNextBatch(seriesIndex); err != nil {
+			return nil, err
 		}
-
-		s.seriesBatch = batch
 	}
 
 	series := s.seriesBatch[0]
@@ -211,3 +195,26 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
 
 	return series.Chunks, nil
 }
+
+func (s *SeriesChunksStreamReader) readNextBatch(seriesIndex uint64) error {
+	batch, channelOpen := <-s.seriesBatchChan
+
+	if !channelOpen {
+		// If there's an error, report it.
+		select {
+		case err, haveError := <-s.errorChan:
+			if haveError {
+				if _, ok := err.(validation.LimitError); ok {
+					return err
+				}
+				return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
+			}
+		default:
+		}
+
+		return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
+	}
+
+	s.seriesBatch = batch
+	return nil
+}

From 5f6d927ca20292103d07606cace8a41e623a30e3 Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 26 Oct 2023 14:04:21 +1100
Subject: [PATCH 08/12] Fix race between query evaluation finishing and ingester chunks streaming goroutine closing the gRPC stream.

---
 pkg/ingester/client/streaming.go      | 16 ++++++++
 pkg/ingester/client/streaming_test.go | 57 +++++++++++++++++----------
 2 files changed, 52 insertions(+), 21 deletions(-)

diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go
index e7cefeb4288..b2ff19f4064 100644
--- a/pkg/ingester/client/streaming.go
+++ b/pkg/ingester/client/streaming.go
@@ -193,6 +193,22 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
 		return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has series with index %v", seriesIndex, series.SeriesIndex)
 	}
 
+	if int(seriesIndex) == s.expectedSeriesCount-1 {
+		// This is the last series we expect to receive. Wait for StartBuffering() to exit (which is signalled by returning an error or
+		// closing errorChan).
+		//
+		// This ensures two things:
+		// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
+		//    StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
+		//    it, rather than it potentially being logged and missed.
+		// 2. It ensures the gPRC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
+		//    is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
+		//    logged and included in metrics and traces, when in fact the call succeeded.
+		if err := <-s.errorChan; err != nil {
+			return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
+		}
+	}
+
 	return series.Chunks, nil
 }

diff --git a/pkg/ingester/client/streaming_test.go b/pkg/ingester/client/streaming_test.go
index e9cfa30ced4..50797e1f51b 100644
--- a/pkg/ingester/client/streaming_test.go
+++ b/pkg/ingester/client/streaming_test.go
@@ -227,34 +227,49 @@ func TestSeriesChunksStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T)
 }
 
 func TestSeriesChunksStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) {
-	batches := [][]QueryStreamSeriesChunks{
-		{
-			{SeriesIndex: 0, Chunks: []Chunk{createTestChunk(t, 1000, 1.23)}},
-			{SeriesIndex: 1, Chunks: []Chunk{createTestChunk(t, 1000, 4.56)}},
-			{SeriesIndex: 2, Chunks: []Chunk{createTestChunk(t, 1000, 7.89)}},
+	testCases := map[string][][]QueryStreamSeriesChunks{
+		"extra series received as part of batch for last expected series": {
+			{
+				{SeriesIndex: 0, Chunks: []Chunk{createTestChunk(t, 1000, 1.23)}},
+				{SeriesIndex: 1, Chunks: []Chunk{createTestChunk(t, 1000, 4.56)}},
+				{SeriesIndex: 2, Chunks: []Chunk{createTestChunk(t, 1000, 7.89)}},
+			},
+		},
+		"extra series received as part of batch after batch containing last expected series": {
+			{
+				{SeriesIndex: 0, Chunks: []Chunk{createTestChunk(t, 1000, 1.23)}},
+			},
+			{
+				{SeriesIndex: 1, Chunks: []Chunk{createTestChunk(t, 1000, 4.56)}},
+				{SeriesIndex: 2, Chunks: []Chunk{createTestChunk(t, 1000, 7.89)}},
+			},
 		},
 	}
 
-	mockClient := &mockQueryStreamClient{ctx: context.Background(), batches: batches}
-	cleanedUp := atomic.NewBool(false)
-	cleanup := func() { cleanedUp.Store(true) }
+	for name, batches := range testCases {
+		t.Run(name, func(t *testing.T) {
+			mockClient := &mockQueryStreamClient{ctx: context.Background(), batches: batches}
+			cleanedUp := atomic.NewBool(false)
+			cleanup := func() { cleanedUp.Store(true) }
 
-	reader := NewSeriesChunksStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), cleanup, log.NewNopLogger())
-	reader.StartBuffering()
+			reader := NewSeriesChunksStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), cleanup, log.NewNopLogger())
+			reader.StartBuffering()
 
-	s, err := reader.GetChunks(0)
-	require.Nil(t, s)
-	expectedError := "attempted to read series at index 0 from ingester chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series"
-	require.EqualError(t, err, expectedError)
+			s, err := reader.GetChunks(0)
+			require.Nil(t, s)
+			expectedError := "attempted to read series at index 0 from ingester chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series"
+			require.EqualError(t, err, expectedError)
 
-	require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected")
-	require.True(t, cleanedUp.Load(), "expected cleanup function to be called")
+			require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected")
+			require.True(t, cleanedUp.Load(), "expected cleanup function to be called")
 
-	// Ensure we continue to return the error, even for subsequent calls to GetChunks.
-	_, err = reader.GetChunks(1)
-	require.EqualError(t, err, "attempted to read series at index 1 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError)
-	_, err = reader.GetChunks(2)
-	require.EqualError(t, err, "attempted to read series at index 2 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError)
+			// Ensure we continue to return the error, even for subsequent calls to GetChunks.
+			_, err = reader.GetChunks(1)
+			require.EqualError(t, err, "attempted to read series at index 1 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError)
+			_, err = reader.GetChunks(2)
+			require.EqualError(t, err, "attempted to read series at index 2 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError)
+		})
+	}
 }
 
 func TestSeriesChunksStreamReader_ChunksLimits(t *testing.T) {

From 4453e0a1f73d5cffe065ed8533bae08aa9f461bb Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 26 Oct 2023 15:27:47 +1100
Subject: [PATCH 09/12] Refactor SeriesChunksStreamReader to follow same structure as for store-gateway chunk stream reader

---
 pkg/ingester/client/streaming.go | 101 +++++++++++++++----------------
 1 file changed, 49 insertions(+), 52 deletions(-)

diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go
index b2ff19f4064..479f0df6eb2 100644
--- a/pkg/ingester/client/streaming.go
+++ b/pkg/ingester/client/streaming.go
@@ -77,7 +77,6 @@ func (s *SeriesChunksStreamReader) StartBuffering() {
 	// Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once.
 	s.errorChan = make(chan error, 1)
 
-	ctxDone := s.client.Context().Done()
 	go func() {
 		log, _ := spanlogger.NewWithLogger(s.client.Context(), s.log, "SeriesChunksStreamReader.StartBuffering")
 
@@ -87,76 +86,74 @@ func (s *SeriesChunksStreamReader) StartBuffering() {
 			close(s.seriesBatchChan)
 			close(s.errorChan)
-			log.Span.Finish()
+			log.Finish()
 		}()
 
-		onError := func(err error) {
+		if err := s.readStream(log); err != nil {
 			s.errorChan <- err
 			level.Error(log).Log("msg", "received error while streaming chunks from ingester", "err", err)
 			ext.Error.Set(log.Span, true)
 		}
+	}()
+}
+
+func (s *SeriesChunksStreamReader) readStream(log *spanlogger.SpanLogger) error {
+	totalSeries := 0
+	totalChunks := 0
 
-		totalSeries := 0
-		totalChunks := 0
-
-		for {
-			msg, err := s.client.Recv()
-			if err != nil {
-				if errors.Is(err, io.EOF) {
-					if totalSeries < s.expectedSeriesCount {
-						onError(fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries))
-					} else {
-						level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
-					}
-				} else {
-					onError(err)
+	for {
+		msg, err := s.client.Recv()
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				if totalSeries < s.expectedSeriesCount {
+					return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
 				}
-				return
+				level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
+				return nil
 			}
 
-			if len(msg.StreamingSeriesChunks) == 0 {
-				continue
-			}
+			return err
+		}
 
-			totalSeries += len(msg.StreamingSeriesChunks)
-			if totalSeries > s.expectedSeriesCount {
-				onError(fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries))
-				return
-			}
+		if len(msg.StreamingSeriesChunks) == 0 {
+			continue
+		}
 
-			chunkBytes := 0
+		totalSeries += len(msg.StreamingSeriesChunks)
+		if totalSeries > s.expectedSeriesCount {
+			return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
+		}
 
-			for _, s := range msg.StreamingSeriesChunks {
-				totalChunks += len(s.Chunks)
+		chunkBytes := 0
 
-				for _, c := range s.Chunks {
-					chunkBytes += c.Size()
-				}
-			}
+		for _, s := range msg.StreamingSeriesChunks {
+			totalChunks += len(s.Chunks)
 
-			// The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here.
-			if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
-				onError(err)
-				return
+			for _, c := range s.Chunks {
+				chunkBytes += c.Size()
 			}
+		}
 
-			select {
-			case <-ctxDone:
-				// Why do we abort if the context is done?
-				// We want to make sure that this goroutine is never leaked.
-				// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
-				// more series to a full buffer: it would block forever.
-				// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
-				// This only works correctly if the context is cancelled when the query request is complete or cancelled,
-				// which is true at the time of writing.
-				onError(s.client.Context().Err())
-				return
-			case s.seriesBatchChan <- msg.StreamingSeriesChunks:
-				// Batch enqueued successfully, nothing else to do for this batch.
-			}
+		// The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here.
+		if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
+			return err
 		}
-	}()
+
+		select {
+		case <-s.client.Context().Done():
+			// Why do we abort if the context is done?
+			// We want to make sure that this goroutine is never leaked.
+			// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
+			// more series to a full buffer: it would block forever.
+			// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
+			// This only works correctly if the context is cancelled when the query request is complete or cancelled,
+			// which is true at the time of writing.
+			return s.client.Context().Err()
+		case s.seriesBatchChan <- msg.StreamingSeriesChunks:
+			// Batch enqueued successfully, nothing else to do for this batch.
+		}
+	}
 }
 
 // GetChunks returns the chunks for the series with index seriesIndex.

From b31e4fbead8e9a210e6a0064b4c2aa0a1ab38ecc Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 26 Oct 2023 15:31:01 +1100
Subject: [PATCH 10/12] Update changelog entry.

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3b5936fbc60..096227e01fc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -62,7 +62,7 @@
 * [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6469
 * [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
-* [BUGFIX] Querier: correctly mark streaming requests that return no series from ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471
+* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471
 
 ### Mixin
 

From 91fd8920e33ef791fc8aaf8111ea5e890d13e20d Mon Sep 17 00:00:00 2001
From: Charles Korn
Date: Thu, 26 Oct 2023 15:40:47 +1100
Subject: [PATCH 11/12] Fix race between query evaluation finishing and store-gateway chunks streaming goroutine closing the gRPC stream.

---
 pkg/querier/block_streaming.go      | 59 ++++++++++++++++++---------
 pkg/querier/block_streaming_test.go | 58 ++++++++++++++++----------
 2 files changed, 80 insertions(+), 37 deletions(-)

diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go
index df15450f6f7..6fd86a58a3e 100644
--- a/pkg/querier/block_streaming.go
+++ b/pkg/querier/block_streaming.go
@@ -307,25 +307,9 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag
 	}()
 
 	if len(s.chunksBatch) == 0 {
-		chks, channelOpen := <-s.seriesChunksChan
-
-		if !channelOpen {
-			// If there's an error, report it.
-			select {
-			case err, haveError := <-s.errorChan:
-				if haveError {
-					if _, ok := err.(validation.LimitError); ok {
-						return nil, err
-					}
-					return nil, errors.Wrapf(err, "attempted to read series at index %v from store-gateway chunks stream, but the stream has failed", seriesIndex)
-				}
-			default:
-			}
-
-			return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
+		if err := s.readNextBatch(seriesIndex); err != nil {
+			return nil, err
 		}
-
-		s.chunksBatch = chks.Series
 	}
 
 	chks := s.chunksBatch[0]
@@ -339,9 +323,48 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag
 		return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has series with index %v", seriesIndex, chks.SeriesIndex)
 	}
 
+	if int(seriesIndex) == s.expectedSeriesCount-1 {
+		// This is the last series we expect to receive. Wait for StartBuffering() to exit (which is signalled by returning an error or
+		// closing errorChan).
+		//
+		// This ensures two things:
+		// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
+		//    StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
+		//    it, rather than it potentially being logged and missed.
+		// 2. It ensures the gPRC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
+		//    is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
+		//    logged and included in metrics and traces, when in fact the call succeeded.
+		if err := <-s.errorChan; err != nil {
+			return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has failed: %w", seriesIndex, err)
+		}
+	}
+
 	return chks.Chunks, nil
 }
 
+func (s *storeGatewayStreamReader) readNextBatch(seriesIndex uint64) error {
+	chks, channelOpen := <-s.seriesChunksChan
+
+	if !channelOpen {
+		// If there's an error, report it.
+		select {
+		case err, haveError := <-s.errorChan:
+			if haveError {
+				if _, ok := err.(validation.LimitError); ok {
+					return err
+				}
+				return errors.Wrapf(err, "attempted to read series at index %v from store-gateway chunks stream, but the stream has failed", seriesIndex)
+			}
+		default:
+		}
+
+		return fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
+	}
+
+	s.chunksBatch = chks.Series
+	return nil
+}
+
 // EstimateChunkCount returns an estimate of the number of chunks this stream reader will return.
 // If the stream fails before an estimate is received from the store-gateway, this method returns 0.
 // This method should only be called after calling StartBuffering. If this method is called multiple times, it may block.

diff --git a/pkg/querier/block_streaming_test.go b/pkg/querier/block_streaming_test.go
index c83d993924e..96fa0e3fe6c 100644
--- a/pkg/querier/block_streaming_test.go
+++ b/pkg/querier/block_streaming_test.go
@@ -465,31 +465,51 @@ func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T)
 }
 
 func TestStoreGatewayStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) {
-	batches := []storepb.StreamingChunksBatch{
-		{
-			Series: []*storepb.StreamingChunks{
-				{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
-				{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
-				{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
+	testCases := map[string][]storepb.StreamingChunksBatch{
+		"extra series received as part of batch for last expected series": {
+			{
+				Series: []*storepb.StreamingChunks{
+					{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
+					{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}},
+					{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}},
+				},
+			},
+		},
+		"extra series received as part of batch after batch containing last expected series": {
+			{
+				Series: []*storepb.StreamingChunks{
+					{SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}},
+				},
+			},
+			{
+				Series: []*storepb.StreamingChunks{
+					{SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}},
+					{SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}},
+				},
 			},
 		},
 	}
 
-	mockClient := &mockStoreGatewayQueryStreamClient{ctx: context.Background(), messages: batchesToMessages(3, batches...)}
-	reader := newStoreGatewayStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
-	reader.StartBuffering()
 
-	s, err := reader.GetChunks(0)
-	require.Nil(t, s)
-	expectedError := "attempted to read series at index 0 from store-gateway chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series"
-	require.EqualError(t, err, expectedError)
+	for name, batches := range testCases {
+		t.Run(name, func(t *testing.T) {
+			mockClient := &mockStoreGatewayQueryStreamClient{ctx: context.Background(), messages: batchesToMessages(3, batches...)}
+			reader := newStoreGatewayStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger())
+			reader.StartBuffering()
 
-	require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected")
+			s, err := reader.GetChunks(0)
+			require.Nil(t, s)
+			expectedError := "attempted to read series at index 0 from store-gateway chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series"
+			require.EqualError(t, err, expectedError)
 
-	// Ensure we continue to return the error, even for subsequent calls to GetChunks.
-	_, err = reader.GetChunks(1)
-	require.EqualError(t, err, "attempted to read series at index 1 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError)
-	_, err = reader.GetChunks(2)
-	require.EqualError(t, err, "attempted to read series at index 2 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError)
+			require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected")
+
+			// Ensure we continue to return the error, even for subsequent calls to GetChunks.
+			_, err = reader.GetChunks(1)
+			require.EqualError(t, err, "attempted to read series at index 1 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError)
+			_, err = reader.GetChunks(2)
+			require.EqualError(t, err, "attempted to read series at index 2 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError)
+		})
+	}
 }
 
 func TestStoreGatewayStreamReader_ChunksLimits(t *testing.T) {

From d676d1ea5aa6f9fa38534cab0da60fee118e6f82 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Sun, 29 Oct 2023 09:39:39 +0100
Subject: [PATCH 12/12] Apply suggestions from code review

---
 pkg/ingester/client/streaming.go | 2 +-
 pkg/querier/block_streaming.go   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go
index 479f0df6eb2..d544608400e 100644
--- a/pkg/ingester/client/streaming.go
+++ b/pkg/ingester/client/streaming.go
@@ -109,7 +109,7 @@ func (s *SeriesChunksStreamReader) readStream(log *spanlogger.SpanLogger) error
 
 					return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
 				}
-				level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
+				log.DebugLog("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
 				return nil
 			}
 
diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go
index 6fd86a58a3e..22993f808d5 100644
--- a/pkg/querier/block_streaming.go
+++ b/pkg/querier/block_streaming.go
@@ -331,7 +331,7 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag
 		// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
 		//    StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
 		//    it, rather than it potentially being logged and missed.
-		// 2. It ensures the gPRC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
+		// 2. It ensures the gRPC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
 		//    is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
 		//    logged and included in metrics and traces, when in fact the call succeeded.
 		if err := <-s.errorChan; err != nil {