diff --git a/engine/distributed.go b/engine/distributed.go index fef42438..a7c73782 100644 --- a/engine/distributed.go +++ b/engine/distributed.go @@ -69,37 +69,45 @@ func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) *Distributed func (l DistributedEngine) SetQueryLogger(log promql.QueryLogger) {} func (l DistributedEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { + return l.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts) +} + +func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { + return l.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval) +} + +func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) { // Truncate milliseconds to avoid mismatch in timestamps between remote and local engines. // Some clients might only support second precision when executing queries. ts = ts.Truncate(time.Second) - return l.remoteEngine.NewInstantQuery(ctx, q, opts, qs, ts) + return l.remoteEngine.MakeInstantQueryFromPlan(ctx, q, opts, plan, ts) } -func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { +func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) { // Truncate milliseconds to avoid mismatch in timestamps between remote and local engines. // Some clients might only support second precision when executing queries. start = start.Truncate(time.Second) end = end.Truncate(time.Second) interval = interval.Truncate(time.Second) - return l.remoteEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval) + return l.remoteEngine.MakeRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval) } -func (l DistributedEngine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) { +func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) { // Truncate milliseconds to avoid mismatch in timestamps between remote and local engines. // Some clients might only support second precision when executing queries. - start = start.Truncate(time.Second) - end = end.Truncate(time.Second) - interval = interval.Truncate(time.Second) + ts = ts.Truncate(time.Second) - return l.remoteEngine.NewRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval) + return l.remoteEngine.MakeInstantQuery(ctx, q, opts, qs, ts) } -func (l DistributedEngine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) { +func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { // Truncate milliseconds to avoid mismatch in timestamps between remote and local engines. // Some clients might only support second precision when executing queries. - ts = ts.Truncate(time.Second) + start = start.Truncate(time.Second) + end = end.Truncate(time.Second) + interval = interval.Truncate(time.Second) - return l.remoteEngine.NewInstantQueryFromPlan(ctx, q, opts, plan, ts) + return l.remoteEngine.MakeRangeQuery(ctx, q, opts, qs, start, end, interval) } diff --git a/engine/engine.go b/engine/engine.go index 6a564854..226374ad 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -96,6 +96,29 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer { return optimizers } +// QueryOpts implements promql.QueryOpts but allows to override more engine default options. +type QueryOpts struct { + lookbackDelta time.Duration + + enablePerStepStats bool + + // DecodingConcurrency can be used to override the DecodingConcurrency engine setting. + DecodingConcurrency int +} + +func (opts QueryOpts) LookbackDelta() time.Duration { return opts.lookbackDelta } +func (opts QueryOpts) EnablePerStepStats() bool { return opts.enablePerStepStats } + +func fromPromQLOpts(opts promql.QueryOpts) *QueryOpts { + if opts == nil { + return &QueryOpts{} + } + return &QueryOpts{ + lookbackDelta: opts.LookbackDelta(), + enablePerStepStats: opts.EnablePerStepStats(), + } +} + // New creates a new query engine with the given options. The query engine will // use the storage passed in NewInstantQuery and NewRangeQuery for retrieving // data when executing queries. @@ -228,42 +251,17 @@ type Engine struct { noStepSubqueryIntervalFn func(time.Duration) time.Duration } -func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { - idx, err := e.activeQueryTracker.Insert(ctx, qs) - if err != nil { - return nil, err - } - defer e.activeQueryTracker.Delete(idx) - +func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) { expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr() if err != nil { return nil, err } - - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } - // determine sorting order before optimizers run, we do this by looking for "sort" // and "sort_desc" and optimize them away afterwards since they are only needed at // the presentation layer and not when computing the results. resultSort := newResultSort(expr) - qOpts := &query.Options{ - Start: ts, - End: ts, - Step: 0, - StepsBatch: stepsBatch, - LookbackDelta: opts.LookbackDelta(), - EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(), - ExtLookbackDelta: e.extLookbackDelta, - EnableAnalysis: e.enableAnalysis, - NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, - DecodingConcurrency: e.decodingConcurrency, - } + qOpts := e.makeQueryOpts(ts, ts, 0, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge } @@ -301,20 +299,13 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts }, nil } -func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) { +func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, root.String()) if err != nil { return nil, err } defer e.activeQueryTracker.Delete(idx) - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } - qOpts := e.makeQueryOpts(ts, ts, 0, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge @@ -355,7 +346,7 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl }, nil } -func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { +func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, qs) if err != nil { return nil, err @@ -371,12 +362,6 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar { return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type())) } - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } qOpts := e.makeQueryOpts(start, end, step, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge @@ -413,19 +398,13 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr }, nil } -func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) { +func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) { idx, err := e.activeQueryTracker.Insert(ctx, root.String()) if err != nil { return nil, err } defer e.activeQueryTracker.Delete(idx) - if opts == nil { - opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta) - } - if opts.LookbackDelta() <= 0 { - opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta) - } qOpts := e.makeQueryOpts(start, end, step, opts) if qOpts.StepsBatch > 64 { return nil, ErrStepsBatchTooLarge @@ -461,20 +440,46 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, }, nil } -func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts promql.QueryOpts) *query.Options { - qOpts := &query.Options{ +// PromQL compatibility constructors + +// NewInstantQuery implements the promql.Engine interface. +func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { + return e.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts) +} + +// NewRangeQuery implements the promql.Engine interface. +func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) { + return e.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, step) +} + +func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOpts) *query.Options { + res := &query.Options{ Start: start, End: end, Step: step, StepsBatch: stepsBatch, - LookbackDelta: opts.LookbackDelta(), - EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(), + LookbackDelta: e.lookbackDelta, + EnablePerStepStats: e.enablePerStepStats, ExtLookbackDelta: e.extLookbackDelta, EnableAnalysis: e.enableAnalysis, NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn, DecodingConcurrency: e.decodingConcurrency, } - return qOpts + if opts == nil { + return res + } + + if opts.LookbackDelta() > 0 { + res.LookbackDelta = opts.LookbackDelta() + } + if opts.EnablePerStepStats() { + res.EnablePerStepStats = opts.EnablePerStepStats() + } + + if opts.DecodingConcurrency != 0 { + res.DecodingConcurrency = opts.DecodingConcurrency + } + return res } func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {