diff --git a/producer.go b/producer.go index 3d365365..ca7185b9 100644 --- a/producer.go +++ b/producer.go @@ -447,7 +447,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit if p.paused { continue } - p.innerFetchLoop(workCtx, fetchResultCh) + p.innerFetchLoop(workCtx, fetchResultCh, fetchLimiter) // Ensure we can't start another fetch when fetchCtx is done, even if // the fetchLimiter is also ready to fire: select { @@ -461,7 +461,7 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit } } -func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) { +func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult, fetchLimiter *chanutil.DebouncedChan) { limit := p.maxJobsToFetch() go p.dispatchWork(workCtx, limit, fetchResultCh) @@ -472,6 +472,13 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error())) } else if len(result.jobs) > 0 { p.startNewExecutors(workCtx, result.jobs) + + // Fetch returned the maximum number of jobs that were requested + // which implies there may be more in the queue so trigger another + // fetch right after this one. + if len(result.jobs) == limit { + fetchLimiter.Call() + } } return case result := <-p.jobResultCh: