From 3fec6170c9563c9fc2a95253bd4ba02c70a12c85 Mon Sep 17 00:00:00 2001 From: Andriy Semenets Date: Mon, 26 Aug 2024 16:17:43 +0200 Subject: [PATCH 1/3] Do not insert a new job if PeriodicJobConstructor returns nil This commit fixes an issue with the PerioridJobConstructor ignoring the return value. According to the docs, we should ignore the job is nil is returned. --- periodic_job.go | 3 +++ periodic_job_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/periodic_job.go b/periodic_job.go index 90d55cbb..20648e09 100644 --- a/periodic_job.go +++ b/periodic_job.go @@ -183,6 +183,9 @@ func (b *PeriodicJobBundle) toInternal(periodicJob *PeriodicJob) *maintenance.Pe return &maintenance.PeriodicJob{ ConstructorFunc: func() (*riverdriver.JobInsertFastParams, *dbunique.UniqueOpts, error) { args, options := periodicJob.constructorFunc() + if args == nil { + return nil, nil, maintenance.ErrNoJobToInsert + } return insertParamsFromConfigArgsAndOptions(&b.periodicJobEnqueuer.Archetype, b.clientConfig, args, options) }, RunOnStart: opts.RunOnStart, diff --git a/periodic_job_test.go b/periodic_job_test.go index 299b70b3..35d74072 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -59,6 +59,30 @@ func TestPeriodicJobBundle(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, mustUnmarshalJSON[TestJobArgs](t, insertParams2.EncodedArgs).JobNum) }) + + t.Run("ReturningNilDoesntInsertNewJob", func(t *testing.T) { + t.Parallel() + + periodicJobBundle, _ := setup(t) + + type TestJobArgs struct { + JobArgsReflectKind[TestJobArgs] + } + + periodicJob := NewPeriodicJob( + PeriodicInterval(15*time.Minute), + func() (JobArgs, *InsertOpts) { + // Returning nil from the constructor function should not insert a new job. + return nil, nil + }, + nil, + ) + + internalPeriodicJob := periodicJobBundle.toInternal(periodicJob) + + _, _, err := internalPeriodicJob.ConstructorFunc() + require.ErrorIs(t, err, maintenance.ErrNoJobToInsert) + }) } func mustUnmarshalJSON[T any](t *testing.T, data []byte) *T { From 2c40feda39a4b81e48ba1c691fb33e292378fb2b Mon Sep 17 00:00:00 2001 From: Andriy Semenets Date: Thu, 29 Aug 2024 16:48:16 +0200 Subject: [PATCH 2/3] Remove unused struct --- periodic_job_test.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/periodic_job_test.go b/periodic_job_test.go index 35d74072..ae469e04 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -65,10 +65,6 @@ func TestPeriodicJobBundle(t *testing.T) { periodicJobBundle, _ := setup(t) - type TestJobArgs struct { - JobArgsReflectKind[TestJobArgs] - } - periodicJob := NewPeriodicJob( PeriodicInterval(15*time.Minute), func() (JobArgs, *InsertOpts) { From 31afef4c4082e0910eefdaeb9dbb3416f7649d21 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Thu, 29 Aug 2024 21:42:52 -0500 Subject: [PATCH 3/3] add client-level test coverage for PeriodicJobConstructor nil return --- CHANGELOG.md | 5 +++-- client_test.go | 27 +++++++++++++++++++++++++++ periodic_job_test.go | 2 +- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97c71423..d50d0a88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,10 +21,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ```go # before - migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator := rivermigrate.New(riverpgxv5.New(dbPool), nil) # after - migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) + migrator, err := rivermigrate.New(riverpgxv5.New(dbPool), nil) if err != nil { // handle error } @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Fixed - Fixed a panic that'd occur if `StopAndCancel` was invoked before a client was started. [PR #557](https://github.com/riverqueue/river/pull/557). +- A `PeriodicJobConstructor` should be able to return `nil` `JobArgs` if it wishes to not have any job inserted. However, this was either never working or was broken at some point. It's now fixed. Thanks [@semanser](https://github.com/semanser)! [PR #572](https://github.com/riverqueue/river/pull/572). ## [0.11.4] - 2024-08-20 diff --git a/client_test.go b/client_test.go index f118e435..3bf1607c 100644 --- a/client_test.go +++ b/client_test.go @@ -2727,6 +2727,33 @@ func Test_Client_Maintenance(t *testing.T) { require.Empty(t, jobs) }) + t.Run("PeriodicJobConstructorReturningNil", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, nil) + + worker := &periodicJobWorker{} + AddWorker(config.Workers, worker) + config.PeriodicJobs = []*PeriodicJob{ + NewPeriodicJob(cron.Every(15*time.Minute), func() (JobArgs, *InsertOpts) { + // Returning nil from the constructor function should not insert a new + // job and should be handled cleanly + return nil, nil + }, &PeriodicJobOpts{RunOnStart: true}), + } + + client, bundle := setup(t, config) + + startAndWaitForQueueMaintainer(ctx, t, client) + + svc := maintenance.GetService[*maintenance.PeriodicJobEnqueuer](client.queueMaintainer) + svc.TestSignals.SkippedJob.WaitOrTimeout() + + jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{(periodicJobArgs{}).Kind()}) + require.NoError(t, err) + require.Empty(t, jobs, "Expected to find zero jobs of kind: "+(periodicJobArgs{}).Kind()) + }) + t.Run("PeriodicJobEnqueuerAddDynamically", func(t *testing.T) { t.Parallel() diff --git a/periodic_job_test.go b/periodic_job_test.go index ae469e04..745a06d8 100644 --- a/periodic_job_test.go +++ b/periodic_job_test.go @@ -16,7 +16,7 @@ func TestPeriodicJobBundle(t *testing.T) { type testBundle struct{} - setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { + setup := func(t *testing.T) (*PeriodicJobBundle, *testBundle) { //nolint:unparam t.Helper() periodicJobEnqueuer := maintenance.NewPeriodicJobEnqueuer(