Make client a start/stop service
Here, continue the service refactoring to its ultimate conclusion and make
the client itself a start/stop service, which has a couple of advantages:

* (IMO) considerably simplifies the start and stop code, putting it all
  in one place, and largely even in one function. There are fewer helpers
  to chase through to understand what's going on.

* Makes the client behave gracefully on double starts/stops.

* Allows the client to easily handle start/stop under duress. Any number
  of goroutines can be starting or stopping it simultaneously, and it's
  guaranteed to be free from races (see the sketch after this list).
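
A minimal sketch of that guarantee, using the client's public API
(construction of the client and its pgx driver is omitted, and
demoConcurrentStartStop is a hypothetical helper, not part of River):

    package example

    import (
        "context"
        "sync"

        "github.com/jackc/pgx/v5"
        "github.com/riverqueue/river"
    )

    // demoConcurrentStartStop races several Start calls against each other,
    // then stops the client. After this change, duplicate and concurrent
    // start/stop calls are race-free no-ops rather than errors.
    func demoConcurrentStartStop(ctx context.Context, client *river.Client[pgx.Tx]) error {
        var wg sync.WaitGroup
        for i := 0; i < 3; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                _ = client.Start(ctx) // duplicate starts return immediately with nil
            }()
        }
        wg.Wait()
        return client.Stop(ctx) // graceful stop; also safe to call repeatedly
    }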

Because the client's stop functions differ in signature from those of
other start/stop services (they take a context and return an error), I
had to modify the base start/stop service somewhat and allow for an
additional `StopInit` helper (like `StartInit`) that lets stop behavior
be customized while still providing race-free operation. A sketch of the
resulting pattern follows.
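
This is only a sketch of the pattern's shape (startstop is an internal
River package and sketchService is hypothetical; the StartInit/StopInit
signatures mirror the calls visible in the diff below):

    import (
        "context"

        "github.com/riverqueue/river/internal/maintenance/startstop"
    )

    type sketchService struct {
        baseStartStop startstop.BaseStartStop
    }

    // Start starts the service. Double or concurrent starts are no-ops.
    func (s *sketchService) Start(ctx context.Context) error {
        ctx, shouldStart, stopped := s.baseStartStop.StartInit(ctx)
        if !shouldStart {
            return nil // already started
        }

        go func() {
            defer close(stopped) // signals waiters that shutdown finished

            <-ctx.Done() // StopInit cancels this context to begin shutdown
            // ... clean up the service's resources here ...
        }()

        return nil
    }

    // Stop is the customized stop behavior the message above describes:
    // unlike other services, it takes a context and returns an error.
    func (s *sketchService) Stop(ctx context.Context) error {
        shouldStop, stopped, finalizeStop := s.baseStartStop.StopInit()
        if !shouldStop {
            return nil // already stopped
        }

        select {
        case <-ctx.Done():
            finalizeStop(false) // not stopped; allow Stop to be called again
            return ctx.Err()
        case <-stopped:
            finalizeStop(true)
            return nil
        }
    }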
brandur committed Mar 17, 2024
1 parent e68e430 commit 4091846
Showing 6 changed files with 423 additions and 163 deletions.
client.go: 289 changes (151 additions & 138 deletions)
@@ -23,6 +23,7 @@ import (
     "github.com/riverqueue/river/internal/maintenance/startstop"
     "github.com/riverqueue/river/internal/notifier"
     "github.com/riverqueue/river/internal/rivercommon"
+    "github.com/riverqueue/river/internal/util/maputil"
     "github.com/riverqueue/river/internal/util/randutil"
     "github.com/riverqueue/river/internal/util/sliceutil"
     "github.com/riverqueue/river/internal/util/valutil"
@@ -262,20 +263,16 @@ type QueueConfig struct {
 // multiple instances operating on different databases or Postgres schemas
 // within a single database.
 type Client[TTx any] struct {
-    // BaseService can't be embedded like on other services because its
-    // properties would leak to the external API.
-    baseService baseservice.BaseService
+    // BaseService and BaseStartStop can't be embedded like on other services
+    // because their properties would leak to the external API.
+    baseService   baseservice.BaseService
+    baseStartStop startstop.BaseStartStop

     completer jobcompleter.JobCompleter
     config    *Config
     driver    riverdriver.Driver[TTx]
     elector   *leadership.Elector

-    // fetchWorkCancel cancels the context used for fetching new work. This
-    // will be used to stop fetching new work whenever stop is initiated, or
-    // when the context provided to Run is itself cancelled.
-    fetchWorkCancel context.CancelCauseFunc
-
     monitor              *clientMonitor
     notifier             *notifier.Notifier
     producersByQueueName map[string]*producer
Expand All @@ -284,10 +281,10 @@ type Client[TTx any] struct {
subscriptions map[int]*eventSubscription
subscriptionsMu sync.Mutex
subscriptionsSeq int // used for generating simple IDs
stopComplete chan struct{}
statsAggregate jobstats.JobStatistics
statsMu sync.Mutex
statsNumJobs int
stopped chan struct{}
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter

@@ -433,7 +430,6 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
         driver:               driver,
         monitor:              newClientMonitor(),
         producersByQueueName: make(map[string]*producer),
-        stopComplete:         make(chan struct{}),
         subscriptions:        make(map[int]*eventSubscription),
         testSignals:          clientTestSignals{},
         uniqueInserter: baseservice.Init(archetype, &dbunique.UniqueInserter{
@@ -575,146 +571,154 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 // jobs, but will also cancel the context for any currently-running jobs. If
 // using StopAndCancel, there's no need to also call Stop.
 func (c *Client[TTx]) Start(ctx context.Context) error {
-    if !c.config.willExecuteJobs() {
-        return errors.New("client Queues and Workers must be configured for a client to start working")
-    }
-    if c.config.Workers != nil && len(c.config.Workers.workersMap) < 1 {
-        return errors.New("at least one Worker must be added to the Workers bundle")
+    fetchCtx, shouldStart, stopped := c.baseStartStop.StartInit(ctx)
+    if !shouldStart {
+        return nil
     }

-    // Before doing anything else, make an initial connection to the database to
-    // verify that it appears healthy. Many of the subcomponents below start up
-    // in a goroutine and in case of initial failure, only produce a log line,
-    // so even in the case of a fundamental failure like the database not being
-    // available, the client appears to have started even though it's completely
-    // non-functional. Here we try to make an initial assessment of health and
-    // return quickly in case of an apparent problem.
-    _, err := c.driver.GetExecutor().Exec(ctx, "SELECT 1")
-    if err != nil {
-        return fmt.Errorf("error making initial connection to database: %w", err)
-    }
+    c.stopped = stopped

-    // In case of error, stop any services that might have started. This
-    // is safe because even services that were never started will still
-    // tolerate being stopped.
-    stopServicesOnError := func() {
-        startstop.StopAllParallel(c.services)
-        c.monitor.Stop()
+    stopProducers := func() {
+        startstop.StopAllParallel(sliceutil.Map(
+            maputil.Values(c.producersByQueueName),
+            func(p *producer) startstop.Service { return p }),
+        )
     }

-    // Monitor should be the first subprocess to start, and the last to stop.
-    // It's not part of the waitgroup because we need to wait for everything else
-    // to shut down prior to closing the monitor.
-    //
-    // Unlike other services, it's given a background context so that it doesn't
-    // cancel on normal stops.
-    if err := c.monitor.Start(context.Background()); err != nil { //nolint:contextcheck
-        return err
-    }
+    var workCtx context.Context

-    if c.completer != nil {
-        // The completer is part of the services list below, but although it can
-        // stop gracefully along with all the other services, it needs to be
-        // started with a context that's _not_ fetchCtx. This ensures that even
-        // when fetch is cancelled on shutdown, the completer is still given a
-        // separate opportunity to start stopping only after the producers have
-        // finished up and returned.
-        if err := c.completer.Start(ctx); err != nil {
-            stopServicesOnError()
-            return err
+    // Startup code. Wrapped in a closure so it doesn't have to remember to
+    // close the stopped channel if returning with an error.
+    if err := func() error {
+        if !c.config.willExecuteJobs() {
+            return errors.New("client Queues and Workers must be configured for a client to start working")
+        }
+        if c.config.Workers != nil && len(c.config.Workers.workersMap) < 1 {
+            return errors.New("at least one Worker must be added to the Workers bundle")
         }

-        // Receives job complete notifications from the completer and
-        // distributes them to any subscriptions.
-        c.completer.Subscribe(c.distributeJobCompleterCallback)
-    }

-    // We use separate contexts for fetching and working to allow for a graceful
-    // stop. However, both inherit from the provided context so if it is
-    // cancelled a more aggressive stop will be initiated.
-    fetchCtx, fetchWorkCancel := context.WithCancelCause(ctx)
-    c.fetchWorkCancel = fetchWorkCancel
-    workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
-    c.workCancel = workCancel

-    for _, service := range c.services {
-        // TODO(brandur): Reevaluate the use of fetchCtx here. It's currently
-        // necessary to speed up shutdown so that all services start shutting
-        // down before having to wait for the producers to finish, but as
-        // stopping becomes more normalized (hopefully by making the client
-        // itself a start/stop service), we can likely accomplish that in a
-        // cleaner way.
-        if err := service.Start(fetchCtx); err != nil {
-            stopServicesOnError()
-            if errors.Is(context.Cause(ctx), rivercommon.ErrShutdown) {
-                return nil
-            }
-            return err
+        // Before doing anything else, make an initial connection to the database to
+        // verify that it appears healthy. Many of the subcomponents below start up
+        // in a goroutine and in case of initial failure, only produce a log line,
+        // so even in the case of a fundamental failure like the database not being
+        // available, the client appears to have started even though it's completely
+        // non-functional. Here we try to make an initial assessment of health and
+        // return quickly in case of an apparent problem.
+        _, err := c.driver.GetExecutor().Exec(fetchCtx, "SELECT 1")
+        if err != nil {
+            return fmt.Errorf("error making initial connection to database: %w", err)
         }
-    }

-    for _, producer := range c.producersByQueueName {
-        producer := producer
+        // In case of error, stop any services that might have started. This
+        // is safe because even services that were never started will still
+        // tolerate being stopped.
+        stopServicesOnError := func() {
+            startstop.StopAllParallel(c.services)
+            c.monitor.Stop()
+        }

-        if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
+        // Monitor should be the first subprocess to start, and the last to stop.
+        // It's not part of the waitgroup because we need to wait for everything else
+        // to shut down prior to closing the monitor.
+        //
+        // Unlike other services, it's given a background context so that it doesn't
+        // cancel on normal stops.
+        if err := c.monitor.Start(context.Background()); err != nil { //nolint:contextcheck
             return err
         }
-    }

-    go func() {
-        <-fetchCtx.Done()
-        c.signalStopComplete(ctx)
-    }()
+        if c.completer != nil {
+            // The completer is part of the services list below, but although it can
+            // stop gracefully along with all the other services, it needs to be
+            // started with a context that's _not_ fetchCtx. This ensures that even
+            // when fetch is cancelled on shutdown, the completer is still given a
+            // separate opportunity to start stopping only after the producers have
+            // finished up and returned.
+            if err := c.completer.Start(ctx); err != nil {
+                stopServicesOnError()
+                return err
+            }

-    c.baseService.Logger.InfoContext(workCtx, "River client successfully started", slog.String("client_id", c.ID()))
+            // Receives job complete notifications from the completer and
+            // distributes them to any subscriptions.
+            c.completer.Subscribe(c.distributeJobCompleterCallback)
+        }

-    return nil
-}
+        // We use separate contexts for fetching and working to allow for a graceful
+        // stop. Both inherit from the provided context, so if it's cancelled, a
+        // more aggressive stop will be initiated.
+        workCtx, c.workCancel = context.WithCancelCause(withClient[TTx](ctx, c))

-// ctx is used only for logging, not for lifecycle.
-func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
-    for _, producer := range c.producersByQueueName {
-        producer.Stop()
-    }
+        for _, service := range c.services {
+            if err := service.Start(fetchCtx); err != nil {
+                stopServicesOnError()
+                return err
+            }
+        }

-    // Stop all mainline services where stop order isn't important.
-    startstop.StopAllParallel(append(
-        // This list of services contains the completer, which should always
-        // stop after the producers so that any remaining work that was enqueued
-        // will have a chance to have its state completed as it finishes.
-        //
-        // TODO: there's a risk here that the completer is stuck on a job that
-        // won't complete. We probably need a timeout or way to move on in those
-        // cases.
-        c.services,
+        for _, producer := range c.producersByQueueName {
+            producer := producer

-        // Will only be started if this client was leader, but can tolerate a stop
-        // without having been started.
-        c.queueMaintainer,
-    ))
+            if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
+                stopProducers()
+                stopServicesOnError()
+                return err
+            }
+        }

-    c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": All services stopped")
+        return nil
+    }(); err != nil {
+        defer close(stopped)
+        if errors.Is(context.Cause(fetchCtx), startstop.ErrStop) {
+            return rivercommon.ErrShutdown
+        }
+        return err
+    }

-    // As of now, the Adapter doesn't have any async behavior, so we don't need
-    // to wait for it to stop. Once all executors and completers are done, we
-    // know that nothing else is happening that's from us.
+    go func() {
+        defer close(stopped)

-    // Remove all subscriptions and close corresponding channels.
-    func() {
-        c.subscriptionsMu.Lock()
-        defer c.subscriptionsMu.Unlock()
+        c.baseService.Logger.InfoContext(ctx, "River client started", slog.String("client_id", c.ID()))
+        defer c.baseService.Logger.InfoContext(ctx, "River client stopped", slog.String("client_id", c.ID()))

-        for subID, sub := range c.subscriptions {
-            close(sub.Chan)
-            delete(c.subscriptions, subID)
-        }
-    }()
+        // The call to Stop cancels this context. Block here until shutdown.
+        <-fetchCtx.Done()

+        // On stop, have the producers stop fetching first of all.
+        stopProducers()
+
+        // Stop all mainline services where stop order isn't important.
+        startstop.StopAllParallel(append(
+            // This list of services contains the completer, which should always
+            // stop after the producers so that any remaining work that was enqueued
+            // will have a chance to have its state completed as it finishes.
+            //
+            // TODO: there's a risk here that the completer is stuck on a job that
+            // won't complete. We probably need a timeout or way to move on in those
+            // cases.
+            c.services,
+
+            // Will only be started if this client was leader, but can tolerate a
+            // stop without having been started.
+            c.queueMaintainer,
+        ))
+
+        // Remove all subscriptions and close corresponding channels.
+        func() {
+            c.subscriptionsMu.Lock()
+            defer c.subscriptionsMu.Unlock()
+
+            for subID, sub := range c.subscriptions {
+                close(sub.Chan)
+                delete(c.subscriptions, subID)
+            }
+        }()

-    // Shut down the monitor last so it can broadcast final status updates:
-    c.monitor.Stop()
+        // Shut down the monitor last so it can broadcast final status updates:
+        c.monitor.Stop()
+    }()

-    c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop complete")
-    close(c.stopComplete)
+    return nil
 }

@@ -725,20 +729,17 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
 // There's no need to call this method if a hard stop has already been initiated
 // by cancelling the context passed to Start or by calling StopAndCancel.
 func (c *Client[TTx]) Stop(ctx context.Context) error {
-    if c.fetchWorkCancel == nil {
-        return errors.New("client not started")
+    shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
+    if !shouldStop {
+        return nil
     }

-    c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Stop started")
-    c.fetchWorkCancel(rivercommon.ErrShutdown)
-    return c.awaitStop(ctx)
-}
-
-func (c *Client[TTx]) awaitStop(ctx context.Context) error {
     select {
-    case <-ctx.Done():
+    case <-ctx.Done(): // stop context cancelled
+        finalizeStop(false) // not stopped; allow Stop to be called again
        return ctx.Err()
-    case <-c.stopComplete:
+    case <-stopped:
+        finalizeStop(true)
        return nil
    }
 }
@@ -754,18 +755,30 @@ func (c *Client[TTx]) awaitStop(ctx context.Context) error {
 // no need to call this method if the context passed to Run is cancelled
 // instead.
 func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
+    shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
+    if !shouldStop {
+        return nil
+    }
+
     c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
-    c.fetchWorkCancel(rivercommon.ErrShutdown)
     c.workCancel(rivercommon.ErrShutdown)
-    return c.awaitStop(ctx)
+
+    select {
+    case <-ctx.Done(): // stop context cancelled
+        finalizeStop(false) // not stopped; allow Stop to be called again
+        return ctx.Err()
+    case <-stopped:
+        finalizeStop(true)
+        return nil
+    }
 }

 // Stopped returns a channel that will be closed when the Client has stopped.
 // It can be used to wait for a graceful shutdown to complete.
 //
 // It is not affected by any contexts passed to Stop or StopAndCancel.
 func (c *Client[TTx]) Stopped() <-chan struct{} {
-    return c.stopComplete
+    return c.stopped
 }

 // Subscribe subscribes to the provided kinds of events that occur within the
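
For reference, waiting on the new Stopped channel from calling code might look
like the following sketch (the deadlines and escalation policy here are
illustrative, not part of this commit):

    import (
        "context"
        "time"

        "github.com/jackc/pgx/v5"
        "github.com/riverqueue/river"
    )

    // stopWithDeadline attempts a graceful stop first, then escalates to
    // StopAndCancel, which also cancels the work context of running jobs.
    func stopWithDeadline(client *river.Client[pgx.Tx]) error {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()

        if err := client.Stop(ctx); err == nil {
            return nil // graceful stop finished in time
        }

        hardCtx, hardCancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer hardCancel()
        if err := client.StopAndCancel(hardCtx); err != nil {
            return err
        }

        <-client.Stopped() // closed once the client has fully stopped
        return nil
    }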