Enable "execute hooks" for worked jobs #122
Comments
Thanks @dhermes. I'll have to think about this one a bit more. I do like the idea of providing a reasonably easy way to get generic telemetry, but I'd like to keep the job API from becoming too sprawling and difficult to understand. The addition of one function wouldn't seem to be a problem, but once you've added
In some of our internal jobs/handlers, we do something like:
type MyWorker struct {
	servicebase.BaseService
}
func (w *MyWorker) Run(ctx context.Context) {
	// Plain assignment: ctx is already declared as a parameter, so := would not compile.
	ctx = w.BaseStart(ctx)
	defer w.BaseFinish(ctx)
	...
}
It does take a little more boilerplate in each job/handler, but it's more visible, and has presented very few problems over the years. I'd be tempted to propose a similar convention for River. |
SGTM. Thanks for being thoughtful and deliberate! Using the struct-embedding trick will certainly work great for now, thanks. |
I'm new to River (and Go in general) but am also implementing telemetry in my workers. The way I'm approaching it is that the worker struct has a private tracer, and in the Work function the span is created from that tracer. I also create a child logger using the span context so all logging is traceable. |
@dbhoot @dhermes We wrap every worker with a trace worker (https://gfx.cafe/util/go/-/blob/master/fxriver/traceWorker.go?ref_type=heads) at registration time, using a custom registration struct (https://gfx.cafe/util/go/-/blob/master/fxriver/river.go?ref_type=heads#L153). Not sure if we will wrap more hooks in the future. |
Is my reading of this correct that the trace worker pulls the globally registered tracer from the trace provider? How does this work when telemetry is not enabled (for example, in a dev or local environment)? I'm essentially doing the same thing, for the most part. I have a private begin function I call on every Work.
import (
	"context"
	"fmt"

	"github.com/riverqueue/river"
	"go.opentelemetry.io/otel/trace"
)
// begin starts a span named name when a tracer is configured and returns the
// span's End function; with a nil tracer it returns the context unchanged and a no-op.
func begin(ictx context.Context, t trace.Tracer, name string) (context.Context, func(...trace.SpanEndOption)) {
	if t == nil {
		return ictx, func(o ...trace.SpanEndOption) {}
	}
	ctx, span := t.Start(ictx, name)
	return ctx, span.End
}
func (w *Someworker) Work(
	ictx context.Context,
	job *river.Job[SomeArgs],
) error {
	ctx, cleanup := begin(ictx, w.tracer, fmt.Sprintf("%s:%d", job.Args.Kind(), job.ID))
	defer cleanup()
	logger := log.LoggerWithTraceContext(w.log, ctx)
	...
Looks like a fairly straightforward migration |
Yes, exactly: it uses the global tracer, which is initialized elsewhere. In development, if no initialization is done, they are all no-op tracers. I wanted to avoid the mistake of forgetting the hook, hence forcibly doing it at registration time. |
Please check out the job insertion and worker middleware added in #632 / v0.13.0-rc.1. I believe that should solve the use cases I'm aware of here. Please also report back if you try it and it works or doesn't! 🙏 |
Why?
ctx
(Distinct from https://riverqueue.com/docs/subscriptions because this needs to be synchronous / needs direct access to ctx.)
User experience
Define an optional hooks interface and then use a type assertion to check if this optional interface is satisfied by a river.Worker.
Implementation
E.g. in jobExecutor{}.execute() you could make it possible to run a hook like
And you could invoke in wrapperWorkUnit{}.PreWork() similar to Work()