Skip to content

Commit

Permalink
Break BaseService functions into serviceutil (#536)
Browse files Browse the repository at this point in the history
I was going through trying to refactor the demo project to use
`rivershared`, and while doing so, noticed that it'd broken out some
of the `BaseService` functions like `CancellableSleep` into a new
package so that they could be used without an available `BaseService`.

Looking over this API again, there's no good reason for functions like
`CancellableSleep` and `ExponentialBackoff` to be tied so tight to
`BaseService`. The reason they are is that `BaseService` provides a
random source that both can conveniently use, but that can be moved to a
function argument instead.

The reason that `BaseService` has a random source is to avoid the old
`math/rand` seeding problem. A single random source is initialized and
seeding securely once, then placed onto an archetype that's inherited
by all `BaseService` instances.

Soon, even that can go away. One of the biggest things fixed by Go
1.22's `math/rand/v2` is that it's no longer necessary to seed the top
level functions. Once we drop support for Go 1.21, we'll be able to
simplify this code dramatically by dropping `BaseService.Rand` and the
random source arguments from functions like `ExponentialBackoff` in
favor of just using top level `math/rand/v2` functions.

I also drop the variant `CancellableSleepBetween` in favor of having
callers use `CancellableSleep` combined with `randutil.DurationBetween`,
a new random helper similar to `IntBetween`.

With all this in, we'll be able to fully jettison all utilities from the
demo project in favor of going all in with `rivershared`'s equivalents.
  • Loading branch information
brandur authored Aug 22, 2024
1 parent 6a0d904 commit 8f5ee36
Show file tree
Hide file tree
Showing 18 changed files with 251 additions and 233 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ jobs:
- name: Display Go version
run: go version

- name: Install dependencies
run: go get -t ./...

- name: Set up test DBs
run: go run ./internal/cmd/testdbman create
env:
Expand Down
4 changes: 3 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstoptest"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -838,7 +840,7 @@ func Test_Client_Stop(t *testing.T) {
require.NoError(t, err)

// Sleep a brief time between inserts.
client.baseService.CancellableSleepRandomBetween(ctx, 1*time.Microsecond, 10*time.Millisecond)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(client.baseService.Rand, 1*time.Microsecond, 10*time.Millisecond))
}
}()
}
Expand Down
5 changes: 3 additions & 2 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -534,11 +535,11 @@ func withRetries[T any](logCtx context.Context, baseService *baseservice.BaseSer
}

lastErr = err
sleepDuration := baseService.ExponentialBackoff(attempt, baseservice.MaxAttemptsBeforeResetDefault)
sleepDuration := serviceutil.ExponentialBackoff(baseService.Rand, attempt, serviceutil.MaxAttemptsBeforeResetDefault)
baseService.Logger.ErrorContext(logCtx, baseService.Name+": Completer error (will retry after sleep)",
"attempt", attempt, "err", err, "sleep_duration", sleepDuration, "timeout", timeout)
if !disableSleep {
baseService.CancellableSleep(logCtx, sleepDuration)
serviceutil.CancellableSleep(logCtx, sleepDuration)
}
continue
}
Expand Down
18 changes: 10 additions & 8 deletions internal/leadership/elector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/valutil"
)

Expand Down Expand Up @@ -200,9 +202,9 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
}

