diff --git a/CHANGELOG.md b/CHANGELOG.md index a02809f7..a5b65b8d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Cancellation of running jobs relied on a channel that was only being received when in the job fetch routine, meaning that jobs which were cancelled would not be cancelled until the next scheduled fetch. This was fixed by also receiving from the job cancellation channel when in the main producer loop, even if no fetches are happening. [PR #XXX](https://github.com/riverqueue/river/pull/XXX). + ## [0.14.1] - 2024-11-04 ### Fixed diff --git a/client_test.go b/client_test.go index b0529845..8b47c4f2 100644 --- a/client_test.go +++ b/client_test.go @@ -445,8 +445,12 @@ func Test_Client(t *testing.T) { // _outside of_ a transaction. The exact same test logic applies to each case, // the only difference is a different cancelFunc provided by the specific // subtest. - cancelRunningJobTestHelper := func(t *testing.T, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper - client, bundle := setup(t) + cancelRunningJobTestHelper := func(t *testing.T, config *Config, cancelFunc func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error)) { //nolint:thelper + defaultConfig, bundle := setupConfig(t) + if config == nil { + config = defaultConfig + } + client := newTestClient(t, bundle.dbPool, config) jobStartedChan := make(chan int64) @@ -492,7 +496,17 @@ func Test_Client(t *testing.T) { t.Run("CancelRunningJob", func(t *testing.T) { t.Parallel() - cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { + cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { + return client.JobCancel(ctx, jobID) + }) + }) + + t.Run("CancelRunningJobWithLongPollInterval", func(t *testing.T) { + t.Parallel() + + config := newTestConfig(t, nil) + config.FetchPollInterval = 60 * time.Second + cancelRunningJobTestHelper(t, config, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { return client.JobCancel(ctx, jobID) }) }) @@ -500,7 +514,7 @@ func Test_Client(t *testing.T) { t.Run("CancelRunningJobInTx", func(t *testing.T) { t.Parallel() - cancelRunningJobTestHelper(t, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { + cancelRunningJobTestHelper(t, nil, func(ctx context.Context, dbPool *pgxpool.Pool, client *Client[pgx.Tx], jobID int64) (*rivertype.JobRow, error) { var ( job *rivertype.JobRow err error diff --git a/producer.go b/producer.go index b58fa505..0ad16ffe 100644 --- a/producer.go +++ b/producer.go @@ -448,6 +448,8 @@ func (p *producer) fetchAndRunLoop(fetchCtx, workCtx context.Context, fetchLimit default: p.Logger.DebugContext(workCtx, p.Name+": Unknown queue control action", "action", msg.Action) } + case jobID := <-p.cancelCh: + p.maybeCancelJob(jobID) case <-fetchLimiter.C(): if p.paused { continue