Add AddQueue to the Client #399
Conversation
Thanks for the idea! I'm not opposed to adding this, though I think it turns out to be a fair bit more complex than what this PR demonstrates. Adding is actually easy IMO; it's removal that is a lot trickier in practice and raises a lot of UX questions.

Interested in whether @brandur has ideas on how to address these. One option would be to reduce the scope of this PR to only `AddQueue` at this time and punt on `RemoveQueue` because of its much higher complexity.
client.go (Outdated)
```go
func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)
}
```
This method is going to need to do some validation on the `QueueConfig`, as we currently do when initializing the client, and it will also need to return an error if validation fails.

We should probably extract that to an internal `func (c QueueConfig) validate() error`, similar to what we have for the top-level `Config`, so that we can use the same logic in each place. And of course we'll need to make sure this new surface area (add/remove) is fully covered by tests in this regard.
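A rough sketch of what that extraction could look like (the `queueNumWorkersMax` constant and its bound are assumptions for illustration, not River's actual limits):

```go
// Hypothetical extraction of per-queue validation so that NewClient and
// AddQueue can share it. The bound below is illustrative; the real check
// should mirror whatever the client enforces at initialization today.
const queueNumWorkersMax = 10_000 // assumed limit, for illustration only

func (c QueueConfig) validate() error {
	if c.MaxWorkers < 1 || c.MaxWorkers > queueNumWorkersMax {
		return fmt.Errorf("invalid number of workers for queue: %d", c.MaxWorkers)
	}
	return nil
}

// AddQueue would then validate before registering the producer:
func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) error {
	if err := queueConfig.validate(); err != nil {
		return err
	}
	// ... producer registration as in the diff above ...
	return nil
}
```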
client.go (Outdated)
```go
func (c *Client[TTx]) RemoveQueue(queueName string) {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	delete(c.producersByQueueName, queueName)
```
Should this one also return an `error`? We could reuse `rivertype.ErrNotFound` in case the queue does not exist.
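A minimal sketch of that suggestion, assuming the existing `rivertype.ErrNotFound` sentinel is acceptable for this purpose:

```go
// Report an error when the queue was never added instead of silently
// deleting a nonexistent map entry.
func (c *Client[TTx]) RemoveQueue(queueName string) error {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	if _, ok := c.producersByQueueName[queueName]; !ok {
		return rivertype.ErrNotFound
	}
	delete(c.producersByQueueName, queueName)
	return nil
}
```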
client.go (Outdated)
```go
	// Remove queue from currentSnapshot.Producers
	c.monitor.RemoveProducerStatus(queueName)
```
Safe removal is a bit trickier than this. We need to actually shut down the producer somehow, and then the producer itself should be responsible for reporting its status to the monitor until it is shut down. At that point, yeah, we need to somehow remove it from the monitor's map so we don't keep old state around in that map forever. Maybe `SetProducerStatus` should just be smart enough to do that last part automatically when the producer is stopped?

@brandur thoughts on how to safely trigger the single producer to shut down? And is this API suitable for an operation which may take a while (until all running jobs go away)? Keep in mind that there is no obvious way to cancel the context for just a single producer, and we may also need to differentiate between `Stop` and `StopAndCancel` for each individual producer; otherwise, how will the producer know when it needs to cancel the context and try to more aggressively shut down running jobs?

In either case we still need to somehow wait for clean shutdown to complete, which may actually never happen before the program exits if the running jobs never exit.
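Purely as a thought experiment, one hypothetical shape for a per-producer stop lever (none of these names exist in River; this is not a description of the actual internals):

```go
// Illustrative only: give each producer its own cancel functions so it can be
// stopped (stop fetching, let running jobs drain) or hard-canceled (cancel the
// contexts of running jobs) without touching the client-wide context.
type producerHandle struct {
	stop       context.CancelFunc // stops fetching new jobs
	cancelJobs context.CancelFunc // cancels contexts of running jobs
	done       chan struct{}      // closed once the producer has fully shut down
}

// Request a graceful stop, optionally escalate to canceling running jobs, and
// wait until either the producer finishes or the caller's context expires.
func stopProducer(ctx context.Context, h *producerHandle, hard bool) error {
	h.stop()
	if hard {
		h.cancelJobs()
	}
	select {
	case <-h.done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
```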
Yeah, definitely pretty tricky. I find the existing `StopAndCancel` to be quite confusing to reason about, so it'd be nice if we could avoid adding another version of that here.

One possibility might be that `RemoveQueue` will only wait for a graceful shutdown, and if a hard shutdown is necessary, the caller will just have to use `StopAndCancel`.

Also, if it's not needed, I like your idea of just punting on `RemoveQueue` for now.

Or, a possible variation of that is to have `RemoveQueue`, but only allow it to be called while the client is stopped. That would still allow removal when necessary, but make a lot of the harder problems go away. If we went that route though, we should make sure to future-proof the API (e.g. by making it take a `ctx` if we think there will be a hard stop later) in case we want to remove this limitation in the future.
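A sketch of that restricted variant under the stated assumptions (the `started` field is a placeholder for however the client actually tracks its run state):

```go
// Hypothetical restricted RemoveQueue: only allowed while the client is
// stopped, which sidesteps producer shutdown entirely. The ctx parameter is
// unused for now but future-proofs the signature for a later version that
// might have to wait on a running producer.
func (c *Client[TTx]) RemoveQueue(ctx context.Context, queueName string) error {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	if c.started { // placeholder for the client's real run-state tracking
		return errors.New("queues can only be removed while the client is stopped")
	}
	if _, ok := c.producersByQueueName[queueName]; !ok {
		return rivertype.ErrNotFound
	}
	delete(c.producersByQueueName, queueName)
	return nil
}
```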
client_monitor.go (Outdated)
```go
func (m *clientMonitor) RemoveProducerStatus(queueName string) {
	m.statusSnapshotMu.Lock()
	defer m.statusSnapshotMu.Unlock()

	delete(m.currentSnapshot.Producers, queueName)
	m.bufferStatusUpdate()
}
```
I think we can probably merge this into `SetProducerStatus` and decide whether or not to remove based on the status.
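A sketch of that merge, assuming the monitor's snapshot maps queue names to a status value and that the status type has a terminal stopped state (the `componentstatus.Stopped` name is an assumption about the internal enum, not a confirmed identifier):

```go
// Hypothetical merged form: when a producer reports a stopped status, drop it
// from the snapshot instead of keeping stale state around forever.
func (m *clientMonitor) SetProducerStatus(queueName string, status componentstatus.Status) {
	m.statusSnapshotMu.Lock()
	defer m.statusSnapshotMu.Unlock()

	if status == componentstatus.Stopped {
		delete(m.currentSnapshot.Producers, queueName)
	} else {
		m.currentSnapshot.Producers[queueName] = status
	}
	m.bufferStatusUpdate()
}
```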
Thanks for the PR!

A couple of larger points:

> What do you think about adding StartWorkContext in the AddQueue section?

Yeah, I think we'll need something like this. Ideally, if the client is started, `AddQueue` starts the producer; if not, it doesn't. Also, rather than stopping all producers on failure (if the client was started), it maybe should just return an error from the `AddQueue` function.

Can you take a look at `PeriodicJobs()` too? See here (line 1598 in c9dd291):

```go
func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }
```

I think we should follow that pattern and make these `Queues().Add(...)` and `Queues().Remove(...)`. It sorts the functions together (since the client has a lot of functions already) and future-proofs us in case we want to add more queue-related functions later (see the sketch after this comment).
And lastly, tests of course! Although those can wait until the API has fully stabilized.
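A sketch of the suggested `Queues()` bundle, mirroring the `PeriodicJobs()` pattern. The `QueueBundle` type, its embedded interface, and the `queues` field are hypothetical names, used only to show the shape of the API:

```go
// Hypothetical bundle so queue-related operations hang off a single accessor,
// similar to how PeriodicJobs() returns a PeriodicJobBundle.
type QueueBundle struct {
	client interface {
		addQueue(queueName string, queueConfig QueueConfig) error
		removeQueue(ctx context.Context, queueName string) error
	}
}

func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
	return b.client.addQueue(queueName, queueConfig)
}

func (b *QueueBundle) Remove(ctx context.Context, queueName string) error {
	return b.client.removeQueue(ctx, queueName)
}

// On the client, following the PeriodicJobs() accessor pattern:
func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }
```

Callers would then write something like `client.Queues().Add("reports", river.QueueConfig{MaxWorkers: 10})` rather than growing the client's already large method set.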
client.go (Outdated)
```go
	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)
```
Just given that this is a lot of duplicative code, let's move this to a new `addProducer(queueName string, queueConfig QueueConfig)` internal helper that can be shared with `Start`.
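A sketch of that helper, assuming it simply wraps the construction shown in the diff above, with validation and start-up concerns left to the callers:

```go
// Hypothetical shared helper: both Start and AddQueue would call this rather
// than duplicating the producerConfig construction.
func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()

	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)
}
```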
I agree with reducing the scope of the PR. I will focus on AddQueue and remove RemoveQueue; I will address it in a follow-up PR.
Is there any documentation about setting up a development environment? I'm using Arch Linux and it would be nice to use a Postgres image/container for testing. I found docs/development.md, but most of it uses the raw postgres client; it would be nice to set an env var or something similar so the Postgres connection can be customized.
Testing notes

I checked the CI and tried to replicate it; based on that, I have a few notes.

I created a Postgres Docker container:

```shell
docker run --name river-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres
```

I added the following environment variables:

```shell
export PGHOST=127.0.0.1
export PGPORT=5432
export PGUSER=postgres
export PGPASSWORD=postgres
export PGSSLMODE=disable
export ADMIN_DATABASE_URL=postgres://postgres:postgres@localhost:5432?sslmode=disable
export DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/river_dev?sslmode=disable
export TEST_DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable
```

I use GoLand, so there is a one-liner for this.

I changed the …

After this I ran the database creation/migration detailed in the development docs:

```shell
go run ./internal/cmd/testdbman create
```
I added the requested changes. I still have a question about whether I need to return …
Hey @PumpkinSeed, thanks for the updates here. This is a bit of a tricky change, and there's quite a bit of subtle nuance; e.g. believe it or not, the way you're checking for …

Do you mind if we take this one over to get it over the finish line?
I'm totally fine with it. Thanks for that.
Opened #410 as an alternative.
Closing in favor of #410.
Closes #396

Based on the proposal, I added AddQueue/RemoveQueue. This doesn't do anything beyond registering the queues, so we need to "restart" the client after that.

What do you think about adding StartWorkContext in the AddQueue section?
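A hypothetical usage snippet for the API as proposed here (`client` and `ctx` come from the usual river.NewClient setup; error handling trimmed for brevity):

```go
// In this version AddQueue only registers the queue, so the client has to be
// stopped and started again before the new producer begins working jobs.
client.AddQueue("reports", river.QueueConfig{MaxWorkers: 10})

if err := client.Stop(ctx); err != nil {
	// handle shutdown error
}
if err := client.Start(ctx); err != nil {
	// handle startup error
}
```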