
Add AddQueue to the Client #399

Closed · wants to merge 4 commits (changes shown from 1 commit)
client.go: 64 changes (48 additions, 16 deletions)

@@ -9,6 +9,7 @@ import (
"os"
"regexp"
"strings"
"sync"
"time"

"github.com/riverqueue/river/internal/baseservice"
@@ -304,22 +305,23 @@ type Client[TTx any] struct {
baseService baseservice.BaseService
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
monitor *clientMonitor
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
services []startstop.Service
subscriptionManager *subscriptionManager
stopped chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
completer jobcompleter.JobCompleter
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
insertNotifyLimiter *notifylimiter.Limiter
monitor *clientMonitor
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueNameMu sync.Mutex
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
services []startstop.Service
subscriptionManager *subscriptionManager
stopped chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
@@ -616,6 +618,36 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return client, nil
}

func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) {
c.producersByQueueNameMu.Lock()
defer c.producersByQueueNameMu.Unlock()
c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queueName,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
})
c.monitor.InitializeProducerStatus(queueName)
Contributor:
Given this is a lot of duplicative code, let's move it into a new addProducer(queueName string, queueConfig QueueConfig) internal helper that can be shared with Start.
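
A rough sketch of that helper, lifted from the code above; whether Start needs anything passed differently, and whether the helper or its callers should hold the lock, is left open here:

```go
// addProducer builds and registers a producer for a single queue so that
// AddQueue and Start can share the logic. Sketch only: locking responsibility
// (caller vs. helper) is an open question.
func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)
}

// AddQueue then reduces to a thin wrapper around the shared helper.
func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) {
	c.addProducer(queueName, queueConfig)
}
```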

}
Contributor:
This method is going to need to do some validation on the QueueConfig as we currently do when initializing the client, and it will also need to return an error if validation fails.

We should probably extract that to an internal func (c QueueConfig) validate() error similar to what we have for the top level Config so that we can use the same logic in each place. And of course we'll need to make sure this new surface area (add/remove) is fully covered by tests in this regard.
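
Something along these lines, where the specific checks are placeholders rather than the rules NewClient actually applies today, and addProducer is the hypothetical helper sketched in the previous comment:

```go
// validate is a sketch only: the exact rules (worker bounds, any name checks)
// are assumptions, not the validation NewClient currently performs.
func (c QueueConfig) validate() error {
	if c.MaxWorkers < 1 {
		return fmt.Errorf("invalid number of max workers: %d", c.MaxWorkers)
	}
	return nil
}

// AddQueue would validate before registering the producer and surface any error.
func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) error {
	if err := queueConfig.validate(); err != nil {
		return err
	}
	c.addProducer(queueName, queueConfig)
	return nil
}
```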


func (c *Client[TTx]) RemoveQueue(queueName string) {
c.producersByQueueNameMu.Lock()
defer c.producersByQueueNameMu.Unlock()
delete(c.producersByQueueName, queueName)
Contributor:
Should this one also return an error? We could reuse rivertype.ErrNotFound in case the queue does not exist.
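
A sketch of that error-returning shape, ignoring the shutdown questions raised below:

```go
// RemoveQueue sketch: reports rivertype.ErrNotFound for unknown queues rather
// than silently doing nothing. Producer shutdown is not handled here.
func (c *Client[TTx]) RemoveQueue(queueName string) error {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	if _, ok := c.producersByQueueName[queueName]; !ok {
		return rivertype.ErrNotFound
	}
	delete(c.producersByQueueName, queueName)
	c.monitor.RemoveProducerStatus(queueName)
	return nil
}
```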


// Remove queue from currentSnapshot.Producers
c.monitor.RemoveProducerStatus(queueName)
Contributor:
Safe removal is a bit trickier than this. We need to actually shut down the producer somehow, and then the producer itself should be responsible for reporting its status to the monitor until it is shut down. At that point, yeah, we need to somehow remove it from the monitor's map so we don't keep old state around in that map forever.

Maybe SetProducerStatus should just be smart enough to do that last part automatically when the producer is stopped?

@brandur, thoughts on how to safely trigger a single producer to shut down? And is this API suitable for an operation that may take a while (until all running jobs go away)? Keep in mind there is no obvious way to cancel the context for just a single producer, and we may also need to differentiate between Stop and StopAndCancel for each individual producer; otherwise, how will the producer know when it needs to cancel the context and try to more aggressively shut down running jobs?

In either case we still need to somehow wait for clean shutdown to complete, which may actually never happen before the program exits if the running jobs never exit.

Contributor:

Yeah definitely pretty tricky. I find the existing StopAndCancel to be quite confusing to reason about, so it'd be nice if we could avoid adding another version of that here.

One possibility might be that RemoveQueue will only wait for a graceful shutdown, and if a hard shutdown is necessary, the caller will just have to use StopAndCancel.

Also, if it's not needed, I like your idea of just punting on RemoveQueue for now.

Or, a possible variation of that: keep RemoveQueue but only allow it to be called while the client is stopped. That would still cover the use case if necessary, while making a lot of the harder problems go away. If we went that route, though, we should make sure to future-proof the API (e.g. by making it take a ctx if we think there will be a hard stop later) in case we want to remove this limitation in the future.
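
A hypothetical shape for that last option; clientIsRunning is a stand-in for whatever state check would actually be used, and the ctx stays unused until per-producer shutdown exists:

```go
// Hypothetical future-proofed signature: accept a context now so a blocking,
// cancellable removal can be added later without breaking callers.
func (c *Client[TTx]) RemoveQueue(ctx context.Context, queueName string) error {
	if c.clientIsRunning() { // stand-in for the real client state check
		return errors.New("queues can only be removed while the client is stopped")
	}
	// ...same map/monitor cleanup as sketched earlier...
	return nil
}
```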

}

// Start starts the client's job fetching and working loops. Once this is called,
// the client will run in a background goroutine until stopped. All jobs are
// run with a context inheriting from the provided context, but with a timeout
client_monitor.go: 7 changes (7 additions, 0 deletions)

@@ -67,6 +67,13 @@ func (m *clientMonitor) SetProducerStatus(queueName string, status componentstat
m.bufferStatusUpdate()
}

func (m *clientMonitor) RemoveProducerStatus(queueName string) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()
delete(m.currentSnapshot.Producers, queueName)
m.bufferStatusUpdate()
}
Contributor:
I think we can probably merge this into SetProducerStatus and decide whether or not to remove the entry based on the status.
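
A sketch of that merged version, assuming the status parameter is componentstatus.Status, that the package exposes a terminal Stopped value, and that Producers maps queue name to the latest status:

```go
// SetProducerStatus sketch: folds removal into the status update so callers
// don't need a separate RemoveProducerStatus.
func (m *clientMonitor) SetProducerStatus(queueName string, status componentstatus.Status) {
	m.statusSnapshotMu.Lock()
	defer m.statusSnapshotMu.Unlock()

	if status == componentstatus.Stopped {
		// A stopped producer is dropped so the snapshot doesn't retain state
		// for queues that no longer exist.
		delete(m.currentSnapshot.Producers, queueName)
	} else {
		m.currentSnapshot.Producers[queueName] = status
	}
	m.bufferStatusUpdate()
}
```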


func (m *clientMonitor) SetElectorStatus(newStatus componentstatus.ElectorStatus) {
m.statusSnapshotMu.Lock()
defer m.statusSnapshotMu.Unlock()