Skip to content

Commit

Permalink
Implement unique job insertions (#10)
Browse files Browse the repository at this point in the history
Implement unique job insertion in a way that's compatible with the main
Go library. Unique opts live on insert opts just like in Go, and we
copy as much of its API as possible including return result.

    insert_res = client.insert(args, insert_opts: River::InsertOpts.new(
      unique_opts: River::UniqueOpts.new(
        by_args: true,
        by_period: 15 * 60,
        by_queue: true,
        by_state: [River::JOB_STATE_AVAILABLE]
      )
    )

    # contains either a newly inserted job, or an existing one if insertion was skipped
    insert_res.job

    # true if insertion was skipped
    insert_res.unique_skipped_as_duplicated
  • Loading branch information
brandur authored Apr 28, 2024
1 parent 6b55d91 commit 683f098
Show file tree
Hide file tree
Showing 19 changed files with 874 additions and 69 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Implement unique job insertion. [PR #10](https://github.com/riverqueue/riverqueue-ruby/pull/10).

## [0.2.0] - 2024-04-27

### Added
Expand Down
2 changes: 2 additions & 0 deletions Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: .
specs:
riverqueue (0.2.0)
fnv-hash

GEM
remote: https://rubygems.org/
Expand Down Expand Up @@ -31,6 +32,7 @@ GEM
drb (2.2.1)
ffi (1.16.3)
fileutils (1.7.2)
fnv-hash (0.2.0)
i18n (1.14.4)
concurrent-ruby (~> 1.0)
io-console (0.7.2)
Expand Down
2 changes: 2 additions & 0 deletions Steepfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ target :lib do
check "lib"

library "json"
library "time"

signature "sig"
signature "sig_gem"

configure_code_diagnostics(D::Ruby.strict)
end
25 changes: 23 additions & 2 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,27 @@ insert_res = client.insert(
)
```

## Inserting unique jobs

[Unique jobs](https://riverqueue.com/docs/unique-jobs) are supported through `InsertOpts#unique_opts`, and can be made unique by args, period, queue, and state. If a job matching unique properties is found on insert, the insert is skipped and the existing job returned.

```ruby
insert_res = client.insert(args, insert_opts: River::InsertOpts.new(
unique_opts: River::UniqueOpts.new(
by_args: true,
by_period: 15 * 60,
by_queue: true,
by_state: [River::JOB_STATE_AVAILABLE]
)
)

# contains either a newly inserted job, or an existing one if insertion was skipped
insert_res.job

# true if insertion was skipped
insert_res.unique_skipped_as_duplicated
```

## Inserting jobs in bulk

Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency:
Expand All @@ -75,8 +96,8 @@ Or with `InsertManyParams`, which may include insertion options:

```ruby
num_inserted = client.insert_many([
River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)),
River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority"))
River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(max_attempts: 5)),
River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: River::InsertOpts.new(queue: "high_priority"))
])
```

Expand Down
2 changes: 2 additions & 0 deletions drivers/riverqueue-activerecord/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: ../..
specs:
riverqueue (0.2.0)
fnv-hash

PATH
remote: .
Expand Down Expand Up @@ -41,6 +42,7 @@ GEM
diff-lcs (1.5.1)
docile (1.4.0)
drb (2.2.1)
fnv-hash (0.2.0)
i18n (1.14.4)
concurrent-ruby (~> 1.0)
io-console (0.7.2)
Expand Down
21 changes: 19 additions & 2 deletions drivers/riverqueue-activerecord/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,32 @@ def errors = {}
end
end

def insert(insert_params)
def advisory_lock(key)
::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})")
end

def job_get_by_kind_and_unique_properties(get_params)
data_set = RiverJob.where(kind: get_params.kind)
data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
data_set = data_set.where(queue: get_params.queue) if get_params.queue
data_set = data_set.where(state: get_params.state) if get_params.state
data_set.take
end

def job_insert(insert_params)
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
end

def insert_many(insert_params_many)
def job_insert_many(insert_params_many)
RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
end

def transaction(&)
::ActiveRecord::Base.transaction(requires_new: true, &)
end

private def insert_params_to_hash(insert_params)
# the call to `#compact` is important so that we remove nils and table
# default values get picked up instead
Expand Down
171 changes: 169 additions & 2 deletions drivers/riverqueue-activerecord/spec/driver_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,155 @@ class SimpleArgsWithInsertOpts < SimpleArgs
let!(:driver) { River::Driver::ActiveRecord.new }
let(:client) { River::Client.new(driver) }

describe "#insert" do
describe "unique insertion" do
it "inserts a unique job once" do
args = SimpleArgsWithInsertOpts.new(job_num: 1)
args.insert_opts = River::InsertOpts.new(
unique_opts: River::UniqueOpts.new(
by_queue: true
)
)

insert_res = client.insert(args)
expect(insert_res.job).to_not be_nil
expect(insert_res.unique_skipped_as_duplicated).to be false
original_job = insert_res.job

insert_res = client.insert(args)
expect(insert_res.job.id).to eq(original_job.id)
expect(insert_res.unique_skipped_as_duplicated).to be true
end

it "inserts a unique job with an advisory lock prefix" do
client = River::Client.new(driver, advisory_lock_prefix: 123456)

args = SimpleArgsWithInsertOpts.new(job_num: 1)
args.insert_opts = River::InsertOpts.new(
unique_opts: River::UniqueOpts.new(
by_queue: true
)
)

insert_res = client.insert(args)
expect(insert_res.job).to_not be_nil
expect(insert_res.unique_skipped_as_duplicated).to be false
original_job = insert_res.job

insert_res = client.insert(args)
expect(insert_res.job.id).to eq(original_job.id)
expect(insert_res.unique_skipped_as_duplicated).to be true
end
end

describe "#advisory_lock" do
it "takes an advisory lock" do
driver.advisory_lock(123)
end
end

describe "#job_get_by_kind_and_unique_properties" do
let(:job_args) { SimpleArgs.new(job_num: 1) }

it "gets a job by kind" do
insert_res = client.insert(job_args)

job = driver.send(
:to_job_row,
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind
))
)
expect(job.id).to eq(insert_res.job.id)

