diff --git a/CHANGELOG.md b/CHANGELOG.md index e314332..c1e712f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added + +- Implement unique job insertion. [PR #10](https://github.com/riverqueue/riverqueue-ruby/pull/10). + ## [0.2.0] - 2024-04-27 ### Added diff --git a/Gemfile.lock b/Gemfile.lock index c76f9a9..4e71c8e 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: . specs: riverqueue (0.2.0) + fnv-hash GEM remote: https://rubygems.org/ @@ -31,6 +32,7 @@ GEM drb (2.2.1) ffi (1.16.3) fileutils (1.7.2) + fnv-hash (0.2.0) i18n (1.14.4) concurrent-ruby (~> 1.0) io-console (0.7.2) diff --git a/Steepfile b/Steepfile index 620dab0..65146c1 100644 --- a/Steepfile +++ b/Steepfile @@ -4,8 +4,10 @@ target :lib do check "lib" library "json" + library "time" signature "sig" + signature "sig_gem" configure_code_diagnostics(D::Ruby.strict) end diff --git a/docs/README.md b/docs/README.md index caa223a..fc1f327 100644 --- a/docs/README.md +++ b/docs/README.md @@ -60,6 +60,27 @@ insert_res = client.insert( ) ``` +## Inserting unique jobs + +[Unique jobs](https://riverqueue.com/docs/unique-jobs) are supported through `InsertOpts#unique_opts`, and can be made unique by args, period, queue, and state. If a job matching the unique properties is found on insert, the insert is skipped and the existing job is returned. + +```ruby +insert_res = client.insert(args, insert_opts: River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE] + ) +)) + +# contains either a newly inserted job, or an existing one if insertion was skipped +insert_res.job + +# true if insertion was skipped +insert_res.unique_skipped_as_duplicated +``` + ## Inserting jobs in bulk Use `#insert_many` to bulk insert jobs as a single operation for improved efficiency: @@ -75,8 +96,8 @@ Or with `InsertManyParams`, which may include insertion options: ```ruby num_inserted = client.insert_many([ - River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: InsertOpts.new(max_attempts: 5)), - River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: InsertOpts.new(queue: "high_priority")) + River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(max_attempts: 5)), + River::InsertManyParams.new(SimpleArgs.new(job_num: 2), insert_opts: River::InsertOpts.new(queue: "high_priority")) ]) ``` diff --git a/drivers/riverqueue-activerecord/Gemfile.lock b/drivers/riverqueue-activerecord/Gemfile.lock index a4f51e4..20d4642 100644 --- a/drivers/riverqueue-activerecord/Gemfile.lock +++ b/drivers/riverqueue-activerecord/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: ../.. specs: riverqueue (0.2.0) + fnv-hash PATH remote: .
@@ -41,6 +42,7 @@ GEM diff-lcs (1.5.1) docile (1.4.0) drb (2.2.1) + fnv-hash (0.2.0) i18n (1.14.4) concurrent-ruby (~> 1.0) io-console (0.7.2) diff --git a/drivers/riverqueue-activerecord/lib/driver.rb b/drivers/riverqueue-activerecord/lib/driver.rb index 6eacf92..8c329f5 100644 --- a/drivers/riverqueue-activerecord/lib/driver.rb +++ b/drivers/riverqueue-activerecord/lib/driver.rb @@ -31,15 +31,32 @@ def errors = {} end end - def insert(insert_params) + def advisory_lock(key) + ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})") + end + + def job_get_by_kind_and_unique_properties(get_params) + data_set = RiverJob.where(kind: get_params.kind) + data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at + data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args + data_set = data_set.where(queue: get_params.queue) if get_params.queue + data_set = data_set.where(state: get_params.state) if get_params.state + data_set.take + end + + def job_insert(insert_params) to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) end - def insert_many(insert_params_many) + def job_insert_many(insert_params_many) RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end + def transaction(&) + ::ActiveRecord::Base.transaction(requires_new: true, &) + end + private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead diff --git a/drivers/riverqueue-activerecord/spec/driver_spec.rb b/drivers/riverqueue-activerecord/spec/driver_spec.rb index a501788..95fc03e 100644 --- a/drivers/riverqueue-activerecord/spec/driver_spec.rb +++ b/drivers/riverqueue-activerecord/spec/driver_spec.rb @@ -25,7 +25,155 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::ActiveRecord.new } let(:client) { River::Client.new(driver) } - describe "#insert" do + describe "unique insertion" do + it "inserts a unique job once" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job with an advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + end + + describe "#advisory_lock" do + it "takes an advisory lock" do + driver.advisory_lock(123) + end + end + + describe "#job_get_by_kind_and_unique_properties" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by kind" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + 
driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: "does_not_exist" + )) + ).to be_nil + end + + it "gets a job by created at period" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] + )) + ).to be_nil + end + + it "gets a job by encoded args" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump(insert_res.job.args) + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump({"job_num" => 2}) + )) + ).to be_nil + end + + it "gets a job by queue" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: insert_res.job.queue + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: "other_queue" + )) + ).to be_nil + end + + it "gets a job by state" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + )) + ).to be_nil + end + end + + describe "#job_insert" do it "inserts a job" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) expect(insert_res.job).to have_attributes( @@ -133,7 +281,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#insert_many" do + describe "#job_insert_many" do it "inserts multiple jobs" do num_inserted = client.insert_many([ SimpleArgs.new(job_num: 1), @@ -197,6 +345,25 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end + describe "#transaction" do + it "runs block in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise ActiveRecord::Rollback + end + + # Not present because the job was rolled back. 
+ river_job = River::Driver::ActiveRecord::RiverJob.find_by(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + describe "#to_job_row" do it "converts a database record to `River::JobRow`" do now = Time.now.utc diff --git a/drivers/riverqueue-sequel/Gemfile.lock b/drivers/riverqueue-sequel/Gemfile.lock index d4aa8f5..fbb7cf5 100644 --- a/drivers/riverqueue-sequel/Gemfile.lock +++ b/drivers/riverqueue-sequel/Gemfile.lock @@ -2,6 +2,7 @@ PATH remote: ../.. specs: riverqueue (0.2.0) + fnv-hash PATH remote: . @@ -17,6 +18,7 @@ GEM bigdecimal (3.1.7) diff-lcs (1.5.1) docile (1.4.0) + fnv-hash (0.2.0) json (2.7.2) language_server-protocol (3.17.0.3) lint_roller (1.1.0) diff --git a/drivers/riverqueue-sequel/lib/driver.rb b/drivers/riverqueue-sequel/lib/driver.rb index 6973f94..5f615d1 100644 --- a/drivers/riverqueue-sequel/lib/driver.rb +++ b/drivers/riverqueue-sequel/lib/driver.rb @@ -21,15 +21,32 @@ def initialize(db) end end - def insert(insert_params) + def advisory_lock(key) + @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first + end + + def job_get_by_kind_and_unique_properties(get_params) + data_set = RiverJob.where(kind: get_params.kind) + data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at + data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args + data_set = data_set.where(queue: get_params.queue) if get_params.queue + data_set = data_set.where(state: get_params.state) if get_params.state + data_set.first + end + + def job_insert(insert_params) to_job_row(RiverJob.create(insert_params_to_hash(insert_params))) end - def insert_many(insert_params_many) + def job_insert_many(insert_params_many) RiverJob.multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) insert_params_many.count end + def transaction(&) + @db.transaction(savepoint: true, &) + end + private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead diff --git a/drivers/riverqueue-sequel/spec/driver_spec.rb b/drivers/riverqueue-sequel/spec/driver_spec.rb index 44b3b96..ef2abc1 100644 --- a/drivers/riverqueue-sequel/spec/driver_spec.rb +++ b/drivers/riverqueue-sequel/spec/driver_spec.rb @@ -25,7 +25,155 @@ class SimpleArgsWithInsertOpts < SimpleArgs let!(:driver) { River::Driver::Sequel.new(DB) } let(:client) { River::Client.new(driver) } - describe "#insert" do + describe "unique insertion" do + it "inserts a unique job once" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + + it "inserts a unique job with an advisory lock prefix" do + client = River::Client.new(driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + original_job = insert_res.job + + insert_res = 
client.insert(args) + expect(insert_res.job.id).to eq(original_job.id) + expect(insert_res.unique_skipped_as_duplicated).to be true + end + end + + describe "#advisory_lock" do + it "takes an advisory lock" do + driver.advisory_lock(123) + end + end + + describe "#job_get_by_kind_and_unique_properties" do + let(:job_args) { SimpleArgs.new(job_num: 1) } + + it "gets a job by kind" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: "does_not_exist" + )) + ).to be_nil + end + + it "gets a job by created at period" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] + )) + ).to be_nil + end + + it "gets a job by encoded args" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump(insert_res.job.args) + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + encoded_args: JSON.dump({"job_num" => 2}) + )) + ).to be_nil + end + + it "gets a job by queue" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: insert_res.job.queue + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + queue: "other_queue" + )) + ).to be_nil + end + + it "gets a job by state" do + insert_res = client.insert(job_args) + + job = driver.send( + :to_job_row, + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] + )) + ) + expect(job.id).to eq(insert_res.job.id) + + expect( + driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( + kind: job_args.kind, + state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + )) + ).to be_nil + end + end + + describe "#job_insert" do it "inserts a job" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) expect(insert_res.job).to have_attributes( @@ -133,7 +281,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#insert_many" do + describe "#job_insert_many" do it "inserts multiple jobs" do num_inserted = client.insert_many([ SimpleArgs.new(job_num: 1), @@ -195,6 +343,25 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end + describe "#transaction" do + it "runs 
block in a transaction" do + insert_res = nil + + driver.transaction do + insert_res = client.insert(SimpleArgs.new(job_num: 1)) + + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to_not be_nil + + raise Sequel::Rollback + end + + # Not present because the job was rolled back. + river_job = River::Driver::Sequel::RiverJob.first(id: insert_res.job.id) + expect(river_job).to be_nil + end + end + describe "#to_job_row" do it "converts a database record to `River::JobRow`" do now = Time.now.utc diff --git a/lib/client.rb b/lib/client.rb index c4192a3..b6b8218 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -1,3 +1,6 @@ +require "fnv" +require "time" + module River MAX_ATTEMPTS_DEFAULT = 25 PRIORITY_DEFAULT = 1 @@ -16,10 +19,21 @@ module River # River drivers are found in separate gems like `riverqueue-sequel` to help # minimize transient dependencies. class Client - def initialize(driver) + def initialize(driver, advisory_lock_prefix: nil) @driver = driver + @advisory_lock_prefix = advisory_lock_prefix + @time_now_utc = -> { Time.now.utc } # for test time stubbing end + DEFAULT_UNIQUE_STATES = [ + JOB_STATE_AVAILABLE, + JOB_STATE_COMPLETED, + JOB_STATE_RUNNING, + JOB_STATE_RETRYABLE, + JOB_STATE_SCHEDULED + ].freeze + private_constant :DEFAULT_UNIQUE_STATES + EMPTY_INSERT_OPTS = InsertOpts.new.freeze private_constant :EMPTY_INSERT_OPTS @@ -65,8 +79,11 @@ def initialize(driver) # # Returns an instance of InsertResult. def insert(args, insert_opts: EMPTY_INSERT_OPTS) - job = @driver.insert(make_insert_params(args, insert_opts)) - InsertResult.new(job) + insert_params, unique_opts = make_insert_params(args, insert_opts) + check_unique_job(insert_params, unique_opts) do + job = @driver.job_insert(insert_params) + InsertResult.new(job) + end end # Inserts many new jobs as part of a single batch operation for improved @@ -111,20 +128,90 @@ def insert(args, insert_opts: EMPTY_INSERT_OPTS) # # See also JobArgsHash for an easy way to insert a job from a hash. # + # Unique job insertion isn't supported with bulk insertion because it'd run + # the risk of major lock contention. + # # Returns the number of jobs inserted. def insert_many(args) all_params = args.map do |arg| if arg.is_a?(InsertManyParams) - make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS) + make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert else # jobArgs - make_insert_params(arg, EMPTY_INSERT_OPTS) + make_insert_params(arg, EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert end end - @driver.insert_many(all_params) + @driver.job_insert_many(all_params) end - private def make_insert_params(args, insert_opts) + private def check_unique_job(insert_params, unique_opts, &block) + return block.call if unique_opts.nil? + + any_changes = false + get_params = Driver::JobGetByKindAndUniquePropertiesParam.new(kind: insert_params.kind) + + # It's extremely important here that this lock string format and algorithm + # match the one in the main River library _exactly_. Don't change them + # unless they're updated everywhere. 
+ lock_str = "unique_key" + lock_str += "kind=#{insert_params.kind}" + + if unique_opts.by_args + any_changes = true + get_params.encoded_args = insert_params.encoded_args + lock_str += "&args=#{insert_params.encoded_args}" + end + + if unique_opts.by_period + lower_period_bound = truncate_time(@time_now_utc.call, unique_opts.by_period).utc + + any_changes = true + get_params.created_at = [lower_period_bound, lower_period_bound + unique_opts.by_period] + lock_str += "&period=#{lower_period_bound.strftime("%FT%TZ")}" + end + + if unique_opts.by_queue + any_changes = true + get_params.queue = insert_params.queue + lock_str += "&queue=#{insert_params.queue}" + end + + if unique_opts.by_state + any_changes = true + get_params.state = unique_opts.by_state + lock_str += "&state=#{unique_opts.by_state.join(",")}" + else + get_params.state = DEFAULT_UNIQUE_STATES + lock_str += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" + end + + return block.call unless any_changes + + @driver.transaction do + lock_key = if @advisory_lock_prefix.nil? + Fnv::Hash.fnv_1(lock_str, size: 64) + else + # Steep should be able to tell that this is not nil, but it can't. + prefix = @advisory_lock_prefix #: Integer # rubocop:disable Layout/LeadingCommentSpace + prefix << 32 | Fnv::Hash.fnv_1(lock_str, size: 32) + end + + # Packs a uint64 then unpacks to int64, which we need to do to keep the + # value within the bounds of Postgres' bigint. Overflow is okay because + # we can use the full bigint space (including negative numbers) for the + # advisory lock. + lock_key = uint64_to_int64(lock_key) + + @driver.advisory_lock(lock_key) + + existing_job = @driver.job_get_by_kind_and_unique_properties(get_params) + return InsertResult.new(existing_job, unique_skipped_as_duplicated: true) if existing_job + + block.call + end + end + + private def make_insert_params(args, insert_opts, is_insert_many: false) raise "args should respond to `#kind`" if !args.respond_to?(:kind) # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. @@ -139,17 +226,37 @@ def insert_many(args) end scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts + + raise ArgumentError, "unique opts can't be used with `#insert_many`" if is_insert_many && unique_opts + + [ + Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at&.utc, # database defaults to now + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: insert_opts.tags || args_insert_opts.tags + ), + unique_opts + ] + end - Driver::JobInsertParams.new( - encoded_args: args_json, - kind: args.kind, - max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, - priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, - queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, - scheduled_at: scheduled_at&.utc, # database defaults to now - state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, - tags: insert_opts.tags || args_insert_opts.tags - ) + # Truncates the given time down to the interval. 
For example: + # + # Thu Jan 15 21:26:36 UTC 2024 @ 15 minutes -> + # Thu Jan 15 21:15:00 UTC 2024 + private def truncate_time(time, interval_seconds) + Time.at((time.to_f / interval_seconds).floor * interval_seconds) + end + + # Moves an integer that may occupy the entire uint64 space to one that's + # bounded within int64. Allows overflow. + private def uint64_to_int64(int) + [int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace + end end @@ -170,11 +277,17 @@ def initialize(args, insert_opts: nil) # Result of a single insertion. class InsertResult - # Inserted job row. + # Inserted job row, or an existing job row if the insert was skipped due to a + # previously existing unique job. attr_reader :job - def initialize(job) + # True if, for a unique job, the insertion was skipped due to an equivalent + # job matching its unique properties already being present. + attr_reader :unique_skipped_as_duplicated + + def initialize(job, unique_skipped_as_duplicated: false) @job = job + @unique_skipped_as_duplicated = unique_skipped_as_duplicated end end end diff --git a/lib/driver.rb b/lib/driver.rb index 97045fd..a3c11c6 100644 --- a/lib/driver.rb +++ b/lib/driver.rb @@ -4,6 +4,29 @@ module River # considered to be for internal use only and subject to change. API stability # is not guaranteed. module Driver + # Parameters for looking up a job by kind and unique properties. + class JobGetByKindAndUniquePropertiesParam + attr_accessor :created_at + attr_accessor :encoded_args + attr_accessor :kind + attr_accessor :queue + attr_accessor :state + + def initialize( + kind:, + created_at: nil, + encoded_args: nil, + queue: nil, + state: nil + ) + self.kind = kind + self.created_at = created_at + self.encoded_args = encoded_args + self.queue = queue + self.state = state + end + end + # Insert parameters for a job. This is sent to underlying drivers and is meant # for internal use only. Its interface is subject to change. class JobInsertParams @@ -16,13 +39,6 @@ class JobInsertParams attr_accessor :state attr_accessor :tags - # TODO(brandur): Get these supported. - # attr_accessor :unique - # attr_accessor :unique_by_args - # attr_accessor :unique_by_period - # attr_accessor :unique_by_queue - # attr_accessor :unique_by_state - def initialize( encoded_args:, kind:, diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb index 98035c0..5c07fa8 100644 --- a/lib/insert_opts.rb +++ b/lib/insert_opts.rb @@ -5,42 +5,40 @@ class InsertOpts # discarded. attr_accessor :max_attempts - # Priority is the priority of the job, with 1 being the highest priority and - # 4 being the lowest. When fetching available jobs to work, the highest - # priority jobs will always be fetched before any lower priority jobs are - # fetched. Note that if your workers are swamped with more high-priority jobs - # then they can handle, lower priority jobs may not be fetched. + # The priority of the job, with 1 being the highest priority and 4 being the + # lowest. When fetching available jobs to work, the highest priority jobs + # will always be fetched before any lower priority jobs are fetched. Note + # that if your workers are swamped with more high-priority jobs than they + # can handle, lower priority jobs may not be fetched. # # Defaults to PRIORITY_DEFAULT. attr_accessor :priority - # Queue is the name of the job queue in which to insert the job. + # The name of the job queue in which to insert the job. # # Defaults to QUEUE_DEFAULT.
attr_accessor :queue - # ScheduledAt is a time in future at which to schedule the job (i.e. in - # cases where it shouldn't be run immediately). The job is guaranteed not - # to run before this time, but may run slightly after depending on the - # number of other scheduled jobs and how busy the queue is. + # A time in the future at which to schedule the job (i.e. in cases where it + # shouldn't be run immediately). The job is guaranteed not to run before + # this time, but may run slightly after depending on the number of other + # scheduled jobs and how busy the queue is. # # Use of this option generally only makes sense when passing options into # Insert rather than when a job args is returning `#insert_opts`, however, # it will work in both cases. attr_accessor :scheduled_at - # Tags are an arbitrary list of keywords to add to the job. They have no - # functional behavior and are meant entirely as a user-specified construct - # to help group and categorize jobs. + # An arbitrary list of keywords to add to the job. They have no functional + # behavior and are meant entirely as a user-specified construct to help + # group and categorize jobs. # # If tags are specified from both a job args override and from options on # Insert, the latter takes precedence. Tags are not merged. attr_accessor :tags - # UniqueOpts returns options relating to job uniqueness. An empty struct - # avoids setting any worker-level unique options. - # - # TODO: Implement. + # Options relating to job uniqueness. No unique options means that the job + # is never treated as unique. attr_accessor :unique_opts def initialize( @@ -60,6 +58,77 @@ def initialize( end end + # Parameters for uniqueness for a job. + # + # If all properties are nil, no uniqueness at all is enforced. As each property is + # initialized, it's added as a dimension on the uniqueness matrix, and with + # any property on, the job's kind always counts toward uniqueness. + # + # So for example, if only #by_queue is on, then for the given job kind, only a + # single instance is allowed in any given queue, regardless of other + # properties on the job. If both #by_args and #by_queue are on, then for the + # given job kind, a single instance is allowed for each combination of args + # and queues. If either args or queue is changed on a new job, it's allowed to + # be inserted as a new job. + # + # Uniqueness is checked at insert time by taking a Postgres advisory lock, + # doing a lookup for an equivalent row, and inserting only if none was found. + # There's no database-level mechanism that guarantees jobs stay unique, so if + # an equivalent row is inserted out of band (or batch inserted, where a unique + # check doesn't occur), it's conceivable that duplicates could coexist. class UniqueOpts + # Indicates that uniqueness should be enforced for any specific instance of + # encoded args for a job. + # + # Default is false, meaning that as long as any other unique property is + # enabled, uniqueness will be enforced for a kind regardless of input args. + attr_accessor :by_args + + # Defines uniqueness within a given period. On an insert, time is rounded + # down to the nearest multiple of the given period, and a job is only + # inserted if there isn't an existing job that will run between then and the + # next multiple of the period. + # + # The period should be specified in seconds. So a job that's unique every 15 + # minute period would have a value of 900.
+ # + # Default is no unique period, meaning that as long as any other unique + # property is enabled, uniqueness will be enforced across all jobs of the + # kind in the database, regardless of when they were scheduled. + attr_accessor :by_period + + # Indicates that uniqueness should be enforced within each queue. + # + # Default is false, meaning that as long as any other unique property is + # enabled, uniqueness will be enforced for a kind across all queues. + attr_accessor :by_queue + + # Indicates that uniqueness should be enforced across any of the states in + # the given set. For example, if the given states were `(scheduled, + # running)` then a new job could be inserted even if one of the same kind + # was already being worked by the queue (new jobs are inserted as + # `available`). + # + # Unlike other unique options, by_state gets a default when it's not set for + # user convenience. The default is equivalent to: + # + # by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_RUNNING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_SCHEDULED] + # + # With this setting, any jobs of the same kind that have been cancelled or + # discarded, but not yet cleaned out by the system, won't count towards the + # uniqueness of a new insert. + attr_accessor :by_state + + def initialize( + by_args: nil, + by_period: nil, + by_queue: nil, + by_state: nil + ) + self.by_args = by_args + self.by_period = by_period + self.by_queue = by_queue + self.by_state = by_state + end end end diff --git a/riverqueue.gemspec b/riverqueue.gemspec index 2cd7ed8..08ed85d 100644 --- a/riverqueue.gemspec +++ b/riverqueue.gemspec @@ -8,4 +8,6 @@ Gem::Specification.new do |s| s.files = ["lib/riverqueue.rb"] s.homepage = "https://riverqueue.com" s.license = "LGPL-3.0-or-later" + + s.add_dependency "fnv-hash" end diff --git a/sig/client.rbs b/sig/client.rbs index 76cf8f8..40f55f0 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -4,15 +4,21 @@ module River QUEUE_DEFAULT: String class Client + @advisory_lock_prefix: Integer? @driver: _Driver + @time_now_utc: ^() -> Time + DEFAULT_UNIQUE_STATES: Array[jobStateAll] EMPTY_INSERT_OPTS: InsertOpts - def initialize: (_Driver driver) -> void + def initialize: (_Driver driver, ?advisory_lock_prefix: Integer?) -> void def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer - private def make_insert_params: (jobArgs, InsertOpts) -> Driver::JobInsertParams + private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult + private def uint64_to_int64: (Integer) -> Integer + private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?]
+ private def truncate_time: (Time, Integer) -> Time end class InsertManyParams @@ -28,9 +34,11 @@ module River class InsertResult @job: JobRow + @unique_skipped_as_duplicated: bool attr_reader job: JobRow + attr_reader unique_skipped_as_duplicated: bool - def initialize: (JobRow job) -> void + def initialize: (JobRow job, ?unique_skipped_as_duplicated: bool) -> void end end diff --git a/sig/driver.rbs b/sig/driver.rbs index 21b5516..ddfac56 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -1,10 +1,23 @@ module River interface _Driver - def insert: (Driver::JobInsertParams) -> JobRow - def insert_many: (Array[Driver::JobInsertParams]) -> Integer + def advisory_lock: (Integer) -> void + def job_get_by_kind_and_unique_properties: (Driver::JobGetByKindAndUniquePropertiesParam) -> JobRow? + def job_insert: (Driver::JobInsertParams) -> JobRow + def job_insert_many: (Array[Driver::JobInsertParams]) -> Integer + def transaction: [T] () { () -> T } -> T end module Driver + class JobGetByKindAndUniquePropertiesParam + attr_accessor created_at: [Time, Time]? + attr_accessor encoded_args: String? + attr_accessor kind: String + attr_accessor queue: String? + attr_accessor state: Array[jobStateAll]? + + def initialize: (kind: String, ?created_at: [Time, Time]?, ?encoded_args: String?, ?queue: String?, ?state: Array[jobStateAll]?) -> void + end + class JobInsertParams attr_accessor encoded_args: String attr_accessor kind: String @@ -15,13 +28,6 @@ module River attr_accessor state: jobStateAll attr_accessor tags: Array[String]? - # TODO(brandur): Get these supported. - # attr_accessor :unique - # attr_accessor :unique_by_args - # attr_accessor :unique_by_period - # attr_accessor :unique_by_queue - # attr_accessor :unique_by_state - def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?) -> void end end diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs index 698f692..69ad0ca 100644 --- a/sig/insert_opts.rbs +++ b/sig/insert_opts.rbs @@ -11,6 +11,11 @@ module River end class UniqueOpts - # TODO + attr_accessor by_args: bool? + attr_accessor by_period: Integer? + attr_accessor by_queue: bool? + attr_accessor by_state: Array[jobStateAll]? + + def initialize: (?by_args: bool?, ?by_period: Integer?, ?by_queue: bool?, ?by_state: Array[jobStateAll]?) -> void end end diff --git a/sig_gem/fnv-hash/fnv.rbs b/sig_gem/fnv-hash/fnv.rbs new file mode 100644 index 0000000..2963be6 --- /dev/null +++ b/sig_gem/fnv-hash/fnv.rbs @@ -0,0 +1,21 @@ +module Fnv + class Hash + # Calculates the FNV-1 hash for the given + # item value + # + # @param item The item to hash + # @param size [Integer] the size of the resulting hash + # + # @return [Integer] the calculated hash value + def self.fnv_1: (Object, size: Integer) -> Integer + + # Calculates the FNV-1a hash for the given + # item value + # + # @param item The item to hash + # @param size [Integer] the size of the resulting hash + # + # @return [Integer] the calculated hash value + def self.fnv_1a: (Object, size: Integer) -> Integer + end +end \ No newline at end of file diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 67915cb..abba887 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -3,25 +3,44 @@ # We use a mock here, but each driver has a more comprehensive test suite that # performs full integration level tests. 
class MockDriver + attr_accessor :advisory_lock_calls attr_accessor :inserted_jobs + attr_accessor :job_get_by_kind_and_unique_properties_calls + attr_accessor :job_get_by_kind_and_unique_properties_returns def initialize + @advisory_lock_calls = [] @inserted_jobs = [] + @job_get_by_kind_and_unique_properties_calls = [] + @job_get_by_kind_and_unique_properties_returns = [] @next_id = 0 end - def insert(insert_params) + def advisory_lock(key) + @advisory_lock_calls << key + end + + def job_get_by_kind_and_unique_properties(get_params) + @job_get_by_kind_and_unique_properties_calls << get_params + job_get_by_kind_and_unique_properties_returns.shift + end + + def job_insert(insert_params) insert_params_to_jow_row(insert_params) end - def insert_many(insert_params_many) + def job_insert_many(insert_params_many) insert_params_many.each do |insert_params| insert_params_to_jow_row(insert_params) end insert_params_many.count end - private def insert_params_to_jow_row(insert_params) + def transaction(&) + yield + end + + def insert_params_to_jow_row(insert_params) job = River::JobRow.new( id: (@next_id += 1), args: JSON.parse(insert_params.encoded_args), @@ -174,6 +193,123 @@ def to_json = nil client.insert(args_klass.new) end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") end + + def check_bigint_bounds(int) + raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807 + raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808 + int + end + + # These unique insertion specs are pretty mock heavy, but each of the + # individual drivers has their own unique insert tests that make sure to do + # a full round trip. 
+ describe "unique opts" do + let(:now) { Time.now.utc } + before { client.instance_variable_set(:@time_now_utc, -> { now }) } + + it "inserts a new unique job with minimal options" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))]) + end + + it "inserts a new unique job with all options" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE] + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{args.kind}" \ + "&args=#{JSON.dump({job_num: 1})}" \ + "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))]) + end + + it "inserts a new unique job with advisory lock prefix" do + client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) + + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + + lock_str = "unique_keykind=#{args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | Fnv::Hash.fnv_1(lock_str, size: 32)))]) + + lock_key = mock_driver.advisory_lock_calls[0] + expect(lock_key >> 32).to eq(123456) + end + + it "gets an existing unique job" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_args: true, + by_period: 15 * 60, + by_queue: true, + by_state: [River::JOB_STATE_AVAILABLE] + ) + ) + + job = mock_driver.insert_params_to_jow_row(client.send(:make_insert_params, args, River::InsertOpts.new)[0]) + mock_driver.job_get_by_kind_and_unique_properties_returns << job + + insert_res = client.insert(args) + expect(insert_res).to have_attributes( + job: job, + unique_skipped_as_duplicated: true + ) + + lock_str = "unique_keykind=#{args.kind}" \ + "&args=#{JSON.dump({job_num: 1})}" \ + "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ + "&queue=#{River::QUEUE_DEFAULT}" \ + "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" + expect(mock_driver.advisory_lock_calls).to eq([check_bigint_bounds(client.send(:uint64_to_int64, Fnv::Hash.fnv_1(lock_str, size: 64)))]) + end + + it "skips unique check if unique opts empty" do + args = SimpleArgsWithInsertOpts.new(job_num: 1) + 
args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new + ) + + insert_res = client.insert(args) + expect(insert_res.job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false + end + end end describe "#insert_many" do @@ -303,6 +439,34 @@ def to_json = nil tags: ["custom_2"] ) end + + it "raises error with unique opts" do + expect do + client.insert_many([ + River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( + unique_opts: River::UniqueOpts.new + )) + ]) + end.to raise_error(ArgumentError, "unique opts can't be used with `#insert_many`") + end + end + + describe "#truncate_time" do + it "truncates times to nearest interval" do + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 1 * 60).utc).to eq(Time.parse("Thu Jan 15 21:26:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 5 * 60).utc).to eq(Time.parse("Thu Jan 15 21:25:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 15 * 60).utc).to eq(Time.parse("Thu Jan 15 21:15:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 1 * 60 * 60).utc).to eq(Time.parse("Thu Jan 15 21:00:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 5 * 60 * 60).utc).to eq(Time.parse("Thu Jan 15 17:00:00 UTC 2024")) # rubocop:disable Layout/ExtraSpacing + expect(client.send(:truncate_time, Time.parse("Thu Jan 15 21:26:36 UTC 2024").utc, 24 * 60 * 60).utc).to eq(Time.parse("Thu Jan 15 00:00:00 UTC 2024")) + end + end + + describe "#uint64_to_int64" do + it "converts between integer types" do + expect(client.send(:uint64_to_int64, 123456)).to eq(123456) + expect(client.send(:uint64_to_int64, 13977996710702069744)).to eq(-4468747363007481872) + end end end
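As a companion to the specs above, here is a minimal standalone sketch of the advisory lock key derivation that `Client#check_unique_job` performs before looking up an existing job: build the lock string, hash it with FNV-1 via the `fnv-hash` gem, and reinterpret the unsigned result as a signed 64-bit integer so it fits Postgres' bigint. The job kind, queue name, and timestamp below are illustrative values only; the string format, the `Fnv::Hash.fnv_1` calls, and the pack/unpack conversion mirror `lib/client.rb` above, and the state list spells out the string values behind `DEFAULT_UNIQUE_STATES`.

```ruby
require "fnv" # fnv-hash gem, added as a dependency by this change

# Illustrative inputs: a hypothetical "simple" job kind on the default queue,
# unique by queue and by a 15 minute period.
kind = "simple"
queue = "default"
period = 15 * 60
now = Time.utc(2024, 1, 15, 21, 26, 36)

# Round the current time down to the nearest multiple of the period, the same
# way Client#truncate_time does.
lower_bound = Time.at((now.to_f / period).floor * period).utc

# String values of the client's DEFAULT_UNIQUE_STATES.
default_states = %w[available completed running retryable scheduled]

# The lock string format must match the main River library exactly.
lock_str = "unique_key" \
  "kind=#{kind}" \
  "&period=#{lower_bound.strftime("%FT%TZ")}" \
  "&queue=#{queue}" \
  "&state=#{default_states.join(",")}"

# Without an advisory lock prefix, the whole string is hashed with 64-bit FNV-1.
lock_key = Fnv::Hash.fnv_1(lock_str, size: 64)

# With a prefix (e.g. 123456), the prefix occupies the upper 32 bits and a
# 32-bit FNV-1 hash the lower 32 bits:
#
#   lock_key = 123456 << 32 | Fnv::Hash.fnv_1(lock_str, size: 32)

# Reinterpret the unsigned 64-bit value as a signed 64-bit integer so it stays
# within the bounds of Postgres' bigint (overflowing into negatives is fine).
lock_key = [lock_key].pack("Q").unpack1("q")

# This is the key handed to `driver.advisory_lock(lock_key)` inside the driver
# transaction before the lookup for an equivalent existing job.
puts lock_key
```

With the prefix variant, the application-chosen prefix stays in the upper 32 bits of the key, which is what the spec asserts with `lock_key >> 32`.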