diff --git a/CHANGELOG.md b/CHANGELOG.md
index 713fc2b4..a6a565a7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ## [Unreleased]
 
+### Changed
+
+- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previous triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663).
+
 ### Fixed
 
 - `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661).
diff --git a/client_test.go b/client_test.go
index 84207bed..b0529845 100644
--- a/client_test.go
+++ b/client_test.go
@@ -5643,9 +5643,8 @@ func TestInsert(t *testing.T) {
 	AddWorker(workers, &noOpWorker{})
 
 	config := &Config{
-		FetchCooldown: 2 * time.Millisecond,
-		Queues:        map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
-		Workers:       workers,
+		Queues:  map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
+		Workers: workers,
 	}
 
 	client, err := NewClient(riverpgxv5.New(dbPool), config)
diff --git a/example_client_from_context_dbsql_test.go b/example_client_from_context_dbsql_test.go
index f306e914..cfeab56f 100644
--- a/example_client_from_context_dbsql_test.go
+++ b/example_client_from_context_dbsql_test.go
@@ -37,7 +37,7 @@ func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[Contex
 
 // ExampleClientFromContext_databaseSQL demonstrates how to extract the River
 // client from the worker context when using the [database/sql] driver.
-// ([github.com/riverqueue/river/riverdriver/riverdatabasesql])
+// ([github.com/riverqueue/river/riverdriver/riverdatabasesql]).
 func ExampleClientFromContext_databaseSQL() {
 	ctx := context.Background()
 
diff --git a/example_client_from_context_test.go b/example_client_from_context_test.go
index d40f690d..b5979c15 100644
--- a/example_client_from_context_test.go
+++ b/example_client_from_context_test.go
@@ -36,7 +36,7 @@ func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextCl
 
 // ExampleClientFromContext_pgx demonstrates how to extract the River client
 // from the worker context when using the pgx/v5 driver.
-// ([github.com/riverqueue/river/riverdriver/riverpgxv5])
+// ([github.com/riverqueue/river/riverdriver/riverpgxv5]).
 func ExampleClientFromContext_pgx() {
 	ctx := context.Background()
 
diff --git a/internal/util/chanutil/debounced_chan.go b/internal/util/chanutil/debounced_chan.go
index 3fbdf68b..7e685805 100644
--- a/internal/util/chanutil/debounced_chan.go
+++ b/internal/util/chanutil/debounced_chan.go
@@ -6,10 +6,10 @@ import (
 	"time"
 )
 
-// DebouncedChan is a function that will only be called once per cooldown
-// period, at the leading edge. If it is called again during the cooldown, the
-// subsequent calls are delayed until the cooldown period has elapsed and are
-// also coalesced into a single call.
+// DebouncedChan is a channel that will only fire once per cooldown period, at
+// the leading edge. If it is called again during the cooldown, the subsequent
+// calls are delayed until the cooldown period has elapsed and are also
+// coalesced into a single call.
 type DebouncedChan struct {
 	c        chan struct{}
 	cooldown time.Duration
diff --git a/producer.go b/producer.go
index 856e11a6..59a1f297 100644
--- a/producer.go
+++ b/producer.go
@@ -159,6 +159,11 @@ type producer struct {
 	// main goroutine.
 	cancelCh chan int64
 
+	// Set to true when the producer thinks it should trigger another fetch as
+	// soon as slots are available. This is written and read by the main
+	// goroutine.
+	fetchWhenSlotsAreAvailable bool
+
 	// Receives completed jobs from workers. Written by completed workers, only
 	// read from main goroutine.
 	jobResultCh chan *rivertype.JobRow
@@ -457,12 +462,28 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
 			}
 		case result := <-p.jobResultCh:
 			p.removeActiveJob(result.ID)
+			if p.fetchWhenSlotsAreAvailable {
+				// If we missed a fetch because all worker slots were full, or if we
+				// fetched the maximum number of jobs on the last attempt, get a little
+				// more aggressive triggering the fetch limiter now that we have a slot
+				// available.
+				p.fetchWhenSlotsAreAvailable = false
+				fetchLimiter.Call()
+			}
 		}
 	}
 }
 
 func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
 	limit := p.maxJobsToFetch()
+	if limit <= 0 {
+		// We have no slots for new jobs, so don't bother fetching. However, since
+		// we knew it was time to fetch, we keep track of what happened so we can
+		// trigger another fetch as soon as we have open slots.
+		p.fetchWhenSlotsAreAvailable = true
+		return
+	}
+
 	go p.dispatchWork(workCtx, limit, fetchResultCh)
 
 	for {
@@ -472,6 +493,11 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
 			p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()))
 		} else if len(result.jobs) > 0 {
 			p.startNewExecutors(workCtx, result.jobs)
+
+			// Fetch returned the maximum number of jobs that were requested,
+			// implying there may be more in the queue. Trigger another fetch when
+			// slots are available.
+			p.fetchWhenSlotsAreAvailable = true
 		}
 		return
 	case result := <-p.jobResultCh:
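
To make the `FetchCooldown` knob mentioned in the CHANGELOG entry concrete, here is a minimal, hypothetical sketch of configuring it on a client. It is not part of this diff: the connection string, queue size, cooldown value, and the `ExampleArgs`/`ExampleWorker` types are illustrative placeholders; only `FetchCooldown`, `Queues`, `Workers`, `QueueDefault`, and the `riverpgxv5` driver appear in the changes above.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// ExampleArgs and ExampleWorker are placeholder types for this sketch only.
type ExampleArgs struct{}

func (ExampleArgs) Kind() string { return "example" }

type ExampleWorker struct {
	river.WorkerDefaults[ExampleArgs]
}

func (w *ExampleWorker) Work(ctx context.Context, job *river.Job[ExampleArgs]) error {
	return nil
}

func main() {
	ctx := context.Background()

	// Placeholder connection string.
	dbPool, err := pgxpool.New(ctx, "postgres://localhost:5432/river_dev")
	if err != nil {
		log.Fatal(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &ExampleWorker{})

	// FetchCooldown bounds how frequently each client may trigger a fetch,
	// including the more aggressive follow-up fetches described in the
	// CHANGELOG entry. Raise it to reduce fetch query load on the database;
	// lower values favor latency and throughput over query volume.
	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		FetchCooldown: 200 * time.Millisecond, // illustrative value
		Queues:        map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 100}},
		Workers:       workers,
	})
	if err != nil {
		log.Fatal(err)
	}

	if err := client.Start(ctx); err != nil {
		log.Fatal(err)
	}
	defer client.Stop(ctx)
}
```

As the CHANGELOG entry notes, the new follow-up fetch triggers in `producer.go` still pass through the fetch limiter, so increasing `FetchCooldown` remains the lever for installations that want to cap fetch query volume.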