From bbbbfc0a5bd03b0982eaede3a7868a1a42bee411 Mon Sep 17 00:00:00 2001 From: Brandur Leach Date: Sat, 6 Jul 2024 12:40:24 -0700 Subject: [PATCH] Check tag format + return metadata + additional doc strings (#22) Mainly, bring in tag format checks which verify they're no longer than 255 characters and don't contain any special characters (especially commas), like Go and Python do already. Start returning metadata in jobs, although notably, it's not possible to insert with it yet. A few additional docstrings brought over from my project to document River Python. --- CHANGELOG.md | 5 +++++ docs/README.md | 2 +- driver/riverqueue-activerecord/lib/driver.rb | 1 + driver/riverqueue-sequel/lib/driver.rb | 1 + lib/client.rb | 17 ++++++++++++++++- lib/job.rb | 13 +++++++++---- sig/client.rbs | 6 +++++- sig/job.rbs | 3 ++- spec/client_spec.rb | 17 +++++++++++++++++ 9 files changed, 57 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f49c980..56c7005 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Tag format is now checked on insert. Tags should be no more than 255 characters and match the regex `/\A[\w][\w\-]+[\w]\z/`. [PR #22](https://github.com/riverqueue/riverqueue-ruby/pull/22). +- Returned jobs now have a `metadata` property. [PR #22](https://github.com/riverqueue/riverqueue-ruby/pull/22). + ## [0.4.0] - 2024-04-28 ### Changed diff --git a/docs/README.md b/docs/README.md index 61b354f..c9ca412 100644 --- a/docs/README.md +++ b/docs/README.md @@ -86,7 +86,7 @@ insert_res.unique_skipped_as_duplicated ### Custom advisory lock prefix -Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. 
Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: +Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks: ```ruby client = River::Client.new(mock_driver, advisory_lock_prefix: 123456) diff --git a/driver/riverqueue-activerecord/lib/driver.rb b/driver/riverqueue-activerecord/lib/driver.rb index 8c329f5..3fce2ee 100644 --- a/driver/riverqueue-activerecord/lib/driver.rb +++ b/driver/riverqueue-activerecord/lib/driver.rb @@ -100,6 +100,7 @@ def transaction(&) finalized_at: river_job.finalized_at, kind: river_job.kind, max_attempts: river_job.max_attempts, + metadata: river_job.metadata, priority: river_job.priority, queue: river_job.queue, scheduled_at: river_job.scheduled_at, diff --git a/driver/riverqueue-sequel/lib/driver.rb b/driver/riverqueue-sequel/lib/driver.rb index 5f615d1..69d99ca 100644 --- a/driver/riverqueue-sequel/lib/driver.rb +++ b/driver/riverqueue-sequel/lib/driver.rb @@ -86,6 +86,7 @@ def transaction(&) finalized_at: river_job.finalized_at, kind: river_job.kind, max_attempts: river_job.max_attempts, + metadata: river_job.metadata, priority: river_job.priority, queue: river_job.queue, scheduled_at: river_job.scheduled_at, diff --git a/lib/client.rb b/lib/client.rb index 1e771ca..42dd5da 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -2,8 
+2,13 @@ require "time" module River + # Default number of maximum attempts for a job. MAX_ATTEMPTS_DEFAULT = 25 + + # Default priority for a job. PRIORITY_DEFAULT = 1 + + # Default queue for a job. QUEUE_DEFAULT = "default" # Provides a client for River that inserts jobs. Unlike the Go version of the @@ -241,7 +246,7 @@ def insert_many(args) queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT, scheduled_at: scheduled_at&.utc, # database defaults to now state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE, - tags: insert_opts.tags || args_insert_opts.tags + tags: validate_tags(insert_opts.tags || args_insert_opts.tags) ), unique_opts ] @@ -260,6 +265,16 @@ def insert_many(args) private def uint64_to_int64(int) [int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace end + + TAG_RE = /\A[\w][\w\-]+[\w]\z/ + private_constant :TAG_RE + + private def validate_tags(tags) + tags&.each do |tag| + raise ArgumentError, "tags should be 255 characters or less" if tag.length > 255 + raise ArgumentError, "tag should match regex #{TAG_RE.inspect}" unless TAG_RE.match(tag) + end + end end # A single job to insert that's part of an #insert_many batch insert. Unlike diff --git a/lib/job.rb b/lib/job.rb index 650fad6..bd3cf0f 100644 --- a/lib/job.rb +++ b/lib/job.rb @@ -55,15 +55,15 @@ class JobRow # The set of worker IDs that have worked this job. A worker ID differs # between different programs, but is shared by all executors within any # given one. (i.e. Different Go processes have different IDs, but IDs are - # shared within any given process.) A process generates a new ULID (an - # ordered UUID) worker ID when it starts up. + # shared within any given process.) A process generates a new ID based on + # host and current time when it starts up. attr_accessor :attempted_by # When the job record was created. attr_accessor :created_at # A set of errors that occurred when the job was worked, one for each - # attempt. 
Ordered from earliest error to the latest error. + # attempt. Ordered from earliest error to the latest error. attr_accessor :errors # The time at which the job was "finalized", meaning it was either completed @@ -79,6 +79,9 @@ class JobRow # for the last time and will no longer be worked. attr_accessor :max_attempts + # Arbitrary metadata associated with the job. + attr_accessor :metadata + # The priority of the job, with 1 being the highest priority and 4 being the # lowest. When fetching available jobs to work, the highest priority jobs # will always be fetched before any lower priority jobs are fetched. Note @@ -112,6 +115,7 @@ def initialize( created_at:, kind:, max_attempts:, + metadata:, priority:, queue:, scheduled_at:, @@ -134,6 +138,7 @@ def initialize( self.finalized_at = finalized_at self.kind = kind self.max_attempts = max_attempts + self.metadata = metadata self.priority = priority self.queue = queue self.scheduled_at = scheduled_at @@ -157,7 +162,7 @@ class AttemptError attr_accessor :error # Contains a stack trace from a job that panicked. The trace is produced by - # invoking `debug.Trace()`. + # invoking `debug.Trace()` in Go. attr_accessor :trace def initialize( diff --git a/sig/client.rbs b/sig/client.rbs index 40f55f0..99a7eae 100644 --- a/sig/client.rbs +++ b/sig/client.rbs @@ -16,9 +16,13 @@ module River def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult - private def uint64_to_int64: (Integer) -> Integer private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?] private def truncate_time: (Time, Integer) -> Time + private def uint64_to_int64: (Integer) -> Integer + + TAG_RE: Regexp + + private def validate_tags: (Array[String]?) -> Array[String]? 
end class InsertManyParams diff --git a/sig/job.rbs b/sig/job.rbs index 126f398..fff8c3d 100644 --- a/sig/job.rbs +++ b/sig/job.rbs @@ -45,13 +45,14 @@ module River attr_accessor finalized_at: Time? attr_accessor kind: String attr_accessor max_attempts: Integer + attr_accessor metadata: Hash[String, untyped] attr_accessor priority: Integer attr_accessor queue: String attr_accessor scheduled_at: Time attr_accessor state: jobStateAll attr_accessor tags: Array[String]? - def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void + def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) 
-> void end class AttemptError diff --git a/spec/client_spec.rb b/spec/client_spec.rb index 23ba04a..7b4a7b3 100644 --- a/spec/client_spec.rb +++ b/spec/client_spec.rb @@ -51,6 +51,7 @@ def transaction(&) finalized_at: nil, kind: insert_params.kind, max_attempts: insert_params.max_attempts, + metadata: nil, priority: insert_params.priority, queue: insert_params.queue, scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB @@ -194,6 +195,22 @@ def to_json = nil end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`") end + it "raises error if tags are too long" do + expect do + client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( + tags: ["a" * 256] + )) + end.to raise_error(ArgumentError, "tags should be 255 characters or less") + end + + it "raises error if tags are misformatted" do + expect do + client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new( + tags: ["no,commas,allowed"] + )) + end.to raise_error(ArgumentError, 'tag should match regex /\A[\w][\w\-]+[\w]\z/') + end + def check_bigint_bounds(int) raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807 raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808