diff --git a/.gitignore b/.gitignore index e8fe1c4..2250ed9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +.DS_Store *.gem coverage/ diff --git a/Gemfile.lock b/Gemfile.lock index 3ac4e45..9e9a2b5 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -143,6 +143,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/docs/README.md b/docs/README.md index c9ca412..dbc1e61 100644 --- a/docs/README.md +++ b/docs/README.md @@ -42,8 +42,8 @@ insert_res.job # inserted job row Job args should: -* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. -* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go. +- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. +- Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go. They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind. @@ -89,7 +89,7 @@ insert_res.unique_skipped_as_duplicated Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: ```ruby -client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) +client = River::Client.new(mock_driver) ``` Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other. diff --git a/driver/riverqueue-activerecord/Gemfile.lock b/driver/riverqueue-activerecord/Gemfile.lock index 6b24648..9256781 100644 --- a/driver/riverqueue-activerecord/Gemfile.lock +++ b/driver/riverqueue-activerecord/Gemfile.lock @@ -120,6 +120,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/driver/riverqueue-activerecord/lib/driver.rb b/driver/riverqueue-activerecord/lib/driver.rb index 96d5925..1dbbec3 100644 --- a/driver/riverqueue-activerecord/lib/driver.rb +++ b/driver/riverqueue-activerecord/lib/driver.rb @@ -31,36 +31,19 @@ def errors = {} end end - def advisory_lock(key) - ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})") - nil - end - - def advisory_lock_try(key) - ::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"] - end - def job_get_by_id(id) data_set = RiverJob.where(id: id) data_set.first ? to_job_row_from_model(data_set.first) : nil end - def job_get_by_kind_and_unique_properties(get_params) - data_set = RiverJob.where(kind: get_params.kind) - data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at - data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args - data_set = data_set.where(queue: get_params.queue) if get_params.queue - data_set = data_set.where(state: get_params.state) if get_params.state - data_set.first ? to_job_row_from_model(data_set.first) : nil - end - def job_insert(insert_params) - to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params))) + res = job_insert_many([insert_params]).first + [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]] end - def job_insert_unique(insert_params, unique_key) - res = RiverJob.upsert( - insert_params_to_hash(insert_params).merge(unique_key: unique_key), + def job_insert_many(insert_params_many) + RiverJob.upsert_all( + insert_params_many.map { |param| insert_params_to_hash(param) }, on_duplicate: Arel.sql("kind = EXCLUDED.kind"), returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"), @@ -69,15 +52,9 @@ def job_insert_unique(insert_params, unique_key) # ActiveRecord tries to look up a unique index instead of letting # Postgres handle that, and of course it doesn't support a `WHERE` # clause. The workaround is to target the index name instead of columns. - unique_by: "river_job_kind_unique_key_idx" + unique_by: "river_job_unique_idx" ) - - [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]] - end - - def job_insert_many(insert_params_many) - RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) }) - insert_params_many.count + .map { |row| to_insert_result(row) } end def job_list @@ -104,10 +81,16 @@ def transaction(&) queue: insert_params.queue, state: insert_params.state, scheduled_at: insert_params.scheduled_at, - tags: insert_params.tags + tags: insert_params.tags, + unique_key: insert_params.unique_key, + unique_states: insert_params.unique_states }.compact end + private def to_insert_result(result) + [to_job_row_from_model(result), result.send(:hash_rows)[0]["unique_skipped_as_duplicate"]] + end + private def to_job_row_from_model(river_job) # needs to be accessed through values because `errors` is shadowed by both # ActiveRecord and the patch above @@ -139,7 +122,8 @@ def transaction(&) scheduled_at: river_job.scheduled_at.getutc, state: river_job.state, tags: river_job.tags, - unique_key: river_job.unique_key + unique_key: river_job.unique_key, + unique_states: river_job.unique_states ) end @@ -182,7 +166,8 @@ def transaction(&) scheduled_at: river_job["scheduled_at"].getutc, state: river_job["state"], tags: river_job["tags"], - unique_key: river_job["unique_key"] + unique_key: river_job["unique_key"], + unique_states: ::River::UniqueBitmask.to_states(river_job["unique_states"]&.to_i(2)) ) end end diff --git a/driver/riverqueue-sequel/Gemfile.lock b/driver/riverqueue-sequel/Gemfile.lock index d6d0aee..df64f8f 100644 --- a/driver/riverqueue-sequel/Gemfile.lock +++ b/driver/riverqueue-sequel/Gemfile.lock @@ -78,6 +78,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/driver/riverqueue-sequel/lib/driver.rb b/driver/riverqueue-sequel/lib/driver.rb index a21714e..097f141 100644 --- a/driver/riverqueue-sequel/lib/driver.rb +++ b/driver/riverqueue-sequel/lib/driver.rb @@ -13,51 +13,27 @@ def initialize(db) @db.extension(:pg_json) end - def advisory_lock(key) - @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first - nil - end - - def advisory_lock_try(key) - @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock] - end - def job_get_by_id(id) data_set = @db[:river_job].where(id: id) data_set.first ? to_job_row(data_set.first) : nil end - def job_get_by_kind_and_unique_properties(get_params) - data_set = @db[:river_job].where(kind: get_params.kind) - data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at - data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args - data_set = data_set.where(queue: get_params.queue) if get_params.queue - data_set = data_set.where(state: get_params.state) if get_params.state - data_set.first ? to_job_row(data_set.first) : nil - end - def job_insert(insert_params) - to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params))) + job_insert_many([insert_params]).first end - def job_insert_unique(insert_params, unique_key) - values = @db[:river_job] + def job_insert_many(insert_params_array) + @db[:river_job] .insert_conflict( - target: [:kind, :unique_key], - conflict_where: ::Sequel.lit("unique_key IS NOT NULL"), + target: [:unique_key], + conflict_where: ::Sequel.lit( + "unique_key IS NOT NULL AND unique_states IS NOT NULL AND river_job_state_in_bitmask(unique_states, state)" + ), update: {kind: ::Sequel[:excluded][:kind]} ) .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate")) - .insert_select( - insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key)) - ) - - [to_job_row(values), values[:unique_skipped_as_duplicate]] - end - - def job_insert_many(insert_params_many) - @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) - insert_params_many.count + .multi_insert(insert_params_array.map { |p| insert_params_to_hash(p) }) + .map { |row| to_insert_result(row) } end def job_list @@ -76,6 +52,7 @@ def transaction(&) private def insert_params_to_hash(insert_params) # the call to `#compact` is important so that we remove nils and table # default values get picked up instead + # TODO: but I had to remove it for bulk unique inserts... { args: insert_params.encoded_args, kind: insert_params.kind, @@ -84,8 +61,14 @@ def transaction(&) queue: insert_params.queue, state: insert_params.state, scheduled_at: insert_params.scheduled_at, - tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil - }.compact + tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags, :text) : nil, + unique_key: insert_params.unique_key ? ::Sequel.blob(insert_params.unique_key) : nil, + unique_states: insert_params.unique_states + } + end + + private def to_insert_result(result) + [to_job_row(result), result[:unique_skipped_as_duplicate]] end private def to_job_row(river_job) @@ -113,7 +96,8 @@ def transaction(&) scheduled_at: river_job[:scheduled_at].getutc, state: river_job[:state], tags: river_job[:tags].to_a, - unique_key: river_job[:unique_key]&.to_s + unique_key: river_job[:unique_key]&.to_s, + unique_states: ::River::UniqueBitmask.to_states(river_job[:unique_states]&.to_i(2)) ) end end diff --git a/lib/client.rb b/lib/client.rb index ea280fd..b3da831 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -1,5 +1,4 @@ require "digest" -require "fnv" require "time" module River @@ -25,9 +24,8 @@ module River # River drivers are found in separate gems like `riverqueue-sequel` to help # minimize transient dependencies. class Client - def initialize(driver, advisory_lock_prefix: nil) + def initialize(driver) @driver = driver - @advisory_lock_prefix = check_advisory_lock_prefix_bounds(advisory_lock_prefix) @time_now_utc = -> { Time.now.utc } # for test time stubbing end @@ -73,11 +71,8 @@ def initialize(driver, advisory_lock_prefix: nil) # # Returns an instance of InsertResult. def insert(args, insert_opts: EMPTY_INSERT_OPTS) - insert_params, unique_opts = make_insert_params(args, insert_opts) - check_unique_job(insert_params, unique_opts) do - job = @driver.job_insert(insert_params) - InsertResult.new(job) - end + insert_params = make_insert_params(args, insert_opts) + insert_and_check_unique_job(insert_params) end # Inserts many new jobs as part of a single batch operation for improved @@ -122,30 +117,20 @@ def insert(args, insert_opts: EMPTY_INSERT_OPTS) # # See also JobArgsHash for an easy way to insert a job from a hash. # - # Unique job insertion isn't supported with bulk insertion because it'd run - # the risk of major lock contention. - # # Returns the number of jobs inserted. def insert_many(args) all_params = args.map do |arg| if arg.is_a?(InsertManyParams) - make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert + make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS) else # jobArgs - make_insert_params(arg, EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert + make_insert_params(arg, EMPTY_INSERT_OPTS) end end @driver.job_insert_many(all_params) - end - - private def check_advisory_lock_prefix_bounds(advisory_lock_prefix) - return nil if advisory_lock_prefix.nil? - - # 2**32-1 is 0xffffffff (the largest number that's four bytes) - if advisory_lock_prefix < 0 || advisory_lock_prefix > 2**32 - 1 - raise ArgumentError, "advisory lock prefix must fit inside four bytes" + .map do |job, unique_skipped_as_duplicate| + InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) end - advisory_lock_prefix end # Default states that are used during a unique insert. Can be overridden by @@ -153,6 +138,7 @@ def insert_many(args) DEFAULT_UNIQUE_STATES = [ JOB_STATE_AVAILABLE, JOB_STATE_COMPLETED, + JOB_STATE_PENDING, JOB_STATE_RETRYABLE, JOB_STATE_RUNNING, JOB_STATE_SCHEDULED @@ -162,114 +148,75 @@ def insert_many(args) EMPTY_INSERT_OPTS = InsertOpts.new.freeze private_constant :EMPTY_INSERT_OPTS - private def check_unique_job(insert_params, unique_opts, &block) - return block.call if unique_opts.nil? + private def insert_and_check_unique_job(insert_params) + job, unique_skipped_as_duplicate = @driver.job_insert(insert_params) + InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) + end + + private def make_insert_params(args, insert_opts) + raise "args should respond to `#kind`" if !args.respond_to?(:kind) - any_unique_opts = false - get_params = Driver::JobGetByKindAndUniquePropertiesParam.new(kind: insert_params.kind) + # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. + args_json = args.to_json + raise "args should return non-nil from `#to_json`" if !args_json + + args_insert_opts = if args.respond_to?(:insert_opts) + args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace + args_with_insert_opts.insert_opts || EMPTY_INSERT_OPTS + else + EMPTY_INSERT_OPTS + end + + scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at + + insert_params = Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at&.utc || Time.now, + state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: validate_tags(insert_opts.tags || args_insert_opts.tags || []) + ) + + unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts + if unique_opts + unique_key, unique_states = make_unique_key_and_bitmask(insert_params, unique_opts) + insert_params.unique_key = unique_key + insert_params.unique_states = unique_states + end + insert_params + end + + private def make_unique_key_and_bitmask(insert_params, unique_opts) unique_key = "" # It's extremely important here that this lock string format and algorithm # match the one in the main River library _exactly_. Don't change them # unless they're updated everywhere. + unless unique_opts.exclude_kind + unique_key += "&kind=#{insert_params.kind}" + end + if unique_opts.by_args - any_unique_opts = true - get_params.encoded_args = insert_params.encoded_args unique_key += "&args=#{insert_params.encoded_args}" end if unique_opts.by_period lower_period_bound = truncate_time(@time_now_utc.call, unique_opts.by_period).utc - any_unique_opts = true - get_params.created_at = [lower_period_bound, lower_period_bound + unique_opts.by_period] unique_key += "&period=#{lower_period_bound.strftime("%FT%TZ")}" end if unique_opts.by_queue - any_unique_opts = true - get_params.queue = insert_params.queue unique_key += "&queue=#{insert_params.queue}" end - if unique_opts.by_state - any_unique_opts = true - get_params.state = unique_opts.by_state - unique_key += "&state=#{unique_opts.by_state.join(",")}" - else - get_params.state = DEFAULT_UNIQUE_STATES - unique_key += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" - end - - return block.call unless any_unique_opts - - # fast path - if !unique_opts.by_state || unique_opts.by_state.sort == DEFAULT_UNIQUE_STATES - unique_key_hash = Digest::SHA256.digest(unique_key) - job, unique_skipped_as_duplicate = @driver.job_insert_unique(insert_params, unique_key_hash) - return InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) - end - - @driver.transaction do - lock_str = "unique_key" - lock_str += "kind=#{insert_params.kind}" - lock_str += unique_key - - lock_key = if @advisory_lock_prefix.nil? - FNV.fnv1_hash(lock_str, size: 64) - else - # Steep should be able to tell that this is not nil, but it can't. - prefix = @advisory_lock_prefix #: Integer # rubocop:disable Layout/LeadingCommentSpace - prefix << 32 | FNV.fnv1_hash(lock_str, size: 32) - end - - # Packs a uint64 then unpacks to int64, which we need to do to keep the - # value within the bounds of Postgres' bigint. Overflow is okay because - # we can use the full bigint space (including negative numbers) for the - # advisory lock. - lock_key = uint64_to_int64(lock_key) - - @driver.advisory_lock(lock_key) - - existing_job = @driver.job_get_by_kind_and_unique_properties(get_params) - return InsertResult.new(existing_job, unique_skipped_as_duplicated: true) if existing_job - - block.call - end - end - - private def make_insert_params(args, insert_opts, is_insert_many: false) - raise "args should respond to `#kind`" if !args.respond_to?(:kind) - - # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. - args_json = args.to_json - raise "args should return non-nil from `#to_json`" if !args_json - - args_insert_opts = if args.respond_to?(:insert_opts) - args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace - args_with_insert_opts.insert_opts || EMPTY_INSERT_OPTS - else - EMPTY_INSERT_OPTS - end - - scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at - unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts + unique_key_hash = Digest::SHA256.digest(unique_key) + unique_states = unique_opts.by_state || DEFAULT_UNIQUE_STATES - raise ArgumentError, "unique opts can't be used with `#insert_many`" if is_insert_many && unique_opts - - [ - Driver::JobInsertParams.new( - encoded_args: args_json, - kind: args.kind, - max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, - priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, - queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, - scheduled_at: scheduled_at&.utc, # database defaults to now - state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, - tags: validate_tags(insert_opts.tags || args_insert_opts.tags) - ), - unique_opts - ] + [unique_key_hash, UniqueBitmask.from_states(unique_states)] end # Truncates the given time down to the interval. For example: @@ -322,7 +269,7 @@ class InsertResult # job matching unique property already being present. attr_reader :unique_skipped_as_duplicated - def initialize(job, unique_skipped_as_duplicated: false) + def initialize(job, unique_skipped_as_duplicated:) @job = job @unique_skipped_as_duplicated = unique_skipped_as_duplicated end diff --git a/lib/driver.rb b/lib/driver.rb index a3c11c6..d2992f2 100644 --- a/lib/driver.rb +++ b/lib/driver.rb @@ -4,29 +4,6 @@ module River # considered to be for internal use only and subject to change. API stability # is not guaranteed. module Driver - # Parameters for looking up a job by kind and unique properties. - class JobGetByKindAndUniquePropertiesParam - attr_accessor :created_at - attr_accessor :encoded_args - attr_accessor :kind - attr_accessor :queue - attr_accessor :state - - def initialize( - kind:, - created_at: nil, - encoded_args: nil, - queue: nil, - state: nil - ) - self.kind = kind - self.created_at = created_at - self.encoded_args = encoded_args - self.queue = queue - self.state = state - end - end - # Insert parameters for a job. This is sent to underlying drivers and is meant # for internal use only. Its interface is subject to change. class JobInsertParams @@ -38,6 +15,8 @@ class JobInsertParams attr_accessor :scheduled_at attr_accessor :state attr_accessor :tags + attr_accessor :unique_key + attr_accessor :unique_states def initialize( encoded_args:, diff --git a/lib/fnv.rb b/lib/fnv.rb deleted file mode 100644 index 28ffeef..0000000 --- a/lib/fnv.rb +++ /dev/null @@ -1,35 +0,0 @@ -module River - # FNV is the Fowler–Noll–Vo hash function, a simple hash that's very easy to - # implement, and hash the perfect characteristics for use with the 64 bits of - # available space in a PG advisory lock. - # - # I'm implemented it myself so that the River gem can stay dependency free - # (and because it's quite easy to do). - module FNV - def self.fnv1_hash(str, size:) - hash = OFFSET_BASIS.fetch(size) - mask = (2**size - 1).to_int # creates a mask of 1s of `size` bits long like 0xffffffff - prime = PRIME.fetch(size) - - str.each_byte do |byte| - hash *= prime - hash &= mask - hash ^= byte - end - - hash - end - - OFFSET_BASIS = { - 32 => 0x811c9dc5, - 64 => 0xcbf29ce484222325 - }.freeze - private_constant :OFFSET_BASIS - - PRIME = { - 32 => 0x01000193, - 64 => 0x00000100000001B3 - }.freeze - private_constant :PRIME - end -end diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb index abd8dd3..11f42aa 100644 --- a/lib/insert_opts.rb +++ b/lib/insert_opts.rb @@ -114,23 +114,30 @@ class UniqueOpts # Unlike other unique options, ByState gets a default when it's not set for # user convenience. The default is equivalent to: # - # by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_RUNNING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_SCHEDULED] + # by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_SCHEDULED] # # With this setting, any jobs of the same kind that have been completed or # discarded, but not yet cleaned out by the system, won't count towards the # uniqueness of a new insert. attr_accessor :by_state + # Indicates that the job kind should not be considered for uniqueness. This + # is useful when you want to enforce uniqueness based on other properties + # across multiple worker types. + attr_accessor :exclude_kind + def initialize( by_args: nil, by_period: nil, by_queue: nil, - by_state: nil + by_state: nil, + exclude_kind: nil ) self.by_args = by_args self.by_period = by_period self.by_queue = by_queue self.by_state = by_state + self.exclude_kind = exclude_kind end end end diff --git a/lib/job.rb b/lib/job.rb index 494ded9..ca56780 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -3,6 +3,7 @@ module River JOB_STATE_CANCELLED = "cancelled" JOB_STATE_COMPLETED = "completed" JOB_STATE_DISCARDED = "discarded" + JOB_STATE_PENDING = "pending" JOB_STATE_RETRYABLE = "retryable" JOB_STATE_RUNNING = "running" JOB_STATE_SCHEDULED = "scheduled" @@ -113,6 +114,9 @@ class JobRow # configuration. attr_accessor :unique_key + # A list of states that the job must be in to be considered for uniqueness. + attr_accessor :unique_states + def initialize( id:, args:, @@ -132,7 +136,8 @@ def initialize( errors: nil, finalized_at: nil, tags: nil, - unique_key: nil + unique_key: nil, + unique_states: nil ) self.id = id self.args = args @@ -151,6 +156,7 @@ def initialize( self.state = state self.tags = tags self.unique_key = unique_key + self.unique_states = unique_states end end diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index 87f62b2..429e0d5 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -1,11 +1,11 @@ require "json" -require_relative "fnv" require_relative "insert_opts" require_relative "job" require_relative "client" require_relative "driver" +require_relative "unique_bitmask" module River end diff --git a/lib/unique_bitmask.rb b/lib/unique_bitmask.rb new file mode 100644 index 0000000..389885c --- /dev/null +++ b/lib/unique_bitmask.rb @@ -0,0 +1,41 @@ +module River + class UniqueBitmask + JOB_STATE_BIT_POSITIONS = { + ::River::JOB_STATE_AVAILABLE => 7, + ::River::JOB_STATE_CANCELLED => 6, + ::River::JOB_STATE_COMPLETED => 5, + ::River::JOB_STATE_DISCARDED => 4, + ::River::JOB_STATE_PENDING => 3, + ::River::JOB_STATE_RETRYABLE => 2, + ::River::JOB_STATE_RUNNING => 1, + ::River::JOB_STATE_SCHEDULED => 0 + }.freeze + private_constant :JOB_STATE_BIT_POSITIONS + + def self.from_states(states) + val = 0 + + states.each do |state| + bit_index = JOB_STATE_BIT_POSITIONS[state] + + bit_position = 7 - (bit_index % 8) + val |= 1 << bit_position + end + + format("%08b", val) + end + + def self.to_states(mask) + states = [] #: Array[jobStateAll] # rubocop:disable Layout/LeadingCommentSpace + + JOB_STATE_BIT_POSITIONS.each do |state, bit_index| + bit_position = 7 - (bit_index % 8) + if (mask & (1 << bit_position)) != 0 + states << state + end + end + + states.sort + end + end +end diff --git a/sig/client.rbs b/sig/client.rbs index aaa01b7..eac8e45 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -4,21 +4,19 @@ module River QUEUE_DEFAULT: String class Client - @advisory_lock_prefix: Integer? @driver: _Driver @time_now_utc: ^() -> Time - def initialize: (_Driver driver, ?advisory_lock_prefix: Integer?) -> void + def initialize: (_Driver driver) -> void def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult - def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer - - private def check_advisory_lock_prefix_bounds: (Integer?) -> Integer? + def insert_many: (Array[jobArgs | InsertManyParams]) -> Array[InsertResult] DEFAULT_UNIQUE_STATES: Array[jobStateAll] EMPTY_INSERT_OPTS: InsertOpts - private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult - private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?] + private def insert_and_check_unique_job: (Driver::JobInsertParams) -> InsertResult + private def make_insert_params: (jobArgs, InsertOpts) -> Driver::JobInsertParams + private def make_unique_key_and_bitmask: (Driver::JobInsertParams, UniqueOpts) -> [String, String] private def truncate_time: (Time, Integer) -> Time private def uint64_to_int64: (Integer) -> Integer @@ -45,6 +43,6 @@ module River attr_reader job: JobRow attr_reader unique_skipped_as_duplicated: bool - def initialize: (JobRow job, ?unique_skipped_as_duplicated: bool) -> void + def initialize: (JobRow job, unique_skipped_as_duplicated: bool) -> void end end diff --git a/sig/driver.rbs b/sig/driver.rbs index be4d4dc..26be08b 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -2,9 +2,8 @@ module River interface _Driver def advisory_lock: (Integer) -> void def job_get_by_kind_and_unique_properties: (Driver::JobGetByKindAndUniquePropertiesParam) -> JobRow? - def job_insert: (Driver::JobInsertParams) -> JobRow - def job_insert_many: (Array[Driver::JobInsertParams]) -> Integer - def job_insert_unique: (Driver::JobInsertParams, String) -> [JobRow, bool] + def job_insert: (Driver::JobInsertParams) -> [JobRow, bool] + def job_insert_many: (Array[Driver::JobInsertParams]) -> Array[[JobRow, bool]] def transaction: [T] () { () -> T } -> T # this set of methods is used only in tests @@ -34,6 +33,8 @@ module River attr_accessor scheduled_at: Time? attr_accessor state: jobStateAll attr_accessor tags: Array[String]? + attr_accessor unique_key: String? + attr_accessor unique_states: String? def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?) -> void end diff --git a/sig/fnv.rbs b/sig/fnv.rbs deleted file mode 100644 index 3a016dc..0000000 --- a/sig/fnv.rbs +++ /dev/null @@ -1,9 +0,0 @@ -module River - module FNV - def self.fnv1_hash: (String, size: 32 | 64) -> Integer - - MASK: Hash[Integer, Integer] - OFFSET_BASIS: Hash[Integer, Integer] - PRIME: Hash[Integer, Integer] - end -end \ No newline at end of file diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs index 69ad0ca..c28598b 100644 --- a/sig/insert_opts.rbs +++ b/sig/insert_opts.rbs @@ -15,7 +15,8 @@ module River attr_accessor by_period: Integer? attr_accessor by_queue: bool? attr_accessor by_state: Array[jobStateAll]? + attr_accessor exclude_kind: bool? - def initialize: (?by_args: bool?, ?by_period: Integer?, ?by_queue: bool?, ?by_state: Array[jobStateAll]?) -> void + def initialize: (?by_args: bool?, ?by_period: Integer?, ?by_queue: bool?, ?by_state: Array[jobStateAll]?, ?exclude_kind: bool?) -> void end end diff --git a/sig/job.rbs b/sig/job.rbs index 3193f60..16a0288 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -3,11 +3,12 @@ module River JOB_STATE_CANCELLED: "cancelled" JOB_STATE_COMPLETED: "completed" JOB_STATE_DISCARDED: "discarded" + JOB_STATE_PENDING: "pending" JOB_STATE_RETRYABLE: "retryable" JOB_STATE_RUNNING: "running" JOB_STATE_SCHEDULED: "scheduled" - type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "retryable" | "running" | "scheduled" + type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "pending" | "retryable" | "running" | "scheduled" interface _JobArgs def is_a?: (Class) -> bool @@ -52,8 +53,9 @@ module River attr_accessor state: jobStateAll attr_accessor tags: Array[String]? attr_accessor unique_key: String? + attr_accessor unique_states: Array[jobStateAll]? - def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?, ?unique_key: String?) -> void + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?, ?unique_key: String?, ?unique_states: Array[jobStateAll]?) -> void end class AttemptError diff --git a/sig/unique_bitmask.rbs b/sig/unique_bitmask.rbs new file mode 100644 index 0000000..af5f95b --- /dev/null +++ b/sig/unique_bitmask.rbs @@ -0,0 +1,9 @@ +module River + class UniqueBitmask + JOB_STATE_BIT_POSITIONS: Hash[jobStateAll, Integer] + + def self.from_states: (Array[jobStateAll]) -> String + + def self.to_states: (Integer) -> Array[jobStateAll] + end +end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index f3c8e9b..a9d9234 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -119,22 +119,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs ) end - it "errors if advisory lock prefix is larger than four bytes" do - River::Client.new(driver, advisory_lock_prefix: 123) - - expect do - River::Client.new(driver, advisory_lock_prefix: -1) - end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") - - # 2^32-1 is 0xffffffff (1s for 32 bits) which fits - River::Client.new(driver, advisory_lock_prefix: 2**32 - 1) - - # 2^32 is 0x100000000, which does not - expect do - River::Client.new(driver, advisory_lock_prefix: 2**32) - end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") - end - it "errors if args don't respond to #kind" do args_klass = Class.new do def to_json = {} @@ -186,16 +170,7 @@ def check_bigint_bounds(int) let(:now) { Time.now.utc } before { client.instance_variable_set(:@time_now_utc, -> { now }) } - let(:advisory_lock_keys) { [] } - - before do - # required so it's properly captured by the lambda below - keys = advisory_lock_keys - - driver.singleton_class.send(:define_method, :advisory_lock, ->(key) { keys.push(key) }) - end - - it "inserts a new unique job with minimal options on the fast path" do + it "inserts a new unique job with minimal options" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -207,23 +182,22 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - expect(advisory_lock_keys).to be_empty - - unique_key_str = "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + unique_key_str = "&kind=#{insert_res.job.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) + expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_PENDING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) insert_res = client.insert(job_args) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with minimal options on the slow path" do + it "inserts a new unique job with custom states" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] ) ) @@ -231,99 +205,45 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - lock_str = "unique_keykind=#{job_args.kind}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" - expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + lock_str = "&kind=#{job_args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" - expect(insert_res.job.unique_key).to be_nil + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(lock_str)) + expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) insert_res = client.insert(job_args) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with all options on the fast path" do + it "inserts a new unique job with all options" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_args: true, by_period: 15 * 60, by_queue: true, - by_state: River::Client.const_get(:DEFAULT_UNIQUE_STATES) + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_CANCELLED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED], + exclude_kind: true ) ) insert_res = client.insert(job_args) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - - expect(advisory_lock_keys).to be_empty + expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_CANCELLED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) unique_key_str = "&args=#{JSON.dump({job_num: 1})}" \ "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + "&queue=#{River::QUEUE_DEFAULT}" expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) + expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_CANCELLED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) insert_res = client.insert(job_args) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with all options on the slow path" do - job_args = SimpleArgsWithInsertOpts.new(job_num: 1) - job_args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_args: true, - by_period: 15 * 60, - by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE] - ) - ) - - insert_res = client.insert(job_args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - - lock_str = "unique_keykind=#{job_args.kind}" \ - "&args=#{JSON.dump({job_num: 1})}" \ - "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" - expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) - - expect(insert_res.job.unique_key).to be_nil - - insert_res = client.insert(job_args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be true - end - - it "inserts a new unique job with advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - job_args = SimpleArgsWithInsertOpts.new(job_num: 1) - job_args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path - ) - ) - - insert_res = client.insert(job_args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - - lock_str = "unique_keykind=#{job_args.kind}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" - expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))]) - - lock_key = advisory_lock_keys[0] - expect(lock_key >> 32).to eq(123456) - end - it "skips unique check if unique opts empty" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( @@ -339,11 +259,13 @@ def check_bigint_bounds(int) describe "#insert_many" do it "inserts jobs from jobArgs with defaults" do - num_inserted = client.insert_many([ + results = client.insert_many([ SimpleArgs.new(job_num: 1), SimpleArgs.new(job_num: 2) ]) - expect(num_inserted).to eq(2) + expect(results.length).to eq(2) + expect(results[0].job).to have_attributes(args: {"job_num" => 1}) + expect(results[1].job).to have_attributes(args: {"job_num" => 2}) jobs = driver.job_list expect(jobs.count).to be 2 @@ -376,11 +298,13 @@ def check_bigint_bounds(int) end it "inserts jobs from InsertManyParams with defaults" do - num_inserted = client.insert_many([ + results = client.insert_many([ River::InsertManyParams.new(SimpleArgs.new(job_num: 1)), River::InsertManyParams.new(SimpleArgs.new(job_num: 2)) ]) - expect(num_inserted).to eq(2) + expect(results.length).to eq(2) + expect(results[0].job).to have_attributes(args: {"job_num" => 1}) + expect(results[1].job).to have_attributes(args: {"job_num" => 2}) jobs = driver.job_list expect(jobs.count).to be 2 @@ -413,6 +337,19 @@ def check_bigint_bounds(int) end it "inserts jobs with insert opts" do + # First, insert a job which will cause a duplicate conflict with the bulk + # insert so the bulk insert's row gets skipped. + dupe_job_args = SimpleArgsWithInsertOpts.new(job_num: 0) + dupe_job_args.insert_opts = River::InsertOpts.new( + queue: "job_to_duplicate", + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(dupe_job_args) + expect(insert_res.job).to_not be_nil + # We set job insert opts in this spec too so that we can verify that the # options passed at insertion time take precedence. args1 = SimpleArgsWithInsertOpts.new(job_num: 1) @@ -429,8 +366,16 @@ def check_bigint_bounds(int) queue: "job_custom_queue_2", tags: ["job_custom_2"] ) + args3 = SimpleArgsWithInsertOpts.new(job_num: 3) + args3.insert_opts = River::InsertOpts.new( + queue: "to_duplicate", # duplicate by queue, will be skipped + tags: ["job_custom_3"], + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) - num_inserted = client.insert_many([ + results = client.insert_many([ River::InsertManyParams.new(args1, insert_opts: River::InsertOpts.new( max_attempts: 17, priority: 3, @@ -442,37 +387,43 @@ def check_bigint_bounds(int) priority: 4, queue: "my_queue_2", tags: ["custom_2"] + )), + River::InsertManyParams.new(args3, insert_opts: River::InsertOpts.new( + queue: "job_to_duplicate", # duplicate by queue, will be skipped + tags: ["custom_3"], + unique_opts: River::UniqueOpts.new( + by_queue: true + ) )) ]) - expect(num_inserted).to eq(2) + expect(results.length).to eq(3) # all rows returned, including skipped duplicates + expect(results[0].job).to have_attributes(tags: ["custom_1"]) + expect(results[1].job).to have_attributes(tags: ["custom_2"]) + expect(results[2].unique_skipped_as_duplicated).to be true + expect(results[2].job).to have_attributes( + id: insert_res.job.id, + tags: [] + ) jobs = driver.job_list - expect(jobs.count).to be 2 + expect(jobs.count).to be 3 - expect(jobs[0]).to have_attributes( + expect(jobs[0]).to have_attributes(queue: "job_to_duplicate") + + expect(jobs[1]).to have_attributes( max_attempts: 17, priority: 3, queue: "my_queue_1", tags: ["custom_1"] ) - expect(jobs[1]).to have_attributes( + expect(jobs[2]).to have_attributes( max_attempts: 18, priority: 4, queue: "my_queue_2", tags: ["custom_2"] ) end - - it "raises error with unique opts" do - expect do - client.insert_many([ - River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( - unique_opts: River::UniqueOpts.new - )) - ]) - end.to raise_error(ArgumentError, "unique opts can't be used with `#insert_many`") - end end describe River::Client.const_get(:DEFAULT_UNIQUE_STATES) do diff --git a/spec/driver_shared_examples.rb b/spec/driver_shared_examples.rb index d1d659a..8465eb3 100644 --- a/spec/driver_shared_examples.rb +++ b/spec/driver_shared_examples.rb @@ -19,7 +19,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs shared_examples "driver shared examples" do describe "unique insertion" do - it "inserts a unique job once on the fast path" do + it "inserts a unique job once" do args = SimpleArgsWithInsertOpts.new(job_num: 1) args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -37,14 +37,14 @@ class SimpleArgsWithInsertOpts < SimpleArgs expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a unique job on the slow path" do + it "inserts a unique job with custom states" do client = River::Client.new(driver) args = SimpleArgsWithInsertOpts.new(job_num: 1) args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] ) ) @@ -57,47 +57,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs expect(insert_res.job.id).to eq(original_job.id) expect(insert_res.unique_skipped_as_duplicated).to be true end - - it "inserts a unique job on the slow path with an advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - end - - describe "#advisory_lock" do - it "takes an advisory lock" do - driver.transaction do - driver.advisory_lock(123) - - Thread.new do - expect(driver.advisory_lock_try(123)).to be false - end.join - end - end - end - - describe "#advisory_lock_try" do - it "takes an advisory lock" do - driver.transaction do - expect(driver.advisory_lock_try(123)).to be true - end - end end describe "#job_get_by_id" do @@ -113,93 +72,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#job_get_by_kind_and_unique_properties" do - let(:job_args) { SimpleArgs.new(job_num: 1) } - - it "gets a job by kind" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: "does_not_exist" - )) - ).to be_nil - end - - it "gets a job by created at period" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] - )) - ).to be_nil - end - - it "gets a job by encoded args" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump(insert_res.job.args) - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump({"job_num" => 2}) - )) - ).to be_nil - end - - it "gets a job by queue" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: insert_res.job.queue - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: "other_queue" - )) - ).to be_nil - end - - it "gets a job by state" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] - )) - ).to be_nil - end - end - describe "#job_insert" do it "inserts a job" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) @@ -215,6 +87,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs state: River::JOB_STATE_AVAILABLE, tags: [] ) + expect(insert_res.unique_skipped_as_duplicated).to be false # Make sure it made it to the database. Assert only minimally since we're # certain it's the same as what we checked above. @@ -235,6 +108,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs scheduled_at: be_within(2).of(target_time), state: River::JOB_STATE_SCHEDULED ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts with job insert opts" do @@ -253,6 +127,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: "job_custom_queue", tags: ["job_custom"] ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts with insert opts" do @@ -278,6 +153,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: "my_queue", tags: ["custom"] ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts with job args hash" do @@ -288,6 +164,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs args: {"job_num" => 1}, kind: "hash_kind" ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts in a transaction" do @@ -298,6 +175,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs job = driver.job_get_by_id(insert_res.job.id) expect(job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false raise driver.rollback_exception end @@ -306,6 +184,50 @@ class SimpleArgsWithInsertOpts < SimpleArgs job = driver.job_get_by_id(insert_res.job.id) expect(job).to be_nil end + + it "inserts a unique job" do + insert_params = River::Driver::JobInsertParams.new( + encoded_args: JSON.dump({"job_num" => 1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: Time.now.getutc, + state: River::JOB_STATE_AVAILABLE, + tags: nil + ) + + job_row, unique_skipped_as_duplicated = driver.job_insert(insert_params, "unique_key") + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + expect(unique_skipped_as_duplicated).to be false + + # second insertion should be skipped + job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [] + ) + expect(unique_skipped_as_duplicated).to be true + end end describe "#job_insert_many" do @@ -368,52 +290,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#job_insert_unique" do - it "inserts a job" do - insert_params = River::Driver::JobInsertParams.new( - encoded_args: JSON.dump({"job_num" => 1}), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: Time.now.getutc, - state: River::JOB_STATE_AVAILABLE, - tags: nil - ) - - job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") - expect(job_row).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.getutc), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.getutc), - state: River::JOB_STATE_AVAILABLE, - tags: [] - ) - expect(unique_skipped_as_duplicated).to be false - - # second insertion should be skipped - job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") - expect(job_row).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.getutc), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.getutc), - state: River::JOB_STATE_AVAILABLE, - tags: [] - ) - expect(unique_skipped_as_duplicated).to be true - end - end - describe "#job_list" do let(:job_args) { SimpleArgs.new(job_num: 1) } diff --git a/spec/fnv_spec.rb b/spec/fnv_spec.rb deleted file mode 100644 index df21872..0000000 --- a/spec/fnv_spec.rb +++ /dev/null @@ -1,641 +0,0 @@ -require "spec_helper" - -describe River::FNV do - describe "#fnv1_hash" do - it "hashes strings to 32 bits" do - TEST_STRS.each do |test_str| - expect(River::FNV.fnv1_hash(test_str, size: 32)).to eq(FNV1_32_HASHES[test_str]) - end - end - - it "hashes strings to 64 bits" do - TEST_STRS.each do |test_str| - expect(River::FNV.fnv1_hash(test_str, size: 64)).to eq(FNV1_64_HASHES[test_str]) - end - end - end -end - -# -# Test strings pulled from the Python FNV hash test suite: -# -# https://github.com/znerol/py-fnvhash/blob/master/fnvhash/test/vector.py -# - -TEST_STRS = [ - "", - "a", - "b", - "c", - "d", - "e", - "f", - "fo", - "foo", - "foob", - "fooba", - "foobar", - "" + "\x00", - "a" + "\x00", - "b" + "\x00", - "c" + "\x00", - "d" + "\x00", - "e" + "\x00", - "f" + "\x00", - "fo" + "\x00", - "foo" + "\x00", - "foob" + "\x00", - "fooba" + "\x00", - "foobar" + "\x00", - "ch", - "cho", - "chon", - "chong", - "chongo", - "chongo ", - "chongo w", - "chongo wa", - "chongo was", - "chongo was ", - "chongo was h", - "chongo was he", - "chongo was her", - "chongo was here", - "chongo was here!", - "chongo was here!\n", - "ch" + "\x00", - "cho" + "\x00", - "chon" + "\x00", - "chong" + "\x00", - "chongo" + "\x00", - "chongo " + "\x00", - "chongo w" + "\x00", - "chongo wa" + "\x00", - "chongo was" + "\x00", - "chongo was " + "\x00", - "chongo was h" + "\x00", - "chongo was he" + "\x00", - "chongo was her" + "\x00", - "chongo was here" + "\x00", - "chongo was here!" + "\x00", - "chongo was here!\n" + "\x00", - "cu", - "cur", - "curd", - "curds", - "curds ", - "curds a", - "curds an", - "curds and", - "curds and ", - "curds and w", - "curds and wh", - "curds and whe", - "curds and whey", - "curds and whey\n", - "cu" + "\x00", - "cur" + "\x00", - "curd" + "\x00", - "curds" + "\x00", - "curds " + "\x00", - "curds a" + "\x00", - "curds an" + "\x00", - "curds and" + "\x00", - "curds and " + "\x00", - "curds and w" + "\x00", - "curds and wh" + "\x00", - "curds and whe" + "\x00", - "curds and whey" + "\x00", - "curds and whey\n" + "\x00", - "hi", - "hi" + "\x00", - "hello", - "hello" + "\x00", - "\xff\x00\x00\x01", - "\x01\x00\x00\xff", - "\xff\x00\x00\x02", - "\x02\x00\x00\xff", - "\xff\x00\x00\x03", - "\x03\x00\x00\xff", - "\xff\x00\x00\x04", - "\x04\x00\x00\xff", - "\x40\x51\x4e\x44", - "\x44\x4e\x51\x40", - "\x40\x51\x4e\x4a", - "\x4a\x4e\x51\x40", - "\x40\x51\x4e\x54", - "\x54\x4e\x51\x40", - "127.0.0.1", - "127.0.0.1" + "\x00", - "127.0.0.2", - "127.0.0.2" + "\x00", - "127.0.0.3", - "127.0.0.3" + "\x00", - "64.81.78.68", - "64.81.78.68" + "\x00", - "64.81.78.74", - "64.81.78.74" + "\x00", - "64.81.78.84", - "64.81.78.84" + "\x00", - "feedface", - "feedface" + "\x00", - "feedfacedaffdeed", - "feedfacedaffdeed" + "\x00", - "feedfacedeadbeef", - "feedfacedeadbeef" + "\x00", - "line 1\nline 2\nline 3", - "chongo /\\../\\", - "chongo /\\../\\" + "\x00", - "chongo (Landon Curt Noll) /\\../\\", - "chongo (Landon Curt Noll) /\\../\\" + "\x00", - "http://antwrp.gsfc.nasa.gov/apod/astropix.html", - "http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash", - "http://epod.usra.edu/", - "http://exoplanet.eu/", - "http://hvo.wr.usgs.gov/cam3/", - "http://hvo.wr.usgs.gov/cams/HMcam/", - "http://hvo.wr.usgs.gov/kilauea/update/deformation.html", - "http://hvo.wr.usgs.gov/kilauea/update/images.html", - "http://hvo.wr.usgs.gov/kilauea/update/maps.html", - "http://hvo.wr.usgs.gov/volcanowatch/current_issue.html", - "http://neo.jpl.nasa.gov/risk/", - "http://norvig.com/21-days.html", - "http://primes.utm.edu/curios/home.php", - "http://slashdot.org/", - "http://tux.wr.usgs.gov/Maps/155.25-19.5.html", - "http://volcano.wr.usgs.gov/kilaueastatus.php", - "http://www.avo.alaska.edu/activity/Redoubt.php", - "http://www.dilbert.com/fast/", - "http://www.fourmilab.ch/gravitation/orbits/", - "http://www.fpoa.net/", - "http://www.ioccc.org/index.html", - "http://www.isthe.com/cgi-bin/number.cgi", - "http://www.isthe.com/chongo/bio.html", - "http://www.isthe.com/chongo/index.html", - "http://www.isthe.com/chongo/src/calc/lucas-calc", - "http://www.isthe.com/chongo/tech/astro/venus2004.html", - "http://www.isthe.com/chongo/tech/astro/vita.html", - "http://www.isthe.com/chongo/tech/comp/c/expert.html", - "http://www.isthe.com/chongo/tech/comp/calc/index.html", - "http://www.isthe.com/chongo/tech/comp/fnv/index.html", - "http://www.isthe.com/chongo/tech/math/number/howhigh.html", - "http://www.isthe.com/chongo/tech/math/number/number.html", - "http://www.isthe.com/chongo/tech/math/prime/mersenne.html", - "http://www.isthe.com/chongo/tech/math/prime/mersenne.html#largest", - "http://www.lavarnd.org/cgi-bin/corpspeak.cgi", - "http://www.lavarnd.org/cgi-bin/haiku.cgi", - "http://www.lavarnd.org/cgi-bin/rand-none.cgi", - "http://www.lavarnd.org/cgi-bin/randdist.cgi", - "http://www.lavarnd.org/index.html", - "http://www.lavarnd.org/what/nist-test.html", - "http://www.macosxhints.com/", - "http://www.mellis.com/", - "http://www.nature.nps.gov/air/webcams/parks/havoso2alert/havoalert.cfm", - "http://www.nature.nps.gov/air/webcams/parks/havoso2alert/timelines_24.cfm", - "http://www.paulnoll.com/", - "http://www.pepysdiary.com/", - "http://www.sciencenews.org/index/home/activity/view", - "http://www.skyandtelescope.com/", - "http://www.sput.nl/~rob/sirius.html", - "http://www.systemexperts.com/", - "http://www.tq-international.com/phpBB3/index.php", - "http://www.travelquesttours.com/index.htm", - "http://www.wunderground.com/global/stations/89606.html", - "21701" * 10, - "M21701" * 10, - "2^21701-1" * 10, - "\x54\xc5" * 10, - "\xc5\x54" * 10, - "23209" * 10, - "M23209" * 10, - "2^23209-1" * 10, - "\x5a\xa9" * 10, - "\xa9\x5a" * 10, - "391581216093" * 10, - "391581*2^216093-1" * 10, - "\x05\xf9\x9d\x03\x4c\x81" * 10, - "FEDCBA9876543210" * 10, - "\xfe\xdc\xba\x98\x76\x54\x32\x10" * 10, - "EFCDAB8967452301" * 10, - "\xef\xcd\xab\x89\x67\x45\x23\x01" * 10, - "0123456789ABCDEF" * 10, - "\x01\x23\x45\x67\x89\xab\xcd\xef" * 10, - "1032547698BADCFE" * 10, - "\x10\x32\x54\x76\x98\xba\xdc\xfe" * 10, - "\x00" * 500, - "\x07" * 500, - "~" * 500, - "\x7f" * 500 -] - -FNV1_32_HASHES = { - TEST_STRS[0] => 0x811c9dc5, - TEST_STRS[1] => 0x050c5d7e, - TEST_STRS[2] => 0x050c5d7d, - TEST_STRS[3] => 0x050c5d7c, - TEST_STRS[4] => 0x050c5d7b, - TEST_STRS[5] => 0x050c5d7a, - TEST_STRS[6] => 0x050c5d79, - TEST_STRS[7] => 0x6b772514, - TEST_STRS[8] => 0x408f5e13, - TEST_STRS[9] => 0xb4b1178b, - TEST_STRS[10] => 0xfdc80fb0, - TEST_STRS[11] => 0x31f0b262, - TEST_STRS[12] => 0x050c5d1f, - TEST_STRS[13] => 0x70772d5a, - TEST_STRS[14] => 0x6f772bc7, - TEST_STRS[15] => 0x6e772a34, - TEST_STRS[16] => 0x6d7728a1, - TEST_STRS[17] => 0x6c77270e, - TEST_STRS[18] => 0x6b77257b, - TEST_STRS[19] => 0x408f5e7c, - TEST_STRS[20] => 0xb4b117e9, - TEST_STRS[21] => 0xfdc80fd1, - TEST_STRS[22] => 0x31f0b210, - TEST_STRS[23] => 0xffe8d046, - TEST_STRS[24] => 0x6e772a5c, - TEST_STRS[25] => 0x4197aebb, - TEST_STRS[26] => 0xfcc8100f, - TEST_STRS[27] => 0xfdf147fa, - TEST_STRS[28] => 0xbcd44ee1, - TEST_STRS[29] => 0x23382c13, - TEST_STRS[30] => 0x846d619e, - TEST_STRS[31] => 0x1630abdb, - TEST_STRS[32] => 0xc99e89b2, - TEST_STRS[33] => 0x1692c316, - TEST_STRS[34] => 0x9f091bca, - TEST_STRS[35] => 0x2556be9b, - TEST_STRS[36] => 0x628e0e73, - TEST_STRS[37] => 0x98a0bf6c, - TEST_STRS[38] => 0xb10d5725, - TEST_STRS[39] => 0xdd002f35, - TEST_STRS[40] => 0x4197aed4, - TEST_STRS[41] => 0xfcc81061, - TEST_STRS[42] => 0xfdf1479d, - TEST_STRS[43] => 0xbcd44e8e, - TEST_STRS[44] => 0x23382c33, - TEST_STRS[45] => 0x846d61e9, - TEST_STRS[46] => 0x1630abba, - TEST_STRS[47] => 0xc99e89c1, - TEST_STRS[48] => 0x1692c336, - TEST_STRS[49] => 0x9f091ba2, - TEST_STRS[50] => 0x2556befe, - TEST_STRS[51] => 0x628e0e01, - TEST_STRS[52] => 0x98a0bf09, - TEST_STRS[53] => 0xb10d5704, - TEST_STRS[54] => 0xdd002f3f, - TEST_STRS[55] => 0x1c4a506f, - TEST_STRS[56] => 0x6e772a41, - TEST_STRS[57] => 0x26978421, - TEST_STRS[58] => 0xe184ff97, - TEST_STRS[59] => 0x9b5e5ac6, - TEST_STRS[60] => 0x5b88e592, - TEST_STRS[61] => 0xaa8164b7, - TEST_STRS[62] => 0x20b18c7b, - TEST_STRS[63] => 0xf28025c5, - TEST_STRS[64] => 0x84bb753f, - TEST_STRS[65] => 0x3219925a, - TEST_STRS[66] => 0x384163c6, - TEST_STRS[67] => 0x54f010d7, - TEST_STRS[68] => 0x8cea820c, - TEST_STRS[69] => 0xe12ab8ee, - TEST_STRS[70] => 0x26978453, - TEST_STRS[71] => 0xe184fff3, - TEST_STRS[72] => 0x9b5e5ab5, - TEST_STRS[73] => 0x5b88e5b2, - TEST_STRS[74] => 0xaa8164d6, - TEST_STRS[75] => 0x20b18c15, - TEST_STRS[76] => 0xf28025a1, - TEST_STRS[77] => 0x84bb751f, - TEST_STRS[78] => 0x3219922d, - TEST_STRS[79] => 0x384163ae, - TEST_STRS[80] => 0x54f010b2, - TEST_STRS[81] => 0x8cea8275, - TEST_STRS[82] => 0xe12ab8e4, - TEST_STRS[83] => 0x64411eaa, - TEST_STRS[84] => 0x6977223c, - TEST_STRS[85] => 0x428ae474, - TEST_STRS[86] => 0xb6fa7167, - TEST_STRS[87] => 0x73408525, - TEST_STRS[88] => 0xb78320a1, - TEST_STRS[89] => 0x0caf4135, - TEST_STRS[90] => 0xb78320a2, - TEST_STRS[91] => 0xcdc88e80, - TEST_STRS[92] => 0xb78320a3, - TEST_STRS[93] => 0x8ee1dbcb, - TEST_STRS[94] => 0xb78320a4, - TEST_STRS[95] => 0x4ffb2716, - TEST_STRS[96] => 0x860632aa, - TEST_STRS[97] => 0xcc2c5c64, - TEST_STRS[98] => 0x860632a4, - TEST_STRS[99] => 0x2a7ec4a6, - TEST_STRS[100] => 0x860632ba, - TEST_STRS[101] => 0xfefe8e14, - TEST_STRS[102] => 0x0a3cffd8, - TEST_STRS[103] => 0xf606c108, - TEST_STRS[104] => 0x0a3cffdb, - TEST_STRS[105] => 0xf906c5c1, - TEST_STRS[106] => 0x0a3cffda, - TEST_STRS[107] => 0xf806c42e, - TEST_STRS[108] => 0xc07167d7, - TEST_STRS[109] => 0xc9867775, - TEST_STRS[110] => 0xbf716668, - TEST_STRS[111] => 0xc78435b8, - TEST_STRS[112] => 0xc6717155, - TEST_STRS[113] => 0xb99568cf, - TEST_STRS[114] => 0x7662e0d6, - TEST_STRS[115] => 0x33a7f0e2, - TEST_STRS[116] => 0xc2732f95, - TEST_STRS[117] => 0xb053e78f, - TEST_STRS[118] => 0x3a19c02a, - TEST_STRS[119] => 0xa089821e, - TEST_STRS[120] => 0x31ae8f83, - TEST_STRS[121] => 0x995fa9c4, - TEST_STRS[122] => 0x35983f8c, - TEST_STRS[123] => 0x5036a251, - TEST_STRS[124] => 0x97018583, - TEST_STRS[125] => 0xb4448d60, - TEST_STRS[126] => 0x025dfe59, - TEST_STRS[127] => 0xc5eab3af, - TEST_STRS[128] => 0x7d21ba1e, - TEST_STRS[129] => 0x7704cddb, - TEST_STRS[130] => 0xd0071bfe, - TEST_STRS[131] => 0x0ff3774c, - TEST_STRS[132] => 0xb0fea0ea, - TEST_STRS[133] => 0x58177303, - TEST_STRS[134] => 0x4f599cda, - TEST_STRS[135] => 0x3e590a47, - TEST_STRS[136] => 0x965595f8, - TEST_STRS[137] => 0xc37f178d, - TEST_STRS[138] => 0x9711dd26, - TEST_STRS[139] => 0x23c99b7f, - TEST_STRS[140] => 0x6e568b17, - TEST_STRS[141] => 0x43f0245b, - TEST_STRS[142] => 0xbcb7a001, - TEST_STRS[143] => 0x12e6dffe, - TEST_STRS[144] => 0x0792f2d6, - TEST_STRS[145] => 0xb966936b, - TEST_STRS[146] => 0x46439ac5, - TEST_STRS[147] => 0x728d49af, - TEST_STRS[148] => 0xd33745c9, - TEST_STRS[149] => 0xbc382a57, - TEST_STRS[150] => 0x4bda1d31, - TEST_STRS[151] => 0xce35ccae, - TEST_STRS[152] => 0x3b6eed94, - TEST_STRS[153] => 0x445c9c58, - TEST_STRS[154] => 0x3db8bf9d, - TEST_STRS[155] => 0x2dee116d, - TEST_STRS[156] => 0xc18738da, - TEST_STRS[157] => 0x5b156176, - TEST_STRS[158] => 0x2aa7d593, - TEST_STRS[159] => 0xb2409658, - TEST_STRS[160] => 0xe1489528, - TEST_STRS[161] => 0xfe1ee07e, - TEST_STRS[162] => 0xe8842315, - TEST_STRS[163] => 0x3a6a63a2, - TEST_STRS[164] => 0x06d2c18c, - TEST_STRS[165] => 0xf8ef7225, - TEST_STRS[166] => 0x843d3300, - TEST_STRS[167] => 0xbb24f7ae, - TEST_STRS[168] => 0x878c0ec9, - TEST_STRS[169] => 0xb557810f, - TEST_STRS[170] => 0x57423246, - TEST_STRS[171] => 0x87f7505e, - TEST_STRS[172] => 0xbb809f20, - TEST_STRS[173] => 0x8932abb5, - TEST_STRS[174] => 0x0a9b3aa0, - TEST_STRS[175] => 0xb8682a24, - TEST_STRS[176] => 0xa7ac1c56, - TEST_STRS[177] => 0x11409252, - TEST_STRS[178] => 0xa987f517, - TEST_STRS[179] => 0xf309e7ed, - TEST_STRS[180] => 0xc9e8f417, - TEST_STRS[181] => 0x7f447bdd, - TEST_STRS[182] => 0xb929adc5, - TEST_STRS[183] => 0x57022879, - TEST_STRS[184] => 0xdcfd2c49, - TEST_STRS[185] => 0x6edafff5, - TEST_STRS[186] => 0xf04fb1f1, - TEST_STRS[187] => 0xfb7de8b9, - TEST_STRS[188] => 0xc5f1d7e9, - TEST_STRS[189] => 0x32c1f439, - TEST_STRS[190] => 0x7fd3eb7d, - TEST_STRS[191] => 0x81597da5, - TEST_STRS[192] => 0x05eb7a25, - TEST_STRS[193] => 0x9c0fa1b5, - TEST_STRS[194] => 0x53ccb1c5, - TEST_STRS[195] => 0xfabece15, - TEST_STRS[196] => 0x4ad745a5, - TEST_STRS[197] => 0xe5bdc495, - TEST_STRS[198] => 0x23b3c0a5, - TEST_STRS[199] => 0xfa823dd5, - TEST_STRS[200] => 0x0c6c58b9, - TEST_STRS[201] => 0xe2dbccd5, - TEST_STRS[202] => 0xdb7f50f9 -} - -FNV1_64_HASHES = { - TEST_STRS[0] => 0xcbf29ce484222325, - TEST_STRS[1] => 0xaf63bd4c8601b7be, - TEST_STRS[2] => 0xaf63bd4c8601b7bd, - TEST_STRS[3] => 0xaf63bd4c8601b7bc, - TEST_STRS[4] => 0xaf63bd4c8601b7bb, - TEST_STRS[5] => 0xaf63bd4c8601b7ba, - TEST_STRS[6] => 0xaf63bd4c8601b7b9, - TEST_STRS[7] => 0x08326207b4eb2f34, - TEST_STRS[8] => 0xd8cbc7186ba13533, - TEST_STRS[9] => 0x0378817ee2ed65cb, - TEST_STRS[10] => 0xd329d59b9963f790, - TEST_STRS[11] => 0x340d8765a4dda9c2, - TEST_STRS[12] => 0xaf63bd4c8601b7df, - TEST_STRS[13] => 0x08326707b4eb37da, - TEST_STRS[14] => 0x08326607b4eb3627, - TEST_STRS[15] => 0x08326507b4eb3474, - TEST_STRS[16] => 0x08326407b4eb32c1, - TEST_STRS[17] => 0x08326307b4eb310e, - TEST_STRS[18] => 0x08326207b4eb2f5b, - TEST_STRS[19] => 0xd8cbc7186ba1355c, - TEST_STRS[20] => 0x0378817ee2ed65a9, - TEST_STRS[21] => 0xd329d59b9963f7f1, - TEST_STRS[22] => 0x340d8765a4dda9b0, - TEST_STRS[23] => 0x50a6d3b724a774a6, - TEST_STRS[24] => 0x08326507b4eb341c, - TEST_STRS[25] => 0xd8d5c8186ba98bfb, - TEST_STRS[26] => 0x1ccefc7ef118dbef, - TEST_STRS[27] => 0x0c92fab3ad3db77a, - TEST_STRS[28] => 0x9b77794f5fdec421, - TEST_STRS[29] => 0x0ac742dfe7874433, - TEST_STRS[30] => 0xd7dad5766ad8e2de, - TEST_STRS[31] => 0xa1bb96378e897f5b, - TEST_STRS[32] => 0x5b3f9b6733a367d2, - TEST_STRS[33] => 0xb07ce25cbea969f6, - TEST_STRS[34] => 0x8d9e9997f9df0d6a, - TEST_STRS[35] => 0x838c673d9603cb7b, - TEST_STRS[36] => 0x8b5ee8a5e872c273, - TEST_STRS[37] => 0x4507c4e9fb00690c, - TEST_STRS[38] => 0x4c9ca59581b27f45, - TEST_STRS[39] => 0xe0aca20b624e4235, - TEST_STRS[40] => 0xd8d5c8186ba98b94, - TEST_STRS[41] => 0x1ccefc7ef118db81, - TEST_STRS[42] => 0x0c92fab3ad3db71d, - TEST_STRS[43] => 0x9b77794f5fdec44e, - TEST_STRS[44] => 0x0ac742dfe7874413, - TEST_STRS[45] => 0xd7dad5766ad8e2a9, - TEST_STRS[46] => 0xa1bb96378e897f3a, - TEST_STRS[47] => 0x5b3f9b6733a367a1, - TEST_STRS[48] => 0xb07ce25cbea969d6, - TEST_STRS[49] => 0x8d9e9997f9df0d02, - TEST_STRS[50] => 0x838c673d9603cb1e, - TEST_STRS[51] => 0x8b5ee8a5e872c201, - TEST_STRS[52] => 0x4507c4e9fb006969, - TEST_STRS[53] => 0x4c9ca59581b27f64, - TEST_STRS[54] => 0xe0aca20b624e423f, - TEST_STRS[55] => 0x13998e580afa800f, - TEST_STRS[56] => 0x08326507b4eb3401, - TEST_STRS[57] => 0xd8d5ad186ba95dc1, - TEST_STRS[58] => 0x1c72e17ef0ca4e97, - TEST_STRS[59] => 0x2183c1b327c38ae6, - TEST_STRS[60] => 0xb66d096c914504f2, - TEST_STRS[61] => 0x404bf57ad8476757, - TEST_STRS[62] => 0x887976bd815498bb, - TEST_STRS[63] => 0x3afd7f02c2bf85a5, - TEST_STRS[64] => 0xfc4476b0eb70177f, - TEST_STRS[65] => 0x186d2da00f77ecba, - TEST_STRS[66] => 0xf97140fa48c74066, - TEST_STRS[67] => 0xa2b1cf49aa926d37, - TEST_STRS[68] => 0x0690712cd6cf940c, - TEST_STRS[69] => 0xf7045b3102b8906e, - TEST_STRS[70] => 0xd8d5ad186ba95db3, - TEST_STRS[71] => 0x1c72e17ef0ca4ef3, - TEST_STRS[72] => 0x2183c1b327c38a95, - TEST_STRS[73] => 0xb66d096c914504d2, - TEST_STRS[74] => 0x404bf57ad8476736, - TEST_STRS[75] => 0x887976bd815498d5, - TEST_STRS[76] => 0x3afd7f02c2bf85c1, - TEST_STRS[77] => 0xfc4476b0eb70175f, - TEST_STRS[78] => 0x186d2da00f77eccd, - TEST_STRS[79] => 0xf97140fa48c7400e, - TEST_STRS[80] => 0xa2b1cf49aa926d52, - TEST_STRS[81] => 0x0690712cd6cf9475, - TEST_STRS[82] => 0xf7045b3102b89064, - TEST_STRS[83] => 0x74f762479f9d6aea, - TEST_STRS[84] => 0x08326007b4eb2b9c, - TEST_STRS[85] => 0xd8c4c9186b9b1a14, - TEST_STRS[86] => 0x7b495389bdbdd4c7, - TEST_STRS[87] => 0x3b6dba0d69908e25, - TEST_STRS[88] => 0xd6b2b17bf4b71261, - TEST_STRS[89] => 0x447bfb7f98e615b5, - TEST_STRS[90] => 0xd6b2b17bf4b71262, - TEST_STRS[91] => 0x3bd2807f93fe1660, - TEST_STRS[92] => 0xd6b2b17bf4b71263, - TEST_STRS[93] => 0x3329057f8f16170b, - TEST_STRS[94] => 0xd6b2b17bf4b71264, - TEST_STRS[95] => 0x2a7f8a7f8a2e19b6, - TEST_STRS[96] => 0x23d3767e64b2f98a, - TEST_STRS[97] => 0xff768d7e4f9d86a4, - TEST_STRS[98] => 0x23d3767e64b2f984, - TEST_STRS[99] => 0xccd1837e334e4aa6, - TEST_STRS[100] => 0x23d3767e64b2f99a, - TEST_STRS[101] => 0x7691fd7e028f6754, - TEST_STRS[102] => 0x34ad3b1041204318, - TEST_STRS[103] => 0xa29e749ea9d201c8, - TEST_STRS[104] => 0x34ad3b104120431b, - TEST_STRS[105] => 0xa29e779ea9d206e1, - TEST_STRS[106] => 0x34ad3b104120431a, - TEST_STRS[107] => 0xa29e769ea9d2052e, - TEST_STRS[108] => 0x02a17ebca4aa3497, - TEST_STRS[109] => 0x229ef18bcd375c95, - TEST_STRS[110] => 0x02a17dbca4aa32c8, - TEST_STRS[111] => 0x229b6f8bcd3449d8, - TEST_STRS[112] => 0x02a184bca4aa3ed5, - TEST_STRS[113] => 0x22b3618bcd48c3ef, - TEST_STRS[114] => 0x5c2c346706186f36, - TEST_STRS[115] => 0xb78c410f5b84f8c2, - TEST_STRS[116] => 0xed9478212b267395, - TEST_STRS[117] => 0xd9bbb55c5256662f, - TEST_STRS[118] => 0x8c54f0203249438a, - TEST_STRS[119] => 0xbd9790b5727dc37e, - TEST_STRS[120] => 0xa64e5f36c9e2b0e3, - TEST_STRS[121] => 0x8fd0680da3088a04, - TEST_STRS[122] => 0x67aad32c078284cc, - TEST_STRS[123] => 0xb37d55d81c57b331, - TEST_STRS[124] => 0x55ac0f3829057c43, - TEST_STRS[125] => 0xcb27f4b8e1b6cc20, - TEST_STRS[126] => 0x26caf88bcbef2d19, - TEST_STRS[127] => 0x8e6e063b97e61b8f, - TEST_STRS[128] => 0xb42750f7f3b7c37e, - TEST_STRS[129] => 0xf3c6ba64cf7ca99b, - TEST_STRS[130] => 0xebfb69b427ea80fe, - TEST_STRS[131] => 0x39b50c3ed970f46c, - TEST_STRS[132] => 0x5b9b177aa3eb3e8a, - TEST_STRS[133] => 0x6510063ecf4ec903, - TEST_STRS[134] => 0x2b3bbd2c00797c7a, - TEST_STRS[135] => 0xf1d6204ff5cb4aa7, - TEST_STRS[136] => 0x4836e27ccf099f38, - TEST_STRS[137] => 0x82efbb0dd073b44d, - TEST_STRS[138] => 0x4a80c282ffd7d4c6, - TEST_STRS[139] => 0x305d1a9c9ee43bdf, - TEST_STRS[140] => 0x15c366948ffc6997, - TEST_STRS[141] => 0x80153ae218916e7b, - TEST_STRS[142] => 0xfa23e2bdf9e2a9e1, - TEST_STRS[143] => 0xd47e8d8a2333c6de, - TEST_STRS[144] => 0x7e128095f688b056, - TEST_STRS[145] => 0x2f5356890efcedab, - TEST_STRS[146] => 0x95c2b383014f55c5, - TEST_STRS[147] => 0x4727a5339ce6070f, - TEST_STRS[148] => 0xb0555ecd575108e9, - TEST_STRS[149] => 0x48d785770bb4af37, - TEST_STRS[150] => 0x09d4701c12af02b1, - TEST_STRS[151] => 0x79f031e78f3cf62e, - TEST_STRS[152] => 0x52a1ee85db1b5a94, - TEST_STRS[153] => 0x6bd95b2eb37fa6b8, - TEST_STRS[154] => 0x74971b7077aef85d, - TEST_STRS[155] => 0xb4e4fae2ffcc1aad, - TEST_STRS[156] => 0x2bd48bd898b8f63a, - TEST_STRS[157] => 0xe9966ac1556257f6, - TEST_STRS[158] => 0x92a3d1cd078ba293, - TEST_STRS[159] => 0xf81175a482e20ab8, - TEST_STRS[160] => 0x5bbb3de722e73048, - TEST_STRS[161] => 0x6b4f363492b9f2be, - TEST_STRS[162] => 0xc2d559df73d59875, - TEST_STRS[163] => 0xf75f62284bc7a8c2, - TEST_STRS[164] => 0xda8dd8e116a9f1cc, - TEST_STRS[165] => 0xbdc1e6ab76057885, - TEST_STRS[166] => 0xfec6a4238a1224a0, - TEST_STRS[167] => 0xc03f40f3223e290e, - TEST_STRS[168] => 0x1ed21673466ffda9, - TEST_STRS[169] => 0xdf70f906bb0dd2af, - TEST_STRS[170] => 0xf3dcda369f2af666, - TEST_STRS[171] => 0x9ebb11573cdcebde, - TEST_STRS[172] => 0x81c72d9077fedca0, - TEST_STRS[173] => 0x0ec074a31be5fb15, - TEST_STRS[174] => 0x2a8b3280b6c48f20, - TEST_STRS[175] => 0xfd31777513309344, - TEST_STRS[176] => 0x194534a86ad006b6, - TEST_STRS[177] => 0x3be6fdf46e0cfe12, - TEST_STRS[178] => 0x017cc137a07eb057, - TEST_STRS[179] => 0x9428fc6e7d26b54d, - TEST_STRS[180] => 0x9aaa2e3603ef8ad7, - TEST_STRS[181] => 0x82c6d3f3a0ccdf7d, - TEST_STRS[182] => 0xc86eeea00cf09b65, - TEST_STRS[183] => 0x705f8189dbb58299, - TEST_STRS[184] => 0x415a7f554391ca69, - TEST_STRS[185] => 0xcfe3d49fa2bdc555, - TEST_STRS[186] => 0xf0f9c56039b25191, - TEST_STRS[187] => 0x7075cb6abd1d32d9, - TEST_STRS[188] => 0x43c94e2c8b277509, - TEST_STRS[189] => 0x3cbfd4e4ea670359, - TEST_STRS[190] => 0xc05887810f4d019d, - TEST_STRS[191] => 0x14468ff93ac22dc5, - TEST_STRS[192] => 0xebed699589d99c05, - TEST_STRS[193] => 0x6d99f6df321ca5d5, - TEST_STRS[194] => 0x0cd410d08c36d625, - TEST_STRS[195] => 0xef1b2a2c86831d35, - TEST_STRS[196] => 0x3b349c4d69ee5f05, - TEST_STRS[197] => 0x55248ce88f45f035, - TEST_STRS[198] => 0xaa69ca6a18a4c885, - TEST_STRS[199] => 0x1fe3fce62bd816b5, - TEST_STRS[200] => 0x0289a488a8df69d9, - TEST_STRS[201] => 0x15e96e1613df98b5, - TEST_STRS[202] => 0xe6be57375ad89b99 -} diff --git a/spec/unique_bitmask_spec.rb b/spec/unique_bitmask_spec.rb new file mode 100644 index 0000000..4357b5d --- /dev/null +++ b/spec/unique_bitmask_spec.rb @@ -0,0 +1,20 @@ +require "spec_helper" +require_relative "../driver/riverqueue-sequel/spec/spec_helper" + +RSpec.describe River::UniqueBitmask do + describe ".from_states" do + it "converts an array of states to a bitmask string" do + expect(described_class.from_states(River::Client.const_get(:DEFAULT_UNIQUE_STATES))).to eq("11110101") + expect(described_class.from_states([River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED])).to eq("11010001") + expect(described_class.from_states([River::JOB_STATE_AVAILABLE])).to eq("00000001") + end + end + + describe ".to_states" do + it "converts a bitmask string to an array of states" do + expect(described_class.to_states(0b11110101)).to eq(River::Client.const_get(:DEFAULT_UNIQUE_STATES)) + expect(described_class.to_states(0b11010001)).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) + expect(described_class.to_states(0b00000001)).to eq([River::JOB_STATE_AVAILABLE]) + end + end +end