-
Notifications
You must be signed in to change notification settings - Fork 92
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add AddQueue to the Client #399
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ import ( | |
"os" | ||
"regexp" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/riverqueue/river/internal/baseservice" | ||
|
@@ -304,22 +305,23 @@ type Client[TTx any] struct { | |
baseService baseservice.BaseService | ||
baseStartStop startstop.BaseStartStop | ||
|
||
completer jobcompleter.JobCompleter | ||
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated | ||
config *Config | ||
driver riverdriver.Driver[TTx] | ||
elector *leadership.Elector | ||
insertNotifyLimiter *notifylimiter.Limiter | ||
monitor *clientMonitor | ||
notifier *notifier.Notifier // may be nil in poll-only mode | ||
periodicJobs *PeriodicJobBundle | ||
producersByQueueName map[string]*producer | ||
queueMaintainer *maintenance.QueueMaintainer | ||
services []startstop.Service | ||
subscriptionManager *subscriptionManager | ||
stopped chan struct{} | ||
testSignals clientTestSignals | ||
uniqueInserter *dbunique.UniqueInserter | ||
completer jobcompleter.JobCompleter | ||
completerSubscribeCh chan []jobcompleter.CompleterJobUpdated | ||
config *Config | ||
driver riverdriver.Driver[TTx] | ||
elector *leadership.Elector | ||
insertNotifyLimiter *notifylimiter.Limiter | ||
monitor *clientMonitor | ||
notifier *notifier.Notifier // may be nil in poll-only mode | ||
periodicJobs *PeriodicJobBundle | ||
producersByQueueNameMu sync.Mutex | ||
producersByQueueName map[string]*producer | ||
queueMaintainer *maintenance.QueueMaintainer | ||
services []startstop.Service | ||
subscriptionManager *subscriptionManager | ||
stopped chan struct{} | ||
testSignals clientTestSignals | ||
uniqueInserter *dbunique.UniqueInserter | ||
|
||
// workCancel cancels the context used for all work goroutines. Normal Stop | ||
// does not cancel that context. | ||
|
@@ -616,6 +618,36 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client | |
return client, nil | ||
} | ||
|
||
func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) { | ||
c.producersByQueueNameMu.Lock() | ||
defer c.producersByQueueNameMu.Unlock() | ||
c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{ | ||
ClientID: c.config.ID, | ||
Completer: c.completer, | ||
ErrorHandler: c.config.ErrorHandler, | ||
FetchCooldown: c.config.FetchCooldown, | ||
FetchPollInterval: c.config.FetchPollInterval, | ||
JobTimeout: c.config.JobTimeout, | ||
MaxWorkers: queueConfig.MaxWorkers, | ||
Notifier: c.notifier, | ||
Queue: queueName, | ||
RetryPolicy: c.config.RetryPolicy, | ||
SchedulerInterval: c.config.schedulerInterval, | ||
StatusFunc: c.monitor.SetProducerStatus, | ||
Workers: c.config.Workers, | ||
}) | ||
c.monitor.InitializeProducerStatus(queueName) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This method is going to need to do some validation on the We should probably extract that to an internal |
||
|
||
func (c *Client[TTx]) RemoveQueue(queueName string) { | ||
c.producersByQueueNameMu.Lock() | ||
defer c.producersByQueueNameMu.Unlock() | ||
delete(c.producersByQueueName, queueName) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Should this one also return an error? |
||
|
||
// Remove queue from currentSnapshot.Producers | ||
c.monitor.RemoveProducerStatus(queueName) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Safe removal is a bit trickier than this. We need to actually shut down the producer somehow, and then the producer itself should be responsible for reporting its status to the monitor until it is shut down. At that point, yeah, we need to somehow remove it from the monitor's map so we don't keep old state around in that map forever. Maybe @brandur thoughts on how to safely trigger the single producer to shut down? And is this API suitable for an operation which may take awhile (until all running jobs go away)? Keeping in mind there is no obvious way to cancel the context just for a single producer, and we may also need to differentiate between In either case we still need to somehow wait for clean shutdown to complete, which may actually never happen before the program exits if the running jobs never exit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah definitely pretty tricky. I find the existing One possibility might be that Also, if it's not needed, I like your idea of just punting on Or, a possible variation of that is to have |
||
} | ||
|
||
// Start starts the client's job fetching and working loops. Once this is called, | ||
// the client will run in a background goroutine until stopped. All jobs are | ||
// run with a context inheriting from the provided context, but with a timeout | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -67,6 +67,13 @@ func (m *clientMonitor) SetProducerStatus(queueName string, status componentstat | |
m.bufferStatusUpdate() | ||
} | ||
|
||
func (m *clientMonitor) RemoveProducerStatus(queueName string) { | ||
m.statusSnapshotMu.Lock() | ||
defer m.statusSnapshotMu.Unlock() | ||
delete(m.currentSnapshot.Producers, queueName) | ||
m.bufferStatusUpdate() | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think we can probably merge this into |
||
|
||
func (m *clientMonitor) SetElectorStatus(newStatus componentstatus.ElectorStatus) { | ||
m.statusSnapshotMu.Lock() | ||
defer m.statusSnapshotMu.Unlock() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just given this is a lot of duplicative code, let's move this to a new `addProducer(queueName string, queueConfig QueueConfig)` internal helper that can share it with `Start`.