Skip to content

Commit

Permalink
Have client insert functions return insert result instead of raw job …
Browse files Browse the repository at this point in the history
…row (breaking)

This one's aimed at resolving the problem described in this discussion
[1]. In short, when an inserted job with unique properties in skipped
because it was a duplicate, there's no easy way to ascertain through
River's API that this is what happened.

The cleanest resolution is what's proposed here. Instead of returning a
raw `JobRow`, insert functions return a `JobInsertResult` that contains
a job row, along with a boolean indicating whether an insertion was
skipped due to a duplicate unique. The result struct is also better for
future compatibility in that it could contain other metadata pertaining
to job insertions.

The downside of this approach is that it's a breaking change. The hope
is that it's not _that_ bad of a breaking change because most of the
time callers don't care that much about properties on a return result
row given they just sent most of it as a parameter to `Insert`, so they
might not be accessing any return values and therefore not be broken.

You can get a rough idea for changes in callers by looking at what
changed in this diff. `client_test.go` needed quite a few tweaks because
it's doing some more comprehensive testing, but only one example test
(more representative of user code) had to change, which is a good sign.

[1] #86
  • Loading branch information
brandur committed Apr 16, 2024
1 parent 06f9447 commit 6dffc2a
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 96 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.3.0] - 2024-04-15
### Changed

- **Breaking change:** Client `Insert` and `InsertTx` functions now return a `JobInsertResult` struct instead of a `JobRow`. This allows the result to include metadata like the new `UniqueSkippedAsDuplicate` property, so callers can tell whether an inserted job was skipped due to unique constraint. [PR #292](https://github.com/riverqueue/river/pull/292).

### Added

Expand Down
16 changes: 12 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ var errNoDriverDBPool = errors.New("driver must have non-nil database pool to us
// if err != nil {
// // handle error
// }
func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error) {
func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
if !c.driver.HasPool() {
return nil, errNoDriverDBPool
}
Expand All @@ -1295,11 +1295,11 @@ func (c *Client[TTx]) Insert(ctx context.Context, args JobArgs, opts *InsertOpts
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error) {
func (c *Client[TTx]) InsertTx(ctx context.Context, tx TTx, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
return c.insert(ctx, c.driver.UnwrapExecutor(tx), args, opts)
}

func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts) (*rivertype.JobRow, error) {
func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, args JobArgs, opts *InsertOpts) (*rivertype.JobInsertResult, error) {
if err := c.validateJobArgs(args); err != nil {
return nil, err
}
Expand All @@ -1314,7 +1314,7 @@ func (c *Client[TTx]) insert(ctx context.Context, exec riverdriver.Executor, arg
return nil, err
}

return jobInsertRes.Job, nil
return jobInsertRes, nil
}

// InsertManyParams encapsulates a single job combined with insert options for
Expand Down Expand Up @@ -1342,6 +1342,10 @@ type InsertManyParams struct {
// if err != nil {
// // handle error
// }
//
// Job uniqueness is not respected when using InsertMany due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using Insert instead.
func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams) (int64, error) {
if !c.driver.HasPool() {
return 0, errNoDriverDBPool
Expand Down Expand Up @@ -1371,6 +1375,10 @@ func (c *Client[TTx]) InsertMany(ctx context.Context, params []InsertManyParams)
// // handle error
// }
//
// Job uniqueness is not respected when using InsertManyTx due to unique inserts
// using an internal transaction and advisory lock that might lead to
// significant lock contention. Insert unique jobs using InsertTx instead.
//
// This variant lets a caller insert jobs atomically alongside other database
// changes. An inserted job isn't visible to be worked until the transaction
// commits, and if the transaction rolls back, so too is the inserted job.
Expand Down
Loading

0 comments on commit 6dffc2a

Please sign in to comment.