diff --git a/.golangci.yml b/.golangci.yml
index 2e5c27171e2..0cf83c970a0 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -8,6 +8,7 @@ linters:
     - gofmt
     - misspell
     - errorlint
+    - forbidigo
 
 linters-settings:
   errcheck:
@@ -44,6 +45,12 @@ linters-settings:
     # Do not check whether fmt.Errorf uses the %w verb for formatting errors.
     errorf: false
 
+  forbidigo:
+    forbid:
+      # We can't use faillint for a rule like this, because it does not support matching methods on structs or interfaces (see https://github.com/fatih/faillint/issues/18)
+      - p: ^.*\.CloseSend.*$
+        msg: Do not use CloseSend on a server-streaming gRPC stream. Use util.CloseAndExhaust instead. See the documentation on CloseAndExhaust for further explanation.
+
 run:
   timeout: 10m
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7b40d925b3b..ad359922809 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,7 +65,7 @@
 * [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 #6469
 * [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
-* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471
+* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471 #6505
 
 ### Mixin
 
diff --git a/pkg/frontend/v2/frontend_scheduler_worker.go b/pkg/frontend/v2/frontend_scheduler_worker.go
index ca9d099f1d7..5b22d3335aa 100644
--- a/pkg/frontend/v2/frontend_scheduler_worker.go
+++ b/pkg/frontend/v2/frontend_scheduler_worker.go
@@ -284,7 +284,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
 
 	loopErr = w.schedulerLoop(loop)
 	if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil {
-		level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
+		level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", closeErr, "addr", w.schedulerAddr)
 	}
 
 	if loopErr != nil {
diff --git a/pkg/ingester/client/streaming.go b/pkg/ingester/client/streaming.go
index d544608400e..1b98792beac 100644
--- a/pkg/ingester/client/streaming.go
+++ b/pkg/ingester/client/streaming.go
@@ -198,7 +198,7 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
 	// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
 	//    StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
 	//    it, rather than it potentially being logged and missed.
-	// 2. It ensures the gPRC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
+	// 2. It ensures the gRPC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
 	//    is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
 	//    logged and included in metrics and traces, when in fact the call succeeded.
 	if err := <-s.errorChan; err != nil {
diff --git a/pkg/ingester/client/streaming_test.go b/pkg/ingester/client/streaming_test.go
index 50797e1f51b..4942b101144 100644
--- a/pkg/ingester/client/streaming_test.go
+++ b/pkg/ingester/client/streaming_test.go
@@ -260,8 +260,8 @@ func TestSeriesChunksStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) {
 	expectedError := "attempted to read series at index 0 from ingester chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series"
 	require.EqualError(t, err, expectedError)
 
-	require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected")
-	require.True(t, cleanedUp.Load(), "expected cleanup function to be called")
+	require.Eventually(t, mockClient.closed.Load, time.Second, 10*time.Millisecond, "expected gRPC client to be closed after receiving more series than expected")
+	require.Eventually(t, cleanedUp.Load, time.Second, 10*time.Millisecond, "expected cleanup function to be called")
 
 	// Ensure we continue to return the error, even for subsequent calls to GetChunks.
 	_, err = reader.GetChunks(1)
@@ -319,7 +319,7 @@ func TestSeriesChunksStreamReader_ChunksLimits(t *testing.T) {
 			}
 
 			require.Eventually(t, mockClient.closed.Load, time.Second, 10*time.Millisecond, "expected gRPC client to be closed")
-			require.True(t, cleanedUp.Load(), "expected cleanup function to be called")
+			require.Eventually(t, cleanedUp.Load, time.Second, 10*time.Millisecond, "expected cleanup function to be called")
 
 			if testCase.expectedError != "" {
 				// Ensure we continue to return the error, even for subsequent calls to GetChunks.
diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go
index 22993f808d5..6e6ddbff78f 100644
--- a/pkg/querier/block_streaming.go
+++ b/pkg/querier/block_streaming.go
@@ -200,7 +200,7 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
 				return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
 			}
 
-			level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
+			log.DebugLog("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
 			return nil
 		}
 
@@ -214,7 +214,7 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
 			return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result)
 		}
 
-		level.Debug(log).Log("msg", "received estimated number of chunks", "chunks", estimate.EstimatedChunkCount)
+		log.DebugLog("msg", "received estimated number of chunks", "chunks", estimate.EstimatedChunkCount)
 		if err := s.sendChunksEstimate(estimate.EstimatedChunkCount); err != nil {
 			return err
 		}
diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go
index c21fdbd1c51..9d5c4ff3c49 100644
--- a/pkg/querier/blocks_store_queryable.go
+++ b/pkg/querier/blocks_store_queryable.go
@@ -713,7 +713,10 @@ func canBlockWithCompactorShardIndexContainQueryShard(queryShardIndex, queryShar
 // before iterating on the series.
 func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *storage.SelectHints, clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, tenantID string, convertedMatchers []storepb.LabelMatcher) (_ []storage.SeriesSet, _ []ulid.ULID, _ annotations.Annotations, startStreamingChunks func(), estimateChunks func() int, _ error) {
 	var (
-		reqCtx      = grpc_metadata.AppendToOutgoingContext(ctx, storegateway.GrpcContextMetadataTenantID, tenantID)
+		// We deliberately only cancel this context if any store-gateway call fails, to ensure that all streams are aborted promptly.
+		// When all calls succeed, we rely on the parent context being cancelled, otherwise we'd abort all the store-gateway streams returned by this method, which makes them unusable.
+		reqCtx, cancelReqCtx = context.WithCancel(grpc_metadata.AppendToOutgoingContext(ctx, storegateway.GrpcContextMetadataTenantID, tenantID)) //nolint:govet
+
 		g, gCtx     = errgroup.WithContext(reqCtx)
 		mtx         = sync.Mutex{}
 		seriesSets  = []storage.SeriesSet(nil)
@@ -897,6 +900,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 
 	// Wait until all client requests complete.
 	if err := g.Wait(); err != nil {
+		cancelReqCtx()
+
 		for _, stream := range streams {
 			if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil {
 				level.Warn(q.logger).Log("msg", "closing store-gateway client stream failed", "err", err)
@@ -921,7 +926,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 		return totalChunks
 	}
 
-	return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil
+	return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil //nolint:govet // It's OK to return without cancelling reqCtx, see comment above.
 }
 
 func shouldStopQueryFunc(err error) bool {
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 34c5d42ccdf..ac41a3a2ba3 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
 	"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/httpgrpcutil"
 	util_test "github.com/grafana/mimir/pkg/util/test"
 )
@@ -195,7 +196,7 @@ func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) {
 	})
 
 	// Disconnect frontend.
-	require.NoError(t, frontendLoop.CloseSend())
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](frontendLoop))
 
 	// Wait until the frontend has disconnected.
 	test.Poll(t, time.Second, float64(0), func() interface{} {
@@ -228,7 +229,7 @@ func TestCancelRequestInProgress(t *testing.T) {
 	// At this point, scheduler assumes that querier is processing the request (until it receives empty QuerierToScheduler message back).
 
 	// Simulate frontend disconnect.
-	require.NoError(t, frontendLoop.CloseSend())
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](frontendLoop))
 
 	// Add a little sleep to make sure that scheduler notices frontend disconnect.
 	time.Sleep(500 * time.Millisecond)
@@ -415,7 +416,7 @@ func TestSchedulerForwardsErrorToFrontend(t *testing.T) {
 	require.NoError(t, err)
 
 	// Querier now disconnects, without sending empty message back.
-	require.NoError(t, querierLoop.CloseSend())
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToQuerier](querierLoop))
 
 	// Verify that frontend was notified about request.
 	test.Poll(t, 2*time.Second, true, func() interface{} {
@@ -483,8 +484,8 @@ func TestSchedulerQuerierMetrics(t *testing.T) {
 		return err == nil
 	}, time.Second, 10*time.Millisecond, "expected cortex_query_scheduler_connected_querier_clients metric to be incremented after querier connected")
 
-	require.NoError(t, querierLoop.CloseSend())
 	cancel()
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToQuerier](querierLoop))
 
 	require.Eventually(t, func() bool {
 		err := promtest.GatherAndCompare(reg, strings.NewReader(`
diff --git a/pkg/util/grpc.go b/pkg/util/grpc.go
index f963a6aae58..d5fdbeba44a 100644
--- a/pkg/util/grpc.go
+++ b/pkg/util/grpc.go
@@ -2,23 +2,45 @@
 
 package util
 
+import (
+	"errors"
+	"time"
+)
+
+var ErrCloseAndExhaustTimedOut = errors.New("timed out waiting to exhaust stream after calling CloseSend, will continue exhausting stream in background")
+
 type Stream[T any] interface {
 	CloseSend() error
 	Recv() (T, error)
 }
 
+// CloseAndExhaust closes and then tries to exhaust stream. This ensures:
+// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
+// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
+//
+// Note that this method may block for up to 200ms if the stream has not already been exhausted.
+// If the stream has not been exhausted after this time, it will return ErrCloseAndExhaustTimedOut and continue exhausting the stream in the background.
 func CloseAndExhaust[T any](stream Stream[T]) error {
-	err := stream.CloseSend()
+	err := stream.CloseSend() //nolint:forbidigo // This is the one place we want to call this method.
 	if err != nil {
 		return err
 	}
 
-	// Exhaust the stream to ensure:
-	// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
-	// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
-	for err == nil {
-		_, err = stream.Recv()
-	}
+	done := make(chan struct{})
 
-	return nil
+	go func() {
+		for {
+			if _, err := stream.Recv(); err != nil {
+				close(done)
+				return
+			}
+		}
+	}()
+
+	select {
+	case <-done:
+		return nil
+	case <-time.After(200 * time.Millisecond):
+		return ErrCloseAndExhaustTimedOut
+	}
 }
diff --git a/pkg/util/grpc_test.go b/pkg/util/grpc_test.go
new file mode 100644
index 00000000000..1dca9a0c93a
--- /dev/null
+++ b/pkg/util/grpc_test.go
@@ -0,0 +1,71 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package util
+
+import (
+	"errors"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCloseAndExhaust(t *testing.T) {
+	t.Run("CloseSend returns an error", func(t *testing.T) {
+		expectedErr := errors.New("something went wrong")
+		stream := &mockStream{closeSendError: expectedErr}
+
+		actualErr := CloseAndExhaust[string](stream)
+		require.Equal(t, expectedErr, actualErr)
+	})
+
+	t.Run("Recv returns error immediately", func(t *testing.T) {
+		stream := &mockStream{recvErrors: []error{io.EOF}}
+		err := CloseAndExhaust[string](stream)
+		require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()")
+	})
+
+	t.Run("Recv returns error after multiple calls", func(t *testing.T) {
+		stream := &mockStream{recvErrors: []error{nil, nil, io.EOF}}
+		err := CloseAndExhaust[string](stream)
+		require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()")
+	})
+
+	t.Run("Recv blocks forever", func(t *testing.T) {
+		stream := &mockStream{}
+		returned := make(chan error)
+
+		go func() {
+			returned <- CloseAndExhaust[string](stream)
+		}()
+
+		select {
+		case err := <-returned:
+			require.Equal(t, ErrCloseAndExhaustTimedOut, err)
+		case <-time.After(500 * time.Millisecond):
+			require.FailNow(t, "expected CloseAndExhaust to time out waiting for Recv() to return, but it did not")
+		}
+	})
+}
+
+type mockStream struct {
+	closeSendError error
+	recvErrors     []error
+}
+
+func (m *mockStream) CloseSend() error {
+	return m.closeSendError
+}
+
+func (m *mockStream) Recv() (string, error) {
+	if len(m.recvErrors) == 0 {
+		// Block forever.
+		<-make(chan struct{})
+	}
+
+	err := m.recvErrors[0]
+	m.recvErrors = m.recvErrors[1:]
+
+	return "", err
+}
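
Note for reviewers: a minimal, self-contained sketch of the call-site pattern this change standardises on. The toyStream type below is illustrative only (not code from this diff); real callers pass a server-streaming gRPC client, as the frontendLoop and querierLoop streams in scheduler_test.go above do. The returned error is treated as log-worthy but non-fatal, since after ErrCloseAndExhaustTimedOut the stream continues to be drained in the background.

// Sketch (assumed names, not part of this diff): using util.CloseAndExhaust.
package main

import (
	"fmt"
	"io"

	"github.com/grafana/mimir/pkg/util"
)

// toyStream is a stand-in satisfying util.Stream[string].
type toyStream struct{ msgs []string }

func (s *toyStream) CloseSend() error { return nil }

func (s *toyStream) Recv() (string, error) {
	if len(s.msgs) == 0 {
		return "", io.EOF // stream exhausted
	}
	msg := s.msgs[0]
	s.msgs = s.msgs[1:]
	return msg, nil
}

func main() {
	stream := &toyStream{msgs: []string{"a", "b"}}

	// Drains the remaining messages so (for a real stream) gRPC can release
	// resources and instrumentation observes a clean end of stream rather
	// than a cancellation. Calling stream.CloseSend() directly would now be
	// rejected by the forbidigo rule added in .golangci.yml.
	if err := util.CloseAndExhaust[string](stream); err != nil {
		fmt.Println("failed to close stream:", err)
	}
}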
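The blocks_store_queryable.go change also relies on a shape that is easy to misread: the request context's cancel function is invoked only when g.Wait() fails, and the success path deliberately returns without cancelling (the reason for both //nolint:govet annotations), because cancelling there would abort the streams the function returns. A standalone sketch of that shape, with all names illustrative:

// Sketch (assumed names, not part of this diff): cancel-only-on-failure contexts.
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func openAll(ctx context.Context, targets []string, failOn string) ([]string, error) {
	// Cancelled only on the failure path: cancelling on success would abort
	// work that must outlive this function. Happy-path cleanup falls to the
	// parent context, hence the //nolint:govet suppression.
	reqCtx, cancelReqCtx := context.WithCancel(ctx) //nolint:govet
	g, gCtx := errgroup.WithContext(reqCtx)

	results := make([]string, len(targets))
	for i, target := range targets {
		i, target := i, target
		g.Go(func() error {
			if target == failOn {
				return errors.New("call to " + target + " failed")
			}
			_ = gCtx // a real implementation would open a stream under gCtx here
			results[i] = "stream:" + target
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		cancelReqCtx() // abort every other in-flight call promptly
		return nil, err
	}

	return results, nil //nolint:govet
}

func main() {
	streams, err := openAll(context.Background(), []string{"sg-1", "sg-2"}, "")
	fmt.Println(streams, err)
}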