diff --git a/.gitignore b/.gitignore index e8fe1c4..2250ed9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ +.DS_Store *.gem coverage/ diff --git a/CHANGELOG.md b/CHANGELOG.md index d4f26b1..32b0ece 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,32 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +⚠️ Version 0.8.0 contains breaking changes to transition to River's new unique jobs implementation and to enable broader, more flexible application of unique jobs. Detailed notes on the implementation are contained in [the original River PR](https://github.com/riverqueue/river/pull/590), and the notes below include short summaries of the ways this impacts this client specifically. + +Users should upgrade backends to River v0.12.0 before upgrading this library in order to ensure a seamless transition of all in-flight jobs. Afterward, the latest River version may be used. + +### Breaking + +- **Breaking change:** The return type of `Client#insert_many` has been changed. Rather than returning just the number of rows inserted, it returns an array of all the `InsertResult` values for each inserted row. Unique conflicts which are skipped as duplicates are indicated in the same fashion as single inserts (the `unique_skipped_as_duplicated` attribute), and in such cases the conflicting row will be returned instead. +- **Breaking change:** Unique jobs no longer allow total customization of their states when using the `by_state` option. The pending, scheduled, available, and running states are required whenever customizing this list. + +### Added + +- The `UniqueOpts` class gains an `exclude_kind` option for cases where uniqueness needs to be guaranteed across multiple job types. +- Unique jobs utilizing `by_args` can now also opt to have a subset of the job's arguments considered for uniqueness. 
For example, you could choose to consider only the `customer_id` field while ignoring the other fields: + + ```ruby + UniqueOpts.new(by_args: ["customer_id"]) + ``` + + Any fields considered in uniqueness are also sorted alphabetically in order to guarantee a consistent result across implementations, even if the encoded JSON isn't sorted consistently. + +### Changed + +- Unique jobs have been improved to allow bulk insertion of unique jobs via `Client#insert_many`. + + This updated implementation is significantly faster due to the removal of advisory locks in favor of an index-backed uniqueness system, while allowing some flexibility in which job states are considered. However, not all states may be removed from consideration when using the `by_state` option; pending, scheduled, available, and running states are required whenever customizing this list. + ## [0.7.0] - 2024-08-30 ### Changed diff --git a/Gemfile.lock b/Gemfile.lock index 3ac4e45..9e9a2b5 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -143,6 +143,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/docs/README.md b/docs/README.md index c9ca412..dbc1e61 100644 --- a/docs/README.md +++ b/docs/README.md @@ -42,8 +42,8 @@ insert_res.job # inserted job row Job args should: -* Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. -* Response to `#to_json` with a JSON serialization that'll be parseable as an object in Go. +- Respond to `#kind` with a unique string that identifies them in the database, and which a Go worker will recognize. +- Respond to `#to_json` with a JSON serialization that'll be parseable as an object in Go. They may also respond to `#insert_opts` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of the kind. 
@@ -89,7 +89,6 @@ insert_res.unique_skipped_as_duplicated -Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: +Unique job insertion is enforced by a unique index in the database, so the uniqueness check holds even if two conflicting insert operations are occurring in parallel, and no special client configuration is required: ```ruby -client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) +client = River::Client.new(mock_driver) ``` -Doing so has the downside of leaving only 32 bits for River's locks (64 bits total - 32-bit prefix), making them somewhat more likely to conflict with each other. diff --git a/driver/riverqueue-activerecord/Gemfile.lock b/driver/riverqueue-activerecord/Gemfile.lock index 6b24648..9256781 100644 --- a/driver/riverqueue-activerecord/Gemfile.lock +++ b/driver/riverqueue-activerecord/Gemfile.lock @@ -120,6 +120,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/driver/riverqueue-activerecord/lib/driver.rb b/driver/riverqueue-activerecord/lib/driver.rb index 96d5925..3d6d19a 100644 --- a/driver/riverqueue-activerecord/lib/driver.rb +++ b/driver/riverqueue-activerecord/lib/driver.rb @@ -31,36 +31,18 @@ def errors = {} end end - def advisory_lock(key) - ::ActiveRecord::Base.connection.execute("SELECT pg_advisory_xact_lock(#{key})") - nil - end - - def advisory_lock_try(key) - ::ActiveRecord::Base.connection.execute("SELECT pg_try_advisory_xact_lock(123)").first["pg_try_advisory_xact_lock"] - end - def job_get_by_id(id) data_set = RiverJob.where(id: id) data_set.first ? 
to_job_row_from_model(data_set.first) : nil end - def job_get_by_kind_and_unique_properties(get_params) - data_set = RiverJob.where(kind: get_params.kind) - data_set = data_set.where("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1]) if get_params.created_at - data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args - data_set = data_set.where(queue: get_params.queue) if get_params.queue - data_set = data_set.where(state: get_params.state) if get_params.state - data_set.first ? to_job_row_from_model(data_set.first) : nil - end - def job_insert(insert_params) - to_job_row_from_model(RiverJob.create(insert_params_to_hash(insert_params))) + job_insert_many([insert_params]).first end - def job_insert_unique(insert_params, unique_key) - res = RiverJob.upsert( - insert_params_to_hash(insert_params).merge(unique_key: unique_key), + def job_insert_many(insert_params_many) + res = RiverJob.upsert_all( + insert_params_many.map { |param| insert_params_to_hash(param) }, on_duplicate: Arel.sql("kind = EXCLUDED.kind"), returning: Arel.sql("*, (xmax != 0) AS unique_skipped_as_duplicate"), @@ -69,15 +51,9 @@ def job_insert_unique(insert_params, unique_key) # ActiveRecord tries to look up a unique index instead of letting # Postgres handle that, and of course it doesn't support a `WHERE` # clause. The workaround is to target the index name instead of columns. 
- unique_by: "river_job_kind_unique_key_idx" + unique_by: "river_job_unique_idx" ) - - [to_job_row_from_raw(res), res.send(:hash_rows)[0]["unique_skipped_as_duplicate"]] - end - - def job_insert_many(insert_params_many) - RiverJob.insert_all(insert_params_many.map { |p| insert_params_to_hash(p) }) - insert_params_many.count + to_insert_results(res) end def job_list @@ -94,8 +70,6 @@ def transaction(&) end private def insert_params_to_hash(insert_params) - # the call to `#compact` is important so that we remove nils and table - # default values get picked up instead { args: insert_params.encoded_args, kind: insert_params.kind, @@ -104,8 +78,10 @@ def transaction(&) queue: insert_params.queue, state: insert_params.state, scheduled_at: insert_params.scheduled_at, - tags: insert_params.tags - }.compact + tags: insert_params.tags || [], + unique_key: insert_params.unique_key, + unique_states: insert_params.unique_states + } end private def to_job_row_from_model(river_job) @@ -139,51 +115,64 @@ def transaction(&) scheduled_at: river_job.scheduled_at.getutc, state: river_job.state, tags: river_job.tags, - unique_key: river_job.unique_key + unique_key: river_job.unique_key, + unique_states: river_job.unique_states ) end + private def to_insert_results(res) + res.rows.map do |row| + to_job_row_from_raw(row, res.columns, res.column_types) + end + end + # This is really awful, but some of ActiveRecord's methods (e.g. `.create`) # return a model, and others (e.g. `.upsert`) return raw values, and # therefore this second version from unmarshaling a job row exists. I # searched long and hard for a way to have the former type of method return # raw or the latter type of method return a model, but was unable to find # anything. 
- private def to_job_row_from_raw(res) + private def to_job_row_from_raw(row, columns, column_types) river_job = {} - res.rows[0].each_with_index do |val, i| - river_job[res.columns[i]] = res.column_types[i].deserialize(val) + row.each_with_index do |val, i| + river_job[columns[i]] = column_types[i].deserialize(val) end - River::JobRow.new( - id: river_job["id"], - args: JSON.parse(river_job["args"]), - attempt: river_job["attempt"], - attempted_at: river_job["attempted_at"]&.getutc, - attempted_by: river_job["attempted_by"], - created_at: river_job["created_at"].getutc, - errors: river_job["errors"]&.map { |e| - deserialized_error = JSON.parse(e) + errors = river_job["errors"]&.map do |e| + deserialized_error = JSON.parse(e) - River::AttemptError.new( - at: Time.parse(deserialized_error["at"]), - attempt: deserialized_error["attempt"], - error: deserialized_error["error"], - trace: deserialized_error["trace"] - ) - }, - finalized_at: river_job["finalized_at"]&.getutc, - kind: river_job["kind"], - max_attempts: river_job["max_attempts"], - metadata: river_job["metadata"], - priority: river_job["priority"], - queue: river_job["queue"], - scheduled_at: river_job["scheduled_at"].getutc, - state: river_job["state"], - tags: river_job["tags"], - unique_key: river_job["unique_key"] - ) + River::AttemptError.new( + at: Time.parse(deserialized_error["at"]), + attempt: deserialized_error["attempt"], + error: deserialized_error["error"], + trace: deserialized_error["trace"] + ) + end + + [ + River::JobRow.new( + id: river_job["id"], + args: JSON.parse(river_job["args"]), + attempt: river_job["attempt"], + attempted_at: river_job["attempted_at"]&.getutc, + attempted_by: river_job["attempted_by"], + created_at: river_job["created_at"].getutc, + errors: errors, + finalized_at: river_job["finalized_at"]&.getutc, + kind: river_job["kind"], + max_attempts: river_job["max_attempts"], + metadata: river_job["metadata"], + priority: river_job["priority"], + queue: river_job["queue"], 
+ scheduled_at: river_job["scheduled_at"].getutc, + state: river_job["state"], + tags: river_job["tags"], + unique_key: river_job["unique_key"], + unique_states: ::River::UniqueBitmask.to_states(river_job["unique_states"]&.to_i(2)) + ), + river_job["unique_skipped_as_duplicate"] + ] end end end diff --git a/driver/riverqueue-activerecord/spec/driver_spec.rb b/driver/riverqueue-activerecord/spec/driver_spec.rb index 11d4a38..94b74bd 100644 --- a/driver/riverqueue-activerecord/spec/driver_spec.rb +++ b/driver/riverqueue-activerecord/spec/driver_spec.rb @@ -122,14 +122,14 @@ describe "#to_job_row_from_raw" do it "converts a database record to `River::JobRow` with minimal properties" do - river_job = River::Driver::ActiveRecord::RiverJob.insert({ + res = River::Driver::ActiveRecord::RiverJob.insert({ id: 1, args: %({"job_num":1}), kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT - }, returning: Arel.sql("*")) + }, returning: Arel.sql("*, false AS unique_skipped_as_duplicate")) - job_row = driver.send(:to_job_row_from_raw, river_job) + job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types) expect(job_row).to be_an_instance_of(River::JobRow) expect(job_row).to have_attributes( @@ -148,11 +148,12 @@ state: River::JOB_STATE_AVAILABLE, tags: [] ) + expect(skipped_as_duplicate).to be(false) end it "converts a database record to `River::JobRow` with all properties" do now = Time.now - river_job = River::Driver::ActiveRecord::RiverJob.insert({ + res = River::Driver::ActiveRecord::RiverJob.insert({ id: 1, attempt: 1, attempted_at: now, @@ -168,9 +169,9 @@ state: River::JOB_STATE_COMPLETED, tags: ["tag1"], unique_key: Digest::SHA256.digest("unique_key_str") - }, returning: Arel.sql("*")) + }, returning: Arel.sql("*, true AS unique_skipped_as_duplicate")) - job_row = driver.send(:to_job_row_from_raw, river_job) + job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, 
res.column_types) expect(job_row).to be_an_instance_of(River::JobRow) expect(job_row).to have_attributes( @@ -190,11 +191,12 @@ tags: ["tag1"], unique_key: Digest::SHA256.digest("unique_key_str") ) + expect(skipped_as_duplicate).to be(true) end it "with errors" do now = Time.now.utc - river_job = River::Driver::ActiveRecord::RiverJob.insert({ + res = River::Driver::ActiveRecord::RiverJob.insert({ args: %({"job_num":1}), errors: [JSON.dump( { @@ -207,9 +209,9 @@ kind: "simple", max_attempts: River::MAX_ATTEMPTS_DEFAULT, state: River::JOB_STATE_AVAILABLE - }, returning: Arel.sql("*")) + }, returning: Arel.sql("*, false AS unique_skipped_as_duplicate")) - job_row = driver.send(:to_job_row_from_raw, river_job) + job_row, skipped_as_duplicate = driver.send(:to_job_row_from_raw, res.rows[0], res.columns, res.column_types) expect(job_row.errors.count).to be(1) expect(job_row.errors[0]).to be_an_instance_of(River::AttemptError) @@ -219,6 +221,7 @@ error: "job failure", trace: "error trace" ) + expect(skipped_as_duplicate).to be(false) end end end diff --git a/driver/riverqueue-sequel/Gemfile.lock b/driver/riverqueue-sequel/Gemfile.lock index d6d0aee..df64f8f 100644 --- a/driver/riverqueue-sequel/Gemfile.lock +++ b/driver/riverqueue-sequel/Gemfile.lock @@ -78,6 +78,7 @@ GEM PLATFORMS arm64-darwin-22 + arm64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/driver/riverqueue-sequel/lib/driver.rb b/driver/riverqueue-sequel/lib/driver.rb index a21714e..3b41706 100644 --- a/driver/riverqueue-sequel/lib/driver.rb +++ b/driver/riverqueue-sequel/lib/driver.rb @@ -13,51 +13,27 @@ def initialize(db) @db.extension(:pg_json) end - def advisory_lock(key) - @db.fetch("SELECT pg_advisory_xact_lock(?)", key).first - nil - end - - def advisory_lock_try(key) - @db.fetch("SELECT pg_try_advisory_xact_lock(?)", key).first[:pg_try_advisory_xact_lock] - end - def job_get_by_id(id) data_set = @db[:river_job].where(id: id) data_set.first ? 
to_job_row(data_set.first) : nil end - def job_get_by_kind_and_unique_properties(get_params) - data_set = @db[:river_job].where(kind: get_params.kind) - data_set = data_set.where(::Sequel.lit("tstzrange(?, ?, '[)') @> created_at", get_params.created_at[0], get_params.created_at[1])) if get_params.created_at - data_set = data_set.where(args: get_params.encoded_args) if get_params.encoded_args - data_set = data_set.where(queue: get_params.queue) if get_params.queue - data_set = data_set.where(state: get_params.state) if get_params.state - data_set.first ? to_job_row(data_set.first) : nil - end - def job_insert(insert_params) - to_job_row(@db[:river_job].returning.insert_select(insert_params_to_hash(insert_params))) + job_insert_many([insert_params]).first end - def job_insert_unique(insert_params, unique_key) - values = @db[:river_job] + def job_insert_many(insert_params_array) + @db[:river_job] .insert_conflict( - target: [:kind, :unique_key], - conflict_where: ::Sequel.lit("unique_key IS NOT NULL"), + target: [:unique_key], + conflict_where: ::Sequel.lit( + "unique_key IS NOT NULL AND unique_states IS NOT NULL AND river_job_state_in_bitmask(unique_states, state)" + ), update: {kind: ::Sequel[:excluded][:kind]} ) .returning(::Sequel.lit("*, (xmax != 0) AS unique_skipped_as_duplicate")) - .insert_select( - insert_params_to_hash(insert_params).merge(unique_key: ::Sequel.blob(unique_key)) - ) - - [to_job_row(values), values[:unique_skipped_as_duplicate]] - end - - def job_insert_many(insert_params_many) - @db[:river_job].multi_insert(insert_params_many.map { |p| insert_params_to_hash(p) }) - insert_params_many.count + .multi_insert(insert_params_array.map { |p| insert_params_to_hash(p) }) + .map { |row| to_insert_result(row) } end def job_list @@ -74,8 +50,6 @@ def transaction(&) end private def insert_params_to_hash(insert_params) - # the call to `#compact` is important so that we remove nils and table - # default values get picked up instead { args: 
insert_params.encoded_args, kind: insert_params.kind, @@ -84,8 +58,14 @@ def transaction(&) queue: insert_params.queue, state: insert_params.state, scheduled_at: insert_params.scheduled_at, - tags: insert_params.tags ? ::Sequel.pg_array(insert_params.tags) : nil - }.compact + tags: ::Sequel.pg_array(insert_params.tags || [], :text), + unique_key: insert_params.unique_key ? ::Sequel.blob(insert_params.unique_key) : nil, + unique_states: insert_params.unique_states + } + end + + private def to_insert_result(result) + [to_job_row(result), result[:unique_skipped_as_duplicate]] end private def to_job_row(river_job) @@ -113,7 +93,8 @@ def transaction(&) scheduled_at: river_job[:scheduled_at].getutc, state: river_job[:state], tags: river_job[:tags].to_a, - unique_key: river_job[:unique_key]&.to_s + unique_key: river_job[:unique_key]&.to_s, + unique_states: ::River::UniqueBitmask.to_states(river_job[:unique_states]&.to_i(2)) ) end end diff --git a/lib/client.rb b/lib/client.rb index ea280fd..f2cbf96 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -1,5 +1,4 @@ require "digest" -require "fnv" require "time" module River @@ -25,9 +24,8 @@ module River # River drivers are found in separate gems like `riverqueue-sequel` to help # minimize transient dependencies. class Client - def initialize(driver, advisory_lock_prefix: nil) + def initialize(driver) @driver = driver - @advisory_lock_prefix = check_advisory_lock_prefix_bounds(advisory_lock_prefix) @time_now_utc = -> { Time.now.utc } # for test time stubbing end @@ -73,11 +71,8 @@ def initialize(driver, advisory_lock_prefix: nil) # # Returns an instance of InsertResult. 
def insert(args, insert_opts: EMPTY_INSERT_OPTS) - insert_params, unique_opts = make_insert_params(args, insert_opts) - check_unique_job(insert_params, unique_opts) do - job = @driver.job_insert(insert_params) - InsertResult.new(job) - end + insert_params = make_insert_params(args, insert_opts) + insert_and_check_unique_job(insert_params) end # Inserts many new jobs as part of a single batch operation for improved @@ -122,30 +117,20 @@ def insert(args, insert_opts: EMPTY_INSERT_OPTS) # # See also JobArgsHash for an easy way to insert a job from a hash. # - # Unique job insertion isn't supported with bulk insertion because it'd run - # the risk of major lock contention. - # - # Returns the number of jobs inserted. + # Returns an array of InsertResult values, one for each inserted job. def insert_many(args) all_params = args.map do |arg| if arg.is_a?(InsertManyParams) - make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert + make_insert_params(arg.args, arg.insert_opts || EMPTY_INSERT_OPTS) else # jobArgs - make_insert_params(arg, EMPTY_INSERT_OPTS, is_insert_many: true).first # unique opts ignored on batch insert + make_insert_params(arg, EMPTY_INSERT_OPTS) end end @driver.job_insert_many(all_params) - end - - private def check_advisory_lock_prefix_bounds(advisory_lock_prefix) - return nil if advisory_lock_prefix.nil? - - # 2**32-1 is 0xffffffff (the largest number that's four bytes) - if advisory_lock_prefix < 0 || advisory_lock_prefix > 2**32 - 1 - raise ArgumentError, "advisory lock prefix must fit inside four bytes" + .map do |job, unique_skipped_as_duplicate| + InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) end - advisory_lock_prefix end # Default states that are used during a unique insert. 
Can be overridden by @@ -153,123 +138,101 @@ def insert_many(args) DEFAULT_UNIQUE_STATES = [ JOB_STATE_AVAILABLE, JOB_STATE_COMPLETED, + JOB_STATE_PENDING, JOB_STATE_RETRYABLE, JOB_STATE_RUNNING, JOB_STATE_SCHEDULED ].freeze private_constant :DEFAULT_UNIQUE_STATES + REQUIRED_UNIQUE_STATES = [ + JOB_STATE_AVAILABLE, + JOB_STATE_PENDING, + JOB_STATE_RUNNING, + JOB_STATE_SCHEDULED + ].freeze + private_constant :REQUIRED_UNIQUE_STATES + EMPTY_INSERT_OPTS = InsertOpts.new.freeze private_constant :EMPTY_INSERT_OPTS - private def check_unique_job(insert_params, unique_opts, &block) - return block.call if unique_opts.nil? - - any_unique_opts = false - get_params = Driver::JobGetByKindAndUniquePropertiesParam.new(kind: insert_params.kind) - unique_key = "" + private def insert_and_check_unique_job(insert_params) + job, unique_skipped_as_duplicate = @driver.job_insert(insert_params) + InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) + end - # It's extremely important here that this lock string format and algorithm - # match the one in the main River library _exactly_. Don't change them - # unless they're updated everywhere. - if unique_opts.by_args - any_unique_opts = true - get_params.encoded_args = insert_params.encoded_args - unique_key += "&args=#{insert_params.encoded_args}" - end + private def make_insert_params(args, insert_opts) + raise "args should respond to `#kind`" if !args.respond_to?(:kind) - if unique_opts.by_period - lower_period_bound = truncate_time(@time_now_utc.call, unique_opts.by_period).utc + # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. 
+ args_json = args.to_json + raise "args should return non-nil from `#to_json`" if !args_json - any_unique_opts = true - get_params.created_at = [lower_period_bound, lower_period_bound + unique_opts.by_period] - unique_key += "&period=#{lower_period_bound.strftime("%FT%TZ")}" + args_insert_opts = if args.respond_to?(:insert_opts) + args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace + args_with_insert_opts.insert_opts || EMPTY_INSERT_OPTS + else + EMPTY_INSERT_OPTS end - if unique_opts.by_queue - any_unique_opts = true - get_params.queue = insert_params.queue - unique_key += "&queue=#{insert_params.queue}" - end + scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at - if unique_opts.by_state - any_unique_opts = true - get_params.state = unique_opts.by_state - unique_key += "&state=#{unique_opts.by_state.join(",")}" - else - get_params.state = DEFAULT_UNIQUE_STATES - unique_key += "&state=#{DEFAULT_UNIQUE_STATES.join(",")}" + insert_params = Driver::JobInsertParams.new( + encoded_args: args_json, + kind: args.kind, + max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, + priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, + queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, + scheduled_at: scheduled_at&.utc || Time.now, + state: scheduled_at ? 
JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, + tags: validate_tags(insert_opts.tags || args_insert_opts.tags || []) + ) + + unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts + if unique_opts + unique_key, unique_states = make_unique_key_and_bitmask(insert_params, unique_opts) + insert_params.unique_key = unique_key + insert_params.unique_states = unique_states end + insert_params + end - return block.call unless any_unique_opts + private def make_unique_key_and_bitmask(insert_params, unique_opts) + unique_key = "" - # fast path - if !unique_opts.by_state || unique_opts.by_state.sort == DEFAULT_UNIQUE_STATES - unique_key_hash = Digest::SHA256.digest(unique_key) - job, unique_skipped_as_duplicate = @driver.job_insert_unique(insert_params, unique_key_hash) - return InsertResult.new(job, unique_skipped_as_duplicated: unique_skipped_as_duplicate) + # It's extremely important here that this unique key format and algorithm + # match the one in the main River library _exactly_. Don't change them + # unless they're updated everywhere. + unless unique_opts.exclude_kind + unique_key += "&kind=#{insert_params.kind}" end - @driver.transaction do - lock_str = "unique_key" - lock_str += "kind=#{insert_params.kind}" - lock_str += unique_key - - lock_key = if @advisory_lock_prefix.nil? - FNV.fnv1_hash(lock_str, size: 64) + if unique_opts.by_args + parsed_args = JSON.parse(insert_params.encoded_args) + filtered_args = if unique_opts.by_args.is_a?(Array) + parsed_args.select { |k, _| unique_opts.by_args.include?(k) } else - # Steep should be able to tell that this is not nil, but it can't. - prefix = @advisory_lock_prefix #: Integer # rubocop:disable Layout/LeadingCommentSpace - prefix << 32 | FNV.fnv1_hash(lock_str, size: 32) + parsed_args end - # Packs a uint64 then unpacks to int64, which we need to do to keep the - # value within the bounds of Postgres' bigint. 
Overflow is okay because - # we can use the full bigint space (including negative numbers) for the - # advisory lock. - lock_key = uint64_to_int64(lock_key) - - @driver.advisory_lock(lock_key) - - existing_job = @driver.job_get_by_kind_and_unique_properties(get_params) - return InsertResult.new(existing_job, unique_skipped_as_duplicated: true) if existing_job - - block.call + encoded_args = JSON.generate(filtered_args.sort.to_h) + unique_key += "&args=#{encoded_args}" end - end - private def make_insert_params(args, insert_opts, is_insert_many: false) - raise "args should respond to `#kind`" if !args.respond_to?(:kind) + if unique_opts.by_period + lower_period_bound = truncate_time(@time_now_utc.call, unique_opts.by_period).utc - # ~all objects in Ruby respond to `#to_json`, so check non-nil instead. - args_json = args.to_json - raise "args should return non-nil from `#to_json`" if !args_json + unique_key += "&period=#{lower_period_bound.strftime("%FT%TZ")}" + end - args_insert_opts = if args.respond_to?(:insert_opts) - args_with_insert_opts = args #: _JobArgsWithInsertOpts # rubocop:disable Layout/LeadingCommentSpace - args_with_insert_opts.insert_opts || EMPTY_INSERT_OPTS - else - EMPTY_INSERT_OPTS + if unique_opts.by_queue + unique_key += "&queue=#{insert_params.queue}" end - scheduled_at = insert_opts.scheduled_at || args_insert_opts.scheduled_at - unique_opts = insert_opts.unique_opts || args_insert_opts.unique_opts + unique_key_hash = Digest::SHA256.digest(unique_key) + unique_states = validate_unique_states(unique_opts.by_state || DEFAULT_UNIQUE_STATES) - raise ArgumentError, "unique opts can't be used with `#insert_many`" if is_insert_many && unique_opts - - [ - Driver::JobInsertParams.new( - encoded_args: args_json, - kind: args.kind, - max_attempts: insert_opts.max_attempts || args_insert_opts.max_attempts || MAX_ATTEMPTS_DEFAULT, - priority: insert_opts.priority || args_insert_opts.priority || PRIORITY_DEFAULT, - queue: insert_opts.queue || 
args_insert_opts.queue || QUEUE_DEFAULT, - scheduled_at: scheduled_at&.utc, # database defaults to now - state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, - tags: validate_tags(insert_opts.tags || args_insert_opts.tags) - ), - unique_opts - ] + [unique_key_hash, UniqueBitmask.from_states(unique_states)] end # Truncates the given time down to the interval. For example: @@ -290,11 +253,18 @@ def insert_many(args) private_constant :TAG_RE private def validate_tags(tags) - tags&.each do |tag| + tags.each do |tag| raise ArgumentError, "tags should be 255 characters or less" if tag.length > 255 raise ArgumentError, "tag should match regex #{TAG_RE.inspect}" unless TAG_RE.match(tag) end end + + private def validate_unique_states(states) + REQUIRED_UNIQUE_STATES.each do |required_state| + raise ArgumentError, "by_state should include required state #{required_state}" unless states.include?(required_state) + end + states + end end # A single job to insert that's part of an #insert_many batch insert. Unlike @@ -322,7 +292,7 @@ class InsertResult # job matching unique property already being present. attr_reader :unique_skipped_as_duplicated - def initialize(job, unique_skipped_as_duplicated: false) + def initialize(job, unique_skipped_as_duplicated:) @job = job @unique_skipped_as_duplicated = unique_skipped_as_duplicated end diff --git a/lib/driver.rb b/lib/driver.rb index a3c11c6..92903f6 100644 --- a/lib/driver.rb +++ b/lib/driver.rb @@ -4,29 +4,6 @@ module River # considered to be for internal use only and subject to change. API stability # is not guaranteed. module Driver - # Parameters for looking up a job by kind and unique properties. 
- class JobGetByKindAndUniquePropertiesParam - attr_accessor :created_at - attr_accessor :encoded_args - attr_accessor :kind - attr_accessor :queue - attr_accessor :state - - def initialize( - kind:, - created_at: nil, - encoded_args: nil, - queue: nil, - state: nil - ) - self.kind = kind - self.created_at = created_at - self.encoded_args = encoded_args - self.queue = queue - self.state = state - end - end - # Insert parameters for a job. This is sent to underlying drivers and is meant # for internal use only. Its interface is subject to change. class JobInsertParams @@ -38,6 +15,8 @@ class JobInsertParams attr_accessor :scheduled_at attr_accessor :state attr_accessor :tags + attr_accessor :unique_key + attr_accessor :unique_states def initialize( encoded_args:, @@ -47,7 +26,9 @@ def initialize( queue:, scheduled_at:, state:, - tags: + tags:, + unique_key: nil, + unique_states: nil ) self.encoded_args = encoded_args self.kind = kind @@ -57,6 +38,8 @@ def initialize( self.scheduled_at = scheduled_at self.state = state self.tags = tags + self.unique_key = unique_key + self.unique_states = unique_states end end end diff --git a/lib/fnv.rb b/lib/fnv.rb deleted file mode 100644 index 28ffeef..0000000 --- a/lib/fnv.rb +++ /dev/null @@ -1,35 +0,0 @@ -module River - # FNV is the Fowler–Noll–Vo hash function, a simple hash that's very easy to - # implement, and hash the perfect characteristics for use with the 64 bits of - # available space in a PG advisory lock. - # - # I'm implemented it myself so that the River gem can stay dependency free - # (and because it's quite easy to do). 
- module FNV - def self.fnv1_hash(str, size:) - hash = OFFSET_BASIS.fetch(size) - mask = (2**size - 1).to_int # creates a mask of 1s of `size` bits long like 0xffffffff - prime = PRIME.fetch(size) - - str.each_byte do |byte| - hash *= prime - hash &= mask - hash ^= byte - end - - hash - end - - OFFSET_BASIS = { - 32 => 0x811c9dc5, - 64 => 0xcbf29ce484222325 - }.freeze - private_constant :OFFSET_BASIS - - PRIME = { - 32 => 0x01000193, - 64 => 0x00000100000001B3 - }.freeze - private_constant :PRIME - end -end diff --git a/lib/insert_opts.rb b/lib/insert_opts.rb index abd8dd3..baf24da 100644 --- a/lib/insert_opts.rb +++ b/lib/insert_opts.rb @@ -114,23 +114,33 @@ class UniqueOpts # Unlike other unique options, ByState gets a default when it's not set for # user convenience. The default is equivalent to: # - # by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_RUNNING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_SCHEDULED] + # by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_SCHEDULED] # # With this setting, any jobs of the same kind that have been completed or # discarded, but not yet cleaned out by the system, won't count towards the # uniqueness of a new insert. + # + # The pending, scheduled, available, and running states are required when + # customizing this list. attr_accessor :by_state + # Indicates that the job kind should not be considered for uniqueness. This + # is useful when you want to enforce uniqueness based on other properties + # across multiple worker types. 
+ attr_accessor :exclude_kind + def initialize( by_args: nil, by_period: nil, by_queue: nil, - by_state: nil + by_state: nil, + exclude_kind: nil ) self.by_args = by_args self.by_period = by_period self.by_queue = by_queue self.by_state = by_state + self.exclude_kind = exclude_kind end end end diff --git a/lib/job.rb b/lib/job.rb index 494ded9..ca56780 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -3,6 +3,7 @@ module River JOB_STATE_CANCELLED = "cancelled" JOB_STATE_COMPLETED = "completed" JOB_STATE_DISCARDED = "discarded" + JOB_STATE_PENDING = "pending" JOB_STATE_RETRYABLE = "retryable" JOB_STATE_RUNNING = "running" JOB_STATE_SCHEDULED = "scheduled" @@ -113,6 +114,9 @@ class JobRow # configuration. attr_accessor :unique_key + # A list of states that the job must be in to be considered for uniqueness. + attr_accessor :unique_states + def initialize( id:, args:, @@ -132,7 +136,8 @@ def initialize( errors: nil, finalized_at: nil, tags: nil, - unique_key: nil + unique_key: nil, + unique_states: nil ) self.id = id self.args = args @@ -151,6 +156,7 @@ def initialize( self.state = state self.tags = tags self.unique_key = unique_key + self.unique_states = unique_states end end diff --git a/lib/riverqueue.rb b/lib/riverqueue.rb index 87f62b2..429e0d5 100644 --- a/lib/riverqueue.rb +++ b/lib/riverqueue.rb @@ -1,11 +1,11 @@ require "json" -require_relative "fnv" require_relative "insert_opts" require_relative "job" require_relative "client" require_relative "driver" +require_relative "unique_bitmask" module River end diff --git a/lib/unique_bitmask.rb b/lib/unique_bitmask.rb new file mode 100644 index 0000000..389885c --- /dev/null +++ b/lib/unique_bitmask.rb @@ -0,0 +1,41 @@ +module River + class UniqueBitmask + JOB_STATE_BIT_POSITIONS = { + ::River::JOB_STATE_AVAILABLE => 7, + ::River::JOB_STATE_CANCELLED => 6, + ::River::JOB_STATE_COMPLETED => 5, + ::River::JOB_STATE_DISCARDED => 4, + ::River::JOB_STATE_PENDING => 3, + ::River::JOB_STATE_RETRYABLE => 2, + 
::River::JOB_STATE_RUNNING => 1, + ::River::JOB_STATE_SCHEDULED => 0 + }.freeze + private_constant :JOB_STATE_BIT_POSITIONS + + def self.from_states(states) + val = 0 + + states.each do |state| + bit_index = JOB_STATE_BIT_POSITIONS[state] + + bit_position = 7 - (bit_index % 8) + val |= 1 << bit_position + end + + format("%08b", val) + end + + def self.to_states(mask) + states = [] #: Array[jobStateAll] # rubocop:disable Layout/LeadingCommentSpace + + JOB_STATE_BIT_POSITIONS.each do |state, bit_index| + bit_position = 7 - (bit_index % 8) + if (mask & (1 << bit_position)) != 0 + states << state + end + end + + states.sort + end + end +end diff --git a/sig/client.rbs b/sig/client.rbs index aaa01b7..8fb37ae 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -4,27 +4,27 @@ module River QUEUE_DEFAULT: String class Client - @advisory_lock_prefix: Integer? @driver: _Driver @time_now_utc: ^() -> Time - def initialize: (_Driver driver, ?advisory_lock_prefix: Integer?) -> void + def initialize: (_Driver driver) -> void def insert: (jobArgs, ?insert_opts: InsertOpts) -> InsertResult - def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer - - private def check_advisory_lock_prefix_bounds: (Integer?) -> Integer? + def insert_many: (Array[jobArgs | InsertManyParams]) -> Array[InsertResult] DEFAULT_UNIQUE_STATES: Array[jobStateAll] EMPTY_INSERT_OPTS: InsertOpts + REQUIRED_UNIQUE_STATES: Array[jobStateAll] - private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult - private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?] 
+ private def insert_and_check_unique_job: (Driver::JobInsertParams) -> InsertResult + private def make_insert_params: (jobArgs, InsertOpts) -> Driver::JobInsertParams + private def make_unique_key_and_bitmask: (Driver::JobInsertParams, UniqueOpts) -> [String, String] private def truncate_time: (Time, Integer) -> Time private def uint64_to_int64: (Integer) -> Integer TAG_RE: Regexp - private def validate_tags: (Array[String]?) -> Array[String]? + private def validate_tags: (Array[String]) -> Array[String] + private def validate_unique_states: (Array[jobStateAll]) -> Array[jobStateAll] end class InsertManyParams @@ -45,6 +45,6 @@ module River attr_reader job: JobRow attr_reader unique_skipped_as_duplicated: bool - def initialize: (JobRow job, ?unique_skipped_as_duplicated: bool) -> void + def initialize: (JobRow job, unique_skipped_as_duplicated: bool) -> void end end diff --git a/sig/driver.rbs b/sig/driver.rbs index be4d4dc..ddf49db 100644 --- a/sig/driver.rbs +++ b/sig/driver.rbs @@ -2,9 +2,8 @@ module River interface _Driver def advisory_lock: (Integer) -> void def job_get_by_kind_and_unique_properties: (Driver::JobGetByKindAndUniquePropertiesParam) -> JobRow? - def job_insert: (Driver::JobInsertParams) -> JobRow - def job_insert_many: (Array[Driver::JobInsertParams]) -> Integer - def job_insert_unique: (Driver::JobInsertParams, String) -> [JobRow, bool] + def job_insert: (Driver::JobInsertParams) -> [JobRow, bool] + def job_insert_many: (Array[Driver::JobInsertParams]) -> Array[[JobRow, bool]] def transaction: [T] () { () -> T } -> T # this set of methods is used only in tests @@ -34,8 +33,10 @@ module River attr_accessor scheduled_at: Time? attr_accessor state: jobStateAll attr_accessor tags: Array[String]? + attr_accessor unique_key: String? + attr_accessor unique_states: String? - def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?) 
-> void + def initialize: (encoded_args: String, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time?, state: jobStateAll, tags: Array[String]?, ?unique_key: String?, ?unique_states: String?) -> void end end end diff --git a/sig/fnv.rbs b/sig/fnv.rbs deleted file mode 100644 index 3a016dc..0000000 --- a/sig/fnv.rbs +++ /dev/null @@ -1,9 +0,0 @@ -module River - module FNV - def self.fnv1_hash: (String, size: 32 | 64) -> Integer - - MASK: Hash[Integer, Integer] - OFFSET_BASIS: Hash[Integer, Integer] - PRIME: Hash[Integer, Integer] - end -end \ No newline at end of file diff --git a/sig/insert_opts.rbs b/sig/insert_opts.rbs index 69ad0ca..45b5fa6 100644 --- a/sig/insert_opts.rbs +++ b/sig/insert_opts.rbs @@ -11,11 +11,12 @@ module River end class UniqueOpts - attr_accessor by_args: bool? + attr_accessor by_args: bool? | Array[String]? attr_accessor by_period: Integer? attr_accessor by_queue: bool? attr_accessor by_state: Array[jobStateAll]? + attr_accessor exclude_kind: bool? - def initialize: (?by_args: bool?, ?by_period: Integer?, ?by_queue: bool?, ?by_state: Array[jobStateAll]?) -> void + def initialize: (?by_args: bool? | Array[String]?, ?by_period: Integer?, ?by_queue: bool?, ?by_state: Array[jobStateAll]?, ?exclude_kind: bool?) 
-> void end end diff --git a/sig/job.rbs b/sig/job.rbs index 3193f60..16a0288 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -3,11 +3,12 @@ module River JOB_STATE_CANCELLED: "cancelled" JOB_STATE_COMPLETED: "completed" JOB_STATE_DISCARDED: "discarded" + JOB_STATE_PENDING: "pending" JOB_STATE_RETRYABLE: "retryable" JOB_STATE_RUNNING: "running" JOB_STATE_SCHEDULED: "scheduled" - type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "retryable" | "running" | "scheduled" + type jobStateAll = "available" | "cancelled" | "completed" | "discarded" | "pending" | "retryable" | "running" | "scheduled" interface _JobArgs def is_a?: (Class) -> bool @@ -52,8 +53,9 @@ module River attr_accessor state: jobStateAll attr_accessor tags: Array[String]? attr_accessor unique_key: String? + attr_accessor unique_states: Array[jobStateAll]? - def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?, ?unique_key: String?) -> void + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?, ?unique_key: String?, ?unique_states: Array[jobStateAll]?) 
-> void end class AttemptError diff --git a/sig/unique_bitmask.rbs b/sig/unique_bitmask.rbs new file mode 100644 index 0000000..af5f95b --- /dev/null +++ b/sig/unique_bitmask.rbs @@ -0,0 +1,9 @@ +module River + class UniqueBitmask + JOB_STATE_BIT_POSITIONS: Hash[jobStateAll, Integer] + + def self.from_states: (Array[jobStateAll]) -> String + + def self.to_states: (Integer) -> Array[jobStateAll] + end +end diff --git a/spec/client_spec.rb b/spec/client_spec.rb index f3c8e9b..c818a8b 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -20,6 +20,26 @@ class SimpleArgsWithInsertOpts < SimpleArgs attr_accessor :insert_opts end +class ComplexArgs + attr_accessor :customer_id + attr_accessor :order_id + attr_accessor :trace_id + attr_accessor :email + + def initialize(customer_id:, order_id:, trace_id:, email:) + self.customer_id = customer_id + self.order_id = order_id + self.trace_id = trace_id + self.email = email + end + + def kind = "complex" + + # intentionally not sorted alphabetically so we can ensure that the JSON + # used in the unique key is sorted. + def to_json = JSON.dump({order_id: order_id, customer_id: customer_id, trace_id: trace_id, email: email}) +end + # I originally had this top-level client test set up so that it was using a mock # driver, but it just turned out to be too horribly unsustainable. 
Adding # anything new required careful mock engineering, and even once done, we weren't @@ -119,22 +139,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs ) end - it "errors if advisory lock prefix is larger than four bytes" do - River::Client.new(driver, advisory_lock_prefix: 123) - - expect do - River::Client.new(driver, advisory_lock_prefix: -1) - end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") - - # 2^32-1 is 0xffffffff (1s for 32 bits) which fits - River::Client.new(driver, advisory_lock_prefix: 2**32 - 1) - - # 2^32 is 0x100000000, which does not - expect do - River::Client.new(driver, advisory_lock_prefix: 2**32) - end.to raise_error(ArgumentError, "advisory lock prefix must fit inside four bytes") - end - it "errors if args don't respond to #kind" do args_klass = Class.new do def to_json = {} @@ -186,16 +190,7 @@ def check_bigint_bounds(int) let(:now) { Time.now.utc } before { client.instance_variable_set(:@time_now_utc, -> { now }) } - let(:advisory_lock_keys) { [] } - - before do - # required so it's properly captured by the lambda below - keys = advisory_lock_keys - - driver.singleton_class.send(:define_method, :advisory_lock, ->(key) { keys.push(key) }) - end - - it "inserts a new unique job with minimal options on the fast path" do + it "inserts a new unique job with minimal options" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -207,23 +202,22 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - expect(advisory_lock_keys).to be_empty - - unique_key_str = "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + unique_key_str = "&kind=#{insert_res.job.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) + 
expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_PENDING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) insert_res = client.insert(job_args) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with minimal options on the slow path" do + it "inserts a new unique job with custom states" do job_args = SimpleArgsWithInsertOpts.new(job_num: 1) job_args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] ) ) @@ -231,97 +225,70 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - lock_str = "unique_keykind=#{job_args.kind}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" - expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) + lock_str = "&kind=#{job_args.kind}" \ + "&queue=#{River::QUEUE_DEFAULT}" - expect(insert_res.job.unique_key).to be_nil + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(lock_str)) + expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) insert_res = client.insert(job_args) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with all options on the fast path" do - job_args = SimpleArgsWithInsertOpts.new(job_num: 1) - job_args.insert_opts = River::InsertOpts.new( + it "inserts a new unique job with all options" do + 
job_args = ComplexArgs.new(customer_id: 1, order_id: 2, trace_id: 3, email: "john@example.com") + insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_args: true, by_period: 15 * 60, by_queue: true, - by_state: River::Client.const_get(:DEFAULT_UNIQUE_STATES) + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_CANCELLED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED], + exclude_kind: true ) ) - insert_res = client.insert(job_args) + insert_res = client.insert(job_args, insert_opts: insert_opts) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false - expect(advisory_lock_keys).to be_empty - - unique_key_str = "&args=#{JSON.dump({job_num: 1})}" \ + sorted_json = {customer_id: 1, email: "john@example.com", order_id: 2, trace_id: 3} + unique_key_str = "&args=#{JSON.dump(sorted_json)}" \ "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{River::Client.const_get(:DEFAULT_UNIQUE_STATES).join(",")}" + "&queue=#{River::QUEUE_DEFAULT}" expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) + expect(insert_res.job.unique_states).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_CANCELLED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) - insert_res = client.insert(job_args) + insert_res = client.insert(job_args, insert_opts: insert_opts) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a new unique job with all options on the slow path" do - job_args = SimpleArgsWithInsertOpts.new(job_num: 1) - job_args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_args: true, - by_period: 15 * 60, - by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE] - ) + it "inserts a new unique job with custom by_args" do + job_args = ComplexArgs.new(customer_id: 1, 
order_id: 2, trace_id: 3, email: "john@example.com") + insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new(by_args: ["customer_id", "order_id"]) ) - insert_res = client.insert(job_args) + insert_res = client.insert(job_args, insert_opts: insert_opts) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false + original_job_id = insert_res.job.id - lock_str = "unique_keykind=#{job_args.kind}" \ - "&args=#{JSON.dump({job_num: 1})}" \ - "&period=#{client.send(:truncate_time, now, 15 * 60).utc.strftime("%FT%TZ")}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{[River::JOB_STATE_AVAILABLE].join(",")}" - expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, River::FNV.fnv1_hash(lock_str, size: 64)))]) - - expect(insert_res.job.unique_key).to be_nil + unique_key_str = "&kind=complex&args=#{JSON.dump({customer_id: 1, order_id: 2})}" + expect(insert_res.job.unique_key).to eq(Digest::SHA256.digest(unique_key_str)) - insert_res = client.insert(job_args) + insert_res = client.insert(job_args, insert_opts: insert_opts) expect(insert_res.job).to_not be_nil + expect(insert_res.job.id).to eq(original_job_id) expect(insert_res.unique_skipped_as_duplicated).to be true - end - it "inserts a new unique job with advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - job_args = SimpleArgsWithInsertOpts.new(job_num: 1) - job_args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path - ) - ) - - insert_res = client.insert(job_args) + # Change just the customer ID and the job should be unique again. 
+ job_args.customer_id = 2 + insert_res = client.insert(job_args, insert_opts: insert_opts) expect(insert_res.job).to_not be_nil + expect(insert_res.job.id).to_not eq(original_job_id) expect(insert_res.unique_skipped_as_duplicated).to be false - - lock_str = "unique_keykind=#{job_args.kind}" \ - "&queue=#{River::QUEUE_DEFAULT}" \ - "&state=#{job_args.insert_opts.unique_opts.by_state.join(",")}" - expect(advisory_lock_keys).to eq([check_bigint_bounds(client.send(:uint64_to_int64, 123456 << 32 | River::FNV.fnv1_hash(lock_str, size: 32)))]) - - lock_key = advisory_lock_keys[0] - expect(lock_key >> 32).to eq(123456) end it "skips unique check if unique opts empty" do @@ -334,16 +301,35 @@ def check_bigint_bounds(int) expect(insert_res.job).to_not be_nil expect(insert_res.unique_skipped_as_duplicated).to be false end + + it "errors if any of the required unique states are removed from a custom by_states list" do + default_states = [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_RETRYABLE, River::JOB_STATE_SCHEDULED] + required_states = [River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] + required_states.each do |state| + job_args = SimpleArgsWithInsertOpts.new(job_num: 1) + job_args.insert_opts = River::InsertOpts.new( + unique_opts: River::UniqueOpts.new( + by_state: default_states - [state] + ) + ) + + expect do + client.insert(job_args) + end.to raise_error(ArgumentError, "by_state should include required state #{state}") + end + end end end describe "#insert_many" do it "inserts jobs from jobArgs with defaults" do - num_inserted = client.insert_many([ + results = client.insert_many([ SimpleArgs.new(job_num: 1), SimpleArgs.new(job_num: 2) ]) - expect(num_inserted).to eq(2) + expect(results.length).to eq(2) + expect(results[0].job).to have_attributes(args: {"job_num" => 1}) + expect(results[1].job).to have_attributes(args: {"job_num" 
=> 2}) jobs = driver.job_list expect(jobs.count).to be 2 @@ -376,11 +362,13 @@ def check_bigint_bounds(int) end it "inserts jobs from InsertManyParams with defaults" do - num_inserted = client.insert_many([ + results = client.insert_many([ River::InsertManyParams.new(SimpleArgs.new(job_num: 1)), River::InsertManyParams.new(SimpleArgs.new(job_num: 2)) ]) - expect(num_inserted).to eq(2) + expect(results.length).to eq(2) + expect(results[0].job).to have_attributes(args: {"job_num" => 1}) + expect(results[1].job).to have_attributes(args: {"job_num" => 2}) jobs = driver.job_list expect(jobs.count).to be 2 @@ -413,6 +401,19 @@ def check_bigint_bounds(int) end it "inserts jobs with insert opts" do + # First, insert a job which will cause a duplicate conflict with the bulk + # insert so the bulk insert's row gets skipped. + dupe_job_args = SimpleArgsWithInsertOpts.new(job_num: 0) + dupe_job_args.insert_opts = River::InsertOpts.new( + queue: "job_to_duplicate", + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) + + insert_res = client.insert(dupe_job_args) + expect(insert_res.job).to_not be_nil + # We set job insert opts in this spec too so that we can verify that the # options passed at insertion time take precedence. 
 args1 = SimpleArgsWithInsertOpts.new(job_num: 1) @@ -429,8 +430,16 @@ def check_bigint_bounds(int) queue: "job_custom_queue_2", tags: ["job_custom_2"] ) + args3 = SimpleArgsWithInsertOpts.new(job_num: 3) + args3.insert_opts = River::InsertOpts.new( + queue: "to_duplicate", # overridden by the insertion-time opts below + tags: ["job_custom_3"], + unique_opts: River::UniqueOpts.new( + by_queue: true + ) + ) - num_inserted = client.insert_many([ + results = client.insert_many([ River::InsertManyParams.new(args1, insert_opts: River::InsertOpts.new( max_attempts: 17, priority: 3, @@ -442,37 +451,43 @@ def check_bigint_bounds(int) priority: 4, queue: "my_queue_2", tags: ["custom_2"] + )), + River::InsertManyParams.new(args3, insert_opts: River::InsertOpts.new( + queue: "job_to_duplicate", # duplicate by queue, will be skipped + tags: ["custom_3"], + unique_opts: River::UniqueOpts.new( + by_queue: true + ) )) ]) - expect(num_inserted).to eq(2) + expect(results.length).to eq(3) # all rows returned, including skipped duplicates + expect(results[0].job).to have_attributes(tags: ["custom_1"]) + expect(results[1].job).to have_attributes(tags: ["custom_2"]) + expect(results[2].unique_skipped_as_duplicated).to be true + expect(results[2].job).to have_attributes( + id: insert_res.job.id, + tags: [] + ) jobs = driver.job_list - expect(jobs.count).to be 2 + expect(jobs.count).to be 3 - expect(jobs[0]).to have_attributes( + expect(jobs[0]).to have_attributes(queue: "job_to_duplicate") + + expect(jobs[1]).to have_attributes( max_attempts: 17, priority: 3, queue: "my_queue_1", tags: ["custom_1"] ) - expect(jobs[1]).to have_attributes( + expect(jobs[2]).to have_attributes( max_attempts: 18, priority: 4, queue: "my_queue_2", tags: ["custom_2"] ) end - - it "raises error with unique opts" do - expect do - client.insert_many([ - River::InsertManyParams.new(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( - unique_opts: River::UniqueOpts.new - )) - ]) - end.to 
raise_error(ArgumentError, "unique opts can't be used with `#insert_many`") - end end describe River::Client.const_get(:DEFAULT_UNIQUE_STATES) do diff --git a/spec/driver_shared_examples.rb b/spec/driver_shared_examples.rb index d1d659a..df0b54f 100644 --- a/spec/driver_shared_examples.rb +++ b/spec/driver_shared_examples.rb @@ -19,7 +19,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs shared_examples "driver shared examples" do describe "unique insertion" do - it "inserts a unique job once on the fast path" do + it "inserts a unique job once" do args = SimpleArgsWithInsertOpts.new(job_num: 1) args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( @@ -37,35 +37,14 @@ class SimpleArgsWithInsertOpts < SimpleArgs expect(insert_res.unique_skipped_as_duplicated).to be true end - it "inserts a unique job on the slow path" do + it "inserts a unique job with custom states" do client = River::Client.new(driver) args = SimpleArgsWithInsertOpts.new(job_num: 1) args.insert_opts = River::InsertOpts.new( unique_opts: River::UniqueOpts.new( by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path - ) - ) - - insert_res = client.insert(args) - expect(insert_res.job).to_not be_nil - expect(insert_res.unique_skipped_as_duplicated).to be false - original_job = insert_res.job - - insert_res = client.insert(args) - expect(insert_res.job.id).to eq(original_job.id) - expect(insert_res.unique_skipped_as_duplicated).to be true - end - - it "inserts a unique job on the slow path with an advisory lock prefix" do - client = River::Client.new(driver, advisory_lock_prefix: 123456) - - args = SimpleArgsWithInsertOpts.new(job_num: 1) - args.insert_opts = River::InsertOpts.new( - unique_opts: River::UniqueOpts.new( - by_queue: true, - by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_RUNNING] # non-default triggers slow path + by_state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, 
River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] ) ) @@ -80,26 +59,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#advisory_lock" do - it "takes an advisory lock" do - driver.transaction do - driver.advisory_lock(123) - - Thread.new do - expect(driver.advisory_lock_try(123)).to be false - end.join - end - end - end - - describe "#advisory_lock_try" do - it "takes an advisory lock" do - driver.transaction do - expect(driver.advisory_lock_try(123)).to be true - end - end - end - describe "#job_get_by_id" do let(:job_args) { SimpleArgs.new(job_num: 1) } @@ -113,93 +72,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#job_get_by_kind_and_unique_properties" do - let(:job_args) { SimpleArgs.new(job_num: 1) } - - it "gets a job by kind" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: "does_not_exist" - )) - ).to be_nil - end - - it "gets a job by created at period" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at - 1, insert_res.job.created_at + 1] - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - created_at: [insert_res.job.created_at + 1, insert_res.job.created_at + 3] - )) - ).to be_nil - end - - it "gets a job by encoded args" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: 
JSON.dump(insert_res.job.args) - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - encoded_args: JSON.dump({"job_num" => 2}) - )) - ).to be_nil - end - - it "gets a job by queue" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: insert_res.job.queue - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - queue: "other_queue" - )) - ).to be_nil - end - - it "gets a job by state" do - insert_res = client.insert(job_args) - - job = driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_AVAILABLE, River::JOB_STATE_COMPLETED] - )) - expect(job.id).to eq(insert_res.job.id) - - expect( - driver.job_get_by_kind_and_unique_properties(River::Driver::JobGetByKindAndUniquePropertiesParam.new( - kind: job_args.kind, - state: [River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED] - )) - ).to be_nil - end - end - describe "#job_insert" do it "inserts a job" do insert_res = client.insert(SimpleArgs.new(job_num: 1)) @@ -215,6 +87,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs state: River::JOB_STATE_AVAILABLE, tags: [] ) + expect(insert_res.unique_skipped_as_duplicated).to be false # Make sure it made it to the database. Assert only minimally since we're # certain it's the same as what we checked above. 
@@ -235,6 +108,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs scheduled_at: be_within(2).of(target_time), state: River::JOB_STATE_SCHEDULED ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts with job insert opts" do @@ -253,6 +127,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: "job_custom_queue", tags: ["job_custom"] ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts with insert opts" do @@ -278,6 +153,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs queue: "my_queue", tags: ["custom"] ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts with job args hash" do @@ -288,6 +164,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs args: {"job_num" => 1}, kind: "hash_kind" ) + expect(insert_res.unique_skipped_as_duplicated).to be false end it "inserts in a transaction" do @@ -298,6 +175,7 @@ class SimpleArgsWithInsertOpts < SimpleArgs job = driver.job_get_by_id(insert_res.job.id) expect(job).to_not be_nil + expect(insert_res.unique_skipped_as_duplicated).to be false raise driver.rollback_exception end @@ -306,15 +184,69 @@ class SimpleArgsWithInsertOpts < SimpleArgs job = driver.job_get_by_id(insert_res.job.id) expect(job).to be_nil end + + it "inserts a unique job" do + insert_params = River::Driver::JobInsertParams.new( + encoded_args: JSON.dump({"job_num" => 1}), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: Time.now.getutc, + state: River::JOB_STATE_AVAILABLE, + unique_key: "unique_key", + unique_states: "00000001", + tags: nil + ) + + job_row, unique_skipped_as_duplicated = driver.job_insert(insert_params) + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: 
be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [], + unique_key: "unique_key", + unique_states: [::River::JOB_STATE_AVAILABLE] + ) + expect(unique_skipped_as_duplicated).to be false + + # second insertion should be skipped + job_row, unique_skipped_as_duplicated = driver.job_insert(insert_params) + expect(job_row).to have_attributes( + attempt: 0, + args: {"job_num" => 1}, + created_at: be_within(2).of(Time.now.getutc), + kind: "simple", + max_attempts: River::MAX_ATTEMPTS_DEFAULT, + queue: River::QUEUE_DEFAULT, + priority: River::PRIORITY_DEFAULT, + scheduled_at: be_within(2).of(Time.now.getutc), + state: River::JOB_STATE_AVAILABLE, + tags: [], + unique_key: "unique_key", + unique_states: [::River::JOB_STATE_AVAILABLE] + ) + expect(unique_skipped_as_duplicated).to be true + end end describe "#job_insert_many" do it "inserts multiple jobs" do - num_inserted = client.insert_many([ + inserted = client.insert_many([ SimpleArgs.new(job_num: 1), SimpleArgs.new(job_num: 2) ]) - expect(num_inserted).to eq(2) + expect(inserted.length).to eq(2) + expect(inserted[0].job).to have_attributes(args: {"job_num" => 1}) + expect(inserted[0].unique_skipped_as_duplicated).to eq false + expect(inserted[1].job).to have_attributes(args: {"job_num" => 2}) + expect(inserted[1].unique_skipped_as_duplicated).to eq false jobs = driver.job_list expect(jobs.count).to be 2 @@ -350,11 +282,15 @@ class SimpleArgsWithInsertOpts < SimpleArgs jobs = nil driver.transaction do - num_inserted = client.insert_many([ + inserted = client.insert_many([ SimpleArgs.new(job_num: 1), SimpleArgs.new(job_num: 2) ]) - expect(num_inserted).to eq(2) + expect(inserted.length).to eq(2) + expect(inserted[0].unique_skipped_as_duplicated).to eq false + expect(inserted[0].job).to have_attributes(args: {"job_num" => 1}) + expect(inserted[1].unique_skipped_as_duplicated).to eq false + expect(inserted[1].job).to have_attributes(args: {"job_num" => 2}) jobs = driver.job_list 
expect(jobs.count).to be 2 @@ -368,52 +304,6 @@ class SimpleArgsWithInsertOpts < SimpleArgs end end - describe "#job_insert_unique" do - it "inserts a job" do - insert_params = River::Driver::JobInsertParams.new( - encoded_args: JSON.dump({"job_num" => 1}), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: Time.now.getutc, - state: River::JOB_STATE_AVAILABLE, - tags: nil - ) - - job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") - expect(job_row).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.getutc), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.getutc), - state: River::JOB_STATE_AVAILABLE, - tags: [] - ) - expect(unique_skipped_as_duplicated).to be false - - # second insertion should be skipped - job_row, unique_skipped_as_duplicated = driver.job_insert_unique(insert_params, "unique_key") - expect(job_row).to have_attributes( - attempt: 0, - args: {"job_num" => 1}, - created_at: be_within(2).of(Time.now.getutc), - kind: "simple", - max_attempts: River::MAX_ATTEMPTS_DEFAULT, - queue: River::QUEUE_DEFAULT, - priority: River::PRIORITY_DEFAULT, - scheduled_at: be_within(2).of(Time.now.getutc), - state: River::JOB_STATE_AVAILABLE, - tags: [] - ) - expect(unique_skipped_as_duplicated).to be true - end - end - describe "#job_list" do let(:job_args) { SimpleArgs.new(job_num: 1) } diff --git a/spec/fnv_spec.rb b/spec/fnv_spec.rb deleted file mode 100644 index df21872..0000000 --- a/spec/fnv_spec.rb +++ /dev/null @@ -1,641 +0,0 @@ -require "spec_helper" - -describe River::FNV do - describe "#fnv1_hash" do - it "hashes strings to 32 bits" do - TEST_STRS.each do |test_str| - expect(River::FNV.fnv1_hash(test_str, size: 32)).to eq(FNV1_32_HASHES[test_str]) - end - 
end - - it "hashes strings to 64 bits" do - TEST_STRS.each do |test_str| - expect(River::FNV.fnv1_hash(test_str, size: 64)).to eq(FNV1_64_HASHES[test_str]) - end - end - end -end - -# -# Test strings pulled from the Python FNV hash test suite: -# -# https://github.com/znerol/py-fnvhash/blob/master/fnvhash/test/vector.py -# - -TEST_STRS = [ - "", - "a", - "b", - "c", - "d", - "e", - "f", - "fo", - "foo", - "foob", - "fooba", - "foobar", - "" + "\x00", - "a" + "\x00", - "b" + "\x00", - "c" + "\x00", - "d" + "\x00", - "e" + "\x00", - "f" + "\x00", - "fo" + "\x00", - "foo" + "\x00", - "foob" + "\x00", - "fooba" + "\x00", - "foobar" + "\x00", - "ch", - "cho", - "chon", - "chong", - "chongo", - "chongo ", - "chongo w", - "chongo wa", - "chongo was", - "chongo was ", - "chongo was h", - "chongo was he", - "chongo was her", - "chongo was here", - "chongo was here!", - "chongo was here!\n", - "ch" + "\x00", - "cho" + "\x00", - "chon" + "\x00", - "chong" + "\x00", - "chongo" + "\x00", - "chongo " + "\x00", - "chongo w" + "\x00", - "chongo wa" + "\x00", - "chongo was" + "\x00", - "chongo was " + "\x00", - "chongo was h" + "\x00", - "chongo was he" + "\x00", - "chongo was her" + "\x00", - "chongo was here" + "\x00", - "chongo was here!" 
+ "\x00", - "chongo was here!\n" + "\x00", - "cu", - "cur", - "curd", - "curds", - "curds ", - "curds a", - "curds an", - "curds and", - "curds and ", - "curds and w", - "curds and wh", - "curds and whe", - "curds and whey", - "curds and whey\n", - "cu" + "\x00", - "cur" + "\x00", - "curd" + "\x00", - "curds" + "\x00", - "curds " + "\x00", - "curds a" + "\x00", - "curds an" + "\x00", - "curds and" + "\x00", - "curds and " + "\x00", - "curds and w" + "\x00", - "curds and wh" + "\x00", - "curds and whe" + "\x00", - "curds and whey" + "\x00", - "curds and whey\n" + "\x00", - "hi", - "hi" + "\x00", - "hello", - "hello" + "\x00", - "\xff\x00\x00\x01", - "\x01\x00\x00\xff", - "\xff\x00\x00\x02", - "\x02\x00\x00\xff", - "\xff\x00\x00\x03", - "\x03\x00\x00\xff", - "\xff\x00\x00\x04", - "\x04\x00\x00\xff", - "\x40\x51\x4e\x44", - "\x44\x4e\x51\x40", - "\x40\x51\x4e\x4a", - "\x4a\x4e\x51\x40", - "\x40\x51\x4e\x54", - "\x54\x4e\x51\x40", - "127.0.0.1", - "127.0.0.1" + "\x00", - "127.0.0.2", - "127.0.0.2" + "\x00", - "127.0.0.3", - "127.0.0.3" + "\x00", - "64.81.78.68", - "64.81.78.68" + "\x00", - "64.81.78.74", - "64.81.78.74" + "\x00", - "64.81.78.84", - "64.81.78.84" + "\x00", - "feedface", - "feedface" + "\x00", - "feedfacedaffdeed", - "feedfacedaffdeed" + "\x00", - "feedfacedeadbeef", - "feedfacedeadbeef" + "\x00", - "line 1\nline 2\nline 3", - "chongo /\\../\\", - "chongo /\\../\\" + "\x00", - "chongo (Landon Curt Noll) /\\../\\", - "chongo (Landon Curt Noll) /\\../\\" + "\x00", - "http://antwrp.gsfc.nasa.gov/apod/astropix.html", - "http://en.wikipedia.org/wiki/Fowler_Noll_Vo_hash", - "http://epod.usra.edu/", - "http://exoplanet.eu/", - "http://hvo.wr.usgs.gov/cam3/", - "http://hvo.wr.usgs.gov/cams/HMcam/", - "http://hvo.wr.usgs.gov/kilauea/update/deformation.html", - "http://hvo.wr.usgs.gov/kilauea/update/images.html", - "http://hvo.wr.usgs.gov/kilauea/update/maps.html", - "http://hvo.wr.usgs.gov/volcanowatch/current_issue.html", - "http://neo.jpl.nasa.gov/risk/", - 
"http://norvig.com/21-days.html", - "http://primes.utm.edu/curios/home.php", - "http://slashdot.org/", - "http://tux.wr.usgs.gov/Maps/155.25-19.5.html", - "http://volcano.wr.usgs.gov/kilaueastatus.php", - "http://www.avo.alaska.edu/activity/Redoubt.php", - "http://www.dilbert.com/fast/", - "http://www.fourmilab.ch/gravitation/orbits/", - "http://www.fpoa.net/", - "http://www.ioccc.org/index.html", - "http://www.isthe.com/cgi-bin/number.cgi", - "http://www.isthe.com/chongo/bio.html", - "http://www.isthe.com/chongo/index.html", - "http://www.isthe.com/chongo/src/calc/lucas-calc", - "http://www.isthe.com/chongo/tech/astro/venus2004.html", - "http://www.isthe.com/chongo/tech/astro/vita.html", - "http://www.isthe.com/chongo/tech/comp/c/expert.html", - "http://www.isthe.com/chongo/tech/comp/calc/index.html", - "http://www.isthe.com/chongo/tech/comp/fnv/index.html", - "http://www.isthe.com/chongo/tech/math/number/howhigh.html", - "http://www.isthe.com/chongo/tech/math/number/number.html", - "http://www.isthe.com/chongo/tech/math/prime/mersenne.html", - "http://www.isthe.com/chongo/tech/math/prime/mersenne.html#largest", - "http://www.lavarnd.org/cgi-bin/corpspeak.cgi", - "http://www.lavarnd.org/cgi-bin/haiku.cgi", - "http://www.lavarnd.org/cgi-bin/rand-none.cgi", - "http://www.lavarnd.org/cgi-bin/randdist.cgi", - "http://www.lavarnd.org/index.html", - "http://www.lavarnd.org/what/nist-test.html", - "http://www.macosxhints.com/", - "http://www.mellis.com/", - "http://www.nature.nps.gov/air/webcams/parks/havoso2alert/havoalert.cfm", - "http://www.nature.nps.gov/air/webcams/parks/havoso2alert/timelines_24.cfm", - "http://www.paulnoll.com/", - "http://www.pepysdiary.com/", - "http://www.sciencenews.org/index/home/activity/view", - "http://www.skyandtelescope.com/", - "http://www.sput.nl/~rob/sirius.html", - "http://www.systemexperts.com/", - "http://www.tq-international.com/phpBB3/index.php", - "http://www.travelquesttours.com/index.htm", - 
"http://www.wunderground.com/global/stations/89606.html", - "21701" * 10, - "M21701" * 10, - "2^21701-1" * 10, - "\x54\xc5" * 10, - "\xc5\x54" * 10, - "23209" * 10, - "M23209" * 10, - "2^23209-1" * 10, - "\x5a\xa9" * 10, - "\xa9\x5a" * 10, - "391581216093" * 10, - "391581*2^216093-1" * 10, - "\x05\xf9\x9d\x03\x4c\x81" * 10, - "FEDCBA9876543210" * 10, - "\xfe\xdc\xba\x98\x76\x54\x32\x10" * 10, - "EFCDAB8967452301" * 10, - "\xef\xcd\xab\x89\x67\x45\x23\x01" * 10, - "0123456789ABCDEF" * 10, - "\x01\x23\x45\x67\x89\xab\xcd\xef" * 10, - "1032547698BADCFE" * 10, - "\x10\x32\x54\x76\x98\xba\xdc\xfe" * 10, - "\x00" * 500, - "\x07" * 500, - "~" * 500, - "\x7f" * 500 -] - -FNV1_32_HASHES = { - TEST_STRS[0] => 0x811c9dc5, - TEST_STRS[1] => 0x050c5d7e, - TEST_STRS[2] => 0x050c5d7d, - TEST_STRS[3] => 0x050c5d7c, - TEST_STRS[4] => 0x050c5d7b, - TEST_STRS[5] => 0x050c5d7a, - TEST_STRS[6] => 0x050c5d79, - TEST_STRS[7] => 0x6b772514, - TEST_STRS[8] => 0x408f5e13, - TEST_STRS[9] => 0xb4b1178b, - TEST_STRS[10] => 0xfdc80fb0, - TEST_STRS[11] => 0x31f0b262, - TEST_STRS[12] => 0x050c5d1f, - TEST_STRS[13] => 0x70772d5a, - TEST_STRS[14] => 0x6f772bc7, - TEST_STRS[15] => 0x6e772a34, - TEST_STRS[16] => 0x6d7728a1, - TEST_STRS[17] => 0x6c77270e, - TEST_STRS[18] => 0x6b77257b, - TEST_STRS[19] => 0x408f5e7c, - TEST_STRS[20] => 0xb4b117e9, - TEST_STRS[21] => 0xfdc80fd1, - TEST_STRS[22] => 0x31f0b210, - TEST_STRS[23] => 0xffe8d046, - TEST_STRS[24] => 0x6e772a5c, - TEST_STRS[25] => 0x4197aebb, - TEST_STRS[26] => 0xfcc8100f, - TEST_STRS[27] => 0xfdf147fa, - TEST_STRS[28] => 0xbcd44ee1, - TEST_STRS[29] => 0x23382c13, - TEST_STRS[30] => 0x846d619e, - TEST_STRS[31] => 0x1630abdb, - TEST_STRS[32] => 0xc99e89b2, - TEST_STRS[33] => 0x1692c316, - TEST_STRS[34] => 0x9f091bca, - TEST_STRS[35] => 0x2556be9b, - TEST_STRS[36] => 0x628e0e73, - TEST_STRS[37] => 0x98a0bf6c, - TEST_STRS[38] => 0xb10d5725, - TEST_STRS[39] => 0xdd002f35, - TEST_STRS[40] => 0x4197aed4, - TEST_STRS[41] => 0xfcc81061, - TEST_STRS[42] 
=> 0xfdf1479d, - TEST_STRS[43] => 0xbcd44e8e, - TEST_STRS[44] => 0x23382c33, - TEST_STRS[45] => 0x846d61e9, - TEST_STRS[46] => 0x1630abba, - TEST_STRS[47] => 0xc99e89c1, - TEST_STRS[48] => 0x1692c336, - TEST_STRS[49] => 0x9f091ba2, - TEST_STRS[50] => 0x2556befe, - TEST_STRS[51] => 0x628e0e01, - TEST_STRS[52] => 0x98a0bf09, - TEST_STRS[53] => 0xb10d5704, - TEST_STRS[54] => 0xdd002f3f, - TEST_STRS[55] => 0x1c4a506f, - TEST_STRS[56] => 0x6e772a41, - TEST_STRS[57] => 0x26978421, - TEST_STRS[58] => 0xe184ff97, - TEST_STRS[59] => 0x9b5e5ac6, - TEST_STRS[60] => 0x5b88e592, - TEST_STRS[61] => 0xaa8164b7, - TEST_STRS[62] => 0x20b18c7b, - TEST_STRS[63] => 0xf28025c5, - TEST_STRS[64] => 0x84bb753f, - TEST_STRS[65] => 0x3219925a, - TEST_STRS[66] => 0x384163c6, - TEST_STRS[67] => 0x54f010d7, - TEST_STRS[68] => 0x8cea820c, - TEST_STRS[69] => 0xe12ab8ee, - TEST_STRS[70] => 0x26978453, - TEST_STRS[71] => 0xe184fff3, - TEST_STRS[72] => 0x9b5e5ab5, - TEST_STRS[73] => 0x5b88e5b2, - TEST_STRS[74] => 0xaa8164d6, - TEST_STRS[75] => 0x20b18c15, - TEST_STRS[76] => 0xf28025a1, - TEST_STRS[77] => 0x84bb751f, - TEST_STRS[78] => 0x3219922d, - TEST_STRS[79] => 0x384163ae, - TEST_STRS[80] => 0x54f010b2, - TEST_STRS[81] => 0x8cea8275, - TEST_STRS[82] => 0xe12ab8e4, - TEST_STRS[83] => 0x64411eaa, - TEST_STRS[84] => 0x6977223c, - TEST_STRS[85] => 0x428ae474, - TEST_STRS[86] => 0xb6fa7167, - TEST_STRS[87] => 0x73408525, - TEST_STRS[88] => 0xb78320a1, - TEST_STRS[89] => 0x0caf4135, - TEST_STRS[90] => 0xb78320a2, - TEST_STRS[91] => 0xcdc88e80, - TEST_STRS[92] => 0xb78320a3, - TEST_STRS[93] => 0x8ee1dbcb, - TEST_STRS[94] => 0xb78320a4, - TEST_STRS[95] => 0x4ffb2716, - TEST_STRS[96] => 0x860632aa, - TEST_STRS[97] => 0xcc2c5c64, - TEST_STRS[98] => 0x860632a4, - TEST_STRS[99] => 0x2a7ec4a6, - TEST_STRS[100] => 0x860632ba, - TEST_STRS[101] => 0xfefe8e14, - TEST_STRS[102] => 0x0a3cffd8, - TEST_STRS[103] => 0xf606c108, - TEST_STRS[104] => 0x0a3cffdb, - TEST_STRS[105] => 0xf906c5c1, - TEST_STRS[106] => 
0x0a3cffda, - TEST_STRS[107] => 0xf806c42e, - TEST_STRS[108] => 0xc07167d7, - TEST_STRS[109] => 0xc9867775, - TEST_STRS[110] => 0xbf716668, - TEST_STRS[111] => 0xc78435b8, - TEST_STRS[112] => 0xc6717155, - TEST_STRS[113] => 0xb99568cf, - TEST_STRS[114] => 0x7662e0d6, - TEST_STRS[115] => 0x33a7f0e2, - TEST_STRS[116] => 0xc2732f95, - TEST_STRS[117] => 0xb053e78f, - TEST_STRS[118] => 0x3a19c02a, - TEST_STRS[119] => 0xa089821e, - TEST_STRS[120] => 0x31ae8f83, - TEST_STRS[121] => 0x995fa9c4, - TEST_STRS[122] => 0x35983f8c, - TEST_STRS[123] => 0x5036a251, - TEST_STRS[124] => 0x97018583, - TEST_STRS[125] => 0xb4448d60, - TEST_STRS[126] => 0x025dfe59, - TEST_STRS[127] => 0xc5eab3af, - TEST_STRS[128] => 0x7d21ba1e, - TEST_STRS[129] => 0x7704cddb, - TEST_STRS[130] => 0xd0071bfe, - TEST_STRS[131] => 0x0ff3774c, - TEST_STRS[132] => 0xb0fea0ea, - TEST_STRS[133] => 0x58177303, - TEST_STRS[134] => 0x4f599cda, - TEST_STRS[135] => 0x3e590a47, - TEST_STRS[136] => 0x965595f8, - TEST_STRS[137] => 0xc37f178d, - TEST_STRS[138] => 0x9711dd26, - TEST_STRS[139] => 0x23c99b7f, - TEST_STRS[140] => 0x6e568b17, - TEST_STRS[141] => 0x43f0245b, - TEST_STRS[142] => 0xbcb7a001, - TEST_STRS[143] => 0x12e6dffe, - TEST_STRS[144] => 0x0792f2d6, - TEST_STRS[145] => 0xb966936b, - TEST_STRS[146] => 0x46439ac5, - TEST_STRS[147] => 0x728d49af, - TEST_STRS[148] => 0xd33745c9, - TEST_STRS[149] => 0xbc382a57, - TEST_STRS[150] => 0x4bda1d31, - TEST_STRS[151] => 0xce35ccae, - TEST_STRS[152] => 0x3b6eed94, - TEST_STRS[153] => 0x445c9c58, - TEST_STRS[154] => 0x3db8bf9d, - TEST_STRS[155] => 0x2dee116d, - TEST_STRS[156] => 0xc18738da, - TEST_STRS[157] => 0x5b156176, - TEST_STRS[158] => 0x2aa7d593, - TEST_STRS[159] => 0xb2409658, - TEST_STRS[160] => 0xe1489528, - TEST_STRS[161] => 0xfe1ee07e, - TEST_STRS[162] => 0xe8842315, - TEST_STRS[163] => 0x3a6a63a2, - TEST_STRS[164] => 0x06d2c18c, - TEST_STRS[165] => 0xf8ef7225, - TEST_STRS[166] => 0x843d3300, - TEST_STRS[167] => 0xbb24f7ae, - TEST_STRS[168] => 0x878c0ec9, - 
TEST_STRS[169] => 0xb557810f, - TEST_STRS[170] => 0x57423246, - TEST_STRS[171] => 0x87f7505e, - TEST_STRS[172] => 0xbb809f20, - TEST_STRS[173] => 0x8932abb5, - TEST_STRS[174] => 0x0a9b3aa0, - TEST_STRS[175] => 0xb8682a24, - TEST_STRS[176] => 0xa7ac1c56, - TEST_STRS[177] => 0x11409252, - TEST_STRS[178] => 0xa987f517, - TEST_STRS[179] => 0xf309e7ed, - TEST_STRS[180] => 0xc9e8f417, - TEST_STRS[181] => 0x7f447bdd, - TEST_STRS[182] => 0xb929adc5, - TEST_STRS[183] => 0x57022879, - TEST_STRS[184] => 0xdcfd2c49, - TEST_STRS[185] => 0x6edafff5, - TEST_STRS[186] => 0xf04fb1f1, - TEST_STRS[187] => 0xfb7de8b9, - TEST_STRS[188] => 0xc5f1d7e9, - TEST_STRS[189] => 0x32c1f439, - TEST_STRS[190] => 0x7fd3eb7d, - TEST_STRS[191] => 0x81597da5, - TEST_STRS[192] => 0x05eb7a25, - TEST_STRS[193] => 0x9c0fa1b5, - TEST_STRS[194] => 0x53ccb1c5, - TEST_STRS[195] => 0xfabece15, - TEST_STRS[196] => 0x4ad745a5, - TEST_STRS[197] => 0xe5bdc495, - TEST_STRS[198] => 0x23b3c0a5, - TEST_STRS[199] => 0xfa823dd5, - TEST_STRS[200] => 0x0c6c58b9, - TEST_STRS[201] => 0xe2dbccd5, - TEST_STRS[202] => 0xdb7f50f9 -} - -FNV1_64_HASHES = { - TEST_STRS[0] => 0xcbf29ce484222325, - TEST_STRS[1] => 0xaf63bd4c8601b7be, - TEST_STRS[2] => 0xaf63bd4c8601b7bd, - TEST_STRS[3] => 0xaf63bd4c8601b7bc, - TEST_STRS[4] => 0xaf63bd4c8601b7bb, - TEST_STRS[5] => 0xaf63bd4c8601b7ba, - TEST_STRS[6] => 0xaf63bd4c8601b7b9, - TEST_STRS[7] => 0x08326207b4eb2f34, - TEST_STRS[8] => 0xd8cbc7186ba13533, - TEST_STRS[9] => 0x0378817ee2ed65cb, - TEST_STRS[10] => 0xd329d59b9963f790, - TEST_STRS[11] => 0x340d8765a4dda9c2, - TEST_STRS[12] => 0xaf63bd4c8601b7df, - TEST_STRS[13] => 0x08326707b4eb37da, - TEST_STRS[14] => 0x08326607b4eb3627, - TEST_STRS[15] => 0x08326507b4eb3474, - TEST_STRS[16] => 0x08326407b4eb32c1, - TEST_STRS[17] => 0x08326307b4eb310e, - TEST_STRS[18] => 0x08326207b4eb2f5b, - TEST_STRS[19] => 0xd8cbc7186ba1355c, - TEST_STRS[20] => 0x0378817ee2ed65a9, - TEST_STRS[21] => 0xd329d59b9963f7f1, - TEST_STRS[22] => 0x340d8765a4dda9b0, - 
TEST_STRS[23] => 0x50a6d3b724a774a6, - TEST_STRS[24] => 0x08326507b4eb341c, - TEST_STRS[25] => 0xd8d5c8186ba98bfb, - TEST_STRS[26] => 0x1ccefc7ef118dbef, - TEST_STRS[27] => 0x0c92fab3ad3db77a, - TEST_STRS[28] => 0x9b77794f5fdec421, - TEST_STRS[29] => 0x0ac742dfe7874433, - TEST_STRS[30] => 0xd7dad5766ad8e2de, - TEST_STRS[31] => 0xa1bb96378e897f5b, - TEST_STRS[32] => 0x5b3f9b6733a367d2, - TEST_STRS[33] => 0xb07ce25cbea969f6, - TEST_STRS[34] => 0x8d9e9997f9df0d6a, - TEST_STRS[35] => 0x838c673d9603cb7b, - TEST_STRS[36] => 0x8b5ee8a5e872c273, - TEST_STRS[37] => 0x4507c4e9fb00690c, - TEST_STRS[38] => 0x4c9ca59581b27f45, - TEST_STRS[39] => 0xe0aca20b624e4235, - TEST_STRS[40] => 0xd8d5c8186ba98b94, - TEST_STRS[41] => 0x1ccefc7ef118db81, - TEST_STRS[42] => 0x0c92fab3ad3db71d, - TEST_STRS[43] => 0x9b77794f5fdec44e, - TEST_STRS[44] => 0x0ac742dfe7874413, - TEST_STRS[45] => 0xd7dad5766ad8e2a9, - TEST_STRS[46] => 0xa1bb96378e897f3a, - TEST_STRS[47] => 0x5b3f9b6733a367a1, - TEST_STRS[48] => 0xb07ce25cbea969d6, - TEST_STRS[49] => 0x8d9e9997f9df0d02, - TEST_STRS[50] => 0x838c673d9603cb1e, - TEST_STRS[51] => 0x8b5ee8a5e872c201, - TEST_STRS[52] => 0x4507c4e9fb006969, - TEST_STRS[53] => 0x4c9ca59581b27f64, - TEST_STRS[54] => 0xe0aca20b624e423f, - TEST_STRS[55] => 0x13998e580afa800f, - TEST_STRS[56] => 0x08326507b4eb3401, - TEST_STRS[57] => 0xd8d5ad186ba95dc1, - TEST_STRS[58] => 0x1c72e17ef0ca4e97, - TEST_STRS[59] => 0x2183c1b327c38ae6, - TEST_STRS[60] => 0xb66d096c914504f2, - TEST_STRS[61] => 0x404bf57ad8476757, - TEST_STRS[62] => 0x887976bd815498bb, - TEST_STRS[63] => 0x3afd7f02c2bf85a5, - TEST_STRS[64] => 0xfc4476b0eb70177f, - TEST_STRS[65] => 0x186d2da00f77ecba, - TEST_STRS[66] => 0xf97140fa48c74066, - TEST_STRS[67] => 0xa2b1cf49aa926d37, - TEST_STRS[68] => 0x0690712cd6cf940c, - TEST_STRS[69] => 0xf7045b3102b8906e, - TEST_STRS[70] => 0xd8d5ad186ba95db3, - TEST_STRS[71] => 0x1c72e17ef0ca4ef3, - TEST_STRS[72] => 0x2183c1b327c38a95, - TEST_STRS[73] => 0xb66d096c914504d2, - 
TEST_STRS[74] => 0x404bf57ad8476736, - TEST_STRS[75] => 0x887976bd815498d5, - TEST_STRS[76] => 0x3afd7f02c2bf85c1, - TEST_STRS[77] => 0xfc4476b0eb70175f, - TEST_STRS[78] => 0x186d2da00f77eccd, - TEST_STRS[79] => 0xf97140fa48c7400e, - TEST_STRS[80] => 0xa2b1cf49aa926d52, - TEST_STRS[81] => 0x0690712cd6cf9475, - TEST_STRS[82] => 0xf7045b3102b89064, - TEST_STRS[83] => 0x74f762479f9d6aea, - TEST_STRS[84] => 0x08326007b4eb2b9c, - TEST_STRS[85] => 0xd8c4c9186b9b1a14, - TEST_STRS[86] => 0x7b495389bdbdd4c7, - TEST_STRS[87] => 0x3b6dba0d69908e25, - TEST_STRS[88] => 0xd6b2b17bf4b71261, - TEST_STRS[89] => 0x447bfb7f98e615b5, - TEST_STRS[90] => 0xd6b2b17bf4b71262, - TEST_STRS[91] => 0x3bd2807f93fe1660, - TEST_STRS[92] => 0xd6b2b17bf4b71263, - TEST_STRS[93] => 0x3329057f8f16170b, - TEST_STRS[94] => 0xd6b2b17bf4b71264, - TEST_STRS[95] => 0x2a7f8a7f8a2e19b6, - TEST_STRS[96] => 0x23d3767e64b2f98a, - TEST_STRS[97] => 0xff768d7e4f9d86a4, - TEST_STRS[98] => 0x23d3767e64b2f984, - TEST_STRS[99] => 0xccd1837e334e4aa6, - TEST_STRS[100] => 0x23d3767e64b2f99a, - TEST_STRS[101] => 0x7691fd7e028f6754, - TEST_STRS[102] => 0x34ad3b1041204318, - TEST_STRS[103] => 0xa29e749ea9d201c8, - TEST_STRS[104] => 0x34ad3b104120431b, - TEST_STRS[105] => 0xa29e779ea9d206e1, - TEST_STRS[106] => 0x34ad3b104120431a, - TEST_STRS[107] => 0xa29e769ea9d2052e, - TEST_STRS[108] => 0x02a17ebca4aa3497, - TEST_STRS[109] => 0x229ef18bcd375c95, - TEST_STRS[110] => 0x02a17dbca4aa32c8, - TEST_STRS[111] => 0x229b6f8bcd3449d8, - TEST_STRS[112] => 0x02a184bca4aa3ed5, - TEST_STRS[113] => 0x22b3618bcd48c3ef, - TEST_STRS[114] => 0x5c2c346706186f36, - TEST_STRS[115] => 0xb78c410f5b84f8c2, - TEST_STRS[116] => 0xed9478212b267395, - TEST_STRS[117] => 0xd9bbb55c5256662f, - TEST_STRS[118] => 0x8c54f0203249438a, - TEST_STRS[119] => 0xbd9790b5727dc37e, - TEST_STRS[120] => 0xa64e5f36c9e2b0e3, - TEST_STRS[121] => 0x8fd0680da3088a04, - TEST_STRS[122] => 0x67aad32c078284cc, - TEST_STRS[123] => 0xb37d55d81c57b331, - TEST_STRS[124] => 
0x55ac0f3829057c43, - TEST_STRS[125] => 0xcb27f4b8e1b6cc20, - TEST_STRS[126] => 0x26caf88bcbef2d19, - TEST_STRS[127] => 0x8e6e063b97e61b8f, - TEST_STRS[128] => 0xb42750f7f3b7c37e, - TEST_STRS[129] => 0xf3c6ba64cf7ca99b, - TEST_STRS[130] => 0xebfb69b427ea80fe, - TEST_STRS[131] => 0x39b50c3ed970f46c, - TEST_STRS[132] => 0x5b9b177aa3eb3e8a, - TEST_STRS[133] => 0x6510063ecf4ec903, - TEST_STRS[134] => 0x2b3bbd2c00797c7a, - TEST_STRS[135] => 0xf1d6204ff5cb4aa7, - TEST_STRS[136] => 0x4836e27ccf099f38, - TEST_STRS[137] => 0x82efbb0dd073b44d, - TEST_STRS[138] => 0x4a80c282ffd7d4c6, - TEST_STRS[139] => 0x305d1a9c9ee43bdf, - TEST_STRS[140] => 0x15c366948ffc6997, - TEST_STRS[141] => 0x80153ae218916e7b, - TEST_STRS[142] => 0xfa23e2bdf9e2a9e1, - TEST_STRS[143] => 0xd47e8d8a2333c6de, - TEST_STRS[144] => 0x7e128095f688b056, - TEST_STRS[145] => 0x2f5356890efcedab, - TEST_STRS[146] => 0x95c2b383014f55c5, - TEST_STRS[147] => 0x4727a5339ce6070f, - TEST_STRS[148] => 0xb0555ecd575108e9, - TEST_STRS[149] => 0x48d785770bb4af37, - TEST_STRS[150] => 0x09d4701c12af02b1, - TEST_STRS[151] => 0x79f031e78f3cf62e, - TEST_STRS[152] => 0x52a1ee85db1b5a94, - TEST_STRS[153] => 0x6bd95b2eb37fa6b8, - TEST_STRS[154] => 0x74971b7077aef85d, - TEST_STRS[155] => 0xb4e4fae2ffcc1aad, - TEST_STRS[156] => 0x2bd48bd898b8f63a, - TEST_STRS[157] => 0xe9966ac1556257f6, - TEST_STRS[158] => 0x92a3d1cd078ba293, - TEST_STRS[159] => 0xf81175a482e20ab8, - TEST_STRS[160] => 0x5bbb3de722e73048, - TEST_STRS[161] => 0x6b4f363492b9f2be, - TEST_STRS[162] => 0xc2d559df73d59875, - TEST_STRS[163] => 0xf75f62284bc7a8c2, - TEST_STRS[164] => 0xda8dd8e116a9f1cc, - TEST_STRS[165] => 0xbdc1e6ab76057885, - TEST_STRS[166] => 0xfec6a4238a1224a0, - TEST_STRS[167] => 0xc03f40f3223e290e, - TEST_STRS[168] => 0x1ed21673466ffda9, - TEST_STRS[169] => 0xdf70f906bb0dd2af, - TEST_STRS[170] => 0xf3dcda369f2af666, - TEST_STRS[171] => 0x9ebb11573cdcebde, - TEST_STRS[172] => 0x81c72d9077fedca0, - TEST_STRS[173] => 0x0ec074a31be5fb15, - TEST_STRS[174] => 
0x2a8b3280b6c48f20, - TEST_STRS[175] => 0xfd31777513309344, - TEST_STRS[176] => 0x194534a86ad006b6, - TEST_STRS[177] => 0x3be6fdf46e0cfe12, - TEST_STRS[178] => 0x017cc137a07eb057, - TEST_STRS[179] => 0x9428fc6e7d26b54d, - TEST_STRS[180] => 0x9aaa2e3603ef8ad7, - TEST_STRS[181] => 0x82c6d3f3a0ccdf7d, - TEST_STRS[182] => 0xc86eeea00cf09b65, - TEST_STRS[183] => 0x705f8189dbb58299, - TEST_STRS[184] => 0x415a7f554391ca69, - TEST_STRS[185] => 0xcfe3d49fa2bdc555, - TEST_STRS[186] => 0xf0f9c56039b25191, - TEST_STRS[187] => 0x7075cb6abd1d32d9, - TEST_STRS[188] => 0x43c94e2c8b277509, - TEST_STRS[189] => 0x3cbfd4e4ea670359, - TEST_STRS[190] => 0xc05887810f4d019d, - TEST_STRS[191] => 0x14468ff93ac22dc5, - TEST_STRS[192] => 0xebed699589d99c05, - TEST_STRS[193] => 0x6d99f6df321ca5d5, - TEST_STRS[194] => 0x0cd410d08c36d625, - TEST_STRS[195] => 0xef1b2a2c86831d35, - TEST_STRS[196] => 0x3b349c4d69ee5f05, - TEST_STRS[197] => 0x55248ce88f45f035, - TEST_STRS[198] => 0xaa69ca6a18a4c885, - TEST_STRS[199] => 0x1fe3fce62bd816b5, - TEST_STRS[200] => 0x0289a488a8df69d9, - TEST_STRS[201] => 0x15e96e1613df98b5, - TEST_STRS[202] => 0xe6be57375ad89b99 -} diff --git a/spec/unique_bitmask_spec.rb b/spec/unique_bitmask_spec.rb new file mode 100644 index 0000000..4357b5d --- /dev/null +++ b/spec/unique_bitmask_spec.rb @@ -0,0 +1,20 @@ +require "spec_helper" +require_relative "../driver/riverqueue-sequel/spec/spec_helper" + +RSpec.describe River::UniqueBitmask do + describe ".from_states" do + it "converts an array of states to a bitmask string" do + expect(described_class.from_states(River::Client.const_get(:DEFAULT_UNIQUE_STATES))).to eq("11110101") + expect(described_class.from_states([River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED])).to eq("11010001") + expect(described_class.from_states([River::JOB_STATE_AVAILABLE])).to eq("00000001") + end + end + + describe ".to_states" do + it "converts a bitmask string to an array of states" do + 
expect(described_class.to_states(0b11110101)).to eq(River::Client.const_get(:DEFAULT_UNIQUE_STATES)) + expect(described_class.to_states(0b11010001)).to eq([River::JOB_STATE_AVAILABLE, River::JOB_STATE_PENDING, River::JOB_STATE_RUNNING, River::JOB_STATE_SCHEDULED]) + expect(described_class.to_states(0b00000001)).to eq([River::JOB_STATE_AVAILABLE]) + end + end +end