diff --git a/client.go b/client.go
index fa507b76..d39f7c0b 100644
--- a/client.go
+++ b/client.go
@@ -461,11 +461,8 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
     instanceName := "default"

     client.notifier = notifier.New(archetype, driver.GetListener(), client.monitor.SetNotifierStatus)
-    var err error
-    client.elector, err = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, instanceName, client.ID(), 5*time.Second, 10*time.Second, logger)
-    if err != nil {
-        return nil, err
-    }
+
+    client.elector = leadership.NewElector(archetype, driver.GetExecutor(), client.notifier, instanceName, client.ID())

     if err := client.provisionProducers(); err != nil {
         return nil, err
@@ -637,11 +634,9 @@ func (c *Client[TTx]) Start(ctx context.Context) error {
             return err
         }

-        c.wg.Add(1)
-        go func() {
-            c.elector.Run(fetchNewWorkCtx)
-            c.wg.Done()
-        }()
+        if err := c.elector.Start(fetchNewWorkCtx); err != nil {
+            return err
+        }
     }

     c.runProducers(fetchNewWorkCtx, workCtx)
@@ -664,6 +659,7 @@ func (c *Client[TTx]) signalStopComplete(ctx context.Context) {
     // complete. We probably need a timeout or way to move on in those cases.
     c.completer.Wait()

+    c.elector.Stop()
     c.notifier.Stop()
     c.queueMaintainer.Stop()

diff --git a/internal/leadership/elector.go b/internal/leadership/elector.go
index 75adf669..e50cc435 100644
--- a/internal/leadership/elector.go
+++ b/internal/leadership/elector.go
@@ -4,16 +4,23 @@ import (
     "context"
     "encoding/json"
     "errors"
-    "log/slog"
     "sync"
     "time"

     "github.com/riverqueue/river/internal/baseservice"
+    "github.com/riverqueue/river/internal/maintenance/startstop"
     "github.com/riverqueue/river/internal/notifier"
+    "github.com/riverqueue/river/internal/rivercommon"
     "github.com/riverqueue/river/internal/util/dbutil"
     "github.com/riverqueue/river/riverdriver"
 )

+const (
+    electInterval           = 5 * time.Second
+    electIntervalJitter     = 1 * time.Second
+    electIntervalTTLPadding = 10 * time.Second
+)
+
 type pgNotification struct {
     Name     string `json:"name"`
     LeaderID string `json:"leader_id"`
@@ -43,16 +50,36 @@ func (s *Subscription) Unlisten() {
     })
 }

+// Test-only properties.
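+//
+// Tests block on these signals to observe elector state transitions (gaining,
+// being denied, maintaining, losing, and resigning leadership) without
+// resorting to timing-based sleeps.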
+type electorTestSignals struct {
+    DeniedLeadership     rivercommon.TestSignal[struct{}] // notifies when elector fails to gain leadership
+    GainedLeadership     rivercommon.TestSignal[struct{}] // notifies when elector gains leadership
+    LostLeadership       rivercommon.TestSignal[struct{}] // notifies when an elected leader loses leadership
+    MaintainedLeadership rivercommon.TestSignal[struct{}] // notifies when elector maintains leadership
+    ResignedLeadership   rivercommon.TestSignal[struct{}] // notifies when elector resigns leadership
+}
+
+func (ts *electorTestSignals) Init() {
+    ts.DeniedLeadership.Init()
+    ts.GainedLeadership.Init()
+    ts.LostLeadership.Init()
+    ts.MaintainedLeadership.Init()
+    ts.ResignedLeadership.Init()
+}
+
 type Elector struct {
     baseservice.BaseService
-
-    exec     riverdriver.Executor
-    id       string
-    interval time.Duration
-    logger   *slog.Logger
-    name     string
-    notifier *notifier.Notifier
-    ttl      time.Duration
+    startstop.BaseStartStop
+
+    clientID                   string
+    electInterval              time.Duration // period on which each elector attempts an election, even without having received a resignation notification
+    electIntervalJitter        time.Duration
+    exec                       riverdriver.Executor
+    instanceName               string
+    leadershipNotificationChan chan struct{}
+    notifier                   *notifier.Notifier
+    testSignals                electorTestSignals
+    ttl                        time.Duration

     mu       sync.Mutex
     isLeader bool
@@ -62,206 +89,294 @@ type Elector struct {
 // NewElector returns an Elector using the given adapter. The name should correspond
 // to the name of the database + schema combo and should be shared across all Clients
 // running with that combination. The id should be unique to the Client.
-func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, notifier *notifier.Notifier, name, id string, interval, ttlPadding time.Duration, logger *slog.Logger) (*Elector, error) {
-    // TODO: validate name + id length/format, interval, etc
+func NewElector(archetype *baseservice.Archetype, exec riverdriver.Executor, notifier *notifier.Notifier, instanceName, clientID string) *Elector {
     return baseservice.Init(archetype, &Elector{
-        exec:     exec,
-        id:       id,
-        interval: interval,
-        name:     name,
-        notifier: notifier,
-        logger:   logger.WithGroup("elector"),
+        exec:                exec,
+        clientID:            clientID,
+        electInterval:       electInterval,
+        electIntervalJitter: electIntervalJitter,
+        instanceName:        instanceName,
+        notifier:            notifier,

         // TTL is at least the reelect run interval used by clients to try and
         // gain leadership or reelect themselves as leader, plus a little
         // padding to give the leader a little breathing room in its
         // reelection loop.
-        ttl: interval + ttlPadding,
-    }), nil
+        ttl: electInterval + electIntervalTTLPadding,
+    })
 }

-func (e *Elector) Run(ctx context.Context) {
-    // Before the elector returns, run a delete with NOTIFY to give up any
-    // leadership that we have. If we do that here, we guarantee that any locks we
-    // have will be released (even if they were acquired in gainLeadership but we
-    // didn't wait for the response)
-    //
-    // This doesn't use ctx because it runs *after* the ctx is done.
-    defer e.giveUpLeadership() //nolint:contextcheck
+func (e *Elector) Start(ctx context.Context) error {
+    ctx, shouldStart, stopped := e.StartInit(ctx)
+    if !shouldStart {
+        return nil
+    }

     // We'll send to this channel anytime a leader resigns on the key with `name`
-    leadershipNotificationChan := make(chan struct{})
-
-    handleNotification := func(topic notifier.NotificationTopic, payload string) {
-        if topic != notifier.NotificationTopicLeadership {
-            // This should not happen unless the notifier is broken.
-            e.logger.Error("received unexpected notification", "topic", topic, "payload", payload)
-            return
-        }
-        notification := pgNotification{}
-        if err := json.Unmarshal([]byte(payload), &notification); err != nil {
-            e.logger.Error("unable to unmarshal leadership notification", "err", err)
-            return
-        }
+    e.leadershipNotificationChan = make(chan struct{})
+
+    var sub *notifier.Subscription
+    if e.notifier == nil {
+        e.Logger.Info(e.Name+": No notifier configured; starting in poll mode", "client_id", e.clientID)
+    } else {
+        handleNotification := func(topic notifier.NotificationTopic, payload string) {
+            if topic != notifier.NotificationTopicLeadership {
+                // This should not happen unless the notifier is broken.
+                e.Logger.Error(e.Name+": Received unexpected notification", "client_id", e.clientID, "topic", topic, "payload", payload)
+                return
+            }

-        if notification.Action != "resigned" || notification.Name != e.name {
-            // We only care about resignations on because we use them to preempt the
-            // election attempt backoff. And we only care about our own key name.
-            return
-        }
+            notification := pgNotification{}
+            if err := json.Unmarshal([]byte(payload), &notification); err != nil {
+                e.Logger.Error(e.Name+": Unable to unmarshal leadership notification", "client_id", e.clientID, "err", err)
+                return
+            }

-        select {
-        case <-ctx.Done():
-            return
-        case leadershipNotificationChan <- struct{}{}:
-        }
-    }
+            e.Logger.Info(e.Name+": Received notification from notifier", "action", notification.Action, "client_id", e.clientID)

-    sub, err := notifier.ListenRetryLoop(ctx, &e.BaseService, e.notifier, notifier.NotificationTopicLeadership, handleNotification)
-    if err != nil { //nolint:staticcheck
-        // TODO(brandur): Propagate this after refactor.
-    }
-    if sub != nil {
-        defer sub.Unlisten(ctx)
-    }
+            if notification.Action != "resigned" || notification.Name != e.instanceName {
+                // We only care about resignations because we use them to preempt the
+                // election attempt backoff. And we only care about our own key name.
+                return
+            }

-    for {
-        if success := e.gainLeadership(ctx, leadershipNotificationChan); !success {
             select {
             case <-ctx.Done():
                 return
-            default:
-                // TODO: proper backoff
-                e.logger.Error("gainLeadership returned unexpectedly, waiting to try again")
-                time.Sleep(time.Second)
-                continue
+            case e.leadershipNotificationChan <- struct{}{}:
             }
         }

-        // notify all subscribers that we're the leader
-        e.notifySubscribers(true)
+        e.Logger.Info(e.Name+": Listening for leadership changes", "client_id", e.clientID, "topic", notifier.NotificationTopicLeadership)

-        err := e.keepLeadership(ctx, leadershipNotificationChan)
-        e.notifySubscribers(false)
-        if err != nil {
-            select {
-            case <-ctx.Done():
+        var err error
+        sub, err = notifier.ListenRetryLoop(ctx, &e.BaseService, e.notifier, notifier.NotificationTopicLeadership, handleNotification)
+        if err != nil { //nolint:staticcheck
+            // TODO(brandur): Propagate this after refactor.
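+            //
+            // A subscription failure isn't fatal here: sub stays nil and the
+            // run loop below falls back to pure polling on electInterval, the
+            // same degraded mode used when no notifier is configured at all.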
+        }
+    }
+
+    go func() {
+        // This defer should come first so that it's last out, thereby avoiding
+        // races.
+        defer close(stopped)
+
+        e.Logger.InfoContext(ctx, e.Name+": Run loop started")
+        defer e.Logger.InfoContext(ctx, e.Name+": Run loop stopped")
+
+        if sub != nil {
+            defer sub.Unlisten(ctx)
+        }
+
+        for {
+            if err := e.attemptGainLeadershipLoop(ctx); err != nil {
+                // Function above only returns an error if context was canceled
+                // or overall context is done.
+                if !errors.Is(err, context.Canceled) && ctx.Err() == nil {
+                    panic(err)
+                }
                 return
-            default:
-                // TODO: backoff
-                e.logger.Error("error keeping leadership", "err", err)
-                continue
+            }
+
+            e.Logger.Info(e.Name+": Gained leadership", "client_id", e.clientID)
+            e.testSignals.GainedLeadership.Signal(struct{}{})
+
+            err := e.keepLeadershipLoop(ctx)
+            if err != nil {
+                if errors.Is(err, context.Canceled) {
+                    return
+                }
+
+                if !errors.Is(err, errLostLeadership) {
+                    e.Logger.Error(e.Name+": Error keeping leadership", "client_id", e.clientID, "err", err)
+                }
             }
         }
-    }
+    }()
+
+    return nil
 }

-func (e *Elector) gainLeadership(ctx context.Context, leadershipNotificationChan <-chan struct{}) bool {
+func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
+    var numErrors int
+
     for {
-        success, err := e.attemptElect(ctx)
-        if err != nil && !errors.Is(err, context.Canceled) {
-            e.logger.Error("error attempting to elect", "err", err)
+        e.Logger.Info(e.Name+": Attempting to gain leadership", "client_id", e.clientID)
+
+        elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
+            LeaderID: e.clientID,
+            Name:     e.instanceName,
+            TTL:      e.ttl,
+        })
+        if err != nil {
+            if errors.Is(err, context.Canceled) || ctx.Err() != nil {
+                return err
+            }
+
+            numErrors++
+            e.Logger.Error(e.Name+": Error attempting to elect", "client_id", e.clientID, "err", err, "num_errors", numErrors)
+            e.CancellableSleepExponentialBackoff(ctx, numErrors-1, baseservice.MaxAttemptsBeforeResetDefault)
+            continue
         }
-        if success {
-            return true
+        if elected {
+            return nil
         }
+        numErrors = 0
+
+        e.Logger.Info(e.Name+": Leadership bid was unsuccessful (not an error)", "client_id", e.clientID)
+        e.testSignals.DeniedLeadership.Signal(struct{}{})

         select {
-        case <-ctx.Done():
-            return false
-        case <-time.After(e.interval):
-            // TODO: This could potentially leak memory / timers if we're seeing a ton
-            // of resignations. May want to make this reusable & cancel it when retrying?
-        case <-leadershipNotificationChan:
-            // Somebody just resigned, try to win the next election immediately.
+        case <-e.CancellableSleepRandomBetweenC(ctx, e.electInterval, e.electInterval+e.electIntervalJitter):
+            if ctx.Err() != nil { // context done
+                return ctx.Err()
+            }
+
+        case <-e.leadershipNotificationChan:
+            // Somebody just resigned, try to win the next election after a very
+            // short random interval (to prevent all clients from bidding at once).
+            e.CancellableSleepRandomBetween(ctx, 0, 50*time.Millisecond)
         }
     }
 }

-func (e *Elector) attemptElect(ctx context.Context) (bool, error) {
-    elected, err := attemptElectOrReelect(ctx, e.exec, false, &riverdriver.LeaderElectParams{
-        LeaderID: e.id,
-        Name:     e.name,
-        TTL:      e.ttl,
-    })
-    if err != nil {
-        return false, err
-    }
+var errLostLeadership = errors.New("lost leadership with no error")

-    select {
-    case <-ctx.Done():
-        // Whether or not we won an election here, it will be given up momentarily
-        // when the parent loop exits.
-        return elected, ctx.Err()
-    default:
-    }
+func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
+    // notify all subscribers that we're the leader
+    e.notifySubscribers(true)

-    return elected, nil
-}
+    // Defer is LIFO. This will run after the resign below.
+    defer e.notifySubscribers(false)
+
+    var lostLeadership bool
+
+    // Before the elector returns, run a delete with NOTIFY to give up any
+    // leadership that we have. If we do that here, we guarantee that any locks
+    // we have will be released (even if they were acquired in
+    // attemptGainLeadership but we didn't wait for the response)
+    //
+    // This doesn't use ctx because it runs *after* the ctx is done.
+    defer func() {
+        if !lostLeadership {
+            e.attemptResignLoop()
+        }
+    }()
+
+    const maxNumErrors = 5
+
+    var (
+        numErrors = 0
+        timer     = time.NewTimer(0) // reset immediately below
+    )
+    <-timer.C

-func (e *Elector) keepLeadership(ctx context.Context, leadershipNotificationChan <-chan struct{}) error {
-    reelectionErrCount := 0
     for {
+        timer.Reset(e.electInterval)
+
         select {
         case <-ctx.Done():
+            if !timer.Stop() {
+                <-timer.C
+            }
+
             return ctx.Err()
-        case <-leadershipNotificationChan:
-            // We don't care about notifications when we know we're the leader, do we?
-        case <-time.After(e.interval):
-            // TODO: this leaks timers if we're receiving notifications
-            reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
-                LeaderID: e.id,
-                Name:     e.name,
-                TTL:      e.ttl,
-            })
-            if err != nil {
-                if errors.Is(err, context.Canceled) {
-                    return err
-                }
-                reelectionErrCount += 1
-                if reelectionErrCount > 5 {
-                    return err
-                }
-                e.logger.Error("error attempting reelection", "err", err)
-                continue
+
+        case <-timer.C:
+            // Reelect timer expired; attempt reelection below.
+
+        case <-e.leadershipNotificationChan:
+            // Used only in tests to force an immediate reelect attempt.
+
+            if !timer.Stop() {
+                <-timer.C
             }
-            if !reelected {
-                return errors.New("lost leadership with no error")
+        }
+
+        e.Logger.Info(e.Name+": Current leader attempting reelect", "client_id", e.clientID)
+
+        reelected, err := attemptElectOrReelect(ctx, e.exec, true, &riverdriver.LeaderElectParams{
+            LeaderID: e.clientID,
+            Name:     e.instanceName,
+            TTL:      e.ttl,
+        })
+        if err != nil {
+            if errors.Is(err, context.Canceled) {
+                return err
+            }
+
+            numErrors++
+            if numErrors >= maxNumErrors {
+                return err
             }
-            reelectionErrCount = 0
+
+            e.Logger.Error(e.Name+": Error attempting reelection", "client_id", e.clientID, "err", err)
+            e.CancellableSleepExponentialBackoff(ctx, numErrors-1, baseservice.MaxAttemptsBeforeResetDefault)
+            continue
         }
+        if !reelected {
+            lostLeadership = true
+            e.testSignals.LostLeadership.Signal(struct{}{})
+            return errLostLeadership
+        }
+
+        numErrors = 0
+        e.testSignals.MaintainedLeadership.Signal(struct{}{})
     }
 }

-// try up to 10 times to give up any currently held leadership.
-func (e *Elector) giveUpLeadership() {
-    for i := 0; i < 10; i++ {
-        if err := e.attemptResign(i); err != nil {
-            e.logger.Error("error attempting to resign", "err", err)
-            // TODO: exponential backoff? wait longer than ~1s total?
-            time.Sleep(100 * time.Millisecond)
+func (e *Elector) attemptResignLoop() {
+    e.Logger.Info(e.Name+": Attempting to resign leadership", "client_id", e.clientID)
+
+    // Make a good faith attempt to resign, even in the presence of errors, but
+    // don't keep hammering if it doesn't work. In case of a resignation failure,
+    // leader TTLs will act as an additional hedge to ensure a new leader can
+    // still be elected.
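+    //
+    // With the defaults above (a 5s elect interval plus 10s of TTL padding),
+    // an unresigned leader's record expires at most ~15s after its last
+    // successful election or reelection, after which another client can win.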
+    const maxNumErrors = 3
+
+    // This does not inherit the parent context because we want to give up leadership
+    // even during a shutdown. There is no way to short-circuit this.
+    ctx := context.Background()
+
+    for attempt := 1; attempt <= maxNumErrors; attempt++ {
+        if err := e.attemptResign(ctx, attempt); err != nil {
+            e.Logger.Error(e.Name+": Error attempting to resign", "attempt", attempt, "client_id", e.clientID, "err", err)
+
+            e.CancellableSleepExponentialBackoff(ctx, attempt-1, baseservice.MaxAttemptsBeforeResetDefault)
+            continue
         }
+
+        return
     }
 }

 // attemptResign attempts to resign any currently held leaderships for the
 // elector's name and leader ID.
-func (e *Elector) attemptResign(attempt int) error {
+func (e *Elector) attemptResign(ctx context.Context, attempt int) error {
     // Wait one second longer each time we try to resign:
-    timeout := time.Duration(attempt+1) * time.Second
-    // This does not inherit the parent context because we want to give up leadership
-    // even during a shutdown. There is no way to short-circuit this.
-    ctx, cancel := context.WithTimeout(context.Background(), timeout)
+    timeout := time.Duration(attempt) * time.Second
+
+    ctx, cancel := context.WithTimeout(ctx, timeout)
     defer cancel()

-    _, err := e.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{
-        LeaderID:        e.id,
+    resigned, err := e.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{
+        LeaderID:        e.clientID,
         LeadershipTopic: string(notifier.NotificationTopicLeadership),
-        Name:            e.name,
+        Name:            e.instanceName,
     })
-    return err
+    if err != nil {
+        return err
+    }
+
+    if resigned {
+        e.Logger.Info(e.Name+": Resigned leadership successfully", "client_id", e.clientID)
+        e.testSignals.ResignedLeadership.Signal(struct{}{})
+    }
+
+    return nil
 }

 func (e *Elector) Listen() *Subscription {
diff --git a/internal/leadership/elector_test.go b/internal/leadership/elector_test.go
index e6e8486b..1fd71dca 100644
--- a/internal/leadership/elector_test.go
+++ b/internal/leadership/elector_test.go
@@ -2,18 +2,320 @@ package leadership

 import (
     "context"
+    "log/slog"
     "testing"
     "time"

+    "github.com/jackc/pgx/v5"
     "github.com/stretchr/testify/require"

+    "github.com/riverqueue/river/internal/baseservice"
+    "github.com/riverqueue/river/internal/componentstatus"
+    "github.com/riverqueue/river/internal/notifier"
     "github.com/riverqueue/river/internal/riverinternaltest"
+    "github.com/riverqueue/river/internal/riverinternaltest/sharedtx"
+    "github.com/riverqueue/river/internal/riverinternaltest/startstoptest"
     "github.com/riverqueue/river/internal/riverinternaltest/testfactory"
     "github.com/riverqueue/river/internal/util/ptrutil"
     "github.com/riverqueue/river/riverdriver"
     "github.com/riverqueue/river/riverdriver/riverpgxv5"
+    "github.com/riverqueue/river/rivertype"
 )

+const defaultInstanceName = "default"
+
+// This system of "elector bundles" may appear to be a little convoluted, but
+// it's built so that we can initialize multiple electors against a single
+// database or transaction.
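+//
+// The bundle is a generic parameter so that each variant (poll-only against a
+// test transaction, notifier-backed against a database pool) can carry its own
+// driver-specific state while sharing the same test body.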
+func testElector[TElectorBundle any](
+    ctx context.Context,
+    t *testing.T,
+    makeElectorBundle func(t *testing.T) TElectorBundle,
+    makeElector func(t *testing.T, bundle TElectorBundle) *Elector,
+) {
+    t.Helper()
+
+    type testBundle struct {
+        electorBundle TElectorBundle
+        exec          riverdriver.Executor
+    }
+
+    setup := func(t *testing.T) (*Elector, *testBundle) {
+        t.Helper()
+
+        electorBundle := makeElectorBundle(t)
+
+        elector := makeElector(t, electorBundle)
+        elector.testSignals.Init()
+
+        return elector, &testBundle{
+            electorBundle: electorBundle,
+            exec:          elector.exec,
+        }
+    }
+
+    startElector := func(ctx context.Context, t *testing.T, elector *Elector) {
+        t.Helper()
+        t.Log("Starting " + elector.clientID)
+        require.NoError(t, elector.Start(ctx))
+        t.Cleanup(elector.Stop)
+    }
+
+    t.Run("StartsGainsLeadershipAndStops", func(t *testing.T) {
+        t.Parallel()
+
+        elector, bundle := setup(t)
+
+        startElector(ctx, t, elector)
+
+        elector.testSignals.GainedLeadership.WaitOrTimeout()
+
+        leader, err := bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
+        require.NoError(t, err)
+        require.Equal(t, elector.clientID, leader.LeaderID)
+
+        elector.Stop()
+
+        elector.testSignals.ResignedLeadership.WaitOrTimeout()
+
+        _, err = bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
+        require.ErrorIs(t, err, rivertype.ErrNotFound)
+    })
+
+    t.Run("NotifiesSubscribers", func(t *testing.T) {
+        t.Parallel()
+
+        elector, _ := setup(t)
+
+        sub := elector.Listen()
+        t.Cleanup(func() { elector.unlisten(sub) })
+
+        // Drain an initial notification that occurs on Listen.
+        notification := riverinternaltest.WaitOrTimeout(t, sub.ch)
+        require.False(t, notification.IsLeader)
+
+        startElector(ctx, t, elector)
+
+        elector.testSignals.GainedLeadership.WaitOrTimeout()
+
+        notification = riverinternaltest.WaitOrTimeout(t, sub.ch)
+        require.True(t, notification.IsLeader)
+
+        elector.Stop()
+
+        elector.testSignals.ResignedLeadership.WaitOrTimeout()
+
+        notification = riverinternaltest.WaitOrTimeout(t, sub.ch)
+        require.False(t, notification.IsLeader)
+    })
+
+    t.Run("SustainsLeadership", func(t *testing.T) {
+        t.Parallel()
+
+        elector, _ := setup(t)
+
+        startElector(ctx, t, elector)
+
+        elector.testSignals.GainedLeadership.WaitOrTimeout()
+
+        // The leadership maintenance loop also listens on the leadership
+        // notification channel. Take advantage of that to cause an
+        // immediate reelect attempt with no sleep.
+        elector.leadershipNotificationChan <- struct{}{}
+        elector.testSignals.MaintainedLeadership.WaitOrTimeout()
+
+        elector.leadershipNotificationChan <- struct{}{}
+        elector.testSignals.MaintainedLeadership.WaitOrTimeout()
+
+        elector.leadershipNotificationChan <- struct{}{}
+        elector.testSignals.MaintainedLeadership.WaitOrTimeout()
+
+        elector.Stop()
+
+        elector.testSignals.ResignedLeadership.WaitOrTimeout()
+    })
+
+    t.Run("LosesLeadership", func(t *testing.T) {
+        t.Parallel()
+
+        elector, bundle := setup(t)
+
+        startElector(ctx, t, elector)
+
+        elector.testSignals.GainedLeadership.WaitOrTimeout()
+
+        t.Log("Force resigning " + elector.clientID)
+
+        // Artificially force resign the elector and add a new leader record
+        // so that it can't be elected again.
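+        // From the elector's point of view this is indistinguishable from its
+        // record expiring by TTL while another client takes over.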
+        _, err := bundle.exec.LeaderResign(ctx, &riverdriver.LeaderResignParams{
+            LeaderID:        elector.clientID,
+            LeadershipTopic: string(notifier.NotificationTopicLeadership),
+            Name:            defaultInstanceName,
+        })
+        require.NoError(t, err)
+
+        _ = testfactory.Leader(ctx, t, bundle.exec, &testfactory.LeaderOpts{
+            LeaderID: ptrutil.Ptr("other-client-id"),
+        })
+
+        elector.leadershipNotificationChan <- struct{}{}
+        elector.testSignals.LostLeadership.WaitOrTimeout()
+
+        // Wait for the elector to try and fail to gain leadership so we
+        // don't finish the test while it's still operating.
+        elector.testSignals.DeniedLeadership.WaitOrTimeout()
+
+        elector.Stop()
+    })
+
+    t.Run("CompetingElectors", func(t *testing.T) {
+        t.Parallel()
+
+        elector1, bundle := setup(t)
+        elector1.clientID = "elector1"
+
+        {
+            startElector(ctx, t, elector1)
+
+            // Wait for the first elector to gain leadership before starting the
+            // next to avoid any raciness.
+            t.Logf("Waiting for %s to gain leadership", elector1.clientID)
+            elector1.testSignals.GainedLeadership.WaitOrTimeout()
+
+            leader, err := bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
+            require.NoError(t, err)
+            require.Equal(t, elector1.clientID, leader.LeaderID)
+        }
+
+        // Make another elector and make sure it's using the same executor.
+        elector2 := makeElector(t, bundle.electorBundle)
+        elector2.clientID = "elector2"
+        elector2.exec = elector1.exec
+        elector2.testSignals.Init()
+
+        {
+            startElector(ctx, t, elector2)
+
+            elector2.testSignals.DeniedLeadership.WaitOrTimeout()
+
+            t.Log("Stopping " + elector1.clientID)
+            elector1.Stop()
+            elector1.testSignals.ResignedLeadership.WaitOrTimeout()
+
+            // Cheat if we're in poll only by notifying the leadership channel
+            // to wake the elector from sleep.
+            if elector2.notifier == nil {
+                elector2.leadershipNotificationChan <- struct{}{}
+            }
+
+            t.Logf("Waiting for %s to gain leadership", elector2.clientID)
+            elector2.testSignals.GainedLeadership.WaitOrTimeout()
+
+            t.Log("Stopping " + elector2.clientID)
+            elector2.Stop()
+            elector2.testSignals.ResignedLeadership.WaitOrTimeout()
+        }
+
+        _, err := bundle.exec.LeaderGetElectedLeader(ctx, defaultInstanceName)
+        require.ErrorIs(t, err, rivertype.ErrNotFound)
+    })
+
+    t.Run("StartStopStress", func(t *testing.T) {
+        t.Parallel()
+
+        elector, _ := setup(t)
+        elector.Logger = riverinternaltest.LoggerWarn(t) // loop started/stopped logs are very noisy; suppress
+        elector.testSignals = electorTestSignals{}       // deinit so channels don't fill
+
+        startstoptest.Stress(ctx, t, elector)
+    })
+}
+
+func TestElector_PollOnly(t *testing.T) {
+    t.Parallel()
+
+    var (
+        ctx    = context.Background()
+        driver = riverpgxv5.New(nil)
+    )
+
+    type electorBundle struct {
+        tx pgx.Tx
+    }
+
+    testElector(ctx, t,
+        func(t *testing.T) *electorBundle {
+            t.Helper()
+
+            tx := riverinternaltest.TestTx(ctx, t)
+
+            // We'll put multiple electors on one transaction. Make sure they can
+            // live with each other in relative harmony.
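+            // SharedTx serializes access so that concurrent electors don't
+            // violate the single-operation-at-a-time rule of a pgx transaction.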
+            tx = sharedtx.NewSharedTx(tx)
+
+            return &electorBundle{
+                tx: tx,
+            }
+        },
+        func(t *testing.T, electorBundle *electorBundle) *Elector {
+            t.Helper()
+
+            return NewElector(
+                riverinternaltest.BaseServiceArchetype(t),
+                driver.UnwrapExecutor(electorBundle.tx),
+                nil,
+                defaultInstanceName,
+                "test-client-id",
+            )
+        })
+}
+
+func TestElector_WithNotifier(t *testing.T) {
+    t.Parallel()
+
+    ctx := context.Background()
+
+    type electorBundle struct {
+        archetype *baseservice.Archetype
+        exec      riverdriver.Executor
+        notifier  *notifier.Notifier
+    }
+
+    testElector(ctx, t,
+        func(t *testing.T) *electorBundle {
+            t.Helper()
+
+            var (
+                archetype = riverinternaltest.BaseServiceArchetype(t)
+                dbPool    = riverinternaltest.TestDB(ctx, t)
+                driver    = riverpgxv5.New(dbPool)
+            )
+
+            notifier := notifier.New(archetype, driver.GetListener(), func(s componentstatus.Status) {})
+            {
+                require.NoError(t, notifier.Start(ctx))
+                t.Cleanup(notifier.Stop)
+            }
+
+            return &electorBundle{
+                archetype: archetype,
+                exec:      driver.GetExecutor(),
+                notifier:  notifier,
+            }
+        },
+        func(t *testing.T, electorBundle *electorBundle) *Elector {
+            t.Helper()
+
+            return NewElector(
+                electorBundle.archetype,
+                electorBundle.exec,
+                electorBundle.notifier,
+                defaultInstanceName,
+                "test-client-id",
+            )
+        })
+}
+
 func TestAttemptElectOrReelect(t *testing.T) {
     t.Parallel()

@@ -26,7 +328,8 @@ func TestAttemptElectOrReelect(t *testing.T) {
     ctx := context.Background()

     type testBundle struct {
-        exec riverdriver.Executor
+        exec   riverdriver.Executor
+        logger *slog.Logger
     }

     setup := func(t *testing.T) *testBundle {
@@ -35,7 +338,8 @@
         driver := riverpgxv5.New(nil)

         return &testBundle{
-            exec: driver.UnwrapExecutor(riverinternaltest.TestTx(ctx, t)),
+            exec:   driver.UnwrapExecutor(riverinternaltest.TestTx(ctx, t)),
+            logger: riverinternaltest.Logger(t),
         }
     }

diff --git a/internal/notifier/notifier.go b/internal/notifier/notifier.go
index d1cf9a54..d95be9fe 100644
--- a/internal/notifier/notifier.go
+++ b/internal/notifier/notifier.go
@@ -105,8 +105,8 @@ func (n *Notifier) Start(ctx context.Context) error {
     go func() {
         defer close(stopped)

-        n.Logger.InfoContext(ctx, n.Name+": Notifier started")
-        defer n.Logger.InfoContext(ctx, n.Name+": Notifier stopped")
+        n.Logger.InfoContext(ctx, n.Name+": Run loop started")
+        defer n.Logger.InfoContext(ctx, n.Name+": Run loop stopped")

         n.withLock(func() { n.isStarted = true })
         defer n.withLock(func() { n.isStarted = false })
@@ -442,6 +442,8 @@ func (n *Notifier) Listen(ctx context.Context, topic NotificationTopic, notifyFu
     }
     n.subscriptions[topic] = append(existingSubs, sub)

+    n.Logger.InfoContext(ctx, n.Name+": Added subscription", "new_num_subscriptions", len(n.subscriptions[topic]), "topic", topic)
+
     // We add the new subscription to the subscription list optimistically, and
     // it needs to be done this way in case of a restart after an interrupt
     // below has been run, but after a return to this function (say we were to
     //
     // By the time this function is run (i.e. after an interrupt), a lock on
     // `n.mu` has been reacquired, and modifying subscription state is safe.
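+    // removeSub delegates to removeSubscription so that Listen and unlisten
+    // share a single removal path guarded by the same lock.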
-    removeSub := func() {
-        n.subscriptions[sub.topic] = slices.DeleteFunc(n.subscriptions[sub.topic], func(s *Subscription) bool {
-            return s == sub
-        })
-
-        if len(n.subscriptions[sub.topic]) <= 1 {
-            delete(n.subscriptions, sub.topic)
-        }
-    }
+    removeSub := func() { n.removeSubscription(ctx, sub) }

     if !existingTopic {
         // If already waiting, send an interrupt to the wait function to run a
@@ -528,15 +522,22 @@ func (n *Notifier) unlisten(ctx context.Context, sub *Subscription) error {
         }
     }

-    n.subscriptions[sub.topic] = slices.DeleteFunc(subs, func(s *Subscription) bool {
+    n.removeSubscription(ctx, sub)
+
+    return nil
+}
+
+// This function requires that the caller already has a lock on `n.mu`.
+func (n *Notifier) removeSubscription(ctx context.Context, sub *Subscription) {
+    n.subscriptions[sub.topic] = slices.DeleteFunc(n.subscriptions[sub.topic], func(s *Subscription) bool {
         return s == sub
     })

-    if len(subs) <= 1 {
+    if len(n.subscriptions[sub.topic]) < 1 {
         delete(n.subscriptions, sub.topic)
     }

-    return nil
+    n.Logger.InfoContext(ctx, n.Name+": Removed subscription", "new_num_subscriptions", len(n.subscriptions[sub.topic]), "topic", sub.topic)
 }

 func (n *Notifier) withLock(lockedFunc func()) {