Fix issue where successful streaming gRPC requests are incorrectly reported as cancelled (#6471)

* Add failing test.

* Fix issue where query requests that stream chunks from ingesters but return no series are reported as cancelled rather than successful in traces and metrics.

* Fix other instances of issue.

* Add changelog entry.

* Address PR feedback

Co-authored-by: Oleg Zaytsev <[email protected]>

* Add more details to assertions

* Extract method

* Fix race between query evaluation finishing and ingester chunks streaming goroutine closing the gRPC stream.

* Refactor SeriesChunksStreamReader to follow the same structure as the store-gateway chunk stream reader

* Update changelog entry.

* Fix race between query evaluation finishing and store-gateway chunks streaming goroutine closing the gRPC stream.

* Apply suggestions from code review

---------

Co-authored-by: Oleg Zaytsev <[email protected]>
Co-authored-by: Marco Pracucci <[email protected]>
3 people authored Oct 29, 2023
1 parent d6010a4 commit 1c4cacc
Showing 11 changed files with 296 additions and 136 deletions.
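
The change that recurs across the files below is replacing bare `stream.CloseSend()` calls with `util.CloseAndExhaust`, which also drains any remaining responses so the gRPC client observes the server's final status instead of tearing the stream down early (which client-side instrumentation records as a cancellation). The helper itself is not part of this diff; the snippet below is a minimal sketch of the idea, assuming a generic `Stream[T]` interface and a drain-in-a-goroutine approach, not the exact implementation in `pkg/util`.

```go
package util

// Stream is the subset of a gRPC client stream that CloseAndExhaust needs.
// The interface name and shape are assumptions made for this sketch.
type Stream[T any] interface {
	CloseSend() error
	Recv() (T, error)
}

// CloseAndExhaust closes the sending side of the stream and then drains any
// remaining responses in the background. Draining lets the client observe the
// server's final status (io.EOF for a successful call) instead of abandoning
// the stream, which instrumentation would otherwise record as "cancel".
func CloseAndExhaust[T any](stream Stream[T]) error {
	if err := stream.CloseSend(); err != nil {
		return err
	}

	go func() {
		for {
			if _, err := stream.Recv(); err != nil {
				// io.EOF (or any other error) means the stream has terminated
				// and there is nothing left to drain.
				return
			}
		}
	}()

	return nil
}
```

The call sites below pass the stream's response type explicitly, e.g. `util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream)`.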
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -65,6 +65,7 @@
* [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451 #6469
* [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
* [BUGFIX] Querier: correctly mark streaming requests to ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471

### Mixin

53 changes: 53 additions & 0 deletions integration/ingester_test.go
@@ -469,6 +469,27 @@ func TestIngesterQuerying(t *testing.T) {
},
},
},
"query that returns no results": {
// We have to push at least one sample to ensure that the tenant TSDB exists (otherwise the ingester takes a shortcut and returns early).
inSeries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "not_foobar",
},
},
Samples: []prompb.Sample{
{
Timestamp: queryStart.Add(-2 * time.Second).UnixMilli(),
Value: 100,
},
},
},
},
expectedQueryResult: model.Matrix{},
expectedTimestampQueryResult: model.Matrix{},
},
}

for testName, tc := range testCases {
@@ -529,6 +550,38 @@ func TestIngesterQuerying(t *testing.T) {
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", status),
),
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

successfulQueryRequests, err := queryRequestCount("2xx")
require.NoError(t, err)

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)

totalQueryRequests, err := queryRequestCount(".*")
require.NoError(t, err)

// We expect two query requests: the first query request and the timestamp query request
require.Equalf(t, 2.0, totalQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 2.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
})
}
})
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
@@ -1498,7 +1498,7 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label
if err != nil {
return nil, err
}
defer stream.CloseSend() //nolint:errcheck
defer util.CloseAndExhaust[*ingester_client.LabelNamesAndValuesResponse](stream) //nolint:errcheck
return nil, merger.collectResponses(stream)
})
if err != nil {
@@ -1655,7 +1655,7 @@ func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []m
if err != nil {
return struct{}{}, err
}
defer func() { _ = stream.CloseSend() }()
defer func() { _ = util.CloseAndExhaust[*ingester_client.LabelValuesCardinalityResponse](stream) }()

return struct{}{}, cardinalityConcurrentMap.processLabelValuesCardinalityMessages(desc.Zone, stream)
}, func(struct{}) {})
3 changes: 2 additions & 1 deletion pkg/distributor/query.go
@@ -25,6 +25,7 @@ import (
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
@@ -211,7 +212,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
defer func() {
if closeStream {
if stream != nil {
if err := stream.CloseSend(); err != nil {
if err := util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream); err != nil {
level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err)
}
}
3 changes: 2 additions & 1 deletion pkg/frontend/v2/frontend_scheduler_worker.go
@@ -24,6 +24,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/servicediscovery"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
@@ -282,7 +283,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
}

loopErr = w.schedulerLoop(loop)
if closeErr := loop.CloseSend(); closeErr != nil {
if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil {
level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
}

163 changes: 92 additions & 71 deletions pkg/ingester/client/streaming.go
@@ -12,6 +12,7 @@ import (
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
@@ -59,7 +60,7 @@ func NewSeriesChunksStreamReader(client Ingester_QueryStreamClient, expectedSeri
// Close cleans up all resources associated with this SeriesChunksStreamReader.
// This method should only be called if StartBuffering is not called.
func (s *SeriesChunksStreamReader) Close() {
if err := s.client.CloseSend(); err != nil {
if err := util.CloseAndExhaust[*QueryStreamResponse](s.client); err != nil {
level.Warn(s.log).Log("msg", "closing ingester client stream failed", "err", err)
}

@@ -76,7 +77,6 @@ func (s *SeriesChunksStreamReader) StartBuffering() {

// Important: to ensure that the goroutine does not become blocked and leak, the goroutine must only ever write to errorChan at most once.
s.errorChan = make(chan error, 1)
ctxDone := s.client.Context().Done()

go func() {
log, _ := spanlogger.NewWithLogger(s.client.Context(), s.log, "SeriesChunksStreamReader.StartBuffering")
Expand All @@ -86,76 +86,74 @@ func (s *SeriesChunksStreamReader) StartBuffering() {

close(s.seriesBatchChan)
close(s.errorChan)
log.Span.Finish()
log.Finish()
}()

onError := func(err error) {
if err := s.readStream(log); err != nil {
s.errorChan <- err
level.Error(log).Log("msg", "received error while streaming chunks from ingester", "err", err)
ext.Error.Set(log.Span, true)
}
}()
}

totalSeries := 0
totalChunks := 0

for {
msg, err := s.client.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
if totalSeries < s.expectedSeriesCount {
onError(fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries))
} else {
level.Debug(log).Log("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
}
} else {
onError(err)
func (s *SeriesChunksStreamReader) readStream(log *spanlogger.SpanLogger) error {
totalSeries := 0
totalChunks := 0

for {
msg, err := s.client.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
if totalSeries < s.expectedSeriesCount {
return fmt.Errorf("expected to receive %v series, but got EOF after receiving %v series", s.expectedSeriesCount, totalSeries)
}

return
log.DebugLog("msg", "finished streaming", "series", totalSeries, "chunks", totalChunks)
return nil
}

if len(msg.StreamingSeriesChunks) == 0 {
continue
}
return err
}

totalSeries += len(msg.StreamingSeriesChunks)
if totalSeries > s.expectedSeriesCount {
onError(fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries))
return
}
if len(msg.StreamingSeriesChunks) == 0 {
continue
}

totalSeries += len(msg.StreamingSeriesChunks)
if totalSeries > s.expectedSeriesCount {
return fmt.Errorf("expected to receive only %v series, but received at least %v series", s.expectedSeriesCount, totalSeries)
}

chunkBytes := 0
chunkBytes := 0

for _, s := range msg.StreamingSeriesChunks {
totalChunks += len(s.Chunks)
for _, s := range msg.StreamingSeriesChunks {
totalChunks += len(s.Chunks)

for _, c := range s.Chunks {
chunkBytes += c.Size()
}
for _, c := range s.Chunks {
chunkBytes += c.Size()
}
}

// The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here.
if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
onError(err)
return
}
// The chunk count limit is enforced earlier, while we're reading series labels, so we don't need to do that here.
if err := s.queryLimiter.AddChunkBytes(chunkBytes); err != nil {
return err
}

select {
case <-ctxDone:
// Why do we abort if the context is done?
// We want to make sure that this goroutine is never leaked.
// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
// more series to a full buffer: it would block forever.
// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
// This only works correctly if the context is cancelled when the query request is complete or cancelled,
// which is true at the time of writing.
onError(s.client.Context().Err())
return
case s.seriesBatchChan <- msg.StreamingSeriesChunks:
// Batch enqueued successfully, nothing else to do for this batch.
}
select {
case <-s.client.Context().Done():
// Why do we abort if the context is done?
// We want to make sure that this goroutine is never leaked.
// This goroutine could be leaked if nothing is reading from the buffer, but this method is still trying to send
// more series to a full buffer: it would block forever.
// So, here, we try to send the series to the buffer if we can, but if the context is cancelled, then we give up.
// This only works correctly if the context is cancelled when the query request is complete or cancelled,
// which is true at the time of writing.
return s.client.Context().Err()
case s.seriesBatchChan <- msg.StreamingSeriesChunks:
// Batch enqueued successfully, nothing else to do for this batch.
}
}()
}
}

// GetChunks returns the chunks for the series with index seriesIndex.
@@ -174,25 +172,9 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
}()

if len(s.seriesBatch) == 0 {
batch, channelOpen := <-s.seriesBatchChan

if !channelOpen {
// If there's an error, report it.
select {
case err, haveError := <-s.errorChan:
if haveError {
if _, ok := err.(validation.LimitError); ok {
return nil, err
}
return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
}
default:
}

return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
if err := s.readNextBatch(seriesIndex); err != nil {
return nil, err
}

s.seriesBatch = batch
}

series := s.seriesBatch[0]
Expand All @@ -208,5 +190,44 @@ func (s *SeriesChunksStreamReader) GetChunks(seriesIndex uint64) (_ []Chunk, err
return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has series with index %v", seriesIndex, series.SeriesIndex)
}

if int(seriesIndex) == s.expectedSeriesCount-1 {
// This is the last series we expect to receive. Wait for StartBuffering() to exit (which is signalled by returning an error or
// closing errorChan).
//
// This ensures two things:
// 1. If we receive more series than expected (likely due to a bug), or something else goes wrong after receiving the last series,
// StartBuffering() will return an error. This method will then return it, which will bubble up to the PromQL engine and report
// it, rather than it potentially being logged and missed.
	//    2. It ensures the gRPC stream is cleaned up before the PromQL engine cancels the context used for the query. If the context
// is cancelled before the gRPC stream's Recv() returns EOF, this can result in misleading context cancellation errors being
// logged and included in metrics and traces, when in fact the call succeeded.
if err := <-s.errorChan; err != nil {
return nil, fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
}
}

return series.Chunks, nil
}

func (s *SeriesChunksStreamReader) readNextBatch(seriesIndex uint64) error {
batch, channelOpen := <-s.seriesBatchChan

if !channelOpen {
// If there's an error, report it.
select {
case err, haveError := <-s.errorChan:
if haveError {
if _, ok := err.(validation.LimitError); ok {
return err
}
return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has failed: %w", seriesIndex, err)
}
default:
}

return fmt.Errorf("attempted to read series at index %v from ingester chunks stream, but the stream has already been exhausted (was expecting %v series)", seriesIndex, s.expectedSeriesCount)
}

s.seriesBatch = batch
return nil
}
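
For context on the last-series synchronization added to GetChunks above: the reader is consumed roughly as in the hypothetical sketch below (the consuming function and its arguments are illustrative; only StartBuffering and GetChunks come from the code above). If the query context were cancelled before the buffering goroutine had seen io.EOF from Recv(), an otherwise-successful QueryStream call would be logged and counted as cancelled, which is what this commit fixes.

```go
package example

import (
	ingester_client "github.com/grafana/mimir/pkg/ingester/client"
)

// consumeStream illustrates the calling pattern SeriesChunksStreamReader expects.
// It is a sketch for illustration only, not code from this repository.
func consumeStream(reader *ingester_client.SeriesChunksStreamReader, expectedSeriesCount int) error {
	// Start the background goroutine that receives chunk batches from the gRPC stream.
	reader.StartBuffering()

	for i := 0; i < expectedSeriesCount; i++ {
		// GetChunks is called with strictly increasing series indices. For the final
		// index it now also waits for the buffering goroutine to finish, so the stream
		// is fully read (and any late error surfaced) before the PromQL engine cancels
		// the query context.
		chunks, err := reader.GetChunks(uint64(i))
		if err != nil {
			return err
		}

		_ = chunks // decode and merge the chunks here
	}

	return nil
}
```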
