Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More aggressive refetching when full #664

Merged
merged 2 commits into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previous triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663).

### Fixed

- `riverpgxv5` driver: `Hijack()` the underlying listener connection as soon as it is acquired from the `pgxpool.Pool` in order to prevent the pool from automatically closing it after it reaches its max age. A max lifetime makes sense in the context of a pool with many conns, but a long-lived listener does not need a max lifetime as long as it can ensure the conn remains healthy. [PR #661](https://github.com/riverqueue/river/pull/661).
Expand Down
5 changes: 2 additions & 3 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5643,9 +5643,8 @@ func TestInsert(t *testing.T) {
AddWorker(workers, &noOpWorker{})

config := &Config{
FetchCooldown: 2 * time.Millisecond,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
Workers: workers,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
Workers: workers,
}

client, err := NewClient(riverpgxv5.New(dbPool), config)
Expand Down
2 changes: 1 addition & 1 deletion example_client_from_context_dbsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (w *ContextClientSQLWorker) Work(ctx context.Context, job *river.Job[Contex

// ExampleClientFromContext_databaseSQL demonstrates how to extract the River
// client from the worker context when using the [database/sql] driver.
// ([github.com/riverqueue/river/riverdriver/riverdatabasesql])
// ([github.com/riverqueue/river/riverdriver/riverdatabasesql]).
func ExampleClientFromContext_databaseSQL() {
ctx := context.Background()

Expand Down
2 changes: 1 addition & 1 deletion example_client_from_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (w *ContextClientWorker) Work(ctx context.Context, job *river.Job[ContextCl

// ExampleClientFromContext_pgx demonstrates how to extract the River client
// from the worker context when using the pgx/v5 driver.
// ([github.com/riverqueue/river/riverdriver/riverpgxv5])
// ([github.com/riverqueue/river/riverdriver/riverpgxv5]).
func ExampleClientFromContext_pgx() {
ctx := context.Background()

Expand Down
8 changes: 4 additions & 4 deletions internal/util/chanutil/debounced_chan.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"time"
)

// DebouncedChan is a function that will only be called once per cooldown
// period, at the leading edge. If it is called again during the cooldown, the
// subsequent calls are delayed until the cooldown period has elapsed and are
// also coalesced into a single call.
// DebouncedChan is a channel that will only fire once per cooldown period, at
// the leading edge. If it is called again during the cooldown, the subsequent
// calls are delayed until the cooldown period has elapsed and are also
// coalesced into a single call.
type DebouncedChan struct {
c chan struct{}
cooldown time.Duration
Expand Down
26 changes: 26 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ type producer struct {
// main goroutine.
cancelCh chan int64

// Set to true when the producer thinks it should trigger another fetch as
// soon as slots are available. This is written and read by the main
// goroutine.
fetchWhenSlotsAreAvailable bool

// Receives completed jobs from workers. Written by completed workers, only
// read from main goroutine.
jobResultCh chan *rivertype.JobRow
Expand Down Expand Up @@ -457,12 +462,28 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
}
case result := <-p.jobResultCh:
p.removeActiveJob(result.ID)
if p.fetchWhenSlotsAreAvailable {
// If we missed a fetch because all worker slots were full, or if we
// fetched the maximum number of jobs on the last attempt, get a little
// more aggressive triggering the fetch limiter now that we have a slot
// available.
p.fetchWhenSlotsAreAvailable = false
fetchLimiter.Call()
}
}
}
}

func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan producerFetchResult) {
limit := p.maxJobsToFetch()
if limit <= 0 {
// We have no slots for new jobs, so don't bother fetching. However, since
// we knew it was time to fetch, we keep track of what happened so we can
// trigger another fetch as soon as we have open slots.
p.fetchWhenSlotsAreAvailable = true
return
}

go p.dispatchWork(workCtx, limit, fetchResultCh)

for {
Expand All @@ -472,6 +493,11 @@ func (p *producer) innerFetchLoop(workCtx context.Context, fetchResultCh chan pr
p.Logger.ErrorContext(workCtx, p.Name+": Error fetching jobs", slog.String("err", result.err.Error()))
} else if len(result.jobs) > 0 {
p.startNewExecutors(workCtx, result.jobs)

// Fetch returned the maximum number of jobs that were requested,
// implying there may be more in the queue. Trigger another fetch when
// slots are available.
p.fetchWhenSlotsAreAvailable = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear to me how this is triggered only when a full batch has been returned. Should it only be set to true when len(results.jobs) == limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ugh, no, I totally missed that clause 🤦‍♂️ Fix incoming.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be fixed in #668 / v0.14.1. Thanks for catching this 🙏

}
return
case result := <-p.jobResultCh:
Expand Down
Loading