Skip to content

Commit

Permalink
Fix subscription stress test in notifier (#269)
Browse files Browse the repository at this point in the history
While I was trying to fix another problem in the notifier's subscription
stress test, I ended up introducing another one by using a single wait
group for both the notification sending goroutine and subscriptions
loops which caused the test intermittency described in #268. The test
was ending too soon, and channels would only receive 1-2 messages, and
sometimes zero (which would fail the test).

    notifier_test.go:462: Channel  0 contains   2 message(s)
    notifier_test.go:462: Channel  1 contains   2 message(s)
    notifier_test.go:462: Channel  2 contains   2 message(s)
    notifier_test.go:462: Channel  3 contains   2 message(s)
    notifier_test.go:462: Channel  4 contains   2 message(s)
    notifier_test.go:462: Channel  5 contains   2 message(s)
    notifier_test.go:462: Channel  6 contains   2 message(s)
    notifier_test.go:462: Channel  7 contains   0 message(s)

Here, fix the problem by putting in different symbols for the
notification send and subscription loops. The proper order of operations
to finish the test are:

1. Wait for all subscription groups to finish.
2. Tell the notification send loop to shut down.
3. Wait for the notification send loop to return (so as not to introduce
   any goroutine leaks like had been present before).

Fixes #268.
  • Loading branch information
brandur authored Mar 13, 2024
1 parent 67d41ed commit 8d7cfa6
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions internal/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,13 @@ func TestNotifier(t *testing.T) {
notifyChans[i] = make(chan TopicAndPayload, 1000)
}

var wg sync.WaitGroup

// Start a goroutine to send messages constantly.
shutdown := make(chan struct{})
wg.Add(1)
var (
sendNotificationsDone = make(chan struct{})
sendNotificationsShutdown = make(chan struct{})
)
go func() {
defer wg.Done()
defer close(sendNotificationsDone)

ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
Expand All @@ -427,14 +427,15 @@ func TestNotifier(t *testing.T) {
select {
case <-ctx.Done():
return
case <-shutdown:
case <-sendNotificationsShutdown:
return
case <-ticker.C:
// loop again
}
}
}()

var wg sync.WaitGroup
wg.Add(len(notifyChans))
for i := range notifyChans {
notifyChan := notifyChans[i]
Expand All @@ -454,9 +455,9 @@ func TestNotifier(t *testing.T) {
}()
}

close(shutdown)
wg.Wait()
notifier.Stop()
wg.Wait() // wait for subscribe loops to finish all their work
close(sendNotificationsShutdown) // stop sending notifications
<-sendNotificationsDone // wait for notifications goroutine to finish

for i := range notifyChans {
t.Logf("Channel %2d contains %3d message(s)", i, len(notifyChans[i]))
Expand Down

0 comments on commit 8d7cfa6

Please sign in to comment.