From 621831b535a85533b509689a201221f4f7cfa951 Mon Sep 17 00:00:00 2001 From: Brandur Leach Date: Sat, 31 Aug 2024 00:33:37 -0400 Subject: [PATCH] Support fast unique insertion path (#28) Updates the Ruby client to be compatible with the fast unique insertion added to the main River in [1] which uses a unique index instead of advisory lock + fetch as long as uniqueness is constrained to the default set of unique job states. We also reorganize the driver tests such that the majority of the tests are put in a single set of shared examples, largely so that ActiveRecord and Sequel aren't so duplicative of each other, and so we can easily add new tests for all drivers in one place. Lastly, I killed the mock driver in use at the top level. Adding anything new required all kinds of engineering around it, and I found lots of test bugs that were the result of imperfect mocking that wasn't fully checking the client end to end. [1] https://github.com/riverqueue/river/pull/451 --- CHANGELOG.md | 4 + Gemfile | 1 + Gemfile.lock | 11 + Steepfile | 1 + driver/riverqueue-activerecord/lib/driver.rb | 101 +++- .../spec/driver_spec.rb | 468 +++++------------- driver/riverqueue-sequel/lib/driver.rb | 100 ++-- driver/riverqueue-sequel/spec/driver_spec.rb | 410 ++------------- lib/client.rb | 28 +- lib/job.rb | 9 +- sig/driver.rbs | 7 + sig/job.rbs | 3 +- spec/client_spec.rb | 241 +++++---- spec/driver_shared_examples.rb | 454 +++++++++++++++++ spec/spec_helper.rb | 4 + 15 files changed, 945 insertions(+), 897 deletions(-) create mode 100644 spec/driver_shared_examples.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d0dd8d..d38eb70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #28](https://github.com/riverqueue/riverqueue-ruby/pull/28). + ## [0.6.1] - 2024-08-21 ### Fixed diff --git a/Gemfile b/Gemfile index 213a8d2..d174c52 100644 --- a/Gemfile +++ b/Gemfile @@ -11,5 +11,6 @@ group :test do gem "debug" gem "rspec-core" gem "rspec-expectations" + gem "riverqueue-sequel", path: "driver/riverqueue-sequel" gem "simplecov", require: false end diff --git a/Gemfile.lock b/Gemfile.lock index b716247..aa2867b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,6 +3,13 @@ PATH specs: riverqueue (0.6.1) +PATH + remote: driver/riverqueue-sequel + specs: + riverqueue-sequel (0.6.1) + pg (> 0, < 1000) + sequel (> 0, < 1000) + GEM remote: https://rubygems.org/ specs: @@ -50,6 +57,7 @@ GEM parser (3.3.0.5) ast (~> 2.4.1) racc + pg (1.5.6) psych (5.1.2) stringio racc (1.7.3) @@ -89,6 +97,8 @@ GEM rubocop-ast (>= 1.30.0, < 2.0) ruby-progressbar (1.13.0) securerandom (0.3.1) + sequel (5.79.0) + bigdecimal simplecov (0.22.0) docile (~> 1.1) simplecov-html (~> 0.11) @@ -137,6 +147,7 @@ PLATFORMS DEPENDENCIES debug riverqueue! + riverqueue-sequel! rspec-core rspec-expectations simplecov diff --git a/Steepfile b/Steepfile index 656fe44..ac910d1 100644 --- a/Steepfile +++ b/Steepfile @@ -3,6 +3,7 @@ D = Steep::Diagnostic target :lib do check "lib" + library "digest" library "json" library "time" diff --git a/driver/riverqueue-activerecord/lib/driver.rb b/driver/riverqueue-activerecord/lib/driver.rb index 3fce2ee..96d5925 100644 --- a/driver/riverqueue-activerecord/lib/driver.rb +++ b/driver/riverqueue-activerecord/lib/driver.rb @@ -33,6 +33,16 @@ def errors = {} def advisory_lock(key) ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})") + nil + end + + def advisory_lock_try(key) + ::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"] + end + + def job_get_by_id(id) + data_set = RiverJob.where(id: id) + data_set.first ? to_job_row_from_model(data_set.first) : nil end def job_get_by_kind_and_unique_properties(get_params) @@ -41,11 +51,28 @@ def job_get_by_kind_and_unique_properties(get_params) data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args data_set = data_set.where(queue: get_params.queue) if get_params.queue data_set = data_set.where(state: get_params.state) if get_params.state - data_set.take + data_set.first ? to_job_row_from_model(data_set.first) : nil end def job_insert(insert_params) - to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) + to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params))) + end + + def job_insert_unique(insert_params, unique_key) + res = RiverJob.upsert( + insert_params_to_hash(insert_params).merge(unique_key: unique_key), + on_duplicate: Arel.sql("kind = EXCLUDED.kind"), + returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"), + + # It'd be nice to specify this as `(kind, unique_key) WHERE unique_key + # IS NOT NULL` like we do elsewhere, but in its pure ingenuity, fucking + # ActiveRecord tries to look up a unique index instead of letting + # Postgres handle that, and of course it doesn't support a `WHERE` + # clause. The workaround is to target the index name instead of columns. + unique_by: "river_job_kind_unique_key_idx" + ) + + [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]] end def job_insert_many(insert_params_many) @@ -53,6 +80,15 @@ def job_insert_many(insert_params_many) insert_params_many.count end + def job_list + data_set = RiverJob.order(:id) + data_set.all.map { |job| to_job_row_from_model(job) } + end + + def rollback_exception + ::ActiveRecord::Rollback + end + def transaction(&) ::ActiveRecord::Base.transaction(requires_new: true, &) end @@ -72,21 +108,18 @@ def transaction(&) }.compact end - # Type type injected to this method is not a `RiverJob`, but rather a raw - # hash with stringified keys because we're inserting with the Arel framework - # directly rather than generating a record from a model. - private def to_job_row(river_job) + private def to_job_row_from_model(river_job) # needs to be accessed through values because `errors` is shadowed by both # ActiveRecord and the patch above errors = river_job.attributes["errors"] River::JobRow.new( id: river_job.id, - args: river_job.args ? JSON.parse(river_job.args) : nil, + args: JSON.parse(river_job.args), attempt: river_job.attempt, - attempted_at: river_job.attempted_at, + attempted_at: river_job.attempted_at&.getutc, attempted_by: river_job.attempted_by, - created_at: river_job.created_at, + created_at: river_job.created_at.getutc, errors: errors&.map { |e| deserialized_error = JSON.parse(e, symbolize_names: true) @@ -97,15 +130,59 @@ def transaction(&) trace: deserialized_error[:trace] ) }, - finalized_at: river_job.finalized_at, + finalized_at: river_job.finalized_at&.getutc, kind: river_job.kind, max_attempts: river_job.max_attempts, metadata: river_job.metadata, priority: river_job.priority, queue: river_job.queue, - scheduled_at: river_job.scheduled_at, + scheduled_at: river_job.scheduled_at.getutc, state: river_job.state, - tags: river_job.tags + tags: river_job.tags, + unique_key: river_job.unique_key + ) + end + + # This is really awful, but some of ActiveRecord's methods (e.g. `.create`) + # return a model, and others (e.g. `.upsert`) return raw values, and + # therefore this second version from unmarshaling a job row exists. I + # searched long and hard for a way to have the former type of method return + # raw or the latter type of method return a model, but was unable to find + # anything. + private def to_job_row_from_raw(res) + river_job = {} + + res.rows[0].each_with_index do |val, i| + river_job[res.columns[i]] = res.column_types[i].deserialize(val) + end + + River::JobRow.new( + id: river_job["id"], + args: JSON.parse(river_job["args"]), + attempt: river_job["attempt"], + attempted_at: river_job["attempted_at"]&.getutc, + attempted_by: river_job["attempted_by"], + created_at: river_job["created_at"].getutc, + errors: river_job["errors"]&.map { |e| + deserialized_error = JSON.parse(e) + + River::AttemptError.new( + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] + ) + }, + finalized_at: river_job["finalized_at"]&.getutc, + kind: river_job["kind"], + max_attempts: river_job["max_attempts"], + metadata: river_job["metadata"], + priority: river_job["priority"], + queue: river_job["queue"], + scheduled_at: river_job["scheduled_at"].getutc, + state: river_job["state"], + tags: river_job["tags"], + unique_key: river_job["unique_key"] ) end end diff --git a/driver/riverqueue-activerecord/spec/driver_spec.rb b/driver/riverqueue-activerecord/spec/driver_spec.rb index 9245294..11d4a38 100644 --- a/driver/riverqueue-activerecord/spec/driver_spec.rb +++ b/driver/riverqueue-activerecord/spec/driver_spec.rb @@ -1,23 +1,5 @@ require "spec_helper" - -class SimpleArgs - attr_accessor :job_num - - def initialize(job_num:) - self.job_num = job_num - end - - def kind = "simple" - - def to_json = JSON.dump({job_num: job_num}) -end - -# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. -# Real args that make use of this functionality will probably want to make -# `#insert_opts` a non-accessor method instead. -class SimpleArgsWithInsertOpts < SimpleArgs - attr_accessor :insert_opts -end +require_relative "../../../spec/driver_shared_examples" RSpec.describe River::Driver::ActiveRecord do around(:each) { |ex| test_transaction(&ex) } @@ -25,360 +7,152 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::ActiveRecord.new } let(:client) { River::Client.new(driver) } - describe "unique insertion" do - it "inserts a unique job once" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - - it "inserts a unique job with an advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true + before do + if ENV["RIVER_DEBUG"] == "1" || ENV["RIVER_DEBUG"] == "true" + ActiveRecord::Base.logger = Logger.new($stdout) end end - describe "#advisory_lock" do - it "takes an advisory lock" do - driver.transaction do - driver.advisory_lock(123) - - Thread.new do - conn = ::ActiveRecord::Base.connection_pool.checkout - begin - expect(conn.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]).to be false - ensure - ::ActiveRecord::Base.connection_pool.checkin(conn) - end - end.join - end - end - end - - describe "#job_get_by_kind_and_unique_properties" do - let(:job_args) { SimpleArgs.new(job_num: 1) } - - it "gets a job by kind" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: "does_not_exist" - )) - ).to be_nil - end - - it "gets a job by created at period" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] - )) - ).to be_nil - end - - it "gets a job by encoded args" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump(insert_res.job.args) - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump({"job_num" => 2}) - )) - ).to be_nil - end - - it "gets a job by queue" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: insert_res.job.queue - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: "other_queue" - )) - ).to be_nil - end - - it "gets a job by state" do - insert_res = client.insert(job_args) + it_behaves_like "driver shared examples" - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] - )) + describe "#to_job_row_from_model" do + it "converts a database record to `River::JobRow` with minimal properties" do + river_job = River::Driver::ActiveRecord::RiverJob.create( + id: 1, + args: %({"job_num":1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + state: River::JOB_STATE_AVAILABLE ) - expect(job.id).to eq(insert_res.job.id) - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] - )) - ).to be_nil - end - end + job_row = driver.send(:to_job_row_from_model, river_job) - describe "#job_insert" do - it "inserts a job" do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - expect(insert_res.job).to have_attributes( + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, args: {"job_num" => 1}, attempt: 0, - created_at: be_within(2).of(Time.now.utc), + attempted_at: nil, + attempted_by: nil, + created_at: be_within(2).of(Time.now.getutc), + finalized_at: nil, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), state: River::JOB_STATE_AVAILABLE, tags: [] ) - - # Make sure it made it to the database. Assert only minimally since we're - # certain it's the same as what we checked above. - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to have_attributes( - kind: "simple" - ) end - it "schedules a job" do - target_time = Time.now.utc + 1 * 3600 - - insert_res = client.insert( - SimpleArgs.new(job_num: 1), - insert_opts: River::InsertOpts.new(scheduled_at: target_time) - ) - expect(insert_res.job).to have_attributes( - scheduled_at: be_within(2).of(target_time), - state: River::JOB_STATE_SCHEDULED + it "converts a database record to `River::JobRow` with all properties" do + now = Time.now + river_job = River::Driver::ActiveRecord::RiverJob.create( + id: 1, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + args: %({"job_num":1}), + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) - end - it "inserts with job insert opts" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) + job_row = driver.send(:to_job_row_from_model, river_job) - insert_res = client.insert(args) - expect(insert_res.job).to have_attributes( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 1, + attempted_at: now.getutc, + attempted_by: ["client1"], + created_at: now.getutc, + finalized_at: now.getutc, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now.getutc, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) end - it "inserts with insert opts" do - # We set job insert opts in this spec too so that we can verify that the - # options passed at insertion time take precedence. - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] + it "with errors" do + now = Time.now.utc + river_job = River::Driver::ActiveRecord::RiverJob.create( + args: %({"job_num":1}), + errors: [JSON.dump( + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + )], + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + state: River::JOB_STATE_AVAILABLE ) - insert_res = client.insert(args, insert_opts: River::InsertOpts.new( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - )) - expect(insert_res.job).to have_attributes( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - ) - end + job_row = driver.send(:to_job_row_from_model, river_job) - it "inserts with job args hash" do - insert_res = client.insert(River::JobArgsHash.new("hash_kind", { - job_num: 1 - })) - expect(insert_res.job).to have_attributes( - args: {"job_num" => 1}, - kind: "hash_kind" + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" ) end - - it "inserts in a transaction" do - insert_res = nil - - ActiveRecord::Base.transaction(requires_new: true) do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise ActiveRecord::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to be_nil - end end - describe "#job_insert_many" do - it "inserts multiple jobs" do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first) - expect(job1).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.utc), + describe "#to_job_row_from_raw" do + it "converts a database record to `River::JobRow` with minimal properties" do + river_job = River::Driver::ActiveRecord::RiverJob.insert({ + id: 1, + args: %({"job_num":1}), kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), - state: River::JOB_STATE_AVAILABLE, - tags: [] - ) + max_attempts: River::MAX_ATTEMPTS_DEFAULT + }, returning: Arel.sql("*")) + + job_row = driver.send(:to_job_row_from_raw, river_job) - job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first) - expect(job2).to have_attributes( + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, attempt: 0, - args: {"job_num" => 2}, - created_at: be_within(2).of(Time.now.utc), + attempted_at: nil, + attempted_by: nil, + created_at: be_within(2).of(Time.now.getutc), + finalized_at: nil, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), state: River::JOB_STATE_AVAILABLE, tags: [] ) end - it "inserts multiple jobs in a transaction" do - job1 = nil - job2 = nil - - ActiveRecord::Base.transaction(requires_new: true) do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first) - job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first) - - raise ActiveRecord::Rollback - end - - # Not present because the jobs were rolled back. - expect do - River::Driver::ActiveRecord::RiverJob.find(job1.id) - end.to raise_error(ActiveRecord::RecordNotFound) - expect do - River::Driver::ActiveRecord::RiverJob.find(job2.id) - end.to raise_error(ActiveRecord::RecordNotFound) - end - end - - describe "#transaction" do - it "runs block in a transaction" do - insert_res = nil - - driver.transaction do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise ActiveRecord::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to be_nil - end - end - - describe "#to_job_row" do - it "converts a database record to `River::JobRow`" do - now = Time.now.utc - river_job = River::Driver::ActiveRecord::RiverJob.new( + it "converts a database record to `River::JobRow` with all properties" do + now = Time.now + river_job = River::Driver::ActiveRecord::RiverJob.insert({ id: 1, attempt: 1, attempted_at: now, @@ -392,33 +166,36 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: River::QUEUE_DEFAULT, scheduled_at: now, state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] - ) + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") + }, returning: Arel.sql("*")) - job_row = driver.send(:to_job_row, river_job) + job_row = driver.send(:to_job_row_from_raw, river_job) expect(job_row).to be_an_instance_of(River::JobRow) expect(job_row).to have_attributes( id: 1, args: {"job_num" => 1}, attempt: 1, - attempted_at: now, + attempted_at: be_within(2).of(now.getutc), attempted_by: ["client1"], - created_at: now, - finalized_at: now, + created_at: be_within(2).of(now.getutc), + finalized_at: be_within(2).of(now.getutc), kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, priority: River::PRIORITY_DEFAULT, queue: River::QUEUE_DEFAULT, - scheduled_at: now, + scheduled_at: be_within(2).of(now.getutc), state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) end it "with errors" do now = Time.now.utc - river_job = River::Driver::ActiveRecord::RiverJob.new( + river_job = River::Driver::ActiveRecord::RiverJob.insert({ + args: %({"job_num":1}), errors: [JSON.dump( { at: now, @@ -426,10 +203,13 @@ class SimpleArgsWithInsertOpts < SimpleArgs error: "job failure", trace: "error trace" } - )] - ) + )], + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + state: River::JOB_STATE_AVAILABLE + }, returning: Arel.sql("*")) - job_row = driver.send(:to_job_row, river_job) + job_row = driver.send(:to_job_row_from_raw, river_job) expect(job_row.errors.count).to be(1) expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) diff --git a/driver/riverqueue-sequel/lib/driver.rb b/driver/riverqueue-sequel/lib/driver.rb index 69d99ca..a21714e 100644 --- a/driver/riverqueue-sequel/lib/driver.rb +++ b/driver/riverqueue-sequel/lib/driver.rb @@ -9,40 +9,66 @@ module River::Driver class Sequel def initialize(db) @db = db - - # It's Ruby, so we can only define a model after Sequel's established a - # connection because it's all dynamic. - if !River::Driver::Sequel.const_defined?(:RiverJob) - River::Driver::Sequel.const_set(:RiverJob, Class.new(::Sequel::Model(:river_job))) - - # Since we only define our model once, take advantage of knowing this is - # our first initialization to add required extensions. - db.extension(:pg_array) - end + @db.extension(:pg_array) + @db.extension(:pg_json) end def advisory_lock(key) @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first + nil + end + + def advisory_lock_try(key) + @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock] + end + + def job_get_by_id(id) + data_set = @db[:river_job].where(id: id) + data_set.first ? to_job_row(data_set.first) : nil end def job_get_by_kind_and_unique_properties(get_params) - data_set = RiverJob.where(kind: get_params.kind) + data_set = @db[:river_job].where(kind: get_params.kind) data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args data_set = data_set.where(queue: get_params.queue) if get_params.queue data_set = data_set.where(state: get_params.state) if get_params.state - data_set.first + data_set.first ? to_job_row(data_set.first) : nil end def job_insert(insert_params) - to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) + to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params))) + end + + def job_insert_unique(insert_params, unique_key) + values = @db[:river_job] + .insert_conflict( + target: [:kind, :unique_key], + conflict_where: ::Sequel.lit("unique_key IS NOT NULL"), + update: {kind: ::Sequel[:excluded][:kind]} + ) + .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate")) + .insert_select( + insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key)) + ) + + [to_job_row(values), values[:unique_skipped_as_duplicate]] end def job_insert_many(insert_params_many) - RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) + @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end + def job_list + data_set = @db[:river_job].order_by(:id) + data_set.all.map { |job| to_job_row(job) } + end + + def rollback_exception + ::Sequel::Rollback + end + def transaction(&) @db.transaction(savepoint: true, &) end @@ -63,35 +89,31 @@ def transaction(&) end private def to_job_row(river_job) - # needs to be accessed through values because Sequel shadows `errors` - errors = river_job.values[:errors] - River::JobRow.new( - id: river_job.id, - args: river_job.args ? JSON.parse(river_job.args) : nil, - attempt: river_job.attempt, - attempted_at: river_job.attempted_at, - attempted_by: river_job.attempted_by, - created_at: river_job.created_at, - errors: errors&.map { |e| - deserialized_error = JSON.parse(e, symbolize_names: true) - + id: river_job[:id], + args: river_job[:args].to_h, + attempt: river_job[:attempt], + attempted_at: river_job[:attempted_at]&.getutc, + attempted_by: river_job[:attempted_by], + created_at: river_job[:created_at].getutc, + errors: river_job[:errors]&.map { |deserialized_error| River::AttemptError.new( - at: Time.parse(deserialized_error[:at]), - attempt: deserialized_error[:attempt], - error: deserialized_error[:error], - trace: deserialized_error[:trace] + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] ) }, - finalized_at: river_job.finalized_at, - kind: river_job.kind, - max_attempts: river_job.max_attempts, - metadata: river_job.metadata, - priority: river_job.priority, - queue: river_job.queue, - scheduled_at: river_job.scheduled_at, - state: river_job.state, - tags: river_job.tags + finalized_at: river_job[:finalized_at]&.getutc, + kind: river_job[:kind], + max_attempts: river_job[:max_attempts], + metadata: river_job[:metadata], + priority: river_job[:priority], + queue: river_job[:queue], + scheduled_at: river_job[:scheduled_at].getutc, + state: river_job[:state], + tags: river_job[:tags].to_a, + unique_key: river_job[:unique_key]&.to_s ) end end diff --git a/driver/riverqueue-sequel/spec/driver_spec.rb b/driver/riverqueue-sequel/spec/driver_spec.rb index 6264c21..b18f1d8 100644 --- a/driver/riverqueue-sequel/spec/driver_spec.rb +++ b/driver/riverqueue-sequel/spec/driver_spec.rb @@ -1,23 +1,5 @@ require "spec_helper" - -class SimpleArgs - attr_accessor :job_num - - def initialize(job_num:) - self.job_num = job_num - end - - def kind = "simple" - - def to_json = JSON.dump({job_num: job_num}) -end - -# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. -# Real args that make use of this functionality will probably want to make -# `#insert_opts` a non-accessor method instead. -class SimpleArgsWithInsertOpts < SimpleArgs - attr_accessor :insert_opts -end +require_relative "../../../spec/driver_shared_examples" RSpec.describe River::Driver::Sequel do around(:each) { |ex| test_transaction(&ex) } @@ -25,356 +7,45 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::Sequel.new(DB) } let(:client) { River::Client.new(driver) } - describe "unique insertion" do - it "inserts a unique job once" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - - it "inserts a unique job with an advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - end - - describe "#advisory_lock" do - it "takes an advisory lock" do - driver.transaction do - driver.advisory_lock(123) - - Thread.new do - expect(DB.fetch("SELECT pg_try_advisory_xact_lock(?)", 123).first[:pg_try_advisory_xact_lock]).to be false - end.join - end - end - end - - describe "#job_get_by_kind_and_unique_properties" do - let(:job_args) { SimpleArgs.new(job_num: 1) } - - it "gets a job by kind" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: "does_not_exist" - )) - ).to be_nil - end - - it "gets a job by created at period" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] - )) - ).to be_nil - end - - it "gets a job by encoded args" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump(insert_res.job.args) - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump({"job_num" => 2}) - )) - ).to be_nil - end - - it "gets a job by queue" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: insert_res.job.queue - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: "other_queue" - )) - ).to be_nil - end - - it "gets a job by state" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] - )) - ).to be_nil - end - end + it_behaves_like "driver shared examples" - describe "#job_insert" do - it "inserts a job" do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - expect(insert_res.job).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.utc), + describe "#to_job_row" do + it "converts a database record to `River::JobRow` with minimal properties" do + river_job = DB[:river_job].returning.insert_select({ + id: 1, + args: %({"job_num":1}), kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), - state: River::JOB_STATE_AVAILABLE, - tags: ::Sequel.pg_array([]) - ) - - # Make sure it made it to the database. Assert only minimally since we're - # certain it's the same as what we checked above. - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to have_attributes( - kind: "simple" - ) - end - - it "schedules a job" do - target_time = Time.now.utc + 1 * 3600 + max_attempts: River::MAX_ATTEMPTS_DEFAULT + }) - insert_res = client.insert( - SimpleArgs.new(job_num: 1), - insert_opts: River::InsertOpts.new(scheduled_at: target_time) - ) - expect(insert_res.job).to have_attributes( - scheduled_at: be_within(2).of(target_time), - state: River::JOB_STATE_SCHEDULED - ) - end - - it "inserts with job insert opts" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) - - insert_res = client.insert(args) - expect(insert_res.job).to have_attributes( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) - end - - it "inserts with insert opts" do - # We set job insert opts in this spec too so that we can verify that the - # options passed at insertion time take precedence. - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) - - insert_res = client.insert(args, insert_opts: River::InsertOpts.new( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - )) - expect(insert_res.job).to have_attributes( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - ) - end + job_row = driver.send(:to_job_row, river_job) - it "inserts with job args hash" do - insert_res = client.insert(River::JobArgsHash.new("hash_kind", { - job_num: 1 - })) - expect(insert_res.job).to have_attributes( + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, args: {"job_num" => 1}, - kind: "hash_kind" - ) - end - - it "inserts in a transaction" do - insert_res = nil - - DB.transaction(savepoint: true) do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise Sequel::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to be_nil - end - end - - describe "#job_insert_many" do - it "inserts multiple jobs" do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.first) - expect(job1).to have_attributes( attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.utc), + attempted_at: nil, + attempted_by: nil, + created_at: be_within(2).of(Time.now.getutc), + finalized_at: nil, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), - state: River::JOB_STATE_AVAILABLE, - tags: ::Sequel.pg_array([]) - ) - - job2 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.limit(nil, 1).first) - expect(job2).to have_attributes( - attempt: 0, - args: {"job_num" => 2}, - created_at: be_within(2).of(Time.now.utc), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), + scheduled_at: be_within(2).of(Time.now.getutc), state: River::JOB_STATE_AVAILABLE, - tags: ::Sequel.pg_array([]) + tags: [] ) end - it "inserts multiple jobs in a transaction" do - job1 = nil - job2 = nil - - DB.transaction(savepoint: true) do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.first) - job2 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.limit(nil, 1).first) - - raise Sequel::Rollback - end - - # Not present because the jobs were rolled back. - river_job1 = River::Driver::Sequel::RiverJob.first(id: job1.id) - expect(river_job1).to be_nil - river_job2 = River::Driver::Sequel::RiverJob.first(id: job2.id) - expect(river_job2).to be_nil - end - end - - describe "#transaction" do - it "runs block in a transaction" do - insert_res = nil - - driver.transaction do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise Sequel::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to be_nil - end - end - - describe "#to_job_row" do - it "converts a database record to `River::JobRow`" do - now = Time.now.utc - river_job = River::Driver::Sequel::RiverJob.new( + it "converts a database record to `River::JobRow` with all properties" do + now = Time.now + river_job = DB[:river_job].returning.insert_select({ + id: 1, attempt: 1, attempted_at: now, - attempted_by: ["client1"], + attempted_by: ::Sequel.pg_array(["client1"]), created_at: now, args: %({"job_num":1}), finalized_at: now, @@ -384,9 +55,9 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: River::QUEUE_DEFAULT, scheduled_at: now, state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] - ) - river_job.id = 1 + tags: ::Sequel.pg_array(["tag1"]), + unique_key: ::Sequel.blob(Digest::SHA256.digest("unique_key_str")) + }) job_row = driver.send(:to_job_row, river_job) @@ -395,32 +66,37 @@ class SimpleArgsWithInsertOpts < SimpleArgs id: 1, args: {"job_num" => 1}, attempt: 1, - attempted_at: now, + attempted_at: be_within(2).of(now.getutc), attempted_by: ["client1"], - created_at: now, - finalized_at: now, + created_at: be_within(2).of(now.getutc), + finalized_at: be_within(2).of(now.getutc), kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, priority: River::PRIORITY_DEFAULT, queue: River::QUEUE_DEFAULT, - scheduled_at: now, + scheduled_at: be_within(2).of(now.getutc), state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) end it "with errors" do now = Time.now.utc - river_job = River::Driver::Sequel::RiverJob.new( - errors: [JSON.dump( - { + river_job = DB[:river_job].returning.insert_select({ + args: %({"job_num":1}), + errors: ::Sequel.pg_array([ + ::Sequel.pg_json_wrap({ at: now, attempt: 1, error: "job failure", trace: "error trace" - } - )] - ) + }) + ]), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + state: River::JOB_STATE_AVAILABLE + }) job_row = driver.send(:to_job_row, river_job) diff --git a/lib/client.rb b/lib/client.rb index ac2d222..ea280fd 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -1,3 +1,4 @@ +require "digest" require "fnv" require "time" @@ -152,8 +153,8 @@ def insert_many(args) DEFAULT_UNIQUE_STATES = [ JOB_STATE_AVAILABLE, JOB_STATE_COMPLETED, - JOB_STATE_RUNNING, JOB_STATE_RETRYABLE, + JOB_STATE_RUNNING, JOB_STATE_SCHEDULED ].freeze private_constant :DEFAULT_UNIQUE_STATES @@ -166,17 +167,15 @@ def insert_many(args) any_unique_opts = false get_params = Driver::JobGetByKindAndUniquePropertiesParam.new(kind: insert_params.kind) + unique_key = "" # It's extremely important here that this lock string format and algorithm # match the one in the main River library _exactly_. Don't change them # unless they're updated everywhere. - lock_str = "unique_key" - lock_str += "kind=#{insert_params.kind}" - if unique_opts.by_args any_unique_opts = true get_params.encoded_args = insert_params.encoded_args - lock_str += "&args=#{insert_params.encoded_args}" + unique_key += "&args=#{insert_params.encoded_args}" end if unique_opts.by_period @@ -184,27 +183,38 @@ def insert_many(args) any_unique_opts = true get_params.created_at = [lower_period_bound, lower_period_bound + unique_opts.by_period] - lock_str += "&period=#{lower_period_bound.strftime("%FT%TZ")}" + unique_key += "&period=#{lower_period_bound.strftime("%FT%TZ")}" end if unique_opts.by_queue any_unique_opts = true get_params.queue = insert_params.queue - lock_str += "&queue=#{insert_params.queue}" + unique_key += "&queue=#{insert_params.queue}" end if unique_opts.by_state any_unique_opts = true get_params.state = unique_opts.by_state - lock_str += "&state=#{unique_opts.by_state.join(",")}" + unique_key += "&state=#{unique_opts.by_state.join(",")}" else get_params.state = DEFAULT_UNIQUE_STATES - lock_str += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" + unique_key += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" end return block.call unless any_unique_opts + # fast path + if !unique_opts.by_state || unique_opts.by_state.sort == DEFAULT_UNIQUE_STATES + unique_key_hash = Digest::SHA256.digest(unique_key) + job, unique_skipped_as_duplicate = @driver.job_insert_unique(insert_params, unique_key_hash) + return InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) + end + @driver.transaction do + lock_str = "unique_key" + lock_str += "kind=#{insert_params.kind}" + lock_str += unique_key + lock_key = if @advisory_lock_prefix.nil? FNV.fnv1_hash(lock_str, size: 64) else diff --git a/lib/job.rb b/lib/job.rb index bd3cf0f..494ded9 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -108,6 +108,11 @@ class JobRow # to help group and categorize jobs. attr_accessor :tags + # A unique key for the job within its kind that's used for unique job + # insertions. It's generated by hashing an inserted job's unique opts + # configuration. + attr_accessor :unique_key + def initialize( id:, args:, @@ -126,7 +131,8 @@ def initialize( attempted_by: nil, errors: nil, finalized_at: nil, - tags: nil + tags: nil, + unique_key: nil ) self.id = id self.args = args @@ -144,6 +150,7 @@ def initialize( self.scheduled_at = scheduled_at self.state = state self.tags = tags + self.unique_key = unique_key end end diff --git a/sig/driver.rbs b/sig/driver.rbs index ddfac56..be4d4dc 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -4,7 +4,14 @@ module River def job_get_by_kind_and_unique_properties: (Driver::JobGetByKindAndUniquePropertiesParam) -> JobRow? def job_insert: (Driver::JobInsertParams) -> JobRow def job_insert_many: (Array[Driver::JobInsertParams]) -> Integer + def job_insert_unique: (Driver::JobInsertParams, String) -> [JobRow, bool] def transaction: [T] () { () -> T } -> T + + # this set of methods is used only in tests + def advisory_lock_try: (Integer) -> bool + def job_get_by_id: (Integer) -> JobRow? + def job_list: -> Array[JobRow] + def rollback_exception: -> Exception end module Driver diff --git a/sig/job.rbs b/sig/job.rbs index fff8c3d..3193f60 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -51,8 +51,9 @@ module River attr_accessor scheduled_at: Time attr_accessor state: jobStateAll attr_accessor tags: Array[String]? + attr_accessor unique_key: String? - def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?, ?unique_key: String?) -> void end class AttemptError diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 9129525..f3c8e9b 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -1,67 +1,5 @@ require "spec_helper" - -# We use a mock here, but each driver has a more comprehensive test suite that -# performs full integration level tests. -class MockDriver - attr_accessor :advisory_lock_calls - attr_accessor :inserted_jobs - attr_accessor :job_get_by_kind_and_unique_properties_calls - attr_accessor :job_get_by_kind_and_unique_properties_returns - - def initialize - @advisory_lock_calls = [] - @inserted_jobs = [] - @job_get_by_kind_and_unique_properties_calls = [] - @job_get_by_kind_and_unique_properties_returns = [] - @next_id = 0 - end - - def advisory_lock(key) - @advisory_lock_calls << key - end - - def job_get_by_kind_and_unique_properties(get_params) - @job_get_by_kind_and_unique_properties_calls << get_params - job_get_by_kind_and_unique_properties_returns.shift - end - - def job_insert(insert_params) - insert_params_to_jow_row(insert_params) - end - - def job_insert_many(insert_params_many) - insert_params_many.each do |insert_params| - insert_params_to_jow_row(insert_params) - end - insert_params_many.count - end - - def transaction(&) - yield - end - - private def insert_params_to_jow_row(insert_params) - job = River::JobRow.new( - id: (@next_id += 1), - args: JSON.parse(insert_params.encoded_args), - attempt: 0, - attempted_by: nil, - created_at: Time.now, - errors: nil, - finalized_at: nil, - kind: insert_params.kind, - max_attempts: insert_params.max_attempts, - metadata: nil, - priority: insert_params.priority, - queue: insert_params.queue, - scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB - state: insert_params.state, - tags: insert_params.tags - ) - inserted_jobs << job - job - end -end +require_relative "../driver/riverqueue-sequel/spec/spec_helper" class SimpleArgs attr_accessor :job_num @@ -82,15 +20,23 @@ class SimpleArgsWithInsertOpts < SimpleArgs attr_accessor :insert_opts end +# I originally had this top-level client test set up so that it was using a mock +# driver, but it just turned out to be too horribly unsustainable. Adding +# anything new required careful mock engineering, and even once done, we weren't +# getting good guarantees that the right things were happening because it wasn't +# end to end. We now use the real Sequel driver, with the only question being +# whether we should maybe move all these tests into the common driver shared +# examples so that all drivers get the full barrage. RSpec.describe River::Client do - let(:client) { River::Client.new(mock_driver) } - let(:mock_driver) { MockDriver.new } + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::Sequel.new(DB) } + let(:client) { River::Client.new(driver) } describe "#insert" do it "inserts a job with defaults" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) expect(insert_res.job).to have_attributes( - id: 1, args: {"job_num" => 1}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -100,7 +46,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) end @@ -174,18 +120,18 @@ class SimpleArgsWithInsertOpts < SimpleArgs end it "errors if advisory lock prefix is larger than four bytes" do - River::Client.new(mock_driver, advisory_lock_prefix: 123) + River::Client.new(driver, advisory_lock_prefix: 123) expect do - River::Client.new(mock_driver, advisory_lock_prefix: -1) + River::Client.new(driver, advisory_lock_prefix: -1) end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") # 2^32-1 is 0xffffffff (1s for 32 bits) which fits - River::Client.new(mock_driver, advisory_lock_prefix: 2**32 - 1) + River::Client.new(driver, advisory_lock_prefix: 2**32 - 1) # 2^32 is 0x100000000, which does not expect do - River::Client.new(mock_driver, advisory_lock_prefix: 2**32) + River::Client.new(driver, advisory_lock_prefix: 2**32) end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") end @@ -240,7 +186,16 @@ def check_bigint_bounds(int) let(:now) { Time.now.utc } before { client.instance_variable_set(:@time_now_utc, -> { now }) } - it "inserts a new unique job with minimal options" do + let(:advisory_lock_keys) { [] } + + before do + # required so it's properly captured by the lambda below + keys = advisory_lock_keys + + driver.singleton_class.send(:define_method, :advisory_lock, ->(key) { keys.push(key) }) + end + + it "inserts a new unique job with minimal options on the fast path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -252,20 +207,23 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - lock_str = "unique_keykind=#{job_args.kind}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ + expect(advisory_lock_keys).to be_empty + + unique_key_str = "&queue=#{River::QUEUE_DEFAULT}" \ "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with all options" do + it "inserts a new unique job with minimal options on the slow path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( - by_args: true, - by_period: 15 * 60, by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE] + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path ) ) @@ -274,20 +232,25 @@ def check_bigint_bounds(int) expect(insert_res.unique_skipped_as_duplicated).to be false lock_str = "unique_keykind=#{job_args.kind}" \ - "&args=#{JSON.dump({job_num: 1})}" \ - "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) - end + "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" + expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) - it "inserts a new unique job with advisory lock prefix" do - client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) + expect(insert_res.job.unique_key).to be_nil + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true + end + it "inserts a new unique job with all options on the fast path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( - by_queue: true + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: River::Client.const_get(:DEFAULT_UNIQUE_STATES) ) ) @@ -295,20 +258,20 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - lock_str = "unique_keykind=#{job_args.kind}" \ + expect(advisory_lock_keys).to be_empty + + unique_key_str = "&args=#{JSON.dump({job_num: 1})}" \ + "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ "&queue=#{River::QUEUE_DEFAULT}" \ "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))]) + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) - lock_key = mock_driver.advisory_lock_calls[0] - expect(lock_key >> 32).to eq(123456) - end - - def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) - mock_driver.send(:insert_params_to_jow_row, client.send(:make_insert_params, job_args, insert_opts)[0]) + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true end - it "gets an existing unique job" do + it "inserts a new unique job with all options on the slow path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -319,21 +282,46 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ) ) - job = job_args_to_row(job_args) - mock_driver.job_get_by_kind_and_unique_properties_returns << job - insert_res = client.insert(job_args) - expect(insert_res).to have_attributes( - job: job, - unique_skipped_as_duplicated: true - ) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false lock_str = "unique_keykind=#{job_args.kind}" \ "&args=#{JSON.dump({job_num: 1})}" \ "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ "&queue=#{River::QUEUE_DEFAULT}" \ "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + + expect(insert_res.job.unique_key).to be_nil + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a new unique job with advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + job_args = SimpleArgsWithInsertOpts.new(job_num: 1) + job_args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + ) + ) + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{job_args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" + expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))]) + + lock_key = advisory_lock_keys[0] + expect(lock_key >> 32).to eq(123456) end it "skips unique check if unique opts empty" do @@ -357,9 +345,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ]) expect(num_inserted).to eq(2) - job1 = mock_driver.inserted_jobs[0] - expect(job1).to have_attributes( - id: 1, + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( args: {"job_num" => 1}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -369,12 +358,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) - job2 = mock_driver.inserted_jobs[1] - expect(job2).to have_attributes( - id: 2, + expect(jobs[1]).to have_attributes( args: {"job_num" => 2}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -384,7 +371,7 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) end @@ -395,9 +382,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ]) expect(num_inserted).to eq(2) - job1 = mock_driver.inserted_jobs[0] - expect(job1).to have_attributes( - id: 1, + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( args: {"job_num" => 1}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -407,12 +395,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) - job2 = mock_driver.inserted_jobs[1] - expect(job2).to have_attributes( - id: 2, + expect(jobs[1]).to have_attributes( args: {"job_num" => 2}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -422,7 +408,7 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) end @@ -460,16 +446,17 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ]) expect(num_inserted).to eq(2) - job1 = mock_driver.inserted_jobs[0] - expect(job1).to have_attributes( + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( max_attempts: 17, priority: 3, queue: "my_queue_1", tags: ["custom_1"] ) - job2 = mock_driver.inserted_jobs[1] - expect(job2).to have_attributes( + expect(jobs[1]).to have_attributes( max_attempts: 18, priority: 4, queue: "my_queue_2", @@ -488,6 +475,12 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) end end + describe River::Client.const_get(:DEFAULT_UNIQUE_STATES) do + it "should be sorted" do + expect(River::Client.const_get(:DEFAULT_UNIQUE_STATES)).to eq(River::Client.const_get(:DEFAULT_UNIQUE_STATES).sort) + end + end + describe "#truncate_time" do it "truncates times to nearest interval" do expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 1 * 60).utc).to eq(Time.parse("Thu Jan 15 21:26:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing diff --git a/spec/driver_shared_examples.rb b/spec/driver_shared_examples.rb new file mode 100644 index 0000000..d1d659a --- /dev/null +++ b/spec/driver_shared_examples.rb @@ -0,0 +1,454 @@ +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +shared_examples "driver shared examples" do + describe "unique insertion" do + it "inserts a unique job once on the fast path" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job on the slow path" do + client = River::Client.new(driver) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job on the slow path with an advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + end + + describe "#advisory_lock" do + it "takes an advisory lock" do + driver.transaction do + driver.advisory_lock(123) + + Thread.new do + expect(driver.advisory_lock_try(123)).to be false + end.join + end + end + end + + describe "#advisory_lock_try" do + it "takes an advisory lock" do + driver.transaction do + expect(driver.advisory_lock_try(123)).to be true + end + end + end + + describe "#job_get_by_id" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by ID" do + insert_res = client.insert(job_args) + expect(driver.job_get_by_id(insert_res.job.id)).to_not be nil + end + + it "returns nil on not found" do + expect(driver.job_get_by_id(-1)).to be nil + end + end + + describe "#job_get_by_kind_and_unique_properties" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by kind" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: "does_not_exist" + )) + ).to be_nil + end + + it "gets a job by created at period" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] + )) + ).to be_nil + end + + it "gets a job by encoded args" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump(insert_res.job.args) + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump({"job_num" => 2}) + )) + ).to be_nil + end + + it "gets a job by queue" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: insert_res.job.queue + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: "other_queue" + )) + ).to be_nil + end + + it "gets a job by state" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + )) + ).to be_nil + end + end + + describe "#job_insert" do + it "inserts a job" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.getutc + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to_not be_nil + + raise driver.rollback_exception + end + + # Not present because the job was rolled back. + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to be_nil + end + end + + describe "#job_insert_many" do + it "inserts multiple jobs" do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + expect(jobs[1]).to have_attributes( + attempt: 0, + args: {"job_num" => 2}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + end + + it "inserts multiple jobs in a transaction" do + jobs = nil + + driver.transaction do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + jobs = driver.job_list + expect(jobs.count).to be 2 + + raise driver.rollback_exception + end + + # Not present because the jobs were rolled back. + expect(driver.job_get_by_id(jobs[0].id)).to be nil + expect(driver.job_get_by_id(jobs[1].id)).to be nil + end + end + + describe "#job_insert_unique" do + it "inserts a job" do + insert_params = River::Driver::JobInsertParams.new( + encoded_args: JSON.dump({"job_num" => 1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: Time.now.getutc, + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + + job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + expect(unique_skipped_as_duplicated).to be false + + # second insertion should be skipped + job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + expect(unique_skipped_as_duplicated).to be true + end + end + + describe "#job_list" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by ID" do + insert_res1 = client.insert(job_args) + insert_res2 = client.insert(job_args) + + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0].id).to be insert_res1.job.id + expect(jobs[1].id).to be insert_res2.job.id + end + + it "returns nil on not found" do + expect(driver.job_get_by_id(-1)).to be nil + end + end + + describe "#transaction" do + it "runs block in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to_not be_nil + + raise driver.rollback_exception + end + + # Not present because the job was rolled back. + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to be_nil + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index a906f38..feb82b6 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,6 +6,10 @@ SimpleCov.start do enable_coverage :branch minimum_coverage line: 100, branch: 100 + + # Drivers have their own spec suite where they're covered 100.0%, but + # they're not fully covered from this top level test suite. + add_filter("driver/riverqueue-sequel/") end end