Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add middleware system for jobs #632

Merged
merged 1 commit into from
Oct 5, 2024
Merged

Add middleware system for jobs #632

merged 1 commit into from
Oct 5, 2024

Conversation

bgentry
Copy link
Contributor

@bgentry bgentry commented Oct 4, 2024

This is a rebased version of #584 that's been adapted for recent changes including consolidated insert logic and the newer unique jobs implementation.

Here, implement a middleware system that adds middleware functions to job lifecycles, which results in them being invoked during specific phases of a job like as it's being inserted or worked.

The most obvious unlock for this is telemetry (e.g. logging, metrics), but it also acts as a building block for features like encrypted jobs.

There are two middleware interfaces added: JobInsertMiddleware and WorkerMiddleware. A user could implement these on the same struct type for something like telemetry where you want to inject a trace ID at job insertion, and then resurrect it into the context at execution time. In other cases, such as execution-specific logic, it may only make sense to implement one of these.

The Worker[T] type has been extended with a Middleware() method that allows each individual worker type to define its own added lists of middleware which will be run after the global WorkerMiddleware at the client config level. There are also global JobInsertMiddleware on the client config.

@bgentry bgentry requested a review from brandur October 4, 2024 02:19
//
// Returning an error from this function will fail the overarching insert
// operation, even if the inner insertion originally succeeded.
InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(ctx context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could still be possible to have a single-job method here for simplicity, but we would need to have some adapter code to make that work. It seemed easier to get this out with a more minimal version that does just what we need for now. Plus JobMiddlewareDefaults helps enough with forward compatibility if we update the interface.

@bgentry bgentry force-pushed the bg-lifecycle-hooks branch from 597216c to 13f0842 Compare October 5, 2024 15:42
client.go Outdated
Comment on lines 1477 to 1478
middlewareItem := c.config.JobMiddleware[i] // capture the current middleware item
previousDoInner := doInner // Capture the current doInner function
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the course of actually using these, it turns out these were both shadowing on subsequent iterations. With just a single middleware it was trying to grab the one at index -1. Realized both of these need to be captured. Probably should figure out some tests to exercise this!

@bgentry bgentry force-pushed the bg-lifecycle-hooks branch from 13f0842 to e495633 Compare October 5, 2024 16:10
job_executor.go Outdated
Comment on lines 221 to 237
if len(e.JobMiddleware) > 0 {
// Wrap middlewares in reverse order so the one defined first is wrapped
// as the outermost function and is first to receive the operation.
for i := len(e.JobMiddleware) - 1; i >= 0; i-- {
middlewareItem := e.JobMiddleware[i] // capture the current middleware item
previousDoInner := doInner // Capture the current doInner function
doInner = func(ctx context.Context) error {
return middlewareItem.Work(ctx, e.JobRow, previousDoInner)
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can come in a subsequent PR since it's an optimization, but I'm realizing we probably don't want to construct this middleware pipeline on every job execution. I would guess it wouldn't be too hard to do this once at the time the client is started or constructed and then just pass jobs into the existing pipeline.

The same is probably true with the insert middleware—we should be able to do that init just once.

@bgentry bgentry force-pushed the bg-lifecycle-hooks branch from e495633 to e190e0e Compare October 5, 2024 19:15
Comment on lines +255 to +273
type JobInsertMiddleware interface {
// InsertMany is invoked around a batch insert operation. Implementations
// must always include a call to doInner to call down the middleware stack
// and perfom the batch insertion, and may run custom code before and after.
//
// Returning an error from this function will fail the overarching insert
// operation, even if the inner insertion originally succeeded.
InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
}

type WorkerMiddleware interface {
// Work is invoked after a job's JSON args being unmarshaled and before the
// job is worked. Implementations must always include a call to doInner to
// call down the middleware stack and perfom the batch insertion, and may run
// custom code before and after.
//
// Returning an error from this function will fail the overarching work
// operation, even if the inner work originally succeeded.
Work(ctx context.Context, job *JobRow, doInner func(context.Context) error) error
Copy link
Contributor Author

@bgentry bgentry Oct 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just added a commit which splits the middleware interface in two. I'm not 100% settled on the design, but I was led here by trying to allow for middleware to be defined per-worker and related to this comment: #584 (comment)

At insert time, we only have a JobArgs type and have no guaranteed access to the Worker. This inherently means that per-job-type job insertion middleware needs to be derived from the JobArgs and not connected to the Worker.

However when looking at Work middleware and its conceptual similarity to a worker's Work function, it can really only make sense for that to be dynamically defined on the stateful Worker[T] type. And if one were returning a general purpose []JobMiddleware from their JobArgs, they would probably expect that the Work() functions on those would actually be used—but again they don't really make sense there rather than on the actual Worker.

As such, I've split the interface. While I don't think this split is ideal, I think it might be an unavoidable consequence of where we ended up on the ancient JobArgs vs Worker discussion from like a year ago 😃

On the plus side, along with this change, I added dynamic worker-level middleware defined by a Middleware(job *Job[T]) []rivertype.WorkerMiddleware function now on the Worker type and WorkerDefaults. The executor calls into this via the work unit to extract the middleware that should be used for this specific job, and it applies those in order after the globally-defined WorkerMiddleware on the client config.

To make this work, I did have to move the arg deserialization to happen before the middleware are called, but I think this is probably ok. This also adds some flexibility if there's a use case out there which demands access to the decoded args struct, because you can use that and the worker's other stateful properties when determining which middleware should be used. Not sure that's realistic or not 🤷‍♂️

@bgentry bgentry force-pushed the bg-lifecycle-hooks branch 2 times, most recently from a3ca8c1 to d39531e Compare October 5, 2024 20:00
Here, implement a middleware system that adds middleware functions to
job lifecycles, which results in them being invoked during specific
phases of a job like as it's being inserted or worked.

The most obvious unlock for this is telemetry (e.g. logging, metrics),
but it also acts as a building block for features like encrypted jobs.

There are two middleware interfaces added: `JobInsertMiddleware` and
`WorkerMiddleware`. A user could implement these on the same struct type
for something like telemetry where you want to inject a trace ID at job
insertion, and then resurrect it into the context at execution time. In
other cases, such as execution-specific logic, it may only make sense to
implement one of these. Each of these interfaces has a corresponding
`*Defaults` struct that can be embedded to provide future-proofing if
additions are made to them for future functionality.

The `Worker[T]` type has been extended with a `Middleware()` method that
allows each individual worker type to define its own added lists of
middleware which will be run _after_ the global `WorkerMiddleware` at
the client config level. There are also global `JobInsertMiddleware` on
the client config.

Co-authored-by: Blake Gentry <[email protected]>
@bgentry bgentry force-pushed the bg-lifecycle-hooks branch from d39531e to 45527c2 Compare October 5, 2024 20:14
//
// Returning an error from this function will fail the overarching insert
// operation, even if the inner insertion originally succeeded.
InsertMany(ctx context.Context, manyParams []*JobInsertParams, doInner func(context.Context) ([]*JobInsertResult, error)) ([]*JobInsertResult, error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@brandur I don't think we ever discussed why these middleware functions don't pass through the manyParams or job as arguments to doInner. I'm guessing it's partly to do with making it easier & cleaner for callers?

I don't think there's a use case for changing out the job struct on the Work variant, but for insertion it's conceivable that one could want to add or remove rows to the insert batch as part of a middleware call (in addition to merely manipulating them before insertion).

Something to consider adjusting as this bakes in an RC release.

@bgentry
Copy link
Contributor Author

bgentry commented Oct 5, 2024

Moving forward on shipping this into an RC release to unblock other work ✌️ We can still make adjustments to it after.

@bgentry bgentry merged commit 0b93275 into master Oct 5, 2024
14 checks passed
@bgentry bgentry deleted the bg-lifecycle-hooks branch October 5, 2024 20:19
tigrato pushed a commit to gravitational/river that referenced this pull request Dec 18, 2024
Here, implement a middleware system that adds middleware functions to
job lifecycles, which results in them being invoked during specific
phases of a job like as it's being inserted or worked.

The most obvious unlock for this is telemetry (e.g. logging, metrics),
but it also acts as a building block for features like encrypted jobs.

There are two middleware interfaces added: `JobInsertMiddleware` and
`WorkerMiddleware`. A user could implement these on the same struct type
for something like telemetry where you want to inject a trace ID at job
insertion, and then resurrect it into the context at execution time. In
other cases, such as execution-specific logic, it may only make sense to
implement one of these. Each of these interfaces has a corresponding
`*Defaults` struct that can be embedded to provide future-proofing if
additions are made to them for future functionality.

The `Worker[T]` type has been extended with a `Middleware()` method that
allows each individual worker type to define its own added lists of
middleware which will be run _after_ the global `WorkerMiddleware` at
the client config level. There are also global `JobInsertMiddleware` on
the client config.

Co-authored-by: Brandur <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants