use new unique jobs implementation (#32)
bgentry authored Dec 19, 2024
1 parent 54b4549 commit 908377b
Showing 26 changed files with 510 additions and 1,244 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
.DS_Store
*.gem
coverage/
26 changes: 26 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

⚠️ Version 0.8.0 contains breaking changes to transition to River's new unique jobs implementation and to enable broader, more flexible application of unique jobs. Detailed notes on the implementation are contained in [the original River PR](https://github.com/riverqueue/river/pull/590), and the notes below include short summaries of the ways this impacts this client specifically.

Users should upgrade backends to River v0.12.0 before upgrading this library in order to ensure a seamless transition of all in-flight jobs. Afterward, the latest River version may be used.

### Breaking

- **Breaking change:** The return type of `Client#insert_many` has been changed. Rather than returning just the number of rows inserted, it returns an array of `InsertResult` values, one per inserted row; see the sketch after this list. Unique conflicts which are skipped as duplicates are indicated the same way as for single inserts (the `unique_skipped_as_duplicated` attribute), and in such cases the conflicting row is returned instead.
- **Breaking change:** Unique jobs no longer allow total customization of their states when using the `by_state` option. The pending, scheduled, available, and running states are required whenever customizing this list.
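
A minimal sketch of the new `Client#insert_many` return shape (`SimpleArgs` stands in for a job args class like the one in the README):

```ruby
results = client.insert_many([
  SimpleArgs.new(job_num: 1),
  SimpleArgs.new(job_num: 2)
])

# Each element is an `InsertResult` rather than a bare row count:
results.each do |result|
  result.job                          # inserted (or conflicting) job row
  result.unique_skipped_as_duplicated # true when a duplicate was skipped
end
```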

### Added

- The `UniqueOpts` class gains an `exclude_kind` option for cases where uniqueness needs to be guaranteed across multiple job types.
- Unique jobs utilizing `by_args` can now also opt to have a subset of the job's arguments considered for uniqueness. For example, you could choose to consider only the `customer_id` field while ignoring the other fields:

```ruby
UniqueOpts.new(by_args: ["customer_id"])
```

Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result across implementations, even if the encoded JSON isn't sorted consistently.
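
For instance, only `customer_id` is considered below, so two inserts that differ in other fields and in JSON key order still dedupe (a hypothetical sketch: `OrderArgs` is made up, and `InsertOpts` is assumed to accept `unique_opts` as for single inserts):

```ruby
insert_opts = InsertOpts.new(unique_opts: UniqueOpts.new(by_args: ["customer_id"]))

client.insert(OrderArgs.new(customer_id: 123, total: 100), insert_opts: insert_opts)
res = client.insert(OrderArgs.new(total: 200, customer_id: 123), insert_opts: insert_opts)

res.unique_skipped_as_duplicated # => true; `res.job` is the existing row
```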

### Changed

- Unique jobs have been improved to allow bulk insertion of unique jobs via `Client#insert_many`.

This updated implementation is significantly faster due to the removal of advisory locks in favor of an index-backed uniqueness system, while allowing some flexibility in which job states are considered. However, not all states may be removed from consideration when using the `by_state` option; pending, scheduled, available, and running states are required whenever customizing this list.
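
A hedged sketch of what that looks like (`EmailArgs` is hypothetical, and `InsertManyParams` is assumed to wrap per-job options the same way single inserts do):

```ruby
insert_opts = InsertOpts.new(unique_opts: UniqueOpts.new(by_args: true))

results = client.insert_many([
  InsertManyParams.new(EmailArgs.new(to: "a@example.com"), insert_opts: insert_opts),
  InsertManyParams.new(EmailArgs.new(to: "a@example.com"), insert_opts: insert_opts) # duplicate
])

results[1].unique_skipped_as_duplicated # => true, with the conflicting row in `results[1].job`
```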

## [0.7.0] - 2024-08-30

### Changed
1 change: 1 addition & 0 deletions Gemfile.lock
@@ -143,6 +143,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-23
x86_64-linux

DEPENDENCIES
6 changes: 3 additions & 3 deletions docs/README.md
@@ -42,8 +42,8 @@ insert_res.job # inserted job row

Job args should:

* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
* Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
- Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go.

They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.

@@ -89,7 +89,7 @@ insert_res.unique_skipped_as_duplicated
Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```ruby
client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
client = River::Client.new(mock_driver)
```

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
1 change: 1 addition & 0 deletions driver/riverqueue-activerecord/Gemfile.lock
@@ -120,6 +120,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-23
x86_64-linux

DEPENDENCIES
119 changes: 54 additions & 65 deletions driver/riverqueue-activerecord/lib/driver.rb
@@ -31,36 +31,18 @@ def errors = {}
end
end

def advisory_lock(key)
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
nil
end

def advisory_lock_try(key)
::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
end

def job_get_by_id(id)
data_set = RiverJob.where(id: id)
data_set.first ? to_job_row_from_model(data_set.first) : nil
end

def job_get_by_kind_and_unique_properties(get_params)
data_set = RiverJob.where(kind: get_params.kind)
data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
data_set = data_set.where(queue: get_params.queue) if get_params.queue
data_set = data_set.where(state: get_params.state) if get_params.state
data_set.first ? to_job_row_from_model(data_set.first) : nil
end

def job_insert(insert_params)
to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
job_insert_many([insert_params]).first
end

def job_insert_unique(insert_params, unique_key)
res = RiverJob.upsert(
insert_params_to_hash(insert_params).merge(unique_key: unique_key),
def job_insert_many(insert_params_many)
res = RiverJob.upsert_all(
insert_params_many.map { |param| insert_params_to_hash(param) },
on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),

@@ -69,15 +51,9 @@ def job_insert_unique(insert_params, unique_key)
# ActiveRecord tries to look up a unique index instead of letting
# Postgres handle that, and of course it doesn't support a `WHERE`
# clause. The workaround is to target the index name instead of columns.
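#
# The net effect: `ON CONFLICT ... DO UPDATE SET kind = EXCLUDED.kind` is a
# no-op update whose only purpose is to make `RETURNING` yield the
# conflicting row on duplicates, while `(xmax != 0)` is a Postgres trick
# that's true only for rows touched by that update, i.e. skipped duplicates.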
unique_by: "river_job_kind_unique_key_idx"
unique_by: "river_job_unique_idx"
)

[to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
end

def job_insert_many(insert_params_many)
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
to_insert_results(res)
end

def job_list
@@ -94,8 +70,6 @@ def transaction(&)
end

private def insert_params_to_hash(insert_params)
# the call to `#compact` is important so that we remove nils and table
# default values get picked up instead
{
args: insert_params.encoded_args,
kind: insert_params.kind,
@@ -104,8 +78,10 @@ def transaction(&)
queue: insert_params.queue,
state: insert_params.state,
scheduled_at: insert_params.scheduled_at,
tags: insert_params.tags
}.compact
tags: insert_params.tags || [],
unique_key: insert_params.unique_key,
unique_states: insert_params.unique_states
}
end

private def to_job_row_from_model(river_job)
@@ -139,51 +115,64 @@ def transaction(&)
scheduled_at: river_job.scheduled_at.getutc,
state: river_job.state,
tags: river_job.tags,
unique_key: river_job.unique_key
unique_key: river_job.unique_key,
unique_states: river_job.unique_states
)
end

private def to_insert_results(res)
res.rows.map do |row|
to_job_row_from_raw(row, res.columns, res.column_types)
end
end

# This is really awful, but some of ActiveRecord's methods (e.g. `.create`)
# return a model, and others (e.g. `.upsert`) return raw values, and
# therefore this second version for unmarshaling a job row exists. I
# searched long and hard for a way to have the former type of method return
# raw or the latter type of method return a model, but was unable to find
# anything.
private def to_job_row_from_raw(res)
private def to_job_row_from_raw(row, columns, column_types)
river_job = {}

res.rows[0].each_with_index do |val, i|
river_job[res.columns[i]] = res.column_types[i].deserialize(val)
row.each_with_index do |val, i|
river_job[columns[i]] = column_types[i].deserialize(val)
end

River::JobRow.new(
id: river_job["id"],
args: JSON.parse(river_job["args"]),
attempt: river_job["attempt"],
attempted_at: river_job["attempted_at"]&.getutc,
attempted_by: river_job["attempted_by"],
created_at: river_job["created_at"].getutc,
errors: river_job["errors"]&.map { |e|
deserialized_error = JSON.parse(e)
errors = river_job["errors"]&.map do |e|
deserialized_error = JSON.parse(e)

River::AttemptError.new(
at: Time.parse(deserialized_error["at"]),
attempt: deserialized_error["attempt"],
error: deserialized_error["error"],
trace: deserialized_error["trace"]
)
},
finalized_at: river_job["finalized_at"]&.getutc,
kind: river_job["kind"],
max_attempts: river_job["max_attempts"],
metadata: river_job["metadata"],
priority: river_job["priority"],
queue: river_job["queue"],
scheduled_at: river_job["scheduled_at"].getutc,
state: river_job["state"],
tags: river_job["tags"],
unique_key: river_job["unique_key"]
)
River::AttemptError.new(
at: Time.parse(deserialized_error["at"]),
attempt: deserialized_error["attempt"],
error: deserialized_error["error"],
trace: deserialized_error["trace"]
)
end

[
River::JobRow.new(
id: river_job["id"],
args: JSON.parse(river_job["args"]),
attempt: river_job["attempt"],
attempted_at: river_job["attempted_at"]&.getutc,
attempted_by: river_job["attempted_by"],
created_at: river_job["created_at"].getutc,
errors: errors,
finalized_at: river_job["finalized_at"]&.getutc,
kind: river_job["kind"],
max_attempts: river_job["max_attempts"],
metadata: river_job["metadata"],
priority: river_job["priority"],
queue: river_job["queue"],
scheduled_at: river_job["scheduled_at"].getutc,
state: river_job["state"],
tags: river_job["tags"],
unique_key: river_job["unique_key"],
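# `unique_states` comes back from Postgres as a bit string, so parse it in
# base 2 before mapping the bits back to job states: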
unique_states: ::River::UniqueBitmask.to_states(river_job["unique_states"]&.to_i(2))
),
river_job["unique_skipped_as_duplicate"]
]
end
end
end
21 changes: 12 additions & 9 deletions driver/riverqueue-activerecord/spec/driver_spec.rb
@@ -122,14 +122,14 @@

describe "#to_job_row_from_raw" do
it "converts a database record to `River::JobRow` with minimal properties" do
river_job = River::Driver::ActiveRecord::RiverJob.insert({
res = River::Driver::ActiveRecord::RiverJob.insert({
id: 1,
args: %({"job_num":1}),
kind: "simple",
max_attempts: River::MAX_ATTEMPTS_DEFAULT
}, returning: Arel.sql("*"))
}, returning: Arel.sql("*, false AS unique_skipped_as_duplicate"))

job_row = driver.send(:to_job_row_from_raw, river_job)
job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types)

expect(job_row).to be_an_instance_of(River::JobRow)
expect(job_row).to have_attributes(
@@ -148,11 +148,12 @@
state: River::JOB_STATE_AVAILABLE,
tags: []
)
expect(skipped_as_duplicate).to be(false)
end

it "converts a database record to `River::JobRow` with all properties" do
now = Time.now
river_job = River::Driver::ActiveRecord::RiverJob.insert({
res = River::Driver::ActiveRecord::RiverJob.insert({
id: 1,
attempt: 1,
attempted_at: now,
@@ -168,9 +169,9 @@
state: River::JOB_STATE_COMPLETED,
tags: ["tag1"],
unique_key: Digest::SHA256.digest("unique_key_str")
}, returning: Arel.sql("*"))
}, returning: Arel.sql("*, true AS unique_skipped_as_duplicate"))

job_row = driver.send(:to_job_row_from_raw, river_job)
job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types)

expect(job_row).to be_an_instance_of(River::JobRow)
expect(job_row).to have_attributes(
@@ -190,11 +191,12 @@
tags: ["tag1"],
unique_key: Digest::SHA256.digest("unique_key_str")
)
expect(skipped_as_duplicate).to be(true)
end

it "with errors" do
now = Time.now.utc
river_job = River::Driver::ActiveRecord::RiverJob.insert({
res = River::Driver::ActiveRecord::RiverJob.insert({
args: %({"job_num":1}),
errors: [JSON.dump(
{
@@ -207,9 +209,9 @@
kind: "simple",
max_attempts: River::MAX_ATTEMPTS_DEFAULT,
state: River::JOB_STATE_AVAILABLE
}, returning: Arel.sql("*"))
}, returning: Arel.sql("*, false AS unique_skipped_as_duplicate"))

job_row = driver.send(:to_job_row_from_raw, river_job)
job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types)

expect(job_row.errors.count).to be(1)
expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError)
@@ -219,6 +221,7 @@
error: "job failure",
trace: "error trace"
)
expect(skipped_as_duplicate).to be(false)
end
end
end
1 change: 1 addition & 0 deletions driver/riverqueue-sequel/Gemfile.lock
@@ -78,6 +78,7 @@ GEM

PLATFORMS
arm64-darwin-22
arm64-darwin-23
x86_64-linux

DEPENDENCIES