numErrors++
sleepDuration := e.ExponentialBackoff(numErrors, baseservice.MaxAttemptsBeforeResetDefault)
sleepDuration := serviceutil.ExponentialBackoff(e.Rand, numErrors, serviceutil.MaxAttemptsBeforeResetDefault)
e.Logger.ErrorContext(ctx, e.Name+": Error attempting to elect", "client_id", e.config.ClientID, "err", err, "num_errors", numErrors, "sleep_duration", sleepDuration)
e.CancellableSleep(ctx, sleepDuration)
serviceutil.CancellableSleep(ctx, sleepDuration)
continue
}
if elected {
Expand All @@ -219,15 +221,15 @@ func (e *Elector) attemptGainLeadershipLoop(ctx context.Context) error {
// of resignations. May want to make this reusable & cancel it when retrying?
// We may also want to consider a specialized ticker utility that can tick
// within a random range.
case <-e.CancellableSleepRandomBetweenC(ctx, e.config.ElectInterval, e.config.ElectInterval+e.config.ElectIntervalJitter):
case <-serviceutil.CancellableSleepC(ctx, randutil.DurationBetween(e.Rand, e.config.ElectInterval, e.config.ElectInterval+e.config.ElectIntervalJitter)):
if ctx.Err() != nil { // context done
return ctx.Err()
}

case <-e.leadershipNotificationChan:
// Somebody just resigned, try to win the next election after a very
// short random interval (to prevent all clients from bidding at once).
e.CancellableSleepRandomBetween(ctx, 0, 50*time.Millisecond)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(e.Rand, 0, 50*time.Millisecond))
}
}
}
Expand Down Expand Up @@ -340,10 +342,10 @@ func (e *Elector) keepLeadershipLoop(ctx context.Context) error {
return err
}

sleepDuration := e.ExponentialBackoff(numErrors, baseservice.MaxAttemptsBeforeResetDefault)
sleepDuration := serviceutil.ExponentialBackoff(e.Rand, numErrors, serviceutil.MaxAttemptsBeforeResetDefault)
e.Logger.Error(e.Name+": Error attempting reelection",
"client_id", e.config.ClientID, "err", err, "sleep_duration", sleepDuration)
e.CancellableSleep(ctx, sleepDuration)
serviceutil.CancellableSleep(ctx, sleepDuration)
continue
}
if !reelected {
Expand Down Expand Up @@ -381,10 +383,10 @@ func (e *Elector) attemptResignLoop(ctx context.Context) {
if err := e.attemptResign(ctx, attempt); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Error attempting to resign", "attempt", attempt, "client_id", e.config.ClientID, "err", err)

sleepDuration := e.ExponentialBackoff(attempt, baseservice.MaxAttemptsBeforeResetDefault)
sleepDuration := serviceutil.ExponentialBackoff(e.Rand, attempt, serviceutil.MaxAttemptsBeforeResetDefault)
e.Logger.ErrorContext(ctx, e.Name+": Error attempting to resign",
"client_id", e.config.ClientID, "err", err, "num_errors", attempt, "sleep_duration", sleepDuration)
e.CancellableSleep(ctx, sleepDuration)
serviceutil.CancellableSleep(ctx, sleepDuration)

continue
}
Expand Down
4 changes: 3 additions & 1 deletion internal/maintenance/job_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivershared/util/valutil"
)
Expand Down Expand Up @@ -176,7 +178,7 @@ func (s *JobCleaner) runOnce(ctx context.Context) (*jobCleanerRunOnceResult, err
slog.Int("num_jobs_deleted", numDeleted),
)

s.CancellableSleepRandomBetween(ctx, BatchBackoffMin, BatchBackoffMax)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(s.Rand, BatchBackoffMin, BatchBackoffMax))
}

return res, nil
Expand Down
4 changes: 3 additions & 1 deletion internal/maintenance/job_rescuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivershared/util/valutil"
"github.com/riverqueue/river/rivertype"
Expand Down Expand Up @@ -234,7 +236,7 @@ func (s *JobRescuer) runOnce(ctx context.Context) (*rescuerRunOnceResult, error)
break
}

s.CancellableSleepRandomBetween(ctx, BatchBackoffMin, BatchBackoffMax)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(s.Rand, BatchBackoffMin, BatchBackoffMax))
}

return res, nil
Expand Down
4 changes: 3 additions & 1 deletion internal/maintenance/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivershared/util/valutil"
)
Expand Down Expand Up @@ -194,7 +196,7 @@ func (s *JobScheduler) runOnce(ctx context.Context) (*schedulerRunOnceResult, er
break
}

s.CancellableSleepRandomBetween(ctx, BatchBackoffMin, BatchBackoffMax)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(s.Rand, BatchBackoffMin, BatchBackoffMax))
}

