diff --git a/CHANGELOG.md b/CHANGELOG.md index f49c980..56c7005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Tag format is now checked on insert. Tags should be no more than 255 characters and match the regex `/\A[\w][\w\-]+[\w]\z/`. [PR #22](https://github.com/riverqueue/riverqueue-ruby/pull/22). +- Returned jobs now have a `metadata` property. [PR #21](https://github.com/riverqueue/riverqueue-ruby/pull/22). + ## [0.4.0] - 2024-04-28 ### Changed diff --git a/docs/README.md b/docs/README.md index 61b354f..c9ca412 100644 --- a/docs/README.md +++ b/docs/README.md @@ -86,7 +86,7 @@ insert_res.unique_skipped_as_duplicated ### Custom advisory lock prefix -Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: +Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: ```ruby client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) diff --git a/driver/riverqueue-activerecord/lib/driver.rb b/driver/riverqueue-activerecord/lib/driver.rb index 8c329f5..3fce2ee 100644 --- a/driver/riverqueue-activerecord/lib/driver.rb +++ b/driver/riverqueue-activerecord/lib/driver.rb @@ -100,6 +100,7 @@ def transaction(&) finalized_at: river_job.finalized_at, kind: river_job.kind, max_attempts: river_job.max_attempts, + metadata: river_job.metadata, priority: river_job.priority, queue: river_job.queue, scheduled_at: river_job.scheduled_at, diff --git a/driver/riverqueue-sequel/lib/driver.rb b/driver/riverqueue-sequel/lib/driver.rb index 5f615d1..69d99ca 100644 --- a/driver/riverqueue-sequel/lib/driver.rb +++ b/driver/riverqueue-sequel/lib/driver.rb @@ -86,6 +86,7 @@ def transaction(&) finalized_at: river_job.finalized_at, kind: river_job.kind, max_attempts: river_job.max_attempts, + metadata: river_job.metadata, priority: river_job.priority, queue: river_job.queue, scheduled_at: river_job.scheduled_at, diff --git a/lib/client.rb b/lib/client.rb index 1e771ca..42dd5da 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -2,8 +2,13 @@ require "time" module River + # Default number of maximum attempts for a job. MAX_ATTEMPTS_DEFAULT = 25 + + # Default priority for a job. PRIORITY_DEFAULT = 1 + + # Default queue for a job. QUEUE_DEFAULT = "default" # Provides a client for River that inserts jobs. Unlike the Go version of the @@ -241,7 +246,7 @@ def insert_many(args) queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, scheduled_at: scheduled_at&.utc, # database defaults to now state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, - tags: insert_opts.tags || args_insert_opts.tags + tags: validate_tags(insert_opts.tags || args_insert_opts.tags) ), unique_opts ] @@ -260,6 +265,16 @@ def insert_many(args) private def uint64_to_int64(int) [int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace end + + TAG_RE = /\A[\w][\w\-]+[\w]\z/ + private_constant :TAG_RE + + private def validate_tags(tags) + tags&.each do |tag| + raise ArgumentError, "tags should be 255 characters or less" if tag.length > 255 + raise ArgumentError, "tag should match regex #{TAG_RE.inspect}" unless TAG_RE.match(tag) + end + end end # A single job to insert that's part of an #insert_many batch insert. Unlike diff --git a/lib/job.rb b/lib/job.rb index 650fad6..bd3cf0f 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -55,15 +55,15 @@ class JobRow # The set of worker IDs that have worked this job. A worker ID differs # between different programs, but is shared by all executors within any # given one. (i.e. Different Go processes have different IDs, but IDs are - # shared within any given process.) A process generates a new ULID (an - # ordered UUID) worker ID when it starts up. + # shared within any given process.) A process generates a new ID based on + # host and current time when it starts up. attr_accessor :attempted_by # When the job record was created. attr_accessor :created_at # A set of errors that occurred when the job was worked, one for each - # attempt. Ordered from earliest error to the latest error. + # attempt. Ordered from earliest error to the latest error. attr_accessor :errors # The time at which the job was "finalized", meaning it was either completed @@ -79,6 +79,9 @@ class JobRow # for the last time and will no longer be worked. attr_accessor :max_attempts + # Arbitrary metadata associated with the job. + attr_accessor :metadata + # The priority of the job, with 1 being the highest priority and 4 being the # lowest. When fetching available jobs to work, the highest priority jobs # will always be fetched before any lower priority jobs are fetched. Note @@ -112,6 +115,7 @@ def initialize( created_at:, kind:, max_attempts:, + metadata:, priority:, queue:, scheduled_at:, @@ -134,6 +138,7 @@ def initialize( self.finalized_at = finalized_at self.kind = kind self.max_attempts = max_attempts + self.metadata = metadata self.priority = priority self.queue = queue self.scheduled_at = scheduled_at @@ -157,7 +162,7 @@ class AttemptError attr_accessor :error # Contains a stack trace from a job that panicked. The trace is produced by - # invoking `debug.Trace()`. + # invoking `debug.Trace()` in Go. attr_accessor :trace def initialize( diff --git a/sig/client.rbs b/sig/client.rbs index 40f55f0..99a7eae 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -16,9 +16,13 @@ module River def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult - private def uint64_to_int64: (Integer) -> Integer private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?] private def truncate_time: (Time, Integer) -> Time + private def uint64_to_int64: (Integer) -> Integer + + TAG_RE: Regexp + + private def validate_tags: (Array[String]?) -> Array[String]? end class InsertManyParams diff --git a/sig/job.rbs b/sig/job.rbs index 126f398..fff8c3d 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -45,13 +45,14 @@ module River attr_accessor finalized_at: Time? attr_accessor kind: String attr_accessor max_attempts: Integer + attr_accessor metadata: Hash[String, untyped] attr_accessor priority: Integer attr_accessor queue: String attr_accessor scheduled_at: Time attr_accessor state: jobStateAll attr_accessor tags: Array[String]? - def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void end class AttemptError diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 23ba04a..7b4a7b3 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -51,6 +51,7 @@ def transaction(&) finalized_at: nil, kind: insert_params.kind, max_attempts: insert_params.max_attempts, + metadata: nil, priority: insert_params.priority, queue: insert_params.queue, scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB @@ -194,6 +195,22 @@ def to_json = nil end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") end + it "raises error if tags are too long" do + expect do + client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( + tags: ["a" * 256] + )) + end.to raise_error(ArgumentError, "tags should be 255 characters or less") + end + + it "raises error if tags are misformatted" do + expect do + client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( + tags: ["no,commas,allowed"] + )) + end.to raise_error(ArgumentError, 'tag should match regex /\A[\w][\w\-]+[\w]\z/') + end + def check_bigint_bounds(int) raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807 raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808