Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue where successful streaming gRPC requests are incorrectly reported as cancelled #6471

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
* [BUGFIX] Ingester: Don't cache context cancellation error when querying. #6446
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6469
* [BUGFIX] All: fix issue where traces for some inter-component gRPC calls would incorrectly show the call as failing due to cancellation. #6470
* [BUGFIX] Querier: correctly mark streaming requests that return no series from ingesters or store-gateways as successful, not cancelled, in metrics and traces. #6471

### Mixin

Expand Down
47 changes: 47 additions & 0 deletions integration/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,27 @@ func TestIngesterQuerying(t *testing.T) {
},
},
},
"query that returns no results": {
// We have to push at least one sample to ensure that the tenant TSDB exists (otherwise the ingester takes a shortcut and returns early).
inSeries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "not_foobar",
},
},
Samples: []prompb.Sample{
{
Timestamp: queryStart.Add(-2 * time.Second).UnixMilli(),
Value: 100,
},
},
},
},
expectedQueryResult: model.Matrix{},
expectedTimestampQueryResult: model.Matrix{},
},
}

for testName, tc := range testCases {
Expand Down Expand Up @@ -529,6 +550,32 @@ func TestIngesterQuerying(t *testing.T) {
result, err = client.QueryRange(timestampQuery, queryStart, queryEnd, queryStep)
require.NoError(t, err)
require.Equal(t, tc.expectedTimestampQueryResult, result)

queryRequestCount := func(status string) (float64, error) {
counts, err := querier.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/QueryStream"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", status),
),
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)

if err != nil {
return 0, err
}

require.Len(t, counts, 1)
return counts[0], nil
}

successfulQueryRequests, err := queryRequestCount("2xx")
require.NoError(t, err)
require.Equal(t, 2.0, successfulQueryRequests) // 1 for first query request, 1 for timestamp query request

cancelledQueryRequests, err := queryRequestCount("cancel")
require.NoError(t, err)
require.Equal(t, 0.0, cancelledQueryRequests)
})
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1371,7 +1371,7 @@ func (d *Distributor) LabelNamesAndValues(ctx context.Context, matchers []*label
if err != nil {
return nil, err
}
defer stream.CloseSend() //nolint:errcheck
defer util.CloseAndExhaust[*ingester_client.LabelNamesAndValuesResponse](stream) //nolint:errcheck
return nil, merger.collectResponses(stream)
})
if err != nil {
Expand Down Expand Up @@ -1528,7 +1528,7 @@ func (d *Distributor) labelValuesCardinality(ctx context.Context, labelNames []m
if err != nil {
return struct{}{}, err
}
defer func() { _ = stream.CloseSend() }()
defer func() { _ = util.CloseAndExhaust[*ingester_client.LabelValuesCardinalityResponse](stream) }()

return struct{}{}, cardinalityConcurrentMap.processLabelValuesCardinalityMessages(desc.Zone, stream)
}, func(struct{}) {})
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
ingester_client "github.com/grafana/mimir/pkg/ingester/client"
"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
Expand Down Expand Up @@ -211,7 +212,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
defer func() {
if closeStream {
if stream != nil {
if err := stream.CloseSend(); err != nil {
if err := util.CloseAndExhaust[*ingester_client.QueryStreamResponse](stream); err != nil {
level.Warn(log).Log("msg", "closing ingester client stream failed", "err", err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/frontend/v2/frontend_scheduler_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/servicediscovery"
"github.com/grafana/mimir/pkg/util/spanlogger"
)
Expand Down Expand Up @@ -282,7 +283,7 @@ func (w *frontendSchedulerWorker) runOne(ctx context.Context, client schedulerpb
}

loopErr = w.schedulerLoop(loop)
if closeErr := loop.CloseSend(); closeErr != nil {
if closeErr := util.CloseAndExhaust[*schedulerpb.SchedulerToFrontend](loop); closeErr != nil {
level.Debug(w.log).Log("msg", "failed to close frontend loop", "err", loopErr, "addr", w.schedulerAddr)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ingester/client/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/opentracing/opentracing-go/ext"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
Expand Down Expand Up @@ -59,7 +60,7 @@ func NewSeriesChunksStreamReader(client Ingester_QueryStreamClient, expectedSeri
// Close cleans up all resources associated with this SeriesChunksStreamReader.
// This method should only be called if StartBuffering is not called.
func (s *SeriesChunksStreamReader) Close() {
if err := s.client.CloseSend(); err != nil {
if err := util.CloseAndExhaust[*QueryStreamResponse](s.client); err != nil {
level.Warn(s.log).Log("msg", "closing ingester client stream failed", "err", err)
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/querier/block_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/grafana/mimir/pkg/storage/series"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/storegateway/storepb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/limiter"
"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
Expand Down Expand Up @@ -151,7 +152,7 @@ func newStoreGatewayStreamReader(client storegatewaypb.StoreGateway_SeriesClient
// Close cleans up all resources associated with this storeGatewayStreamReader.
// This method should only be called if StartBuffering is not called.
func (s *storeGatewayStreamReader) Close() {
if err := s.client.CloseSend(); err != nil {
if err := util.CloseAndExhaust[*storepb.SeriesResponse](s.client); err != nil {
level.Warn(s.log).Log("msg", "closing store-gateway client stream failed", "err", err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,8 +898,8 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor
// Wait until all client requests complete.
if err := g.Wait(); err != nil {
for _, stream := range streams {
if err := stream.CloseSend(); err != nil {
level.Warn(q.logger).Log("msg", "closing storegateway client stream failed", "err", err)
if err := util.CloseAndExhaust[*storepb.SeriesResponse](stream); err != nil {
level.Warn(q.logger).Log("msg", "closing store-gateway client stream failed", "err", err)
}
}
return nil, nil, nil, nil, nil, err
Expand Down
25 changes: 25 additions & 0 deletions pkg/util/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// SPDX-License-Identifier: AGPL-3.0-only

package util

type Stream[T any] interface {
CloseSend() error
Recv() (T, error)
}

func CloseAndExhaust[T any](stream Stream[T]) error {
if err := stream.CloseSend(); err != nil {
return err
}

// Exhaust the stream to ensure:
// - the gRPC library can release any resources associated with the stream (see https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream)
// - instrumentation middleware correctly observes the end of the stream, rather than reporting it as "context canceled"
for {
if _, err := stream.Recv(); err != nil {
break
}
}
charleskorn marked this conversation as resolved.
Show resolved Hide resolved

return nil
}
Loading