Commit 07a2f6c: use new unique jobs implementation
bgentry committed Dec 17, 2024
1 parent 54b4549 commit 07a2f6c
Showing 24 changed files with 326 additions and 1,200 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,2 +1,3 @@
 .DS_Store
 *.gem
+coverage/
1 change: 1 addition & 0 deletions Gemfile.lock
@@ -143,6 +143,7 @@ GEM

 PLATFORMS
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux

 DEPENDENCIES
6 changes: 3 additions & 3 deletions docs/README.md
@@ -42,8 +42,8 @@ insert_res.job # inserted job row

 Job args should:

-* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
-* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go.
+- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize.
+- Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go.

 They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind.

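To make the two `#kind`/`#to_json` bullets above concrete, here's a minimal sketch of a conforming job args class. The class name and its field are hypothetical illustrations, not code from this commit; only the `#kind`/`#to_json` contract comes from the README text.

```ruby
require "json"

# Hypothetical job args class satisfying the contract described above.
class SortArgs
  attr_reader :strings

  def initialize(strings:)
    @strings = strings
  end

  # Unique string identifying the job type; a Go worker registered under the
  # same kind will pick these jobs up.
  def kind = "sort"

  # JSON object serialization that a Go worker can unmarshal.
  def to_json = JSON.generate({strings: strings})
end
```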
@@ -89,7 +89,7 @@ insert_res.unique_skipped_as_duplicated
 Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

 ```ruby
-client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
+client = River::Client.new(mock_driver)
 ```

 Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other.
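The `insert_res.unique_skipped_as_duplicated` flag in the hunk context above is how callers observe the unique path. A usage sketch, reusing the hypothetical `SortArgs` class from earlier; the `InsertOpts`/`UniqueOpts` field names shown are assumptions patterned on River's Go client rather than verbatim from this commit:

```ruby
# Sketch of observing the unique insert outcome; by_args is an assumed option.
args = SortArgs.new(strings: ["whale", "tiger", "bear"])
opts = River::InsertOpts.new(
  unique_opts: River::UniqueOpts.new(by_args: true)
)

insert_res = client.insert(args, insert_opts: opts)
insert_res.unique_skipped_as_duplicated # => false; first insert goes through

insert_res = client.insert(args, insert_opts: opts)
insert_res.unique_skipped_as_duplicated # => true; duplicate within unique bounds is skipped
```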
1 change: 1 addition & 0 deletions driver/riverqueue-activerecord/Gemfile.lock
@@ -120,6 +120,7 @@ GEM

 PLATFORMS
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux

 DEPENDENCIES
51 changes: 18 additions & 33 deletions driver/riverqueue-activerecord/lib/driver.rb
@@ -31,36 +31,19 @@ def errors = {}
   end
 end

-  def advisory_lock(key)
-    ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
-    nil
-  end
-
-  def advisory_lock_try(key)
-    ::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]
-  end
-
   def job_get_by_id(id)
     data_set = RiverJob.where(id: id)
     data_set.first ? to_job_row_from_model(data_set.first) : nil
   end

-  def job_get_by_kind_and_unique_properties(get_params)
-    data_set = RiverJob.where(kind: get_params.kind)
-    data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at
-    data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
-    data_set = data_set.where(queue: get_params.queue) if get_params.queue
-    data_set = data_set.where(state: get_params.state) if get_params.state
-    data_set.first ? to_job_row_from_model(data_set.first) : nil
-  end
-
   def job_insert(insert_params)
-    to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params)))
+    res = job_insert_many([insert_params]).first
+    [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
   end

-  def job_insert_unique(insert_params, unique_key)
-    res = RiverJob.upsert(
-      insert_params_to_hash(insert_params).merge(unique_key: unique_key),
+  def job_insert_many(insert_params_many)
+    RiverJob.upsert_all(
+      insert_params_many.map { |param| insert_params_to_hash(param) },
       on_duplicate: Arel.sql("kind = EXCLUDED.kind"),
       returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"),

@@ -69,15 +52,9 @@ def job_insert_unique(insert_params, unique_key)
       # ActiveRecord tries to look up a unique index instead of letting
       # Postgres handle that, and of course it doesn't support a `WHERE`
       # clause. The workaround is to target the index name instead of columns.
-      unique_by: "river_job_kind_unique_key_idx"
+      unique_by: "river_job_unique_idx"
     )
-
-    [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
-  end
-
-  def job_insert_many(insert_params_many)
-    RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
-    insert_params_many.count
+      .map { |row| to_insert_result(row) }
   end

   def job_list
@@ -104,10 +81,16 @@ def transaction(&)
       queue: insert_params.queue,
       state: insert_params.state,
       scheduled_at: insert_params.scheduled_at,
-      tags: insert_params.tags
+      tags: insert_params.tags,
+      unique_key: insert_params.unique_key,
+      unique_states: insert_params.unique_states
     }.compact
   end

+  private def to_insert_result(result)
+    [to_job_row_from_model(result), result.send(:hash_rows)[0]["unique_skipped_as_duplicate"]]
+  end
+
   private def to_job_row_from_model(river_job)
     # needs to be accessed through values because `errors` is shadowed by both
     # ActiveRecord and the patch above
@@ -139,7 +122,8 @@ def transaction(&)
       scheduled_at: river_job.scheduled_at.getutc,
       state: river_job.state,
       tags: river_job.tags,
-      unique_key: river_job.unique_key
+      unique_key: river_job.unique_key,
+      unique_states: river_job.unique_states
     )
   end

@@ -182,7 +166,8 @@ def transaction(&)
       scheduled_at: river_job["scheduled_at"].getutc,
       state: river_job["state"],
       tags: river_job["tags"],
-      unique_key: river_job["unique_key"]
+      unique_key: river_job["unique_key"],
+      unique_states: ::River::UniqueBitmask.to_states(river_job["unique_states"]&.to_i(2))
     )
   end
 end
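Both drivers now funnel single and bulk inserts through one upsert, and two details in the diff above are worth spelling out: the `kind = EXCLUDED.kind` update is a deliberate no-op so that `RETURNING` still yields the preexisting row on conflict (a plain `DO NOTHING` returns no row), and `(xmax != 0)` is a common Postgres heuristic for detecting that path, since freshly inserted rows have `xmax = 0` while rows touched by `ON CONFLICT ... DO UPDATE` get a nonzero `xmax`. A standalone sketch of the same pattern against a throwaway table; the table name and schema are hypothetical, not from this commit:

```ruby
require "pg"

# Demo of the no-op upsert + xmax trick; run against any reachable Postgres.
conn = PG.connect(dbname: "postgres")
conn.exec("CREATE TEMP TABLE demo_unique (kind text, unique_key text UNIQUE)")

2.times do
  res = conn.exec(<<~SQL)
    INSERT INTO demo_unique (kind, unique_key) VALUES ('sort', 'abc123')
    ON CONFLICT (unique_key) DO UPDATE SET kind = EXCLUDED.kind
    RETURNING *, (xmax != 0) AS unique_skipped_as_duplicate
  SQL
  puts res[0]["unique_skipped_as_duplicate"] # "f" first time, "t" on the duplicate
end
```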
1 change: 1 addition & 0 deletions driver/riverqueue-sequel/Gemfile.lock
@@ -78,6 +78,7 @@ GEM

 PLATFORMS
   arm64-darwin-22
+  arm64-darwin-23
   x86_64-linux

 DEPENDENCIES
56 changes: 20 additions & 36 deletions driver/riverqueue-sequel/lib/driver.rb
@@ -13,51 +13,27 @@ def initialize(db)
     @db.extension(:pg_json)
   end

-  def advisory_lock(key)
-    @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first
-    nil
-  end
-
-  def advisory_lock_try(key)
-    @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock]
-  end
-
   def job_get_by_id(id)
     data_set = @db[:river_job].where(id: id)
     data_set.first ? to_job_row(data_set.first) : nil
   end

-  def job_get_by_kind_and_unique_properties(get_params)
-    data_set = @db[:river_job].where(kind: get_params.kind)
-    data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at
-    data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
-    data_set = data_set.where(queue: get_params.queue) if get_params.queue
-    data_set = data_set.where(state: get_params.state) if get_params.state
-    data_set.first ? to_job_row(data_set.first) : nil
-  end
-
   def job_insert(insert_params)
-    to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params)))
+    job_insert_many([insert_params]).first
   end

-  def job_insert_unique(insert_params, unique_key)
-    values = @db[:river_job]
+  def job_insert_many(insert_params_array)
+    @db[:river_job]
       .insert_conflict(
-        target: [:kind, :unique_key],
-        conflict_where: ::Sequel.lit("unique_key IS NOT NULL"),
+        target: [:unique_key],
+        conflict_where: ::Sequel.lit(
+          "unique_key IS NOT NULL AND unique_states IS NOT NULL AND river_job_state_in_bitmask(unique_states, state)"
+        ),
         update: {kind: ::Sequel[:excluded][:kind]}
       )
       .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate"))
-      .insert_select(
-        insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key))
-      )
-
-    [to_job_row(values), values[:unique_skipped_as_duplicate]]
-  end
-
-  def job_insert_many(insert_params_many)
-    @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
-    insert_params_many.count
+      .multi_insert(insert_params_array.map { |p| insert_params_to_hash(p) })
+      .map { |row| to_insert_result(row) }
   end

   def job_list
@@ -76,6 +52,7 @@ def transaction(&)
   private def insert_params_to_hash(insert_params)
     # the call to `#compact` is important so that we remove nils and table
     # default values get picked up instead
+    # TODO: but I had to remove it for bulk unique inserts...
     {
       args: insert_params.encoded_args,
       kind: insert_params.kind,
@@ -84,8 +61,14 @@ def transaction(&)
       queue: insert_params.queue,
       state: insert_params.state,
       scheduled_at: insert_params.scheduled_at,
-      tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil
-    }.compact
+      tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags, :text) : nil,
+      unique_key: insert_params.unique_key ? ::Sequel.blob(insert_params.unique_key) : nil,
+      unique_states: insert_params.unique_states
+    }
   end

+  private def to_insert_result(result)
+    [to_job_row(result), result[:unique_skipped_as_duplicate]]
+  end
+
   private def to_job_row(river_job)
@@ -113,7 +96,8 @@ def transaction(&)
       scheduled_at: river_job[:scheduled_at].getutc,
       state: river_job[:state],
       tags: river_job[:tags].to_a,
-      unique_key: river_job[:unique_key]&.to_s
+      unique_key: river_job[:unique_key]&.to_s,
+      unique_states: ::River::UniqueBitmask.to_states(river_job[:unique_states]&.to_i(2))
     )
   end
 end
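The `&.to_i(2)` above exists because Postgres returns a `bit(8)` column as a string of ones and zeros (e.g. `"11110101"`), which Ruby parses as base 2 before `River::UniqueBitmask.to_states` maps the set bits back to job state names. A self-contained sketch of that decoding; the state list and bit ordering here are assumptions for illustration, not the gem's actual mapping:

```ruby
# Hypothetical stand-in for River::UniqueBitmask.to_states: bit 7 (the most
# significant) maps to STATES[0], bit 0 to STATES[7].
STATES = %w[available cancelled completed discarded pending retryable running scheduled].freeze

def to_states(bitmask)
  return nil if bitmask.nil?
  # Integer#[] reads a single bit, 0 being the least significant.
  STATES.each_index.filter_map { |i| STATES[i] if bitmask[STATES.length - 1 - i] == 1 }
end

to_states("11110101".to_i(2))
# => ["available", "cancelled", "completed", "discarded", "retryable", "scheduled"]
```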