From 2d410d43eabd051751109aaf0b14069619e8af48 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Mon, 5 Aug 2024 15:56:45 -0500 Subject: [PATCH] database/sql: fix InsertManyTx default scheduled_at The database/sql driver had a slightly different behavior than the pgx driver in that it was setting the `scheduled_at` time to `null`, rather than setting the current timestamp if one was not specified. Additionally, the driver test suite did not cover this case at all. Fix both of these issues. Fixes #502. --- CHANGELOG.md | 4 + client.go | 5 +- .../riverdrivertest/riverdrivertest.go | 116 +++++++++++------- .../river_database_sql_driver.go | 3 +- 4 files changed, 82 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c6f20c5..f6749d51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- `database/sql` driver: fix default value of `scheduled_at` for `InsertManyTx` when it is not specified in `InsertOpts`. [PR #504](https://github.com/riverqueue/river/pull/504). + ## [0.11.0] - 2024-08-02 ### Added diff --git a/client.go b/client.go index e3682264..8448023e 100644 --- a/client.go +++ b/client.go @@ -1169,9 +1169,8 @@ func insertParamsFromConfigArgsAndOptions(archetype *baseservice.Archetype, conf } // If the time is stubbed (in a test), use that for `created_at`. Otherwise, - // leave an empty value which will use the database's `now()` value, which - // keeps the timestamps of jobs inserted across many different computers - // more consistent (i.e. in case of minor time drifts). + // leave an empty value which will either use the database's `now()` or be defaulted + // by drivers as necessary. createdAt := archetype.Time.NowUTCOrNil() maxAttempts := valutil.FirstNonZero(insertOpts.MaxAttempts, jobInsertOpts.MaxAttempts, config.MaxAttempts) diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 5d8c9a54..842fec5d 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -899,53 +899,85 @@ func Exercise[TTx any](ctx context.Context, t *testing.T, t.Run("JobInsertFastMany", func(t *testing.T) { t.Parallel() - exec, _ := setup(ctx, t) + t.Run("AllArgs", func(t *testing.T) { + exec, _ := setup(ctx, t) - // This test needs to use a time from before the transaction begins, otherwise - // the newly-scheduled jobs won't yet show as available because their - // scheduled_at (which gets a default value from time.Now() in code) will be - // after the start of the transaction. - now := time.Now().UTC().Add(-1 * time.Minute) + // This test needs to use a time from before the transaction begins, otherwise + // the newly-scheduled jobs won't yet show as available because their + // scheduled_at (which gets a default value from time.Now() in code) will be + // after the start of the transaction. + now := time.Now().UTC().Add(-1 * time.Minute) - insertParams := make([]*riverdriver.JobInsertFastParams, 10) - for i := 0; i < len(insertParams); i++ { - insertParams[i] = &riverdriver.JobInsertFastParams{ - EncodedArgs: []byte(`{"encoded": "args"}`), - Kind: "test_kind", - MaxAttempts: rivercommon.MaxAttemptsDefault, - Metadata: []byte(`{"meta": "data"}`), - Priority: rivercommon.PriorityDefault, - Queue: rivercommon.QueueDefault, - ScheduledAt: &now, - State: rivertype.JobStateAvailable, - Tags: []string{"tag"}, + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: &now, + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + insertParams[i].ScheduledAt = &now } - insertParams[i].ScheduledAt = &now - } - count, err := exec.JobInsertFastMany(ctx, insertParams) - require.NoError(t, err) - require.Len(t, insertParams, count) + count, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + require.Len(t, insertParams, count) + + jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) + require.NoError(t, err) + require.Len(t, jobsAfter, len(insertParams)) + for _, job := range jobsAfter { + require.Equal(t, 0, job.Attempt) + require.Nil(t, job.AttemptedAt) + require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second) + require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) + require.Empty(t, job.Errors) + require.Nil(t, job.FinalizedAt) + require.Equal(t, "test_kind", job.Kind) + require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) + require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) + require.Equal(t, rivercommon.PriorityDefault, job.Priority) + require.Equal(t, rivercommon.QueueDefault, job.Queue) + requireEqualTime(t, now, job.ScheduledAt) + require.Equal(t, rivertype.JobStateAvailable, job.State) + require.Equal(t, []string{"tag"}, job.Tags) + } + }) - jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) - require.NoError(t, err) - require.Len(t, jobsAfter, len(insertParams)) - for _, job := range jobsAfter { - require.Equal(t, 0, job.Attempt) - require.Nil(t, job.AttemptedAt) - require.WithinDuration(t, time.Now().UTC(), job.CreatedAt, 2*time.Second) - require.Equal(t, []byte(`{"encoded": "args"}`), job.EncodedArgs) - require.Empty(t, job.Errors) - require.Nil(t, job.FinalizedAt) - require.Equal(t, "test_kind", job.Kind) - require.Equal(t, rivercommon.MaxAttemptsDefault, job.MaxAttempts) - require.Equal(t, []byte(`{"meta": "data"}`), job.Metadata) - require.Equal(t, rivercommon.PriorityDefault, job.Priority) - require.Equal(t, rivercommon.QueueDefault, job.Queue) - requireEqualTime(t, now, job.ScheduledAt) - require.Equal(t, rivertype.JobStateAvailable, job.State) - require.Equal(t, []string{"tag"}, job.Tags) - } + t.Run("MissingScheduledAtDefaultsToNow", func(t *testing.T) { + exec, _ := setup(ctx, t) + + insertParams := make([]*riverdriver.JobInsertFastParams, 10) + for i := 0; i < len(insertParams); i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{"encoded": "args"}`), + Kind: "test_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Metadata: []byte(`{"meta": "data"}`), + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + ScheduledAt: nil, // explicit nil + State: rivertype.JobStateAvailable, + Tags: []string{"tag"}, + } + } + + count, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + require.Len(t, insertParams, count) + + jobsAfter, err := exec.JobGetByKindMany(ctx, []string{"test_kind"}) + require.NoError(t, err) + require.Len(t, jobsAfter, len(insertParams)) + for _, job := range jobsAfter { + require.WithinDuration(t, time.Now().UTC(), job.ScheduledAt, 2*time.Second) + } + }) }) t.Run("JobInsertFull", func(t *testing.T) { diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go index 48625d4e..dd369424 100644 --- a/riverdriver/riverdatabasesql/river_database_sql_driver.go +++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go @@ -228,11 +228,12 @@ func (e *Executor) JobInsertFastMany(ctx context.Context, params []*riverdriver. State: make([]dbsqlc.RiverJobState, len(params)), Tags: make([]string, len(params)), } + now := time.Now() for i := 0; i < len(params); i++ { params := params[i] - var scheduledAt time.Time + scheduledAt := now if params.ScheduledAt != nil { scheduledAt = *params.ScheduledAt }