Fix issue where successful streaming gRPC requests are incorrectly reported as cancelled #6471

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -62,6 +62,7 @@
* [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6469
* [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
+* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471

### Mixin

53 changes: 53 additions & 0 deletions integration/ingester_test.go
@@ -469,6 +469,27 @@ func TestIngesterQuerying(t *testing.T) {
},
},
},
"query that returns no results": {
// We have to push at least one sample to ensure that the tenant TSDB exists (otherwise the ingester takes a shortcut and returns early).
inSeries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "not_foobar",
},
},
Samples: []prompb.Sample{
{
Timestamp: queryStart.Add(-2 * time.Second).UnixMilli(),
Value: 100,
},
},
},
},
expectedQueryResult: model.Matrix{},
expectedTimestampQueryResult: model.Matrix{},
},
}

for testName, tc := range testCases {
@@ -529,6 +550,38 @@ func TestIngesterQuerying(t *testing.T) {
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

+queryRequestCount := func(status string) (float64, error) {
+counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
+e2e.WithLabelMatchers(
+labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
+labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
+),
+e2e.WithMetricCount,
+e2e.SkipMissingMetrics,
+)
+
+if err != nil {
+return 0, err
+}
+
+require.Len(t, counts, 1)
+return counts[0], nil
+}
+
+successfulQueryRequests, err := queryRequestCount("2xx")
+require.NoError(t, err)
+
+cancelledQueryRequests, err := queryRequestCount("cancel")
+require.NoError(t, err)
+
+totalQueryRequests, err := queryRequestCount(".*")
+require.NoError(t, err)
+
+// We expect two query requests: the first query request and the timestamp query request.
+require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
+require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
+require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
})
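The `status_code` label values asserted above ("2xx" and "cancel") come from the gRPC client instrumentation, which records a status only once each stream terminates. The sketch below is not Mimir's actual middleware — `statusCode` is a hypothetical helper — but it illustrates the mechanism this PR relies on: if the caller cancels the query context before draining the stream to EOF, an otherwise successful call is recorded as "cancel".

```go
package grpcstatus

import (
	"context"
	"errors"
	"io"
)

// statusCode is a hypothetical helper, not Mimir's real instrumentation: it maps
// the error observed when a client stream terminates to the status_code label
// seen in cortex_ingester_client_request_duration_seconds.
func statusCode(err error) string {
	switch {
	case err == nil || errors.Is(err, io.EOF):
		// Recv() returning io.EOF means the server finished the stream cleanly.
		return "2xx"
	case errors.Is(err, context.Canceled):
		// The stream was torn down by context cancellation: if the caller cancels
		// before draining to EOF, a successful RPC ends up counted here.
		return "cancel"
	default:
		return "error"
	}
}
```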
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
@@ -1371,7 +1371,7 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label
if err != nil {
return nil, err
}
-defer stream.CloseSend() //nolint:errcheck
+defer util.CloseAndExhaust[*ingester_client.LabelNamesAndValuesResponse](stream) //nolint:errcheck
return nil, merger.collectResponses(stream)
})
if err != nil {
@@ -1528,7 +1528,7 @@ func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []m
if err != nil {
return struct{}{}, err
}
-defer func() { _ = stream.CloseSend() }()
+defer func() { _ = util.CloseAndExhaust[*ingester_client.LabelValuesCardinalityResponse](stream) }()

return struct{}{}, cardinalityConcurrentMap.processLabelValuesCardinalityMessages(desc.Zone, stream)
}, func(struct{}) {})
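The `util.CloseAndExhaust` helper itself is not part of this diff. Based on its call sites here, a minimal sketch of such a helper might look like the following — the `Stream` interface and the exact draining behaviour are assumptions inferred from usage, not the actual Mimir implementation:

```go
package util

// Stream captures the two methods CloseAndExhaust needs from a gRPC client stream.
// (Assumed interface; generated gRPC stream clients satisfy this shape.)
type Stream[T any] interface {
	CloseSend() error
	Recv() (T, error)
}

// CloseAndExhaust closes the send side of the stream, then drains any remaining
// responses until the server closes its side of the stream.
func CloseAndExhaust[T any](stream Stream[T]) error {
	if err := stream.CloseSend(); err != nil {
		return err
	}

	var err error
	for err == nil {
		_, err = stream.Recv() // discard remaining messages; stops at io.EOF or a real error
	}

	return nil
}
```

Exhausting the stream matters because the gRPC client only records the call's final status once the stream terminates; discarding any trailing messages is the price of accurate metrics and traces.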
3 changes: 2 additions & 1 deletion pkg/distributor/query.go
@@ -25,6 +25,7 @@ import (
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
@@ -211,7 +212,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
defer func() {
if closeStream {
if stream != nil {
-if err := stream.CloseSend(); err != nil {
+if err := util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream); err != nil {
level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err)
}
}
3 changes: 2 additions & 1 deletion pkg/frontend/v2/frontend_scheduler_worker.go
@@ -24,6 +24,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/servicediscovery"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -282,7 +283,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
}

loopErr = w.schedulerLoop(loop)
-if closeErr := loop.CloseSend(); closeErr != nil {
+if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil {
level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
}

Expand Down
163 changes: 92 additions & 71 deletions pkg/ingester/client/streaming.go
@@ -12,6 +12,7 @@ import (
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
@@ -59,7 +60,7 @@ func NewSeriesChunksStreamReader(client Ingester_QueryStreamClient, expectedSeri
// Close cleans up all resources associated with this SeriesChunksStreamReader.
// This method should only be called if StartBuffering is not called.
func (s *SeriesChunksStreamReader) Close() {
-if err := s.client.CloseSend(); err != nil {
+if err := util.CloseAndExhaust[*QueryStreamResponse](s.client); err != nil {
level.Warn(s.log).Log("msg", "closing ingester client stream failed", "err", err)
}

@@ -76,7 +77,6 @@ func (s *SeriesChunksStreamReader) StartBuffering() {

// Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once.
s.errorChan = make(chan error, 1)
-ctxDone := s.client.Context().Done()

go func() {
log, _ := spanlogger.NewWithLogger(s.client.Context(), s.log, "SeriesChunksStreamReader.StartBuffering")
@@ -86,76 +86,74 @@ func (s *SeriesChunksStreamReader) StartBuffering() {

close(s.seriesBatchChan)
close(s.errorChan)
-log.Span.Finish()
+log.Finish()
}()

-onError := func(err error) {
+if err := s.readStream(log); err != nil {
s.errorChan <- err
level.Error(log).Log("msg", "received error while streaming chunks from ingester", "err", err)
ext.Error.Set(log.Span, true)
}
+}()
+}

-totalSeries := 0
-totalChunks := 0
-
-for {
-msg, err := s.client.Recv()
-if err != nil {
-if errors.Is(err, io.EOF) {
-if totalSeries < s.expectedSeriesCount {
-onError(fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries))
-} else {
-level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
-}
-} else {
-onError(err)
+func (s *SeriesChunksStreamReader) readStream(log *spanlogger.SpanLogger) error {
+totalSeries := 0
+totalChunks := 0
+
+for {
+msg, err := s.client.Recv()
+if err != nil {
+if errors.Is(err, io.EOF) {
+if totalSeries < s.expectedSeriesCount {
+return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
}

-return
+log.DebugLog("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
+return nil
}

-if len(msg.StreamingSeriesChunks) == 0 {
-continue
-}
+return err
+}

-totalSeries += len(msg.StreamingSeriesChunks)
-if totalSeries > s.expectedSeriesCount {
-onError(fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries))
-return
-}
+if len(msg.StreamingSeriesChunks) == 0 {
+continue
+}

+totalSeries += len(msg.StreamingSeriesChunks)
+if totalSeries > s.expectedSeriesCount {
+return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
+}
+
-chunkBytes := 0
+chunkBytes := 0

-for _, s := range msg.StreamingSeriesChunks {
-totalChunks += len(s.Chunks)
+for _, s := range msg.StreamingSeriesChunks {
+totalChunks += len(s.Chunks)

-for _, c := range s.Chunks {
-chunkBytes += c.Size()
-}
+for _, c := range s.Chunks {
+chunkBytes += c.Size()
+}
}

-// The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here.
-if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
-onError(err)
-return
-}
+// The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here.
+if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
+return err
+}

-select {
-case <-ctxDone:
-// Why do we abort if the context is done?
-// We want to make sure that this goroutine is never leaked.
-// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
-// more series to a full buffer: it would block forever.
-// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
-// This only works correctly if the context is cancelled when the query request is complete or cancelled,
-// which is true at the time of writing.
-onError(s.client.Context().Err())
-return
-case s.seriesBatchChan <- msg.StreamingSeriesChunks:
-// Batch enqueued successfully, nothing else to do for this batch.
-}
+select {
+case <-s.client.Context().Done():
+// Why do we abort if the context is done?
+// We want to make sure that this goroutine is never leaked.
+// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
+// more series to a full buffer: it would block forever.
+// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
+// This only works correctly if the context is cancelled when the query request is complete or cancelled,
+// which is true at the time of writing.
+return s.client.Context().Err()
+case s.seriesBatchChan <- msg.StreamingSeriesChunks:
+// Batch enqueued successfully, nothing else to do for this batch.
+}
}
-}()
}
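The select-on-context pattern in the buffering loop above is worth pulling out on its own. Here is a minimal standalone sketch (hypothetical names, not Mimir code) of the same leak-avoidance idea: a producer goroutine must never block forever on a full channel once the consumer has gone away, so every send races against `ctx.Done()`.

```go
package producer

import "context"

// produce sends values to out until ctx is cancelled. Because each send is
// wrapped in a select with ctx.Done(), the goroutine cannot leak by blocking
// on a full channel after the consumer stops reading.
func produce(ctx context.Context, out chan<- int) error {
	for i := 0; ; i++ {
		select {
		case <-ctx.Done():
			return ctx.Err() // consumer is gone (or the query finished): give up instead of leaking
		case out <- i:
			// Value enqueued; keep producing.
		}
	}
}
```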

// GetChunks returns the chunks for the series with index seriesIndex.
@@ -174,25 +172,9 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
}()

if len(s.seriesBatch) == 0 {
-batch, channelOpen := <-s.seriesBatchChan
-
-if !channelOpen {
-// If there's an error, report it.
-select {
-case err, haveError := <-s.errorChan:
-if haveError {
-if _, ok := err.(validation.LimitError); ok {
-return nil, err
-}
-return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
-}
-default:
-}
-
-return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
-}
-
-s.seriesBatch = batch
+if err := s.readNextBatch(seriesIndex); err != nil {
+return nil, err
+}
}

series := s.seriesBatch[0]
@@ -208,5 +190,44 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has series with index %v", seriesIndex, series.SeriesIndex)
}

+if int(seriesIndex) == s.expectedSeriesCount-1 {
+// This is the last series we expect to receive. Wait for StartBuffering() to exit (which is signalled by returning an error or
+// closing errorChan).
+//
+// This ensures two things:
+// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
+// StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
+// it, rather than it potentially being logged and missed.
+// 2. It ensures the gRPC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
+// is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
+// logged and included in metrics and traces, when in fact the call succeeded.
+if err := <-s.errorChan; err != nil {
+return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
+}
+}
+
return series.Chunks, nil
}

+func (s *SeriesChunksStreamReader) readNextBatch(seriesIndex uint64) error {
+batch, channelOpen := <-s.seriesBatchChan
+
+if !channelOpen {
+// If there's an error, report it.
+select {
+case err, haveError := <-s.errorChan:
+if haveError {
+if _, ok := err.(validation.LimitError); ok {
+return err
+}
+return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
+}
+default:
+}
+
+return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
+}
+
+s.seriesBatch = batch
+return nil
+}
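The `<-s.errorChan` wait added to GetChunks doubles as a success signal because of Go's channel semantics: once StartBuffering exits cleanly and closes errorChan without ever sending on it, the receive yields the zero value, nil. A tiny self-contained illustration of that behaviour:

```go
package main

import "fmt"

func main() {
	errChan := make(chan error, 1)

	// StartBuffering's happy path: the channel is closed without an error ever being sent.
	close(errChan)

	// Receiving from a closed channel does not block; it yields the zero value, nil,
	// so `if err := <-errChan; err != nil { ... }` falls through on success.
	err := <-errChan
	fmt.Println(err) // <nil>
}
```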