Enable "execute hooks" for worked jobs #122

Closed
dhermes opened this issue Dec 19, 2023 · 7 comments

Comments

@dhermes
Contributor

dhermes commented Dec 19, 2023

Why?

  • Opening a span for tracing and stashing it on the ctx
  • Generic "starting job" logging

(Distinct from https://riverqueue.com/docs/subscriptions because this needs to be synchronous / needs direct access to ctx.)

User experience

Define an optional hooks interface

// WorkerWithHooks is a job worker that provides hooks.
type WorkerWithHooks[T JobArgs] interface {
	PreRun(ctx context.Context, job *Job[T]) (context.Context, error) // Naming inspired by cobra's PreRun
}

and then use a type assertion to check if this optional interface is satisfied by a river.Worker.

Implementation

E.g. in jobExecutor{}.execute() you could make it possible to run a hook like

ctx, err := e.WorkUnit.PreWork(ctx)

And you could invoke in wrapperWorkUnit{}.PreWork() similar to Work()

// noOpPreRun is a `PreRun()` function that does nothing.
func noOpPreRun[T JobArgs](ctx context.Context, _ *Job[T]) (context.Context, error) {
	return ctx, nil
}

func (w *workUnitFactoryWrapper[T]) MakeUnit(jobRow *rivertype.JobRow) workunit.WorkUnit {
	preRun := noOpPreRun[T]
	// To make this type assertion cheaper, it should probably happen when
	// `workUnitFactoryWrapper{}` is created (not when `MakeUnit()` is invoked)
	// A method value is never nil, so the type assertion alone is enough.
	if wh, ok := w.worker.(WorkerWithHooks[T]); ok {
		preRun = wh.PreRun
	}

	return &wrapperWorkUnit[T]{jobRow: jobRow, worker: w.worker, preRun: preRun}
}

// wrapperWorkUnit implements workUnit for a job and Worker.
type wrapperWorkUnit[T JobArgs] struct {
	job    *Job[T] // not set until after UnmarshalJob is invoked
	jobRow *rivertype.JobRow
	worker Worker[T]
	preRun func(ctx context.Context, job *Job[T]) (context.Context, error)
}
@brandur
Contributor

brandur commented Dec 20, 2023

Thanks @dhermes.

I'll have to think about this one a bit more. I do like the idea of providing a reasonably easy way to get generic telemetry, but I'd like to keep the job API from becoming too sprawling and difficult to understand. Adding one function wouldn't be a problem by itself, but once you've added PreRun, it'd feel kind of weird not to also have PostRun (the latter being somewhat more difficult to implement correctly, since a failing hook could accidentally fail a job that had completed successfully), and things might expand from there. Cobra might be a good example of what to try to avoid: it's fully featured alright, but has a massive surface area and is difficult to grok.

In some of our internal jobs/handlers, we do something like:

type MyWorker struct {
     servicebase.BaseService
}

func (w *MyWorker) Run(ctx context.Context) {
     ctx = w.BaseStart(ctx) // plain assignment; ":=" would redeclare the ctx parameter and fail to compile
     defer w.BaseFinish(ctx)

     ...
}

It does take a little more boilerplate in each job/handler, but it's more visible, and has presented very few problems over the years. I'd be tempted to propose a similar convention for River.

@dhermes
Contributor Author

dhermes commented Dec 20, 2023

I'll have to think about this one a bit more

SGTM. Thanks for being thoughtful and deliberate! Using the struct-embedding trick will certainly work great for now, thanks.

@dbhoot

dbhoot commented May 16, 2024

I'm new to River (and Go in general) but am also implementing telemetry in my workers.

The way I'm approaching it is that the worker struct has a private tracer, and in the Work function the span is created from that tracer. I also create a child logger using the span context so all logging is traceable.

@elee1766
Contributor

@dbhoot @dhermes
we settled on this pattern for telemetry

https://gfx.cafe/util/go/-/blob/master/fxriver/traceWorker.go?ref_type=heads
https://gfx.cafe/util/go/-/blob/master/fxriver/hack.go?ref_type=heads

we wrap every worker with a traceworker at registration time with a custom registration struct

https://gfx.cafe/util/go/-/blob/master/fxriver/river.go?ref_type=heads#L153

not sure if we will wrap more hooks in the future.

@dbhoot

dbhoot commented Jul 18, 2024

@elee1766

Is my reading of this correct that the trace worker pulls the globally registered tracer from the trace provider? How does this work when telemetry is not enabled (for example, in a dev or local environment)?

I'm essentially doing the same thing -- for the most part. I have a private begin function I call in every Work.

import (
	"context"
	"fmt"

	"github.com/riverqueue/river"
	"go.opentelemetry.io/otel/trace"
)

func begin(ictx context.Context, t trace.Tracer, name string) (context.Context, func(...trace.SpanEndOption)) {
	if t == nil {
		return ictx, func(o ...trace.SpanEndOption) {}
	}

	ctx, span := t.Start(ictx, name)
	return ctx, span.End
}
func (w *Someworker) Work(
	ictx context.Context,
	job *river.Job[SomeArgs],
) error {
	ctx, cleanup := begin(ictx, w.tracer, fmt.Sprintf("%s:%d", job.Args.Kind(), job.ID))
	defer cleanup()

	logger := log.LoggerWithTraceContext(w.log, ctx)
...
   
Looks like a fairly straightforward migration

@elee1766
Contributor

elee1766 commented Jul 18, 2024

yes exactly, it's using the global tracer provider, which is initialized elsewhere. in development, they're all no-op tracers if no initialization was done.

I wanted to avoid the mistake of forgetting the hook, hence forcing it at registration time.

@bgentry
Contributor

bgentry commented Oct 5, 2024

Please check out the job insertion and worker middleware added in #632 / v0.13.0-rc.1. I believe that should solve the use cases I'm aware of here. Please also report back if you try it and it works or doesn't! 🙏

@bgentry bgentry closed this as completed Oct 5, 2024