From eb6a00c59c4fcf1ca1f32eef1ba574e0287120c9 Mon Sep 17 00:00:00 2001 From: Chris Gaffney Date: Sun, 20 Oct 2024 14:13:42 -0400 Subject: [PATCH] Immediately fetch another batch when a full set of job is returned Jobs can often be stuck on queues for a long while when jobs are sporadically enqueued in large batches. Prior to this change jobs would be fetched every FetchPollInterval (default 1s) unless new jobs were enqueued. This change reduces that to FetchCooldown (default 100ms) when a full set of jobs is returned implying that there may be more in the queue. --- producer.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/producer.go b/producer.go index 3d365365..ca7185b9 100644 --- a/producer.go +++ b/producer.go @@ -447,7 +447,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit if p.paused { continue } - p.innerFetchLoop(workCtx, fetchResultCh) + p.innerFetchLoop(workCtx, fetchResultCh, fetchLimiter) // Ensure we can't start another fetch when fetchCtx is done, even if // the fetchLimiter is also ready to fire: select { @@ -461,7 +461,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit } } -func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { +func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult, fetchLimiter *chanutil.DebouncedChan) { limit := p.maxJobsToFetch() go p.dispatchWork(workCtx, limit, fetchResultCh) @@ -472,6 +472,13 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error())) } else if len(result.jobs) > 0 { p.startNewExecutors(workCtx, result.jobs) + + // Fetch returned the maximum number of jobs that were requested + // which implies there may be more in the queue so trigger another + // fetch right after this one. + if len(result.jobs) == limit { + fetchLimiter.Call() + } } return case result := <-p.jobResultCh: