Poll only mode for environments without listen/notify (#281)
Here, add a new River client option in `Config.PollOnly` which causes it
to start in "poll only" mode, where a notifier isn't initialized and no
`LISTEN` statements are issued. This has the downside that events like a
leadership resignation or a new job becoming available aren't noticed as
quickly, but the advantage of making River operable on systems where
listen/notify isn't supported, like PgBouncer in transaction pooling
mode, or maybe even eventually MySQL or SQLite.
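
To illustrate, enabling the new mode from user code looks roughly like the
sketch below. `Config.PollOnly`, `river.NewClient`, and `riverpgxv5.New` come
from this change and the existing client API; the DSN and the
`SortArgs`/`SortWorker` job type are placeholders, and the worker registration
is just River's usual setup rather than anything specific to this PR:

```go
package main

import (
    "context"
    "log"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

// SortArgs and SortWorker are placeholder job types for the sake of example.
type SortArgs struct {
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

type SortWorker struct {
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    return nil // real work would go here
}

func main() {
    ctx := context.Background()

    // Placeholder DSN; in practice this might point at PgBouncer running in
    // transaction pooling mode, where listen/notify isn't available.
    dbPool, err := pgxpool.New(ctx, "postgres://localhost:6432/river_dev")
    if err != nil {
        log.Fatal(err)
    }
    defer dbPool.Close()

    workers := river.NewWorkers()
    river.AddWorker(workers, &SortWorker{})

    client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        PollOnly: true, // no notifier started, no LISTEN statements issued
        Queues:   map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: 10}},
        Workers:  workers,
    })
    if err != nil {
        log.Fatal(err)
    }

    // The client behaves the same from the caller's perspective; it just
    // discovers changes by polling (e.g. on FetchPollInterval) instead of
    // via LISTEN/NOTIFY.
    if err := client.Start(ctx); err != nil {
        log.Fatal(err)
    }
}
```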

A slightly unrelated change that I also made here was to give the
elector a more traditional `Config` struct (like other services take)
that encompasses more of its configuration, instead of raw constructor
parameters. I originally thought I needed this to expose a custom elect
interval for the client's `PollOnly` test, but it turned out that my
slow test was due to a different problem and the extra configuration
wasn't necessary after all. It still seemed to clean things up somewhat,
so I left the refactor in.

I also remove the listen retry loops on client start up found in the
elector and producer. These were probably a bad idea because they may
hide a real database problem as they enter their retry loops, and their
built-in sleep backoffs can make the client take a very long time for
its `Start` to fail. We'd originally added them over concern that the
notifier's previous implementation (prior to #253), which discarded all
listen/notify problems, may have papered over errors that would
otherwise have been returned to people trying to use a River client
against, say, PgBouncer operating in transaction pooling mode.

So instead, we now fail fast if there's a problem with listen/notify in
the database, and for those wanting to use PgBouncer in transaction
pooling mode, we can now recommend the much cleaner approach of
activating `PollOnly`. Users who want to protect against intermittent
problems on listen/notify start up can implement their own retry loop
around client `Start` (a sketch follows below), but personally I
wouldn't expect this to ever be desirable. The alternative is to let the
program fail fast and have its supervisor resurrect it, as most programs
would do for any other start up hiccup, and the program will already
fail fast under most circumstances due to the `SELECT` it performs on
start.
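
If someone did want that protection, a small wrapper along these lines would
do it (a hypothetical user-side helper, not something River provides, assuming
only that the client's `Start` returns an error on failure):

```go
// Package riverutil is a hypothetical helper package, not part of River.
package riverutil

import (
    "context"
    "time"

    "github.com/riverqueue/river"
)

// StartWithRetry retries client.Start a handful of times with exponential
// backoff before giving up.
func StartWithRetry[TTx any](ctx context.Context, client *river.Client[TTx]) error {
    var err error
    for attempt := 0; ; attempt++ {
        if err = client.Start(ctx); err == nil {
            return nil
        }
        if attempt >= 4 {
            return err // give up after five attempts
        }

        // Back off 1s, 2s, 4s, 8s between attempts, bailing out early if the
        // context is cancelled.
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(time.Duration(1<<attempt) * time.Second):
        }
    }
}
```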
brandur authored Mar 29, 2024 · 1 parent baeb245 · commit 0fd853a
Showing 7 changed files with 209 additions and 161 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- The River client now supports "poll only" mode with `Config.PollOnly`, which makes it avoid issuing `LISTEN` statements to wait for events like a leadership resignation or a new job becoming available. The program instead polls periodically to look for changes. A leader resigning or a new job becoming available will be noticed less quickly, but `PollOnly` potentially makes River operable on systems without listen/notify support, like PgBouncer operating in transaction pooling mode. [PR #281](https://github.com/riverqueue/river/pull/281).

## [0.2.0] - 2024-03-28

### Added
93 changes: 53 additions & 40 deletions client.go
@@ -135,6 +135,21 @@ type Config struct {
// in the client.
PeriodicJobs []*PeriodicJob

// PollOnly starts the client in "poll only" mode, which avoids issuing
// `LISTEN` statements to wait for events like a leadership resignation or
// new job available. The program instead polls periodically to look for
// changes (checking for new jobs on the period in FetchPollInterval).
//
// The downside of this mode of operation is that events will usually be
// noticed less quickly. A new job in the queue may have to wait up to
// FetchPollInterval to be locked for work. When a leader resigns, it will
// be up to five seconds before a new one elects itself.
//
// The upside is that it makes River compatible with systems where
// listen/notify isn't available. For example, PgBouncer in transaction
// pooling mode.
PollOnly bool

// Queues is a list of queue names for this client to operate on along with
// configuration for the queue like the maximum number of workers to run for
// each queue.
@@ -270,13 +285,12 @@ type Client[TTx any] struct {
baseService baseservice.BaseService
baseStartStop startstop.BaseStartStop

completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector

completer jobcompleter.JobCompleter
config *Config
driver riverdriver.Driver[TTx]
elector *leadership.Elector
monitor *clientMonitor
notifier *notifier.Notifier
notifier *notifier.Notifier // may be nil in poll-only mode
periodicJobs *PeriodicJobBundle
producersByQueueName map[string]*producer
queueMaintainer *maintenance.QueueMaintainer
@@ -408,6 +422,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
JobTimeout: valutil.ValOrDefault(config.JobTimeout, JobTimeoutDefault),
Logger: logger,
PeriodicJobs: config.PeriodicJobs,
PollOnly: config.PollOnly,
Queues: config.Queues,
ReindexerSchedule: config.ReindexerSchedule,
RescueStuckJobsAfter: valutil.ValOrDefault(config.RescueStuckJobsAfter, rescueAfter),
@@ -450,22 +465,39 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
return nil, errMissingDatabasePoolWithQueues
}

// TODO: for now we only support a single instance per database/schema.
// If we want to provide isolation within a single database/schema,
// we'll need to add a config for this.
instanceName := "default"

client.completer = jobcompleter.NewBatchCompleter(archetype, driver.GetExecutor())
client.services = append(client.services, client.completer)

client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
// In poll only mode, we don't try to initialize a notifier that uses
// listen/notify. Instead, each service polls for changes it's
// interested in. e.g. Elector polls to see if leader has expired.
if !config.PollOnly {
client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
client.services = append(client.services, client.notifier)
}

client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, instanceName, client.ID())
client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, &leadership.Config{
ClientID: config.ID,
})
client.services = append(client.services, client.elector)

if err := client.provisionProducers(); err != nil {
return nil, err
for queue, queueConfig := range config.Queues {
client.producersByQueueName[queue] = newProducer(archetype, driver.GetExecutor(), &producerConfig{
ClientID: config.ID,
Completer: client.completer,
ErrorHandler: config.ErrorHandler,
FetchCooldown: config.FetchCooldown,
FetchPollInterval: config.FetchPollInterval,
JobTimeout: config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: client.notifier,
Queue: queue,
RetryPolicy: config.RetryPolicy,
SchedulerInterval: config.schedulerInterval,
StatusFunc: client.monitor.SetProducerStatus,
Workers: config.Workers,
})
client.monitor.InitializeProducerStatus(queue)
}

client.services = append(client.services,
@@ -981,12 +1013,15 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar

switch {
case notification.IsLeader:
// Starting the queue maintainer can take a little time so send to
// this test signal _first_ so tests waiting on it can finish,
// cancel the queue maintainer start, and overall run much faster.
c.testSignals.electedLeader.Signal(struct{}{})

if err := c.queueMaintainer.Start(ctx); err != nil {
c.baseService.Logger.ErrorContext(ctx, "Error starting queue maintainer", slog.String("err", err.Error()))
}

c.testSignals.electedLeader.Signal(struct{}{})

default:
c.queueMaintainer.Stop()
}
@@ -1018,28 +1053,6 @@ func (c *Client[TTx]) handleLeadershipChangeLoop(ctx context.Context, shouldStar
return nil
}

func (c *Client[TTx]) provisionProducers() error {
for queue, queueConfig := range c.config.Queues {
c.producersByQueueName[queue] = newProducer(&c.baseService.Archetype, c.driver.GetExecutor(), &producerConfig{
ClientID: c.config.ID,
Completer: c.completer,
ErrorHandler: c.config.ErrorHandler,
FetchCooldown: c.config.FetchCooldown,
FetchPollInterval: c.config.FetchPollInterval,
JobTimeout: c.config.JobTimeout,
MaxWorkers: queueConfig.MaxWorkers,
Notifier: c.notifier,
Queue: queue,
RetryPolicy: c.config.RetryPolicy,
SchedulerInterval: c.config.schedulerInterval,
StatusFunc: c.monitor.SetProducerStatus,
Workers: c.config.Workers,
})
c.monitor.InitializeProducerStatus(queue)
}
return nil
}

// JobCancel cancels the job with the given ID. If possible, the job is
// cancelled immediately and will not be retried. The provided context is used
// for the underlying Postgres update and can be used to cancel the operation or
75 changes: 59 additions & 16 deletions client_test.go
@@ -186,28 +186,41 @@ func Test_Client(t *testing.T) {
ctx := context.Background()

type testBundle struct {
dbPool *pgxpool.Pool
subscribeChan <-chan *Event
config *Config
dbPool *pgxpool.Pool
}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
// Alternate setup returning only client Config rather than a full Client.
setupConfig := func(t *testing.T) (*Config, *testBundle) {
t.Helper()

dbPool := riverinternaltest.TestDB(ctx, t)
config := newTestConfig(t, nil)
client := newTestClient(t, dbPool, config)

subscribeChan, _ := client.Subscribe(
return config, &testBundle{
config: config,
dbPool: dbPool,
}
}

setup := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

config, bundle := setupConfig(t)
return newTestClient(t, bundle.dbPool, config), bundle
}

subscribe := func(t *testing.T, client *Client[pgx.Tx]) <-chan *Event {
t.Helper()

subscribeChan, cancel := client.Subscribe(
EventKindJobCancelled,
EventKindJobCompleted,
EventKindJobFailed,
EventKindJobSnoozed,
)

return client, &testBundle{
dbPool: dbPool,
subscribeChan: subscribeChan,
}
t.Cleanup(cancel)
return subscribeChan
}

t.Run("StartInsertAndWork", func(t *testing.T) {
@@ -237,7 +250,7 @@ func Test_Client(t *testing.T) {
t.Run("JobCancelErrorReturned", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
client, _ := setup(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
@@ -247,12 +260,13 @@ func Test_Client(t *testing.T) {
return JobCancel(errors.New("a persisted internal error"))
}))

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertedJob, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

event := riverinternaltest.WaitOrTimeout(t, bundle.subscribeChan)
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCancelled, event.Kind)
require.Equal(t, JobStateCancelled, event.Job.State)
require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second)
Expand All @@ -266,7 +280,7 @@ func Test_Client(t *testing.T) {
t.Run("JobSnoozeErrorReturned", func(t *testing.T) {
t.Parallel()

client, bundle := setup(t)
client, _ := setup(t)

type JobArgs struct {
JobArgsReflectKind[JobArgs]
@@ -276,12 +290,13 @@ func Test_Client(t *testing.T) {
return JobSnooze(15 * time.Minute)
}))

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

insertedJob, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

event := riverinternaltest.WaitOrTimeout(t, bundle.subscribeChan)
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobSnoozed, event.Kind)
require.Equal(t, JobStateScheduled, event.Job.State)
require.WithinDuration(t, time.Now().Add(15*time.Minute), event.Job.ScheduledAt, 2*time.Second)
@@ -312,6 +327,7 @@ func Test_Client(t *testing.T) {
}))

statusUpdateCh := client.monitor.RegisterUpdates()
subscribeChan := subscribe(t, client)
startClient(ctx, t, client)
waitForClientHealthy(ctx, t, statusUpdateCh)

@@ -329,7 +345,7 @@ func Test_Client(t *testing.T) {
// modify that column for a running job:
require.Equal(t, rivertype.JobStateRunning, updatedJob.State)

event := riverinternaltest.WaitOrTimeout(t, bundle.subscribeChan)
event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCancelled, event.Kind)
require.Equal(t, JobStateCancelled, event.Job.State)
require.WithinDuration(t, time.Now(), *event.Job.FinalizedAt, 2*time.Second)
@@ -429,7 +445,7 @@ func Test_Client(t *testing.T) {
require.NoError(t, err)
t.Cleanup(dbPool.Close)

client, err := NewClient(riverpgxv5.New(dbPool), newTestConfig(t, nil))
client, err := NewClient(riverpgxv5.New(dbPool), bundle.config)
require.NoError(t, err)

// We don't actually verify that River's functional on another schema so
@@ -445,6 +461,33 @@ func Test_Client(t *testing.T) {
require.Equal(t, `relation "river_job" does not exist`, pgErr.Message)
})

t.Run("PollOnly", func(t *testing.T) {
t.Parallel()

config, bundle := setupConfig(t)
bundle.config.PollOnly = true

client := newTestClient(t, bundle.dbPool, config)

// Notifier should not have been initialized at all.
require.Nil(t, client.notifier)

job, err := client.Insert(ctx, &noOpArgs{}, nil)
require.NoError(t, err)

subscribeChan := subscribe(t, client)
startClient(ctx, t, client)

// Despite no notifier, the client should still be able to elect itself
// leader.
client.testSignals.electedLeader.WaitOrTimeout()

event := riverinternaltest.WaitOrTimeout(t, subscribeChan)
require.Equal(t, EventKindJobCompleted, event.Kind)
require.Equal(t, job.ID, event.Job.ID)
require.Equal(t, JobStateCompleted, event.Job.State)
})

t.Run("StartStopStress", func(t *testing.T) {
t.Parallel()
