From 8ff0ab551a5991343156c0bc38a96829443cbec9 Mon Sep 17 00:00:00 2001
From: Chris Gaffney
Date: Sun, 20 Oct 2024 14:13:42 -0400
Subject: [PATCH 1/2] Immediately fetch another batch when a full set of jobs
 is returned

Jobs can often be stuck on queues for a long while when jobs are
sporadically enqueued in large batches. Prior to this change, jobs would
be fetched every FetchPollInterval (default 1s) unless new jobs were
enqueued. This change reduces that to FetchCooldown (default 100ms) when
a full set of jobs is returned, implying that there may be more in the
queue.
---
 producer.go | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/producer.go b/producer.go
index 856e11a6..2e3b6d74 100644
--- a/producer.go
+++ b/producer.go
@@ -447,7 +447,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
 			if p.paused {
 				continue
 			}
-			p.innerFetchLoop(workCtx, fetchResultCh)
+			p.innerFetchLoop(workCtx, fetchResultCh, fetchLimiter)
 			// Ensure we can't start another fetch when fetchCtx is done, even if
 			// the fetchLimiter is also ready to fire:
 			select {
@@ -461,7 +461,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
 	}
 }
 
-func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
+func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult, fetchLimiter *chanutil.DebouncedChan) {
 	limit := p.maxJobsToFetch()
 	go p.dispatchWork(workCtx, limit, fetchResultCh)
 
@@ -472,6 +472,13 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
 			p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()))
 		} else if len(result.jobs) > 0 {
 			p.startNewExecutors(workCtx, result.jobs)
+
+			// Fetch returned the maximum number of jobs that were requested
+			// which implies there may be more in the queue so trigger another
+			// fetch right after this one.
+			if len(result.jobs) == limit {
+				fetchLimiter.Call()
+			}
 		}
 		return
 	case result := <-p.jobResultCh:

From d4ac060fccf01a9f3032d2115ede3765addaa814 Mon Sep 17 00:00:00 2001
From: Blake Gentry
Date: Sat, 2 Nov 2024 11:37:05 -0500
Subject: [PATCH 2/2] Trigger fetch when a worker slot frees up

Tune the client to be more aggressive about fetching when it either just
fetched a full batch of jobs, or when it skipped its previously triggered
fetch because it was already full. Rather than immediately calling the
fetch limiter as in the previous commit, note this situation with a
boolean so that we can trigger the fetch limiter _after_ a worker slot
has opened up.

This should bring more consistent throughput to poll-only mode and in
cases where there is a backlog of existing jobs but new ones aren't being
actively inserted. This will result in increased fetch load on many
installations, with the benefit of increased throughput. As before,
`FetchCooldown` still limits how frequently these fetches can occur on
each client and can be increased to reduce the amount of fetch querying.

Finally, don't run the fetch query if there are no worker slots
available.
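To make the intended flow easier to follow, here is a toy, self-contained
model of the trigger logic. The types and names below are hypothetical,
and the channel plumbing plus the FetchCooldown-debounced fetch limiter
of the real producer are simplified into direct method calls:

    package main

    import "fmt"

    // producerModel is a toy stand-in for the real producer.
    type producerModel struct {
    	maxWorkers                 int
    	active                     int
    	queue                      []string
    	fetchWhenSlotsAreAvailable bool
    }

    // fetch pulls up to the number of free slots from the queue. A skipped
    // fetch (no free slots) or a full batch marks that another fetch should
    // fire as soon as a slot opens up.
    func (p *producerModel) fetch() {
    	limit := p.maxWorkers - p.active
    	if limit <= 0 {
    		p.fetchWhenSlotsAreAvailable = true
    		return
    	}
    	n := min(limit, len(p.queue))
    	p.active += n
    	p.queue = p.queue[n:]
    	if n == limit {
    		p.fetchWhenSlotsAreAvailable = true
    	}
    }

    // jobFinished frees a slot and, if a fetch was deferred, runs it right
    // away instead of waiting for the next FetchPollInterval tick.
    func (p *producerModel) jobFinished() {
    	p.active--
    	if p.fetchWhenSlotsAreAvailable {
    		p.fetchWhenSlotsAreAvailable = false
    		p.fetch()
    	}
    }

    func main() {
    	p := &producerModel{maxWorkers: 2, queue: []string{"a", "b", "c"}}
    	p.fetch()       // full batch of 2 fetched -> another fetch is deferred
    	p.jobFinished() // a slot frees -> the deferred fetch runs immediately
    	fmt.Printf("active=%d queued=%d\n", p.active, len(p.queue)) // active=2 queued=0
    }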
---
 CHANGELOG.md                              |  4 +++
 client_test.go                            |  5 ++--
 example_client_from_context_dbsql_test.go |  2 +-
 example_client_from_context_test.go       |  2 +-
 internal/util/chanutil/debounced_chan.go  |  8 +++---
 producer.go                               | 35 +++++++++++++++++------
 6 files changed, 39 insertions(+), 17 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 713fc2b4..a6a565a7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previous triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663).
+
 ### Fixed
 
 - `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661).
diff --git a/client_test.go b/client_test.go
index 84207bed..b0529845 100644
--- a/client_test.go
+++ b/client_test.go
@@ -5643,9 +5643,8 @@ func TestInsert(t *testing.T) {
 	AddWorker(workers, &noOpWorker{})
 
 	config := &Config{
-		FetchCooldown: 2 * time.Millisecond,
-		Queues:        map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
-		Workers:       workers,
+		Queues:  map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
+		Workers: workers,
 	}
 
 	client, err := NewClient(riverpgxv5.New(dbPool), config)
diff --git a/example_client_from_context_dbsql_test.go b/example_client_from_context_dbsql_test.go
index f306e914..cfeab56f 100644
--- a/example_client_from_context_dbsql_test.go
+++ b/example_client_from_context_dbsql_test.go
@@ -37,7 +37,7 @@ func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[Contex
 
 // ExampleClientFromContext_databaseSQL demonstrates how to extract the River
 // client from the worker context when using the [database/sql] driver.
-// ([github.com/riverqueue/river/riverdriver/riverdatabasesql])
+// ([github.com/riverqueue/river/riverdriver/riverdatabasesql]).
 func ExampleClientFromContext_databaseSQL() {
 	ctx := context.Background()
 
diff --git a/example_client_from_context_test.go b/example_client_from_context_test.go
index d40f690d..b5979c15 100644
--- a/example_client_from_context_test.go
+++ b/example_client_from_context_test.go
@@ -36,7 +36,7 @@ func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextCl
 
 // ExampleClientFromContext_pgx demonstrates how to extract the River client
 // from the worker context when using the pgx/v5 driver.
-// ([github.com/riverqueue/river/riverdriver/riverpgxv5])
+// ([github.com/riverqueue/river/riverdriver/riverpgxv5]).
 func ExampleClientFromContext_pgx() {
 	ctx := context.Background()
 
diff --git a/internal/util/chanutil/debounced_chan.go b/internal/util/chanutil/debounced_chan.go
index 3fbdf68b..7e685805 100644
--- a/internal/util/chanutil/debounced_chan.go
+++ b/internal/util/chanutil/debounced_chan.go
@@ -6,10 +6,10 @@ import (
 	"time"
 )
 
-// DebouncedChan is a function that will only be called once per cooldown
-// period, at the leading edge. If it is called again during the cooldown, the
-// subsequent calls are delayed until the cooldown period has elapsed and are
-// also coalesced into a single call.
+// DebouncedChan is a channel that will only fire once per cooldown period, at
+// the leading edge. If it is called again during the cooldown, the subsequent
+// calls are delayed until the cooldown period has elapsed and are also
+// coalesced into a single call.
 type DebouncedChan struct {
 	c        chan struct{}
 	cooldown time.Duration
diff --git a/producer.go b/producer.go
index 2e3b6d74..59a1f297 100644
--- a/producer.go
+++ b/producer.go
@@ -159,6 +159,11 @@ type producer struct {
 	// main goroutine.
 	cancelCh chan int64
 
+	// Set to true when the producer thinks it should trigger another fetch as
+	// soon as slots are available. This is written and read by the main
+	// goroutine.
+	fetchWhenSlotsAreAvailable bool
+
 	// Receives completed jobs from workers. Written by completed workers, only
 	// read from main goroutine.
 	jobResultCh chan *rivertype.JobRow
@@ -447,7 +452,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
 			if p.paused {
 				continue
 			}
-			p.innerFetchLoop(workCtx, fetchResultCh, fetchLimiter)
+			p.innerFetchLoop(workCtx, fetchResultCh)
 			// Ensure we can't start another fetch when fetchCtx is done, even if
 			// the fetchLimiter is also ready to fire:
 			select {
@@ -457,12 +462,28 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
 			}
 		case result := <-p.jobResultCh:
 			p.removeActiveJob(result.ID)
+			if p.fetchWhenSlotsAreAvailable {
+				// If we missed a fetch because all worker slots were full, or if we
+				// fetched the maximum number of jobs on the last attempt, get a little
+				// more aggressive triggering the fetch limiter now that we have a slot
+				// available.
+				p.fetchWhenSlotsAreAvailable = false
+				fetchLimiter.Call()
+			}
 		}
 	}
 }
 
-func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult, fetchLimiter *chanutil.DebouncedChan) {
+func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
 	limit := p.maxJobsToFetch()
+	if limit <= 0 {
+		// We have no slots for new jobs, so don't bother fetching. However, since
+		// we knew it was time to fetch, we keep track of what happened so we can
+		// trigger another fetch as soon as we have open slots.
+		p.fetchWhenSlotsAreAvailable = true
+		return
+	}
+
 	go p.dispatchWork(workCtx, limit, fetchResultCh)
 
 	for {
@@ -473,12 +494,10 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
 		} else if len(result.jobs) > 0 {
 			p.startNewExecutors(workCtx, result.jobs)
 
-			// Fetch returned the maximum number of jobs that were requested
-			// which implies there may be more in the queue so trigger another
-			// fetch right after this one.
-			if len(result.jobs) == limit {
-				fetchLimiter.Call()
-			}
+			// Fetch returned the maximum number of jobs that were requested,
+			// implying there may be more in the queue. Trigger another fetch when
+			// slots are available.
+			p.fetchWhenSlotsAreAvailable = true
 		}
 		return
 	case result := <-p.jobResultCh:
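
A note on tuning, beyond what the changelog entry above says: `FetchCooldown`
caps how often these extra fetches can run, while `FetchPollInterval` remains
the fallback poll cadence. Below is a minimal client setup sketch showing
where those knobs live (illustrative values, a hypothetical connection string,
and a placeholder no-op worker; this snippet is not part of the diff above):

    package main

    import (
    	"context"
    	"log"
    	"time"

    	"github.com/jackc/pgx/v5/pgxpool"
    	"github.com/riverqueue/river"
    	"github.com/riverqueue/river/riverdriver/riverpgxv5"
    )

    // NoOpArgs/NoOpWorker are placeholder definitions so the sketch compiles;
    // a real application registers its own workers here.
    type NoOpArgs struct{}

    func (NoOpArgs) Kind() string { return "no_op" }

    type NoOpWorker struct {
    	river.WorkerDefaults[NoOpArgs]
    }

    func (w *NoOpWorker) Work(ctx context.Context, job *river.Job[NoOpArgs]) error { return nil }

    func main() {
    	ctx := context.Background()

    	// Hypothetical connection string; substitute your own database.
    	dbPool, err := pgxpool.New(ctx, "postgres:///river_example")
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer dbPool.Close()

    	workers := river.NewWorkers()
    	river.AddWorker(workers, &NoOpWorker{})

    	_, err = river.NewClient(riverpgxv5.New(dbPool), &river.Config{
    		// FetchCooldown bounds how often the more aggressive fetches
    		// introduced by this change can run; raise it to trade some
    		// throughput for less fetch query load.
    		FetchCooldown: 200 * time.Millisecond,
    		// FetchPollInterval remains the fallback cadence used when
    		// nothing triggers a fetch sooner.
    		FetchPollInterval: 2 * time.Second,
    		Queues:            map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
    		Workers:           workers,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    }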