return res, nil
Expand Down
4 changes: 3 additions & 1 deletion internal/maintenance/queue_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
"github.com/riverqueue/river/rivershared/util/valutil"
)
Expand Down Expand Up @@ -152,7 +154,7 @@ func (s *QueueCleaner) runOnce(ctx context.Context) (*queueCleanerRunOnceResult,
break
}

s.CancellableSleepRandomBetween(ctx, BatchBackoffMin, BatchBackoffMax)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(s.Rand, BatchBackoffMin, BatchBackoffMax))
}

return res, nil
Expand Down
4 changes: 3 additions & 1 deletion internal/maintenance/queue_maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
)

const (
Expand Down Expand Up @@ -116,7 +118,7 @@ func (s *queueMaintainerServiceBase) StaggerStart(ctx context.Context) {
return
}

s.CancellableSleepRandomBetween(ctx, 0*time.Second, 1*time.Second)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(s.Rand, 0*time.Second, 1*time.Second))
}

// StaggerStartupDisable sets whether the short staggered sleep on start up
Expand Down
5 changes: 3 additions & 2 deletions internal/notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
)

Expand Down Expand Up @@ -126,12 +127,12 @@ func (n *Notifier) Start(ctx context.Context) error {
break
}

sleepDuration := n.ExponentialBackoff(attempt, baseservice.MaxAttemptsBeforeResetDefault)
sleepDuration := serviceutil.ExponentialBackoff(n.Rand, attempt, serviceutil.MaxAttemptsBeforeResetDefault)
n.Logger.ErrorContext(ctx, n.Name+": Error running listener (will attempt reconnect after backoff)",
"attempt", attempt, "err", err, "sleep_duration", sleepDuration)
n.testSignals.BackoffError.Signal(err)
if !n.disableSleep {
n.CancellableSleep(ctx, sleepDuration)
serviceutil.CancellableSleep(ctx, sleepDuration)
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion internal/notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/startstoptest"
"github.com/riverqueue/river/rivershared/util/maputil"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
)

func TestNotifier(t *testing.T) {
Expand Down Expand Up @@ -437,7 +439,7 @@ func TestNotifier(t *testing.T) {
require.NoError(t, err)

// Pause a random brief amount of time.
notifier.BaseService.CancellableSleepRandomBetween(ctx, 15*time.Millisecond, 50*time.Millisecond)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(notifier.Rand, 15*time.Millisecond, 50*time.Millisecond))

sub.Unlisten(ctx)
}
Expand Down
4 changes: 3 additions & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
"github.com/riverqueue/river/rivershared/testsignal"
"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/serviceutil"
"github.com/riverqueue/river/rivertype"
)

Expand Down Expand Up @@ -647,7 +649,7 @@ func (p *producer) fetchQueueSettings(ctx context.Context) (*rivertype.Queue, er
}

