Refactor elector somewhat + test suite + poll-only mode
Do a little refactoring in the elector. The overall code doesn't change
much, but we tighten things up with small improvements like better
logging and real exponential backoff. The elector also becomes a
start/stop service so that it's more consistent with other code and more
robust on start/stop.
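
As a rough illustration of what "real exponential backoff" means here, the sketch below doubles the retry delay on each attempt and caps it at a maximum. The names `backoffDelay`, `baseDelay`, and `maxDelay`, and the chosen bounds, are hypothetical and are not taken from the elector's code.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical bounds, chosen only for illustration.
const (
	baseDelay = 1 * time.Second
	maxDelay  = 30 * time.Second
)

// backoffDelay returns the delay before retry number attempt (1-based):
// baseDelay doubled for each subsequent attempt and capped at maxDelay.
func backoffDelay(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	delay := baseDelay
	for i := 1; i < attempt; i++ {
		delay *= 2
		// Return as soon as the cap is reached so the value can't overflow.
		if delay >= maxDelay {
			return maxDelay
		}
	}
	return delay
}

func main() {
	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d: wait %s\n", attempt, backoffDelay(attempt))
	}
}
```

A production version would typically also add random jitter so that several clients retrying at once don't stay in lockstep.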

The major improvement is the addition of a test suite. A nominal suite
was added during the driver refactor to test the `attemptElectOrReelect`
function, but otherwise the elector was only tested indirectly through
the client. This change comes with a variety of tests that exercise the
elector's various behaviors, including one that pits multiple competing
electors against each other to verify that contention between them is
handled correctly.

Lastly, the elector gains a "poll-only" mode in which we check that it
can still function using polling alone in case a listener isn't
available. This has no effect on River feature-wise yet, but the idea is
that after we've added a similar capability to the producer, we'll be
able to support systems where `LISTEN`/`NOTIFY` isn't available, like
PgBouncer in transaction mode, or possibly even MySQL or SQLite in the
future. Poll-only mode goes through the same barrage of tests that the
elector is required to pass when using a database pool.
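
One way to picture poll-only mode is a wait step that wakes on either a notification or a timer, where the notification channel is simply nil when no listener exists. The sketch below assumes that shape. `waitForWake`, `shortPoll`, and `longPoll` are hypothetical names, and this is not the elector's actual loop.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical intervals used by the demo below.
const (
	shortPoll = 10 * time.Millisecond
	longPoll  = time.Minute
)

// waitForWake blocks until stop is closed, a notification arrives, or the
// poll interval elapses, and reports which one happened. In poll-only mode
// notifications is nil; receiving from a nil channel blocks forever, so only
// stop or the timer can wake the caller.
func waitForWake(stop, notifications <-chan struct{}, pollInterval time.Duration) string {
	timer := time.NewTimer(pollInterval)
	defer timer.Stop()

	select {
	case <-stop:
		return "stopped"
	case <-notifications:
		return "notified"
	case <-timer.C:
		return "polled"
	}
}

func main() {
	// Listener available: a pending notification wakes the caller right away.
	notifications := make(chan struct{}, 1)
	notifications <- struct{}{}
	fmt.Println(waitForWake(nil, notifications, longPoll))

	// Poll-only: no listener, so the interval is the only wake-up source.
	fmt.Println(waitForWake(nil, nil, shortPoll))
}
```

Because both modes share the same `select`, the same tests can run against either one by swapping only the notification source.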
brandur committed Mar 11, 2024
1 parent d9a7fc3 commit cf45da5
Showing 4 changed files with 589 additions and 173 deletions.
16 changes: 6 additions & 10 deletions client.go
@@ -461,11 +461,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
 	instanceName := "default"

 	client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
-	var err error
-	client.elector, err = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, instanceName, client.ID(), 5*time.Second, 10*time.Second, logger)
-	if err != nil {
-		return nil, err
-	}
+
+	client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, instanceName, client.ID())

 	if err := client.provisionProducers(); err != nil {
 		return nil, err
@@ -637,11 +634,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
 			return err
 		}

-		c.wg.Add(1)
-		go func() {
-			c.elector.Run(fetchNewWorkCtx)
-			c.wg.Done()
-		}()
+		if err := c.elector.Start(fetchNewWorkCtx); err != nil {
+			return err
+		}
 	}

 	c.runProducers(fetchNewWorkCtx, workCtx)
@@ -664,6 +659,7 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
 	// complete. We probably need a timeout or way to move on in those cases.
 	c.completer.Wait()

+	c.elector.Stop()
 	c.notifier.Stop()
 	c.queueMaintainer.Stop()
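
The diff above swaps a hand-managed goroutine and `WaitGroup` for `Start`/`Stop` calls on the elector. A minimal sketch of that kind of start/stop service is below, assuming `Start` launches a run loop and `Stop` cancels it and waits for it to exit. The `service` type, `errAlreadyStarted`, and `runDemo` are hypothetical and are not River's implementation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

var errAlreadyStarted = errors.New("service already started")

// service is a hypothetical start/stop wrapper around a run loop.
type service struct {
	mu     sync.Mutex
	cancel context.CancelFunc
	done   chan struct{}
}

// Start launches run in a goroutine. It fails if the service is running.
func (s *service) Start(ctx context.Context, run func(ctx context.Context)) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.done != nil {
		return errAlreadyStarted
	}
	ctx, cancel := context.WithCancel(ctx)
	done := make(chan struct{})
	s.cancel, s.done = cancel, done
	go func() {
		defer close(done) // signals Stop that the run loop has returned
		run(ctx)
	}()
	return nil
}

// Stop cancels the run loop and blocks until it has returned. It is a no-op
// if the service was never started.
func (s *service) Stop() {
	s.mu.Lock()
	cancel, done := s.cancel, s.done
	s.cancel, s.done = nil, nil
	s.mu.Unlock()
	if done == nil {
		return
	}
	cancel()
	<-done
}

// runDemo starts the service twice and stops it once, returning both Start
// results and whether the run loop had exited by the time Stop returned.
func runDemo() (firstErr, secondErr error, loopExited bool) {
	var svc service
	exited := make(chan struct{})
	run := func(ctx context.Context) {
		<-ctx.Done()
		close(exited)
	}
	firstErr = svc.Start(context.Background(), run)
	secondErr = svc.Start(context.Background(), run)
	svc.Stop()
	select {
	case <-exited:
		loopExited = true
	default:
	}
	return firstErr, secondErr, loopExited
}

func main() {
	first, second, exited := runDemo()
	fmt.Println(first, second, exited)
}
```

Keeping the goroutine and its completion signal inside the service means callers like the client no longer track it in their own `WaitGroup`, and shutdown order becomes an explicit sequence of `Stop` calls.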
