Dynamic queue handling #396

Open
PumpkinSeed opened this issue Jun 21, 2024 · 3 comments
Labels
enhancement New feature or request

Comments

@PumpkinSeed
Contributor

We have a use case where we need to add queues dynamically. The background: we have multiple job Kinds, and one in particular, manage_rewards, connects to a server that enforces its rate limit per route. Sometimes one route ends up blocking jobs for the other routes because the limit isn't handled properly, so we decided to create a queue per route, since queues are processed in parallel. For that we need to be able to add queues at runtime.
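
For context, the routing side looks roughly like this. This is only a simplified sketch: ManageRewardsArgs, insertReward, and the queue naming scheme are illustrative placeholders, not our real code.

type ManageRewardsArgs struct {
	Route string `json:"route"`
}

func (ManageRewardsArgs) Kind() string { return "manage_rewards" }

func insertReward(ctx context.Context, riverClient *river.Client[pgx.Tx], route string) error {
	// Each route gets its own queue, so a rate-limited route only backs up
	// its own jobs. The queue has to be registered on the client, which is
	// what the AddQueue method below would make possible at runtime.
	_, err := riverClient.Insert(ctx, ManageRewardsArgs{Route: route}, &river.InsertOpts{
		Queue: "manage_rewards_" + route,
	})
	return err
}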

I tweaked the code and created the following method:

func (c *Client[TTx]) AddQueue(queue string, queueConfig QueueConfig) {
	// Build an archetype for the new producer, mirroring what the client
	// constructor does internally.
	archetype := &baseservice.Archetype{
		Logger:     c.config.Logger,
		Rand:       randutil.NewCryptoSeededConcurrentSafeRand(),
		TimeNowUTC: func() time.Time { return time.Now().UTC() },
	}

	// Register a producer for the new queue using the client's existing
	// configuration, overriding only the per-queue settings.
	c.producersByQueueName[queue] = newProducer(archetype, c.driver.GetExecutor(), &producerConfig{
		ClientID:          c.config.ID,
		Completer:         c.completer,
		ErrorHandler:      c.config.ErrorHandler,
		FetchCooldown:     c.config.FetchCooldown,
		FetchPollInterval: c.config.FetchPollInterval,
		JobTimeout:        c.config.JobTimeout,
		MaxWorkers:        queueConfig.MaxWorkers,
		Notifier:          c.notifier,
		Queue:             queue,
		RetryPolicy:       c.config.RetryPolicy,
		SchedulerInterval: c.config.schedulerInterval,
		StatusFunc:        c.monitor.SetProducerStatus,
		Workers:           c.config.Workers,
	})

	// Make the monitor aware of the new queue's producer.
	c.monitor.InitializeProducerStatus(queue)
}

After that I ran the following:

if err := riverClient.Stop(ctx); err != nil {
	t.Fatal(err)
}

riverClient.AddQueue("new_queue", river.QueueConfig{
	MaxWorkers: 10,
})

if err := riverClient.Start(ctx); err != nil {
	t.Fatal(err)
}

If you agree with these changes, or have any suggestions, let me know and I can send a PR for it. We tested with 1,800 queues for the same Kind, with 10,000 jobs per queue, and it all worked fine.

Note: we're using River in production with millions of jobs and it works great.

@brandur
Contributor

brandur commented Jun 22, 2024

Note: we're using River in production with millions of jobs and it works great.

That's awesome to hear! Thanks for checking in about it.

The code looks mostly right. My only notes: instead of initializing a new archetype, it's better to just reuse the one from client.baseService, and since the newProducer setup is quite a large block that would now be repeated twice and might be prone to drift, we'd probably want to extract it into its own function. We also might need some sort of mutex on the producers map in case AddQueue is called simultaneously with start/stop.
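
Roughly what I have in mind, as an untested sketch; producersMu and newProducerForQueue are placeholder names for the new mutex field and the extracted helper, not existing APIs:

func (c *Client[TTx]) AddQueue(queue string, queueConfig QueueConfig) {
	// Guard the producers map against AddQueue racing with Start/Stop.
	c.producersMu.Lock()
	defer c.producersMu.Unlock()

	// Hypothetical helper that builds the shared producerConfig block used
	// during client construction, reusing the client's existing archetype
	// instead of constructing a new one here.
	c.producersByQueueName[queue] = c.newProducerForQueue(queue, queueConfig)
	c.monitor.InitializeProducerStatus(queue)
}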

@bgentry Thoughts on this one? I'm not sure I'd generally recommend this as a pattern, but it doesn't seem too harmful to allow it.

@bgentry
Contributor

bgentry commented Jun 24, 2024

I'm not opposed to this idea, though as my comments in #399 show, it's more complex than it may seem at first glance, particularly for removal, because of the shutdown considerations involved. We can continue the discussion on the specifics in that PR now that it's open.

bgentry added the enhancement label on Jun 24, 2024
@PumpkinSeed
Contributor Author

I just wanted to link the PR that resolves the first part of this ticket: #410
