Add middleware system for jobs #632
rivertype/river_type.go
Outdated
//
// Returning an error from this function will fail the overarching insert
// operation, even if the inner insertion originally succeeded.
InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(ctx context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
I think it could still be possible to have a single-job method here for simplicity, but we would need to have some adapter code to make that work. It seemed easier to get this out with a more minimal version that does just what we need for now. Plus, `JobMiddlewareDefaults` helps enough with forward compatibility if we update the interface.
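To illustrate the forward-compatibility point, here is a minimal sketch of the embedding pattern. The struct types are simplified stand-ins rather than the real `rivertype` definitions, and `countingMiddleware` and `runInsert` are hypothetical names used only for this example.

```go
package main

import (
	"context"
	"fmt"
)

// Simplified stand-ins for the rivertype structs; the real ones carry more fields.
type JobInsertParams struct{ Kind string }
type JobInsertResult struct{ Kind string }

// JobInsertMiddleware mirrors the shape of the interface under review.
type JobInsertMiddleware interface {
	InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
}

// JobMiddlewareDefaults is sketched here as a pass-through. If the interface
// later gains a method, a default added here keeps embedders compiling.
type JobMiddlewareDefaults struct{}

func (JobMiddlewareDefaults) InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error) {
	return doInner(ctx)
}

// countingMiddleware (hypothetical) overrides InsertMany and would inherit
// any other method from the embedded defaults.
type countingMiddleware struct {
	JobMiddlewareDefaults
	seen int
}

func (m *countingMiddleware) InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error) {
	m.seen += len(manyParams)
	return doInner(ctx)
}

// runInsert pushes one batch through a single middleware and a fake insert,
// returning the number of results produced.
func runInsert(mw JobInsertMiddleware, kinds ...string) (int, error) {
	params := make([]*JobInsertParams, 0, len(kinds))
	for _, k := range kinds {
		params = append(params, &JobInsertParams{Kind: k})
	}
	results, err := mw.InsertMany(context.Background(), params, func(context.Context) ([]*JobInsertResult, error) {
		out := make([]*JobInsertResult, 0, len(params))
		for _, p := range params {
			out = append(out, &JobInsertResult{Kind: p.Kind})
		}
		return out, nil
	})
	return len(results), err
}

func main() {
	counting := &countingMiddleware{}
	n, err := runInsert(counting, "email", "report")
	fmt.Println(n, counting.seen, err)
}
```

A middleware that embeds the defaults only has to implement the methods it cares about, which is what makes later interface additions less disruptive.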
597216c
to
13f0842
Compare
client.go
Outdated
middlewareItem := c.config.JobMiddleware[i] // capture the current middleware item
previousDoInner := doInner                  // capture the current doInner function
In the course of actually using these, it turns out these were both shadowing on subsequent iterations. With just a single middleware it was trying to grab the one at index -1. Realized both of these need to be captured. Probably should figure out some tests to exercise this!
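The capture problem can be exercised in isolation. The sketch below is an assumption-laden stand-in, not the client code: `workFunc`, `named`, and `runChain` are hypothetical names. It wraps middlewares in reverse order with both values captured explicitly and records the resulting call order. Without `previousDoInner`, the closure would refer to the `doInner` variable itself and recurse into the outermost wrapper. Capturing `i` directly is what produced the index -1 symptom on Go versions before 1.22, where the loop variable was shared across iterations.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// workFunc is a hypothetical stand-in for a middleware's Work method.
type workFunc func(ctx context.Context, doInner func(context.Context) error) error

// named returns a middleware that records its name before and after calling
// down the stack.
func named(name string, trace *[]string) workFunc {
	return func(ctx context.Context, doInner func(context.Context) error) error {
		*trace = append(*trace, name+":before")
		err := doInner(ctx)
		*trace = append(*trace, name+":after")
		return err
	}
}

// runChain builds a chain from the given names and returns the call order.
func runChain(names ...string) (string, error) {
	var trace []string
	middlewares := make([]workFunc, 0, len(names))
	for _, n := range names {
		middlewares = append(middlewares, named(n, &trace))
	}

	doInner := func(ctx context.Context) error {
		trace = append(trace, "work")
		return nil
	}
	for i := len(middlewares) - 1; i >= 0; i-- {
		middlewareItem := middlewares[i] // value for this iteration only
		previousDoInner := doInner       // the chain built so far
		doInner = func(ctx context.Context) error {
			return middlewareItem(ctx, previousDoInner)
		}
	}

	err := doInner(context.Background())
	return strings.Join(trace, " "), err
}

func main() {
	out, err := runChain("first", "second")
	fmt.Println(out, err)
}
```

A test along these lines, including the single-middleware case, would have caught the original bug.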
13f0842
to
e495633
Compare
job_executor.go
Outdated
if len(e.JobMiddleware) > 0 {
	// Wrap middlewares in reverse order so the one defined first is wrapped
	// as the outermost function and is first to receive the operation.
	for i := len(e.JobMiddleware) - 1; i >= 0; i-- {
		middlewareItem := e.JobMiddleware[i] // capture the current middleware item
		previousDoInner := doInner           // capture the current doInner function
		doInner = func(ctx context.Context) error {
			return middlewareItem.Work(ctx, e.JobRow, previousDoInner)
		}
	}
}
This can come in a subsequent PR since it's an optimization, but I'm realizing we probably don't want to construct this middleware pipeline on every job execution. I would guess it wouldn't be too hard to do this once at the time the client is started or constructed and then just pass jobs into the existing pipeline.
The same is probably true with the insert middleware—we should be able to do that init just once.
e495633
to
e190e0e
Compare
type JobInsertMiddleware interface {
	// InsertMany is invoked around a batch insert operation. Implementations
	// must always include a call to doInner to call down the middleware stack
	// and perform the batch insertion, and may run custom code before and after.
	//
	// Returning an error from this function will fail the overarching insert
	// operation, even if the inner insertion originally succeeded.
	InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
}

type WorkerMiddleware interface {
	// Work is invoked after a job's JSON args have been unmarshaled and before
	// the job is worked. Implementations must always include a call to doInner
	// to call down the middleware stack and perform the job's work, and may run
	// custom code before and after.
	//
	// Returning an error from this function will fail the overarching work
	// operation, even if the inner work originally succeeded.
	Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error
I just added a commit which splits the middleware interface in two. I'm not 100% settled on the design, but I was led here by trying to allow for middleware to be defined per-worker and related to this comment: #584 (comment)
At insert time, we only have a `JobArgs` type and have no guaranteed access to the `Worker`. This inherently means that per-job-type job insertion middleware needs to be derived from the `JobArgs` and not connected to the `Worker`.
However, when looking at `Work` middleware and its conceptual similarity to a worker's `Work` function, it can really only make sense for that to be dynamically defined on the stateful `Worker[T]` type. And if one were returning a general purpose `[]JobMiddleware` from their `JobArgs`, they would probably expect that the `Work()` functions on those would actually be used, but again they don't really make sense there rather than on the actual `Worker`.
As such, I've split the interface. While I don't think this split is ideal, I think it might be an unavoidable consequence of where we ended up on the ancient `JobArgs` vs `Worker` discussion from like a year ago 😃
On the plus side, along with this change, I added dynamic worker-level middleware defined by a `Middleware(job *Job[T]) []rivertype.WorkerMiddleware` function now on the `Worker` type and `WorkerDefaults`. The executor calls into this via the work unit to extract the middleware that should be used for this specific job, and it applies those in order after the globally-defined `WorkerMiddleware` on the client config.
To make this work, I did have to move the arg deserialization to happen before the middleware are called, but I think this is probably ok. This also adds some flexibility if there's a use case out there which demands access to the decoded args struct, because you can use that and the worker's other stateful properties when determining which middleware should be used. Not sure whether that's realistic 🤷‍♂️
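A rough sketch of the ordering described above, with heavily reduced types: `Worker[T]` here has only the `Middleware` method, and `tagged`, `EmailArgs`, `emailWorker`, `execute`, and `order` are hypothetical names for illustration. The global middleware is applied first, followed by whatever the worker returns for the specific job, which can depend on the decoded args.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

type JobRow struct{ Kind string }

// Job pairs the raw row with decoded args (reduced stand-in).
type Job[T any] struct {
	*JobRow
	Args T
}

type WorkerMiddleware interface {
	Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error
}

// Worker is reduced to the one method relevant here.
type Worker[T any] interface {
	Middleware(job *Job[T]) []WorkerMiddleware
}

// tagged (hypothetical) records its name when invoked.
type tagged struct {
	name string
	log  *[]string
}

func (t tagged) Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error {
	*t.log = append(*t.log, t.name)
	return doInner(ctx)
}

type EmailArgs struct{ Priority bool }

type emailWorker struct{ log *[]string }

// Middleware picks per-job middleware based on the decoded args.
func (w emailWorker) Middleware(job *Job[EmailArgs]) []WorkerMiddleware {
	if job.Args.Priority {
		return []WorkerMiddleware{tagged{"priority", w.log}}
	}
	return nil
}

// execute applies global middleware first, then the worker's own.
func execute[T any](ctx context.Context, global []WorkerMiddleware, w Worker[T], job *Job[T]) error {
	perJob := w.Middleware(job)
	// Copy into a fresh slice so the shared global slice is never mutated.
	all := make([]WorkerMiddleware, 0, len(global)+len(perJob))
	all = append(all, global...)
	all = append(all, perJob...)

	doInner := func(context.Context) error { return nil } // stands in for the worker's Work
	for i := len(all) - 1; i >= 0; i-- {
		mw, next := all[i], doInner
		doInner = func(ctx context.Context) error { return mw.Work(ctx, job.JobRow, next) }
	}
	return doInner(ctx)
}

// order runs one job and reports which middleware ran, in sequence.
func order(priority bool) string {
	var log []string
	global := []WorkerMiddleware{tagged{"global", &log}}
	job := &Job[EmailArgs]{JobRow: &JobRow{Kind: "email"}, Args: EmailArgs{Priority: priority}}
	if err := execute[EmailArgs](context.Background(), global, emailWorker{&log}, job); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(log, " ")
}

func main() {
	fmt.Println(order(true))
	fmt.Println(order(false))
}
```

Because the per-job list can differ from one job to the next, only the global portion of the chain could be precomputed under this design.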
a3ca8c1
to
d39531e
Compare
Here, implement a middleware system that adds middleware functions to job lifecycles, which results in them being invoked during specific phases of a job like as it's being inserted or worked.
The most obvious unlock for this is telemetry (e.g. logging, metrics), but it also acts as a building block for features like encrypted jobs.
There are two middleware interfaces added: `JobInsertMiddleware` and `WorkerMiddleware`. A user could implement these on the same struct type for something like telemetry where you want to inject a trace ID at job insertion, and then resurrect it into the context at execution time. In other cases, such as execution-specific logic, it may only make sense to implement one of these.
Each of these interfaces has a corresponding `*Defaults` struct that can be embedded to provide future-proofing if additions are made to them for future functionality.
The `Worker[T]` type has been extended with a `Middleware()` method that allows each individual worker type to define its own added lists of middleware which will be run _after_ the global `WorkerMiddleware` at the client config level. There are also global `JobInsertMiddleware` on the client config.
Co-authored-by: Blake Gentry <[email protected]>
d39531e
to
45527c2
Compare
//
// Returning an error from this function will fail the overarching insert
// operation, even if the inner insertion originally succeeded.
InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
@brandur I don't think we ever discussed why these middleware functions don't pass through the `manyParams` or `job` as arguments to `doInner`. I'm guessing it's partly to do with making it easier & cleaner for callers?
I don't think there's a use case for changing out the job struct on the `Work` variant, but for insertion it's conceivable that one could want to add or remove rows to the insert batch as part of a middleware call (in addition to merely manipulating them before insertion).
Something to consider adjusting as this bakes in an RC release.
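To make the alternative concrete, here is a sketch of what passing the batch through `doInner` could look like. This is not the signature in the PR: `insertFunc`, `dropKind`, `fakeInsert`, and `insertedKinds` are hypothetical, and the struct types are reduced stand-ins. With this shape a middleware can shrink or grow the batch before it reaches the next layer.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

type JobInsertParams struct{ Kind string }
type JobInsertResult struct{ Kind string }

// insertFunc is a HYPOTHETICAL variant of doInner that receives the batch,
// unlike the doInner in the PR, which takes only a context.
type insertFunc func(ctx context.Context, manyParams []*JobInsertParams) ([]*JobInsertResult, error)

// dropKind (hypothetical) removes rows of one kind from the batch.
type dropKind struct{ kind string }

func (d dropKind) InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner insertFunc) ([]*JobInsertResult, error) {
	kept := make([]*JobInsertParams, 0, len(manyParams))
	for _, p := range manyParams {
		if p.Kind != d.kind {
			kept = append(kept, p)
		}
	}
	// The filtered batch is what the rest of the stack sees.
	return doInner(ctx, kept)
}

// fakeInsert stands in for the real insertion and echoes what it was given.
func fakeInsert(_ context.Context, manyParams []*JobInsertParams) ([]*JobInsertResult, error) {
	out := make([]*JobInsertResult, 0, len(manyParams))
	for _, p := range manyParams {
		out = append(out, &JobInsertResult{Kind: p.Kind})
	}
	return out, nil
}

// insertedKinds runs a batch through dropKind and lists what was inserted.
func insertedKinds(drop string, kinds ...string) (string, error) {
	params := make([]*JobInsertParams, 0, len(kinds))
	for _, k := range kinds {
		params = append(params, &JobInsertParams{Kind: k})
	}
	results, err := dropKind{kind: drop}.InsertMany(context.Background(), params, fakeInsert)
	if err != nil {
		return "", err
	}
	got := make([]string, 0, len(results))
	for _, r := range results {
		got = append(got, r.Kind)
	}
	return strings.Join(got, ","), nil
}

func main() {
	out, err := insertedKinds("debug", "email", "debug", "report")
	fmt.Println(out, err)
}
```

The trade-off is that every middleware must then remember to forward the batch, which makes the simple pass-through case slightly noisier for callers.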
Moving forward on shipping this into an RC release to unblock other work ✌️ We can still make adjustments to it after.
Here, implement a middleware system that adds middleware functions to job lifecycles, which results in them being invoked during specific phases of a job like as it's being inserted or worked.
The most obvious unlock for this is telemetry (e.g. logging, metrics), but it also acts as a building block for features like encrypted jobs.
There are two middleware interfaces added: `JobInsertMiddleware` and `WorkerMiddleware`. A user could implement these on the same struct type for something like telemetry where you want to inject a trace ID at job insertion, and then resurrect it into the context at execution time. In other cases, such as execution-specific logic, it may only make sense to implement one of these.
Each of these interfaces has a corresponding `*Defaults` struct that can be embedded to provide future-proofing if additions are made to them for future functionality.
The `Worker[T]` type has been extended with a `Middleware()` method that allows each individual worker type to define its own added lists of middleware which will be run _after_ the global `WorkerMiddleware` at the client config level. There are also global `JobInsertMiddleware` on the client config.
Co-authored-by: Brandur <[email protected]>
This is a rebased version of #584 that's been adapted for recent changes including consolidated insert logic and the newer unique jobs implementation.
Here, implement a middleware system that adds middleware functions to job lifecycles, which results in them being invoked during specific phases of a job like as it's being inserted or worked.
The most obvious unlock for this is telemetry (e.g. logging, metrics), but it also acts as a building block for features like encrypted jobs.
There are two middleware interfaces added: `JobInsertMiddleware` and `WorkerMiddleware`. A user could implement these on the same struct type for something like telemetry where you want to inject a trace ID at job insertion, and then resurrect it into the context at execution time. In other cases, such as execution-specific logic, it may only make sense to implement one of these.
The `Worker[T]` type has been extended with a `Middleware()` method that allows each individual worker type to define its own added lists of middleware which will be run after the global `WorkerMiddleware` at the client config level. There are also global `JobInsertMiddleware` on the client config.
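The telemetry use case mentioned above might look roughly like the following sketch, in which one struct implements both interfaces. Everything here is illustrative: the types are reduced stand-ins, the `map[string]string` metadata field is an assumption made for brevity (not the library's actual representation), and `traceMiddleware`, `traceIDKey`, and `roundTrip` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// Stand-ins; the string-map Metadata field is an assumption for this sketch.
type JobInsertParams struct {
	Kind     string
	Metadata map[string]string
}

type JobInsertResult struct{}

type JobRow struct {
	Kind     string
	Metadata map[string]string
}

// traceIDKey is an unexported context key type to avoid collisions.
type traceIDKey struct{}

// traceMiddleware implements both the insert and the work side.
type traceMiddleware struct{}

// InsertMany copies the trace ID from the context onto each job's metadata.
func (traceMiddleware) InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error) {
	if id, ok := ctx.Value(traceIDKey{}).(string); ok {
		for _, p := range manyParams {
			if p.Metadata == nil {
				p.Metadata = map[string]string{}
			}
			p.Metadata["trace_id"] = id
		}
	}
	return doInner(ctx)
}

// Work restores the trace ID into the context before the job runs.
func (traceMiddleware) Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error {
	if id, ok := job.Metadata["trace_id"]; ok {
		ctx = context.WithValue(ctx, traceIDKey{}, id)
	}
	return doInner(ctx)
}

// roundTrip inserts one job with a trace ID in the context, then works it
// with a fresh context and reports the trace ID visible to the worker.
func roundTrip(traceID string) (string, error) {
	mw := traceMiddleware{}
	params := []*JobInsertParams{{Kind: "email"}}
	insertCtx := context.WithValue(context.Background(), traceIDKey{}, traceID)
	if _, err := mw.InsertMany(insertCtx, params, func(context.Context) ([]*JobInsertResult, error) {
		return []*JobInsertResult{{}}, nil
	}); err != nil {
		return "", err
	}

	// Simulate the job being read back later with its stored metadata.
	job := &JobRow{Kind: params[0].Kind, Metadata: params[0].Metadata}
	var seen string
	err := mw.Work(context.Background(), job, func(ctx context.Context) error {
		seen, _ = ctx.Value(traceIDKey{}).(string)
		return nil
	})
	return seen, err
}

func main() {
	seen, err := roundTrip("abc123")
	fmt.Println(seen, err)
}
```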