Skip to content

Commit

Permalink
engine: allow to override opts at query time (#501)
Browse files Browse the repository at this point in the history
* engine: allow to override opts at query time

Signed-off-by: Michael Hoffmann <[email protected]>

* engine: add more constructors

Signed-off-by: Michael Hoffmann <[email protected]>

---------

Signed-off-by: Michael Hoffmann <[email protected]>
Co-authored-by: Michael Hoffmann <[email protected]>
  • Loading branch information
MichaHoffmann and Michael Hoffmann authored Dec 16, 2024
1 parent 2f49f80 commit 8a0ffdb
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 65 deletions.
30 changes: 19 additions & 11 deletions engine/distributed.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,45 @@ func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) *Distributed
func (l DistributedEngine) SetQueryLogger(log promql.QueryLogger) {}

func (l DistributedEngine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return l.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
}

func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
return l.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, interval)
}

func (l DistributedEngine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)

return l.remoteEngine.NewInstantQuery(ctx, q, opts, qs, ts)
return l.remoteEngine.MakeInstantQueryFromPlan(ctx, q, opts, plan, ts)
}

func (l DistributedEngine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
func (l DistributedEngine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

return l.remoteEngine.NewRangeQuery(ctx, q, opts, qs, start, end, interval)
return l.remoteEngine.MakeRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval)
}

func (l DistributedEngine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, start, end time.Time, interval time.Duration) (promql.Query, error) {
func (l DistributedEngine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)
ts = ts.Truncate(time.Second)

return l.remoteEngine.NewRangeQueryFromPlan(ctx, q, opts, plan, start, end, interval)
return l.remoteEngine.MakeInstantQuery(ctx, q, opts, qs, ts)
}

func (l DistributedEngine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, plan logicalplan.Node, ts time.Time) (promql.Query, error) {
func (l DistributedEngine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) {
// Truncate milliseconds to avoid mismatch in timestamps between remote and local engines.
// Some clients might only support second precision when executing queries.
ts = ts.Truncate(time.Second)
start = start.Truncate(time.Second)
end = end.Truncate(time.Second)
interval = interval.Truncate(time.Second)

return l.remoteEngine.NewInstantQueryFromPlan(ctx, q, opts, plan, ts)
return l.remoteEngine.MakeRangeQuery(ctx, q, opts, qs, start, end, interval)
}
113 changes: 59 additions & 54 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,29 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer {
return optimizers
}

// QueryOpts implements promql.QueryOpts but allows to override more engine default options.
type QueryOpts struct {
lookbackDelta time.Duration

enablePerStepStats bool

// DecodingConcurrency can be used to override the DecodingConcurrency engine setting.
DecodingConcurrency int
}

func (opts QueryOpts) LookbackDelta() time.Duration { return opts.lookbackDelta }
func (opts QueryOpts) EnablePerStepStats() bool { return opts.enablePerStepStats }

func fromPromQLOpts(opts promql.QueryOpts) *QueryOpts {
if opts == nil {
return &QueryOpts{}
}
return &QueryOpts{
lookbackDelta: opts.LookbackDelta(),
enablePerStepStats: opts.EnablePerStepStats(),
}
}

// New creates a new query engine with the given options. The query engine will
// use the storage passed in NewInstantQuery and NewRangeQuery for retrieving
// data when executing queries.
Expand Down Expand Up @@ -228,42 +251,17 @@ type Engine struct {
noStepSubqueryIntervalFn func(time.Duration) time.Duration
}

func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

func (e *Engine) MakeInstantQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, ts time.Time) (promql.Query, error) {
expr, err := parser.NewParser(qs, parser.WithFunctions(e.functions)).ParseExpr()
if err != nil {
return nil, err
}

if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}

// determine sorting order before optimizers run, we do this by looking for "sort"
// and "sort_desc" and optimize them away afterwards since they are only needed at
// the presentation layer and not when computing the results.
resultSort := newResultSort(expr)

