Skip to content

Commit

Permalink
Batch completer + additional completer test suite and benchmarks
Browse files Browse the repository at this point in the history
Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out, and using a debounced channel to fire
periodically (currently, up to every 100 milliseconds) and submit entire
batches for completion at once up to 2,000 jobs.

For the purposes of not grossly expanding the `riverdriver` interface,
the completer only batches jobs being set to `completed`, which under
most normal workloads we expect to be the vast common case. Jobs going
to other states are fed into a member `AsyncCompleter`, thereby allowing
the `BatchCompleter` to keeps implementation quite simple.

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than `AsyncCompleter` (the one currently in use by River
client), and 10-15x faster than `InlineCompleter`.

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
    BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
    BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and test edges that were
previously being ignored completely. For most cases we drop the heavy
mocking that was happening before, which was having the effect of
minimizing the surface area under test, and producing misleading timing
that wasn't realistic.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.
  • Loading branch information
brandur committed Mar 12, 2024
1 parent 599b1d3 commit 01d3531
Show file tree
Hide file tree
Showing 20 changed files with 1,633 additions and 299 deletions.
138 changes: 98 additions & 40 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,10 +271,10 @@ type Client[TTx any] struct {
driver riverdriver.Driver[TTx]
elector *leadership.Elector

// fetchNewWorkCancel cancels the context used for fetching new work. This
// fetchWorkCancel cancels the context used for fetching new work. This
// will be used to stop fetching new work whenever stop is initiated, or
// when the context provided to Run is itself cancelled.
fetchNewWorkCancel context.CancelCauseFunc
fetchWorkCancel context.CancelCauseFunc

monitor *clientMonitor
notifier *notifier.Notifier
Expand Down Expand Up @@ -428,10 +428,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

completer := jobcompleter.NewAsyncCompleter(archetype, driver.GetExecutor(), 100)

client := &Client[TTx]{
completer: completer,
config: config,
driver: driver,
monitor: newClientMonitor(),
Expand Down Expand Up @@ -460,6 +457,9 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
// we'll need to add a config for this.
instanceName := "default"

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)

client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)

Expand Down Expand Up @@ -582,14 +582,6 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return errors.New("at least one Worker must be added to the Workers bundle")
}

// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
fetchCtx, fetchNewWorkCancel := context.WithCancelCause(ctx)
c.fetchNewWorkCancel = fetchNewWorkCancel
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel

// Before doing anything else, make an initial connection to the database to
// verify that it appears healthy. Many of the subcomponents below start up
// in a goroutine and in case of initial failure, only produce a log line,
Expand All @@ -602,6 +594,14 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return fmt.Errorf("error making initial connection to database: %w", err)
}

// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
stopServicesOnError := func() {
startstop.StopAllParallel(c.services)
c.monitor.Stop()
}

// Monitor should be the first subprocess to start, and the last to stop.
// It's not part of the waitgroup because we need to wait for everything else
// to shut down prior to closing the monitor.
Expand All @@ -612,19 +612,40 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
return err
}

