Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unified insert path for periodic jobs #679

Merged
merged 1 commit into from
Nov 16, 2024

Conversation

bgentry
Copy link
Contributor

@bgentry bgentry commented Nov 16, 2024

Job insert middleware were not being utilized for periodic jobs, resulting in behavioral differences (especially Pro features). This periodic job insertion path has been refactored to utilize the unified insertion path from the client, which also makes these inserts pass through the Pilot instead of going directly to the driver.

Fixes #675.

Job insert middleware were not being utilized for periodic jobs,
resulting in behavioral differences (especially Pro features). This
insertion path has been refactored to rely on the unified insertion path
from the client.

Fixes #675.
@bgentry bgentry requested a review from brandur November 16, 2024 20:30
// insertMany is a shared code path for InsertMany and InsertManyTx, also used
// by the PeriodicJobEnqueuer.
func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nesting in here is getting a bit unwieldy and is probably worth spending some more time to refactor. For now though I want to get this bug fixed quickly due to its impact on pro features.

Comment on lines -353 to -365
for _, result := range results {
if !result.UniqueSkippedAsDuplicate {
queues = append(queues, result.Job.Queue)
}
}
}

if len(queues) > 0 {
if err := s.Config.NotifyInsert(ctx, tx, queues); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error notifying insert", "error", err.Error())
return
}
s.TestSignals.NotifiedQueues.Signal(queues)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that this change ended up simplifying some things, like not needing to deal with insert notifications in the PeriodicJobEnqueuer anymore. Now it's all the same insert logic as the Client.

@bgentry bgentry merged commit 9e5cac7 into master Nov 16, 2024
10 checks passed
@bgentry bgentry deleted the bg-middleware-for-periodic-inserts branch November 16, 2024 20:37
tigrato pushed a commit to gravitational/river that referenced this pull request Dec 18, 2024
Job insert middleware were not being utilized for periodic jobs,
resulting in behavioral differences (especially Pro features). This
insertion path has been refactored to rely on the unified insertion path
from the client.

Fixes riverqueue#675.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Middleware for Periodic Jobs (Tracing)
1 participant