From cf158c36edf1738b1a86c955e19d48ced121dad3 Mon Sep 17 00:00:00 2001 From: Brandur Date: Sun, 14 Apr 2024 18:16:28 -0700 Subject: [PATCH] Add concurrent safe `StubTime` test helper This one's not strictly necessary, but it came up in another project so I thought I may as well port a similar feature here. There's currently a test helper for setting a stub time in tests, but it's not very useful, and not useful in case a test case is setting a new test time while a service is simultaneously trying to access it. Here, replace `StubTime` with a new test helper that returns a `(getTime, setTime)` function pair in which each invocation is protected by a mutex so the concurrency problem described above is solved. As seen here, this helper isn't used all that much and could plausibly be removed or refactored out. However, there's no particularly good reason to do that since it's not causing any trouble so I think augmenting it is just as good of a potential path. --- .../riverinternaltest/riverinternaltest.go | 31 ++++++++++++----- .../riverinternaltest_test.go | 33 +++++++++++++++++++ job_executor_test.go | 4 +-- 3 files changed, 58 insertions(+), 10 deletions(-) diff --git a/internal/riverinternaltest/riverinternaltest.go b/internal/riverinternaltest/riverinternaltest.go index 13c86be0..132512ef 100644 --- a/internal/riverinternaltest/riverinternaltest.go +++ b/internal/riverinternaltest/riverinternaltest.go @@ -221,14 +221,29 @@ func TestDB(ctx context.Context, tb testing.TB) *pgxpool.Pool { return testPool.Pool() } -// StubTime is a shortcut for stubbing time for a the given archetype at the -// given time. It returns the time given as argument for convenience. -func StubTime(archetype *baseservice.Archetype, t time.Time) time.Time { - // Strip monotonic portion and make UTC so that comparisons are less fraught. - t = t.Round(0).UTC() - - archetype.TimeNowUTC = func() time.Time { return t } - return t +// StubTime returns a pair of function for (getTime, setTime), the former of +// which is compatible with `TimeNowUTC` found in the service archetype. +// It's concurrent safe is so that a started service can access its stub +// time while the test case is setting it, and without the race detector +// triggering. +func StubTime(initialTime time.Time) (func() time.Time, func(t time.Time)) { + var ( + mu sync.RWMutex + stubbedTime = &initialTime + ) + + getTime := func() time.Time { + mu.RLock() + defer mu.RUnlock() + return *stubbedTime + } + setTime := func(newTime time.Time) { + mu.Lock() + defer mu.Unlock() + stubbedTime = &newTime + } + + return getTime, setTime } // A pool and mutex to protect it, lazily initialized by TestTx. Once open, this diff --git a/internal/riverinternaltest/riverinternaltest_test.go b/internal/riverinternaltest/riverinternaltest_test.go index 84c3021e..8d526156 100644 --- a/internal/riverinternaltest/riverinternaltest_test.go +++ b/internal/riverinternaltest/riverinternaltest_test.go @@ -4,12 +4,45 @@ import ( "context" "sync" "testing" + "time" "github.com/jackc/pgerrcode" "github.com/jackc/pgx/v5/pgconn" "github.com/stretchr/testify/require" ) +func TestStubTime(t *testing.T) { + t.Parallel() + + t.Run("BasicUsage", func(t *testing.T) { + t.Parallel() + + initialTime := time.Now() + + getTime, setTime := StubTime(initialTime) + require.Equal(t, initialTime, getTime()) + + newTime := initialTime.Add(1 * time.Second) + setTime(newTime) + require.Equal(t, newTime, getTime()) + }) + + t.Run("Stress", func(t *testing.T) { + t.Parallel() + + getTime, setTime := StubTime(time.Now()) + + for i := 0; i < 10; i++ { + go func() { + for j := 0; j < 50; j++ { + setTime(time.Now()) + _ = getTime() + } + }() + } + }) +} + // Implemented by `pgx.Tx` or `pgxpool.Pool`. Normally we'd use a similar type // from `dbsqlc` or `dbutil`, but riverinternaltest is extremely low level and // that would introduce a cyclic dependency. We could package as diff --git a/job_executor_test.go b/job_executor_test.go index 5c98cb2e..79f2179e 100644 --- a/job_executor_test.go +++ b/job_executor_test.go @@ -226,7 +226,7 @@ func TestJobExecutor_Execute(t *testing.T) { executor, bundle := setup(t) - baselineTime := riverinternaltest.StubTime(&executor.Archetype, time.Now()) + executor.Archetype.TimeNowUTC, _ = riverinternaltest.StubTime(time.Now().UTC()) workerErr := errors.New("job error") executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return workerErr }, nil).MakeUnit(bundle.jobRow) @@ -239,7 +239,7 @@ func TestJobExecutor_Execute(t *testing.T) { require.WithinDuration(t, executor.ClientRetryPolicy.NextRetry(bundle.jobRow), job.ScheduledAt, 1*time.Second) require.Equal(t, rivertype.JobStateRetryable, job.State) require.Len(t, job.Errors, 1) - require.Equal(t, baselineTime, job.Errors[0].At) + require.Equal(t, executor.Archetype.TimeNowUTC().Truncate(1*time.Microsecond), job.Errors[0].At.Truncate(1*time.Microsecond)) require.Equal(t, 1, job.Errors[0].Attempt) require.Equal(t, "job error", job.Errors[0].Error) require.Equal(t, "", job.Errors[0].Trace)