Address post-merge feedback for #6471 (#6505) (#6510)
* Fix typo in comment.

* Use more efficient `SpanLogger.DebugLog()` method in more places.

* Add linting rule to forbid the direct use of CloseSend().

* Add documentation to CloseAndExhaust.

* Update changelog entry.

* Report correct error when closing loop fails.

* Avoid reading entire store-gateway query stream response when one or more calls fail.

* Silence linting warnings.

* Ensure CloseAndExhaust never blocks forever.

* Fix race in ingester streaming tests: there's no guarantee we'll close the stream synchronously after encountering an error.

* Address PR feedback: reduce timeout.

(cherry picked from commit 0103407)

Co-authored-by: Charles Korn <[email protected]>
grafanabot and charleskorn authored Oct 30, 2023
1 parent b34913f commit 4d397e9
Showing 10 changed files with 128 additions and 22 deletions.
7 changes: 7 additions & 0 deletions .golangci.yml
@@ -8,6 +8,7 @@ linters:
     - gofmt
     - misspell
     - errorlint
+    - forbidigo
 
 linters-settings:
   errcheck:
@@ -44,6 +45,12 @@
     # Do not check whether fmt.Errorf uses the %w verb for formatting errors.
     errorf: false
 
+  forbidigo:
+    forbid:
+      # We can't use faillint for a rule like this, because it does not support matching methods on structs or interfaces (see https://github.com/fatih/faillint/issues/18)
+      - p: ^.*\.CloseSend.*$
+        msg: Do not use CloseSend on a server-streaming gRPC stream. Use util.CloseAndExhaust instead. See the documentation on CloseAndExhaust for further explanation.
+
 run:
   timeout: 10m
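For illustration, a minimal sketch (not code from this change) of the call-site pattern the new rule enforces, using the `util.Stream` interface and `util.CloseAndExhaust` helper added later in this commit:

```go
package example

import "github.com/grafana/mimir/pkg/util"

// disconnect sketches the call-site pattern the forbidigo rule drives towards.
// The commented line is what the linter now rejects; the replacement closes the
// stream and also drains any buffered messages (see pkg/util/grpc.go below).
func disconnect[T any](stream util.Stream[T]) error {
	// return stream.CloseSend() // forbidden: trips the forbidigo rule above
	return util.CloseAndExhaust[T](stream)
}
```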
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -65,7 +65,7 @@
 * [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
 * [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 #6469
 * [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
-* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471
+* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471 #6505
 
 ### Mixin
2 changes: 1 addition & 1 deletion pkg/frontend/v2/frontend_scheduler_worker.go
@@ -284,7 +284,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
 
 	loopErr = w.schedulerLoop(loop)
 	if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil {
-		level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
+		level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", closeErr, "addr", w.schedulerAddr)
 	}
 
 	if loopErr != nil {
2 changes: 1 addition & 1 deletion pkg/ingester/client/streaming.go
@@ -198,7 +198,7 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
 	// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
 	//    StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
 	//    it, rather than it potentially being logged and missed.
-	// 2. It ensures the gPRC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
+	// 2. It ensures the gRPC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
 	//    is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
 	//    logged and included in metrics and traces, when in fact the call succeeded.
 	if err := <-s.errorChan; err != nil {
6 changes: 3 additions & 3 deletions pkg/ingester/client/streaming_test.go
@@ -260,8 +260,8 @@ func TestSeriesChunksStreamReader_ReceivedMoreSeriesThanExpected(t *testing.T) {
 	expectedError := "attempted to read series at index 0 from ingester chunks stream, but the stream has failed: expected to receive only 1 series, but received at least 3 series"
 	require.EqualError(t, err, expectedError)
 
-	require.True(t, mockClient.closed.Load(), "expected gRPC client to be closed after receiving more series than expected")
-	require.True(t, cleanedUp.Load(), "expected cleanup function to be called")
+	require.Eventually(t, mockClient.closed.Load, time.Second, 10*time.Millisecond, "expected gRPC client to be closed after receiving more series than expected")
+	require.Eventually(t, cleanedUp.Load, time.Second, 10*time.Millisecond, "expected cleanup function to be called")
 
 	// Ensure we continue to return the error, even for subsequent calls to GetChunks.
 	_, err = reader.GetChunks(1)
@@ -319,7 +319,7 @@ func TestSeriesChunksStreamReader_ChunksLimits(t *testing.T) {
 	}
 
 	require.Eventually(t, mockClient.closed.Load, time.Second, 10*time.Millisecond, "expected gRPC client to be closed")
-	require.True(t, cleanedUp.Load(), "expected cleanup function to be called")
+	require.Eventually(t, cleanedUp.Load, time.Second, 10*time.Millisecond, "expected cleanup function to be called")
 
 	if testCase.expectedError != "" {
 		// Ensure we continue to return the error, even for subsequent calls to GetChunks.
4 changes: 2 additions & 2 deletions pkg/querier/block_streaming.go
@@ -200,7 +200,7 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
 		return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
 	}
 
-	level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
+	log.DebugLog("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
 	return nil
 }
 
@@ -214,7 +214,7 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error
 		return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result)
 	}
 
-	level.Debug(log).Log("msg", "received estimated number of chunks", "chunks", estimate.EstimatedChunkCount)
+	log.DebugLog("msg", "received estimated number of chunks", "chunks", estimate.EstimatedChunkCount)
 	if err := s.sendChunksEstimate(estimate.EstimatedChunkCount); err != nil {
 		return err
 	}
9 changes: 7 additions & 2 deletions pkg/querier/blocks_store_queryable.go
@@ -713,7 +713,10 @@ func canBlockWithCompactorShardIndexContainQueryShard(queryShardIndex, queryShar
 // before iterating on the series.
 func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *storage.SelectHints, clients map[BlocksStoreClient][]ulid.ULID, minT int64, maxT int64, tenantID string, convertedMatchers []storepb.LabelMatcher) (_ []storage.SeriesSet, _ []ulid.ULID, _ annotations.Annotations, startStreamingChunks func(), estimateChunks func() int, _ error) {
 	var (
-		reqCtx = grpc_metadata.AppendToOutgoingContext(ctx, storegateway.GrpcContextMetadataTenantID, tenantID)
+		// We deliberately only cancel this context if any store-gateway call fails, to ensure that all streams are aborted promptly.
+		// When all calls succeed, we rely on the parent context being cancelled, otherwise we'd abort all the store-gateway streams returned by this method, which makes them unusable.
+		reqCtx, cancelReqCtx = context.WithCancel(grpc_metadata.AppendToOutgoingContext(ctx, storegateway.GrpcContextMetadataTenantID, tenantID)) //nolint:govet
+
 		g, gCtx = errgroup.WithContext(reqCtx)
 		mtx = sync.Mutex{}
 		seriesSets = []storage.SeriesSet(nil)
@@ -897,6 +900,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 
 	// Wait until all client requests complete.
 	if err := g.Wait(); err != nil {
+		cancelReqCtx()
+
 		for _, stream := range streams {
 			if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil {
 				level.Warn(q.logger).Log("msg", "closing store-gateway client stream failed", "err", err)
@@ -921,7 +926,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
 		return totalChunks
 	}
 
-	return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil
+	return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil //nolint:govet // It's OK to return without cancelling reqCtx, see comment above.
 }
 
 func shouldStopQueryFunc(err error) bool {
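The cancellation logic above, reduced to a standalone sketch under assumed names (`Backend`, `Stream`, and `fetchAll` are illustrative, not types from this repository): cancel the shared request context only when a call fails, and otherwise leave the returned streams tied to the parent context.

```go
package example

import (
	"context"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Stream and Backend are hypothetical stand-ins for a store-gateway client
// and its server-streaming response.
type Stream interface{ Recv() ([]byte, error) }

type Backend interface {
	Query(ctx context.Context) (Stream, error)
}

// fetchAll cancels reqCtx only if any backend call fails, so that all
// in-flight streams are aborted promptly. On success it deliberately returns
// without cancelling: the returned streams share reqCtx and would otherwise
// become unusable; the parent ctx tears them down instead.
func fetchAll(ctx context.Context, backends []Backend) ([]Stream, error) {
	reqCtx, cancelReqCtx := context.WithCancel(ctx)

	var (
		mtx     sync.Mutex
		streams []Stream
	)

	g, gCtx := errgroup.WithContext(reqCtx)
	for _, b := range backends {
		b := b
		g.Go(func() error {
			s, err := b.Query(gCtx)
			if err != nil {
				return err
			}
			mtx.Lock()
			streams = append(streams, s)
			mtx.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		cancelReqCtx() // abort all in-flight streams promptly
		return nil, err
	}

	return streams, nil //nolint:govet // reqCtx is cancelled via the parent context, see comment above.
}
```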
9 changes: 5 additions & 4 deletions pkg/scheduler/scheduler_test.go
@@ -31,6 +31,7 @@ import (
 
 	"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
 	"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
+	"github.com/grafana/mimir/pkg/util"
 	"github.com/grafana/mimir/pkg/util/httpgrpcutil"
 	util_test "github.com/grafana/mimir/pkg/util/test"
 )
@@ -195,7 +196,7 @@ func TestSchedulerEnqueueWithFrontendDisconnect(t *testing.T) {
 	})
 
 	// Disconnect frontend.
-	require.NoError(t, frontendLoop.CloseSend())
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](frontendLoop))
 
 	// Wait until the frontend has disconnected.
 	test.Poll(t, time.Second, float64(0), func() interface{} {
@@ -228,7 +229,7 @@ func TestCancelRequestInProgress(t *testing.T) {
 
 	// At this point, scheduler assumes that querier is processing the request (until it receives empty QuerierToScheduler message back).
 	// Simulate frontend disconnect.
-	require.NoError(t, frontendLoop.CloseSend())
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](frontendLoop))
 
 	// Add a little sleep to make sure that scheduler notices frontend disconnect.
 	time.Sleep(500 * time.Millisecond)
@@ -415,7 +416,7 @@ func TestSchedulerForwardsErrorToFrontend(t *testing.T) {
 	require.NoError(t, err)
 
 	// Querier now disconnects, without sending empty message back.
-	require.NoError(t, querierLoop.CloseSend())
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToQuerier](querierLoop))
 
 	// Verify that frontend was notified about request.
 	test.Poll(t, 2*time.Second, true, func() interface{} {
@@ -483,8 +484,8 @@ func TestSchedulerQuerierMetrics(t *testing.T) {
 		return err == nil
 	}, time.Second, 10*time.Millisecond, "expected cortex_query_scheduler_connected_querier_clients metric to be incremented after querier connected")
 
-	require.NoError(t, querierLoop.CloseSend())
 	cancel()
+	require.NoError(t, util.CloseAndExhaust[*schedulerpb.SchedulerToQuerier](querierLoop))
 
 	require.Eventually(t, func() bool {
 		err := promtest.GatherAndCompare(reg, strings.NewReader(`
38 changes: 30 additions & 8 deletions pkg/util/grpc.go
@@ -2,23 +2,45 @@
 
 package util
 
+import (
+	"errors"
+	"time"
+)
+
+var ErrCloseAndExhaustTimedOut = errors.New("timed out waiting to exhaust stream after calling CloseSend, will continue exhausting stream in background")
+
 type Stream[T any] interface {
 	CloseSend() error
 	Recv() (T, error)
 }
 
+// CloseAndExhaust closes and then tries to exhaust stream. This ensures:
+// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
+// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
+//
+// Note that this method may block for up to 200ms if the stream has not already been exhausted.
+// If the stream has not been exhausted after this time, it will return ErrCloseAndExhaustTimedOut and continue exhausting the stream in the background.
 func CloseAndExhaust[T any](stream Stream[T]) error {
-	err := stream.CloseSend()
+	err := stream.CloseSend() //nolint:forbidigo // This is the one place we want to call this method.
 	if err != nil {
 		return err
 	}
 
-	// Exhaust the stream to ensure:
-	// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
-	// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
-	for err == nil {
-		_, err = stream.Recv()
-	}
+	done := make(chan struct{})
 
-	return nil
+	go func() {
+		for {
+			if _, err := stream.Recv(); err != nil {
+				close(done)
+				return
+			}
+		}
+	}()
+
+	select {
+	case <-done:
+		return nil
+	case <-time.After(200 * time.Millisecond):
+		return ErrCloseAndExhaustTimedOut
+	}
 }
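A caller-side sketch of the new timeout behaviour (the helper name and go-kit logger wiring are illustrative assumptions, not part of this commit): ErrCloseAndExhaustTimedOut means the stream is still being drained in a background goroutine, so callers can log it and move on rather than treat it as fatal.

```go
package example

import (
	"errors"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/grafana/mimir/pkg/util"
)

// closeQuietly distinguishes a slow-to-exhaust stream from a real close
// failure. Assumes go-kit logging, as used elsewhere in this change.
func closeQuietly[T any](logger log.Logger, stream util.Stream[T]) {
	err := util.CloseAndExhaust[T](stream)
	switch {
	case err == nil:
		return
	case errors.Is(err, util.ErrCloseAndExhaustTimedOut):
		// Exhaustion continues in the background; nothing more to do here.
		level.Debug(logger).Log("msg", "stream not exhausted yet, continuing in background")
	default:
		level.Warn(logger).Log("msg", "failed to close stream", "err", err)
	}
}
```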
71 changes: 71 additions & 0 deletions pkg/util/grpc_test.go
@@ -0,0 +1,71 @@
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package util
+
+import (
+	"errors"
+	"io"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestCloseAndExhaust(t *testing.T) {
+	t.Run("CloseSend returns an error", func(t *testing.T) {
+		expectedErr := errors.New("something went wrong")
+		stream := &mockStream{closeSendError: expectedErr}
+
+		actualErr := CloseAndExhaust[string](stream)
+		require.Equal(t, expectedErr, actualErr)
+	})
+
+	t.Run("Recv returns error immediately", func(t *testing.T) {
+		stream := &mockStream{recvErrors: []error{io.EOF}}
+		err := CloseAndExhaust[string](stream)
+		require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()")
+	})
+
+	t.Run("Recv returns error after multiple calls", func(t *testing.T) {
+		stream := &mockStream{recvErrors: []error{nil, nil, io.EOF}}
+		err := CloseAndExhaust[string](stream)
+		require.NoError(t, err, "CloseAndExhaust should ignore errors from Recv()")
+	})
+
+	t.Run("Recv blocks forever", func(t *testing.T) {
+		stream := &mockStream{}
+		returned := make(chan error)
+
+		go func() {
+			returned <- CloseAndExhaust[string](stream)
+		}()
+
+		select {
+		case err := <-returned:
+			require.Equal(t, ErrCloseAndExhaustTimedOut, err)
+		case <-time.After(500 * time.Millisecond):
+			require.FailNow(t, "expected CloseAndExhaust to time out waiting for Recv() to return, but it did not")
+		}
+	})
+}
+
+type mockStream struct {
+	closeSendError error
+	recvErrors     []error
+}
+
+func (m *mockStream) CloseSend() error {
+	return m.closeSendError
+}
+
+func (m *mockStream) Recv() (string, error) {
+	if len(m.recvErrors) == 0 {
+		// Block forever.
+		<-make(chan struct{})
+	}
+
+	err := m.recvErrors[0]
+	m.recvErrors = m.recvErrors[1:]
+
+	return "", err
+}
