From 0cef53464e553b0fb6c3d0e7f2ada1c39a399223 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?=
Date: Thu, 3 Oct 2024 12:12:51 +0300
Subject: [PATCH] *: add fanout metadata to Analyze()
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add fanout metadata to Analyze() output. When the analyze parameter is
set, a metadata storage is attached to the query context; the proxy
response sets record per-store statistics (series, chunks, samples,
bytes, wall time) into it, and processAnalysis() copies them into the
returned telemetry tree.

Depends on https://github.com/thanos-io/promql-engine/pull/490.

Signed-off-by: Giedrius Statkevičius
---
 pkg/api/query/v1.go      | 68 +++++++++++++++++++++++-----------------
 pkg/query/querier.go     |  7 -----
 pkg/store/bucket.go      |  4 +++
 pkg/store/proxy_merge.go | 46 +++++++++++++++++++++++++++
 4 files changed, 90 insertions(+), 35 deletions(-)

diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go
index 7d6d85e28d..fff343f61a 100644
--- a/pkg/api/query/v1.go
+++ b/pkg/api/query/v1.go
@@ -46,6 +46,7 @@ import (
     "github.com/prometheus/prometheus/util/stats"
     promqlapi "github.com/thanos-io/promql-engine/api"
     "github.com/thanos-io/promql-engine/engine"
+    thanosmodel "github.com/thanos-io/promql-engine/execution/model"
     "github.com/thanos-io/promql-engine/logicalplan"
 
     "github.com/thanos-io/thanos/pkg/api"
@@ -319,11 +320,12 @@ type queryData struct {
 type queryTelemetry struct {
     // TODO(saswatamcode): Replace with engine.TrackedTelemetry once it has exported fields.
     // TODO(saswatamcode): Add aggregate fields to enrich data.
-    OperatorName string           `json:"name,omitempty"`
-    Execution    string           `json:"executionTime,omitempty"`
-    PeakSamples  int64            `json:"peakSamples,omitempty"`
-    TotalSamples int64            `json:"totalSamples,omitempty"`
-    Children     []queryTelemetry `json:"children,omitempty"`
+    OperatorName string                    `json:"name,omitempty"`
+    Execution    string                    `json:"executionTime,omitempty"`
+    PeakSamples  int64                     `json:"peakSamples,omitempty"`
+    TotalSamples int64                     `json:"totalSamples,omitempty"`
+    FanoutData   map[string]map[string]any `json:"fanout_data,omitempty"`
+    Children     []queryTelemetry          `json:"children,omitempty"`
 }
 
 func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
@@ -477,14 +479,8 @@ func (qapi *QueryAPI) getQueryExplain(query promql.Query) (*engine.ExplainOutput
     }
 
-func (qapi *QueryAPI) parseQueryAnalyzeParam(r *http.Request, query promql.Query) (queryTelemetry, error) {
-    if r.FormValue(QueryAnalyzeParam) == "true" || r.FormValue(QueryAnalyzeParam) == "1" {
-        if eq, ok := query.(engine.ExplainableQuery); ok {
-            return processAnalysis(eq.Analyze()), nil
-        }
-        return queryTelemetry{}, errors.Errorf("Query not analyzable; change engine to 'thanos'")
-    }
-    return queryTelemetry{}, nil
+func (qapi *QueryAPI) parseQueryAnalyzeParam(r *http.Request) bool {
+    return r.FormValue(QueryAnalyzeParam) == "true" || r.FormValue(QueryAnalyzeParam) == "1"
 }
 
 func processAnalysis(a *engine.AnalyzeOutputNode) queryTelemetry {
@@ -493,6 +489,9 @@ func processAnalysis(a *engine.AnalyzeOutputNode) queryTelemetry {
     analysis.Execution = a.OperatorTelemetry.ExecutionTimeTaken().String()
     analysis.PeakSamples = a.PeakSamples()
     analysis.TotalSamples = a.TotalSamples()
+    if fm := a.OperatorTelemetry.FanoutMetadata(); fm != nil {
+        analysis.FanoutData = fm.GetMetadata()
+    }
     for _, c := range a.Children {
         analysis.Children = append(analysis.Children, processAnalysis(&c))
     }
@@ -651,7 +650,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
         return nil, nil, apiErr, func() {}
     }
 
-    engine, _, apiErr := qapi.parseEngineParam(r)
+    engineParam, _, apiErr := qapi.parseEngineParam(r)
     if apiErr != nil {
         return nil, nil, apiErr, func() {}
     }
@@ -671,13 +670,18 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
         return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
     }
 
+    analysis := qapi.parseQueryAnalyzeParam(r)
+    if analysis {
+        ctx = thanosmodel.AddMetadataStorage(ctx)
+    }
+
     var (
         qry         promql.Query
         seriesStats []storepb.SeriesStatsCounter
     )
     if err := tracing.DoInSpanWithErr(ctx, "instant_query_create", func(ctx context.Context) error {
         var err error
-        qry, err = engine.NewInstantQuery(
+        qry, err = engineParam.NewInstantQuery(
             ctx,
             qapi.queryableCreate(
                 enableDedup,
@@ -698,11 +702,6 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
         return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
     }
 
-    analysis, err := qapi.parseQueryAnalyzeParam(r, qry)
-    if err != nil {
-        return nil, nil, apiErr, func() {}
-    }
-
     if err := tracing.DoInSpanWithErr(ctx, "query_gate_ismyturn", qapi.gate.Start); err != nil {
         return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, qry.Close
     }
@@ -736,11 +735,17 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
     if r.FormValue(Stats) != "" {
         qs = stats.NewQueryStats(qry.Stats())
     }
+
+    var telemetry queryTelemetry
+    if eq, ok := qry.(engine.ExplainableQuery); ok {
+        telemetry = processAnalysis(eq.Analyze())
+    }
+
     return &queryData{
         ResultType:    res.Value.Type(),
         Result:        res.Value,
         Stats:         qs,
-        QueryAnalysis: analysis,
+        QueryAnalysis: telemetry,
     }, res.Warnings.AsErrors(), nil, qry.Close
 }
 
@@ -949,7 +954,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
         return nil, nil, apiErr, func() {}
     }
 
-    engine, _, apiErr := qapi.parseEngineParam(r)
+    engineParam, _, apiErr := qapi.parseEngineParam(r)
     if apiErr != nil {
         return nil, nil, apiErr, func() {}
     }
@@ -969,6 +974,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
         return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
     }
 
+    analysis := qapi.parseQueryAnalyzeParam(r)
+    if analysis {
+        ctx = thanosmodel.AddMetadataStorage(ctx)
+    }
+
     // Record the query range requested.
     qapi.queryRangeHist.Observe(end.Sub(start).Seconds())
 
@@ -978,7 +988,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
     )
     if err := tracing.DoInSpanWithErr(ctx, "range_query_create", func(ctx context.Context) error {
         var err error
-        qry, err = engine.NewRangeQuery(
+        qry, err = engineParam.NewRangeQuery(
             ctx,
             qapi.queryableCreate(
                 enableDedup,
@@ -1000,10 +1010,6 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
     }); err != nil {
         return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
     }
-    analysis, err := qapi.parseQueryAnalyzeParam(r, qry)
-    if err != nil {
-        return nil, nil, apiErr, func() {}
-    }
 
     if err := tracing.DoInSpanWithErr(ctx, "query_gate_ismyturn", qapi.gate.Start); err != nil {
         return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, qry.Close
@@ -1036,11 +1042,17 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
     if r.FormValue(Stats) != "" {
         qs = stats.NewQueryStats(qry.Stats())
     }
+
+    var telemetry queryTelemetry
+    if eq, ok := qry.(engine.ExplainableQuery); ok {
+        telemetry = processAnalysis(eq.Analyze())
+    }
+
     return &queryData{
         ResultType:    res.Value.Type(),
         Result:        res.Value,
         Stats:         qs,
-        QueryAnalysis: analysis,
+        QueryAnalysis: telemetry,
     }, res.Warnings.AsErrors(), nil, qry.Close
 }
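Note: the context plumbing above leans entirely on the promql-engine side
of this change. AddMetadataStorage/GetMetadataStorage and
OperatorTelemetry.FanoutMetadata() are introduced by
https://github.com/thanos-io/promql-engine/pull/490, which is why this
patch cannot merge first. As a mental model only — a hypothetical sketch
of the storage contract the Thanos side relies on, not the actual
promql-engine code:

package main

import (
    "context"
    "fmt"
    "sync"
)

type metadataKey struct{}

// MetadataStorage is a guess at the shape promql-engine#490 provides:
// a mutex-guarded map from store address to arbitrary per-store metadata.
type MetadataStorage struct {
    mu   sync.Mutex
    data map[string]map[string]any
}

// AddMetadataStorage attaches a fresh storage to the context. It is only
// called when analyze=true, so a nil result from GetMetadataStorage
// doubles as the "analysis disabled" signal.
func AddMetadataStorage(ctx context.Context) context.Context {
    return context.WithValue(ctx, metadataKey{}, &MetadataStorage{data: map[string]map[string]any{}})
}

// GetMetadataStorage returns the storage, or nil when none was attached.
func GetMetadataStorage(ctx context.Context) *MetadataStorage {
    ms, _ := ctx.Value(metadataKey{}).(*MetadataStorage)
    return ms
}

// SetMetadata records metadata for one store; concurrent response sets
// may call this from their own goroutines.
func (m *MetadataStorage) SetMetadata(storeAddr string, md map[string]any) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[storeAddr] = md
}

// GetMetadata snapshots everything collected so far.
func (m *MetadataStorage) GetMetadata() map[string]map[string]any {
    m.mu.Lock()
    defer m.mu.Unlock()
    out := make(map[string]map[string]any, len(m.data))
    for k, v := range m.data {
        out[k] = v
    }
    return out
}

func main() {
    ctx := AddMetadataStorage(context.Background())
    GetMetadataStorage(ctx).SetMetadata("store-0:10901", map[string]any{"processed_series": 42})
    fmt.Println(GetMetadataStorage(ctx).GetMetadata()) // map[store-0:10901:map[processed_series:42]]
}

Attaching the storage only when analysis is requested keeps the normal
query path free of this bookkeeping, which is exactly how the response
sets below bail out early.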
diff --git a/pkg/query/querier.go b/pkg/query/querier.go
index e084344ed9..1c5d95fd9e 100644
--- a/pkg/query/querier.go
+++ b/pkg/query/querier.go
@@ -23,7 +23,6 @@ import (
     "github.com/thanos-io/thanos/pkg/gate"
     "github.com/thanos-io/thanos/pkg/store"
     "github.com/thanos-io/thanos/pkg/store/storepb"
-    "github.com/thanos-io/thanos/pkg/tenancy"
     "github.com/thanos-io/thanos/pkg/tracing"
 )
 
@@ -260,12 +259,6 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
     for i, m := range ms {
         matchers[i] = m.String()
     }
-    tenant := ctx.Value(tenancy.TenantKey)
-    // The context gets canceled as soon as query evaluation is completed by the engine.
-    // We want to prevent this from happening for the async store API calls we make while preserving tracing context.
-    // TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API.
-    ctx = tracing.CopyTraceContext(context.Background(), ctx)
-    ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)
     ctx, cancel := context.WithTimeout(ctx, q.selectTimeout)
     span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{
         "minTime": hints.Start,
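Note: this removal is what lets the metadata storage survive into the
store fanout. Re-rooting the Select context at context.Background() kept
only the trace and tenant values that were copied across by hand;
everything else on the caller's context, including the storage attached
in v1.go above, was silently dropped. A minimal illustration (toy code,
not Thanos internals):

package main

import (
    "context"
    "fmt"
)

type ctxKey string

func main() {
    // The query API attaches a value (in Thanos: the metadata storage).
    ctx := context.WithValue(context.Background(), ctxKey("storage"), "fanout metadata store")

    // Pre-patch behaviour: re-root the context so store calls outlive query
    // evaluation, copying only trace and tenant information by hand.
    detached := context.Background()
    fmt.Println(detached.Value(ctxKey("storage"))) // <nil> — the storage is gone

    // Post-patch behaviour: keep using the caller's context.
    fmt.Println(ctx.Value(ctxKey("storage"))) // fanout metadata store
}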
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 865ea3878d..b3423e1cba 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -1603,9 +1603,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
         var resp respSet
         if s.sortingStrategy == sortingStrategyStore {
             resp = newEagerRespSet(
+                gctx,
                 span,
                 10*time.Minute,
                 blk.meta.ULID.String(),
+                "",
                 []labels.Labels{blk.extLset},
                 onClose,
                 blockClient,
@@ -1616,9 +1618,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
             )
         } else {
             resp = newLazyRespSet(
+                gctx,
                 span,
                 10*time.Minute,
                 blk.meta.ULID.String(),
+                "",
                 []labels.Labels{blk.extLset},
                 onClose,
                 blockClient,
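Note: bucket stores serve local blocks, so there is no remote gRPC
address to key the metadata on; the new arguments are just the block
reader's context plus an empty storeAddr. For remote stores,
proxy_merge.go below applies a measure-then-publish pattern: counters
accumulate while the stream is drained, and a single defer writes them
out under the store's address however the stream ends (normally, on
error, or on frame timeout). A standalone sketch of that pattern — a
hypothetical helper, with a plain map standing in for the metadata
storage:

package main

import (
    "fmt"
    "time"
)

// response stands in for storepb.SeriesResponse; only the sizes matter here.
type response struct {
    series, chunks, samples, bytes int
}

// drainAndRecord mimics the defers added to newLazyRespSet and
// newEagerRespSet: accumulate while consuming, publish exactly once.
func drainAndRecord(storeAddr string, stream <-chan response, sink map[string]map[string]any) {
    start := time.Now()
    var series, chunks, samples, bytes int
    defer func() {
        sink[storeAddr] = map[string]any{
            "duration_seconds":  time.Since(start).Seconds(),
            "processed_series":  series,
            "processed_chunks":  chunks,
            "processed_samples": samples,
            "processed_bytes":   bytes,
        }
    }()
    for r := range stream {
        series += r.series
        chunks += r.chunks
        samples += r.samples
        bytes += r.bytes
    }
}

func main() {
    stream := make(chan response, 2)
    stream <- response{series: 1, chunks: 4, samples: 480, bytes: 2048}
    stream <- response{series: 1, chunks: 2, samples: 240, bytes: 1024}
    close(stream)

    sink := map[string]map[string]any{}
    drainAndRecord("store-0:10901", stream, sink)
    fmt.Println(sink["store-0:10901"])
}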
diff --git a/pkg/store/proxy_merge.go b/pkg/store/proxy_merge.go
index 29d1e6560a..956dead4b7 100644
--- a/pkg/store/proxy_merge.go
+++ b/pkg/store/proxy_merge.go
@@ -18,6 +18,7 @@ import (
     "github.com/pkg/errors"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/prometheus/model/labels"
+    "github.com/thanos-io/promql-engine/execution/model"
     grpc_opentracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware"
 
     "github.com/thanos-io/thanos/pkg/losertree"
@@ -300,9 +301,11 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse {
 }
 
 func newLazyRespSet(
+    ctx context.Context,
     span opentracing.Span,
     frameTimeout time.Duration,
     storeName string,
+    storeAddr string,
     storeLabelSets []labels.Labels,
     closeSeries context.CancelFunc,
     cl storepb.Store_SeriesClient,
@@ -344,6 +347,24 @@ func newLazyRespSet(
             l.span.Finish()
         }()
 
+        start := time.Now()
+        defer func() {
+            metadataStore := model.GetMetadataStorage(ctx)
+            if metadataStore == nil {
+                return
+            }
+            metadataStore.SetMetadata(
+                storeAddr,
+                map[string]any{
+                    "duration_seconds":  time.Since(start).Seconds(),
+                    "processed_series":  seriesStats.Series,
+                    "processed_chunks":  seriesStats.Chunks,
+                    "processed_samples": seriesStats.Samples,
+                    "processed_bytes":   bytesProcessed,
+                },
+            )
+        }()
+
         numResponses := 0
         defer func() {
             if numResponses == 0 {
@@ -493,9 +514,11 @@ func newAsyncRespSet(
     switch retrievalStrategy {
     case LazyRetrieval:
         return newLazyRespSet(
+            ctx,
             span,
             frameTimeout,
             st.String(),
+            storeAddr,
             st.LabelSets(),
             closeSeries,
             cl,
@@ -505,9 +528,11 @@ func newAsyncRespSet(
         ), nil
     case EagerRetrieval:
         return newEagerRespSet(
+            ctx,
             span,
             frameTimeout,
             st.String(),
+            storeAddr,
             st.LabelSets(),
             closeSeries,
             cl,
@@ -556,9 +581,11 @@ type eagerRespSet struct {
 }
 
 func newEagerRespSet(
+    ctx context.Context,
     span opentracing.Span,
     frameTimeout time.Duration,
     storeName string,
+    storeAddr string,
     storeLabelSets []labels.Labels,
     closeSeries context.CancelFunc,
     cl storepb.Store_SeriesClient,
@@ -589,9 +616,28 @@ func newEagerRespSet(
 
     // Start a goroutine and immediately buffer everything.
     go func(l *eagerRespSet) {
+        start := time.Now()
+
         seriesStats := &storepb.SeriesStatsCounter{}
         bytesProcessed := 0
 
+        defer func() {
+            metadataStore := model.GetMetadataStorage(ctx)
+            if metadataStore == nil {
+                return
+            }
+            metadataStore.SetMetadata(
+                storeAddr,
+                map[string]any{
+                    "duration_seconds":  time.Since(start).Seconds(),
+                    "processed_series":  seriesStats.Series,
+                    "processed_chunks":  seriesStats.Chunks,
+                    "processed_samples": seriesStats.Samples,
+                    "processed_bytes":   bytesProcessed,
+                },
+            )
+        }()
+
         defer func() {
             l.span.SetTag("processed.series", seriesStats.Series)
             l.span.SetTag("processed.chunks", seriesStats.Chunks)
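Note: end to end, a client opts in through the analyze form parameter and
walks the returned operator tree for the new field. In the sketch below
the parameter names, the "analysis" response key, and the field layout
are read off this patch and the current query API, not a stable contract:

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "net/url"
)

// analyzeNode mirrors queryTelemetry from v1.go above.
type analyzeNode struct {
    Name       string                    `json:"name,omitempty"`
    FanoutData map[string]map[string]any `json:"fanout_data,omitempty"`
    Children   []analyzeNode             `json:"children,omitempty"`
}

type queryResponse struct {
    Data struct {
        Analysis analyzeNode `json:"analysis"`
    } `json:"data"`
}

// printFanout walks the operator tree and prints every store's metadata.
func printFanout(n analyzeNode) {
    for addr, md := range n.FanoutData {
        fmt.Printf("store %q: %v\n", addr, md)
    }
    for _, c := range n.Children {
        printFanout(c)
    }
}

func main() {
    resp, err := http.PostForm("http://localhost:10902/api/v1/query", url.Values{
        "query":   {"up"},
        "engine":  {"thanos"}, // fanout data is only produced by the thanos engine
        "analyze": {"true"},
    })
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var qr queryResponse
    if err := json.NewDecoder(resp.Body).Decode(&qr); err != nil {
        panic(err)
    }
    printFanout(qr.Data.Analysis)
}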