
Add AddQueue to the Client #399

Closed
wants to merge 4 commits

Conversation

@PumpkinSeed (Contributor) commented Jun 24, 2024

Closes #396

Based on the proposal, I added AddQueue/RemoveQueue. These don't do anything beyond adding or removing the queues, so the client needs to be "restarted" afterwards.

What do you think about adding StartWorkContext in the AddQueue section?

if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
	stopProducers()
	stopServicesOnError()
	return err
}

@bgentry (Contributor) left a comment

Thanks for the idea! I'm not opposed to adding this, though I think it turns out to be a fair bit more complex than what this PR demonstrates. Adding is actually easy IMO; it's removal that is a lot trickier in practice and raises a lot of UX questions.

Interested in whether @brandur has ideas on how to address these. One option would be to reduce the scope of this PR to only AddQueue at this time and punt on RemoveQueue because of its much higher complexity.

client.go Outdated
Comment on lines 621 to 640
func (c *Client[TTx]) AddQueue(queueName string, queueConfig QueueConfig) {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()
	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)
}

This method is going to need to do some validation on the QueueConfig as we currently do when initializing the client, and it will also need to return an error if validation fails.

We should probably extract that to an internal func (c QueueConfig) validate() error similar to what we have for the top level Config so that we can use the same logic in each place. And of course we'll need to make sure this new surface area (add/remove) is fully covered by tests in this regard.
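For illustration, a minimal sketch of such an extraction, assuming a simple lower bound on MaxWorkers (the exact rules and limits would need to mirror whatever the client currently checks at initialization, and fmt is assumed to already be imported in client.go):

// Sketch only: the bounds check here is an assumption, not River's actual rule.
func (c QueueConfig) validate() error {
	if c.MaxWorkers < 1 {
		return fmt.Errorf("invalid number of max workers: %d", c.MaxWorkers)
	}
	return nil
}

AddQueue would then begin with something like if err := queueConfig.validate(); err != nil { return err } once its signature is changed to return an error.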

client.go Outdated
Comment on lines 642 to 645
func (c *Client[TTx]) RemoveQueue(queueName string) {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()
	delete(c.producersByQueueName, queueName)

Should this one also return an error? We could reuse rivertype.ErrNotFound in case the queue does not exist.
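For illustration, a hedged sketch of that error-returning variant, reusing the existing rivertype.ErrNotFound sentinel (the monitor cleanup from the next hunk would still follow the delete):

func (c *Client[TTx]) RemoveQueue(queueName string) error {
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()
	// Report a not-found error instead of silently doing nothing.
	if _, ok := c.producersByQueueName[queueName]; !ok {
		return rivertype.ErrNotFound
	}
	delete(c.producersByQueueName, queueName)
	return nil
}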

client.go Outdated
Comment on lines 647 to 648
	// Remove queue from currentSnapshot.Producers
	c.monitor.RemoveProducerStatus(queueName)

Safe removal is a bit trickier than this. We need to actually shut down the producer somehow, and then the producer itself should be responsible for reporting its status to the monitor until it is shut down. At that point, yeah, we need to somehow remove it from the monitor's map so we don't keep old state around in that map forever.

Maybe SetProducerStatus should just be smart enough to do that last part automatically when the producer is stopped?

@brandur thoughts on how to safely trigger the single producer to shut down? And is this API suitable for an operation which may take a while (until all running jobs go away)? Keep in mind there is no obvious way to cancel the context for just a single producer, and we may also need to differentiate between Stop and StopAndCancel for each individual producer; otherwise, how will the producer know when it needs to cancel the context and try to more aggressively shut down running jobs?

In either case we still need to somehow wait for clean shutdown to complete, which may actually never happen before the program exits if the running jobs never exit.

Contributor

Yeah definitely pretty tricky. I find the existing StopAndCancel to be quite confusing to reason about, so it'd be nice if we could avoid adding another version of that here.

One possibility might be that RemoveQueue will only wait for a graceful shutdown, and if a hard shutdown is necessary, the caller will just have to use StopAndCancel.

Also, if it's not needed, I like your idea of just punting on RemoveQueue for now.

Or, a possible variation of that is to have RemoveQueue but only allow it to be called when the client is stopped, which would still allow it where necessary while making a lot of the harder problems go away. If we went that route, though, we should make sure to future-proof the API (e.g. by making it take a ctx if we think there will be a hard stop later) in case we want to remove this limitation in the future.
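As a rough sketch of that stopped-client-only variation (the names, the started check, and the error text are all placeholders, not a settled API):

// Removal is only permitted while the client is stopped; ctx is accepted now so
// that a harder, cancellable stop could be supported later without a breaking change.
func (c *Client[TTx]) RemoveQueue(ctx context.Context, queueName string) error {
	if c.IsStarted() {
		return errors.New("queues can only be removed while the client is stopped")
	}
	c.producersByQueueNameMu.Lock()
	defer c.producersByQueueNameMu.Unlock()
	delete(c.producersByQueueName, queueName)
	c.monitor.RemoveProducerStatus(queueName)
	return nil
}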

Comment on lines 70 to 75
func (m *clientMonitor) RemoveProducerStatus(queueName string) {
	m.statusSnapshotMu.Lock()
	defer m.statusSnapshotMu.Unlock()
	delete(m.currentSnapshot.Producers, queueName)
	m.bufferStatusUpdate()
}

I think we can probably merge this into SetProducerStatus and decide whether or not to remove based upon the status.
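A rough sketch of what that merged behavior might look like; the componentstatus.Status type and its Stopped value are assumptions standing in for whatever status enum the producer actually reports:

func (m *clientMonitor) SetProducerStatus(queueName string, status componentstatus.Status) {
	m.statusSnapshotMu.Lock()
	defer m.statusSnapshotMu.Unlock()
	if status == componentstatus.Stopped {
		// Drop fully stopped producers so stale entries don't accumulate.
		delete(m.currentSnapshot.Producers, queueName)
	} else {
		m.currentSnapshot.Producers[queueName] = status
	}
	m.bufferStatusUpdate()
}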

@brandur (Contributor) left a comment

Thanks for the PR!

A couple larger points:

> What do you think about adding StartWorkContext in the AddQueue section?

Yeah, I think we'll need something like this. Ideally, if the client is started, AddQueue starts the producer. If not, it doesn't.

Also, rather than stopping all producers on failure (if the client was started), it should maybe just return an error from the AddQueue function.

Can you take a look at PeriodicJobs() too? See river/client.go, line 1598 at c9dd291:

func (c *Client[TTx]) PeriodicJobs() *PeriodicJobBundle { return c.periodicJobs }

I think we should follow that pattern and make these Queues().Add(...) and Queues().Remove(...). It sorts the functions together (since the client already has a lot of functions) and future-proofs us in case we want to add any other queue-related functions later.
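A minimal sketch of that shape, assuming a QueueBundle type and an internal hook back into the client (both names are illustrative and only mirror the PeriodicJobs() pattern):

// QueueBundle groups queue-related client operations, analogous to PeriodicJobBundle.
type QueueBundle struct {
	// addProducer is a hypothetical hook back into the Client that would
	// validate the config, register the producer, and start it if the client
	// is already running.
	addProducer func(queueName string, queueConfig QueueConfig) error
}

func (b *QueueBundle) Add(queueName string, queueConfig QueueConfig) error {
	return b.addProducer(queueName, queueConfig)
}

// Queues returns the bundle; assumes a c.queues field set up in NewClient.
func (c *Client[TTx]) Queues() *QueueBundle { return c.queues }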

And lastly, tests of course! Although those can wait until the API has fully stabilized.

client.go Outdated
Comment on lines 624 to 639
	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)

Just given that this is a lot of duplicative code, let's move it into a new addProducer(queueName string, queueConfig QueueConfig) internal helper that can be shared with Start.
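Roughly, the extracted helper might look like the following; it simply lifts the block above into a method, with locking and validation left to the callers in this sketch:

func (c *Client[TTx]) addProducer(queueName string, queueConfig QueueConfig) {
	c.producersByQueueName[queueName] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queueName,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})
	c.monitor.InitializeProducerStatus(queueName)
}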

@PumpkinSeed (Contributor, Author)

I agree with reducing the scope of the PR. I will focus on AddQueue and drop RemoveQueue; I will address it in a follow-up PR.

@PumpkinSeed (Contributor, Author)

Is there any documentation about setting up a development environment? I'm using Arch Linux and it would be nice to use a Postgres image/container for testing. I found docs/development.md, but most of it uses the raw postgres client; it would be nice to be able to set an env var or something similar so we can customize the Postgres connection.

@PumpkinSeed (Contributor, Author)

Testing notes

I checked the CI and tried to replicate it; based on this, I have a few notes.

I created a Postgres Docker container:

docker run --name river-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 -d postgres

I added the following environment variables:

export PGHOST=127.0.0.1
export PGPORT=5432
export PGUSER=postgres
export PGPASSWORD=postgres
export PGSSLMODE=disable
export ADMIN_DATABASE_URL=postgres://postgres:postgres@localhost:5432?sslmode=disable
export DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/river_dev?sslmode=disable
export TEST_DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable

I use GoLand, so here is a one-liner:

PGHOST=127.0.0.1;PGPORT=5432;PGUSER=postgres;PGPASSWORD=postgres;PGSSLMODE=disable;ADMIN_DATABASE_URL=postgres://postgres:postgres@localhost:5432?sslmode=disable;DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/river_dev?sslmode=disable;TEST_DATABASE_URL=postgres://postgres:postgres@127.0.0.1:5432/river_testdb?sslmode=disable

I changed managementDatabaseURL in ./internal/cmd/testdbman/main.go:

const managementDatabaseURL = "postgresql://postgres:postgres@localhost:5432/postgres"

After this I ran the database creation/migration detailed in docs/development.md:

go run ./internal/cmd/testdbman create

@PumpkinSeed (Contributor, Author)

I added the requested changes.

I still have a question about whether I need to return s.context in StartInit in the following section or not:

if s.started {
	return ctx, false, nil
}

@PumpkinSeed requested review from bgentry and brandur on Jun 25, 2024.
@PumpkinSeed (Contributor, Author)

@brandur @bgentry Do you have any update on this?

@brandur (Contributor) commented Jun 28, 2024

Hey @PumpkinSeed, thanks for the updates here.

This is a bit of a tricky change, and there's quite a bit of subtle nuance. For example, believe it or not, the way you're checking IsStarted here is racy, because a shutdown could have finished between the check on IsStarted and the call to StartWorkContext. There are also a couple of other changes needed, like the addition of a Queues() bundle as I mentioned above.

Do you mind if we take this one over to get it over the finish line?

@PumpkinSeed (Contributor, Author)

> Do you mind if we take this one over to get it over the finish line?

I'm totally fine with it. Thanks for that.

@brandur (Contributor) commented Jun 30, 2024

Opened #410 as an alternative.

@brandur (Contributor) commented Jul 4, 2024

Closing in favor of #410.

@brandur closed this on Jul 4, 2024.