Skip to content

Commit

Permalink
fix StopAndCancel after ongoing Stop
Browse files Browse the repository at this point in the history
This regression was introduced in #272, which I noticed because our demo
app is not currently cancelling active jobs gracefully before exiting.

This commit fixes the behavior by always cancelling the work context
upon `StopAndCancel()`, regardless of whether or not another shutdown is
in progress.
  • Loading branch information
bgentry committed Jun 9, 2024
1 parent edb42d2 commit 1384db0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- The default max attempts of 25 can now be customized on a per-client basis using `Config.MaxAttempts`. This is in addition to the ability to customize at the job type level with `JobArgs`, or on a per-job basis using `InsertOpts`. [PR #383](https://github.com/riverqueue/river/pull/383).

### Fixed

- Fix `StopAndCancel` to not hang if called in parallel to an ongoing `Stop` call. [PR #376](https://github.com/riverqueue/river/pull/376).

## [0.6.1] - 2024-05-21

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,14 +810,14 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
// no need to call this method if the context passed to Run is cancelled
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)

shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
if !shouldStop {
return nil
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)

select {
case <-ctx.Done(): // stop context cancelled
finalizeStop(false) // not stopped; allow Stop to be called again
Expand Down
92 changes: 68 additions & 24 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,39 +645,83 @@ func Test_Client(t *testing.T) {
t.Run("StopAndCancel", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

type JobArgs struct {
JobArgsReflectKind[JobArgs]
type testBundle struct {
jobDoneChan chan struct{}
jobStartedChan chan int64
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))
setupStopAndCancel := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {
t.Helper()

startClient(ctx, t, client)
client, _ := setup(t)
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)
type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
startClient(ctx, t, client)

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

return client, &testBundle{
jobDoneChan: jobDoneChan,
jobStartedChan: jobStartedChan,
}
}

require.NoError(t, client.StopAndCancel(ctx))
t.Run("OnItsOwn", func(t *testing.T) {
t.Parallel()

client, _ := setupStopAndCancel(t)

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())
})

t.Run("AfterStop", func(t *testing.T) {
t.Parallel()

client, bundle := setupStopAndCancel(t)

go func() {
require.NoError(t, client.Stop(ctx))
}()

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
case <-time.After(500 * time.Millisecond):
}

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())

riverinternaltest.WaitOrTimeout(t, client.Stopped())
select {
case <-bundle.jobDoneChan:
default:
t.Fatal("expected job to be have exited")
}
})
})
}

Expand Down

0 comments on commit 1384db0

Please sign in to comment.