*: add fanout metadata to Analyze()
Add fanout metadata to Analyze() output. Depends on
thanos-io/promql-engine#490.

Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS committed Oct 3, 2024
1 parent f265c3b commit 0cef534
Showing 4 changed files with 90 additions and 35 deletions.
68 changes: 40 additions & 28 deletions pkg/api/query/v1.go
@@ -46,6 +46,7 @@ import (
"github.com/prometheus/prometheus/util/stats"
promqlapi "github.com/thanos-io/promql-engine/api"
"github.com/thanos-io/promql-engine/engine"
thanosmodel "github.com/thanos-io/promql-engine/execution/model"
"github.com/thanos-io/promql-engine/logicalplan"

"github.com/thanos-io/thanos/pkg/api"
@@ -319,11 +320,12 @@ type queryData struct {
type queryTelemetry struct {
// TODO(saswatamcode): Replace with engine.TrackedTelemetry once it has exported fields.
// TODO(saswatamcode): Add aggregate fields to enrich data.
OperatorName string `json:"name,omitempty"`
Execution string `json:"executionTime,omitempty"`
PeakSamples int64 `json:"peakSamples,omitempty"`
TotalSamples int64 `json:"totalSamples,omitempty"`
Children []queryTelemetry `json:"children,omitempty"`
OperatorName string `json:"name,omitempty"`
Execution string `json:"executionTime,omitempty"`
PeakSamples int64 `json:"peakSamples,omitempty"`
TotalSamples int64 `json:"totalSamples,omitempty"`
FanoutData map[string]map[string]any `json:"fanout_data,omitempty"`
Children []queryTelemetry `json:"children,omitempty"`
}
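
With the new field, each node of the analyze output serializes roughly as below. This is a hypothetical sketch: the operator name, the store address 10.0.0.1:10901, and all numbers are illustrative; the per-store keys mirror those written in pkg/store/proxy_merge.go further down in this commit.

{
  "name": "[vectorSelector]",
  "executionTime": "1.2ms",
  "peakSamples": 120,
  "totalSamples": 4096,
  "fanout_data": {
    "10.0.0.1:10901": {
      "duration_seconds": 0.0011,
      "processed_series": 8,
      "processed_chunks": 16,
      "processed_samples": 4096,
      "processed_bytes": 65536
    }
  },
  "children": []
}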

func (qapi *QueryAPI) parseEnableDedupParam(r *http.Request) (enableDeduplication bool, _ *api.ApiError) {
@@ -477,14 +479,8 @@ func (qapi *QueryAPI) getQueryExplain(query promql.Query) (*engine.ExplainOutput

}

func (qapi *QueryAPI) parseQueryAnalyzeParam(r *http.Request, query promql.Query) (queryTelemetry, error) {
if r.FormValue(QueryAnalyzeParam) == "true" || r.FormValue(QueryAnalyzeParam) == "1" {
if eq, ok := query.(engine.ExplainableQuery); ok {
return processAnalysis(eq.Analyze()), nil
}
return queryTelemetry{}, errors.Errorf("Query not analyzable; change engine to 'thanos'")
}
return queryTelemetry{}, nil
func (qapi *QueryAPI) parseQueryAnalyzeParam(r *http.Request) bool {
return r.FormValue(QueryAnalyzeParam) == "true" || r.FormValue(QueryAnalyzeParam) == "1"
}

func processAnalysis(a *engine.AnalyzeOutputNode) queryTelemetry {
@@ -493,6 +489,9 @@ func processAnalysis(a *engine.AnalyzeOutputNode) queryTelemetry {
analysis.Execution = a.OperatorTelemetry.ExecutionTimeTaken().String()
analysis.PeakSamples = a.PeakSamples()
analysis.TotalSamples = a.TotalSamples()
if fm := a.OperatorTelemetry.FanoutMetadata(); fm != nil {

Check failure on line 492 in pkg/api/query/v1.go — GitHub Actions / Linters (Static Analysis) for Go (×2): a.OperatorTelemetry.FanoutMetadata undefined (type "github.com/thanos-io/promql-engine/execution/model".OperatorTelemetry has no field or method FanoutMetadata)
analysis.FanoutData = fm.GetMetadata()
}
for _, c := range a.Children {
analysis.Children = append(analysis.Children, processAnalysis(&c))
}
@@ -651,7 +650,7 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, apiErr, func() {}
}

engine, _, apiErr := qapi.parseEngineParam(r)
engineParam, _, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}
Expand All @@ -671,13 +670,18 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

analysis := qapi.parseQueryAnalyzeParam(r)
if analysis {
ctx = thanosmodel.AddMetadataStorage(ctx)

Check failure on line 675 in pkg/api/query/v1.go — GitHub Actions / Linters (Static Analysis) for Go (×2): undefined: thanosmodel.AddMetadataStorage
}

var (
qry promql.Query
seriesStats []storepb.SeriesStatsCounter
)
if err := tracing.DoInSpanWithErr(ctx, "instant_query_create", func(ctx context.Context) error {
var err error
qry, err = engine.NewInstantQuery(
qry, err = engineParam.NewInstantQuery(
ctx,
qapi.queryableCreate(
enableDedup,
Expand All @@ -698,11 +702,6 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

analysis, err := qapi.parseQueryAnalyzeParam(r, qry)
if err != nil {
return nil, nil, apiErr, func() {}
}

if err := tracing.DoInSpanWithErr(ctx, "query_gate_ismyturn", qapi.gate.Start); err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, qry.Close
}
@@ -736,11 +735,17 @@ func (qapi *QueryAPI) query(r *http.Request) (interface{}, []error, *api.ApiErro
if r.FormValue(Stats) != "" {
qs = stats.NewQueryStats(qry.Stats())
}

var telemetry queryTelemetry
if eq, ok := qry.(engine.ExplainableQuery); ok {
telemetry = processAnalysis(eq.Analyze())
}

return &queryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
QueryAnalysis: analysis,
QueryAnalysis: telemetry,
}, res.Warnings.AsErrors(), nil, qry.Close
}

@@ -949,7 +954,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, apiErr, func() {}
}

engine, _, apiErr := qapi.parseEngineParam(r)
engineParam, _, apiErr := qapi.parseEngineParam(r)
if apiErr != nil {
return nil, nil, apiErr, func() {}
}
Expand All @@ -969,6 +974,11 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}

analysis := qapi.parseQueryAnalyzeParam(r)
if analysis {
ctx = thanosmodel.AddMetadataStorage(ctx)

Check failure on line 979 in pkg/api/query/v1.go — GitHub Actions / Linters (Static Analysis) for Go (×2): undefined: thanosmodel.AddMetadataStorage
}

// Record the query range requested.
qapi.queryRangeHist.Observe(end.Sub(start).Seconds())

@@ -978,7 +988,7 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
)
if err := tracing.DoInSpanWithErr(ctx, "range_query_create", func(ctx context.Context) error {
var err error
qry, err = engine.NewRangeQuery(
qry, err = engineParam.NewRangeQuery(
ctx,
qapi.queryableCreate(
enableDedup,
@@ -1000,10 +1010,6 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
}); err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}
analysis, err := qapi.parseQueryAnalyzeParam(r, qry)
if err != nil {
return nil, nil, apiErr, func() {}
}

if err := tracing.DoInSpanWithErr(ctx, "query_gate_ismyturn", qapi.gate.Start); err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorExec, Err: err}, qry.Close
@@ -1036,11 +1042,17 @@ func (qapi *QueryAPI) queryRange(r *http.Request) (interface{}, []error, *api.Ap
if r.FormValue(Stats) != "" {
qs = stats.NewQueryStats(qry.Stats())
}

var telemetry queryTelemetry
if eq, ok := qry.(engine.ExplainableQuery); ok {
telemetry = processAnalysis(eq.Analyze())
}

return &queryData{
ResultType: res.Value.Type(),
Result: res.Value,
Stats: qs,
QueryAnalysis: analysis,
QueryAnalysis: telemetry,
}, res.Warnings.AsErrors(), nil, qry.Close
}

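Note: the check failures above are expected at this commit. The commit message states the change depends on thanos-io/promql-engine#490, which introduces the context-scoped metadata storage; until it merges, thanosmodel.AddMetadataStorage and OperatorTelemetry.FanoutMetadata are undefined. The call sites in this diff imply roughly the contract below — a hypothetical sketch inferred from usage, not the actual promql-engine API:

package model // hypothetical sketch of the promql-engine #490 surface

import (
	"context"
	"sync"
)

type metadataKey struct{}

// MetadataStorage accumulates per-store fanout metadata for one query.
type MetadataStorage struct {
	mu   sync.Mutex
	data map[string]map[string]any // store address -> metadata
}

// AddMetadataStorage attaches an empty storage to the query context.
// The query path calls this only when analyze is requested.
func AddMetadataStorage(ctx context.Context) context.Context {
	return context.WithValue(ctx, metadataKey{}, &MetadataStorage{
		data: map[string]map[string]any{},
	})
}

// GetMetadataStorage returns the attached storage, or nil when analysis
// is disabled, which is why the store-layer callers nil-check it.
func GetMetadataStorage(ctx context.Context) *MetadataStorage {
	ms, _ := ctx.Value(metadataKey{}).(*MetadataStorage)
	return ms
}

// SetMetadata records the metadata gathered while streaming one store.
func (m *MetadataStorage) SetMetadata(storeAddr string, md map[string]any) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[storeAddr] = md
}

// GetMetadata returns everything collected so far; processAnalysis copies
// it into queryTelemetry.FanoutData.
func (m *MetadataStorage) GetMetadata() map[string]map[string]any {
	m.mu.Lock()
	defer m.mu.Unlock()
	return m.data
}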
7 changes: 0 additions & 7 deletions pkg/query/querier.go
@@ -23,7 +23,6 @@ import (
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/tenancy"
"github.com/thanos-io/thanos/pkg/tracing"
)

@@ -260,12 +259,6 @@ func (q *querier) Select(ctx context.Context, _ bool, hints *storage.SelectHints
for i, m := range ms {
matchers[i] = m.String()
}
tenant := ctx.Value(tenancy.TenantKey)
// The context gets canceled as soon as query evaluation is completed by the engine.
// We want to prevent this from happening for the async store API calls we make while preserving tracing context.
// TODO(bwplotka): Does the above still is true? It feels weird to leave unfinished calls behind query API.
ctx = tracing.CopyTraceContext(context.Background(), ctx)
ctx = context.WithValue(ctx, tenancy.TenantKey, tenant)
ctx, cancel := context.WithTimeout(ctx, q.selectTimeout)
span, ctx := tracing.StartSpan(ctx, "querier_select", opentracing.Tags{
"minTime": hints.Start,
4 changes: 4 additions & 0 deletions pkg/store/bucket.go
@@ -1603,9 +1603,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
gctx,
span,
10*time.Minute,
blk.meta.ULID.String(),
"",
[]labels.Labels{blk.extLset},
onClose,
blockClient,
@@ -1616,9 +1618,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
)
} else {
resp = newLazyRespSet(
gctx,
span,
10*time.Minute,
blk.meta.ULID.String(),
"",
[]labels.Labels{blk.extLset},
onClose,
blockClient,
46 changes: 46 additions & 0 deletions pkg/store/proxy_merge.go
@@ -18,6 +18,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/promql-engine/execution/model"
grpc_opentracing "github.com/thanos-io/thanos/pkg/tracing/tracing_middleware"

"github.com/thanos-io/thanos/pkg/losertree"
@@ -300,9 +301,11 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse {
}

func newLazyRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
storeAddr string,
storeLabelSets []labels.Labels,
closeSeries context.CancelFunc,
cl storepb.Store_SeriesClient,
@@ -344,6 +347,24 @@ func newLazyRespSet(
l.span.Finish()
}()

start := time.Now()
defer func() {
metadataStore := model.GetMetadataStorage(ctx)

Check failure on line 352 in pkg/store/proxy_merge.go — GitHub Actions: Go build with -tags=stringlabels; Linters (Static Analysis) for Go (×2); Documentation check; Go build for different platforms: undefined: model.GetMetadataStorage
if metadataStore == nil {
return
}
metadataStore.SetMetadata(
storeAddr,
map[string]any{
"duration_seconds": time.Since(start).Seconds(),
"processed_series": seriesStats.Series,
"processed_chunks": seriesStats.Chunks,
"processed_samples": seriesStats.Samples,
"processed_bytes": bytesProcessed,
},
)
}()

numResponses := 0
defer func() {
if numResponses == 0 {
@@ -493,9 +514,11 @@ func newAsyncRespSet(
switch retrievalStrategy {
case LazyRetrieval:
return newLazyRespSet(
ctx,
span,
frameTimeout,
st.String(),
storeAddr,
st.LabelSets(),
closeSeries,
cl,
Expand All @@ -505,9 +528,11 @@ func newAsyncRespSet(
), nil
case EagerRetrieval:
return newEagerRespSet(
ctx,
span,
frameTimeout,
st.String(),
storeAddr,
st.LabelSets(),
closeSeries,
cl,
@@ -556,9 +581,11 @@ type eagerRespSet struct {
}

func newEagerRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
storeAddr string,
storeLabelSets []labels.Labels,
closeSeries context.CancelFunc,
cl storepb.Store_SeriesClient,
@@ -589,9 +616,28 @@

// Start a goroutine and immediately buffer everything.
go func(l *eagerRespSet) {
start := time.Now()

seriesStats := &storepb.SeriesStatsCounter{}
bytesProcessed := 0

defer func() {
metadataStore := model.GetMetadataStorage(ctx)

Check failure on line 625 in pkg/store/proxy_merge.go — GitHub Actions: Go build with -tags=stringlabels; Linters (Static Analysis) for Go (×2); Documentation check; Go build for different platforms: undefined: model.GetMetadataStorage
if metadataStore == nil {
return
}
metadataStore.SetMetadata(
storeAddr,
map[string]any{
"duration_seconds": time.Since(start).Seconds(),
"processed_series": seriesStats.Series,
"processed_chunks": seriesStats.Chunks,
"processed_samples": seriesStats.Samples,
"processed_bytes": bytesProcessed,
},
)
}()

defer func() {
l.span.SetTag("processed.series", seriesStats.Series)
l.span.SetTag("processed.chunks", seriesStats.Chunks)
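Both newLazyRespSet and newEagerRespSet flush the same per-store map inside a defer, so the counters are recorded exactly once per store stream, including early-exit and error paths, and only when analyze installed a storage on the context. A minimal standalone sketch of that pattern, reusing the hypothetical MetadataStorage sketched after the v1.go diff above (the frame slice stands in for the real gRPC receive loop):

// consumeStoreStream is an illustrative reduction of the goroutines in
// newLazyRespSet/newEagerRespSet; model.GetMetadataStorage is the
// hypothetical helper sketched above, not the real promql-engine API.
func consumeStoreStream(ctx context.Context, storeAddr string, frames []*storepb.SeriesResponse) {
	start := time.Now()
	seriesStats := &storepb.SeriesStatsCounter{}
	bytesProcessed := 0

	defer func() {
		metadataStore := model.GetMetadataStorage(ctx) // nil unless analyze was requested
		if metadataStore == nil {
			return
		}
		metadataStore.SetMetadata(storeAddr, map[string]any{
			"duration_seconds":  time.Since(start).Seconds(),
			"processed_series":  seriesStats.Series,
			"processed_chunks":  seriesStats.Chunks,
			"processed_samples": seriesStats.Samples,
			"processed_bytes":   bytesProcessed,
		})
	}()

	for _, f := range frames {
		if s := f.GetSeries(); s != nil {
			seriesStats.Count(s) // counts series, chunks, and samples
		}
		bytesProcessed += f.Size()
	}
}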
