Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose JobCancelError, JobSnoozeError #665

Merged
merged 1 commit into from
Nov 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Expose `JobCancelError` and `JobSnoozeError` types to more easily facilitate testing. [PR #665](https://github.com/riverqueue/river/pull/665).

### Changed

- Tune the client to be more aggressive about fetching when it just fetched a full batch of jobs, or when it skipped its previous triggered fetch because it was already full. This should bring more consistent throughput to poll-only mode and in cases where there is a backlog of existing jobs but new ones aren't being actively inserted. This will result in increased fetch load on many installations, with the benefit of increased throughput. As before, `FetchCooldown` still limits how frequently these fetches can occur on each client and can be increased to reduce the amount of fetch querying. Thanks Chris Gaffney ([@gaffneyc](https://github.com/gaffneyc)) for the idea, initial implementation, and benchmarks. [PR #663](https://github.com/riverqueue/river/pull/663).
Expand Down
38 changes: 22 additions & 16 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,30 @@ func (e *UnknownJobKindError) Is(target error) bool {
// the job at the end of execution. Regardless of whether or not the job has any
// remaining attempts, this will ensure the job does not execute again.
func JobCancel(err error) error {
return &jobCancelError{err: err}
return &JobCancelError{err: err}
}

type jobCancelError struct {
// JobCancelError is the error type returned by JobCancel. It should not be
// initialized directly, but is returned from the [JobCancel] function and can
// be used for test assertions.
type JobCancelError struct {
err error
}

func (e *jobCancelError) Error() string {
func (e *JobCancelError) Error() string {
if e.err == nil {
return "jobCancelError: <nil>"
return "JobCancelError: <nil>"
}
// should not ever be called, but add a prefix just in case:
return "jobCancelError: " + e.err.Error()
return "JobCancelError: " + e.err.Error()
}

func (e *jobCancelError) Is(target error) bool {
_, ok := target.(*jobCancelError)
func (e *JobCancelError) Is(target error) bool {
_, ok := target.(*JobCancelError)
return ok
}

func (e *jobCancelError) Unwrap() error { return e.err }
func (e *JobCancelError) Unwrap() error { return e.err }

// JobSnooze can be returned from a Worker's Work method to cause the job to be
// tried again after the specified duration. This also has the effect of
Expand All @@ -81,20 +84,23 @@ func JobSnooze(duration time.Duration) error {
if duration < 0 {
panic("JobSnooze: duration must be >= 0")
}
return &jobSnoozeError{duration: duration}
return &JobSnoozeError{duration: duration}
}

type jobSnoozeError struct {
// JobSnoozeError is the error type returned by JobSnooze. It should not be
// initialized directly, but is returned from the [JobSnooze] function and can
// be used for test assertions.
type JobSnoozeError struct {
duration time.Duration
}

func (e *jobSnoozeError) Error() string {
func (e *JobSnoozeError) Error() string {
// should not ever be called, but add a prefix just in case:
return fmt.Sprintf("jobSnoozeError: %s", e.duration)
return fmt.Sprintf("JobSnoozeError: %s", e.duration)
}

func (e *jobSnoozeError) Is(target error) bool {
_, ok := target.(*jobSnoozeError)
func (e *JobSnoozeError) Is(target error) bool {
_, ok := target.(*JobSnoozeError)
return ok
}

Expand Down Expand Up @@ -270,7 +276,7 @@ func (e *jobExecutor) invokeErrorHandler(ctx context.Context, res *jobExecutorRe
}

func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult) {
var snoozeErr *jobSnoozeError
var snoozeErr *JobSnoozeError

if res.Err != nil && errors.As(res.Err, &snoozeErr) {
e.Logger.DebugContext(ctx, e.Name+": Job snoozed",
Expand Down Expand Up @@ -316,7 +322,7 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult)
func (e *jobExecutor) reportError(ctx context.Context, res *jobExecutorResult) {
var (
cancelJob bool
cancelErr *jobCancelError
cancelErr *JobCancelError
)

logAttrs := []any{
Expand Down
4 changes: 2 additions & 2 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Len(t, job.Errors, 1)
require.WithinDuration(t, time.Now(), job.Errors[0].At, 2*time.Second)
require.Equal(t, 1, job.Errors[0].Attempt)
require.Equal(t, "jobCancelError: throw away this job", job.Errors[0].Error)
require.Equal(t, "JobCancelError: throw away this job", job.Errors[0].Error)
require.Equal(t, "", job.Errors[0].Trace)
})

Expand Down Expand Up @@ -700,7 +700,7 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Len(t, job.Errors, 1)
require.WithinDuration(t, time.Now(), job.Errors[0].At, 2*time.Second)
require.Equal(t, 1, job.Errors[0].Attempt)
require.Equal(t, "jobCancelError: job cancelled remotely", job.Errors[0].Error)
require.Equal(t, "JobCancelError: job cancelled remotely", job.Errors[0].Error)
require.Equal(t, ErrJobCancelledRemotely.Error(), job.Errors[0].Error)
require.Equal(t, "", job.Errors[0].Trace)
})
Expand Down
Loading