Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unified insert path for periodic jobs #679

Merged
merged 1 commit into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed

- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #678](https://github.com/riverqueue/river/pull/678).
- Job insert middleware was not being applied to periodic jobs. The periodic job insertion path has been refactored to use the unified insertion path from the client. Fixes #675. [PR #679](https://github.com/riverqueue/river/pull/679).

## [0.14.1] - 2024-11-04

Expand Down
41 changes: 28 additions & 13 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
{
periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(archetype, &maintenance.PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
NotifyInsert: client.maybeNotifyInsertForQueues,
Insert: client.insertMany,
}, driver.GetExecutor())
maintenanceServices = append(maintenanceServices, periodicJobEnqueuer)
client.testSignals.periodicJobEnqueuer = &periodicJobEnqueuer.TestSignals
Expand Down Expand Up @@ -1335,7 +1335,7 @@ func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *

func (c *Client[TTx]) insert(ctx context.Context, tx riverdriver.ExecutorTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
params := []InsertManyParams{{Args: args, InsertOpts: opts}}
results, err := c.insertMany(ctx, tx, params)
results, err := c.validateParamsAndInsertMany(ctx, tx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1386,7 +1386,7 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
}
defer tx.Rollback(ctx)

inserted, err := c.insertMany(ctx, tx, params)
inserted, err := c.validateParamsAndInsertMany(ctx, tx, params)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1421,11 +1421,26 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertManyTx(ctx context.Context, tx TTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
exec := c.driver.UnwrapExecutor(tx)
return c.insertMany(ctx, exec, params)
return c.validateParamsAndInsertMany(ctx, exec, params)
}

func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
// validateParamsAndInsertMany performs param validation and conversion on the
// caller-supplied InsertManyParams and then hands the converted params to
// insertMany. Keeping that step out of insertMany itself is what allows
// insertMany to be reused by the PeriodicJobEnqueuer, which cannot reference
// top-level river package types.
//
// If conversion fails, the error is returned as-is and insertMany is never
// called.
func (c *Client[TTx]) validateParamsAndInsertMany(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) ([]*rivertype.JobInsertResult, error) {
	converted, convertErr := c.insertManyParams(params)
	if convertErr != nil {
		return nil, convertErr
	}

	return c.insertMany(ctx, tx, converted)
}

// insertMany is a shared code path for InsertMany and InsertManyTx, also used
// by the PeriodicJobEnqueuer.
func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error) {
return c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nesting in here is getting a bit unwieldy and is probably worth spending some more time to refactor. For now though I want to get this bug fixed quickly due to its impact on pro features.

results, err := c.pilot.JobInsertMany(ctx, tx, insertParams)
if err != nil {
return nil, err
Expand All @@ -1446,14 +1461,9 @@ func (c *Client[TTx]) insertMany(ctx context.Context, tx riverdriver.ExecutorTx,
func (c *Client[TTx]) insertManyShared(
ctx context.Context,
tx riverdriver.ExecutorTx,
rawParams []InsertManyParams,
insertParams []*rivertype.JobInsertParams,
execute func(context.Context, []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error),
) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(rawParams)
if err != nil {
return nil, err
}

doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) {
finalInsertParams := sliceutil.Map(insertParams, func(params *rivertype.JobInsertParams) *riverdriver.JobInsertFastParams {
return (*riverdriver.JobInsertFastParams)(params)
Expand Down Expand Up @@ -1584,7 +1594,12 @@ func (c *Client[TTx]) InsertManyFastTx(ctx context.Context, tx TTx, params []Ins
}

func (c *Client[TTx]) insertManyFast(ctx context.Context, tx riverdriver.ExecutorTx, params []InsertManyParams) (int, error) {
results, err := c.insertManyShared(ctx, tx, params, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
insertParams, err := c.insertManyParams(params)
if err != nil {
return 0, err
}

results, err := c.insertManyShared(ctx, tx, insertParams, func(ctx context.Context, insertParams []*riverdriver.JobInsertFastParams) ([]*rivertype.JobInsertResult, error) {
count, err := tx.JobInsertFastManyNoReturning(ctx, insertParams)
if err != nil {
return nil, err
Expand Down
33 changes: 33 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3559,6 +3559,39 @@ func Test_Client_Maintenance(t *testing.T) {
}
})

t.Run("PeriodicJobEnqueuerUsesMiddleware", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)

worker := &periodicJobWorker{}
AddWorker(config.Workers, worker)
config.PeriodicJobs = []*PeriodicJob{
NewPeriodicJob(cron.Every(time.Minute), func() (JobArgs, *InsertOpts) {
return periodicJobArgs{}, nil
}, &PeriodicJobOpts{RunOnStart: true}),
}
config.JobInsertMiddleware = []rivertype.JobInsertMiddleware{&overridableJobMiddleware{
insertManyFunc: func(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error) {
for _, job := range manyParams {
job.EncodedArgs = []byte(`{"from": "middleware"}`)
}
return doInner(ctx)
},
}}

client, bundle := setup(t, config)

startAndWaitForQueueMaintainer(ctx, t, client)

svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer)
svc.TestSignals.InsertedJobs.WaitOrTimeout()

jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()})
require.NoError(t, err)
require.Len(t, jobs, 1, "Expected to find exactly one job of kind: "+(periodicJobArgs{}).Kind())
})

t.Run("QueueCleaner", func(t *testing.T) {
t.Parallel()

Expand Down
3 changes: 3 additions & 0 deletions internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivershared/util/valutil"
"github.com/riverqueue/river/rivertype"
)

const (
Expand All @@ -33,6 +34,8 @@ func (ts *JobSchedulerTestSignals) Init() {
ts.ScheduledBatch.Init()
}

// InsertFunc is a function to call to insert a batch of jobs using the given
// executor transaction. It receives the insert params for every job in the
// batch and returns the corresponding insert results, or an error.
type InsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, insertParams []*rivertype.JobInsertParams) ([]*rivertype.JobInsertResult, error)

// NotifyInsertFunc is a function to call to emit notifications for queues
// where jobs were scheduled.
type NotifyInsertFunc func(ctx context.Context, tx riverdriver.ExecutorTx, queues []string) error
Expand Down
42 changes: 12 additions & 30 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@ var ErrNoJobToInsert = errors.New("a nil job was returned, nothing to insert")

// Test-only properties.
type PeriodicJobEnqueuerTestSignals struct {
EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
NotifiedQueues testsignal.TestSignal[[]string] // notifies when queues are sent an insert notification
SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
EnteredLoop testsignal.TestSignal[struct{}] // notifies when the enqueuer finishes start up and enters its initial run loop
InsertedJobs testsignal.TestSignal[struct{}] // notifies when a batch of jobs is inserted
SkippedJob testsignal.TestSignal[struct{}] // notifies when a job is skipped because of nil JobInsertParams
}

func (ts *PeriodicJobEnqueuerTestSignals) Init() {
ts.EnteredLoop.Init()
ts.InsertedJobs.Init()
ts.NotifiedQueues.Init()
ts.SkippedJob.Init()
}

Expand Down Expand Up @@ -59,9 +57,8 @@ func (j *PeriodicJob) mustValidate() *PeriodicJob {
type PeriodicJobEnqueuerConfig struct {
AdvisoryLockPrefix int32

// NotifyInsert is a function to call to emit notifications for queues
// where jobs were scheduled.
NotifyInsert NotifyInsertFunc
// Insert is the function to call to insert jobs into the database.
Insert InsertFunc

// PeriodicJobs are the periodic jobs with which to configure the enqueuer.
PeriodicJobs []*PeriodicJob
Expand Down Expand Up @@ -104,7 +101,7 @@ func NewPeriodicJobEnqueuer(archetype *baseservice.Archetype, config *PeriodicJo
svc := baseservice.Init(archetype, &PeriodicJobEnqueuer{
Config: (&PeriodicJobEnqueuerConfig{
AdvisoryLockPrefix: config.AdvisoryLockPrefix,
NotifyInsert: config.NotifyInsert,
Insert: config.Insert,
PeriodicJobs: config.PeriodicJobs,
}).mustValidate(),

Expand Down Expand Up @@ -223,7 +220,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
defer s.mu.RUnlock()

var (
insertParamsMany []*riverdriver.JobInsertFastParams
insertParamsMany []*rivertype.JobInsertParams
now = s.Time.NowUTC()
)

Expand Down Expand Up @@ -269,7 +266,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
for {
select {
case <-timerUntilNextRun.C:
var insertParamsMany []*riverdriver.JobInsertFastParams
var insertParamsMany []*rivertype.JobInsertParams

now := s.Time.NowUTC()

Expand Down Expand Up @@ -329,7 +326,7 @@ func (s *PeriodicJobEnqueuer) Start(ctx context.Context) error {
return nil
}

func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*riverdriver.JobInsertFastParams) {
func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany []*rivertype.JobInsertParams) {
if len(insertParamsMany) == 0 {
return
}
Expand All @@ -341,28 +338,13 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
}
defer tx.Rollback(ctx)

queues := make([]string, 0, len(insertParamsMany))

if len(insertParamsMany) > 0 {
results, err := tx.JobInsertFastMany(ctx, insertParamsMany)
_, err := s.Config.Insert(ctx, tx, insertParamsMany)
if err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error inserting periodic jobs",
"error", err.Error(), "num_jobs", len(insertParamsMany))
return
}
for _, result := range results {
if !result.UniqueSkippedAsDuplicate {
queues = append(queues, result.Job.Queue)
}
}
}

if len(queues) > 0 {
if err := s.Config.NotifyInsert(ctx, tx, queues); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error notifying insert", "error", err.Error())
return
}
s.TestSignals.NotifiedQueues.Signal(queues)
Comment on lines -353 to -365
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that this change ended up simplifying some things, like not needing to deal with insert notifications in the PeriodicJobEnqueuer anymore. Now it's all the same insert logic as the Client.

}

if err := tx.Commit(ctx); err != nil {
Expand All @@ -373,7 +355,7 @@ func (s *PeriodicJobEnqueuer) insertBatch(ctx context.Context, insertParamsMany
s.TestSignals.InsertedJobs.Signal(struct{}{})
}

func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, error), scheduledAt time.Time) (*riverdriver.JobInsertFastParams, bool) {
func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, constructorFunc func() (*rivertype.JobInsertParams, error), scheduledAt time.Time) (*rivertype.JobInsertParams, bool) {
insertParams, err := constructorFunc()
if err != nil {
if errors.Is(err, ErrNoJobToInsert) {
Expand All @@ -389,7 +371,7 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c
insertParams.ScheduledAt = &scheduledAt
}

return (*riverdriver.JobInsertFastParams)(insertParams), true
return insertParams, true
}

const periodicJobEnqueuerVeryLongDuration = 24 * time.Hour
Expand Down
Loading
Loading