Check tag format + return metadata + additional doc strings #22

Merged · 1 commit · Jul 6, 2024
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Tag format is now checked on insert. Tags should be no more than 255 characters and match the regex `/\A[\w][\w\-]+[\w]\z/`. [PR #22](https://github.com/riverqueue/riverqueue-ruby/pull/22).
- Returned jobs now have a `metadata` property. [PR #22](https://github.com/riverqueue/riverqueue-ruby/pull/22).

## [0.4.0] - 2024-04-28

### Changed
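To make the two changelog entries above concrete, here is a rough, non-runnable sketch of how the new tag validation surfaces to callers. The `SimpleArgs` job args and `River::InsertOpts` usage are borrowed from the specs in this PR; the `mock_driver` client setup is an assumption carried over from the existing README example.

```ruby
client = River::Client.new(mock_driver)

# Well-formed tags (word characters and hyphens, 255 characters or fewer) insert as before.
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(tags: ["soft-delete"]))

# A tag longer than 255 characters now raises before anything is inserted.
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(tags: ["a" * 256]))
# => ArgumentError: tags should be 255 characters or less

# So does a tag that fails the format check.
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(tags: ["no,commas,allowed"]))
# => ArgumentError: tag should match regex /\A[\w][\w\-]+[\w]\z/
```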
2 changes: 1 addition & 1 deletion docs/README.md
@@ -86,7 +86,7 @@ insert_res.unique_skipped_as_duplicated

### Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```ruby
client = River::Client.new(mock_driver, advisory_lock_prefix: 123456)
1 change: 1 addition & 0 deletions driver/riverqueue-activerecord/lib/driver.rb
@@ -100,6 +100,7 @@ def transaction(&)
finalized_at: river_job.finalized_at,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
metadata: river_job.metadata,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
1 change: 1 addition & 0 deletions driver/riverqueue-sequel/lib/driver.rb
@@ -86,6 +86,7 @@ def transaction(&)
finalized_at: river_job.finalized_at,
kind: river_job.kind,
max_attempts: river_job.max_attempts,
metadata: river_job.metadata,
priority: river_job.priority,
queue: river_job.queue,
scheduled_at: river_job.scheduled_at,
17 changes: 16 additions & 1 deletion lib/client.rb
@@ -2,8 +2,13 @@
require "time"

module River
# Default number of maximum attempts for a job.
MAX_ATTEMPTS_DEFAULT = 25

# Default priority for a job.
PRIORITY_DEFAULT = 1

# Default queue for a job.
QUEUE_DEFAULT = "default"

# Provides a client for River that inserts jobs. Unlike the Go version of the
@@ -241,7 +246,7 @@ def insert_many(args)
queue: insert_opts.queue || args_insert_opts.queue || QUEUE_DEFAULT,
scheduled_at: scheduled_at&.utc, # database defaults to now
state: scheduled_at ? JOB_STATE_SCHEDULED : JOB_STATE_AVAILABLE,
tags: insert_opts.tags || args_insert_opts.tags
tags: validate_tags(insert_opts.tags || args_insert_opts.tags)
),
unique_opts
]
@@ -260,6 +265,16 @@ def insert_many(args)
private def uint64_to_int64(int)
[int].pack("Q").unpack1("q") #: Integer # rubocop:disable Layout/LeadingCommentSpace
end

TAG_RE = /\A[\w][\w\-]+[\w]\z/
private_constant :TAG_RE

private def validate_tags(tags)
tags&.each do |tag|
raise ArgumentError, "tags should be 255 characters or less" if tag.length > 255
raise ArgumentError, "tag should match regex #{TAG_RE.inspect}" unless TAG_RE.match(tag)
end
end
end

# A single job to insert that's part of an #insert_many batch insert. Unlike
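As a quick sanity check on what the new `TAG_RE` accepts, the snippet below exercises the regex directly. The pattern is copied verbatim from the diff; the sample tag strings are purely illustrative.

```ruby
TAG_RE = /\A[\w][\w\-]+[\w]\z/

TAG_RE.match?("chainable")          # => true  (word characters only)
TAG_RE.match?("soft-delete")        # => true  (hyphens are allowed between word characters)
TAG_RE.match?("a" * 255)            # => true  (length is enforced separately, before the regex)
TAG_RE.match?("no,commas,allowed")  # => false (commas are neither word characters nor hyphens)
TAG_RE.match?("-leading-hyphen")    # => false (first and last characters must be word characters)
TAG_RE.match?("ab")                 # => false (the pattern requires at least three characters)
```

One side effect worth noting: because the middle character class requires at least one character, one- and two-character tags are rejected by the format check.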
13 changes: 9 additions & 4 deletions lib/job.rb
@@ -55,15 +55,15 @@ class JobRow
# The set of worker IDs that have worked this job. A worker ID differs
# between different programs, but is shared by all executors within any
# given one. (i.e. Different Go processes have different IDs, but IDs are
# shared within any given process.) A process generates a new ULID (an
# ordered UUID) worker ID when it starts up.
# shared within any given process.) A process generates a new ID based on
# host and current time when it starts up.
attr_accessor :attempted_by

# When the job record was created.
attr_accessor :created_at

# A set of errors that occurred when the job was worked, one for each
# attempt. Ordered from earliest error to the latest error.
# attempt. Ordered from earliest error to the latest error.
attr_accessor :errors

# The time at which the job was "finalized", meaning it was either completed
@@ -79,6 +79,9 @@ class JobRow
# for the last time and will no longer be worked.
attr_accessor :max_attempts

# Arbitrary metadata associated with the job.
attr_accessor :metadata

# The priority of the job, with 1 being the highest priority and 4 being the
# lowest. When fetching available jobs to work, the highest priority jobs
# will always be fetched before any lower priority jobs are fetched. Note
@@ -112,6 +115,7 @@ def initialize(
created_at:,
kind:,
max_attempts:,
metadata:,
priority:,
queue:,
scheduled_at:,
@@ -134,6 +138,7 @@ def initialize(
self.finalized_at = finalized_at
self.kind = kind
self.max_attempts = max_attempts
self.metadata = metadata
self.priority = priority
self.queue = queue
self.scheduled_at = scheduled_at
@@ -157,7 +162,7 @@ class AttemptError
attr_accessor :error

# Contains a stack trace from a job that panicked. The trace is produced by
# invoking `debug.Trace()`.
# invoking `debug.Trace()` in Go.
attr_accessor :trace

def initialize(
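With the new accessor in place, jobs returned by both drivers now carry the row's metadata. A minimal sketch of what that looks like to a caller, assuming the insert result exposes the inserted row as `job` (and using a made-up metadata key for illustration):

```ruby
insert_res = client.insert(SimpleArgs.new(job_num: 1))

# metadata is an arbitrary Hash[String, untyped] per sig/job.rbs; for a freshly
# inserted job it is typically empty (or nil with the mock driver used in the
# specs, which doesn't round-trip through a real database row).
insert_res.job.metadata
# => {"source" => "backfill"}  # hypothetical value, if the row's metadata was set
```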
6 changes: 5 additions & 1 deletion sig/client.rbs
@@ -16,9 +16,13 @@ module River
def insert_many: (Array[jobArgs | InsertManyParams]) -> Integer

private def check_unique_job: (Driver::JobInsertParams, UniqueOpts?) { () -> InsertResult } -> InsertResult
private def uint64_to_int64: (Integer) -> Integer
private def make_insert_params: (jobArgs, InsertOpts, ?is_insert_many: bool) -> [Driver::JobInsertParams, UniqueOpts?]
private def truncate_time: (Time, Integer) -> Time
private def uint64_to_int64: (Integer) -> Integer

TAG_RE: Regexp

private def validate_tags: (Array[String]?) -> Array[String]?
end

class InsertManyParams
3 changes: 2 additions & 1 deletion sig/job.rbs
@@ -45,13 +45,14 @@ module River
attr_accessor finalized_at: Time?
attr_accessor kind: String
attr_accessor max_attempts: Integer
attr_accessor metadata: Hash[String, untyped]
attr_accessor priority: Integer
attr_accessor queue: String
attr_accessor scheduled_at: Time
attr_accessor state: jobStateAll
attr_accessor tags: Array[String]?

def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void
def initialize: (id: Integer, args: Hash[String, untyped], attempt: Integer, ?attempted_at: Time?, ?attempted_by: String?, created_at: Time, ?errors: Array[AttemptError]?, ?finalized_at: Time?, kind: String, max_attempts: Integer, metadata: Hash[String, untyped], priority: Integer, queue: String, scheduled_at: Time, state: jobStateAll, ?tags: Array[String]?) -> void
end

class AttemptError
17 changes: 17 additions & 0 deletions spec/client_spec.rb
@@ -51,6 +51,7 @@ def transaction(&)
finalized_at: nil,
kind: insert_params.kind,
max_attempts: insert_params.max_attempts,
metadata: nil,
priority: insert_params.priority,
queue: insert_params.queue,
scheduled_at: insert_params.scheduled_at || Time.now, # normally defaults from DB
@@ -194,6 +195,22 @@ def to_json = nil
end.to raise_error(RuntimeError, "args should return non-nil from `#to_json`")
end

it "raises error if tags are too long" do
expect do
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(
tags: ["a" * 256]
))
end.to raise_error(ArgumentError, "tags should be 255 characters or less")
end

it "raises error if tags are misformatted" do
expect do
client.insert(SimpleArgs.new(job_num: 1), insert_opts: River::InsertOpts.new(
tags: ["no,commas,allowed"]
))
end.to raise_error(ArgumentError, 'tag should match regex /\A[\w][\w\-]+[\w]\z/')
end

def check_bigint_bounds(int)
raise "lock key shouldn't be larger than Postgres bigint max (9223372036854775807); was: #{int}" if int > 9223372036854775807
raise "lock key shouldn't be smaller than Postgres bigint min (-9223372036854775808); was: #{int}" if int < -9223372036854775808