Skip to content

Commit

Permalink
fix StopAndTest after ongoing Stop
Browse files Browse the repository at this point in the history
This regression was introduced in #272, which I noticed because our demo
app is not currently cancelling active jobs gracefully before exiting.

This commit fixes the behavior by always cancelling the work context
upon `StopAndCancel()`, regardless of whether or not another shutdown is
in progress.
  • Loading branch information
bgentry committed May 30, 2024
1 parent 9fab070 commit 23a3960
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Fix `StopAndCancel` to not hang if called in parallel to an ongoing `Stop` call. [PR #376](https://github.com/riverqueue/river/pull/376).

## [0.6.1] - 2024-05-21

### Fixed
Expand Down
6 changes: 3 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,14 +796,14 @@ func (c *Client[TTx]) Stop(ctx context.Context) error {
// no need to call this method if the context passed to Run is cancelled
// instead.
func (c *Client[TTx]) StopAndCancel(ctx context.Context) error {
c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)

shouldStop, stopped, finalizeStop := c.baseStartStop.StopInit()
if !shouldStop {
return nil
}

c.baseService.Logger.InfoContext(ctx, c.baseService.Name+": Hard stop started; cancelling all work")
c.workCancel(rivercommon.ErrShutdown)

select {
case <-ctx.Done(): // stop context cancelled
finalizeStop(false) // not stopped; allow Stop to be called again
Expand Down
90 changes: 66 additions & 24 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,39 +644,81 @@ func Test_Client(t *testing.T) {
t.Run("StopAndCancel", func(t *testing.T) {
t.Parallel()

client, _ := setup(t)
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

type JobArgs struct {
JobArgsReflectKind[JobArgs]
type testBundle struct {
jobDoneChan chan struct{}
jobStartedChan chan int64
}

AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))
setupStopAndCancel := func(t *testing.T) (*Client[pgx.Tx], *testBundle) {

Check failure on line 652 in client_test.go

View workflow job for this annotation

GitHub Actions / lint

test helper function should start from t.Helper() (thelper)
client, _ := setup(t)
jobStartedChan := make(chan int64)
jobDoneChan := make(chan struct{})

startClient(ctx, t, client)
type JobArgs struct {
JobArgsReflectKind[JobArgs]
}

insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)
AddWorker(client.config.Workers, WorkFunc(func(ctx context.Context, job *Job[JobArgs]) error {
jobStartedChan <- job.ID
<-ctx.Done()
require.ErrorIs(t, context.Cause(ctx), rivercommon.ErrShutdown)
close(jobDoneChan)
return nil
}))

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)
startClient(ctx, t, client)

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
insertRes, err := client.Insert(ctx, &JobArgs{}, nil)
require.NoError(t, err)

startedJobID := riverinternaltest.WaitOrTimeout(t, jobStartedChan)
require.Equal(t, insertRes.Job.ID, startedJobID)

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
default:
}

return client, &testBundle{
jobDoneChan: jobDoneChan,
jobStartedChan: jobStartedChan,
}
}

require.NoError(t, client.StopAndCancel(ctx))
t.Run("OnItsOwn", func(t *testing.T) {
t.Parallel()

client, _ := setupStopAndCancel(t)

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())
})

t.Run("AfterStop", func(t *testing.T) {
t.Parallel()

client, bundle := setupStopAndCancel(t)

riverinternaltest.WaitOrTimeout(t, client.Stopped())
go func() {
require.NoError(t, client.Stop(ctx))
}()

select {
case <-client.Stopped():
t.Fatal("expected client to not be stopped yet")
case <-time.After(500 * time.Millisecond):
}

require.NoError(t, client.StopAndCancel(ctx))
riverinternaltest.WaitOrTimeout(t, client.Stopped())

select {
case <-bundle.jobDoneChan:
default:
t.Fatal("expected job to be have exited")
}
})
})
}

Expand Down

0 comments on commit 23a3960

Please sign in to comment.