Skip to content

Commit

Permalink
fix memory leak on job execution from baseservice
Browse files Browse the repository at this point in the history
The `BaseService` type was originally only used for long-running
internal services. At some point, we refactored the job execution code
to also make use of this type. That exposed a memory leak: every time a
new `BaseService` is `Init`'d, a new random source is allocated. That
meant for every single job.

This is of course totally unnecessary, and to avoid it we can move the
rand source to the `Archetype`, which is only allocated once per client.

Partially fixes #239.
  • Loading branch information
bgentry committed Feb 29, 2024
1 parent b2cb142 commit 80432fb
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Fixed a memory leak caused by allocating a new random source on every job execution. Thank you @shawnstephens for reporting ❤️ [PR #240](https://github.com/riverqueue/river/pull/240).
- Fix a problem where `JobListParams.Queues()` didn't filter correctly based on its arguments. [PR #212](https://github.com/riverqueue/river/pull/212).
- Fix a problem in `DebouncedChan` where it would fire on its "out" channel too often when it was being signaled continuousy on its "in" channel. This would have caused work to be fetched more often than intended in busy systems. [PR #222](https://github.com/riverqueue/river/pull/222).

Expand Down
2 changes: 2 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/riverqueue/river/internal/maintenance"
"github.com/riverqueue/river/internal/notifier"
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/internal/util/valutil"
"github.com/riverqueue/river/internal/workunit"
Expand Down Expand Up @@ -433,6 +434,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
archetype := &baseservice.Archetype{
DisableSleep: config.disableSleep,
Logger: config.Logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

Expand Down
11 changes: 10 additions & 1 deletion internal/baseservice/base_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,15 @@ type Archetype struct {
// Logger is a structured logger.
Logger *slog.Logger

// Rand is a random source safe for concurrent access and seeded with a
// cryptographically random seed to ensure good distribution between nodes
// and services. The random source itself is _not_ cryptographically secure,
// and therefore should not be used anywhere security-related. This is a
// deliberate choice because Go's non-crypto rand source is about twenty
// times faster, and so far none of our uses of random require cryptographic
// secure randomness.
Rand *rand.Rand

// TimeNowUTC returns the current time as UTC. Normally it's implemented as
// a call to `time.Now().UTC()`, but may be overridden in tests for time
// injection. Services should try to use this function instead of the
Expand Down Expand Up @@ -116,7 +125,7 @@ func Init[TService withBaseService](archetype *Archetype, service TService) TSer
baseService.DisableSleep = archetype.DisableSleep
baseService.Logger = archetype.Logger
baseService.Name = reflect.TypeOf(service).Elem().Name()
baseService.Rand = randutil.NewCryptoSeededConcurrentSafeRand()
baseService.Rand = archetype.Rand
baseService.TimeNowUTC = archetype.TimeNowUTC

return service
Expand Down
2 changes: 2 additions & 0 deletions internal/baseservice/base_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/riverqueue/river/internal/util/randutil"

Check failure on line 10 in internal/baseservice/base_service_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s Standard -s Default -s Prefix(github.com/riverqueue) (gci)
"github.com/stretchr/testify/require"

Check failure on line 11 in internal/baseservice/base_service_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gci`-ed with --skip-generated -s Standard -s Default -s Prefix(github.com/riverqueue) (gci)
)

Expand Down Expand Up @@ -69,6 +70,7 @@ func archetype() *Archetype {
return &Archetype{
DisableSleep: true,
Logger: slog.New(slog.NewTextHandler(os.Stdout, nil)),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}
}
2 changes: 2 additions & 0 deletions internal/riverinternaltest/riverinternaltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/riverqueue/river/internal/rivercommon"
"github.com/riverqueue/river/internal/riverinternaltest/slogtest" //nolint:depguard
"github.com/riverqueue/river/internal/testdb"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/valutil"
)

Expand Down Expand Up @@ -57,6 +58,7 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {

return &baseservice.Archetype{
Logger: Logger(tb),
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}
}
Expand Down
2 changes: 2 additions & 0 deletions rivermigrate/river_migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/riverqueue/river/internal/baseservice"
"github.com/riverqueue/river/internal/util/dbutil"
"github.com/riverqueue/river/internal/util/maputil"
"github.com/riverqueue/river/internal/util/randutil"
"github.com/riverqueue/river/internal/util/sliceutil"
"github.com/riverqueue/river/riverdriver"
)
Expand Down Expand Up @@ -94,6 +95,7 @@ func New[TTx any](driver riverdriver.Driver[TTx], config *Config) *Migrator[TTx]

archetype := &baseservice.Archetype{
Logger: logger,
Rand: randutil.NewCryptoSeededConcurrentSafeRand(),
TimeNowUTC: func() time.Time { return time.Now().UTC() },
}

Expand Down

0 comments on commit 80432fb

Please sign in to comment.