expect(
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: "does_not_exist"
))
).to be_nil
end

it "gets a job by created at period" do
insert_res = client.insert(job_args)

job = driver.send(
:to_job_row,
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1]
))
)
expect(job.id).to eq(insert_res.job.id)

expect(
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3]
))
).to be_nil
end

it "gets a job by encoded args" do
insert_res = client.insert(job_args)

job = driver.send(
:to_job_row,
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
encoded_args: JSON.dump(insert_res.job.args)
))
)
expect(job.id).to eq(insert_res.job.id)

expect(
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
encoded_args: JSON.dump({"job_num" => 2})
))
).to be_nil
end

it "gets a job by queue" do
insert_res = client.insert(job_args)

job = driver.send(
:to_job_row,
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
queue: insert_res.job.queue
))
)
expect(job.id).to eq(insert_res.job.id)

expect(
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
queue: "other_queue"
))
).to be_nil
end

it "gets a job by state" do
insert_res = client.insert(job_args)

job = driver.send(
:to_job_row,
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED]
))
)
expect(job.id).to eq(insert_res.job.id)

expect(
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new(
kind: job_args.kind,
state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]
))
).to be_nil
end
end

describe "#job_insert" do
it "inserts a job" do
insert_res = client.insert(SimpleArgs.new(job_num: 1))
expect(insert_res.job).to have_attributes(
Expand Down Expand Up @@ -133,7 +281,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs
end
end

describe "#insert_many" do
describe "#job_insert_many" do
it "inserts multiple jobs" do
num_inserted = client.insert_many([
SimpleArgs.new(job_num: 1),
Expand Down Expand Up @@ -197,6 +345,25 @@ class SimpleArgsWithInsertOpts < SimpleArgs
end
end

describe "#transaction" do
it "runs block in a transaction" do
insert_res = nil

driver.transaction do
insert_res = client.insert(SimpleArgs.new(job_num: 1))

river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id)
expect(river_job).to_not be_nil

raise ActiveRecord::Rollback
end

# Not present because the job was rolled back.
river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id)
expect(river_job).to be_nil
end
end

describe "#to_job_row" do
it "converts a database record to `River::JobRow`" do
now = Time.now.utc
Expand Down
2 changes: 2 additions & 0 deletions drivers/riverqueue-sequel/Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ PATH
remote: ../..
specs:
riverqueue (0.2.0)
fnv-hash

PATH
remote: .
Expand All @@ -17,6 +18,7 @@ GEM
bigdecimal (3.1.7)
diff-lcs (1.5.1)
docile (1.4.0)
fnv-hash (0.2.0)
json (2.7.2)
language_server-protocol (3.17.0.3)
lint_roller (1.1.0)
Expand Down
21 changes: 19 additions & 2 deletions drivers/riverqueue-sequel/lib/driver.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,32 @@ def initialize(db)
end
end

def insert(insert_params)
def advisory_lock(key)
@db.fetch("SELECT pg_advisory_xact_lock(?)", key).first
end

def job_get_by_kind_and_unique_properties(get_params)
data_set = RiverJob.where(kind: get_params.kind)
data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at
data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args
data_set = data_set.where(queue: get_params.queue) if get_params.queue
data_set = data_set.where(state: get_params.state) if get_params.state
data_set.first
end

def job_insert(insert_params)
to_job_row(RiverJob.create(insert_params_to_hash(insert_params)))
end

def insert_many(insert_params_many)
def job_insert_many(insert_params_many)
RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) })
insert_params_many.count
end

def transaction(&)
@db.transaction(savepoint: true, &)
end

private def insert_params_to_hash(insert_params)
# the call to `#compact` is important so that we remove nils and table
# default values get picked up instead
Expand Down
Loading

0 comments on commit 683f098

Please sign in to comment.