From 7df134f056c7344f2f59538f3f71d4e1269a4efa Mon Sep 17 00:00:00 2001
From: PumpkinSeed
Date: Mon, 24 Jun 2024 12:22:12 +0200
Subject: [PATCH 1/3] Add AddQueue to the Client

---
 client.go         | 64 +++++++++++++++++++++++++++++++++++------------
 client_monitor.go |  7 ++++++
 2 files changed, 55 insertions(+), 16 deletions(-)

diff --git a/client.go b/client.go
index d1d897ee..4b11b3d9 100644
--- a/client.go
+++ b/client.go
@@ -9,6 +9,7 @@ import (
 	"os"
 	"regexp"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/riverqueue/river/internal/baseservice"
@@ -304,22 +305,23 @@ type Client[TTx any] struct {
 	baseService   baseservice.BaseService
 	baseStartStop startstop.BaseStartStop
 
-	completer            jobcompleter.JobCompleter
-	completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
-	config               *Config
-	driver               riverdriver.Driver[TTx]
-	elector              *leadership.Elector
-	insertNotifyLimiter  *notifylimiter.Limiter
-	monitor              *clientMonitor
-	notifier             *notifier.Notifier // may be nil in poll-only mode
-	periodicJobs         *PeriodicJobBundle
-	producersByQueueName map[string]*producer
-	queueMaintainer      *maintenance.QueueMaintainer
-	services             []startstop.Service
-	subscriptionManager  *subscriptionManager
-	stopped              chan struct{}
-	testSignals          clientTestSignals
-	uniqueInserter       *dbunique.UniqueInserter
+	completer              jobcompleter.JobCompleter
+	completerSubscribeCh   chan []jobcompleter.CompleterJobUpdated
+	config                 *Config
+	driver                 riverdriver.Driver[TTx]
+	elector                *leadership.Elector
+	insertNotifyLimiter    *notifylimiter.Limiter
+	monitor                *clientMonitor
+	notifier               *notifier.Notifier // may be nil in poll-only mode
+	periodicJobs           *PeriodicJobBundle
+	producersByQueueNameMu sync.Mutex
+	producersByQueueName   map[string]*producer
+	queueMaintainer        *maintenance.QueueMaintainer
+	services               []startstop.Service
+	subscriptionManager    *subscriptionManager
+	stopped                chan struct{}
+	testSignals            clientTestSignals
+	uniqueInserter         *dbunique.UniqueInserter
 
 	// workCancel cancels the context used for all work goroutines. Normal Stop
 	// does not cancel that context.
@@ -616,6 +618,36 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	return client, nil
 }
 
+func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) {
+	c.producersByQueueNameMu.Lock()
+	defer c.producersByQueueNameMu.Unlock()
+	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
+		ClientID:          c.config.ID,
+		Completer:         c.completer,
+		ErrorHandler:      c.config.ErrorHandler,
+		FetchCooldown:     c.config.FetchCooldown,
+		FetchPollInterval: c.config.FetchPollInterval,
+		JobTimeout:        c.config.JobTimeout,
+		MaxWorkers:        queueConfig.MaxWorkers,
+		Notifier:          c.notifier,
+		Queue:             queueName,
+		RetryPolicy:       c.config.RetryPolicy,
+		SchedulerInterval: c.config.schedulerInterval,
+		StatusFunc:        c.monitor.SetProducerStatus,
+		Workers:           c.config.Workers,
+	})
+	c.monitor.InitializeProducerStatus(queueName)
+}
+
+func (c *Client[TTx]) RemoveQueue(queueName string) {
+	c.producersByQueueNameMu.Lock()
+	defer c.producersByQueueNameMu.Unlock()
+	delete(c.producersByQueueName, queueName)
+
+	// Remove queue from currentSnapshot.Producers
+	c.monitor.RemoveProducerStatus(queueName)
+}
+
 // Start starts the client's job fetching and working loops. Once this is called,
 // the client will run in a background goroutine until stopped. All jobs are
 // run with a context inheriting from the provided context, but with a timeout
diff --git a/client_monitor.go b/client_monitor.go
index 05fcb1c8..5dcb6e7e 100644
--- a/client_monitor.go
+++ b/client_monitor.go
@@ -67,6 +67,13 @@ func (m *clientMonitor) SetProducerStatus(queueName string, status componentstat
 	m.bufferStatusUpdate()
 }
 
+func (m *clientMonitor) RemoveProducerStatus(queueName string) {
+	m.statusSnapshotMu.Lock()
+	defer m.statusSnapshotMu.Unlock()
+	delete(m.currentSnapshot.Producers, queueName)
+	m.bufferStatusUpdate()
+}
+
 func (m *clientMonitor) SetElectorStatus(newStatus componentstatus.ElectorStatus) {
 	m.statusSnapshotMu.Lock()
 	defer m.statusSnapshotMu.Unlock()
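Note: a minimal call-site sketch of the API as it stands after this first patch, assuming a configured client named `client`; the queue name is illustrative, and the signature changes again in patches 2 and 3 below:

    // Patch-1 signature: no validation and no error return; both calls
    // only mutate the producer map under producersByQueueNameMu.
    client.AddQueue("emails", river.QueueConfig{MaxWorkers: 10})

    // Stop tracking the queue and drop it from the monitor snapshot.
    client.RemoveQueue("emails")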
From 27e846cfee7d02cc0d111b19ba65dc80f3f750a7 Mon Sep 17 00:00:00 2001
From: PumpkinSeed
Date: Tue, 25 Jun 2024 11:59:51 +0200
Subject: [PATCH 2/3] Changes based on reviews

---
 client.go         | 84 ++++++++++++++++++++++------------------------
 client_monitor.go |  7 ----
 2 files changed, 39 insertions(+), 52 deletions(-)

diff --git a/client.go b/client.go
index 4b11b3d9..82f37d3f 100644
--- a/client.go
+++ b/client.go
@@ -259,10 +259,7 @@ func (c *Config) validate() error {
 	}
 
 	for queue, queueConfig := range c.Queues {
-		if queueConfig.MaxWorkers < 1 || queueConfig.MaxWorkers > QueueNumWorkersMax {
-			return fmt.Errorf("invalid number of workers for queue %q: %d", queue, queueConfig.MaxWorkers)
-		}
-		if err := validateQueueName(queue); err != nil {
+		if err := queueConfig.validate(queue); err != nil {
 			return err
 		}
 	}
@@ -296,6 +293,17 @@ type QueueConfig struct {
 	MaxWorkers int
 }
 
+func (c QueueConfig) validate(queueName string) error {
+	if c.MaxWorkers < 1 || c.MaxWorkers > QueueNumWorkersMax {
+		return fmt.Errorf("invalid number of workers for queue %q: %d", queueName, c.MaxWorkers)
+	}
+	if err := validateQueueName(queueName); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // Client is a single isolated instance of River. Your application may use
 // multiple instances operating on different databases or Postgres schemas
 // within a single database.
@@ -510,23 +518,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	client.services = append(client.services, client.elector)
 
 	for queue, queueConfig := range config.Queues {
-		client.producersByQueueName[queue] = newProducer(archetype, driver.GetExecutor(), &producerConfig{
-			ClientID:           config.ID,
-			Completer:          client.completer,
-			ErrorHandler:       config.ErrorHandler,
-			FetchCooldown:      config.FetchCooldown,
-			FetchPollInterval:  config.FetchPollInterval,
-			JobTimeout:         config.JobTimeout,
-			MaxWorkers:         queueConfig.MaxWorkers,
-			Notifier:           client.notifier,
-			Queue:              queue,
-			QueueEventCallback: client.subscriptionManager.distributeQueueEvent,
-			RetryPolicy:        config.RetryPolicy,
-			SchedulerInterval:  config.schedulerInterval,
-			StatusFunc:         client.monitor.SetProducerStatus,
-			Workers:            config.Workers,
-		})
-		client.monitor.InitializeProducerStatus(queue)
+		client.producersByQueueName[queue] = client.addProducer(queue, queueConfig)
 	}
 
 	client.services = append(client.services,
@@ -618,34 +610,16 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	return client, nil
 }
 
-func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) {
-	c.producersByQueueNameMu.Lock()
-	defer c.producersByQueueNameMu.Unlock()
-	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
-		ClientID:          c.config.ID,
-		Completer:         c.completer,
-		ErrorHandler:      c.config.ErrorHandler,
-		FetchCooldown:     c.config.FetchCooldown,
-		FetchPollInterval: c.config.FetchPollInterval,
-		JobTimeout:        c.config.JobTimeout,
-		MaxWorkers:        queueConfig.MaxWorkers,
-		Notifier:          c.notifier,
-		Queue:             queueName,
-		RetryPolicy:       c.config.RetryPolicy,
-		SchedulerInterval: c.config.schedulerInterval,
-		StatusFunc:        c.monitor.SetProducerStatus,
-		Workers:           c.config.Workers,
-	})
-	c.monitor.InitializeProducerStatus(queueName)
-}
+func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) error {
+	if err := queueConfig.validate(queueName); err != nil {
+		return err
+	}
 
-func (c *Client[TTx]) RemoveQueue(queueName string) {
 	c.producersByQueueNameMu.Lock()
 	defer c.producersByQueueNameMu.Unlock()
-	delete(c.producersByQueueName, queueName)
+	c.producersByQueueName[queueName] = c.addProducer(queueName, queueConfig)
 
-	// Remove queue from currentSnapshot.Producers
-	c.monitor.RemoveProducerStatus(queueName)
+	return nil
 }
 
 // Start starts the client's job fetching and working loops. Once this is called,
@@ -1537,6 +1511,26 @@ func (c *Client[TTx]) validateJobArgs(args JobArgs) error {
 	return nil
 }
 
+func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) *producer {
+	producerInstance := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
+		ClientID:          c.config.ID,
+		Completer:         c.completer,
+		ErrorHandler:      c.config.ErrorHandler,
+		FetchCooldown:     c.config.FetchCooldown,
+		FetchPollInterval: c.config.FetchPollInterval,
+		JobTimeout:        c.config.JobTimeout,
+		MaxWorkers:        queueConfig.MaxWorkers,
+		Notifier:          c.notifier,
+		Queue:             queueName,
+		RetryPolicy:       c.config.RetryPolicy,
+		SchedulerInterval: c.config.schedulerInterval,
+		StatusFunc:        c.monitor.SetProducerStatus,
+		Workers:           c.config.Workers,
+	})
+	c.monitor.InitializeProducerStatus(queueName)
+	return producerInstance
+}
+
 var nameRegex = regexp.MustCompile(`^(?:[a-z0-9])+(?:[_|\-]?[a-z0-9]+)*$`)
 
 func validateQueueName(queueName string) error {
diff --git a/client_monitor.go b/client_monitor.go
index 5dcb6e7e..05fcb1c8 100644
--- a/client_monitor.go
+++ b/client_monitor.go
@@ -67,13 +67,6 @@ func (m *clientMonitor) SetProducerStatus(queueName string, status componentstat
 	m.bufferStatusUpdate()
 }
 
-func (m *clientMonitor) RemoveProducerStatus(queueName string) {
-	m.statusSnapshotMu.Lock()
-	defer m.statusSnapshotMu.Unlock()
-	delete(m.currentSnapshot.Producers, queueName)
-	m.bufferStatusUpdate()
-}
-
 func (m *clientMonitor) SetElectorStatus(newStatus componentstatus.ElectorStatus) {
 	m.statusSnapshotMu.Lock()
 	defer m.statusSnapshotMu.Unlock()
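Note: after this review pass, `AddQueue` validates the queue configuration through the new `QueueConfig.validate` and reports failures as an error instead of installing a bad producer, and `RemoveQueue` is dropped. A sketch of a call site under the patch-2 signature (the error text comes from `validate`; patch 3 adds a context parameter):

    // MaxWorkers must be in [1, QueueNumWorkersMax] and the queue name
    // must match nameRegex; otherwise AddQueue returns the validation error.
    if err := client.AddQueue("reports", river.QueueConfig{MaxWorkers: 0}); err != nil {
        // err: invalid number of workers for queue "reports": 0
        log.Fatal(err)
    }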
From c5168e571f443af07f89219b3fcd502bd6952d69 Mon Sep 17 00:00:00 2001
From: PumpkinSeed
Date: Tue, 25 Jun 2024 13:54:09 +0200
Subject: [PATCH 3/3] Add changes to safely add new Queue

---
 client.go                                    | 23 ++++---
 client_test.go                               | 66 ++++++++++++++++++++
 internal/maintenance/startstop/start_stop.go |  9 ++-
 3 files changed, 88 insertions(+), 10 deletions(-)

diff --git a/client.go b/client.go
index 82f37d3f..498869e4 100644
--- a/client.go
+++ b/client.go
@@ -333,7 +333,8 @@ type Client[TTx any] struct {
 
 	// workCancel cancels the context used for all work goroutines. Normal Stop
 	// does not cancel that context.
-	workCancel context.CancelCauseFunc
+	workCancel  context.CancelCauseFunc
+	workContext context.Context
 }
 
 // Test-only signals.
@@ -610,14 +611,22 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	return client, nil
 }
 
-func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) error {
+func (c *Client[TTx]) AddQueue(ctx context.Context, queueName string, queueConfig QueueConfig) error {
 	if err := queueConfig.validate(queueName); err != nil {
 		return err
 	}
 
+	producerInstance := c.addProducer(queueName, queueConfig)
 	c.producersByQueueNameMu.Lock()
-	defer c.producersByQueueNameMu.Unlock()
-	c.producersByQueueName[queueName] = c.addProducer(queueName, queueConfig)
+	c.producersByQueueName[queueName] = producerInstance
+	c.producersByQueueNameMu.Unlock()
+
+	fetchCtx, started := c.baseStartStop.IsStarted()
+	if started {
+		if err := producerInstance.StartWorkContext(fetchCtx, c.workContext); err != nil {
+			return err
+		}
+	}
 
 	return nil
 }
@@ -649,8 +658,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 		)
 	}
 
-	var workCtx context.Context
-
 	// Startup code. Wrapped in a closure so it doesn't have to remember to
 	// close the stopped channel if returning with an error.
 	if err := func() error {
@@ -712,7 +719,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 		// We use separate contexts for fetching and working to allow for a graceful
 		// stop. Both inherit from the provided context, so if it's cancelled, a
 		// more aggressive stop will be initiated.
-		workCtx, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))
+		c.workContext, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))
 
 		for _, service := range c.services {
 			if err := service.Start(fetchCtx); err != nil {
@@ -724,7 +731,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 		for _, producer := range c.producersByQueueName {
 			producer := producer
 
-			if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
+			if err := producer.StartWorkContext(fetchCtx, c.workContext); err != nil {
 				stopProducers()
 				stopServicesOnError()
 				return err
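Note: the point of this third patch is that a queue added to an already-started client begins working immediately; `AddQueue` consults `baseStartStop.IsStarted` and, if the client is running, starts the new producer with the stored fetch and work contexts. A sketch of the resulting flow, assuming a started client and a worker registered for `ReportArgs` (a hypothetical job type, not part of the patch):

    // With the client already running, the new producer is started inline,
    // so this job is fetched and worked without restarting the client.
    if err := client.AddQueue(ctx, "reports", river.QueueConfig{MaxWorkers: 4}); err != nil {
        return err
    }
    if _, err := client.Insert(ctx, ReportArgs{}, &river.InsertOpts{Queue: "reports"}); err != nil {
        return err
    }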
diff --git a/client_test.go b/client_test.go
index 392d3c00..d3d1aac1 100644
--- a/client_test.go
+++ b/client_test.go
@@ -252,6 +252,72 @@ func Test_Client(t *testing.T) {
 		riverinternaltest.WaitOrTimeout(t, workedChan)
 	})
 
+	t.Run("StartInsertAndWorkAddQueue_BeforeStart", func(t *testing.T) {
+		t.Parallel()
+
+		client, _ := setup(t)
+
+		type JobArgs struct {
+			JobArgsReflectKind[JobArgs]
+		}
+
+		workedChan := make(chan struct{})
+
+		AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
+			workedChan <- struct{}{}
+			return nil
+		}))
+
+		queueName := "new_queue"
+		err := client.AddQueue(ctx, queueName, QueueConfig{
+			MaxWorkers: 2,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+		startClient(ctx, t, client)
+
+		_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
+			Queue: queueName,
+		})
+		require.NoError(t, err)
+
+		riverinternaltest.WaitOrTimeout(t, workedChan)
+	})
+
+	t.Run("StartInsertAndWorkAddQueue_AfterStart", func(t *testing.T) {
+		t.Parallel()
+
+		client, _ := setup(t)
+
+		type JobArgs struct {
+			JobArgsReflectKind[JobArgs]
+		}
+
+		workedChan := make(chan struct{})
+
+		AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
+			workedChan <- struct{}{}
+			return nil
+		}))
+
+		startClient(ctx, t, client)
+
+		queueName := "new_queue"
+		err := client.AddQueue(ctx, queueName, QueueConfig{
+			MaxWorkers: 2,
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		_, err = client.Insert(ctx, &JobArgs{}, &InsertOpts{
+			Queue: queueName,
+		})
+		require.NoError(t, err)
+
+		riverinternaltest.WaitOrTimeout(t, workedChan)
+	})
+
 	t.Run("JobCancelErrorReturned", func(t *testing.T) {
 		t.Parallel()
diff --git a/internal/maintenance/startstop/start_stop.go b/internal/maintenance/startstop/start_stop.go
index dfa9bb14..4c33d5eb 100644
--- a/internal/maintenance/startstop/start_stop.go
+++ b/internal/maintenance/startstop/start_stop.go
@@ -55,6 +55,7 @@ type serviceWithStopped interface {
 // override it.
 type BaseStartStop struct {
 	cancelFunc context.CancelCauseFunc
+	context    context.Context
 	mu         sync.Mutex
 	started    bool
 	stopped    chan struct{}
@@ -107,9 +108,13 @@ func (s *BaseStartStop) StartInit(ctx context.Context) (context.Context, bool, c
 	s.started = true
 	s.stopped = make(chan struct{})
 
-	ctx, s.cancelFunc = context.WithCancelCause(ctx)
+	s.context, s.cancelFunc = context.WithCancelCause(ctx)
 
-	return ctx, true, s.stopped
+	return s.context, true, s.stopped
+}
+
+func (s *BaseStartStop) IsStarted() (context.Context, bool) {
+	return s.context, s.started
 }
 
 // Stop is an automatically provided implementation for the maintenance Service
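Note: as written, `IsStarted` reads `s.context` and `s.started` without acquiring `s.mu`, so it can race with a concurrent `StartInit`. A lock-guarded variant would be a small change; this is a sketch for comparison, not part of the patch:

    // Hypothetical locked variant of IsStarted; the patch reads the
    // fields directly without taking the mutex.
    func (s *BaseStartStop) IsStarted() (context.Context, bool) {
        s.mu.Lock()
        defer s.mu.Unlock()
        return s.context, s.started
    }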