diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ff6ac18ada..7b40d925b3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -65,6 +65,7 @@ * [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 #6469 * [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470 +* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471 ### Mixin diff --git a/integration/ingester_test.go b/integration/ingester_test.go index d73fc1c523f..49d54bb82da 100644 --- a/integration/ingester_test.go +++ b/integration/ingester_test.go @@ -469,6 +469,27 @@ func TestIngesterQuerying(t *testing.T) { }, }, }, + "query that returns no results": { + // We have to push at least one sample to ensure that the tenant TSDB exists (otherwise the ingester takes a shortcut and returns early). + inSeries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + { + Name: "__name__", + Value: "not_foobar", + }, + }, + Samples: []prompb.Sample{ + { + Timestamp: queryStart.Add(-2 * time.Second).UnixMilli(), + Value: 100, + }, + }, + }, + }, + expectedQueryResult: model.Matrix{}, + expectedTimestampQueryResult: model.Matrix{}, + }, } for testName, tc := range testCases { @@ -529,6 +550,38 @@ func TestIngesterQuerying(t *testing.T) { result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep) require.NoError(t, err) require.Equal(t, tc.expectedTimestampQueryResult, result) + + queryRequestCount := func(status string) (float64, error) { + counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"}, + e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"), + labels.MustNewMatcher(labels.MatchRegexp, "status_code", status), + ), + e2e.WithMetricCount, + e2e.SkipMissingMetrics, + ) + + if err != nil { + return 0, err + } + + require.Len(t, counts, 1) + return counts[0], nil + } + + successfulQueryRequests, err := queryRequestCount("2xx") + require.NoError(t, err) + + cancelledQueryRequests, err := queryRequestCount("cancel") + require.NoError(t, err) + + totalQueryRequests, err := queryRequestCount(".*") + require.NoError(t, err) + + // We expect two query requests: the first query request and the timestamp query request + require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) + require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) + require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests) }) } }) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index d392693d667..2950e757303 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1498,7 +1498,7 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label if err != nil { return nil, err } - defer stream.CloseSend() //nolint:errcheck + defer util.CloseAndExhaust[*ingester_client.LabelNamesAndValuesResponse](stream) //nolint:errcheck return nil, merger.collectResponses(stream) }) if err != nil { @@ -1655,7 +1655,7 @@ func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []m if err != nil { return struct{}{}, err } - defer func() { _ = stream.CloseSend() }() + defer func() { _ = util.CloseAndExhaust[*ingester_client.LabelValuesCardinalityResponse](stream) }() return struct{}{}, cardinalityConcurrentMap.processLabelValuesCardinalityMessages(desc.Zone, stream) }, func(struct{}) {}) diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 096f895c510..81e597f4135 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -25,6 +25,7 @@ import ( ingester_client "github.com/grafana/mimir/pkg/ingester/client" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/querier/stats" + "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -211,7 +212,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri defer func() { if closeStream { if stream != nil { - if err := stream.CloseSend(); err != nil { + if err := util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream); err != nil { level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err) } } diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go index 6ed571a396b..ca9d099f1d7 100644 --- a/pkg/frontend/v2/frontend_scheduler_worker.go +++ b/pkg/frontend/v2/frontend_scheduler_worker.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" + "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/servicediscovery" "github.com/grafana/mimir/pkg/util/spanlogger" ) @@ -282,7 +283,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb } loopErr = w.schedulerLoop(loop) - if closeErr := loop.CloseSend(); closeErr != nil { + if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil { level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr) } diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go index 595d0948dfb..d544608400e 100644 --- a/pkg/ingester/client/streaming.go +++ b/pkg/ingester/client/streaming.go @@ -12,6 +12,7 @@ import ( "github.com/opentracing/opentracing-go/ext" "github.com/prometheus/prometheus/model/labels" + "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -59,7 +60,7 @@ func NewSeriesChunksStreamReader(client Ingester_QueryStreamClient, expectedSeri // Close cleans up all resources associated with this SeriesChunksStreamReader. // This method should only be called if StartBuffering is not called. func (s *SeriesChunksStreamReader) Close() { - if err := s.client.CloseSend(); err != nil { + if err := util.CloseAndExhaust[*QueryStreamResponse](s.client); err != nil { level.Warn(s.log).Log("msg", "closing ingester client stream failed", "err", err) } @@ -76,7 +77,6 @@ func (s *SeriesChunksStreamReader) StartBuffering() { // Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once. s.errorChan = make(chan error, 1) - ctxDone := s.client.Context().Done() go func() { log, _ := spanlogger.NewWithLogger(s.client.Context(), s.log, "SeriesChunksStreamReader.StartBuffering") @@ -86,76 +86,74 @@ func (s *SeriesChunksStreamReader) StartBuffering() { close(s.seriesBatchChan) close(s.errorChan) - log.Span.Finish() + log.Finish() }() - onError := func(err error) { + if err := s.readStream(log); err != nil { s.errorChan <- err level.Error(log).Log("msg", "received error while streaming chunks from ingester", "err", err) ext.Error.Set(log.Span, true) } + }() +} - totalSeries := 0 - totalChunks := 0 - - for { - msg, err := s.client.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - if totalSeries < s.expectedSeriesCount { - onError(fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)) - } else { - level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks) - } - } else { - onError(err) +func (s *SeriesChunksStreamReader) readStream(log *spanlogger.SpanLogger) error { + totalSeries := 0 + totalChunks := 0 + + for { + msg, err := s.client.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + if totalSeries < s.expectedSeriesCount { + return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries) } - return + log.DebugLog("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks) + return nil } - if len(msg.StreamingSeriesChunks) == 0 { - continue - } + return err + } - totalSeries += len(msg.StreamingSeriesChunks) - if totalSeries > s.expectedSeriesCount { - onError(fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)) - return - } + if len(msg.StreamingSeriesChunks) == 0 { + continue + } + + totalSeries += len(msg.StreamingSeriesChunks) + if totalSeries > s.expectedSeriesCount { + return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries) + } - chunkBytes := 0 + chunkBytes := 0 - for _, s := range msg.StreamingSeriesChunks { - totalChunks += len(s.Chunks) + for _, s := range msg.StreamingSeriesChunks { + totalChunks += len(s.Chunks) - for _, c := range s.Chunks { - chunkBytes += c.Size() - } + for _, c := range s.Chunks { + chunkBytes += c.Size() } + } - // The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here. - if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil { - onError(err) - return - } + // The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here. + if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil { + return err + } - select { - case <-ctxDone: - // Why do we abort if the context is done? - // We want to make sure that this goroutine is never leaked. - // This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send - // more series to a full buffer: it would block forever. - // So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up. - // This only works correctly if the context is cancelled when the query request is complete or cancelled, - // which is true at the time of writing. - onError(s.client.Context().Err()) - return - case s.seriesBatchChan <- msg.StreamingSeriesChunks: - // Batch enqueued successfully, nothing else to do for this batch. - } + select { + case <-s.client.Context().Done(): + // Why do we abort if the context is done? + // We want to make sure that this goroutine is never leaked. + // This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send + // more series to a full buffer: it would block forever. + // So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up. + // This only works correctly if the context is cancelled when the query request is complete or cancelled, + // which is true at the time of writing. + return s.client.Context().Err() + case s.seriesBatchChan <- msg.StreamingSeriesChunks: + // Batch enqueued successfully, nothing else to do for this batch. } - }() + } } // GetChunks returns the chunks for the series with index seriesIndex. @@ -174,25 +172,9 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err }() if len(s.seriesBatch) == 0 { - batch, channelOpen := <-s.seriesBatchChan - - if !channelOpen { - // If there's an error, report it. - select { - case err, haveError := <-s.errorChan: - if haveError { - if _, ok := err.(validation.LimitError); ok { - return nil, err - } - return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err) - } - default: - } - - return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount) + if err := s.readNextBatch(seriesIndex); err != nil { + return nil, err } - - s.seriesBatch = batch } series := s.seriesBatch[0] @@ -208,5 +190,44 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has series with index %v", seriesIndex, series.SeriesIndex) } + if int(seriesIndex) == s.expectedSeriesCount-1 { + // This is the last series we expect to receive. Wait for StartBuffering() to exit (which is signalled by returning an error or + // closing errorChan). + // + // This ensures two things: + // 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series, + // StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report + // it, rather than it potentially being logged and missed. + // 2. It ensures the gPRC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context + // is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being + // logged and included in metrics and traces, when in fact the call succeeded. + if err := <-s.errorChan; err != nil { + return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err) + } + } + return series.Chunks, nil } + +func (s *SeriesChunksStreamReader) readNextBatch(seriesIndex uint64) error { + batch, channelOpen := <-s.seriesBatchChan + + if !channelOpen { + // If there's an error, report it. + select { + case err, haveError := <-s.errorChan: + if haveError { + if _, ok := err.(validation.LimitError); ok { + return err + } + return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err) + } + default: + } + + return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount) + } + + s.seriesBatch = batch + return nil +} diff --git a/pkg/ingester/client/streaming_test.go b/pkg/ingester/client/streaming_test.go index e9cfa30ced4..50797e1f51b 100644 --- a/pkg/ingester/client/streaming_test.go +++ b/pkg/ingester/client/streaming_test.go @@ -227,34 +227,49 @@ func TestSeriesChunksStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T) } func TestSeriesChunksStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) { - batches := [][]QueryStreamSeriesChunks{ - { - {SeriesIndex: 0, Chunks: []Chunk{createTestChunk(t, 1000, 1.23)}}, - {SeriesIndex: 1, Chunks: []Chunk{createTestChunk(t, 1000, 4.56)}}, - {SeriesIndex: 2, Chunks: []Chunk{createTestChunk(t, 1000, 7.89)}}, + testCases := map[string][][]QueryStreamSeriesChunks{ + "extra series received as part of batch for last expected series": { + { + {SeriesIndex: 0, Chunks: []Chunk{createTestChunk(t, 1000, 1.23)}}, + {SeriesIndex: 1, Chunks: []Chunk{createTestChunk(t, 1000, 4.56)}}, + {SeriesIndex: 2, Chunks: []Chunk{createTestChunk(t, 1000, 7.89)}}, + }, + }, + "extra series received as part of batch after batch containing last expected series": { + { + {SeriesIndex: 0, Chunks: []Chunk{createTestChunk(t, 1000, 1.23)}}, + }, + { + {SeriesIndex: 1, Chunks: []Chunk{createTestChunk(t, 1000, 4.56)}}, + {SeriesIndex: 2, Chunks: []Chunk{createTestChunk(t, 1000, 7.89)}}, + }, }, } - mockClient := &mockQueryStreamClient{ctx: context.Background(), batches: batches} - cleanedUp := atomic.NewBool(false) - cleanup := func() { cleanedUp.Store(true) } + for name, batches := range testCases { + t.Run(name, func(t *testing.T) { + mockClient := &mockQueryStreamClient{ctx: context.Background(), batches: batches} + cleanedUp := atomic.NewBool(false) + cleanup := func() { cleanedUp.Store(true) } - reader := NewSeriesChunksStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), cleanup, log.NewNopLogger()) - reader.StartBuffering() + reader := NewSeriesChunksStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), cleanup, log.NewNopLogger()) + reader.StartBuffering() - s, err := reader.GetChunks(0) - require.Nil(t, s) - expectedError := "attempted to read series at index 0 from ingester chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series" - require.EqualError(t, err, expectedError) + s, err := reader.GetChunks(0) + require.Nil(t, s) + expectedError := "attempted to read series at index 0 from ingester chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series" + require.EqualError(t, err, expectedError) - require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected") - require.True(t, cleanedUp.Load(), "expected cleanup function to be called") + require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected") + require.True(t, cleanedUp.Load(), "expected cleanup function to be called") - // Ensure we continue to return the error, even for subsequent calls to GetChunks. - _, err = reader.GetChunks(1) - require.EqualError(t, err, "attempted to read series at index 1 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError) - _, err = reader.GetChunks(2) - require.EqualError(t, err, "attempted to read series at index 2 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError) + // Ensure we continue to return the error, even for subsequent calls to GetChunks. + _, err = reader.GetChunks(1) + require.EqualError(t, err, "attempted to read series at index 1 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError) + _, err = reader.GetChunks(2) + require.EqualError(t, err, "attempted to read series at index 2 from ingester chunks stream, but the stream previously failed and returned an error: "+expectedError) + }) + } } func TestSeriesChunksStreamReader_ChunksLimits(t *testing.T) { diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go index 659324583a9..22993f808d5 100644 --- a/pkg/querier/block_streaming.go +++ b/pkg/querier/block_streaming.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/mimir/pkg/storage/series" "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" "github.com/grafana/mimir/pkg/storegateway/storepb" + "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/limiter" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/grafana/mimir/pkg/util/validation" @@ -151,7 +152,7 @@ func newStoreGatewayStreamReader(client storegatewaypb.StoreGateway_SeriesClient // Close cleans up all resources associated with this storeGatewayStreamReader. // This method should only be called if StartBuffering is not called. func (s *storeGatewayStreamReader) Close() { - if err := s.client.CloseSend(); err != nil { + if err := util.CloseAndExhaust[*storepb.SeriesResponse](s.client); err != nil { level.Warn(s.log).Log("msg", "closing store-gateway client stream failed", "err", err) } } @@ -306,25 +307,9 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag }() if len(s.chunksBatch) == 0 { - chks, channelOpen := <-s.seriesChunksChan - - if !channelOpen { - // If there's an error, report it. - select { - case err, haveError := <-s.errorChan: - if haveError { - if _, ok := err.(validation.LimitError); ok { - return nil, err - } - return nil, errors.Wrapf(err, "attempted to read series at index %v from store-gateway chunks stream, but the stream has failed", seriesIndex) - } - default: - } - - return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount) + if err := s.readNextBatch(seriesIndex); err != nil { + return nil, err } - - s.chunksBatch = chks.Series } chks := s.chunksBatch[0] @@ -338,9 +323,48 @@ func (s *storeGatewayStreamReader) GetChunks(seriesIndex uint64) (_ []storepb.Ag return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has series with index %v", seriesIndex, chks.SeriesIndex) } + if int(seriesIndex) == s.expectedSeriesCount-1 { + // This is the last series we expect to receive. Wait for StartBuffering() to exit (which is signalled by returning an error or + // closing errorChan). + // + // This ensures two things: + // 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series, + // StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report + // it, rather than it potentially being logged and missed. + // 2. It ensures the gRPC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context + // is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being + // logged and included in metrics and traces, when in fact the call succeeded. + if err := <-s.errorChan; err != nil { + return nil, fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has failed: %w", seriesIndex, err) + } + } + return chks.Chunks, nil } +func (s *storeGatewayStreamReader) readNextBatch(seriesIndex uint64) error { + chks, channelOpen := <-s.seriesChunksChan + + if !channelOpen { + // If there's an error, report it. + select { + case err, haveError := <-s.errorChan: + if haveError { + if _, ok := err.(validation.LimitError); ok { + return err + } + return errors.Wrapf(err, "attempted to read series at index %v from store-gateway chunks stream, but the stream has failed", seriesIndex) + } + default: + } + + return fmt.Errorf("attempted to read series at index %v from store-gateway chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount) + } + + s.chunksBatch = chks.Series + return nil +} + // EstimateChunkCount returns an estimate of the number of chunks this stream reader will return. // If the stream fails before an estimate is received from the store-gateway, this method returns 0. // This method should only be called after calling StartBuffering. If this method is called multiple times, it may block. diff --git a/pkg/querier/block_streaming_test.go b/pkg/querier/block_streaming_test.go index c83d993924e..96fa0e3fe6c 100644 --- a/pkg/querier/block_streaming_test.go +++ b/pkg/querier/block_streaming_test.go @@ -465,31 +465,51 @@ func TestStoreGatewayStreamReader_ReceivedFewerSeriesThanExpected(t *testing.T) } func TestStoreGatewayStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) { - batches := []storepb.StreamingChunksBatch{ - { - Series: []*storepb.StreamingChunks{ - {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, - {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, - {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, + testCases := map[string][]storepb.StreamingChunksBatch{ + "extra series received as part of batch for last expected series": { + { + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, + {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}, + {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}, + }, + }, + }, + "extra series received as part of batch after batch containing last expected series": { + { + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 0, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 1.23)}}, + }, + }, + { + Series: []*storepb.StreamingChunks{ + {SeriesIndex: 1, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 4.56)}}, + {SeriesIndex: 2, Chunks: []storepb.AggrChunk{createChunk(t, 1000, 7.89)}}, + }, }, }, } - mockClient := &mockStoreGatewayQueryStreamClient{ctx: context.Background(), messages: batchesToMessages(3, batches...)} - reader := newStoreGatewayStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger()) - reader.StartBuffering() - s, err := reader.GetChunks(0) - require.Nil(t, s) - expectedError := "attempted to read series at index 0 from store-gateway chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series" - require.EqualError(t, err, expectedError) + for name, batches := range testCases { + t.Run(name, func(t *testing.T) { + mockClient := &mockStoreGatewayQueryStreamClient{ctx: context.Background(), messages: batchesToMessages(3, batches...)} + reader := newStoreGatewayStreamReader(mockClient, 1, limiter.NewQueryLimiter(0, 0, 0, 0, nil), &stats.Stats{}, log.NewNopLogger()) + reader.StartBuffering() - require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected") + s, err := reader.GetChunks(0) + require.Nil(t, s) + expectedError := "attempted to read series at index 0 from store-gateway chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series" + require.EqualError(t, err, expectedError) - // Ensure we continue to return the error, even for subsequent calls to GetChunks. - _, err = reader.GetChunks(1) - require.EqualError(t, err, "attempted to read series at index 1 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError) - _, err = reader.GetChunks(2) - require.EqualError(t, err, "attempted to read series at index 2 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError) + require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected") + + // Ensure we continue to return the error, even for subsequent calls to GetChunks. + _, err = reader.GetChunks(1) + require.EqualError(t, err, "attempted to read series at index 1 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError) + _, err = reader.GetChunks(2) + require.EqualError(t, err, "attempted to read series at index 2 from store-gateway chunks stream, but the stream previously failed and returned an error: "+expectedError) + }) + } } func TestStoreGatewayStreamReader_ChunksLimits(t *testing.T) { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 4162b814907..51c044a2e66 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -898,8 +898,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor // Wait until all client requests complete. if err := g.Wait(); err != nil { for _, stream := range streams { - if err := stream.CloseSend(); err != nil { - level.Warn(q.logger).Log("msg", "closing storegateway client stream failed", "err", err) + if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil { + level.Warn(q.logger).Log("msg", "closing store-gateway client stream failed", "err", err) } } return nil, nil, nil, nil, nil, err diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go new file mode 100644 index 00000000000..f963a6aae58 --- /dev/null +++ b/pkg/util/grpc.go @@ -0,0 +1,24 @@ +// SPDX-License-Identifier: AGPL-3.0-only + +package util + +type Stream[T any] interface { + CloseSend() error + Recv() (T, error) +} + +func CloseAndExhaust[T any](stream Stream[T]) error { + err := stream.CloseSend() + if err != nil { + return err + } + + // Exhaust the stream to ensure: + // - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream) + // - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled" + for err == nil { + _, err = stream.Recv() + } + + return nil +}