diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d0dd8d..d38eb70 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Now compatible with "fast path" unique job insertion that uses a unique index instead of advisory lock and fetch [as introduced in River #451](https://github.com/riverqueue/river/pull/451). [PR #28](https://github.com/riverqueue/riverqueue-ruby/pull/28). + ## [0.6.1] - 2024-08-21 ### Fixed diff --git a/Gemfile b/Gemfile index 213a8d2..d174c52 100644 --- a/Gemfile +++ b/Gemfile @@ -11,5 +11,6 @@ group :test do gem "debug" gem "rspec-core" gem "rspec-expectations" + gem "riverqueue-sequel", path: "driver/riverqueue-sequel" gem "simplecov", require: false end diff --git a/Gemfile.lock b/Gemfile.lock index b716247..aa2867b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -3,6 +3,13 @@ PATH specs: riverqueue (0.6.1) +PATH + remote: driver/riverqueue-sequel + specs: + riverqueue-sequel (0.6.1) + pg (> 0, < 1000) + sequel (> 0, < 1000) + GEM remote: https://rubygems.org/ specs: @@ -50,6 +57,7 @@ GEM parser (3.3.0.5) ast (~> 2.4.1) racc + pg (1.5.6) psych (5.1.2) stringio racc (1.7.3) @@ -89,6 +97,8 @@ GEM rubocop-ast (>= 1.30.0, < 2.0) ruby-progressbar (1.13.0) securerandom (0.3.1) + sequel (5.79.0) + bigdecimal simplecov (0.22.0) docile (~> 1.1) simplecov-html (~> 0.11) @@ -137,6 +147,7 @@ PLATFORMS DEPENDENCIES debug riverqueue! + riverqueue-sequel! rspec-core rspec-expectations simplecov diff --git a/Steepfile b/Steepfile index 656fe44..ac910d1 100644 --- a/Steepfile +++ b/Steepfile @@ -3,6 +3,7 @@ D = Steep::Diagnostic target :lib do check "lib" + library "digest" library "json" library "time" diff --git a/driver/riverqueue-activerecord/lib/driver.rb b/driver/riverqueue-activerecord/lib/driver.rb index 3fce2ee..96d5925 100644 --- a/driver/riverqueue-activerecord/lib/driver.rb +++ b/driver/riverqueue-activerecord/lib/driver.rb @@ -33,6 +33,16 @@ def errors = {} def advisory_lock(key) ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})") + nil + end + + def advisory_lock_try(key) + ::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"] + end + + def job_get_by_id(id) + data_set = RiverJob.where(id: id) + data_set.first ? to_job_row_from_model(data_set.first) : nil end def job_get_by_kind_and_unique_properties(get_params) @@ -41,11 +51,28 @@ def job_get_by_kind_and_unique_properties(get_params) data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args data_set = data_set.where(queue: get_params.queue) if get_params.queue data_set = data_set.where(state: get_params.state) if get_params.state - data_set.take + data_set.first ? to_job_row_from_model(data_set.first) : nil end def job_insert(insert_params) - to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) + to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params))) + end + + def job_insert_unique(insert_params, unique_key) + res = RiverJob.upsert( + insert_params_to_hash(insert_params).merge(unique_key: unique_key), + on_duplicate: Arel.sql("kind = EXCLUDED.kind"), + returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"), + + # It'd be nice to specify this as `(kind, unique_key) WHERE unique_key + # IS NOT NULL` like we do elsewhere, but in its pure ingenuity, fucking + # ActiveRecord tries to look up a unique index instead of letting + # Postgres handle that, and of course it doesn't support a `WHERE` + # clause. The workaround is to target the index name instead of columns. + unique_by: "river_job_kind_unique_key_idx" + ) + + [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]] end def job_insert_many(insert_params_many) @@ -53,6 +80,15 @@ def job_insert_many(insert_params_many) insert_params_many.count end + def job_list + data_set = RiverJob.order(:id) + data_set.all.map { |job| to_job_row_from_model(job) } + end + + def rollback_exception + ::ActiveRecord::Rollback + end + def transaction(&) ::ActiveRecord::Base.transaction(requires_new: true, &) end @@ -72,21 +108,18 @@ def transaction(&) }.compact end - # Type type injected to this method is not a `RiverJob`, but rather a raw - # hash with stringified keys because we're inserting with the Arel framework - # directly rather than generating a record from a model. - private def to_job_row(river_job) + private def to_job_row_from_model(river_job) # needs to be accessed through values because `errors` is shadowed by both # ActiveRecord and the patch above errors = river_job.attributes["errors"] River::JobRow.new( id: river_job.id, - args: river_job.args ? JSON.parse(river_job.args) : nil, + args: JSON.parse(river_job.args), attempt: river_job.attempt, - attempted_at: river_job.attempted_at, + attempted_at: river_job.attempted_at&.getutc, attempted_by: river_job.attempted_by, - created_at: river_job.created_at, + created_at: river_job.created_at.getutc, errors: errors&.map { |e| deserialized_error = JSON.parse(e, symbolize_names: true) @@ -97,15 +130,59 @@ def transaction(&) trace: deserialized_error[:trace] ) }, - finalized_at: river_job.finalized_at, + finalized_at: river_job.finalized_at&.getutc, kind: river_job.kind, max_attempts: river_job.max_attempts, metadata: river_job.metadata, priority: river_job.priority, queue: river_job.queue, - scheduled_at: river_job.scheduled_at, + scheduled_at: river_job.scheduled_at.getutc, state: river_job.state, - tags: river_job.tags + tags: river_job.tags, + unique_key: river_job.unique_key + ) + end + + # This is really awful, but some of ActiveRecord's methods (e.g. `.create`) + # return a model, and others (e.g. `.upsert`) return raw values, and + # therefore this second version from unmarshaling a job row exists. I + # searched long and hard for a way to have the former type of method return + # raw or the latter type of method return a model, but was unable to find + # anything. + private def to_job_row_from_raw(res) + river_job = {} + + res.rows[0].each_with_index do |val, i| + river_job[res.columns[i]] = res.column_types[i].deserialize(val) + end + + River::JobRow.new( + id: river_job["id"], + args: JSON.parse(river_job["args"]), + attempt: river_job["attempt"], + attempted_at: river_job["attempted_at"]&.getutc, + attempted_by: river_job["attempted_by"], + created_at: river_job["created_at"].getutc, + errors: river_job["errors"]&.map { |e| + deserialized_error = JSON.parse(e) + + River::AttemptError.new( + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] + ) + }, + finalized_at: river_job["finalized_at"]&.getutc, + kind: river_job["kind"], + max_attempts: river_job["max_attempts"], + metadata: river_job["metadata"], + priority: river_job["priority"], + queue: river_job["queue"], + scheduled_at: river_job["scheduled_at"].getutc, + state: river_job["state"], + tags: river_job["tags"], + unique_key: river_job["unique_key"] ) end end diff --git a/driver/riverqueue-activerecord/spec/driver_spec.rb b/driver/riverqueue-activerecord/spec/driver_spec.rb index 9245294..11d4a38 100644 --- a/driver/riverqueue-activerecord/spec/driver_spec.rb +++ b/driver/riverqueue-activerecord/spec/driver_spec.rb @@ -1,23 +1,5 @@ require "spec_helper" - -class SimpleArgs - attr_accessor :job_num - - def initialize(job_num:) - self.job_num = job_num - end - - def kind = "simple" - - def to_json = JSON.dump({job_num: job_num}) -end - -# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. -# Real args that make use of this functionality will probably want to make -# `#insert_opts` a non-accessor method instead. -class SimpleArgsWithInsertOpts < SimpleArgs - attr_accessor :insert_opts -end +require_relative "../../../spec/driver_shared_examples" RSpec.describe River::Driver::ActiveRecord do around(:each) { |ex| test_transaction(&ex) } @@ -25,360 +7,152 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::ActiveRecord.new } let(:client) { River::Client.new(driver) } - describe "unique insertion" do - it "inserts a unique job once" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - - it "inserts a unique job with an advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true + before do + if ENV["RIVER_DEBUG"] == "1" || ENV["RIVER_DEBUG"] == "true" + ActiveRecord::Base.logger = Logger.new($stdout) end end - describe "#advisory_lock" do - it "takes an advisory lock" do - driver.transaction do - driver.advisory_lock(123) - - Thread.new do - conn = ::ActiveRecord::Base.connection_pool.checkout - begin - expect(conn.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"]).to be false - ensure - ::ActiveRecord::Base.connection_pool.checkin(conn) - end - end.join - end - end - end - - describe "#job_get_by_kind_and_unique_properties" do - let(:job_args) { SimpleArgs.new(job_num: 1) } - - it "gets a job by kind" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: "does_not_exist" - )) - ).to be_nil - end - - it "gets a job by created at period" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] - )) - ).to be_nil - end - - it "gets a job by encoded args" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump(insert_res.job.args) - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump({"job_num" => 2}) - )) - ).to be_nil - end - - it "gets a job by queue" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: insert_res.job.queue - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: "other_queue" - )) - ).to be_nil - end - - it "gets a job by state" do - insert_res = client.insert(job_args) + it_behaves_like "driver shared examples" - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] - )) + describe "#to_job_row_from_model" do + it "converts a database record to `River::JobRow` with minimal properties" do + river_job = River::Driver::ActiveRecord::RiverJob.create( + id: 1, + args: %({"job_num":1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + state: River::JOB_STATE_AVAILABLE ) - expect(job.id).to eq(insert_res.job.id) - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] - )) - ).to be_nil - end - end + job_row = driver.send(:to_job_row_from_model, river_job) - describe "#job_insert" do - it "inserts a job" do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - expect(insert_res.job).to have_attributes( + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, args: {"job_num" => 1}, attempt: 0, - created_at: be_within(2).of(Time.now.utc), + attempted_at: nil, + attempted_by: nil, + created_at: be_within(2).of(Time.now.getutc), + finalized_at: nil, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), state: River::JOB_STATE_AVAILABLE, tags: [] ) - - # Make sure it made it to the database. Assert only minimally since we're - # certain it's the same as what we checked above. - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to have_attributes( - kind: "simple" - ) end - it "schedules a job" do - target_time = Time.now.utc + 1 * 3600 - - insert_res = client.insert( - SimpleArgs.new(job_num: 1), - insert_opts: River::InsertOpts.new(scheduled_at: target_time) - ) - expect(insert_res.job).to have_attributes( - scheduled_at: be_within(2).of(target_time), - state: River::JOB_STATE_SCHEDULED + it "converts a database record to `River::JobRow` with all properties" do + now = Time.now + river_job = River::Driver::ActiveRecord::RiverJob.create( + id: 1, + attempt: 1, + attempted_at: now, + attempted_by: ["client1"], + created_at: now, + args: %({"job_num":1}), + finalized_at: now, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) - end - it "inserts with job insert opts" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) + job_row = driver.send(:to_job_row_from_model, river_job) - insert_res = client.insert(args) - expect(insert_res.job).to have_attributes( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, + attempt: 1, + attempted_at: now.getutc, + attempted_by: ["client1"], + created_at: now.getutc, + finalized_at: now.getutc, + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + priority: River::PRIORITY_DEFAULT, + queue: River::QUEUE_DEFAULT, + scheduled_at: now.getutc, + state: River::JOB_STATE_COMPLETED, + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) end - it "inserts with insert opts" do - # We set job insert opts in this spec too so that we can verify that the - # options passed at insertion time take precedence. - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] + it "with errors" do + now = Time.now.utc + river_job = River::Driver::ActiveRecord::RiverJob.create( + args: %({"job_num":1}), + errors: [JSON.dump( + { + at: now, + attempt: 1, + error: "job failure", + trace: "error trace" + } + )], + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + state: River::JOB_STATE_AVAILABLE ) - insert_res = client.insert(args, insert_opts: River::InsertOpts.new( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - )) - expect(insert_res.job).to have_attributes( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - ) - end + job_row = driver.send(:to_job_row_from_model, river_job) - it "inserts with job args hash" do - insert_res = client.insert(River::JobArgsHash.new("hash_kind", { - job_num: 1 - })) - expect(insert_res.job).to have_attributes( - args: {"job_num" => 1}, - kind: "hash_kind" + expect(job_row.errors.count).to be(1) + expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) + expect(job_row.errors[0]).to have_attributes( + at: now.floor(0), + attempt: 1, + error: "job failure", + trace: "error trace" ) end - - it "inserts in a transaction" do - insert_res = nil - - ActiveRecord::Base.transaction(requires_new: true) do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise ActiveRecord::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to be_nil - end end - describe "#job_insert_many" do - it "inserts multiple jobs" do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first) - expect(job1).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.utc), + describe "#to_job_row_from_raw" do + it "converts a database record to `River::JobRow` with minimal properties" do + river_job = River::Driver::ActiveRecord::RiverJob.insert({ + id: 1, + args: %({"job_num":1}), kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), - state: River::JOB_STATE_AVAILABLE, - tags: [] - ) + max_attempts: River::MAX_ATTEMPTS_DEFAULT + }, returning: Arel.sql("*")) + + job_row = driver.send(:to_job_row_from_raw, river_job) - job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first) - expect(job2).to have_attributes( + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, + args: {"job_num" => 1}, attempt: 0, - args: {"job_num" => 2}, - created_at: be_within(2).of(Time.now.utc), + attempted_at: nil, + attempted_by: nil, + created_at: be_within(2).of(Time.now.getutc), + finalized_at: nil, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), + queue: River::QUEUE_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), state: River::JOB_STATE_AVAILABLE, tags: [] ) end - it "inserts multiple jobs in a transaction" do - job1 = nil - job2 = nil - - ActiveRecord::Base.transaction(requires_new: true) do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.first) - job2 = driver.send(:to_job_row, River::Driver::ActiveRecord::RiverJob.offset(1).first) - - raise ActiveRecord::Rollback - end - - # Not present because the jobs were rolled back. - expect do - River::Driver::ActiveRecord::RiverJob.find(job1.id) - end.to raise_error(ActiveRecord::RecordNotFound) - expect do - River::Driver::ActiveRecord::RiverJob.find(job2.id) - end.to raise_error(ActiveRecord::RecordNotFound) - end - end - - describe "#transaction" do - it "runs block in a transaction" do - insert_res = nil - - driver.transaction do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise ActiveRecord::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) - expect(river_job).to be_nil - end - end - - describe "#to_job_row" do - it "converts a database record to `River::JobRow`" do - now = Time.now.utc - river_job = River::Driver::ActiveRecord::RiverJob.new( + it "converts a database record to `River::JobRow` with all properties" do + now = Time.now + river_job = River::Driver::ActiveRecord::RiverJob.insert({ id: 1, attempt: 1, attempted_at: now, @@ -392,33 +166,36 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: River::QUEUE_DEFAULT, scheduled_at: now, state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] - ) + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") + }, returning: Arel.sql("*")) - job_row = driver.send(:to_job_row, river_job) + job_row = driver.send(:to_job_row_from_raw, river_job) expect(job_row).to be_an_instance_of(River::JobRow) expect(job_row).to have_attributes( id: 1, args: {"job_num" => 1}, attempt: 1, - attempted_at: now, + attempted_at: be_within(2).of(now.getutc), attempted_by: ["client1"], - created_at: now, - finalized_at: now, + created_at: be_within(2).of(now.getutc), + finalized_at: be_within(2).of(now.getutc), kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, priority: River::PRIORITY_DEFAULT, queue: River::QUEUE_DEFAULT, - scheduled_at: now, + scheduled_at: be_within(2).of(now.getutc), state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) end it "with errors" do now = Time.now.utc - river_job = River::Driver::ActiveRecord::RiverJob.new( + river_job = River::Driver::ActiveRecord::RiverJob.insert({ + args: %({"job_num":1}), errors: [JSON.dump( { at: now, @@ -426,10 +203,13 @@ class SimpleArgsWithInsertOpts < SimpleArgs error: "job failure", trace: "error trace" } - )] - ) + )], + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + state: River::JOB_STATE_AVAILABLE + }, returning: Arel.sql("*")) - job_row = driver.send(:to_job_row, river_job) + job_row = driver.send(:to_job_row_from_raw, river_job) expect(job_row.errors.count).to be(1) expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) diff --git a/driver/riverqueue-sequel/lib/driver.rb b/driver/riverqueue-sequel/lib/driver.rb index 69d99ca..a21714e 100644 --- a/driver/riverqueue-sequel/lib/driver.rb +++ b/driver/riverqueue-sequel/lib/driver.rb @@ -9,40 +9,66 @@ module River::Driver class Sequel def initialize(db) @db = db - - # It's Ruby, so we can only define a model after Sequel's established a - # connection because it's all dynamic. - if !River::Driver::Sequel.const_defined?(:RiverJob) - River::Driver::Sequel.const_set(:RiverJob, Class.new(::Sequel::Model(:river_job))) - - # Since we only define our model once, take advantage of knowing this is - # our first initialization to add required extensions. - db.extension(:pg_array) - end + @db.extension(:pg_array) + @db.extension(:pg_json) end def advisory_lock(key) @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first + nil + end + + def advisory_lock_try(key) + @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock] + end + + def job_get_by_id(id) + data_set = @db[:river_job].where(id: id) + data_set.first ? to_job_row(data_set.first) : nil end def job_get_by_kind_and_unique_properties(get_params) - data_set = RiverJob.where(kind: get_params.kind) + data_set = @db[:river_job].where(kind: get_params.kind) data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args data_set = data_set.where(queue: get_params.queue) if get_params.queue data_set = data_set.where(state: get_params.state) if get_params.state - data_set.first + data_set.first ? to_job_row(data_set.first) : nil end def job_insert(insert_params) - to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) + to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params))) + end + + def job_insert_unique(insert_params, unique_key) + values = @db[:river_job] + .insert_conflict( + target: [:kind, :unique_key], + conflict_where: ::Sequel.lit("unique_key IS NOT NULL"), + update: {kind: ::Sequel[:excluded][:kind]} + ) + .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate")) + .insert_select( + insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key)) + ) + + [to_job_row(values), values[:unique_skipped_as_duplicate]] end def job_insert_many(insert_params_many) - RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) + @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end + def job_list + data_set = @db[:river_job].order_by(:id) + data_set.all.map { |job| to_job_row(job) } + end + + def rollback_exception + ::Sequel::Rollback + end + def transaction(&) @db.transaction(savepoint: true, &) end @@ -63,35 +89,31 @@ def transaction(&) end private def to_job_row(river_job) - # needs to be accessed through values because Sequel shadows `errors` - errors = river_job.values[:errors] - River::JobRow.new( - id: river_job.id, - args: river_job.args ? JSON.parse(river_job.args) : nil, - attempt: river_job.attempt, - attempted_at: river_job.attempted_at, - attempted_by: river_job.attempted_by, - created_at: river_job.created_at, - errors: errors&.map { |e| - deserialized_error = JSON.parse(e, symbolize_names: true) - + id: river_job[:id], + args: river_job[:args].to_h, + attempt: river_job[:attempt], + attempted_at: river_job[:attempted_at]&.getutc, + attempted_by: river_job[:attempted_by], + created_at: river_job[:created_at].getutc, + errors: river_job[:errors]&.map { |deserialized_error| River::AttemptError.new( - at: Time.parse(deserialized_error[:at]), - attempt: deserialized_error[:attempt], - error: deserialized_error[:error], - trace: deserialized_error[:trace] + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] ) }, - finalized_at: river_job.finalized_at, - kind: river_job.kind, - max_attempts: river_job.max_attempts, - metadata: river_job.metadata, - priority: river_job.priority, - queue: river_job.queue, - scheduled_at: river_job.scheduled_at, - state: river_job.state, - tags: river_job.tags + finalized_at: river_job[:finalized_at]&.getutc, + kind: river_job[:kind], + max_attempts: river_job[:max_attempts], + metadata: river_job[:metadata], + priority: river_job[:priority], + queue: river_job[:queue], + scheduled_at: river_job[:scheduled_at].getutc, + state: river_job[:state], + tags: river_job[:tags].to_a, + unique_key: river_job[:unique_key]&.to_s ) end end diff --git a/driver/riverqueue-sequel/spec/driver_spec.rb b/driver/riverqueue-sequel/spec/driver_spec.rb index 6264c21..b18f1d8 100644 --- a/driver/riverqueue-sequel/spec/driver_spec.rb +++ b/driver/riverqueue-sequel/spec/driver_spec.rb @@ -1,23 +1,5 @@ require "spec_helper" - -class SimpleArgs - attr_accessor :job_num - - def initialize(job_num:) - self.job_num = job_num - end - - def kind = "simple" - - def to_json = JSON.dump({job_num: job_num}) -end - -# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. -# Real args that make use of this functionality will probably want to make -# `#insert_opts` a non-accessor method instead. -class SimpleArgsWithInsertOpts < SimpleArgs - attr_accessor :insert_opts -end +require_relative "../../../spec/driver_shared_examples" RSpec.describe River::Driver::Sequel do around(:each) { |ex| test_transaction(&ex) } @@ -25,356 +7,45 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::Sequel.new(DB) } let(:client) { River::Client.new(driver) } - describe "unique insertion" do - it "inserts a unique job once" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - - it "inserts a unique job with an advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - end - - describe "#advisory_lock" do - it "takes an advisory lock" do - driver.transaction do - driver.advisory_lock(123) - - Thread.new do - expect(DB.fetch("SELECT pg_try_advisory_xact_lock(?)", 123).first[:pg_try_advisory_xact_lock]).to be false - end.join - end - end - end - - describe "#job_get_by_kind_and_unique_properties" do - let(:job_args) { SimpleArgs.new(job_num: 1) } - - it "gets a job by kind" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: "does_not_exist" - )) - ).to be_nil - end - - it "gets a job by created at period" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] - )) - ).to be_nil - end - - it "gets a job by encoded args" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump(insert_res.job.args) - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump({"job_num" => 2}) - )) - ).to be_nil - end - - it "gets a job by queue" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: insert_res.job.queue - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: "other_queue" - )) - ).to be_nil - end - - it "gets a job by state" do - insert_res = client.insert(job_args) - - job = driver.send( - :to_job_row, - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] - )) - ) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] - )) - ).to be_nil - end - end + it_behaves_like "driver shared examples" - describe "#job_insert" do - it "inserts a job" do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - expect(insert_res.job).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.utc), + describe "#to_job_row" do + it "converts a database record to `River::JobRow` with minimal properties" do + river_job = DB[:river_job].returning.insert_select({ + id: 1, + args: %({"job_num":1}), kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), - state: River::JOB_STATE_AVAILABLE, - tags: ::Sequel.pg_array([]) - ) - - # Make sure it made it to the database. Assert only minimally since we're - # certain it's the same as what we checked above. - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to have_attributes( - kind: "simple" - ) - end - - it "schedules a job" do - target_time = Time.now.utc + 1 * 3600 + max_attempts: River::MAX_ATTEMPTS_DEFAULT + }) - insert_res = client.insert( - SimpleArgs.new(job_num: 1), - insert_opts: River::InsertOpts.new(scheduled_at: target_time) - ) - expect(insert_res.job).to have_attributes( - scheduled_at: be_within(2).of(target_time), - state: River::JOB_STATE_SCHEDULED - ) - end - - it "inserts with job insert opts" do - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) - - insert_res = client.insert(args) - expect(insert_res.job).to have_attributes( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) - end - - it "inserts with insert opts" do - # We set job insert opts in this spec too so that we can verify that the - # options passed at insertion time take precedence. - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - max_attempts: 23, - priority: 2, - queue: "job_custom_queue", - tags: ["job_custom"] - ) - - insert_res = client.insert(args, insert_opts: River::InsertOpts.new( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - )) - expect(insert_res.job).to have_attributes( - max_attempts: 17, - priority: 3, - queue: "my_queue", - tags: ["custom"] - ) - end + job_row = driver.send(:to_job_row, river_job) - it "inserts with job args hash" do - insert_res = client.insert(River::JobArgsHash.new("hash_kind", { - job_num: 1 - })) - expect(insert_res.job).to have_attributes( + expect(job_row).to be_an_instance_of(River::JobRow) + expect(job_row).to have_attributes( + id: 1, args: {"job_num" => 1}, - kind: "hash_kind" - ) - end - - it "inserts in a transaction" do - insert_res = nil - - DB.transaction(savepoint: true) do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise Sequel::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to be_nil - end - end - - describe "#job_insert_many" do - it "inserts multiple jobs" do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.first) - expect(job1).to have_attributes( attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.utc), + attempted_at: nil, + attempted_by: nil, + created_at: be_within(2).of(Time.now.getutc), + finalized_at: nil, kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), - state: River::JOB_STATE_AVAILABLE, - tags: ::Sequel.pg_array([]) - ) - - job2 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.limit(nil, 1).first) - expect(job2).to have_attributes( - attempt: 0, - args: {"job_num" => 2}, - created_at: be_within(2).of(Time.now.utc), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.utc), + scheduled_at: be_within(2).of(Time.now.getutc), state: River::JOB_STATE_AVAILABLE, - tags: ::Sequel.pg_array([]) + tags: [] ) end - it "inserts multiple jobs in a transaction" do - job1 = nil - job2 = nil - - DB.transaction(savepoint: true) do - num_inserted = client.insert_many([ - SimpleArgs.new(job_num: 1), - SimpleArgs.new(job_num: 2) - ]) - expect(num_inserted).to eq(2) - - job1 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.first) - job2 = driver.send(:to_job_row, River::Driver::Sequel::RiverJob.limit(nil, 1).first) - - raise Sequel::Rollback - end - - # Not present because the jobs were rolled back. - river_job1 = River::Driver::Sequel::RiverJob.first(id: job1.id) - expect(river_job1).to be_nil - river_job2 = River::Driver::Sequel::RiverJob.first(id: job2.id) - expect(river_job2).to be_nil - end - end - - describe "#transaction" do - it "runs block in a transaction" do - insert_res = nil - - driver.transaction do - insert_res = client.insert(SimpleArgs.new(job_num: 1)) - - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to_not be_nil - - raise Sequel::Rollback - end - - # Not present because the job was rolled back. - river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) - expect(river_job).to be_nil - end - end - - describe "#to_job_row" do - it "converts a database record to `River::JobRow`" do - now = Time.now.utc - river_job = River::Driver::Sequel::RiverJob.new( + it "converts a database record to `River::JobRow` with all properties" do + now = Time.now + river_job = DB[:river_job].returning.insert_select({ + id: 1, attempt: 1, attempted_at: now, - attempted_by: ["client1"], + attempted_by: ::Sequel.pg_array(["client1"]), created_at: now, args: %({"job_num":1}), finalized_at: now, @@ -384,9 +55,9 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: River::QUEUE_DEFAULT, scheduled_at: now, state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] - ) - river_job.id = 1 + tags: ::Sequel.pg_array(["tag1"]), + unique_key: ::Sequel.blob(Digest::SHA256.digest("unique_key_str")) + }) job_row = driver.send(:to_job_row, river_job) @@ -395,32 +66,37 @@ class SimpleArgsWithInsertOpts < SimpleArgs id: 1, args: {"job_num" => 1}, attempt: 1, - attempted_at: now, + attempted_at: be_within(2).of(now.getutc), attempted_by: ["client1"], - created_at: now, - finalized_at: now, + created_at: be_within(2).of(now.getutc), + finalized_at: be_within(2).of(now.getutc), kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, priority: River::PRIORITY_DEFAULT, queue: River::QUEUE_DEFAULT, - scheduled_at: now, + scheduled_at: be_within(2).of(now.getutc), state: River::JOB_STATE_COMPLETED, - tags: ["tag1"] + tags: ["tag1"], + unique_key: Digest::SHA256.digest("unique_key_str") ) end it "with errors" do now = Time.now.utc - river_job = River::Driver::Sequel::RiverJob.new( - errors: [JSON.dump( - { + river_job = DB[:river_job].returning.insert_select({ + args: %({"job_num":1}), + errors: ::Sequel.pg_array([ + ::Sequel.pg_json_wrap({ at: now, attempt: 1, error: "job failure", trace: "error trace" - } - )] - ) + }) + ]), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + state: River::JOB_STATE_AVAILABLE + }) job_row = driver.send(:to_job_row, river_job) diff --git a/lib/client.rb b/lib/client.rb index ac2d222..ea280fd 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -1,3 +1,4 @@ +require "digest" require "fnv" require "time" @@ -152,8 +153,8 @@ def insert_many(args) DEFAULT_UNIQUE_STATES = [ JOB_STATE_AVAILABLE, JOB_STATE_COMPLETED, - JOB_STATE_RUNNING, JOB_STATE_RETRYABLE, + JOB_STATE_RUNNING, JOB_STATE_SCHEDULED ].freeze private_constant :DEFAULT_UNIQUE_STATES @@ -166,17 +167,15 @@ def insert_many(args) any_unique_opts = false get_params = Driver::JobGetByKindAndUniquePropertiesParam.new(kind: insert_params.kind) + unique_key = "" # It's extremely important here that this lock string format and algorithm # match the one in the main River library _exactly_. Don't change them # unless they're updated everywhere. - lock_str = "unique_key" - lock_str += "kind=#{insert_params.kind}" - if unique_opts.by_args any_unique_opts = true get_params.encoded_args = insert_params.encoded_args - lock_str += "&args=#{insert_params.encoded_args}" + unique_key += "&args=#{insert_params.encoded_args}" end if unique_opts.by_period @@ -184,27 +183,38 @@ def insert_many(args) any_unique_opts = true get_params.created_at = [lower_period_bound, lower_period_bound + unique_opts.by_period] - lock_str += "&period=#{lower_period_bound.strftime("%FT%TZ")}" + unique_key += "&period=#{lower_period_bound.strftime("%FT%TZ")}" end if unique_opts.by_queue any_unique_opts = true get_params.queue = insert_params.queue - lock_str += "&queue=#{insert_params.queue}" + unique_key += "&queue=#{insert_params.queue}" end if unique_opts.by_state any_unique_opts = true get_params.state = unique_opts.by_state - lock_str += "&state=#{unique_opts.by_state.join(",")}" + unique_key += "&state=#{unique_opts.by_state.join(",")}" else get_params.state = DEFAULT_UNIQUE_STATES - lock_str += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" + unique_key += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" end return block.call unless any_unique_opts + # fast path + if !unique_opts.by_state || unique_opts.by_state.sort == DEFAULT_UNIQUE_STATES + unique_key_hash = Digest::SHA256.digest(unique_key) + job, unique_skipped_as_duplicate = @driver.job_insert_unique(insert_params, unique_key_hash) + return InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) + end + @driver.transaction do + lock_str = "unique_key" + lock_str += "kind=#{insert_params.kind}" + lock_str += unique_key + lock_key = if @advisory_lock_prefix.nil? FNV.fnv1_hash(lock_str, size: 64) else diff --git a/lib/job.rb b/lib/job.rb index bd3cf0f..494ded9 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -108,6 +108,11 @@ class JobRow # to help group and categorize jobs. attr_accessor :tags + # A unique key for the job within its kind that's used for unique job + # insertions. It's generated by hashing an inserted job's unique opts + # configuration. + attr_accessor :unique_key + def initialize( id:, args:, @@ -126,7 +131,8 @@ def initialize( attempted_by: nil, errors: nil, finalized_at: nil, - tags: nil + tags: nil, + unique_key: nil ) self.id = id self.args = args @@ -144,6 +150,7 @@ def initialize( self.scheduled_at = scheduled_at self.state = state self.tags = tags + self.unique_key = unique_key end end diff --git a/sig/driver.rbs b/sig/driver.rbs index ddfac56..be4d4dc 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -4,7 +4,14 @@ module River def job_get_by_kind_and_unique_properties: (Driver::JobGetByKindAndUniquePropertiesParam) -> JobRow? def job_insert: (Driver::JobInsertParams) -> JobRow def job_insert_many: (Array[Driver::JobInsertParams]) -> Integer + def job_insert_unique: (Driver::JobInsertParams, String) -> [JobRow, bool] def transaction: [T] () { () -> T } -> T + + # this set of methods is used only in tests + def advisory_lock_try: (Integer) -> bool + def job_get_by_id: (Integer) -> JobRow? + def job_list: -> Array[JobRow] + def rollback_exception: -> Exception end module Driver diff --git a/sig/job.rbs b/sig/job.rbs index fff8c3d..3193f60 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -51,8 +51,9 @@ module River attr_accessor scheduled_at: Time attr_accessor state: jobStateAll attr_accessor tags: Array[String]? + attr_accessor unique_key: String? - def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?, ?unique_key: String?) -> void end class AttemptError diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 9129525..f3c8e9b 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -1,67 +1,5 @@ require "spec_helper" - -# We use a mock here, but each driver has a more comprehensive test suite that -# performs full integration level tests. -class MockDriver - attr_accessor :advisory_lock_calls - attr_accessor :inserted_jobs - attr_accessor :job_get_by_kind_and_unique_properties_calls - attr_accessor :job_get_by_kind_and_unique_properties_returns - - def initialize - @advisory_lock_calls = [] - @inserted_jobs = [] - @job_get_by_kind_and_unique_properties_calls = [] - @job_get_by_kind_and_unique_properties_returns = [] - @next_id = 0 - end - - def advisory_lock(key) - @advisory_lock_calls << key - end - - def job_get_by_kind_and_unique_properties(get_params) - @job_get_by_kind_and_unique_properties_calls << get_params - job_get_by_kind_and_unique_properties_returns.shift - end - - def job_insert(insert_params) - insert_params_to_jow_row(insert_params) - end - - def job_insert_many(insert_params_many) - insert_params_many.each do |insert_params| - insert_params_to_jow_row(insert_params) - end - insert_params_many.count - end - - def transaction(&) - yield - end - - private def insert_params_to_jow_row(insert_params) - job = River::JobRow.new( - id: (@next_id += 1), - args: JSON.parse(insert_params.encoded_args), - attempt: 0, - attempted_by: nil, - created_at: Time.now, - errors: nil, - finalized_at: nil, - kind: insert_params.kind, - max_attempts: insert_params.max_attempts, - metadata: nil, - priority: insert_params.priority, - queue: insert_params.queue, - scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB - state: insert_params.state, - tags: insert_params.tags - ) - inserted_jobs << job - job - end -end +require_relative "../driver/riverqueue-sequel/spec/spec_helper" class SimpleArgs attr_accessor :job_num @@ -82,15 +20,23 @@ class SimpleArgsWithInsertOpts < SimpleArgs attr_accessor :insert_opts end +# I originally had this top-level client test set up so that it was using a mock +# driver, but it just turned out to be too horribly unsustainable. Adding +# anything new required careful mock engineering, and even once done, we weren't +# getting good guarantees that the right things were happening because it wasn't +# end to end. We now use the real Sequel driver, with the only question being +# whether we should maybe move all these tests into the common driver shared +# examples so that all drivers get the full barrage. RSpec.describe River::Client do - let(:client) { River::Client.new(mock_driver) } - let(:mock_driver) { MockDriver.new } + around(:each) { |ex| test_transaction(&ex) } + + let!(:driver) { River::Driver::Sequel.new(DB) } + let(:client) { River::Client.new(driver) } describe "#insert" do it "inserts a job with defaults" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) expect(insert_res.job).to have_attributes( - id: 1, args: {"job_num" => 1}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -100,7 +46,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) end @@ -174,18 +120,18 @@ class SimpleArgsWithInsertOpts < SimpleArgs end it "errors if advisory lock prefix is larger than four bytes" do - River::Client.new(mock_driver, advisory_lock_prefix: 123) + River::Client.new(driver, advisory_lock_prefix: 123) expect do - River::Client.new(mock_driver, advisory_lock_prefix: -1) + River::Client.new(driver, advisory_lock_prefix: -1) end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") # 2^32-1 is 0xffffffff (1s for 32 bits) which fits - River::Client.new(mock_driver, advisory_lock_prefix: 2**32 - 1) + River::Client.new(driver, advisory_lock_prefix: 2**32 - 1) # 2^32 is 0x100000000, which does not expect do - River::Client.new(mock_driver, advisory_lock_prefix: 2**32) + River::Client.new(driver, advisory_lock_prefix: 2**32) end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") end @@ -240,7 +186,16 @@ def check_bigint_bounds(int) let(:now) { Time.now.utc } before { client.instance_variable_set(:@time_now_utc, -> { now }) } - it "inserts a new unique job with minimal options" do + let(:advisory_lock_keys) { [] } + + before do + # required so it's properly captured by the lambda below + keys = advisory_lock_keys + + driver.singleton_class.send(:define_method, :advisory_lock, ->(key) { keys.push(key) }) + end + + it "inserts a new unique job with minimal options on the fast path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -252,20 +207,23 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - lock_str = "unique_keykind=#{job_args.kind}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ + expect(advisory_lock_keys).to be_empty + + unique_key_str = "&queue=#{River::QUEUE_DEFAULT}" \ "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with all options" do + it "inserts a new unique job with minimal options on the slow path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( - by_args: true, - by_period: 15 * 60, by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE] + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path ) ) @@ -274,20 +232,25 @@ def check_bigint_bounds(int) expect(insert_res.unique_skipped_as_duplicated).to be false lock_str = "unique_keykind=#{job_args.kind}" \ - "&args=#{JSON.dump({job_num: 1})}" \ - "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) - end + "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" + expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) - it "inserts a new unique job with advisory lock prefix" do - client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) + expect(insert_res.job.unique_key).to be_nil + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true + end + it "inserts a new unique job with all options on the fast path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( - by_queue: true + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: River::Client.const_get(:DEFAULT_UNIQUE_STATES) ) ) @@ -295,20 +258,20 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - lock_str = "unique_keykind=#{job_args.kind}" \ + expect(advisory_lock_keys).to be_empty + + unique_key_str = "&args=#{JSON.dump({job_num: 1})}" \ + "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ "&queue=#{River::QUEUE_DEFAULT}" \ "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))]) + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) - lock_key = mock_driver.advisory_lock_calls[0] - expect(lock_key >> 32).to eq(123456) - end - - def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) - mock_driver.send(:insert_params_to_jow_row, client.send(:make_insert_params, job_args, insert_opts)[0]) + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true end - it "gets an existing unique job" do + it "inserts a new unique job with all options on the slow path" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -319,21 +282,46 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ) ) - job = job_args_to_row(job_args) - mock_driver.job_get_by_kind_and_unique_properties_returns << job - insert_res = client.insert(job_args) - expect(insert_res).to have_attributes( - job: job, - unique_skipped_as_duplicated: true - ) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false lock_str = "unique_keykind=#{job_args.kind}" \ "&args=#{JSON.dump({job_num: 1})}" \ "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ "&queue=#{River::QUEUE_DEFAULT}" \ "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" - expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + + expect(insert_res.job.unique_key).to be_nil + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a new unique job with advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + job_args = SimpleArgsWithInsertOpts.new(job_num: 1) + job_args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + ) + ) + + insert_res = client.insert(job_args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{job_args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" + expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))]) + + lock_key = advisory_lock_keys[0] + expect(lock_key >> 32).to eq(123456) end it "skips unique check if unique opts empty" do @@ -357,9 +345,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ]) expect(num_inserted).to eq(2) - job1 = mock_driver.inserted_jobs[0] - expect(job1).to have_attributes( - id: 1, + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( args: {"job_num" => 1}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -369,12 +358,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) - job2 = mock_driver.inserted_jobs[1] - expect(job2).to have_attributes( - id: 2, + expect(jobs[1]).to have_attributes( args: {"job_num" => 2}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -384,7 +371,7 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) end @@ -395,9 +382,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ]) expect(num_inserted).to eq(2) - job1 = mock_driver.inserted_jobs[0] - expect(job1).to have_attributes( - id: 1, + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( args: {"job_num" => 1}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -407,12 +395,10 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) - job2 = mock_driver.inserted_jobs[1] - expect(job2).to have_attributes( - id: 2, + expect(jobs[1]).to have_attributes( args: {"job_num" => 2}, attempt: 0, created_at: be_within(2).of(Time.now), @@ -422,7 +408,7 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) queue: River::QUEUE_DEFAULT, scheduled_at: be_within(2).of(Time.now), state: River::JOB_STATE_AVAILABLE, - tags: nil + tags: [] ) end @@ -460,16 +446,17 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) ]) expect(num_inserted).to eq(2) - job1 = mock_driver.inserted_jobs[0] - expect(job1).to have_attributes( + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( max_attempts: 17, priority: 3, queue: "my_queue_1", tags: ["custom_1"] ) - job2 = mock_driver.inserted_jobs[1] - expect(job2).to have_attributes( + expect(jobs[1]).to have_attributes( max_attempts: 18, priority: 4, queue: "my_queue_2", @@ -488,6 +475,12 @@ def job_args_to_row(job_args, insert_opts: River::InsertOpts.new) end end + describe River::Client.const_get(:DEFAULT_UNIQUE_STATES) do + it "should be sorted" do + expect(River::Client.const_get(:DEFAULT_UNIQUE_STATES)).to eq(River::Client.const_get(:DEFAULT_UNIQUE_STATES).sort) + end + end + describe "#truncate_time" do it "truncates times to nearest interval" do expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 1 * 60).utc).to eq(Time.parse("Thu Jan 15 21:26:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing diff --git a/spec/driver_shared_examples.rb b/spec/driver_shared_examples.rb new file mode 100644 index 0000000..d1d659a --- /dev/null +++ b/spec/driver_shared_examples.rb @@ -0,0 +1,454 @@ +class SimpleArgs + attr_accessor :job_num + + def initialize(job_num:) + self.job_num = job_num + end + + def kind = "simple" + + def to_json = JSON.dump({job_num: job_num}) +end + +# Lets us test job-specific insertion opts by making `#insert_opts` an accessor. +# Real args that make use of this functionality will probably want to make +# `#insert_opts` a non-accessor method instead. +class SimpleArgsWithInsertOpts < SimpleArgs + attr_accessor :insert_opts +end + +shared_examples "driver shared examples" do + describe "unique insertion" do + it "inserts a unique job once on the fast path" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job on the slow path" do + client = River::Client.new(driver) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job on the slow path with an advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + end + + describe "#advisory_lock" do + it "takes an advisory lock" do + driver.transaction do + driver.advisory_lock(123) + + Thread.new do + expect(driver.advisory_lock_try(123)).to be false + end.join + end + end + end + + describe "#advisory_lock_try" do + it "takes an advisory lock" do + driver.transaction do + expect(driver.advisory_lock_try(123)).to be true + end + end + end + + describe "#job_get_by_id" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by ID" do + insert_res = client.insert(job_args) + expect(driver.job_get_by_id(insert_res.job.id)).to_not be nil + end + + it "returns nil on not found" do + expect(driver.job_get_by_id(-1)).to be nil + end + end + + describe "#job_get_by_kind_and_unique_properties" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by kind" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: "does_not_exist" + )) + ).to be_nil + end + + it "gets a job by created at period" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] + )) + ).to be_nil + end + + it "gets a job by encoded args" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump(insert_res.job.args) + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump({"job_num" => 2}) + )) + ).to be_nil + end + + it "gets a job by queue" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: insert_res.job.queue + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: "other_queue" + )) + ).to be_nil + end + + it "gets a job by state" do + insert_res = client.insert(job_args) + + job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] + )) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + )) + ).to be_nil + end + end + + describe "#job_insert" do + it "inserts a job" do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + attempt: 0, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + # Make sure it made it to the database. Assert only minimally since we're + # certain it's the same as what we checked above. + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to have_attributes( + kind: "simple" + ) + end + + it "schedules a job" do + target_time = Time.now.getutc + 1 * 3600 + + insert_res = client.insert( + SimpleArgs.new(job_num: 1), + insert_opts: River::InsertOpts.new(scheduled_at: target_time) + ) + expect(insert_res.job).to have_attributes( + scheduled_at: be_within(2).of(target_time), + state: River::JOB_STATE_SCHEDULED + ) + end + + it "inserts with job insert opts" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args) + expect(insert_res.job).to have_attributes( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + end + + it "inserts with insert opts" do + # We set job insert opts in this spec too so that we can verify that the + # options passed at insertion time take precedence. + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + max_attempts: 23, + priority: 2, + queue: "job_custom_queue", + tags: ["job_custom"] + ) + + insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + )) + expect(insert_res.job).to have_attributes( + max_attempts: 17, + priority: 3, + queue: "my_queue", + tags: ["custom"] + ) + end + + it "inserts with job args hash" do + insert_res = client.insert(River::JobArgsHash.new("hash_kind", { + job_num: 1 + })) + expect(insert_res.job).to have_attributes( + args: {"job_num" => 1}, + kind: "hash_kind" + ) + end + + it "inserts in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to_not be_nil + + raise driver.rollback_exception + end + + # Not present because the job was rolled back. + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to be_nil + end + end + + describe "#job_insert_many" do + it "inserts multiple jobs" do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0]).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + + expect(jobs[1]).to have_attributes( + attempt: 0, + args: {"job_num" => 2}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + end + + it "inserts multiple jobs in a transaction" do + jobs = nil + + driver.transaction do + num_inserted = client.insert_many([ + SimpleArgs.new(job_num: 1), + SimpleArgs.new(job_num: 2) + ]) + expect(num_inserted).to eq(2) + + jobs = driver.job_list + expect(jobs.count).to be 2 + + raise driver.rollback_exception + end + + # Not present because the jobs were rolled back. + expect(driver.job_get_by_id(jobs[0].id)).to be nil + expect(driver.job_get_by_id(jobs[1].id)).to be nil + end + end + + describe "#job_insert_unique" do + it "inserts a job" do + insert_params = River::Driver::JobInsertParams.new( + encoded_args: JSON.dump({"job_num" => 1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: Time.now.getutc, + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + + job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + expect(unique_skipped_as_duplicated).to be false + + # second insertion should be skipped + job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + expect(unique_skipped_as_duplicated).to be true + end + end + + describe "#job_list" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by ID" do + insert_res1 = client.insert(job_args) + insert_res2 = client.insert(job_args) + + jobs = driver.job_list + expect(jobs.count).to be 2 + + expect(jobs[0].id).to be insert_res1.job.id + expect(jobs[1].id).to be insert_res2.job.id + end + + it "returns nil on not found" do + expect(driver.job_get_by_id(-1)).to be nil + end + end + + describe "#transaction" do + it "runs block in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to_not be_nil + + raise driver.rollback_exception + end + + # Not present because the job was rolled back. + job = driver.job_get_by_id(insert_res.job.id) + expect(job).to be_nil + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index a906f38..feb82b6 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,6 +6,10 @@ SimpleCov.start do enable_coverage :branch minimum_coverage line: 100, branch: 100 + + # Drivers have their own spec suite where they're covered 100.0%, but + # they're not fully covered from this top level test suite. + add_filter("driver/riverqueue-sequel/") end end