From ce141fa03fe2bc2603dba50c2ec27f94a8e2af23 Mon Sep 17 00:00:00 2001 From: Blake Gentry Date: Sat, 2 Nov 2024 14:19:03 -0500 Subject: [PATCH] More aggressive refetching when full (#664) * Immediately fetch another batch when a full set of job is returned Jobs can often be stuck on queues for a long while when jobs are sporadically enqueued in large batches. Prior to this change jobs would be fetched every FetchPollInterval (default 1s) unless new jobs were enqueued. This change reduces that to FetchCooldown (default 100ms) when a full set of jobs is returned implying that there may be more in the queue. * trigger fetch when worker freed up Tune the client to be more aggressive about fetching when it either just fetched a full batch of jobs, or when it skipped its previously triggered fetch because it was already full. Rather than immediately calling the fetch limiter as in the previous commit, note this situation with a boolean so that we can trigger the fetch limiter _after_ a worker slot has opened up. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Finally, don't run the fetch query if there are no worker slots available. --------- Co-authored-by: Chris Gaffney --- CHANGELOG.md | 4 ++++ client_test.go | 5 ++--- example_client_from_context_dbsql_test.go | 2 +- example_client_from_context_test.go | 2 +- internal/util/chanutil/debounced_chan.go | 8 +++---- producer.go | 26 +++++++++++++++++++++++ 6 files changed, 38 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 713fc2b4..a6a565a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previous triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663). + ### Fixed - `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661). diff --git a/client_test.go b/client_test.go index 84207bed..b0529845 100644 --- a/client_test.go +++ b/client_test.go @@ -5643,9 +5643,8 @@ func TestInsert(t *testing.T) { AddWorker(workers, &noOpWorker{}) config := &Config{ - FetchCooldown: 2 * time.Millisecond, - Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, - Workers: workers, + Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}}, + Workers: workers, } client, err := NewClient(riverpgxv5.New(dbPool), config) diff --git a/example_client_from_context_dbsql_test.go b/example_client_from_context_dbsql_test.go index f306e914..cfeab56f 100644 --- a/example_client_from_context_dbsql_test.go +++ b/example_client_from_context_dbsql_test.go @@ -37,7 +37,7 @@ func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[Contex // ExampleClientFromContext_databaseSQL demonstrates how to extract the River // client from the worker context when using the [database/sql] driver. -// ([github.com/riverqueue/river/riverdriver/riverdatabasesql]) +// ([github.com/riverqueue/river/riverdriver/riverdatabasesql]). func ExampleClientFromContext_databaseSQL() { ctx := context.Background() diff --git a/example_client_from_context_test.go b/example_client_from_context_test.go index d40f690d..b5979c15 100644 --- a/example_client_from_context_test.go +++ b/example_client_from_context_test.go @@ -36,7 +36,7 @@ func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextCl // ExampleClientFromContext_pgx demonstrates how to extract the River client // from the worker context when using the pgx/v5 driver. -// ([github.com/riverqueue/river/riverdriver/riverpgxv5]) +// ([github.com/riverqueue/river/riverdriver/riverpgxv5]). func ExampleClientFromContext_pgx() { ctx := context.Background() diff --git a/internal/util/chanutil/debounced_chan.go b/internal/util/chanutil/debounced_chan.go index 3fbdf68b..7e685805 100644 --- a/internal/util/chanutil/debounced_chan.go +++ b/internal/util/chanutil/debounced_chan.go @@ -6,10 +6,10 @@ import ( "time" ) -// DebouncedChan is a function that will only be called once per cooldown -// period, at the leading edge. If it is called again during the cooldown, the -// subsequent calls are delayed until the cooldown period has elapsed and are -// also coalesced into a single call. +// DebouncedChan is a channel that will only fire once per cooldown period, at +// the leading edge. If it is called again during the cooldown, the subsequent +// calls are delayed until the cooldown period has elapsed and are also +// coalesced into a single call. type DebouncedChan struct { c chan struct{} cooldown time.Duration diff --git a/producer.go b/producer.go index 856e11a6..59a1f297 100644 --- a/producer.go +++ b/producer.go @@ -159,6 +159,11 @@ type producer struct { // main goroutine. cancelCh chan int64 + // Set to true when the producer thinks it should trigger another fetch as + // soon as slots are available. This is written and read by the main + // goroutine. + fetchWhenSlotsAreAvailable bool + // Receives completed jobs from workers. Written by completed workers, only // read from main goroutine. jobResultCh chan *rivertype.JobRow @@ -457,12 +462,28 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit } case result := <-p.jobResultCh: p.removeActiveJob(result.ID) + if p.fetchWhenSlotsAreAvailable { + // If we missed a fetch because all worker slots were full, or if we + // fetched the maximum number of jobs on the last attempt, get a little + // more aggressive triggering the fetch limiter now that we have a slot + // available. + p.fetchWhenSlotsAreAvailable = false + fetchLimiter.Call() + } } } } func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { limit := p.maxJobsToFetch() + if limit <= 0 { + // We have no slots for new jobs, so don't bother fetching. However, since + // we knew it was time to fetch, we keep track of what happened so we can + // trigger another fetch as soon as we have open slots. + p.fetchWhenSlotsAreAvailable = true + return + } + go p.dispatchWork(workCtx, limit, fetchResultCh) for { @@ -472,6 +493,11 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error())) } else if len(result.jobs) > 0 { p.startNewExecutors(workCtx, result.jobs) + + // Fetch returned the maximum number of jobs that were requested, + // implying there may be more in the queue. Trigger another fetch when + // slots are available. + p.fetchWhenSlotsAreAvailable = true } return case result := <-p.jobResultCh: