From 01d3531ec648bd1f1195b8c730935ff852e23909 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sat, 9 Mar 2024 08:08:58 -0800 Subject: [PATCH] Batch completer + additional completer test suite and benchmarks Here, add a new completer using a completion strategy designed to be much faster than what we're doing right now. Rather than blindly throwing completion work into goroutine slots, it accumulates "batches" of completions to be carried out, and uses a debounced channel to fire periodically (currently, up to every 100 milliseconds) and submit entire batches for completion at once, up to 2,000 jobs per batch. For the purposes of not grossly expanding the `riverdriver` interface, the completer only batches jobs being set to `completed`, which under most normal workloads we expect to be by far the most common case. Jobs going to other states are fed into a member `AsyncCompleter`, thereby allowing the `BatchCompleter` to keep its implementation quite simple. According to in-package benchmarking, the new completer is in the range of 3-5x faster than `AsyncCompleter` (the one currently in use by River client), and 10-15x faster than `InlineCompleter`. $ go test -bench=. 
./internal/jobcompleter goos: darwin goarch: arm64 pkg: github.com/riverqueue/river/internal/jobcompleter BenchmarkAsyncCompleter_Concurrency10/Completion-8 10851 112318 ns/op BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8 11386 120706 ns/op BenchmarkAsyncCompleter_Concurrency100/Completion-8 9763 116773 ns/op BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8 10884 115718 ns/op BenchmarkBatchCompleter/Completion-8 54916 27314 ns/op BenchmarkBatchCompleter/RotatingStates-8 11518 100997 ns/op BenchmarkInlineCompleter/Completion-8 4656 369281 ns/op BenchmarkInlineCompleter/RotatingStates-8 1561 794136 ns/op PASS ok github.com/riverqueue/river/internal/jobcompleter 21.123s Along with the new completer, we also add a vastly more thorough test suite to help tease out race conditions and test edges that were previously being ignored completely. For most cases we drop the heavy mocking that was happening before, which was having the effect of minimizing the surface area under test, and producing misleading timing that wasn't realistic. Similarly, we bring in a new benchmark framework to allow us to easily vet and compare completer implementations relative to each other. The expectation is that this will act as a more synthetic proxy, with the new benchmarking tool in #254 providing a more realistic end-to-end measurement. 
--- client.go | 138 ++-- client_test.go | 198 ++++- cmd/river/go.mod | 17 +- cmd/river/go.sum | 23 +- cmd/river/riverbench/river_bench.go | 56 +- event.go | 4 - internal/jobcompleter/job_completer.go | 392 +++++++--- internal/jobcompleter/job_completer_test.go | 678 +++++++++++++++++- internal/jobcompleter/main_test.go | 11 + .../riverdrivertest/riverdrivertest.go | 64 ++ .../testfactory/test_factory.go | 18 +- job_executor.go | 10 +- job_executor_test.go | 50 +- producer_test.go | 2 +- riverdriver/river_driver_interface.go | 14 +- .../internal/dbsqlc/river_job.sql.go | 97 ++- .../riverdatabasesql/river_database_sql.go | 4 + .../riverpgxv5/internal/dbsqlc/river_job.sql | 51 +- .../internal/dbsqlc/river_job.sql.go | 94 ++- riverdriver/riverpgxv5/river_pgx_v5_driver.go | 11 + 20 files changed, 1633 insertions(+), 299 deletions(-) create mode 100644 internal/jobcompleter/main_test.go diff --git a/client.go b/client.go index 11bf6fde..70dd7653 100644 --- a/client.go +++ b/client.go @@ -271,10 +271,10 @@ type Client[TTx any] struct { driver riverdriver.Driver[TTx] elector *leadership.Elector - // fetchNewWorkCancel cancels the context used for fetching new work. This + // fetchWorkCancel cancels the context used for fetching new work. This // will be used to stop fetching new work whenever stop is initiated, or // when the context provided to Run is itself cancelled. 
- fetchNewWorkCancel context.CancelCauseFunc + fetchWorkCancel context.CancelCauseFunc monitor *clientMonitor notifier *notifier.Notifier @@ -428,10 +428,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client TimeNowUTC: func() time.Time { return time.Now().UTC() }, } - completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100) - client := &Client[TTx]{ - completer: completer, config: config, driver: driver, monitor: newClientMonitor(), @@ -460,6 +457,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client // we'll need to add a config for this. instanceName := "default" + client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor()) + client.services = append(client.services, client.completer) + client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus) client.services = append(client.services, client.notifier) @@ -582,14 +582,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error { return errors.New("at least one Worker must be added to the Workers bundle") } - // We use separate contexts for fetching and working to allow for a graceful - // stop. However, both inherit from the provided context so if it is - // cancelled a more aggressive stop will be initiated. - fetchCtx, fetchNewWorkCancel := context.WithCancelCause(ctx) - c.fetchNewWorkCancel = fetchNewWorkCancel - workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c)) - c.workCancel = workCancel - // Before doing anything else, make an initial connection to the database to // verify that it appears healthy. Many of the subcomponents below start up // in a goroutine and in case of initial failure, only produce a log line, @@ -602,6 +594,14 @@ func (c *Client[TTx]) Start(ctx context.Context) error { return fmt.Errorf("error making initial connection to database: %w", err) } + // In case of error, stop any services that might have started. 
This + // is safe because even services that were never started will still + // tolerate being stopped. + stopServicesOnError := func() { + startstop.StopAllParallel(c.services) + c.monitor.Stop() + } + // Monitor should be the first subprocess to start, and the last to stop. // It's not part of the waitgroup because we need to wait for everything else // to shut down prior to closing the monitor. @@ -612,19 +612,40 @@ func (c *Client[TTx]) Start(ctx context.Context) error { return err } - // Receives job complete notifications from the completer and distributes - // them to any subscriptions. - c.completer.Subscribe(c.distributeJobCompleterCallback) + if c.completer != nil { + // The completer is part of the services list below, but although it can + // stop gracefully along with all the other services, it needs to be + // started with a context that's _not_ fetchCtx. This ensures that even + // when fetch is cancelled on shutdown, the completer is still given a + // separate opportunity to start stopping only after the producers have + // finished up and returned. + if err := c.completer.Start(ctx); err != nil { + stopServicesOnError() + return err + } - for _, service := range c.services { - if err := service.Start(fetchCtx); err != nil { - // In case of error, stop any services that might have started. This - // is safe because even services that were never started will still - // tolerate being stopped. - startstop.StopAllParallel(c.services) + // Receives job complete notifications from the completer and + // distributes them to any subscriptions. + c.completer.Subscribe(c.distributeJobCompleterCallback) + } - c.monitor.Stop() + // We use separate contexts for fetching and working to allow for a graceful + // stop. However, both inherit from the provided context so if it is + // cancelled a more aggressive stop will be initiated. 
+ fetchCtx, fetchWorkCancel := context.WithCancelCause(ctx) + c.fetchWorkCancel = fetchWorkCancel + workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c)) + c.workCancel = workCancel + for _, service := range c.services { + // TODO(brandur): Reevaluate the use of fetchNewWorkCtx here. It's + // currently necessary to speed up shutdown so that all services start + // shutting down before having to wait for the producers to finish, but + // as stopping becomes more normalized (hopefully by making the client + // itself a start/stop service), we can likely accomplish that in a + // cleaner way. + if err := service.Start(fetchCtx); err != nil { + stopServicesOnError() if errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) { return nil } @@ -656,18 +677,21 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) { producer.Stop() } - // Stop all mainline services where stop order isn't important. Contains the - // elector and notifier, amongst others. - startstop.StopAllParallel(c.services) - - // Once the producers have all finished, we know that completers have at least - // enqueued any remaining work. Wait for the completer to finish. - // - // TODO: there's a risk here that the completer is stuck on a job that won't - // complete. We probably need a timeout or way to move on in those cases. - c.completer.Wait() + // Stop all mainline services where stop order isn't important. + startstop.StopAllParallel(append( + // This list of services contains the completer, which should always + // stop after the producers so that any remaining work that was enqueued + // will have a chance to have its state completed as it finishes. + // + // TODO: there's a risk here that the completer is stuck on a job that + // won't complete. We probably need a timeout or way to move on in those + // cases. + c.services, - c.queueMaintainer.Stop() + // Will only be started if this client was leader, but can tolerate a stop + // without having been started. 
+ c.queueMaintainer, + )) c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": All services stopped") @@ -701,12 +725,12 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) { // There's no need to call this method if a hard stop has already been initiated // by cancelling the context passed to Start or by calling StopAndCancel. func (c *Client[TTx]) Stop(ctx context.Context) error { - if c.fetchNewWorkCancel == nil { + if c.fetchWorkCancel == nil { return errors.New("client not started") } c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started") - c.fetchNewWorkCancel(rivercommon.ErrShutdown) + c.fetchWorkCancel(rivercommon.ErrShutdown) return c.awaitStop(ctx) } @@ -731,7 +755,7 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error { // instead. func (c *Client[TTx]) StopAndCancel(ctx context.Context) error { c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work") - c.fetchNewWorkCancel(rivercommon.ErrShutdown) + c.fetchWorkCancel(rivercommon.ErrShutdown) c.workCancel(rivercommon.ErrShutdown) return c.awaitStop(ctx) } @@ -762,7 +786,41 @@ func (c *Client[TTx]) Stopped() <-chan struct{} { // versions. If new event kinds are added, callers will have to explicitly add // them to their requested list and ensure they can be handled correctly. func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) { - for _, kind := range kinds { + return c.SubscribeConfig(&SubscribeConfig{Kinds: kinds}) +} + +// The default maximum size of the subscribe channel. Events that would overflow +// it will be dropped. +const subscribeChanSizeDefault = 1_000 + +// SubscribeConfig is more thorough subscription configuration used for +// Client.SubscribeConfig. +type SubscribeConfig struct { + // ChanSize is the size of the buffered channel that will be created for the + // subscription. 
Incoming events that overflow this number because a listener + // isn't reading from the channel in a timely manner will be dropped. + // + // Defaults to 1000. + ChanSize int + + // Kinds are the kinds of events that the subscription will receive. + // Requiring that kinds are specified explicitly allows for forward + // compatibility in case new kinds of events are added in future versions. + // If new event kinds are added, callers will have to explicitly add them to + // their requested list and ensure they can be handled correctly. + Kinds []EventKind +} + +// Special internal variant that lets us inject an overridden size. +func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) { + if config.ChanSize < 0 { + panic("SubscribeConfig.ChanSize must be greater or equal to 1") + } + if config.ChanSize == 0 { + config.ChanSize = subscribeChanSizeDefault + } + + for _, kind := range config.Kinds { if _, ok := allKinds[kind]; !ok { panic(fmt.Errorf("unknown event kind: %s", kind)) } @@ -771,7 +829,7 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) { c.subscriptionsMu.Lock() defer c.subscriptionsMu.Unlock() - subChan := make(chan *Event, subscribeChanSize) + subChan := make(chan *Event, config.ChanSize) // Just gives us an easy way of removing the subscription again later. 
subID := c.subscriptionsSeq @@ -779,7 +837,7 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) { c.subscriptions[subID] = &eventSubscription{ Chan: subChan, - Kinds: sliceutil.KeyBy(kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }), + Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }), } cancel := func() { diff --git a/client_test.go b/client_test.go index e6bf6fb4..d2f12654 100644 --- a/client_test.go +++ b/client_test.go @@ -2316,6 +2316,137 @@ func Test_Client_Subscribe(t *testing.T) { require.Equal(t, JobStateRetryable, eventFailed.Job.State) }) + t.Run("PanicOnUnknownKind", func(t *testing.T) { + t.Parallel() + + dbPool := riverinternaltest.TestDB(ctx, t) + + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + return nil + }) + + client := newTestClient(t, dbPool, config) + + require.PanicsWithError(t, "unknown event kind: does_not_exist", func() { + _, _ = client.Subscribe(EventKind("does_not_exist")) + }) + }) + + t.Run("SubscriptionCancellation", func(t *testing.T) { + t.Parallel() + + dbPool := riverinternaltest.TestDB(ctx, t) + + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + return nil + }) + + client := newTestClient(t, dbPool, config) + + subscribeChan, cancel := client.Subscribe(EventKindJobCompleted) + cancel() + + // Drops through immediately because the channel is closed. + riverinternaltest.WaitOrTimeout(t, subscribeChan) + + require.Empty(t, client.subscriptions) + }) +} + +// SubscribeConfig uses all the same code as Subscribe, so these are just a +// minimal set of new tests to make sure that the function also works when used +// independently. 
+func Test_Client_SubscribeConfig(t *testing.T) { + t.Parallel() + + ctx := context.Background() + + keyEventsByName := func(events []*Event) map[string]*Event { + return sliceutil.KeyBy(events, func(event *Event) (string, *Event) { + var args callbackArgs + require.NoError(t, json.Unmarshal(event.Job.EncodedArgs, &args)) + return args.Name, event + }) + } + + requireInsert := func(ctx context.Context, client *Client[pgx.Tx], jobName string) *rivertype.JobRow { + job, err := client.Insert(ctx, callbackArgs{Name: jobName}, nil) + require.NoError(t, err) + return job + } + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + dbPool := riverinternaltest.TestDB(ctx, t) + + // Fail/succeed jobs based on their name so we can get a mix of both to + // verify. + config := newTestConfig(t, func(ctx context.Context, job *Job[callbackArgs]) error { + if strings.HasPrefix(job.Args.Name, "failed") { + return errors.New("job error") + } + return nil + }) + + client := newTestClient(t, dbPool, config) + + subscribeChan, cancel := client.SubscribeConfig(&SubscribeConfig{ + Kinds: []EventKind{EventKindJobCompleted, EventKindJobFailed}, + }) + t.Cleanup(cancel) + + jobCompleted1 := requireInsert(ctx, client, "completed1") + jobCompleted2 := requireInsert(ctx, client, "completed2") + jobFailed1 := requireInsert(ctx, client, "failed1") + jobFailed2 := requireInsert(ctx, client, "failed2") + + expectedJobs := []*rivertype.JobRow{ + jobCompleted1, + jobCompleted2, + jobFailed1, + jobFailed2, + } + + startClient(ctx, t, client) + + events := make([]*Event, len(expectedJobs)) + + for i := 0; i < len(expectedJobs); i++ { + events[i] = riverinternaltest.WaitOrTimeout(t, subscribeChan) + } + + eventsByName := keyEventsByName(events) + + { + eventCompleted1 := eventsByName["completed1"] + require.Equal(t, EventKindJobCompleted, eventCompleted1.Kind) + require.Equal(t, jobCompleted1.ID, eventCompleted1.Job.ID) + require.Equal(t, JobStateCompleted, eventCompleted1.Job.State) + } + + { + 
eventCompleted2 := eventsByName["completed2"] + require.Equal(t, EventKindJobCompleted, eventCompleted2.Kind) + require.Equal(t, jobCompleted2.ID, eventCompleted2.Job.ID) + require.Equal(t, JobStateCompleted, eventCompleted2.Job.State) + } + + { + eventFailed1 := eventsByName["failed1"] + require.Equal(t, EventKindJobFailed, eventFailed1.Kind) + require.Equal(t, jobFailed1.ID, eventFailed1.Job.ID) + require.Equal(t, JobStateRetryable, eventFailed1.Job.State) + } + + { + eventFailed2 := eventsByName["failed2"] + require.Equal(t, EventKindJobFailed, eventFailed2.Kind) + require.Equal(t, jobFailed2.ID, eventFailed2.Job.ID) + require.Equal(t, JobStateRetryable, eventFailed2.Job.State) + } + }) + t.Run("EventsDropWithNoListeners", func(t *testing.T) { t.Parallel() @@ -2327,37 +2458,70 @@ func Test_Client_Subscribe(t *testing.T) { client := newTestClient(t, dbPool, config) + type JobArgs struct { + JobArgsReflectKind[JobArgs] + } + + AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error { + return nil + })) + // A first channel that we'll use to make sure all the expected jobs are // finished. subscribeChan, cancel := client.Subscribe(EventKindJobCompleted) t.Cleanup(cancel) + // Artificially lowered subscribe channel size so we don't have to try + // and process thousands of jobs. + const ( + subscribeChanSize = 100 + numJobsToInsert = subscribeChanSize + 1 + ) + // Another channel with no listeners. Despite no listeners, it shouldn't // block or gum up the client's progress in any way. - _, cancel = client.Subscribe(EventKindJobCompleted) + subscribeChan2, cancel := client.SubscribeConfig(&SubscribeConfig{ + ChanSize: subscribeChanSize, + Kinds: []EventKind{EventKindJobCompleted}, + }) t.Cleanup(cancel) - // Insert more jobs than the maximum channel size. We'll be pulling from - // one channel but not the other. 
- for i := 0; i < subscribeChanSize+1; i++ { - _ = requireInsert(ctx, client, fmt.Sprintf("job %d", i)) + var ( + insertParams = make([]*riverdriver.JobInsertFastParams, numJobsToInsert) + kind = (&JobArgs{}).Kind() + ) + for i := 0; i < numJobsToInsert; i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{}`), + Kind: kind, + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateAvailable, + } } + _, err := client.driver.GetExecutor().JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + // Need to start waiting on events before running the client or the // channel could overflow before we start listening. var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() - _ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, subscribeChanSize+1) + _ = riverinternaltest.WaitOrTimeoutN(t, subscribeChan, numJobsToInsert) }() startClient(ctx, t, client) wg.Wait() + + // Filled to maximum. 
+ require.Len(t, subscribeChan2, subscribeChanSize) }) - t.Run("PanicOnUnknownKind", func(t *testing.T) { + t.Run("PanicOnChanSizeLessThanZero", func(t *testing.T) { t.Parallel() dbPool := riverinternaltest.TestDB(ctx, t) @@ -2368,12 +2532,14 @@ func Test_Client_Subscribe(t *testing.T) { client := newTestClient(t, dbPool, config) - require.PanicsWithError(t, "unknown event kind: does_not_exist", func() { - _, _ = client.Subscribe(EventKind("does_not_exist")) + require.PanicsWithValue(t, "SubscribeConfig.ChanSize must be greater or equal to 1", func() { + _, _ = client.SubscribeConfig(&SubscribeConfig{ + ChanSize: -1, + }) }) }) - t.Run("SubscriptionCancellation", func(t *testing.T) { + t.Run("PanicOnUnknownKind", func(t *testing.T) { t.Parallel() dbPool := riverinternaltest.TestDB(ctx, t) @@ -2384,13 +2550,11 @@ func Test_Client_Subscribe(t *testing.T) { client := newTestClient(t, dbPool, config) - subscribeChan, cancel := client.Subscribe(EventKindJobCompleted) - cancel() - - // Drops through immediately because the channel is closed. - riverinternaltest.WaitOrTimeout(t, subscribeChan) - - require.Empty(t, client.subscriptions) + require.PanicsWithError(t, "unknown event kind: does_not_exist", func() { + _, _ = client.SubscribeConfig(&SubscribeConfig{ + Kinds: []EventKind{EventKind("does_not_exist")}, + }) + }) }) } diff --git a/cmd/river/go.mod b/cmd/river/go.mod index 061eece0..da1122a6 100644 --- a/cmd/river/go.mod +++ b/cmd/river/go.mod @@ -2,19 +2,20 @@ module github.com/riverqueue/river/cmd/river go 1.21.4 -// replace github.com/riverqueue/river => ../.. +replace github.com/riverqueue/river => ../.. 
-// replace github.com/riverqueue/river/riverdriver => ../../riverdriver +replace github.com/riverqueue/river/riverdriver => ../../riverdriver -// replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverdriver/riverdatabasesql +replace github.com/riverqueue/river/riverdriver/riverdatabasesql => ../../riverdriver/riverdatabasesql -// replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5 +replace github.com/riverqueue/river/riverdriver/riverpgxv5 => ../../riverdriver/riverpgxv5 require ( - github.com/jackc/pgx/v5 v5.5.2 + github.com/jackc/pgx/v5 v5.5.5 github.com/riverqueue/river v0.0.17 - github.com/riverqueue/river/riverdriver v0.0.17 - github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17 + github.com/riverqueue/river/riverdriver v0.0.25 + github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.25 + github.com/riverqueue/river/rivertype v0.0.25 github.com/spf13/cobra v1.8.0 ) @@ -23,10 +24,8 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect - github.com/oklog/ulid/v2 v2.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect golang.org/x/crypto v0.17.0 // indirect - golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/text v0.14.0 // indirect ) diff --git a/cmd/river/go.sum b/cmd/river/go.sum index a570f9b1..6a18c284 100644 --- a/cmd/river/go.sum +++ b/cmd/river/go.sum @@ -10,25 +10,16 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= 
-github.com/jackc/pgx/v5 v5.5.2 h1:iLlpgp4Cp/gC9Xuscl7lFL1PhhW+ZLtXZcrfCt4C3tA= -github.com/jackc/pgx/v5 v5.5.2/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= +github.com/jackc/pgx/v5 v5.5.5 h1:amBjrZVmksIdNjxGW/IiIMzxMKZFelXbUoPNb+8sjQw= +github.com/jackc/pgx/v5 v5.5.5/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiwgm1A= github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk= github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= -github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU= -github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ= -github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/riverqueue/river v0.0.17 h1:7beHZxo1WMzhN48y1Jt7CKkkmsw+TjuLd6qCEaznm7s= -github.com/riverqueue/river v0.0.17/go.mod h1:rWKYvWxz1eQltm6VhSVLoRm6qMNvsbW8gcsEArspfw0= -github.com/riverqueue/river/riverdriver v0.0.17 h1:yexFMREAJ3VgWQkWb1QZEj5xqNCGE5yaXW9tIhzOcNc= -github.com/riverqueue/river/riverdriver v0.0.17/go.mod h1:vtgL7tRTSB6rzeVEDppehd/rPx3Is+WBYb17Zj0+KsE= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17 h1:xPmTpQNBicTZEuFVqwiK8yJ6CxrvM7dQi2CroNMxXH4= -github.com/riverqueue/river/riverdriver/riverdatabasesql v0.0.17/go.mod h1:zlZKXZ6XHcbwYniSKWX2+GwFlXHTnG9pJtE/BkxK0Xc= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17 h1:iuruCNT7nkC7Z4Qzb79jcvAVniGyK+Kstsy7fKJagUU= -github.com/riverqueue/river/riverdriver/riverpgxv5 v0.0.17/go.mod h1:kL59NW3LoPbQxPz9DQoUtDYq3Zkcpdt5CIowgeBZwFw= +github.com/riverqueue/river/rivertype 
v0.0.25 h1:iyReBD59MUan83gp3geGoHKU5eUrB9J9acziPBOlnRs= +github.com/riverqueue/river/rivertype v0.0.25/go.mod h1:PvsLQ/xSATmmn9gdjB3pnIaj9ZSLmWhDTI4EPrK3AJ0= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -39,14 +30,12 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= -golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= -golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= diff --git a/cmd/river/riverbench/river_bench.go b/cmd/river/riverbench/river_bench.go index 
36048ace..2b3a65f4 100644 --- a/cmd/river/riverbench/river_bench.go +++ b/cmd/river/riverbench/river_bench.go @@ -13,6 +13,7 @@ import ( "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/rivertype" ) type Benchmarker[TTx any] struct { @@ -116,11 +117,23 @@ func (b *Benchmarker[TTx]) Run(ctx context.Context) error { // worked instead of using telemetry from the worker itself because the // subscribe channel accounts for the job moving through the completer while // the worker does not. - subscribeChan, subscribeCancel := client.Subscribe( - river.EventKindJobCancelled, - river.EventKindJobCompleted, - river.EventKindJobFailed, - ) + subscribeChan, subscribeCancel := client.SubscribeConfig(&river.SubscribeConfig{ + // The benchmark may be processing a huge quantity of jobs far in excess + // of what River under normal conditions might see, so pick a much + // larger than normal subscribe channel size to make sure we don't + // accidentally drop any events. + // + // The subscribe channel is used to determine when jobs finish, so + // dropping jobs is very detrimental because it confuses the benchmark's + // bookkeeping of how many jobs there are left to work. + ChanSize: minJobs, + + Kinds: []river.EventKind{ + river.EventKindJobCancelled, + river.EventKindJobCompleted, + river.EventKindJobFailed, + }, + }) defer subscribeCancel() go func() { @@ -132,13 +145,30 @@ func (b *Benchmarker[TTx]) Run(ctx context.Context) error { case <-shutdown: return - case <-subscribeChan: - numJobsLeft.Add(-1) - numJobsWorked := numJobsWorked.Add(1) + case event := <-subscribeChan: + if event == nil { // Closed channel. + b.logger.InfoContext(ctx, "Subscription channel closed") + return + } + + switch { + case event.Kind == river.EventKindJobCancelled: + fallthrough + + case event.Kind == river.EventKindJobCompleted: + fallthrough + + // Only count a job as complete if it failed for the last time. 
+ // We don't expect benchmark jobs to ever fail, so this extra + // attention to detail is here, but shouldn't be needed. + case event.Kind == river.EventKindJobFailed && event.Job.State == rivertype.JobStateDiscarded: + numJobsLeft.Add(-1) + numJobsWorked := numJobsWorked.Add(1) - const logBatchSize = 5_000 - if numJobsWorked%logBatchSize == 0 { - b.logger.InfoContext(ctx, b.name+": Worked job batch", "num_worked", logBatchSize) + const logBatchSize = 5_000 + if numJobsWorked%logBatchSize == 0 { + b.logger.InfoContext(ctx, b.name+": Worked job batch", "num_worked", logBatchSize) + } } } } @@ -256,8 +286,8 @@ func (b *Benchmarker[TTx]) Run(ctx context.Context) error { } const ( - insertBatchSize = 2_000 - minJobs = 50_000 + insertBatchSize = 5_000 + minJobs = 75_000 // max per/sec I've seen it work + 50% head room ) // Inserts `b.numTotalJobs` in batches. This variant inserts a bulk of initial diff --git a/event.go b/event.go index ef84b86b..30482b76 100644 --- a/event.go +++ b/event.go @@ -67,10 +67,6 @@ func jobStatisticsFromInternal(stats *jobstats.JobStatistics) *JobStatistics { } } -// The maximum size of the subscribe channel. Events that would overflow it will -// be dropped. -const subscribeChanSize = 100 - // eventSubscription is an active subscription for events being produced by a // client, created with Client.Subscribe. 
type eventSubscription struct { diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 17d227df..5f369769 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -2,31 +2,39 @@ package jobcompleter import ( "context" + "errors" "math" "sync" "time" + "github.com/jackc/puddle/v2" "golang.org/x/sync/errgroup" "github.com/riverqueue/river/internal/baseservice" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/maintenance/startstop" + "github.com/riverqueue/river/internal/util/chanutil" + "github.com/riverqueue/river/internal/util/maputil" "github.com/riverqueue/river/internal/util/timeutil" "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivertype" ) +// JobCompleter is an interface to a service that "completes" jobs by marking +// them with an appropriate state and any other necessary metadata in the +// database. It's a generic interface to let us experiment with the speed of a +// number of implementations, although River will likely always prefer our most +// optimized one. type JobCompleter interface { + startstop.Service + // JobSetState sets a new state for the given job, as long as it's // still running (i.e. its state has not changed to something else already). - JobSetStateIfRunning(stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error + JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error // Subscribe injects a callback which will be invoked whenever a job is // updated. Subscribe(subscribeFunc func(update CompleterJobUpdated)) - - // Wait waits for all ongoing completions to finish, enabling graceful - // shutdown. 
- Wait() } type CompleterJobUpdated struct { @@ -38,15 +46,15 @@ type CompleterJobUpdated struct { // but is a minimal interface with the functions needed for completers to work // to more easily facilitate mocking. type PartialExecutor interface { + JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) } -type InlineJobCompleter struct { +type InlineCompleter struct { baseservice.BaseService + withSubscribe - exec PartialExecutor - subscribeFunc func(update CompleterJobUpdated) - subscribeFuncMu sync.Mutex + exec PartialExecutor // A waitgroup is not actually needed for the inline completer because as // long as the caller is waiting on each function call, completion is @@ -56,117 +64,282 @@ type InlineJobCompleter struct { wg sync.WaitGroup } -func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *InlineJobCompleter { - return baseservice.Init(archetype, &InlineJobCompleter{ +func NewInlineCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *InlineCompleter { + return baseservice.Init(archetype, &InlineCompleter{ exec: exec, }) } -func (c *InlineJobCompleter) JobSetStateIfRunning(stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { - return c.doOperation(stats, func(ctx context.Context) (*rivertype.JobRow, error) { - return c.exec.JobSetStateIfRunning(ctx, params) - }) -} - -func (c *InlineJobCompleter) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() - - c.subscribeFunc = subscribeFunc -} - -func (c *InlineJobCompleter) Wait() { - c.wg.Wait() -} - -func (c *InlineJobCompleter) doOperation(stats *jobstats.JobStatistics, f func(ctx context.Context) (*rivertype.JobRow, error)) error { +func (c *InlineCompleter) 
JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { c.wg.Add(1) defer c.wg.Done() start := c.TimeNowUTC() - job, err := withRetries(&c.BaseService, f) + job, err := withRetries(ctx, &c.BaseService, func(ctx context.Context) (*rivertype.JobRow, error) { + return c.exec.JobSetStateIfRunning(ctx, params) + }) if err != nil { return err } stats.CompleteDuration = c.TimeNowUTC().Sub(start) + c.sendJobToSubscription(job, stats) - func() { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() + return nil +} - if c.subscribeFunc != nil { - c.subscribeFunc(CompleterJobUpdated{Job: job, JobStats: stats}) - } - }() +func (c *InlineCompleter) Start(ctx context.Context) error { return nil } - return nil +func (c *InlineCompleter) Stop() { + c.wg.Wait() } -type AsyncJobCompleter struct { +// A default concurrency of 100 seems to perform better than a much smaller number +// like 10, but it's quite dependent on environment (10 and 100 bench almost +// identically on MBA when it's on battery power). This number should represent +// our best known default for most use cases, but don't consider its choice to +// be particularly well informed at this point. 
+const asyncCompleterDefaultConcurrency = 100 + +type AsyncCompleter struct { baseservice.BaseService + withSubscribe - concurrency uint32 - exec PartialExecutor - eg *errgroup.Group - subscribeFunc func(update CompleterJobUpdated) - subscribeFuncMu sync.Mutex + concurrency int + errGroup *errgroup.Group + exec PartialExecutor +} + +func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *AsyncCompleter { + return newAsyncCompleterWithConcurrency(archetype, exec, asyncCompleterDefaultConcurrency) } -func NewAsyncCompleter(archetype *baseservice.Archetype, exec PartialExecutor, concurrency uint32) *AsyncJobCompleter { - eg := &errgroup.Group{} - // TODO: int concurrency may feel more natural than uint32 - eg.SetLimit(int(concurrency)) +func newAsyncCompleterWithConcurrency(archetype *baseservice.Archetype, exec PartialExecutor, concurrency int) *AsyncCompleter { + errGroup := &errgroup.Group{} + errGroup.SetLimit(concurrency) - return baseservice.Init(archetype, &AsyncJobCompleter{ + return baseservice.Init(archetype, &AsyncCompleter{ exec: exec, concurrency: concurrency, - eg: eg, + errGroup: errGroup, }) } -func (c *AsyncJobCompleter) JobSetStateIfRunning(stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { - return c.doOperation(stats, func(ctx context.Context) (*rivertype.JobRow, error) { - return c.exec.JobSetStateIfRunning(ctx, params) - }) -} - -func (c *AsyncJobCompleter) doOperation(stats *jobstats.JobStatistics, f func(ctx context.Context) (*rivertype.JobRow, error)) error { - c.eg.Go(func() error { - start := c.TimeNowUTC() +func (c *AsyncCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { + // Start clock outside of goroutine so that the time spent blocking waiting + // for an errgroup slot is accurately measured. 
+ start := c.TimeNowUTC() - job, err := withRetries(&c.BaseService, f) + c.errGroup.Go(func() error { + job, err := withRetries(ctx, &c.BaseService, func(ctx context.Context) (*rivertype.JobRow, error) { + return c.exec.JobSetStateIfRunning(ctx, params) + }) if err != nil { return err } stats.CompleteDuration = c.TimeNowUTC().Sub(start) + c.sendJobToSubscription(job, stats) + + return nil + }) + return nil +} + +func (c *AsyncCompleter) Start(ctx context.Context) error { return nil } + +func (c *AsyncCompleter) Stop() { + if err := c.errGroup.Wait(); err != nil { + c.Logger.Error("Error waiting on async completer: %s", err) + } +} + +type batchCompleterSetState struct { + Params *riverdriver.JobSetStateIfRunningParams + Stats *jobstats.JobStatistics + WaitingAt time.Time // when job was submitted for completion +} + +// BatchCompleter uses a debounced channel to accumulate incoming completions +// and every so often complete many of them as a single efficient batch. To +// minimize the amount of driver surface area we need, the batching is only +// performed for jobs being changed to a `completed` state, which we expect to +// be the vast common case under normal operation. The completer holds an +// AsyncCompleter to perform other non-`completed` state completions. 
+type BatchCompleter struct { + baseservice.BaseService + startstop.BaseStartStop + withSubscribe + + asyncCompleter *AsyncCompleter // used for non-complete completions + debounceChan *chanutil.DebouncedChan + exec PartialExecutor + ready chan struct{} + setStateParams map[int64]*batchCompleterSetState + setStateParamsMu sync.Mutex +} + +func NewBatchCompleter(archetype *baseservice.Archetype, exec PartialExecutor) *BatchCompleter { + return baseservice.Init(archetype, &BatchCompleter{ + asyncCompleter: NewAsyncCompleter(archetype, exec), + exec: exec, + setStateParams: make(map[int64]*batchCompleterSetState), + }) +} + +func (c *BatchCompleter) Start(ctx context.Context) error { + stopCtx, shouldStart, stopped := c.StartInit(ctx) + if !shouldStart { + return nil + } + + c.ready = make(chan struct{}) + + go func() { + // This defer should come first so that it's last out, thereby avoiding + // races. + defer close(stopped) - func() { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() + c.Logger.InfoContext(ctx, c.Name+": Run loop started") + defer c.Logger.InfoContext(ctx, c.Name+": Run loop stopped") - if c.subscribeFunc != nil { - c.subscribeFunc(CompleterJobUpdated{Job: job, JobStats: stats}) + c.debounceChan = chanutil.NewDebouncedChan(stopCtx, 100*time.Millisecond) + + close(c.ready) + + for { + select { + case <-stopCtx.Done(): + // Try to insert last batch before leaving. Note we use the + // original context so operations aren't immediately cancelled. 
+ if err := c.handleBatch(ctx); err != nil { + c.Logger.Error(c.Name+": Error completing batch", "err", err) + } + + return + case <-c.debounceChan.C(): + if err := c.handleBatch(ctx); err != nil { + c.Logger.Error(c.Name+": Error completing batch", "err", err) + } } - }() + } + }() + + return nil +} +func (c *BatchCompleter) handleBatch(ctx context.Context) error { + var setStateBatch map[int64]*batchCompleterSetState + func() { + c.setStateParamsMu.Lock() + defer c.setStateParamsMu.Unlock() + + setStateBatch = c.setStateParams + + // Don't bother resetting the map if there's nothing to process, + // allowing the completer to idle efficiently. + if len(setStateBatch) > 0 { + c.setStateParams = make(map[int64]*batchCompleterSetState) + } else { + // Set nil to avoid a data race below in case the map is set as a + // new job comes in. + setStateBatch = nil + } + }() + + if len(setStateBatch) < 1 { return nil - }) + } + + // Use a single `finalized_at` value for the whole batch. Not the greatest + // thing maybe, but makes things much easier. + var sampleFinalizedAt time.Time + for _, setState := range setStateBatch { + sampleFinalizedAt = *setState.Params.FinalizedAt + break + } + + // Insert a sub-batch with retries. Also helps reduce visual noise and + // increase readability of loop below. + insertSubBatch := func(ids []int64) ([]*rivertype.JobRow, error) { + return withRetries(ctx, &c.BaseService, func(ctx context.Context) ([]*rivertype.JobRow, error) { + return c.exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ + ID: ids, + FinalizedAt: sampleFinalizedAt, + }) + }) + } + + // Tease apart enormous batches into sub-batches. + // + // All the code below is concerned with doing that, with a fast loop that + // doesn't allocate any additional memory in case the entire batch is + // smaller than the sub-batch maximum size (which will be the common case). 
+ const oneOperationMax = 2_000 + + var ( + ids = maputil.Keys(setStateBatch) + jobRows []*rivertype.JobRow + ) + if len(setStateBatch) > oneOperationMax { + jobRows = make([]*rivertype.JobRow, 0, len(setStateBatch)) + for i := 0; i < len(setStateBatch); i += oneOperationMax { + jobRowsSubBatch, err := insertSubBatch(ids[i:min(i+oneOperationMax, len(ids))]) + if err != nil { + return err + } + jobRows = append(jobRows, jobRowsSubBatch...) + } + } else { + var err error + jobRows, err = insertSubBatch(ids) + if err != nil { + return err + } + } + + for _, jobRow := range jobRows { + setState := setStateBatch[jobRow.ID] + setState.Stats.CompleteDuration = c.TimeNowUTC().Sub(setState.WaitingAt) + c.sendJobToSubscription(jobRow, setState.Stats) + } + return nil } -func (c *AsyncJobCompleter) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { - c.subscribeFuncMu.Lock() - defer c.subscribeFuncMu.Unlock() +func (c *BatchCompleter) JobSetStateIfRunning(ctx context.Context, stats *jobstats.JobStatistics, params *riverdriver.JobSetStateIfRunningParams) error { + // Send completions other than setting to `complete` to an async completer. + // We consider this okay because these are expected to be much more rare, so + // only optimizing `complete` will yield huge speed gains. + if params.State != rivertype.JobStateCompleted { + return c.asyncCompleter.JobSetStateIfRunning(ctx, stats, params) + } - c.subscribeFunc = subscribeFunc + // Wait until the completer is started and ready to start processing + // batches. Alternatively, we could remove this and allow batches to start + // accumulating even if the service isn't started, but that could introduce + // some danger of the service never being started and therefore accumulating + // forever without completing jobs. 
+ <-c.ready + + c.setStateParamsMu.Lock() + defer c.setStateParamsMu.Unlock() + + c.setStateParams[params.ID] = &batchCompleterSetState{params, stats, c.TimeNowUTC()} + c.debounceChan.Call() + + return nil +} + +func (c *BatchCompleter) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { + c.withSubscribe.Subscribe(subscribeFunc) + c.asyncCompleter.Subscribe(subscribeFunc) } -func (c *AsyncJobCompleter) Wait() { - // TODO: handle error? - _ = c.eg.Wait() +func (c *BatchCompleter) Stop() { + c.BaseStartStop.Stop() + c.asyncCompleter.Stop() } // As configued, total time from initial attempt is ~7 seconds (1 + 2 + 4) (not @@ -174,7 +347,7 @@ func (c *AsyncJobCompleter) Wait() { // may want to rethink these numbers and strategy. const numRetries = 3 -func withRetries(c *baseservice.BaseService, f func(ctx context.Context) (*rivertype.JobRow, error)) (*rivertype.JobRow, error) { //nolint:varnamelen +func withRetries[T any](ctx context.Context, baseService *baseservice.BaseService, retryFunc func(ctx context.Context) (T, error)) (T, error) { retrySecondsWithoutJitter := func(attempt int) float64 { // Uses a different algorithm (2 ** N) compared to retry policies (4 ** // N) so we can get more retries sooner: 1, 2, 4, 8 @@ -185,41 +358,78 @@ func withRetries(c *baseservice.BaseService, f func(ctx context.Context) (*river retrySeconds := retrySecondsWithoutJitter(attempt) // Jitter number of seconds +/- 10%. - retrySeconds += retrySeconds * (c.Rand.Float64()*0.2 - 0.1) + retrySeconds += retrySeconds * (baseService.Rand.Float64()*0.2 - 0.1) return retrySeconds } - tryOnce := func() (*rivertype.JobRow, error) { - ctx := context.Background() + tryOnce := func() (T, error) { + uncancelledCtx := context.Background() - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + // I've found that we want at least ten seconds for a large batch, + // although it usually doesn't need that long. 
+ uncancelledCtx, cancel := context.WithTimeout(uncancelledCtx, 10*time.Second) defer cancel() - return f(ctx) + return retryFunc(uncancelledCtx) } - var lastErr error + var ( + defaultVal T + lastErr error + ) // TODO: Added a basic retry algorithm based on the top-level retry policies // for now, but we may want to reevaluate this somewhat. + // + // TODO: Should use base service exponential backoff once available. for attempt := 1; attempt < numRetries+1; attempt++ { - job, err := tryOnce() + retVal, err := tryOnce() if err != nil { + // A cancelled context will never succeed, return immediately. + if errors.Is(err, context.Canceled) { + return defaultVal, err + } + + // A closed pool will never succeed, return immediately. + if errors.Is(err, puddle.ErrClosedPool) { + return defaultVal, err + } + lastErr = err sleepDuration := timeutil.SecondsAsDuration(retrySeconds(attempt)) - // TODO: this logger doesn't use the user-provided context because it's - // not currently available here. It should. - c.Logger.Error(c.Name+": Completer error (will retry)", "attempt", attempt, "err", err, "sleep_duration", sleepDuration) - c.CancellableSleep(context.Background(), sleepDuration) + baseService.Logger.ErrorContext(ctx, baseService.Name+": Completer error (will retry)", "attempt", attempt, "err", err, "sleep_duration", sleepDuration) + baseService.CancellableSleep(ctx, sleepDuration) continue } - return job, nil + return retVal, nil } - // TODO: this logger doesn't use the user-provided context because it's - // not currently available here. It should. - c.Logger.Error(c.Name + ": Too many errors; giving up") - return nil, lastErr + baseService.Logger.ErrorContext(ctx, baseService.Name+": Too many errors; giving up") + + return defaultVal, lastErr +} + +// Utility struct embedded in completers to give them an easy way to provide a +// Subscribe function and to handle locking around its use. 
+type withSubscribe struct { + subscribeFunc func(update CompleterJobUpdated) + subscribeFuncMu sync.RWMutex +} + +func (c *withSubscribe) Subscribe(subscribeFunc func(update CompleterJobUpdated)) { + c.subscribeFuncMu.Lock() + defer c.subscribeFuncMu.Unlock() + + c.subscribeFunc = subscribeFunc +} + +func (c *withSubscribe) sendJobToSubscription(job *rivertype.JobRow, stats *jobstats.JobStatistics) { + c.subscribeFuncMu.RLock() + defer c.subscribeFuncMu.RUnlock() + + if c.subscribeFunc != nil { + c.subscribeFunc(CompleterJobUpdated{Job: job, JobStats: stats}) + } } diff --git a/internal/jobcompleter/job_completer_test.go b/internal/jobcompleter/job_completer_test.go index 5845a2ea..a3008b0d 100644 --- a/internal/jobcompleter/job_completer_test.go +++ b/internal/jobcompleter/job_completer_test.go @@ -3,38 +3,67 @@ package jobcompleter import ( "context" "errors" + "fmt" "sync" + "sync/atomic" "testing" "time" + "github.com/jackc/puddle/v2" "github.com/stretchr/testify/require" "github.com/riverqueue/river/internal/jobstats" + "github.com/riverqueue/river/internal/rivercommon" "github.com/riverqueue/river/internal/riverinternaltest" + "github.com/riverqueue/river/internal/riverinternaltest/testfactory" + "github.com/riverqueue/river/internal/util/ptrutil" + "github.com/riverqueue/river/internal/util/randutil" "github.com/riverqueue/river/riverdriver" + "github.com/riverqueue/river/riverdriver/riverpgxv5" "github.com/riverqueue/river/rivertype" ) -type executorMock struct { - JobSetStateIfRunningCalled bool - JobSetStateIfRunningFunc func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) - mu sync.Mutex +type partialExecutorMock struct { + JobSetCompleteIfRunningManyCalled bool + JobSetCompleteIfRunningManyFunc func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) + JobSetStateIfRunningCalled bool + JobSetStateIfRunningFunc func(ctx context.Context, params 
*riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) + mu sync.Mutex } -func (m *executorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { - m.mu.Lock() - m.JobSetStateIfRunningCalled = true - m.mu.Unlock() +// NewPartialExecutorMock returns a new mock with all mock functions set to call +// down into the given real executor. +func NewPartialExecutorMock(exec riverdriver.Executor) *partialExecutorMock { + return &partialExecutorMock{ + JobSetCompleteIfRunningManyFunc: exec.JobSetCompleteIfRunningMany, + JobSetStateIfRunningFunc: exec.JobSetStateIfRunning, + } +} +func (m *partialExecutorMock) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + m.setCalled(func() { m.JobSetCompleteIfRunningManyCalled = true }) + return m.JobSetCompleteIfRunningManyFunc(ctx, params) +} + +func (m *partialExecutorMock) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + m.setCalled(func() { m.JobSetStateIfRunningCalled = true }) return m.JobSetStateIfRunningFunc(ctx, params) } +func (m *partialExecutorMock) setCalled(setCalledFunc func()) { + m.mu.Lock() + defer m.mu.Unlock() + setCalledFunc() +} + func TestInlineJobCompleter_Complete(t *testing.T) { t.Parallel() + ctx := context.Background() + var attempt int expectedErr := errors.New("an error from the completer") - adapter := &executorMock{ + execMock := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { require.Equal(t, int64(1), params.ID) attempt++ @@ -42,15 +71,15 @@ func TestInlineJobCompleter_Complete(t *testing.T) { }, } - completer := NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), adapter) - t.Cleanup(completer.Wait) + completer := 
NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), execMock) + t.Cleanup(completer.Stop) - err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now())) + err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(1, time.Now())) if !errors.Is(err, expectedErr) { t.Errorf("expected %v, got %v", expectedErr, err) } - require.True(t, adapter.JobSetStateIfRunningCalled) + require.True(t, execMock.JobSetStateIfRunningCalled) require.Equal(t, numRetries, attempt) } @@ -73,6 +102,8 @@ func TestInlineJobCompleter_Wait(t *testing.T) { func TestAsyncJobCompleter_Complete(t *testing.T) { t.Parallel() + ctx := context.Background() + type jobInput struct { // TODO: Try to get rid of containing the context in struct. It'd be // better to pass it forward instead. @@ -89,26 +120,26 @@ func TestAsyncJobCompleter_Complete(t *testing.T) { resultCh <- expectedErr }() - adapter := &executorMock{ + adapter := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { inputCh <- jobInput{ctx: ctx, jobID: params.ID} err := <-resultCh return nil, err }, } - completer := NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), adapter, 2) - t.Cleanup(completer.Wait) + completer := newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), adapter, 2) + t.Cleanup(completer.Stop) // launch 4 completions, only 2 can be inline due to the concurrency limit: for i := int64(0); i < 2; i++ { - if err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { + if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { t.Errorf("expected nil err, got %v", err) } } bgCompletionsStarted 
:= make(chan struct{}) go func() { for i := int64(2); i < 4; i++ { - if err := completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { + if err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(i, time.Now())); err != nil { t.Errorf("expected nil err, got %v", err) } } @@ -159,7 +190,7 @@ func TestAsyncJobCompleter_Subscribe(t *testing.T) { t.Parallel() testCompleterSubscribe(t, func(exec PartialExecutor) JobCompleter { - return NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) }) } @@ -167,14 +198,16 @@ func TestAsyncJobCompleter_Wait(t *testing.T) { t.Parallel() testCompleterWait(t, func(exec PartialExecutor) JobCompleter { - return NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(t).WithSleepDisabled(), exec, 4) }) } func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor) JobCompleter) { t.Helper() - exec := &executorMock{ + ctx := context.Background() + + exec := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { return &rivertype.JobRow{ State: rivertype.JobStateCompleted, @@ -190,10 +223,10 @@ func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor) JobC }) for i := 0; i < 4; i++ { - require.NoError(t, completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) } - completer.Wait() + completer.Stop() updates := riverinternaltest.WaitOrTimeoutN(t, 
jobUpdates, 4) for i := 0; i < 4; i++ { @@ -204,9 +237,11 @@ func testCompleterSubscribe(t *testing.T, constructor func(PartialExecutor) JobC func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobCompleter) { t.Helper() + ctx := context.Background() + resultCh := make(chan error) completeStartedCh := make(chan struct{}) - exec := &executorMock{ + exec := &partialExecutorMock{ JobSetStateIfRunningFunc: func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { completeStartedCh <- struct{}{} err := <-resultCh @@ -220,7 +255,7 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobComple for i := 0; i < 4; i++ { i := i go func() { - require.NoError(t, completer.JobSetStateIfRunning(&jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(int64(i), time.Now()))) }() <-completeStartedCh // wait for func to actually start } @@ -230,7 +265,7 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobComple waitDone := make(chan struct{}) go func() { - completer.Wait() + completer.Stop() close(waitDone) }() @@ -259,3 +294,594 @@ func testCompleterWait(t *testing.T, constructor func(PartialExecutor) JobComple t.Errorf("expected Wait to return after all jobs are complete") } } + +func TestAsyncCompleter(t *testing.T) { + t.Parallel() + + testCompleter(t, func(exec riverdriver.Executor) *AsyncCompleter { + return NewAsyncCompleter(riverinternaltest.BaseServiceArchetype(t), exec) + }, + func(completer *AsyncCompleter) { completer.DisableSleep = true }, + func(completer *AsyncCompleter, exec PartialExecutor) { completer.exec = exec }) +} + +func TestBatchCompleter(t *testing.T) { + t.Parallel() + + testCompleter(t, func(exec riverdriver.Executor) *BatchCompleter { + return NewBatchCompleter(riverinternaltest.BaseServiceArchetype(t), 
exec) + }, + func(completer *BatchCompleter) { completer.DisableSleep = true }, + func(completer *BatchCompleter, exec PartialExecutor) { completer.exec = exec }) +} + +func TestInlineCompleter(t *testing.T) { + t.Parallel() + + testCompleter(t, func(exec riverdriver.Executor) *InlineCompleter { + return NewInlineCompleter(riverinternaltest.BaseServiceArchetype(t), exec) + }, + func(completer *InlineCompleter) { completer.DisableSleep = true }, + func(completer *InlineCompleter, exec PartialExecutor) { completer.exec = exec }) +} + +func testCompleter[TCompleter JobCompleter]( + t *testing.T, + newCompleter func(exec riverdriver.Executor) TCompleter, + + // These functions are here to help us inject test behavior that's not part + // of the JobCompleter interface. We could alternatively define a second + // interface like jobCompleterWithTestFacilities to expose the additional + // functionality, although that's not particularly beautiful either. + disableSleep func(completer TCompleter), + setExec func(completer TCompleter, exec PartialExecutor), +) { + t.Helper() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + } + + setup := func(t *testing.T) (TCompleter, *testBundle) { + t.Helper() + + var ( + driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, t)) + exec = driver.GetExecutor() + completer = newCompleter(exec) + ) + + require.NoError(t, completer.Start(ctx)) + + return completer, &testBundle{ + exec: exec, + } + } + + requireJob := func(t *testing.T, exec riverdriver.Executor, jobID int64) *rivertype.JobRow { + t.Helper() + + job, err := exec.JobGetByID(ctx, jobID) + require.NoError(t, err) + return job + } + + requireState := func(t *testing.T, exec riverdriver.Executor, jobID int64, state rivertype.JobState) { + t.Helper() + + job := requireJob(t, exec, jobID) + require.Equal(t, state, job.State) + } + + t.Run("CompletesJobs", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + var ( + job1 = 
testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + ) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job1.ID, time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job3.ID, time.Now()))) + + completer.Stop() + + requireState(t, bundle.exec, job1.ID, rivertype.JobStateCompleted) + requireState(t, bundle.exec, job2.ID, rivertype.JobStateCompleted) + requireState(t, bundle.exec, job3.ID, rivertype.JobStateCompleted) + }) + + // Some completers like BatchCompleter have special logic for when they're + // handling enormous numbers of jobs, so make sure we're covered for cases + // like that. 
+ t.Run("CompletesManyJobs", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + const ( + kind = "many_jobs_kind" + numJobs = 4_400 + ) + + var ( + insertParams = make([]*riverdriver.JobInsertFastParams, numJobs) + stats = make([]jobstats.JobStatistics, numJobs) + ) + for i := 0; i < numJobs; i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{}`), + Kind: kind, + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateRunning, + } + } + + _, err := bundle.exec.JobInsertFastMany(ctx, insertParams) + require.NoError(t, err) + + jobs, err := bundle.exec.JobGetByKindMany(ctx, []string{kind}) + require.NoError(t, err) + + for i := range jobs { + require.NoError(t, completer.JobSetStateIfRunning(ctx, &stats[i], riverdriver.JobSetStateCompleted(jobs[i].ID, time.Now()))) + } + + completer.Stop() + + updatedJobs, err := bundle.exec.JobGetByKindMany(ctx, []string{kind}) + require.NoError(t, err) + for i := range updatedJobs { + require.Equal(t, rivertype.JobStateCompleted, updatedJobs[i].State) + } + }) + + // Performs continuous job insertion from a background goroutine. Returns a + // function that should be invoked to stop insertion, which will block until + // insertion stops, then return the total number of jobs that were inserted. 
+ doContinuousInsertion := func(t *testing.T, completer JobCompleter, exec riverdriver.Executor) func() int { + t.Helper() + + var ( + insertionStopped = make(chan struct{}) + numInserted atomic.Int64 + stopInsertion = make(chan struct{}) + ) + go func() { + defer close(insertionStopped) + + defer func() { + t.Logf("Inserted %d jobs", numInserted.Load()) + }() + + for { + select { + case <-stopInsertion: + return + default: + } + + job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + numInserted.Add(1) + } + }() + + return func() int { + close(stopInsertion) + <-insertionStopped + return int(numInserted.Load()) + } + } + + t.Run("ContinuousCompletion", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + stopInsertion := doContinuousInsertion(t, completer, bundle.exec) + + // Give some time for some jobs to be inserted. + time.Sleep(100 * time.Millisecond) + + // Signal to stop insertion and wait for the goroutine to return. 
+ numInserted := stopInsertion() + + require.Greater(t, numInserted, 0) + + completer.Stop() + }) + + t.Run("AllJobStates", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + var ( + job1 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job3 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job4 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job5 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job6 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job7 = testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + ) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCancelled(job1.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job2.ID, time.Now()))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateDiscarded(job3.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorAvailable(job4.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateErrorRetryable(job5.ID, time.Now(), []byte("{}")))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozed(job6.ID, time.Now(), 10))) + require.NoError(t, completer.JobSetStateIfRunning(ctx, 
&jobstats.JobStatistics{}, riverdriver.JobSetStateSnoozedAvailable(job7.ID, time.Now(), 10))) + + completer.Stop() + + requireState(t, bundle.exec, job1.ID, rivertype.JobStateCancelled) + requireState(t, bundle.exec, job2.ID, rivertype.JobStateCompleted) + requireState(t, bundle.exec, job3.ID, rivertype.JobStateDiscarded) + requireState(t, bundle.exec, job4.ID, rivertype.JobStateAvailable) + requireState(t, bundle.exec, job5.ID, rivertype.JobStateRetryable) + requireState(t, bundle.exec, job6.ID, rivertype.JobStateScheduled) + requireState(t, bundle.exec, job7.ID, rivertype.JobStateAvailable) + }) + + t.Run("Subscription", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + var jobUpdate CompleterJobUpdated + completer.Subscribe(func(update CompleterJobUpdated) { + jobUpdate = update + }) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + require.NotZero(t, jobUpdate) + require.Equal(t, rivertype.JobStateCompleted, jobUpdate.Job.State) + }) + + t.Run("MultipleCycles", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + { + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) + } + + { + require.NoError(t, completer.Start(ctx)) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + 
requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) + } + }) + + t.Run("CompletionFailure", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + // The completers will do an exponential backoff sleep while retrying. + // Make sure to disable it for this test case so the tests stay fast. + disableSleep(completer) + + var numCalls int + maybeError := func() error { + numCalls++ + switch numCalls { + case 1: + fallthrough + case 2: + return fmt.Errorf("error from executor %d", numCalls) + } + return nil + } + + execMock := NewPartialExecutorMock(bundle.exec) + execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + if err := maybeError(); err != nil { + return nil, err + } + return bundle.exec.JobSetCompleteIfRunningMany(ctx, params) + } + execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + if err := maybeError(); err != nil { + return nil, err + } + return bundle.exec.JobSetStateIfRunning(ctx, params) + } + setExec(completer, execMock) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + require.NoError(t, completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now()))) + + completer.Stop() + + // Make sure our mocks were really called. The specific function called + // will depend on the completer under test, so okay as long as one or + // the other was. + require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + + // Job still managed to complete despite the errors. 
+ requireState(t, bundle.exec, job.ID, rivertype.JobStateCompleted) + }) + + t.Run("CompletionImmediateFailureOnContextCanceled", func(t *testing.T) { //nolint:dupl + t.Parallel() + + completer, bundle := setup(t) + + // The completers will do an exponential backoff sleep while retrying. + // Make sure to disable it for this test case so the tests stay fast. + disableSleep(completer) + + execMock := NewPartialExecutorMock(bundle.exec) + execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + return nil, context.Canceled + } + execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + return nil, context.Canceled + } + setExec(completer, execMock) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())) + + // The error returned will be nil for asynchronous completers, but + // returned immediately for synchronous ones. + require.True(t, err == nil || errors.Is(err, context.Canceled)) + + completer.Stop() + + // Make sure our mocks were really called. The specific function called + // will depend on the completer under test, so okay as long as one or + // the other was. + require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + + // Job is still running because the completer is forced to give up + // immediately on certain types of errors like where a pool is closed. + requireState(t, bundle.exec, job.ID, rivertype.JobStateRunning) + }) + + t.Run("CompletionImmediateFailureOnErrClosedPool", func(t *testing.T) { //nolint:dupl + t.Parallel() + + completer, bundle := setup(t) + + // The completers will do an exponential backoff sleep while retrying. 
+ // Make sure to disable it for this test case so the tests stay fast. + disableSleep(completer) + + execMock := NewPartialExecutorMock(bundle.exec) + execMock.JobSetCompleteIfRunningManyFunc = func(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + return nil, puddle.ErrClosedPool + } + execMock.JobSetStateIfRunningFunc = func(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { + return nil, puddle.ErrClosedPool + } + setExec(completer, execMock) + + job := testfactory.Job(ctx, t, bundle.exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + err := completer.JobSetStateIfRunning(ctx, &jobstats.JobStatistics{}, riverdriver.JobSetStateCompleted(job.ID, time.Now())) + + // The error returned will be nil for asynchronous completers, but + // returned immediately for synchronous ones. + require.True(t, err == nil || errors.Is(err, puddle.ErrClosedPool)) + + completer.Stop() + + // Make sure our mocks were really called. The specific function called + // will depend on the completer under test, so okay as long as one or + // the other was. + require.True(t, execMock.JobSetCompleteIfRunningManyCalled || execMock.JobSetStateIfRunningCalled) + + // Job is still running because the completer is forced to give up + // immediately on certain types of errors like where a pool is closed. 
+ requireState(t, bundle.exec, job.ID, rivertype.JobStateRunning) + }) + + t.Run("SubscribeStress", func(t *testing.T) { + t.Parallel() + + completer, bundle := setup(t) + + stopInsertion := doContinuousInsertion(t, completer, bundle.exec) + + const numGoroutines = 5 + + var ( + rand = randutil.NewCryptoSeededConcurrentSafeRand() + stopSubscribing = make(chan struct{}) + wg sync.WaitGroup + ) + + wg.Add(numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func() { + defer wg.Done() + for { + select { + case <-stopSubscribing: + return + case <-time.After(time.Duration(randutil.IntBetween(rand, int(2*time.Millisecond), int(20*time.Millisecond)))): + completer.Subscribe(func(update CompleterJobUpdated) {}) + } + } + }() + } + + // Give some time for some jobs to be inserted and the subscriber + // goroutines to churn. + time.Sleep(100 * time.Millisecond) + + close(stopSubscribing) + wg.Wait() + + // Signal to stop insertion and wait for the goroutine to return. + numInserted := stopInsertion() + + require.Greater(t, numInserted, 0) + + completer.Stop() + }) +} + +func BenchmarkAsyncCompleter_Concurrency10(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(b), exec, 10) + }) +} + +func BenchmarkAsyncCompleter_Concurrency100(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return newAsyncCompleterWithConcurrency(riverinternaltest.BaseServiceArchetype(b), exec, 100) + }) +} + +func BenchmarkBatchCompleter(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return NewBatchCompleter(riverinternaltest.BaseServiceArchetype(b), exec) + }) +} + +func BenchmarkInlineCompleter(b *testing.B) { + benchmarkCompleter(b, func(exec riverdriver.Executor) JobCompleter { + return NewInlineCompleter(riverinternaltest.BaseServiceArchetype(b), exec) + }) +} + +func benchmarkCompleter( + b 
*testing.B, + newCompleter func(exec riverdriver.Executor) JobCompleter, +) { + b.Helper() + + ctx := context.Background() + + type testBundle struct { + exec riverdriver.Executor + jobs []*rivertype.JobRow + stats []jobstats.JobStatistics + } + + setup := func(b *testing.B) (JobCompleter, *testBundle) { + b.Helper() + + var ( + driver = riverpgxv5.New(riverinternaltest.TestDB(ctx, b)) + exec = driver.GetExecutor() + completer = newCompleter(exec) + ) + + require.NoError(b, completer.Start(ctx)) + + insertParams := make([]*riverdriver.JobInsertFastParams, b.N) + for i := 0; i < b.N; i++ { + insertParams[i] = &riverdriver.JobInsertFastParams{ + EncodedArgs: []byte(`{}`), + Kind: "benchmark_kind", + MaxAttempts: rivercommon.MaxAttemptsDefault, + Priority: rivercommon.PriorityDefault, + Queue: rivercommon.QueueDefault, + State: rivertype.JobStateRunning, + } + } + + _, err := exec.JobInsertFastMany(ctx, insertParams) + require.NoError(b, err) + + jobs, err := exec.JobGetByKindMany(ctx, []string{"benchmark_kind"}) + require.NoError(b, err) + + return completer, &testBundle{ + exec: exec, + jobs: jobs, + stats: make([]jobstats.JobStatistics, b.N), + } + } + + b.Run("Completion", func(b *testing.B) { + completer, bundle := setup(b) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now())) + require.NoError(b, err) + } + + completer.Stop() + }) + + b.Run("RotatingStates", func(b *testing.B) { + completer, bundle := setup(b) + + b.ResetTimer() + + for i := 0; i < b.N; i++ { + switch i % 7 { + case 0: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCancelled(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 1: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateCompleted(bundle.jobs[i].ID, time.Now())) + require.NoError(b, err) + + case 2: + err := 
completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateDiscarded(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 3: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorAvailable(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 4: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateErrorRetryable(bundle.jobs[i].ID, time.Now(), []byte("{}"))) + require.NoError(b, err) + + case 5: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozed(bundle.jobs[i].ID, time.Now(), 10)) + require.NoError(b, err) + + case 6: + err := completer.JobSetStateIfRunning(ctx, &bundle.stats[i], riverdriver.JobSetStateSnoozedAvailable(bundle.jobs[i].ID, time.Now(), 10)) + require.NoError(b, err) + + default: + panic("unexpected modulo result (did you update cases without changing the modulo divider or vice versa?)") + } + } + + completer.Stop() + }) +} diff --git a/internal/jobcompleter/main_test.go b/internal/jobcompleter/main_test.go new file mode 100644 index 00000000..86be3eb2 --- /dev/null +++ b/internal/jobcompleter/main_test.go @@ -0,0 +1,11 @@ +package jobcompleter + +import ( + "testing" + + "github.com/riverqueue/river/internal/riverinternaltest" +) + +func TestMain(m *testing.M) { + riverinternaltest.WrapTestMain(m) +} diff --git a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go index 50c85021..24a55ef4 100644 --- a/internal/riverinternaltest/riverdrivertest/riverdrivertest.go +++ b/internal/riverinternaltest/riverdrivertest/riverdrivertest.go @@ -1060,6 +1060,70 @@ func ExerciseExecutorFull[TTx any](ctx context.Context, t *testing.T, driver riv require.Equal(t, rivertype.JobStateAvailable, updatedJob3.State) }) + t.Run("JobSetCompleteIfRunningMany", func(t *testing.T) { + t.Parallel() + +
t.Run("CompletesRunningJobs", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + now := time.Now().UTC() + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRunning)}) + + jobsAfter, err := exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ + ID: []int64{job1.ID, job2.ID}, + FinalizedAt: now, + }) + require.NoError(t, err) + for _, jobAfter := range jobsAfter { + require.Equal(t, rivertype.JobStateCompleted, jobAfter.State) + require.WithinDuration(t, now, *jobAfter.FinalizedAt, time.Microsecond) + } + + job1Updated, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCompleted, job1Updated.State) + job2Updated, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateCompleted, job2Updated.State) + }) + + t.Run("DoesNotCompleteJobsInNonRunningStates", func(t *testing.T) { + t.Parallel() + + exec, _ := setupExecutor(ctx, t, driver, beginTx) + + now := time.Now().UTC() + + job1 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateAvailable)}) + job2 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateRetryable)}) + job3 := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{State: ptrutil.Ptr(rivertype.JobStateScheduled)}) + + jobsAfter, err := exec.JobSetCompleteIfRunningMany(ctx, &riverdriver.JobSetCompleteIfRunningManyParams{ + ID: []int64{job1.ID, job2.ID, job3.ID}, + FinalizedAt: now, + }) + require.NoError(t, err) + for _, jobAfter := range jobsAfter { + require.NotEqual(t, rivertype.JobStateCompleted, jobAfter.State) + require.Nil(t, jobAfter.FinalizedAt) + } + + job1Updated, err := exec.JobGetByID(ctx, job1.ID) + require.NoError(t, err) + require.Equal(t,
rivertype.JobStateAvailable, job1Updated.State) + job2Updated, err := exec.JobGetByID(ctx, job2.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateRetryable, job2Updated.State) + job3Updated, err := exec.JobGetByID(ctx, job3.ID) + require.NoError(t, err) + require.Equal(t, rivertype.JobStateScheduled, job3Updated.State) + }) + }) + t.Run("JobSetStateIfRunning_JobSetStateCompleted", func(t *testing.T) { t.Parallel() diff --git a/internal/riverinternaltest/testfactory/test_factory.go b/internal/riverinternaltest/testfactory/test_factory.go index ffd4e9aa..6b4a4891 100644 --- a/internal/riverinternaltest/testfactory/test_factory.go +++ b/internal/riverinternaltest/testfactory/test_factory.go @@ -33,8 +33,8 @@ type JobOpts struct { Tags []string } -func Job(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *JobOpts) *rivertype.JobRow { - t.Helper() +func Job(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *JobOpts) *rivertype.JobRow { + tb.Helper() encodedArgs := opts.EncodedArgs if opts.EncodedArgs == nil { @@ -67,7 +67,7 @@ func Job(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *Job State: ptrutil.ValOrDefault(opts.State, rivertype.JobStateAvailable), Tags: tags, }) - require.NoError(t, err) + require.NoError(tb, err) return job } @@ -78,8 +78,8 @@ type LeaderOpts struct { Name *string } -func Leader(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *LeaderOpts) *riverdriver.Leader { - t.Helper() +func Leader(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *LeaderOpts) *riverdriver.Leader { + tb.Helper() leader, err := exec.LeaderInsert(ctx, &riverdriver.LeaderInsertParams{ ElectedAt: opts.ElectedAt, @@ -88,7 +88,7 @@ func Leader(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts * Name: ptrutil.ValOrDefault(opts.Name, "default"), TTL: 10 * time.Second, }) - require.NoError(t, err) + require.NoError(tb, err) return leader } @@ -96,13 +96,13 
@@ type MigrationOpts struct { Version *int } -func Migration(ctx context.Context, t *testing.T, exec riverdriver.Executor, opts *MigrationOpts) *riverdriver.Migration { - t.Helper() +func Migration(ctx context.Context, tb testing.TB, exec riverdriver.Executor, opts *MigrationOpts) *riverdriver.Migration { + tb.Helper() migration, err := exec.MigrationInsertMany(ctx, []int{ ptrutil.ValOrDefaultFunc(opts.Version, nextSeq), }) - require.NoError(t, err) + require.NoError(tb, err) return migration[0] } diff --git a/job_executor.go b/job_executor.go index 35aebb5c..87ea382b 100644 --- a/job_executor.go +++ b/job_executor.go @@ -262,7 +262,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) } else { params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1) } - if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job", slog.Int64("job_id", e.JobRow.ID), ) @@ -275,7 +275,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) return } - if err := e.Completer.JobSetStateIfRunning(e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCompleted(e.JobRow.ID, e.TimeNowUTC())); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Error completing job", slog.String("err", err.Error()), slog.Int64("job_id", e.JobRow.ID), @@ -326,14 +326,14 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { now := time.Now() if cancelJob { - if err := e.Completer.JobSetStateIfRunning(e.stats, riverdriver.JobSetStateCancelled(e.JobRow.ID, now, errData)); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateCancelled(e.JobRow.ID, now, errData)); err != nil { 
e.Logger.ErrorContext(ctx, e.Name+": Failed to cancel job and report error", logAttrs...) } return } if e.JobRow.Attempt >= e.JobRow.MaxAttempts { - if err := e.Completer.JobSetStateIfRunning(e.stats, riverdriver.JobSetStateDiscarded(e.JobRow.ID, now, errData)); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, riverdriver.JobSetStateDiscarded(e.JobRow.ID, now, errData)); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to discard job and report error", logAttrs...) } return @@ -367,7 +367,7 @@ func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) { } else { params = riverdriver.JobSetStateErrorRetryable(e.JobRow.ID, nextRetryScheduledAt, errData) } - if err := e.Completer.JobSetStateIfRunning(e.stats, params); err != nil { + if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil { e.Logger.ErrorContext(ctx, e.Name+": Failed to report error for job", logAttrs...) } } diff --git a/job_executor_test.go b/job_executor_test.go index 3f4a4cf3..5c98cb2e 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -116,7 +116,7 @@ func TestJobExecutor_Execute(t *testing.T) { ctx := context.Background() type testBundle struct { - completer *jobcompleter.InlineJobCompleter + completer *jobcompleter.InlineCompleter exec riverdriver.Executor errorHandler *testErrorHandler getUpdatesAndStop func() []jobcompleter.CompleterJobUpdated @@ -139,7 +139,7 @@ func TestJobExecutor_Execute(t *testing.T) { }) getJobUpdates := func() []jobcompleter.CompleterJobUpdated { - completer.Wait() + completer.Stop() return updates } t.Cleanup(func() { _ = getJobUpdates() }) @@ -206,7 +206,7 @@ func TestJobExecutor_Execute(t *testing.T) { }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -232,7 +232,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = 
newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -256,7 +256,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -276,7 +276,7 @@ func TestJobExecutor_Execute(t *testing.T) { { executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -295,7 +295,7 @@ func TestJobExecutor_Execute(t *testing.T) { { executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -315,7 +315,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -335,7 +335,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -358,7 +358,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + 
executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -378,7 +378,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -398,7 +398,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -418,7 +418,7 @@ func TestJobExecutor_Execute(t *testing.T) { }).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -436,7 +436,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -457,7 +457,7 @@ func TestJobExecutor_Execute(t *testing.T) { } executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -478,7 +478,7 @@ func TestJobExecutor_Execute(t *testing.T) { } executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -499,7 +499,7 @@ func TestJobExecutor_Execute(t *testing.T) { } executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := 
bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -515,7 +515,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -536,7 +536,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -554,7 +554,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { panic("panic val") }, nil).MakeUnit(bundle.jobRow) executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -574,7 +574,7 @@ func TestJobExecutor_Execute(t *testing.T) { } executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -594,7 +594,7 @@ func TestJobExecutor_Execute(t *testing.T) { } executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -614,7 +614,7 @@ func TestJobExecutor_Execute(t *testing.T) { } executor.Execute(ctx) - executor.Completer.Wait() + executor.Completer.Stop() job, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) @@ -634,7 +634,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor.CancelFunc = cancelFunc executor.Execute(workCtx) - executor.Completer.Wait() + executor.Completer.Stop() require.ErrorIs(t, 
context.Cause(workCtx), errExecutorDefaultCancel) }) @@ -664,7 +664,7 @@ func TestJobExecutor_Execute(t *testing.T) { t.Cleanup(func() { cancelFunc(nil) }) executor.Execute(workCtx) - executor.Completer.Wait() + executor.Completer.Stop() jobRow, err := bundle.exec.JobGetByID(ctx, bundle.jobRow.ID) require.NoError(t, err) diff --git a/producer_test.go b/producer_test.go index a90297c4..01a6c669 100644 --- a/producer_test.go +++ b/producer_test.go @@ -52,7 +52,7 @@ func Test_Producer_CanSafelyCompleteJobsWhileFetchingNewOnes(t *testing.T) { listener := dbDriver.GetListener() completer := jobcompleter.NewInlineCompleter(archetype, exec) - t.Cleanup(completer.Wait) + t.Cleanup(completer.Stop) type WithJobNumArgs struct { JobArgsReflectKind[WithJobNumArgs] diff --git a/riverdriver/river_driver_interface.go b/riverdriver/river_driver_interface.go index d107fe9d..49611152 100644 --- a/riverdriver/river_driver_interface.go +++ b/riverdriver/river_driver_interface.go @@ -90,6 +90,7 @@ type Executor interface { JobRescueMany(ctx context.Context, params *JobRescueManyParams) (*struct{}, error) JobRetry(ctx context.Context, id int64) (*rivertype.JobRow, error) JobSchedule(ctx context.Context, params *JobScheduleParams) (int, error) + JobSetCompleteIfRunningMany(ctx context.Context, params *JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) JobSetStateIfRunning(ctx context.Context, params *JobSetStateIfRunningParams) (*rivertype.JobRow, error) JobUpdate(ctx context.Context, params *JobUpdateParams) (*rivertype.JobRow, error) LeaderAttemptElect(ctx context.Context, params *LeaderElectParams) (bool, error) @@ -232,9 +233,16 @@ type JobScheduleParams struct { Now time.Time } -// JobSetStateIfRunningParams are parameters to update the state of a currently running -// job. Use one of the constructors below to ensure a correct combination of -// parameters. 
+// JobSetCompleteIfRunningManyParams are parameters to set many running jobs to +// `completed` all at once for improved throughput and efficiency. +type JobSetCompleteIfRunningManyParams struct { + ID []int64 + FinalizedAt time.Time +} + +// JobSetStateIfRunningParams are parameters to update the state of a currently +// running job. Use one of the constructors below to ensure a correct +// combination of parameters. type JobSetStateIfRunningParams struct { ID int64 ErrData []byte diff --git a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go index 5b439369..5414f504 100644 --- a/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverdatabasesql/internal/dbsqlc/river_job.sql.go @@ -738,11 +738,82 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara return count, err } +const jobSetCompleteIfRunningMany = `-- name: JobSetCompleteIfRunningMany :many +WITH job_to_update AS ( + SELECT id + FROM river_job + WHERE id = any($1::bigint[]) + FOR UPDATE +), +updated_job AS ( + UPDATE river_job + SET + finalized_at = $2, + state = 'completed' + FROM job_to_update + WHERE river_job.id = job_to_update.id + AND river_job.state = 'running'::river_job_state + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags +) +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM river_job +WHERE id = any($1::bigint[]) + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts,
metadata, priority, queue, state, scheduled_at, tags +FROM updated_job +` + +type JobSetCompleteIfRunningManyParams struct { + ID []int64 + FinalizedAt *time.Time +} + +func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg *JobSetCompleteIfRunningManyParams) ([]*RiverJob, error) { + rows, err := db.QueryContext(ctx, jobSetCompleteIfRunningMany, pq.Array(arg.ID), arg.FinalizedAt) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + pq.Array(&i.AttemptedBy), + &i.CreatedAt, + pq.Array(&i.Errors), + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + pq.Array(&i.Tags), + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobSetStateIfRunning = `-- name: JobSetStateIfRunning :one WITH job_to_update AS ( SELECT - id, - $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel + id, + $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 
'cancel_attempted_at' AS should_cancel FROM river_job WHERE id = $2::bigint FOR UPDATE @@ -750,17 +821,17 @@ WITH job_to_update AS ( updated_job AS ( UPDATE river_job SET - state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state - ELSE $1::river_job_state END, - finalized_at = CASE WHEN should_cancel THEN now() - WHEN $3::boolean THEN $4 - ELSE finalized_at END, - errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb) - ELSE errors END, - max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 - ELSE max_attempts END, - scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END + state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state + ELSE $1::river_job_state END, + finalized_at = CASE WHEN should_cancel THEN now() + WHEN $3::boolean THEN $4 + ELSE finalized_at END, + errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb) + ELSE errors END, + max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 + ELSE max_attempts END, + scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz + ELSE scheduled_at END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running'::river_job_state diff --git a/riverdriver/riverdatabasesql/river_database_sql.go b/riverdriver/riverdatabasesql/river_database_sql.go index 43187879..7afe74e1 100644 --- a/riverdriver/riverdatabasesql/river_database_sql.go +++ b/riverdriver/riverdatabasesql/river_database_sql.go @@ -133,6 +133,10 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched return 0, riverdriver.ErrNotImplemented } +func (e *Executor) JobSetCompleteIfRunningMany(ctx context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + return nil, riverdriver.ErrNotImplemented +} + func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) 
(*rivertype.JobRow, error) { return nil, riverdriver.ErrNotImplemented } diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql index 19024be8..38fcc9c1 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql @@ -292,11 +292,36 @@ FROM ( FROM river_job_scheduled ) AS notifications_sent; +-- name: JobSetCompleteIfRunningMany :many +WITH job_to_update AS ( + SELECT id + FROM river_job + WHERE id = any(@id::bigint[]) + FOR UPDATE +), +updated_job AS ( + UPDATE river_job + SET + finalized_at = @finalized_at, + state = 'completed' + FROM job_to_update + WHERE river_job.id = job_to_update.id + AND river_job.state = 'running'::river_job_state + RETURNING river_job.* +) +SELECT * +FROM river_job +WHERE id = any(@id::bigint[]) + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT * +FROM updated_job; + -- name: JobSetStateIfRunning :one WITH job_to_update AS ( SELECT - id, - @state::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel + id, + @state::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 
'cancel_attempted_at' AS should_cancel FROM river_job WHERE id = @id::bigint FOR UPDATE @@ -304,17 +329,17 @@ WITH job_to_update AS ( updated_job AS ( UPDATE river_job SET - state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state - ELSE @state::river_job_state END, - finalized_at = CASE WHEN should_cancel THEN now() - WHEN @finalized_at_do_update::boolean THEN @finalized_at - ELSE finalized_at END, - errors = CASE WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb) - ELSE errors END, - max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts - ELSE max_attempts END, - scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz - ELSE scheduled_at END + state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state + ELSE @state::river_job_state END, + finalized_at = CASE WHEN should_cancel THEN now() + WHEN @finalized_at_do_update::boolean THEN @finalized_at + ELSE finalized_at END, + errors = CASE WHEN @error_do_update::boolean THEN array_append(errors, @error::jsonb) + ELSE errors END, + max_attempts = CASE WHEN NOT should_cancel AND @max_attempts_update::boolean THEN @max_attempts + ELSE max_attempts END, + scheduled_at = CASE WHEN NOT should_cancel AND @scheduled_at_do_update::boolean THEN sqlc.narg('scheduled_at')::timestamptz + ELSE scheduled_at END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running'::river_job_state diff --git a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go index b38e1efe..f18e4ff7 100644 --- a/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go +++ b/riverdriver/riverpgxv5/internal/dbsqlc/river_job.sql.go @@ -723,11 +723,79 @@ func (q *Queries) JobSchedule(ctx context.Context, db DBTX, arg *JobSchedulePara return count, err } +const jobSetCompleteIfRunningMany = `-- name: JobSetCompleteIfRunningMany 
:many +WITH job_to_update AS ( + SELECT id + FROM river_job + WHERE id = any($1::bigint[]) + FOR UPDATE +), +updated_job AS ( + UPDATE river_job + SET + finalized_at = $2, + state = 'completed' + FROM job_to_update + WHERE river_job.id = job_to_update.id + AND river_job.state = 'running'::river_job_state + RETURNING river_job.id, river_job.args, river_job.attempt, river_job.attempted_at, river_job.attempted_by, river_job.created_at, river_job.errors, river_job.finalized_at, river_job.kind, river_job.max_attempts, river_job.metadata, river_job.priority, river_job.queue, river_job.state, river_job.scheduled_at, river_job.tags +) +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM river_job +WHERE id = any($1::bigint[]) + AND id NOT IN (SELECT id FROM updated_job) +UNION +SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags +FROM updated_job +` + +type JobSetCompleteIfRunningManyParams struct { + ID []int64 + FinalizedAt *time.Time +} + +func (q *Queries) JobSetCompleteIfRunningMany(ctx context.Context, db DBTX, arg *JobSetCompleteIfRunningManyParams) ([]*RiverJob, error) { + rows, err := db.Query(ctx, jobSetCompleteIfRunningMany, arg.ID, arg.FinalizedAt) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*RiverJob + for rows.Next() { + var i RiverJob + if err := rows.Scan( + &i.ID, + &i.Args, + &i.Attempt, + &i.AttemptedAt, + &i.AttemptedBy, + &i.CreatedAt, + &i.Errors, + &i.FinalizedAt, + &i.Kind, + &i.MaxAttempts, + &i.Metadata, + &i.Priority, + &i.Queue, + &i.State, + &i.ScheduledAt, + &i.Tags, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const jobSetStateIfRunning = `-- name: JobSetStateIfRunning :one WITH 
job_to_update AS ( SELECT - id, - $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel + id, + $1::river_job_state IN ('retryable'::river_job_state, 'scheduled'::river_job_state) AND metadata ? 'cancel_attempted_at' AS should_cancel FROM river_job WHERE id = $2::bigint FOR UPDATE @@ -735,17 +803,17 @@ WITH job_to_update AS ( updated_job AS ( UPDATE river_job SET - state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state - ELSE $1::river_job_state END, - finalized_at = CASE WHEN should_cancel THEN now() - WHEN $3::boolean THEN $4 - ELSE finalized_at END, - errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb) - ELSE errors END, - max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 - ELSE max_attempts END, - scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz - ELSE scheduled_at END + state = CASE WHEN should_cancel THEN 'cancelled'::river_job_state + ELSE $1::river_job_state END, + finalized_at = CASE WHEN should_cancel THEN now() + WHEN $3::boolean THEN $4 + ELSE finalized_at END, + errors = CASE WHEN $5::boolean THEN array_append(errors, $6::jsonb) + ELSE errors END, + max_attempts = CASE WHEN NOT should_cancel AND $7::boolean THEN $8 + ELSE max_attempts END, + scheduled_at = CASE WHEN NOT should_cancel AND $9::boolean THEN $10::timestamptz + ELSE scheduled_at END FROM job_to_update WHERE river_job.id = job_to_update.id AND river_job.state = 'running'::river_job_state diff --git a/riverdriver/riverpgxv5/river_pgx_v5_driver.go b/riverdriver/riverpgxv5/river_pgx_v5_driver.go index a0de8222..c3d4d3b6 100644 --- a/riverdriver/riverpgxv5/river_pgx_v5_driver.go +++ b/riverdriver/riverpgxv5/river_pgx_v5_driver.go @@ -294,6 +294,17 @@ func (e *Executor) JobSchedule(ctx context.Context, params *riverdriver.JobSched return int(numScheduled), interpretError(err) } +func (e *Executor) JobSetCompleteIfRunningMany(ctx 
context.Context, params *riverdriver.JobSetCompleteIfRunningManyParams) ([]*rivertype.JobRow, error) { + jobs, err := e.queries.JobSetCompleteIfRunningMany(ctx, e.dbtx, &dbsqlc.JobSetCompleteIfRunningManyParams{ + ID: params.ID, + FinalizedAt: &params.FinalizedAt, + }) + if err != nil { + return nil, interpretError(err) + } + return mapSlice(jobs, jobRowFromInternal), nil +} + func (e *Executor) JobSetStateIfRunning(ctx context.Context, params *riverdriver.JobSetStateIfRunningParams) (*rivertype.JobRow, error) { var maxAttempts int16 if params.MaxAttempts != nil {