func (p *producer) reportQueueStatusLoop(ctx context.Context) {
p.CancellableSleepRandomBetween(ctx, 0, time.Second)
serviceutil.CancellableSleep(ctx, randutil.DurationBetween(p.Rand, 0, time.Second))
reportTicker := time.NewTicker(p.config.QueueReportInterval)
for {
select {
Expand Down
99 changes: 4 additions & 95 deletions rivershared/baseservice/base_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,13 @@
package baseservice

import (
"context"
"log/slog"
"math"
"math/rand"
"reflect"
"regexp"
"time"

"github.com/riverqueue/river/rivershared/util/randutil"
"github.com/riverqueue/river/rivershared/util/timeutil"
)

// Archetype contains the set of base service properties that are immutable, or
Expand All @@ -33,6 +30,9 @@ type Archetype struct {
// deliberate choice because Go's non-crypto rand source is about twenty
// times faster, and so far none of our uses of random require cryptographic
// secure randomness.
//
// TODO: When we drop Go 1.21 support, drop this in favor of just using top
// level rand functions which are already safe for concurrent use.
Rand *rand.Rand

// Time returns a time generator to get the current time in UTC. Normally
Expand Down Expand Up @@ -71,98 +71,7 @@ type BaseService struct {
Name string
}

// CancellableSleep sleeps for the given duration, but returns early if context
// has been cancelled.
func (s *BaseService) CancellableSleep(ctx context.Context, sleepDuration time.Duration) {
timer := time.NewTimer(sleepDuration)

select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
}
}

// CancellableSleep sleeps for the given duration, but returns early if context
// has been cancelled.
//
// This variant returns a channel that should be waited on and which will be
// closed when either the sleep or context is done.
func (s *BaseService) CancellableSleepC(ctx context.Context, sleepDuration time.Duration) <-chan struct{} {
doneChan := make(chan struct{})

go func() {
defer close(doneChan)
s.CancellableSleep(ctx, sleepDuration)
}()

return doneChan
}

// CancellableSleepRandomBetween sleeps for a random duration between the given
// bounds (max bound is exclusive), but returns early if context has been
// cancelled.
func (s *BaseService) CancellableSleepRandomBetween(ctx context.Context, sleepDurationMin, sleepDurationMax time.Duration) {
s.CancellableSleep(ctx, time.Duration(randutil.IntBetween(s.Rand, int(sleepDurationMin), int(sleepDurationMax))))
}

// CancellableSleepRandomBetween sleeps for a random duration between the given
// bounds (max bound is exclusive), but returns early if context has been
// cancelled.
//
// This variant returns a channel that should be waited on and which will be
// closed when either the sleep or context is done.
func (s *BaseService) CancellableSleepRandomBetweenC(ctx context.Context, sleepDurationMin, sleepDurationMax time.Duration) <-chan struct{} {
doneChan := make(chan struct{})

go func() {
defer close(doneChan)
s.CancellableSleep(ctx, time.Duration(randutil.IntBetween(s.Rand, int(sleepDurationMin), int(sleepDurationMax))))
}()

return doneChan
}

// MaxAttemptsBeforeResetDefault is the number of attempts during exponential
// backoff after which attempts is reset so that sleep durations aren't flung
// into a ridiculously distant future. This constant is typically injected into
// the CancellableSleepExponentialBackoff function. It could technically take
// another value instead, but shouldn't unless there's a good reason to do so.
const MaxAttemptsBeforeResetDefault = 10

// ExponentialBackoff returns a duration for a reasonable exponential backoff
// interval for a service based on the given attempt number, which can then be
// fed into CancellableSleep to perform the sleep. Uses a 2**N second algorithm,
// +/- 10% random jitter. Sleep is cancelled if the given context is cancelled.
//
// Attempt should start at one for the first backoff/failure.
func (s *BaseService) ExponentialBackoff(attempt, maxAttemptsBeforeReset int) time.Duration {
retrySeconds := s.exponentialBackoffSecondsWithoutJitter(attempt, maxAttemptsBeforeReset)

// Jitter number of seconds +/- 10%.
retrySeconds += retrySeconds * (s.Rand.Float64()*0.2 - 0.1)

return timeutil.SecondsAsDuration(retrySeconds)
}

func (s *BaseService) exponentialBackoffSecondsWithoutJitter(attempt, maxAttemptsBeforeReset int) float64 {
// It's easier for callers and more intuitive if attempt starts at one, but
// subtract one before sending it the exponent so we start at only one
// second of sleep instead of two.
attempt--

// We use a different exponential backoff algorithm here compared to the
// default retry policy (2**N versus N**4) because it results in more
// retries sooner. When it comes to exponential backoffs in services we
// never want to sleep for hours/days, unlike with failed jobs.
return math.Pow(2, float64(attempt%maxAttemptsBeforeReset))
}

func (s *BaseService) GetBaseService() *BaseService {
return s
}
func (s *BaseService) GetBaseService() *BaseService { return s }

// withBaseService is an interface to a struct that embeds BaseService. An
// implementation is provided automatically by BaseService, and it's largely
Expand Down
Loading

0 comments on commit 8f5ee36

Please sign in to comment.