// Receives job complete notifications from the completer and distributes
// them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
if c.completer != nil {
// The completer is part of the services list below, but although it can
// stop gracefully along with all the other services, it needs to be
// started with a context that's _not_ fetchCtx. This ensures that even
// when fetch is cancelled on shutdown, the completer is still given a
// separate opportunity to start stopping only after the producers have
// finished up and returned.
if err := c.completer.Start(ctx); err != nil {
stopServicesOnError()
return err
}

for _, service := range c.services {
if err := service.Start(fetchCtx); err != nil {
// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
startstop.StopAllParallel(c.services)
// Receives job complete notifications from the completer and
// distributes them to any subscriptions.
c.completer.Subscribe(c.distributeJobCompleterCallback)
}

c.monitor.Stop()
// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
fetchCtx, fetchWorkCancel := context.WithCancelCause(ctx)
c.fetchWorkCancel = fetchWorkCancel
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel

for _, service := range c.services {
// TODO(brandur): Reevaluate the use of fetchNewWorkCtx here. It's
// currently necessary to speed up shutdown so that all services start
// shutting down before having to wait for the producers to finish, but
// as stopping becomes more normalized (hopefully by making the client
// itself a start/stop service), we can likely accomplish that in a
// cleaner way.
if err := service.Start(fetchCtx); err != nil {
stopServicesOnError()
if errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) {
return nil
}
Expand Down Expand Up @@ -656,18 +677,21 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
producer.Stop()
}

// Stop all mainline services where stop order isn't important. Contains the
// elector and notifier, amongst others.
startstop.StopAllParallel(c.services)

// Once the producers have all finished, we know that completers have at least
// enqueued any remaining work. Wait for the completer to finish.
//
// TODO: there's a risk here that the completer is stuck on a job that won't
// complete. We probably need a timeout or way to move on in those cases.
c.completer.Wait()
// Stop all mainline services where stop order isn't important.
startstop.StopAllParallel(append(
// This list of services contains the completer, which should always
// stop after the producers so that any remaining work that was enqueued
// will have a chance to have its state completed as it finishes.
//
// TODO: there's a risk here that the completer is stuck on a job that
// won't complete. We probably need a timeout or way to move on in those
// cases.
c.services,

c.queueMaintainer.Stop()
// Will only be started if this client was leader, but can tolerate a stop
// without having been started.
c.queueMaintainer,
))

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": All services stopped")

Expand Down Expand Up @@ -701,12 +725,12 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
// There's no need to call this method if a hard stop has already been initiated
// by cancelling the context passed to Start or by calling StopAndCancel.
func (c *Client[TTx]) Stop(ctx context.Context) error {
if c.fetchNewWorkCancel == nil {
if c.fetchWorkCancel == nil {
return errors.New("client not started")
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started")
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
c.fetchWorkCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}

Expand All @@ -731,7 +755,7 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error {
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.fetchNewWorkCancel(rivercommon.ErrShutdown)
c.fetchWorkCancel(rivercommon.ErrShutdown)
c.workCancel(rivercommon.ErrShutdown)
return c.awaitStop(ctx)
}
Expand Down Expand Up @@ -762,7 +786,41 @@ func (c *Client[TTx]) Stopped() <-chan struct{} {
// versions. If new event kinds are added, callers will have to explicitly add
// them to their requested list and ensure they can be handled correctly.
func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
for _, kind := range kinds {
return c.SubscribeConfig(&SubscribeConfig{Kinds: kinds})
}

// The default maximum size of the subscribe channel. Events that would overflow
// it will be dropped.
const subscribeChanSizeDefault = 1_000

// SubscribeConfig is more thorough subscription configuration used for
// Client.SubscribeConfig.
type SubscribeConfig struct {
// ChanSize is the size of the buffered channel that will be created for the
// subscription. Incoming events that overall this number because a listener
// isn't reading from the channel in a timely manner will be dropped.
//
// Defaults to 1000.
ChanSize int

// Kinds are the kinds of events that the subscription will receive.
// Requiring that kinds are specified explicitly allows for forward
// compatibility in case new kinds of events are added in future versions.
// If new event kinds are added, callers will have to explicitly add them to
// their requested list and esnure they can be handled correctly.
Kinds []EventKind
}

// Special internal variant that lets us inject an overridden size.
func (c *Client[TTx]) SubscribeConfig(config *SubscribeConfig) (<-chan *Event, func()) {
if config.ChanSize < 0 {
panic("SubscribeConfig.ChanSize must be greater or equal to 1")
}
if config.ChanSize == 0 {
config.ChanSize = subscribeChanSizeDefault
}

for _, kind := range config.Kinds {
if _, ok := allKinds[kind]; !ok {
panic(fmt.Errorf("unknown event kind: %s", kind))
}
Expand All @@ -771,15 +829,15 @@ func (c *Client[TTx]) Subscribe(kinds ...EventKind) (<-chan *Event, func()) {
c.subscriptionsMu.Lock()
defer c.subscriptionsMu.Unlock()

subChan := make(chan *Event, subscribeChanSize)
subChan := make(chan *Event, config.ChanSize)

// Just gives us an easy way of removing the subscription again later.
subID := c.subscriptionsSeq
c.subscriptionsSeq++

c.subscriptions[subID] = &eventSubscription{
Chan: subChan,
Kinds: sliceutil.KeyBy(kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
Kinds: sliceutil.KeyBy(config.Kinds, func(k EventKind) (EventKind, struct{}) { return k, struct{}{} }),
}

cancel := func() {
Expand Down
Loading

0 comments on commit 01d3531

Please sign in to comment.