Make producer start/stop service + poll-only mode + expanded tests
Follows up changes like #253, #262, and #263 to make the producer a
start/stop service, giving it a more predictable way to invoke start and
stop, making it safer to run, and cleaning up caller code where it's
used in the client and test cases. With all these changes taken
together, we'll have every service in the client using the same unified
service interface, which will clean up code and let us write some neat
utilities that operate across all of them.
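
As a rough sketch of the idea (the names here are illustrative, not
River's exact internal API), a unified interface and one of those
cross-cutting utilities might look like:

```go
package services

import "context"

// Service is a hypothetical unified start/stop interface; the exact
// shape of River's internal interface may differ.
type Service interface {
	// Start begins the service's run loop, returning an error only if
	// initial setup fails.
	Start(ctx context.Context) error

	// Stop signals shutdown and blocks until the service has stopped.
	Stop()
}

// startAll is the kind of utility a unified interface enables: start
// every service, unwinding the ones already started if any fails.
func startAll(ctx context.Context, services []Service) error {
	for i, service := range services {
		if err := service.Start(ctx); err != nil {
			for _, started := range services[:i] {
				started.Stop()
			}
			return err
		}
	}
	return nil
}
```

This mirrors the error path visible in the client.go diff below, where
services that may have started are stopped when a later one fails.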

Aside from that, we clean up the producer to bring it more in line with
other code, like making logging uniform and having the constructor
return only a `*producer` instead of a `(*producer, error)` that needs
to be checked despite an error always indicating a bug in this context.
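
To illustrate the convention (a generic, simplified sketch, not River's
producer code; `producerConfig` and `newProducer` here are stand-ins):
when a construction error can only mean a programmer bug, failing loudly
keeps call sites free of dead error checks.

```go
package main

// Simplified stand-ins for illustration; River's real producer config
// has many more fields.
type producerConfig struct {
	Queue      string
	MaxWorkers int
}

type producer struct {
	config *producerConfig
}

func newProducer(config *producerConfig) *producer {
	// An invalid config is a bug in the caller rather than a runtime
	// condition, so panic instead of returning (*producer, error).
	if config.MaxWorkers < 1 {
		panic("producerConfig.MaxWorkers must be >= 1")
	}
	return &producer{config: config}
}

func main() {
	p := newProducer(&producerConfig{Queue: "default", MaxWorkers: 10})
	_ = p
}
```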

We expand the test suite, adding tests that (1) verify workers really
are stopped when `workCtx` is cancelled, (2) verify that max worker
slots work as expected and that the producer limits its fetches
accordingly, and (3) stress test start/stop.
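
A minimal sketch of the start/stop stress pattern, using a no-op
stand-in service (the real tests exercise the actual producer against a
database):

```go
package producer_test

import (
	"context"
	"testing"
)

// noopService stands in for the producer; the point is only the shape
// of the stress test: Start and Stop must be safe to cycle rapidly.
type noopService struct{}

func (s *noopService) Start(ctx context.Context) error { return nil }
func (s *noopService) Stop()                           {}

func TestStartStopStress(t *testing.T) {
	t.Parallel()

	svc := &noopService{}
	for i := 0; i < 50; i++ {
		if err := svc.Start(context.Background()); err != nil {
			t.Fatalf("iteration %d: start failed: %v", i, err)
		}
		svc.Stop() // must be safe immediately after Start, on every cycle
	}
}
```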

Like #263, we give the producer a poll-only mode, which also gets the
full test barrage, using a shared test transaction instead of a full
database pool. Also like #263, this poll-only mode is still prospective
and not yet put to full use (although it will be soon).
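
Roughly, poll-only mode means the producer wakes on a timer instead of
on a LISTEN/NOTIFY notification, which is what makes a shared test
transaction workable: NOTIFY only delivers on commit and a listener
needs its own connection, while a plain poll query runs fine inside an
open transaction. A sketch with assumed names (not River's actual
implementation):

```go
package producer

import (
	"context"
	"time"
)

// pollingProducer is an illustrative stand-in; field and method names
// are assumptions, not River's actual producer.
type pollingProducer struct {
	fetchPollInterval time.Duration
	fetch             func(ctx context.Context) // hypothetical fetch-and-work step
}

func (p *pollingProducer) fetchLoop(ctx context.Context) {
	ticker := time.NewTicker(p.fetchPollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			p.fetch(ctx) // poll the database for available jobs
		}
	}
}
```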
brandur committed Mar 12, 2024
1 parent 3e7d87d commit 387d229
Showing 6 changed files with 418 additions and 239 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
@@ -69,7 +69,7 @@ jobs:

- name: Test
working-directory: .
- run: go test -p 1 -race ./...
+ run: go test -p 1 -race ./... -timeout 2m

- name: Test cmd/river
working-directory: ./cmd/river
49 changes: 22 additions & 27 deletions client.go
@@ -290,7 +290,6 @@ type Client[TTx any] struct {
statsNumJobs int
testSignals clientTestSignals
uniqueInserter *dbunique.UniqueInserter
- wg sync.WaitGroup

// workCancel cancels the context used for all work goroutines. Normal Stop
// does not cancel that context.
@@ -586,7 +585,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
// We use separate contexts for fetching and working to allow for a graceful
// stop. However, both inherit from the provided context so if it is
// cancelled a more aggressive stop will be initiated.
- fetchNewWorkCtx, fetchNewWorkCancel := context.WithCancelCause(ctx)
+ fetchCtx, fetchNewWorkCancel := context.WithCancelCause(ctx)
c.fetchNewWorkCancel = fetchNewWorkCancel
workCtx, workCancel := context.WithCancelCause(withClient[TTx](ctx, c))
c.workCancel = workCancel
@@ -618,7 +617,7 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
c.completer.Subscribe(c.distributeJobCompleterCallback)

for _, service := range c.services {
- if err := service.Start(fetchNewWorkCtx); err != nil {
+ if err := service.Start(fetchCtx); err != nil {
// In case of error, stop any services that might have started. This
// is safe because even services that were never started will still
// tolerate being stopped.
@@ -633,8 +632,18 @@
}
}

- c.runProducers(fetchNewWorkCtx, workCtx)
- go c.signalStopComplete(workCtx)
+ for _, producer := range c.producersByQueueName {
+ producer := producer
+
+ if err := producer.StartWorkContext(fetchCtx, workCtx); err != nil {
+ return err
+ }
+ }
+
+ go func() {
+ <-fetchCtx.Done()
+ c.signalStopComplete(ctx)
+ }()

c.baseService.Logger.InfoContext(workCtx, "River client successfully started", slog.String("client_id", c.ID()))

@@ -643,8 +652,9 @@

// ctx is used only for logging, not for lifecycle.
func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
- // Wait for producers to exit:
- c.wg.Wait()
+ for _, producer := range c.producersByQueueName {
+ producer.Stop()
+ }

// Stop all mainline services where stop order isn't important. Contains the
// elector and notifier, amongst others.
@@ -950,41 +960,26 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar

func (c *Client[TTx]) provisionProducers() error {
for queue, queueConfig := range c.config.Queues {
- config := &producerConfig{
+ c.producersByQueueName[queue] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
+ Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
- MaxWorkerCount: uint16(queueConfig.MaxWorkers),
+ MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queue,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
+ StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
- }
- producer, err := newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), c.completer, config)
- if err != nil {
- return err
- }
- c.producersByQueueName[queue] = producer
+ })
c.monitor.InitializeProducerStatus(queue)
}
return nil
}

- func (c *Client[TTx]) runProducers(fetchNewWorkCtx, workCtx context.Context) {
- c.wg.Add(len(c.producersByQueueName))
- for _, producer := range c.producersByQueueName {
- producer := producer
-
- go func() {
- defer c.wg.Done()
- producer.Run(fetchNewWorkCtx, workCtx, c.monitor.SetProducerStatus)
- }()
- }
- }

// JobCancel cancels the job with the given ID. If possible, the job is
// cancelled immediately and will not be retried. The provided context is used
// for the underlying Postgres update and can be used to cancel the operation or
10 changes: 8 additions & 2 deletions internal/leadership/elector.go
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"

@@ -149,10 +150,15 @@ func (e *Elector) Start(ctx context.Context) error {

e.Logger.InfoContext(ctx, e.Name+": Listening for leadership changes", "client_id", e.clientID, "topic", notifier.NotificationTopicLeadership)

// TODO(brandur): Get rid of this retry loop after refactor.
var err error
sub, err = notifier.ListenRetryLoop(ctx, &e.BaseService, e.notifier, notifier.NotificationTopicLeadership, handleNotification)
- if err != nil { //nolint:staticcheck
- // TODO(brandur): Propagate this after refactor.
+ if err != nil {
+ close(stopped)
+ if strings.HasSuffix(err.Error(), "conn closed") || errors.Is(err, context.Canceled) {
+ return nil
+ }
+ return err
}
}

7 changes: 4 additions & 3 deletions internal/notifier/notifier.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"

@@ -560,12 +561,12 @@ func ListenRetryLoop(ctx context.Context, baseService *baseservice.BaseService,
return nil, err
}

- if attempt >= maxListenAttempts {
- baseService.Logger.Error(baseService.Name+": Error listening for on topic; giving up", "attempt", attempt, "err", err, "topic", topic)
+ if attempt >= maxListenAttempts || strings.HasSuffix(err.Error(), "conn closed") {
+ baseService.Logger.ErrorContext(ctx, baseService.Name+": Error listening for on topic; giving up", "attempt", attempt, "err", err, "topic", topic)
return nil, err
}

- baseService.Logger.Error(baseService.Name+": Error listening for on topic; will retry after backoff", "attempt", attempt, "err", err, "topic", topic)
+ baseService.Logger.ErrorContext(ctx, baseService.Name+": Error listening for on topic; will retry after backoff", "attempt", attempt, "err", err, "topic", topic)

baseService.CancellableSleepExponentialBackoff(ctx, attempt-1, baseservice.MaxAttemptsBeforeResetDefault)
continue