qOpts := &query.Options{
Start: ts,
End: ts,
Step: 0,
StepsBatch: stepsBatch,
LookbackDelta: opts.LookbackDelta(),
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
}
Expand Down Expand Up @@ -301,20 +299,13 @@ func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts
}, nil
}

func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) {
func (e *Engine) MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, ts time.Time) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
if err != nil {
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}

qOpts := e.makeQueryOpts(ts, ts, 0, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
Expand Down Expand Up @@ -355,7 +346,7 @@ func (e *Engine) NewInstantQueryFromPlan(ctx context.Context, q storage.Queryabl
}, nil
}

func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *Engine) MakeRangeQuery(ctx context.Context, q storage.Queryable, opts *QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, qs)
if err != nil {
return nil, err
Expand All @@ -371,12 +362,6 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
if expr.Type() != parser.ValueTypeVector && expr.Type() != parser.ValueTypeScalar {
return nil, errors.Newf("invalid expression type %q for range query, must be Scalar or instant Vector", parser.DocumentedType(expr.Type()))
}
if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
Expand Down Expand Up @@ -413,19 +398,13 @@ func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts pr
}, nil
}

func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
func (e *Engine) MakeRangeQueryFromPlan(ctx context.Context, q storage.Queryable, opts *QueryOpts, root logicalplan.Node, start, end time.Time, step time.Duration) (promql.Query, error) {
idx, err := e.activeQueryTracker.Insert(ctx, root.String())
if err != nil {
return nil, err
}
defer e.activeQueryTracker.Delete(idx)

if opts == nil {
opts = promql.NewPrometheusQueryOpts(false, e.lookbackDelta)
}
if opts.LookbackDelta() <= 0 {
opts = promql.NewPrometheusQueryOpts(opts.EnablePerStepStats(), e.lookbackDelta)
}
qOpts := e.makeQueryOpts(start, end, step, opts)
if qOpts.StepsBatch > 64 {
return nil, ErrStepsBatchTooLarge
Expand Down Expand Up @@ -461,20 +440,46 @@ func (e *Engine) NewRangeQueryFromPlan(ctx context.Context, q storage.Queryable,
}, nil
}

func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts promql.QueryOpts) *query.Options {
qOpts := &query.Options{
// PromQL compatibility constructors

// NewInstantQuery implements the promql.Engine interface.
func (e *Engine) NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) {
return e.MakeInstantQuery(ctx, q, fromPromQLOpts(opts), qs, ts)
}

// NewRangeQuery implements the promql.Engine interface.
func (e *Engine) NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, step time.Duration) (promql.Query, error) {
return e.MakeRangeQuery(ctx, q, fromPromQLOpts(opts), qs, start, end, step)
}

func (e *Engine) makeQueryOpts(start time.Time, end time.Time, step time.Duration, opts *QueryOpts) *query.Options {
res := &query.Options{
Start: start,
End: end,
Step: step,
StepsBatch: stepsBatch,
LookbackDelta: opts.LookbackDelta(),
EnablePerStepStats: e.enablePerStepStats && opts.EnablePerStepStats(),
LookbackDelta: e.lookbackDelta,
EnablePerStepStats: e.enablePerStepStats,
ExtLookbackDelta: e.extLookbackDelta,
EnableAnalysis: e.enableAnalysis,
NoStepSubqueryIntervalFn: e.noStepSubqueryIntervalFn,
DecodingConcurrency: e.decodingConcurrency,
}
return qOpts
if opts == nil {
return res
}

if opts.LookbackDelta() > 0 {
res.LookbackDelta = opts.LookbackDelta()
}
if opts.EnablePerStepStats() {
res.EnablePerStepStats = opts.EnablePerStepStats()
}

if opts.DecodingConcurrency != 0 {
res.DecodingConcurrency = opts.DecodingConcurrency
}
return res
}

func (e *Engine) storageScanners(queryable storage.Queryable, qOpts *query.Options, lplan logicalplan.Plan) (engstorage.Scanners, error) {
Expand Down

0 comments on commit 8a0ffdb

Please sign in to comment.