From bd437381f089b4412b719952581135f577881608 Mon Sep 17 00:00:00 2001
From: Blake Gentry
Date: Wed, 25 Sep 2024 15:41:44 -0500
Subject: [PATCH] BatchCompleter: batch all ops, not just completed (#617)

This adds a `JobSetStateIfRunningMany` query and corresponding driver API,
with implementations for both pgxv5 and `database/sql`. The `BatchCompleter`
was updated to use this new query and to batch _all_ operations, not only
those moving to a `complete` state. This means the `AsyncCompleter` and the
`InlineCompleter` are both now unused and could be deleted, along with their
underlying queries.

The intention is not just to improve performance for snoozes, retries,
errors, cancellations, and the like, but also to get down to a single path
for completions (similar to now having a single path for insertions).
---
 CHANGELOG.md                                  |   4 +
 internal/jobcompleter/job_completer.go        | 106 +++----
 internal/jobcompleter/job_completer_test.go   |  40 +--
 .../riverdrivertest/riverdrivertest.go        | 287 ++++++++++++++++++
 riverdriver/river_driver_interface.go         |  13 +
 .../internal/dbsqlc/river_job.sql.go          | 127 ++++++++
 .../internal/dbsqlc/sqlc.yaml                 |   1 +
 .../river_database_sql_driver.go              |  43 +++
 .../riverpgxv5/internal/dbsqlc/river_job.sql  |  60 ++++
 .../internal/dbsqlc/river_job.sql.go          | 124 ++++++++
 .../riverpgxv5/internal/dbsqlc/sqlc.yaml      |   1 +
 riverdriver/riverpgxv5/river_pgx_v5_driver.go |  40 +++
 12 files changed, 776 insertions(+), 70 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 31e6c091..c6535146 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

 ## [Unreleased]

+### Changed
+
+- The `BatchCompleter` that marks jobs as completed can now batch database updates for _all_ states of jobs that have finished execution. Prior to this change, only `completed` jobs were batched into a single `UPDATE` call, while jobs moving to any other state used a single `UPDATE` per job. This change should significantly reduce database and pool contention on high-volume systems when jobs get retried, snoozed, cancelled, or discarded following execution. [PR #617](https://github.com/riverqueue/river/pull/617).
+
 ## [0.12.0] - 2024-09-23

 ⚠️ Version 0.12.0 contains a new database migration, version 6. See [documentation on running River migrations](https://riverqueue.com/docs/migrations). If migrating with the CLI, make sure to update it to its latest version:

diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go
index 8afd27a4..60a771ec 100644
--- a/internal/jobcompleter/job_completer.go
+++ b/internal/jobcompleter/job_completer.go
@@ -48,7 +48,7 @@ type CompleterJobUpdated struct {
 // but is a minimal interface with the functions needed for completers to work
 // to more easily facilitate mocking.
 type PartialExecutor interface {
-	JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
+	JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
 	JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
 }

@@ -220,13 +220,13 @@ type BatchCompleter struct {
 	baseservice.BaseService
 	startstop.BaseStartStop

-	asyncCompleter    *AsyncCompleter // used for non-complete completions
-	completionMaxSize int             // configurable for testing purposes; max jobs to complete in single database operation
-	disableSleep      bool            // disable sleep in testing
-	maxBacklog        int             // configurable for testing purposes; max backlog allowed before no more completions accepted
+	completionMaxSize int  // configurable for testing purposes; max jobs to complete in single database operation
+	disableSleep      bool // disable sleep in testing
+	maxBacklog        int  // configurable for testing purposes; max backlog allowed before no more completions accepted
 	exec              PartialExecutor
 	setStateParams    map[int64]*batchCompleterSetState
 	setStateParamsMu  sync.RWMutex
+	setStateStartTimes   map[int64]time.Time
 	subscribeCh          SubscribeChan
 	waitOnBacklogChan    chan struct{}
 	waitOnBacklogWaiting bool
@@ -239,18 +239,17 @@ func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor, s
 	)

 	return baseservice.Init(archetype, &BatchCompleter{
-		asyncCompleter:    NewAsyncCompleter(archetype, exec, subscribeCh),
-		completionMaxSize: completionMaxSize,
-		exec:              exec,
-		maxBacklog:        maxBacklog,
-		setStateParams:    make(map[int64]*batchCompleterSetState),
-		subscribeCh:       subscribeCh,
+		completionMaxSize:  completionMaxSize,
+		exec:               exec,
+		maxBacklog:         maxBacklog,
+		setStateParams:     make(map[int64]*batchCompleterSetState),
+		setStateStartTimes: make(map[int64]time.Time),
+		subscribeCh:        subscribeCh,
 	})
 }

 func (c *BatchCompleter) ResetSubscribeChan(subscribeCh SubscribeChan) {
 	c.subscribeCh = subscribeCh
-	c.asyncCompleter.subscribeCh = subscribeCh
 }

 func (c *BatchCompleter) Start(ctx context.Context) error {
@@ -263,13 +262,10 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
 		panic("subscribeCh must be non-nil")
 	}

-	if err := c.asyncCompleter.Start(ctx); err != nil {
-		return err
-	}
-
 	go func() {
 		started()
 		defer stopped() // this defer should come first so it's first out
+		defer close(c.subscribeCh)

 		c.Logger.DebugContext(ctx, c.Name+": Run loop started")
 		defer c.Logger.DebugContext(ctx, c.Name+": Run loop stopped")
@@ -327,17 +323,22 @@ func (c *BatchCompleter) Start(ctx context.Context) error {
 }

 func (c *BatchCompleter) handleBatch(ctx context.Context) error {
-	var setStateBatch map[int64]*batchCompleterSetState
+	var (
+		setStateBatch      map[int64]*batchCompleterSetState
+		setStateStartTimes map[int64]time.Time
+	)
 	func() {
 		c.setStateParamsMu.Lock()
 		defer c.setStateParamsMu.Unlock()

 		setStateBatch = c.setStateParams
+		setStateStartTimes = c.setStateStartTimes

 		// Don't bother resetting the map if there's nothing to process,
 		// allowing the completer to idle efficiently.
 		if len(setStateBatch) > 0 {
 			c.setStateParams = make(map[int64]*batchCompleterSetState)
+			c.setStateStartTimes = make(map[int64]time.Time)
 		} else {
 			// Set nil to avoid a data race below in case the map is set as a
 			// new job comes in.
@@ -351,34 +352,39 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {

 	// Complete a sub-batch with retries.
 	// Also helps reduce visual noise and increase readability of the loop below.
-	completeSubBatch := func(batchID []int64, batchFinalizedAt []time.Time) ([]*rivertype.JobRow, error) {
+	completeSubBatch := func(batchParams *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
 		start := time.Now()
 		defer func() {
-			c.Logger.DebugContext(ctx, c.Name+": Completed sub-batch of job(s)", "duration", time.Since(start), "num_jobs", len(batchID))
+			c.Logger.DebugContext(ctx, c.Name+": Completed sub-batch of job(s)", "duration", time.Since(start), "num_jobs", len(batchParams.ID))
 		}()

 		return withRetries(ctx, &c.BaseService, c.disableSleep, func(ctx context.Context) ([]*rivertype.JobRow, error) {
-			return c.exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{
-				ID:          batchID,
-				FinalizedAt: batchFinalizedAt,
-			})
+			return c.exec.JobSetStateIfRunningMany(ctx, batchParams)
 		})
 	}

 	// This could be written more simply using multiple `sliceutil.Map`s, but
 	// it's done this way to allocate as few new slices as necessary.
-	mapIDsAndFinalizedAt := func(setStateBatch map[int64]*batchCompleterSetState) ([]int64, []time.Time) {
-		var (
-			batchIDs         = make([]int64, len(setStateBatch))
-			batchFinalizedAt = make([]time.Time, len(setStateBatch))
-			i                int
-		)
+	mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
+		params := &riverdriver.JobSetStateIfRunningManyParams{
+			ID:          make([]int64, len(setStateBatch)),
+			ErrData:     make([][]byte, len(setStateBatch)),
+			FinalizedAt: make([]*time.Time, len(setStateBatch)),
+			MaxAttempts: make([]*int, len(setStateBatch)),
+			ScheduledAt: make([]*time.Time, len(setStateBatch)),
+			State:       make([]rivertype.JobState, len(setStateBatch)),
+		}
+		var i int
 		for _, setState := range setStateBatch {
-			batchIDs[i] = setState.Params.ID
-			batchFinalizedAt[i] = *setState.Params.FinalizedAt
+			params.ID[i] = setState.Params.ID
+			params.ErrData[i] = setState.Params.ErrData
+			params.FinalizedAt[i] = setState.Params.FinalizedAt
+			params.MaxAttempts[i] = setState.Params.MaxAttempts
+			params.ScheduledAt[i] = setState.Params.ScheduledAt
+			params.State[i] = setState.Params.State
 			i++
 		}
-		return batchIDs, batchFinalizedAt
+		return params
 	}

 	// Tease apart enormous batches into sub-batches.
@@ -387,15 +393,23 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
 	// doesn't allocate any additional memory in case the entire batch is
 	// smaller than the sub-batch maximum size (which will be the common case).
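+	// (Illustratively, assuming a hypothetical completionMaxSize of 2000: a
+	// batch of 5000 buffered jobs is flushed as three sub-batches covering
+	// [0:2000), [2000:4000), and [4000:5000), with all six parallel arrays of
+	// the params sliced identically.)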
 	var (
-		batchID, batchFinalizedAt = mapIDsAndFinalizedAt(setStateBatch)
-		jobRows                   []*rivertype.JobRow
+		params  = mapBatch(setStateBatch)
+		jobRows []*rivertype.JobRow
 	)

 	c.Logger.DebugContext(ctx, c.Name+": Completing batch of job(s)", "num_jobs", len(setStateBatch))
 	if len(setStateBatch) > c.completionMaxSize {
 		jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch))
 		for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
-			endIndex := min(i+c.completionMaxSize, len(batchID)) // beginning of next sub-batch or end of slice
-			jobRowsSubBatch, err := completeSubBatch(batchID[i:endIndex], batchFinalizedAt[i:endIndex])
+			endIndex := min(i+c.completionMaxSize, len(params.ID)) // beginning of next sub-batch or end of slice
+			subBatch := &riverdriver.JobSetStateIfRunningManyParams{
+				ID:          params.ID[i:endIndex],
+				ErrData:     params.ErrData[i:endIndex],
+				FinalizedAt: params.FinalizedAt[i:endIndex],
+				MaxAttempts: params.MaxAttempts[i:endIndex],
+				ScheduledAt: params.ScheduledAt[i:endIndex],
+				State:       params.State[i:endIndex],
+			}
+			jobRowsSubBatch, err := completeSubBatch(subBatch)
 			if err != nil {
 				return err
 			}
@@ -403,7 +417,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
 		}
 	} else {
 		var err error
-		jobRows, err = completeSubBatch(batchID, batchFinalizedAt)
+		jobRows, err = completeSubBatch(params)
 		if err != nil {
 			return err
 		}
@@ -411,7 +425,8 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {

 	events := sliceutil.Map(jobRows, func(jobRow *rivertype.JobRow) CompleterJobUpdated {
 		setState := setStateBatch[jobRow.ID]
-		setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(*setState.Params.FinalizedAt)
+		startTime := setStateStartTimes[jobRow.ID]
+		setState.Stats.CompleteDuration = c.Time.NowUTC().Sub(startTime)
 		return CompleterJobUpdated{Job: jobRow, JobStats: setState.Stats}
 	})

@@ -432,13 +447,7 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
 }

 func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error {
-	// Send completions other than setting to `complete` to an async completer.
-	// We consider this okay because these are expected to be much more rare, so
-	// only optimizing `complete` will yield huge speed gains.
-	if params.State != rivertype.JobStateCompleted {
-		return c.asyncCompleter.JobSetStateIfRunning(ctx, stats, params)
-	}
-
+	now := c.Time.NowUTC()
 	// If we've built up too much of a backlog because the completer's fallen
 	// behind, block completions until the complete loop's had a chance to catch
 	// up.
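An aside on the shape of the completer after this change: it is a buffer-and-flush design, where `JobSetStateIfRunning` does an O(1) map write under a mutex and the run loop periodically swaps the map out and drains it in one batched query. The minimal, self-contained Go sketch below shows only that pattern; `completerBuffer` and its string states are illustrative stand-ins, not River APIs.

package main

import (
	"fmt"
	"sync"
)

// completerBuffer stands in for the completer's setStateParams map, which in
// River buffers *riverdriver.JobSetStateIfRunningParams keyed by job ID.
type completerBuffer struct {
	mu     sync.Mutex
	params map[int64]string
}

// add is the JobSetStateIfRunning analogue: a quick map write under lock, with
// no database work on the caller's goroutine.
func (b *completerBuffer) add(id int64, state string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.params[id] = state
}

// drain is the handleBatch analogue: swap the map out under lock so producers
// can keep buffering while the drained batch is written in a single query.
func (b *completerBuffer) drain() map[int64]string {
	b.mu.Lock()
	defer b.mu.Unlock()
	batch := b.params
	if len(batch) > 0 {
		b.params = make(map[int64]string)
	}
	return batch
}

func main() {
	buf := &completerBuffer{params: make(map[int64]string)}
	buf.add(1, "completed")
	buf.add(2, "retryable")
	buf.add(3, "cancelled")
	fmt.Printf("flushing %d jobs in one round trip\n", len(buf.drain()))
}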
@@ -448,16 +457,11 @@ func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobsta
 	defer c.setStateParamsMu.Unlock()

 	c.setStateParams[params.ID] = &batchCompleterSetState{params, stats}
+	c.setStateStartTimes[params.ID] = now

 	return nil
 }

-func (c *BatchCompleter) Stop() {
-	c.BaseStartStop.Stop()
-	c.asyncCompleter.Stop()
-	// subscribeCh already closed by asyncCompleter.Stop ^
-}
-
 func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) {
 	c.setStateParamsMu.RLock()

 	var (

diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go
index 942e4bed..8e237fe4 100644
--- a/internal/jobcompleter/job_completer_test.go
+++ b/internal/jobcompleter/job_completer_test.go
@@ -25,25 +25,25 @@ import (
 )

 type partialExecutorMock struct {
-	JobSetCompleteIfRunningManyCalled bool
-	JobSetCompleteIfRunningManyFunc   func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
-	JobSetStateIfRunningCalled        bool
-	JobSetStateIfRunningFunc          func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
-	mu                                sync.Mutex
+	JobSetStateIfRunningManyCalled bool
+	JobSetStateIfRunningManyFunc   func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
+	JobSetStateIfRunningCalled     bool
+	JobSetStateIfRunningFunc       func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error)
+	mu                             sync.Mutex
 }

 // NewPartialExecutorMock returns a new mock with all mock functions set to call
 // down into the given real executor.
 func NewPartialExecutorMock(exec riverdriver.Executor) *partialExecutorMock {
 	return &partialExecutorMock{
-		JobSetCompleteIfRunningManyFunc: exec.JobSetCompleteIfRunningMany,
-		JobSetStateIfRunningFunc:        exec.JobSetStateIfRunning,
+		JobSetStateIfRunningManyFunc: exec.JobSetStateIfRunningMany,
+		JobSetStateIfRunningFunc:     exec.JobSetStateIfRunning,
 	}
 }

-func (m *partialExecutorMock) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
-	m.setCalled(func() { m.JobSetCompleteIfRunningManyCalled = true })
-	return m.JobSetCompleteIfRunningManyFunc(ctx, params)
+func (m *partialExecutorMock) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
+	m.setCalled(func() { m.JobSetStateIfRunningManyCalled = true })
+	return m.JobSetStateIfRunningManyFunc(ctx, params)
 }

 func (m *partialExecutorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
@@ -325,7 +325,8 @@ func TestAsyncCompleter(t *testing.T) {
 			return NewAsyncCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh)
 		},
 		func(completer *AsyncCompleter) { completer.disableSleep = true },
-		func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec })
+		func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec },
+	)
 }

 func TestBatchCompleter(t *testing.T) {
@@ -336,7 +337,8 @@ func TestBatchCompleter(t *testing.T) {
 			return NewBatchCompleter(riversharedtest.BaseServiceArchetype(t), exec, subscribeCh)
 		},
 		func(completer *BatchCompleter) { completer.disableSleep = true },
-		func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec })
+		func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec },
+	)

 	ctx := context.Background()
@@ -728,11 +730,11 @@ func testCompleter[TCompleter JobCompleter](
 		}

 		execMock := NewPartialExecutorMock(bundle.exec)
-		execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
+		execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
 			if err := maybeError(); err != nil {
 				return nil, err
 			}
-			return bundle.exec.JobSetCompleteIfRunningMany(ctx, params)
+			return bundle.exec.JobSetStateIfRunningMany(ctx, params)
 		}
 		execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
 			if err := maybeError(); err != nil {
@@ -751,7 +753,7 @@ func testCompleter[TCompleter JobCompleter](
 		// Make sure our mocks were really called. The specific function called
 		// will depend on the completer under test, so okay as long as one or
 		// the other was.
-		require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)
+		require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)

 		// Job still managed to complete despite the errors.
 		requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted)
@@ -767,7 +769,7 @@ func testCompleter[TCompleter JobCompleter](
 		disableSleep(completer)

 		execMock := NewPartialExecutorMock(bundle.exec)
-		execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
+		execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
 			return nil, context.Canceled
 		}
 		execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
@@ -788,7 +790,7 @@ func testCompleter[TCompleter JobCompleter](
 		// Make sure our mocks were really called. The specific function called
 		// will depend on the completer under test, so okay as long as one or
 		// the other was.
-		require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)
+		require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)

 		// Job is still running because the completer is forced to give up
 		// immediately on certain types of errors like where a pool is closed.
@@ -805,7 +807,7 @@ func testCompleter[TCompleter JobCompleter](
 		disableSleep(completer)

 		execMock := NewPartialExecutorMock(bundle.exec)
-		execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) {
+		execMock.JobSetStateIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
 			return nil, puddle.ErrClosedPool
 		}
 		execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) {
@@ -826,7 +828,7 @@ func testCompleter[TCompleter JobCompleter](
 		// Make sure our mocks were really called. The specific function called
 		// will depend on the completer under test, so okay as long as one or
 		// the other was.
-		require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)
+		require.True(t, execMock.JobSetStateIfRunningManyCalled || execMock.JobSetStateIfRunningCalled)

 		// Job is still running because the completer is forced to give up
 		// immediately on certain types of errors like where a pool is closed.

diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
index ce93868c..a4cf1add 100644
--- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
+++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go
@@ -1944,6 +1944,293 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
 		})
 	})

+	setStateManyParams := func(params ...*riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams {
+		batchParams := &riverdriver.JobSetStateIfRunningManyParams{}
+		for _, param := range params {
+			var (
+				errData     []byte
+				finalizedAt *time.Time
+				maxAttempts *int
+				scheduledAt *time.Time
+			)
+			if param.ErrData != nil {
+				errData = param.ErrData
+			}
+			if param.FinalizedAt != nil {
+				finalizedAt = param.FinalizedAt
+			}
+			if param.MaxAttempts != nil {
+				maxAttempts = param.MaxAttempts
+			}
+			if param.ScheduledAt != nil {
+				scheduledAt = param.ScheduledAt
+			}
+
+			batchParams.ID = append(batchParams.ID, param.ID)
+			batchParams.ErrData = append(batchParams.ErrData, errData)
+			batchParams.FinalizedAt = append(batchParams.FinalizedAt, finalizedAt)
+			batchParams.MaxAttempts = append(batchParams.MaxAttempts, maxAttempts)
+			batchParams.ScheduledAt = append(batchParams.ScheduledAt, scheduledAt)
+			batchParams.State = append(batchParams.State, param.State)
+		}
+
+		return batchParams
+	}
+
+	t.Run("JobSetStateIfRunningMany_JobSetStateCompleted", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("CompletesARunningJob", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				State:     ptrutil.Ptr(rivertype.JobStateRunning),
+				UniqueKey: []byte("unique-key"),
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateCompleted(job.ID, now)))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateCompleted, jobAfter.State)
+			require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateCompleted, jobUpdated.State)
+			require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))
+		})
+
+		t.Run("DoesNotCompleteARetryableJob", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				State:     ptrutil.Ptr(rivertype.JobStateRetryable),
+				UniqueKey: []byte("unique-key"),
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateCompleted(job.ID, now)))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateRetryable, jobAfter.State)
+			require.Nil(t, jobAfter.FinalizedAt)
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State)
+			require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))
+		})
+	})
+
+	t.Run("JobSetStateIfRunningMany_JobSetStateErrored", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("SetsARunningJobToRetryable", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				State:     ptrutil.Ptr(rivertype.JobStateRunning),
+				UniqueKey: []byte("unique-key"),
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now))))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateRetryable, jobAfter.State)
+			require.WithinDuration(t, now, jobAfter.ScheduledAt, time.Microsecond)
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State)
+			require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))
+
+			// validate error payload:
+			require.Len(t, jobAfter.Errors, 1)
+			require.Equal(t, now, jobAfter.Errors[0].At)
+			require.Equal(t, 1, jobAfter.Errors[0].Attempt)
+			require.Equal(t, "fake error", jobAfter.Errors[0].Error)
+			require.Equal(t, "foo.go:123\nbar.go:456", jobAfter.Errors[0].Trace)
+		})
+
+		t.Run("DoesNotTouchAlreadyRetryableJob", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				State:       ptrutil.Ptr(rivertype.JobStateRetryable),
+				ScheduledAt: ptrutil.Ptr(now.Add(10 * time.Second)),
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now))))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateRetryable, jobAfter.State)
+			require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond)
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateRetryable, jobUpdated.State)
+			require.WithinDuration(t, job.ScheduledAt, jobUpdated.ScheduledAt, time.Microsecond)
+		})
+
+		t.Run("SetsAJobWithCancelAttemptedAtToCancelled", func(t *testing.T) {
+			// If a job has cancel_attempted_at in its metadata, it means that the user
+			// tried to cancel the job with the Cancel API but that the job
+			// finished/errored before the producer received the cancel notification.
+			//
+			// In this case, we want to move the job to cancelled instead of retryable
+			// so that the job is not retried.
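+			//
+			// (For example, a job whose metadata contains
+			// `{"cancel_attempted_at": "2024-09-25T20:00:00Z"}`, the shape this test
+			// builds below, takes the query's should_cancel branch and lands in
+			// cancelled rather than retryable.)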
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				Metadata:    []byte(fmt.Sprintf(`{"cancel_attempted_at":"%s"}`, time.Now().UTC().Format(time.RFC3339))),
+				State:       ptrutil.Ptr(rivertype.JobStateRunning),
+				ScheduledAt: ptrutil.Ptr(now.Add(-10 * time.Second)),
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateErrorRetryable(job.ID, now, makeErrPayload(t, now))))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateCancelled, jobAfter.State)
+			require.NotNil(t, jobAfter.FinalizedAt)
+			// Loose assertion against FinalizedAt just to make sure it was set (it uses
+			// the database's now() instead of a passed-in time):
+			require.WithinDuration(t, time.Now().UTC(), *jobAfter.FinalizedAt, 2*time.Second)
+			// ScheduledAt should not be touched:
+			require.WithinDuration(t, job.ScheduledAt, jobAfter.ScheduledAt, time.Microsecond)
+
+			// Errors should still be appended to:
+			require.Len(t, jobAfter.Errors, 1)
+			require.Contains(t, jobAfter.Errors[0].Error, "fake error")
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State)
+			require.WithinDuration(t, job.ScheduledAt, jobUpdated.ScheduledAt, time.Microsecond)
+		})
+	})
+
+	t.Run("JobSetStateIfRunningMany_JobSetStateCancelled", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("CancelsARunningJob", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				State:        ptrutil.Ptr(rivertype.JobStateRunning),
+				UniqueKey:    []byte("unique-key"),
+				UniqueStates: 0xFF,
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateCancelled(job.ID, now, makeErrPayload(t, now))))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateCancelled, jobAfter.State)
+			require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateCancelled, jobUpdated.State)
+			require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))
+		})
+	})
+
+	t.Run("JobSetStateIfRunningMany_JobSetStateDiscarded", func(t *testing.T) {
+		t.Parallel()
+
+		t.Run("DiscardsARunningJob", func(t *testing.T) {
+			t.Parallel()
+
+			exec, _ := setup(ctx, t)
+
+			now := time.Now().UTC()
+
+			job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
+				State:        ptrutil.Ptr(rivertype.JobStateRunning),
+				UniqueKey:    []byte("unique-key"),
+				UniqueStates: 0xFF,
+			})
+
+			jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateDiscarded(job.ID, now, makeErrPayload(t, now))))
+			require.NoError(t, err)
+			jobAfter := jobsAfter[0]
+			require.Equal(t, rivertype.JobStateDiscarded, jobAfter.State)
+			require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond)
+			require.Equal(t, "unique-key", string(jobAfter.UniqueKey))
+			require.Equal(t, rivertype.JobStates(), jobAfter.UniqueStates)
+
+			jobUpdated, err := exec.JobGetByID(ctx, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, rivertype.JobStateDiscarded, jobUpdated.State)
+		})
+	})
+
+	t.Run("JobSetStateIfRunningMany_MultipleJobsAtOnce", func(t *testing.T) {
+		t.Parallel()
+
+		exec, _ := setup(ctx, t)
+
+		now := time.Now().UTC()
+		future := now.Add(10 * time.Second)
+
+		job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
+		job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
+		job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)})
+
+		jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(
+			riverdriver.JobSetStateCompleted(job1.ID, now),
+			riverdriver.JobSetStateErrorRetryable(job2.ID, future, makeErrPayload(t, now)),
+			riverdriver.JobSetStateCancelled(job3.ID, now, makeErrPayload(t, now)),
+		))
+		require.NoError(t, err)
+		completedJob := jobsAfter[0]
+		require.Equal(t, rivertype.JobStateCompleted, completedJob.State)
+		require.WithinDuration(t, now, *completedJob.FinalizedAt, time.Microsecond)
+
+		retryableJob := jobsAfter[1]
+		require.Equal(t, rivertype.JobStateRetryable, retryableJob.State)
+		require.WithinDuration(t, future, retryableJob.ScheduledAt, time.Microsecond)
+		// validate error payload:
+		require.Len(t, retryableJob.Errors, 1)
+		require.Equal(t, now, retryableJob.Errors[0].At)
+		require.Equal(t, 1, retryableJob.Errors[0].Attempt)
+		require.Equal(t, "fake error", retryableJob.Errors[0].Error)
+		require.Equal(t, "foo.go:123\nbar.go:456", retryableJob.Errors[0].Trace)
+
+		cancelledJob := jobsAfter[2]
+		require.Equal(t, rivertype.JobStateCancelled, cancelledJob.State)
+		require.WithinDuration(t, now, *cancelledJob.FinalizedAt, time.Microsecond)
+	})
+
 	t.Run("JobUpdate", func(t *testing.T) {
 		t.Parallel()

diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go
index 2cedb610..52af07f4 100644
--- a/riverdriver/river_driver_interface.go
+++ b/riverdriver/river_driver_interface.go
@@ -126,6 +126,7 @@ type Executor interface {
 	JobSchedule(ctx context.Context, params *JobScheduleParams) ([]*JobScheduleResult, error)
 	JobSetCompleteIfRunningMany(ctx context.Context, params *JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error)
 	JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*rivertype.JobRow, error)
+	JobSetStateIfRunningMany(ctx context.Context, params *JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error)
 	JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error)
 	LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error)
 	LeaderAttemptReelect(ctx context.Context, params *LeaderElectParams) (bool, error)
@@ -355,6 +356,18 @@ func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, maxAttempts in
 	return &JobSetStateIfRunningParams{ID: id, MaxAttempts: &maxAttempts, ScheduledAt: &scheduledAt, State: rivertype.JobStateAvailable}
 }

+// JobSetStateIfRunningManyParams are parameters to update the state of
+// currently running jobs. Use the singular JobSetState* constructors above to
+// build a correct combination of parameters for each job in the batch.
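+//
+// A sketch of assembling a mixed batch from the singular constructors (the
+// loop mirrors the aggregation done by the completer and the driver tests and
+// is illustrative rather than a prescribed API; errPayload stands in for a
+// marshaled error payload):
+//
+//	batch := &JobSetStateIfRunningManyParams{}
+//	for _, p := range []*JobSetStateIfRunningParams{
+//		JobSetStateCompleted(1, time.Now()),
+//		JobSetStateErrorRetryable(2, time.Now(), errPayload),
+//	} {
+//		batch.ID = append(batch.ID, p.ID)
+//		batch.ErrData = append(batch.ErrData, p.ErrData)
+//		batch.FinalizedAt = append(batch.FinalizedAt, p.FinalizedAt)
+//		batch.MaxAttempts = append(batch.MaxAttempts, p.MaxAttempts)
+//		batch.ScheduledAt = append(batch.ScheduledAt, p.ScheduledAt)
+//		batch.State = append(batch.State, p.State)
+//	}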
+type JobSetStateIfRunningManyParams struct {
+	ID          []int64
+	ErrData     [][]byte
+	FinalizedAt []*time.Time
+	MaxAttempts []*int
+	ScheduledAt []*time.Time
+	State       []rivertype.JobState
+}
+
 type JobUpdateParams struct {
 	ID              int64
 	AttemptDoUpdate bool

diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
index 77f16cda..d337907d 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go
@@ -1283,6 +1283,133 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet
 	return &i, err
 }

+const jobSetStateIfRunningMany = `-- name: JobSetStateIfRunningMany :many
+WITH job_input AS (
+    SELECT
+        unnest($1::bigint[]) AS id,
+        -- To avoid requiring pgx users to register the OID of the river_job_state[]
+        -- type, we cast the array to text[] and then to river_job_state.
+        unnest($2::text[])::river_job_state AS state,
+        unnest($3::boolean[]) AS finalized_at_do_update,
+        unnest($4::timestamptz[]) AS finalized_at,
+        unnest($5::boolean[]) AS errors_do_update,
+        unnest($6::jsonb[]) AS errors,
+        unnest($7::boolean[]) AS max_attempts_do_update,
+        unnest($8::int[]) AS max_attempts,
+        unnest($9::boolean[]) AS scheduled_at_do_update,
+        unnest($10::timestamptz[]) AS scheduled_at
+),
+job_to_update AS (
+    SELECT
+        river_job.id,
+        job_input.state,
+        job_input.finalized_at,
+        job_input.errors,
+        job_input.max_attempts,
+        job_input.scheduled_at,
+        (job_input.state IN ('retryable', 'scheduled') AND river_job.metadata ? 'cancel_attempted_at') AS should_cancel,
+        job_input.finalized_at_do_update,
+        job_input.errors_do_update,
+        job_input.max_attempts_do_update,
+        job_input.scheduled_at_do_update
+    FROM river_job
+    JOIN job_input ON river_job.id = job_input.id
+    WHERE river_job.state = 'running'
+    FOR UPDATE
+),
+updated_job AS (
+    UPDATE river_job
+    SET
+        state = CASE WHEN job_to_update.should_cancel THEN 'cancelled'::river_job_state
+                     ELSE job_to_update.state END,
+        finalized_at = CASE WHEN job_to_update.should_cancel THEN now()
+                            WHEN job_to_update.finalized_at_do_update THEN job_to_update.finalized_at
+                            ELSE river_job.finalized_at END,
+        errors = CASE WHEN job_to_update.errors_do_update THEN array_append(river_job.errors, job_to_update.errors)
+                      ELSE river_job.errors END,
+        max_attempts = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.max_attempts_do_update THEN job_to_update.max_attempts
+                            ELSE river_job.max_attempts END,
+        scheduled_at = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.scheduled_at_do_update THEN job_to_update.scheduled_at
+                            ELSE river_job.scheduled_at END
+    FROM job_to_update
+    WHERE river_job.id = job_to_update.id
+    RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states
+)
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states
+FROM river_job
+WHERE id IN (SELECT id FROM job_input)
+    AND id NOT IN (SELECT id FROM updated_job)
+UNION
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states
+FROM updated_job
+`
+
+type JobSetStateIfRunningManyParams struct {
+	IDs                 []int64
+	State               []string
+	FinalizedAtDoUpdate []bool
+	FinalizedAt         []time.Time
+	ErrorsDoUpdate      []bool
+	Errors              []string
+	MaxAttemptsDoUpdate []bool
+	MaxAttempts         []int32
+	ScheduledAtDoUpdate []bool
+	ScheduledAt         []time.Time
+}
+
+func (q *Queries) JobSetStateIfRunningMany(ctx context.Context, db DBTX, arg *JobSetStateIfRunningManyParams) ([]*RiverJob, error) {
+	rows, err := db.QueryContext(ctx, jobSetStateIfRunningMany,
+		pq.Array(arg.IDs),
+		pq.Array(arg.State),
+		pq.Array(arg.FinalizedAtDoUpdate),
+		pq.Array(arg.FinalizedAt),
+		pq.Array(arg.ErrorsDoUpdate),
+		pq.Array(arg.Errors),
+		pq.Array(arg.MaxAttemptsDoUpdate),
+		pq.Array(arg.MaxAttempts),
+		pq.Array(arg.ScheduledAtDoUpdate),
+		pq.Array(arg.ScheduledAt),
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*RiverJob
+	for rows.Next() {
+		var i RiverJob
+		if err := rows.Scan(
+			&i.ID,
+			&i.Args,
+			&i.Attempt,
+			&i.AttemptedAt,
+			pq.Array(&i.AttemptedBy),
+			&i.CreatedAt,
+			pq.Array(&i.Errors),
+			&i.FinalizedAt,
+			&i.Kind,
+			&i.MaxAttempts,
+			&i.Metadata,
+			&i.Priority,
+			&i.Queue,
+			&i.State,
+			&i.ScheduledAt,
+			pq.Array(&i.Tags),
+			&i.UniqueKey,
+			&i.UniqueStates,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Close(); err != nil {
+		return nil, err
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const jobUpdate = `-- name: JobUpdate :one
 UPDATE river_job
 SET

diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
index 0290d662..f8ebbee1 100644
--- a/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
+++ b/riverdriver/riverdatabasesql/internal/dbsqlc/sqlc.yaml
@@ -28,6 +28,7 @@ sql:
     emit_result_struct_pointers: true

     rename:
+      ids: "IDs"
       ttl: "TTL"

     overrides:

diff --git a/riverdriver/riverdatabasesql/river_database_sql_driver.go b/riverdriver/riverdatabasesql/river_database_sql_driver.go
index 4e4e894e..ca8f0e1a 100644
--- a/riverdriver/riverdatabasesql/river_database_sql_driver.go
+++ b/riverdriver/riverdatabasesql/river_database_sql_driver.go
@@ -576,6 +576,49 @@ func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver
 	return jobRowFromInternal(job)
 }

+func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
+	setStateParams := &dbsqlc.JobSetStateIfRunningManyParams{
+		IDs:                 params.ID,
+		Errors:              make([]string, len(params.ID)),
+		ErrorsDoUpdate:      make([]bool, len(params.ID)),
+		FinalizedAt:         make([]time.Time, len(params.ID)),
+		FinalizedAtDoUpdate: make([]bool, len(params.ID)),
+		MaxAttempts:         make([]int32, len(params.ID)),
+		MaxAttemptsDoUpdate: make([]bool, len(params.ID)),
+		ScheduledAt:         make([]time.Time, len(params.ID)),
+		ScheduledAtDoUpdate: make([]bool, len(params.ID)),
+		State:               make([]string, len(params.ID)),
+	}
+
+	const defaultObject = "{}"
+
+	for i := 0; i < len(params.ID); i++ {
+		setStateParams.Errors[i] = valutil.ValOrDefault(string(params.ErrData[i]), defaultObject)
+		if params.ErrData[i] != nil {
+			setStateParams.ErrorsDoUpdate[i] = true
+		}
+		if params.FinalizedAt[i] != nil {
+			setStateParams.FinalizedAtDoUpdate[i] = true
+			setStateParams.FinalizedAt[i] = *params.FinalizedAt[i]
+		}
+		if params.MaxAttempts[i] != nil {
+			setStateParams.MaxAttemptsDoUpdate[i] = true
+			setStateParams.MaxAttempts[i] = int32(*params.MaxAttempts[i]) //nolint:gosec
+		}
+		if params.ScheduledAt[i] != nil {
+			setStateParams.ScheduledAtDoUpdate[i] = true
+			setStateParams.ScheduledAt[i] = *params.ScheduledAt[i]
+		}
+		setStateParams.State[i] = string(params.State[i])
+	}
+
+	jobs, err := dbsqlc.New().JobSetStateIfRunningMany(ctx, e.dbtx, setStateParams)
+	if err != nil {
+		return nil, interpretError(err)
+	}
+	return mapSliceError(jobs, jobRowFromInternal)
+}
+
 func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) {
 	job, err := dbsqlc.New().JobUpdate(ctx, e.dbtx, &dbsqlc.JobUpdateParams{
 		ID: params.ID,

diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
index 06c84c25..7703c15a 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql
@@ -523,6 +523,66 @@
 UNION
 SELECT *
 FROM updated_job;

+-- name: JobSetStateIfRunningMany :many
+WITH job_input AS (
+    SELECT
+        unnest(@ids::bigint[]) AS id,
+        -- To avoid requiring pgx users to register the OID of the river_job_state[]
+        -- type, we cast the array to text[] and then to river_job_state.
+        unnest(@state::text[])::river_job_state AS state,
+        unnest(@finalized_at_do_update::boolean[]) AS finalized_at_do_update,
+        unnest(@finalized_at::timestamptz[]) AS finalized_at,
+        unnest(@errors_do_update::boolean[]) AS errors_do_update,
+        unnest(@errors::jsonb[]) AS errors,
+        unnest(@max_attempts_do_update::boolean[]) AS max_attempts_do_update,
+        unnest(@max_attempts::int[]) AS max_attempts,
+        unnest(@scheduled_at_do_update::boolean[]) AS scheduled_at_do_update,
+        unnest(@scheduled_at::timestamptz[]) AS scheduled_at
+),
+job_to_update AS (
+    SELECT
+        river_job.id,
+        job_input.state,
+        job_input.finalized_at,
+        job_input.errors,
+        job_input.max_attempts,
+        job_input.scheduled_at,
+        (job_input.state IN ('retryable', 'scheduled') AND river_job.metadata ? 'cancel_attempted_at') AS should_cancel,
+        job_input.finalized_at_do_update,
+        job_input.errors_do_update,
+        job_input.max_attempts_do_update,
+        job_input.scheduled_at_do_update
+    FROM river_job
+    JOIN job_input ON river_job.id = job_input.id
+    WHERE river_job.state = 'running'
+    FOR UPDATE
+),
+updated_job AS (
+    UPDATE river_job
+    SET
+        state = CASE WHEN job_to_update.should_cancel THEN 'cancelled'::river_job_state
+                     ELSE job_to_update.state END,
+        finalized_at = CASE WHEN job_to_update.should_cancel THEN now()
+                            WHEN job_to_update.finalized_at_do_update THEN job_to_update.finalized_at
+                            ELSE river_job.finalized_at END,
+        errors = CASE WHEN job_to_update.errors_do_update THEN array_append(river_job.errors, job_to_update.errors)
+                      ELSE river_job.errors END,
+        max_attempts = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.max_attempts_do_update THEN job_to_update.max_attempts
+                            ELSE river_job.max_attempts END,
+        scheduled_at = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.scheduled_at_do_update THEN job_to_update.scheduled_at
+                            ELSE river_job.scheduled_at END
+    FROM job_to_update
+    WHERE river_job.id = job_to_update.id
+    RETURNING river_job.*
+)
+SELECT *
+FROM river_job
+WHERE id IN (SELECT id FROM job_input)
+    AND id NOT IN (SELECT id FROM updated_job)
+UNION
+SELECT *
+FROM updated_job;
+
 -- A generalized update for any property on a job. This brings in a large number
 -- of parameters and therefore may be more suitable for testing than production.
 -- name: JobUpdate :one

diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
index 399300c7..00317e53 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go
@@ -1261,6 +1261,130 @@ func (q *Queries) JobSetStateIfRunning(ctx context.Context, db DBTX, arg *JobSet
 	return &i, err
 }

+const jobSetStateIfRunningMany = `-- name: JobSetStateIfRunningMany :many
+WITH job_input AS (
+    SELECT
+        unnest($1::bigint[]) AS id,
+        -- To avoid requiring pgx users to register the OID of the river_job_state[]
+        -- type, we cast the array to text[] and then to river_job_state.
+        unnest($2::text[])::river_job_state AS state,
+        unnest($3::boolean[]) AS finalized_at_do_update,
+        unnest($4::timestamptz[]) AS finalized_at,
+        unnest($5::boolean[]) AS errors_do_update,
+        unnest($6::jsonb[]) AS errors,
+        unnest($7::boolean[]) AS max_attempts_do_update,
+        unnest($8::int[]) AS max_attempts,
+        unnest($9::boolean[]) AS scheduled_at_do_update,
+        unnest($10::timestamptz[]) AS scheduled_at
+),
+job_to_update AS (
+    SELECT
+        river_job.id,
+        job_input.state,
+        job_input.finalized_at,
+        job_input.errors,
+        job_input.max_attempts,
+        job_input.scheduled_at,
+        (job_input.state IN ('retryable', 'scheduled') AND river_job.metadata ? 'cancel_attempted_at') AS should_cancel,
+        job_input.finalized_at_do_update,
+        job_input.errors_do_update,
+        job_input.max_attempts_do_update,
+        job_input.scheduled_at_do_update
+    FROM river_job
+    JOIN job_input ON river_job.id = job_input.id
+    WHERE river_job.state = 'running'
+    FOR UPDATE
+),
+updated_job AS (
+    UPDATE river_job
+    SET
+        state = CASE WHEN job_to_update.should_cancel THEN 'cancelled'::river_job_state
+                     ELSE job_to_update.state END,
+        finalized_at = CASE WHEN job_to_update.should_cancel THEN now()
+                            WHEN job_to_update.finalized_at_do_update THEN job_to_update.finalized_at
+                            ELSE river_job.finalized_at END,
+        errors = CASE WHEN job_to_update.errors_do_update THEN array_append(river_job.errors, job_to_update.errors)
+                      ELSE river_job.errors END,
+        max_attempts = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.max_attempts_do_update THEN job_to_update.max_attempts
+                            ELSE river_job.max_attempts END,
+        scheduled_at = CASE WHEN NOT job_to_update.should_cancel AND job_to_update.scheduled_at_do_update THEN job_to_update.scheduled_at
+                            ELSE river_job.scheduled_at END
+    FROM job_to_update
+    WHERE river_job.id = job_to_update.id
+    RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags, river_job.unique_key, river_job.unique_states
+)
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states
+FROM river_job
+WHERE id IN (SELECT id FROM job_input)
+    AND id NOT IN (SELECT id FROM updated_job)
+UNION
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags, unique_key, unique_states
+FROM updated_job
+`
+
+type JobSetStateIfRunningManyParams struct {
+	IDs                 []int64
+	State               []string
+	FinalizedAtDoUpdate []bool
+	FinalizedAt         []time.Time
+	ErrorsDoUpdate      []bool
+	Errors              [][]byte
+	MaxAttemptsDoUpdate []bool
+	MaxAttempts         []int32
+	ScheduledAtDoUpdate []bool
+	ScheduledAt         []time.Time
+}
+
+func (q *Queries) JobSetStateIfRunningMany(ctx context.Context, db DBTX, arg *JobSetStateIfRunningManyParams) ([]*RiverJob, error) {
+	rows, err := db.Query(ctx, jobSetStateIfRunningMany,
+		arg.IDs,
+		arg.State,
+		arg.FinalizedAtDoUpdate,
+		arg.FinalizedAt,
+		arg.ErrorsDoUpdate,
+		arg.Errors,
+		arg.MaxAttemptsDoUpdate,
+		arg.MaxAttempts,
+		arg.ScheduledAtDoUpdate,
+		arg.ScheduledAt,
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	var items []*RiverJob
+	for rows.Next() {
+		var i RiverJob
+		if err := rows.Scan(
+			&i.ID,
+			&i.Args,
+			&i.Attempt,
+			&i.AttemptedAt,
+			&i.AttemptedBy,
+			&i.CreatedAt,
+			&i.Errors,
+			&i.FinalizedAt,
+			&i.Kind,
+			&i.MaxAttempts,
+			&i.Metadata,
+			&i.Priority,
+			&i.Queue,
+			&i.State,
+			&i.ScheduledAt,
+			&i.Tags,
+			&i.UniqueKey,
+			&i.UniqueStates,
+		); err != nil {
+			return nil, err
+		}
+		items = append(items, &i)
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+	return items, nil
+}
+
 const jobUpdate = `-- name: JobUpdate :one
 UPDATE river_job
 SET

diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
index 93fb53fc..17ff029c 100644
--- a/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
+++ b/riverdriver/riverpgxv5/internal/dbsqlc/sqlc.yaml
@@ -29,6 +29,7 @@ sql:
     emit_result_struct_pointers: true

     rename:
+      ids: "IDs"
       ttl: "TTL"

     overrides:

diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
index 3107f598..8b4e6d0b 100644
--- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go
+++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go
@@ -460,6 +460,46 @@ func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver
 	return jobRowFromInternal(job)
 }

+func (e *Executor) JobSetStateIfRunningMany(ctx context.Context, params *riverdriver.JobSetStateIfRunningManyParams) ([]*rivertype.JobRow, error) {
+	setStateParams := &dbsqlc.JobSetStateIfRunningManyParams{
+		IDs:                 params.ID,
+		Errors:              params.ErrData,
+		ErrorsDoUpdate:      make([]bool, len(params.ID)),
+		FinalizedAt:         make([]time.Time, len(params.ID)),
+		FinalizedAtDoUpdate: make([]bool, len(params.ID)),
+		MaxAttempts:         make([]int32, len(params.ID)),
+		MaxAttemptsDoUpdate: make([]bool, len(params.ID)),
+		ScheduledAt:         make([]time.Time, len(params.ID)),
+		ScheduledAtDoUpdate: make([]bool, len(params.ID)),
+		State:               make([]string, len(params.ID)),
+	}
+
+	for i := 0; i < len(params.ID); i++ {
+		if params.ErrData[i] != nil {
+			setStateParams.ErrorsDoUpdate[i] = true
+		}
+		if params.FinalizedAt[i] != nil {
+			setStateParams.FinalizedAtDoUpdate[i] = true
+			setStateParams.FinalizedAt[i] = *params.FinalizedAt[i]
+		}
+		if params.MaxAttempts[i] != nil {
+			setStateParams.MaxAttemptsDoUpdate[i] = true
+			setStateParams.MaxAttempts[i] = int32(*params.MaxAttempts[i]) //nolint:gosec
+		}
+		if params.ScheduledAt[i] != nil {
+			setStateParams.ScheduledAtDoUpdate[i] = true
+			setStateParams.ScheduledAt[i] = *params.ScheduledAt[i]
+		}
+		setStateParams.State[i] = string(params.State[i])
+	}
+
+	jobs, err := dbsqlc.New().JobSetStateIfRunningMany(ctx, e.dbtx, setStateParams)
+	if err != nil {
+		return nil, interpretError(err)
+	}
+	return mapSliceError(jobs, jobRowFromInternal)
+}
+
 func (e *Executor) JobUpdate(ctx context.Context, params *riverdriver.JobUpdateParams) (*rivertype.JobRow, error) {
 	job, err := dbsqlc.New().JobUpdate(ctx, e.dbtx, &dbsqlc.JobUpdateParams{
 		ID: params.ID,