Skip to content

Commit

Permalink
Support fast unique insertion path (#28)
Browse files Browse the repository at this point in the history
Updates the Ruby client to be compatible with the fast unique insertion
added to the main River in [1] which uses a unique index instead of
advisory lock + fetch as long as uniqueness is constrained to the
default set of unique job states.

We also reorganize the driver tests such that the majority of the tests
are put in a single set of shared examples, largely so that ActiveRecord
and Sequel aren't so duplicative of each other, and so we can easily add
new tests for all drivers in one place.

Lastly, I killed the mock driver in use at the top level. Adding
anything new required all kinds of engineering around it, and I found
lots of test bugs that were the result of imperfect mocking that wasn't
fully checking the client end to end.

[1] riverqueue/river#451
  • Loading branch information
brandur authored Aug 31, 2024
1 parent 29b8e4f commit 621831b
Show file tree
Hide file tree
Showing 15 changed files with 945 additions and 897 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #28](https://github.com/riverqueue/riverqueue-ruby/pull/28).

## [0.6.1] - 2024-08-21

### Fixed
Expand Down
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ group :test do
gem "debug"
gem "rspec-core"
gem "rspec-expectations"
gem "riverqueue-sequel", path: "driver/riverqueue-sequel"
gem "simplecov", require: false
end
11 changes: 11 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ PATH
specs:
riverqueue (0.6.1)

PATH
remote: driver/riverqueue-sequel
specs:
riverqueue-sequel (0.6.1)
pg (> 0, < 1000)
sequel (> 0, < 1000)

GEM
remote: https://rubygems.org/
specs:
Expand Down Expand Up @@ -50,6 +57,7 @@ GEM
parser (3.3.0.5)
ast (~> 2.4.1)
racc
pg (1.5.6)
psych (5.1.2)
stringio
racc (1.7.3)
Expand Down Expand Up @@ -89,6 +97,8 @@ GEM
rubocop-ast (>= 1.30.0, < 2.0)
ruby-progressbar (1.13.0)
securerandom (0.3.1)
sequel (5.79.0)
bigdecimal
simplecov (0.22.0)
docile (~> 1.1)
simplecov-html (~> 0.11)
Expand Down Expand Up @@ -137,6 +147,7 @@ PLATFORMS
DEPENDENCIES
debug
riverqueue!
riverqueue-sequel!
rspec-core
rspec-expectations
simplecov
Expand Down
1 change: 1 addition & 0 deletions Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ D = Steep::Diagnostic
target :lib do
check "lib"

library "digest"
library "json"
library "time"

Expand Down
101 changes: 89 additions & 12 deletions driver/riverqueue-activerecord/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ def errors = {}

def advisory_lock(key)
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
nil
end

def advisory_lock_try(key)
::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
end

def job_get_by_id(id)
data_set = RiverJob.where(id: id)
data_set.first ? to_job_row_from_model(data_set.first) : nil
end

def job_get_by_kind_and_unique_properties(get_params)
Expand All @@ -41,18 +51,44 @@ def job_get_by_kind_and_unique_properties(get_params)
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
data_set = data_set.where(queue: get_params.queue) if get_params.queue
data_set = data_set.where(state: get_params.state) if get_params.state
data_set.take
data_set.first ? to_job_row_from_model(data_set.first) : nil
end

def job_insert(insert_params)
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
end

def job_insert_unique(insert_params, unique_key)
res = RiverJob.upsert(
insert_params_to_hash(insert_params).merge(unique_key: unique_key),
on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),

# It'd be nice to specify this as `(kind, unique_key) WHERE unique_key
# IS NOT NULL` like we do elsewhere, but in its pure ingenuity, fucking
# ActiveRecord tries to look up a unique index instead of letting
# Postgres handle that, and of course it doesn't support a `WHERE`
# clause. The workaround is to target the index name instead of columns.
unique_by: "river_job_kind_unique_key_idx"
)

[to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
end

def job_insert_many(insert_params_many)
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
end

def job_list
data_set = RiverJob.order(:id)
data_set.all.map { |job| to_job_row_from_model(job) }
end

def rollback_exception
::ActiveRecord::Rollback
end

def transaction(&)
::ActiveRecord::Base.transaction(requires_new: true, &)
end
Expand All @@ -72,21 +108,18 @@ def transaction(&)
}.compact
end

# Type type injected to this method is not a `RiverJob`, but rather a raw
# hash with stringified keys because we're inserting with the Arel framework
# directly rather than generating a record from a model.
private def to_job_row(river_job)
private def to_job_row_from_model(river_job)
# needs to be accessed through values because `errors` is shadowed by both
# ActiveRecord and the patch above
errors = river_job.attributes["errors"]

River::JobRow.new(
id: river_job.id,
args: river_job.args ? JSON.parse(river_job.args) : nil,
args: JSON.parse(river_job.args),
attempt: river_job.attempt,
attempted_at: river_job.attempted_at,
attempted_at: river_job.attempted_at&.getutc,
attempted_by: river_job.attempted_by,
created_at: river_job.created_at,
created_at: river_job.created_at.getutc,
errors: errors&.map { |e|
deserialized_error = JSON.parse(e, symbolize_names: true)

Expand All @@ -97,15 +130,59 @@ def transaction(&)
trace: deserialized_error[:trace]
)
},
finalized_at: river_job.finalized_at,
finalized_at: river_job.finalized_at&.getutc,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
metadata: river_job.metadata,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
scheduled_at: river_job.scheduled_at.getutc,
state: river_job.state,
tags: river_job.tags
tags: river_job.tags,
unique_key: river_job.unique_key
)
end

# This is really awful, but some of ActiveRecord's methods (e.g. `.create`)
# return a model, and others (e.g. `.upsert`) return raw values, and
# therefore this second version from unmarshaling a job row exists. I
# searched long and hard for a way to have the former type of method return
# raw or the latter type of method return a model, but was unable to find
# anything.
private def to_job_row_from_raw(res)
river_job = {}

res.rows[0].each_with_index do |val, i|
river_job[res.columns[i]] = res.column_types[i].deserialize(val)
end

River::JobRow.new(
id: river_job["id"],
args: JSON.parse(river_job["args"]),
attempt: river_job["attempt"],
attempted_at: river_job["attempted_at"]&.getutc,
attempted_by: river_job["attempted_by"],
created_at: river_job["created_at"].getutc,
errors: river_job["errors"]&.map { |e|
deserialized_error = JSON.parse(e)

River::AttemptError.new(
at: Time.parse(deserialized_error["at"]),
attempt: deserialized_error["attempt"],
error: deserialized_error["error"],
trace: deserialized_error["trace"]
)
},
finalized_at: river_job["finalized_at"]&.getutc,
kind: river_job["kind"],
max_attempts: river_job["max_attempts"],
metadata: river_job["metadata"],
priority: river_job["priority"],
queue: river_job["queue"],
scheduled_at: river_job["scheduled_at"].getutc,
state: river_job["state"],
tags: river_job["tags"],
unique_key: river_job["unique_key"]
)
end
end
Expand Down
Loading

0 comments on commit 621831b

Please sign in to comment.