From 21d1c230245b8089bbbe40aad04c283b36f8d01c Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Wed, 7 Aug 2024 21:58:40 -0500 Subject: [PATCH] Derive all internal contexts from Client context (#514) We were previously using `context.Background()` in some situations to avoid cancellation of things that really shouldn't be cancelled by the user context cancellation. However, we can now do that with `context.WithoutCancel` instead in order to preserve context values without any cancellation or deadline. Fixes #512. --- CHANGELOG.md | 4 ++++ internal/jobcompleter/job_completer.go | 2 +- internal/notifier/notifier.go | 4 ++-- producer.go | 19 ++++++++++--------- 4 files changed, 17 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6e0b2f3..479285c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Derive all internal contexts from user-provided `Client` context. This includes the job fetch context, notifier unlisten, and completer. [PR #514](https://github.com/riverqueue/river/pull/514). 
+ ## [0.11.1] - 2024-08-05 ### Fixed diff --git a/internal/jobcompleter/job_completer.go b/internal/jobcompleter/job_completer.go index 3e91e175..31588c24 100644 --- a/internal/jobcompleter/job_completer.go +++ b/internal/jobcompleter/job_completer.go @@ -506,7 +506,7 @@ func (c *BatchCompleter) waitOrInitBacklogChannel(ctx context.Context) { const numRetries = 3 func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseService, disableSleep bool, retryFunc func(ctx context.Context) (T, error)) (T, error) { - uncancelledCtx := context.Background() + uncancelledCtx := context.WithoutCancel(logCtx) var ( defaultVal T diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go index 2f86790e..a78d54cb 100644 --- a/internal/notifier/notifier.go +++ b/internal/notifier/notifier.go @@ -35,8 +35,8 @@ type Subscription struct { func (s *Subscription) Unlisten(ctx context.Context) { s.unlistenOnce.Do(func() { - // Unlisten uses background context in case of cancellation. 
- if err := s.notifier.unlisten(context.Background(), s); err != nil { //nolint:contextcheck + // Unlisten strips cancellation from the parent context to ensure it runs. + if err := s.notifier.unlisten(context.WithoutCancel(ctx), s); err != nil { //nolint:contextcheck s.notifier.Logger.ErrorContext(ctx, s.notifier.Name+": Error unlistening on topic", "err", err, "topic", s.topic) } }) diff --git a/producer.go b/producer.go index c4b2f0e4..b6edbae6 100644 --- a/producer.go +++ b/producer.go @@ -459,7 +459,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { limit := p.maxJobsToFetch() - go p.dispatchWork(limit, fetchResultCh) //nolint:contextcheck + go p.dispatchWork(workCtx, limit, fetchResultCh) for { select { @@ -509,14 +509,15 @@ func (p *producer) maybeCancelJob(id int64) { executor.Cancel() } -func (p *producer) dispatchWork(count int, fetchResultCh chan<- producerFetchResult) { - // This intentionally uses a background context because we don't want it to - // get cancelled if the producer is asked to shut down. In that situation, we - // want to finish fetching any jobs we are in the midst of fetching, work - // them, and then stop. Otherwise we'd have a risk of shutting down when we - // had already fetched jobs in the database, leaving those jobs stranded. We'd - // then potentially have to release them back to the queue. - jobs, err := p.exec.JobGetAvailable(context.Background(), &riverdriver.JobGetAvailableParams{ +func (p *producer) dispatchWork(workCtx context.Context, count int, fetchResultCh chan<- producerFetchResult) { + // This intentionally removes any deadlines or cancellation from the parent + // context because we don't want it to get cancelled if the producer is asked + // to shut down. In that situation, we want to finish fetching any jobs we are + // in the midst of fetching, work them, and then stop. 
Otherwise we'd have a + // risk of shutting down when we had already fetched jobs in the database, + // leaving those jobs stranded. We'd then potentially have to release them + // back to the queue. + jobs, err := p.exec.JobGetAvailable(context.WithoutCancel(workCtx), &riverdriver.JobGetAvailableParams{ AttemptedBy: p.config.ClientID, Max: count, Queue: p.config.Queue,