Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix running job cancellation when not fetching #678

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #678](https://github.com/riverqueue/river/pull/678).

## [0.14.1] - 2024-11-04

### Fixed
Expand Down
22 changes: 18 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,12 @@ func Test_Client(t *testing.T) {
// _outside of_ a transaction. The exact same test logic applies to each case,
// the only difference is a different cancelFunc provided by the specific
// subtest.
cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
client, bundle := setup(t)
cancelRunningJobTestHelper := func(t *testing.T, config *Config, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper
defaultConfig, bundle := setupConfig(t)
if config == nil {
config = defaultConfig
}
client := newTestClient(t, bundle.dbPool, config)

jobStartedChan := make(chan int64)

Expand Down Expand Up @@ -492,15 +496,25 @@ func Test_Client(t *testing.T) {
t.Run("CancelRunningJob", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.JobCancel(ctx, jobID)
})
})

t.Run("CancelRunningJobWithLongPollInterval", func(t *testing.T) {
t.Parallel()

config := newTestConfig(t, nil)
config.FetchPollInterval = 60 * time.Second
cancelRunningJobTestHelper(t, config, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
return client.JobCancel(ctx, jobID)
})
})

t.Run("CancelRunningJobInTx", func(t *testing.T) {
t.Parallel()

cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) {
var (
job *rivertype.JobRow
err error
Expand Down
10 changes: 5 additions & 5 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/kr/pty v1.1.1 h1:VkoXIwSboBpnk99O/KFauAEILuNHv5DVFKZMBN/gUgw=
github.com/riverqueue/river v0.13.0-rc.1/go.mod h1:ZxTeoNZWNpwl+dCBWF5AomGV1exZbHu/E75ufb09HIo=
github.com/riverqueue/river v0.14.0/go.mod h1:R98qxNGrFOm1rtapS76Ef6y2WbQ56jtOc2kuVSKW/zA=
github.com/riverqueue/river/riverdriver v0.14.0/go.mod h1:DUayJJgiCWwfnsLC3sLBuM/N1cRh2lEoAohV6bHeaiA=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.14.0/go.mod h1:G6ymkGCy+H6SmRUTSBC9uXnk+dy4TttkuM5L1yS/KDA=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.14.0/go.mod h1:VlHbD3GF4ioT52J2S2VM2cFHbuG8D9u1bIbT4R/JuPE=
github.com/riverqueue/river/rivertype v0.14.0/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0=
github.com/riverqueue/river v0.14.1/go.mod h1:3cQREff7+iGZC+u2lire03SOxUmT41bjzpqZWAWPXtk=
github.com/riverqueue/river/riverdriver v0.14.1/go.mod h1:bJDNRwDNiCyXv3ZEfOGUvGBEo6C3fNnPc4VQRF1P+Ys=
github.com/riverqueue/river/riverdriver/riverdatabasesql v0.14.1/go.mod h1:C+A3pzwxMwyclSwfeTRyWoDRoFd9BhNmsSPSe8bv4l8=
github.com/riverqueue/river/riverdriver/riverpgxv5 v0.14.1/go.mod h1:P9rfgq0hgRM19ty6CHMQTAKUq3crmP28f4BINDfRCyw=
github.com/riverqueue/river/rivertype v0.14.1/go.mod h1:wVOhGBeay6+JcIi0pTFlF4KtUgHYFkhMYv8dpxU46W0=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 2 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit
default:
p.Logger.DebugContext(workCtx, p.Name+": Unknown queue control action", "action", msg.Action)
}
case jobID := <-p.cancelCh:
p.maybeCancelJob(jobID)
case <-fetchLimiter.C():
if p.paused {
continue
Expand Down
Loading