Releases: oban-bg/oban
v2.9.0
Optionally Use Meta for Unique Jobs
It's now possible to use the meta field for unique jobs. Unique jobs have always supported worker, queue, and args fields. That was flexible, but forced applications to put ad-hoc unique values in args when they should really be in meta.
The meta field supports keys, just like args. That makes it possible to use highly efficient fingerprint style uniqueness (and possibly drop the index on args, if desired).
Here's an example of using a single "fingerprint" key in meta for uniqueness:
defmodule MyApp.FingerprintWorker do
  use Oban.Worker, unique: [fields: [:worker, :meta], keys: [:fingerprint]]

  @impl Worker
  def new(args, opts) do
    fingerprint = :erlang.phash2(args)

    super(args, Keyword.put(opts, :meta, %{fingerprint: fingerprint}))
  end
end
For backward compatibility, meta isn't included in unique fields by default.
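Fingerprint uniqueness relies on :erlang.phash2/1 being deterministic: equal terms always hash to the same integer. The sketch below is plain Elixir with no Oban dependency; the FingerprintDemo module and its args are hypothetical and exist only to illustrate the idea.

```elixir
defmodule FingerprintDemo do
  # Hypothetical helper mirroring the new/2 override above: builds the
  # meta map that would be stored alongside the job.
  def meta_for(args), do: %{fingerprint: :erlang.phash2(args)}
end

first = FingerprintDemo.meta_for(%{account_id: 1, action: "sync"})
second = FingerprintDemo.meta_for(%{action: "sync", account_id: 1})

# Key order in a map literal is irrelevant, so both maps are equal terms
# and produce the same fingerprint.
IO.inspect(first == second, label: "same fingerprint")
```

Atom keys and string keys are different terms and will generally hash differently, so it helps to build args consistently before fingerprinting them.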
Expanded Start and Scale Options
After extensive refactoring to queue option management and validation, now it's possible to start and scale queues with all supported options. Previously start/stop functions only supported the limit option for dynamic scaling, reducing runtime flexibility considerably.
Now it's possible to start a queue in the paused state:
Oban.start_queue(queue: :dynamic, paused: true)
Even better, for apps that use an alternative engine like the SmartEngine from Oban Pro, it's possible to start a dynamic queue with options like global concurrency or rate limiting:
Oban.start_queue(queue: :dynamic, local_limit: 10, global_limit: 50)
All options are also passed through scale_queue, locally or globally, even allowing you to reconfigure a feature like rate limiting at runtime:
Oban.scale_queue(queue: :dynamic, rate_limit: [allowed: 50, period: 60])
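As a rough sketch of how these calls fit together with the default engine, a queue can be started paused, resumed once the application is ready, and then scaled. This assumes Oban is already running under its default name and that no queue named :dynamic exists yet; the queue name and limits are illustrative.

```elixir
# Assumption: Oban is running under its default name (Oban).

# Start the queue paused so nothing executes until we are ready.
:ok = Oban.start_queue(queue: :dynamic, limit: 5, paused: true)

# Resume processing, then raise the concurrency limit at runtime.
:ok = Oban.resume_queue(queue: :dynamic)
:ok = Oban.scale_queue(queue: :dynamic, limit: 20)
```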
Added
- [Oban] Add Oban.cancel_all_jobs/1,2 to cancel multiple jobs at once, within an atomic transaction. The function accepts a Job query for complete control over which jobs are cancelled.
- [Oban] Add Oban.retry_all_jobs/1,2 to retry multiple jobs at once, within an atomic transaction. Like cancel_all_jobs, it accepts a query for fine-grained control.
- [Oban] Add with_limit option to drain_queue/2, which controls the number of jobs that are fetched and executed concurrently. When paired with with_recursion this can drastically speed up interdependent job draining, i.e. workflows.
- [Oban.Telemetry] Add telemetry span events for all engine and notifier actions. Now all database operations are covered by spans.
- [Oban.Migrations] Add create_schema option to prevent automatic schema creation in migrations.
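The bulk functions take an Ecto query over Oban.Job. A minimal sketch, assuming a running Oban instance and Ecto available in the project; the worker name "MyApp.ImportWorker" and the "imports" queue are hypothetical:

```elixir
import Ecto.Query, only: [where: 3]

# Cancel the matching jobs for a single worker in one transaction.
{:ok, cancelled} =
  Oban.Job
  |> where([j], j.worker == "MyApp.ImportWorker")
  |> Oban.cancel_all_jobs()

# Retry the discarded jobs from a single queue.
{:ok, retried} =
  Oban.Job
  |> where([j], j.queue == "imports" and j.state == "discarded")
  |> Oban.retry_all_jobs()

IO.inspect({cancelled, retried}, label: "cancelled/retried")
```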
Changed
- [Oban] Consistently include a :snoozed count in drain_queue/2 output. Previously the count was only included when there was at least one snoozed job.
- [Oban.Testing] Default to attempt: 1 for perform_job/3, as a worker's perform/1 would never be called with attempt: 0.
Fixed
- [Oban.Queue.Supervisor] Change supervisor strategy to :one_for_all. Queue supervisors used a :rest_for_one strategy, which allowed the task supervisor to keep running when a producer crashed. That allowed duplicate long-lived jobs to run simultaneously, which is a bug in itself, but could also cause attempt > max_attempts violations.
- [Oban.Plugins.Cron] Start step ranges from the minimum value, rather than for the entire set. Now the range 8-23/4 correctly includes [8, 12, 16, 20].
- [Oban.Plugins.Cron] Correctly parse step ranges with a single value, e.g. 0 1/2 * * *
- [Oban.Telemetry] Comply with :telemetry.span/3 by exposing errors as reason in metadata
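The corrected step-range arithmetic from the cron fix can be illustrated in plain Elixir. This is not Oban's parser, only a sketch of the expected expansion: a stepped range starts at the range's minimum and advances by the step.

```elixir
# Expand "8-23/4": start at the minimum (8) and take every 4th value.
hours = Enum.take_every(8..23, 4)

IO.inspect(hours, label: "8-23/4")
# prints 8-23/4: [8, 12, 16, 20]
```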
v2.8.0
Time Unit Scheduling
It's now possible to specify a unit for :schedule_in, rather than always assuming seconds. This makes it possible to schedule a job using clearer minutes, hours, days, or weeks, e.g. schedule_in: {1, :minute} or schedule_in: {3, :days}.
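To make the units concrete, here is a small sketch of how a {value, unit} tuple maps to seconds. The ScheduleInDemo module is hypothetical and is not how Oban implements the conversion; it only shows the arithmetic behind the two examples above.

```elixir
defmodule ScheduleInDemo do
  # Hypothetical helper, not part of Oban: converts a schedule_in value
  # into seconds. Singular and plural unit names are treated alike here.
  def to_seconds(seconds) when is_integer(seconds), do: seconds
  def to_seconds({value, unit}) when unit in [:second, :seconds], do: value
  def to_seconds({value, unit}) when unit in [:minute, :minutes], do: value * 60
  def to_seconds({value, unit}) when unit in [:hour, :hours], do: value * 3_600
  def to_seconds({value, unit}) when unit in [:day, :days], do: value * 86_400
  def to_seconds({value, unit}) when unit in [:week, :weeks], do: value * 604_800
end

IO.inspect(ScheduleInDemo.to_seconds({1, :minute}), label: "1 minute")
IO.inspect(ScheduleInDemo.to_seconds({3, :days}), label: "3 days")
```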
Changed
- [Oban.Testing] Accept non-map args to perform_job/3 for compatibility with overridden Worker.new/2 callbacks.
- [Oban.Queue.Producer] Include some jitter when scheduling queue refreshes to prevent queues from refreshing simultaneously. In extreme cases, refresh contention could cause producers to crash.
Fixed
- [Oban.Queue.Executor] Restore logged warnings for unexpected job results by retaining the safe flag during normal execution.
- [Oban.Plugins.Gossip] Catch and discard unexpected messages rather than crashing the plugin.
- [Oban.Testing] Eliminate dialyzer warnings by including repo option in the Oban.Testing.perform_job/3 spec.
v2.7.2
Fixed
- [Oban.Plugins.Pruner] Consider cancelled_at or discarded_at timestamps when querying prunable jobs. The previous query required an attempted_at value, even for cancelled or discarded jobs. If a job was cancelled before it was attempted then it wouldn't ever be pruned.
- [Oban.Plugins.Gossip] Correct exit handling during safe checks. Occasionally, producer checks time out and the previous catch block didn't handle exits properly.
v2.7.1
v2.7.0
Pluggable Notifier
The PubSub functionality Oban relies on for starting, stopping, scaling, and pausing queues is now pluggable. The previous notifier was hard-coded to use Postgres for PubSub via LISTEN/NOTIFY. Unfortunately, LISTEN/NOTIFY doesn't work with PgBouncer in "Transaction Mode." While relatively few users run in "Transaction Mode," it is increasingly common and deserved a workaround.
Oban.Notifier defines a minimal behaviour and Oban ships with a default implementation, Oban.PostgresNotifier, based on the old LISTEN/NOTIFY module. You can specify a different notifier in your Oban configuration:
config :my_app, Oban,
  notifier: MyApp.CustomNotifier,
  repo: MyApp.Repo,
  ...
An alternate pg/pg2 based notifier is available in Oban Pro.
Replacing Opts on Unique Conflict
The replace_args option for updating args on unique conflict is replaced with a highly flexible replace option. Using replace, you can update any job field when there is a conflict. Replace works for any of the following fields: args, max_attempts, meta, priority, queue, scheduled_at, tags, worker.
For example, to change the priority to 0 and increase max_attempts to 5 when there is a conflict:
BusinessWorker.new(
  args,
  max_attempts: 5,
  priority: 0,
  replace: [:max_attempts, :priority]
)
Another example is bumping the scheduled time to 1 second in the future on conflict. Either a scheduled_at or a schedule_in value will work, but the field named in the replace option is always scheduled_at.
UrgentWorker.new(args, schedule_in: 1, replace: [:scheduled_at])
Added
- [Oban.Job] A new conflict? field indicates whether a unique job conflicted with an existing job on insert.
- [Oban.Telemetry] Always populate the unsaved_error field for error or discard events. Previously, the field was empty for discard events.
- [Oban.Queue.Engine] Define a cancel_job/2 callback in the engine and move cancellation from the query module to the BasicEngine.
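The conflict? field can be pattern matched on the result of an insert. A minimal sketch, assuming a running Oban instance; MyApp.BusinessWorker, its args, and the 60 second unique period are hypothetical:

```elixir
# Assumption: MyApp.BusinessWorker is an Oban worker defined elsewhere.
changeset = MyApp.BusinessWorker.new(%{id: 1}, unique: [period: 60])

case Oban.insert(changeset) do
  {:ok, %Oban.Job{conflict?: true} = job} ->
    # A matching unique job already existed; it is returned instead.
    IO.puts("job #{job.id} already existed")

  {:ok, %Oban.Job{} = job} ->
    IO.puts("inserted job #{job.id}")

  {:error, changeset} ->
    IO.inspect(changeset.errors, label: "insert failed")
end
```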
Changed
- [Oban.Testing] Thanks to a slight refactoring, the perform_job/3 helper now emits the same telemetry events that you'd have in production. The refactoring also exposed and fixed an inconsistency around invalid snooze handlers.
- [Oban.Testing] Expose Testing.all_enqueued/0, an alternate clause that doesn't require any options. This new clause makes it possible to check for any enqueued jobs without filters.
- [Oban.Plugins.Stager] Limit the number of jobs staged at one time to prevent timeout errors while staging a massive number of jobs.
Fixed
- [Oban.Repo] Respect ongoing transactions when using get_dynamic_repo. Prior to this fix, calls that use Oban.Repo, such as Oban.insert/2, could use a different repo than the one used in the current transaction.
v2.6.1
v2.6.0
Pluggable Queue Engines
Queue producers now use an "engine" callback module to manage demand and work
with the database. The engine provides a clean way to expand and enhance the
functionality of queue producers without modifying the solid foundation of queue
supervision trees. Engines make enhanced functionality such as global
concurrency, local rate limiting and global rate limiting possible!
The BasicEngine is the default, and it retains the exact functionality of
previous Oban versions. To specify the engine explicitly, or swap in an
alternate engine, set it in your configuration:
config :my_app, Oban,
  engine: Oban.Queue.BasicEngine,
  ...
See the v2.6 upgrade guide for instructions on swapping in an alternate
engine.
Recursive Queue Draining
The ever-handy Oban.drain_queue/1,2 function gained a new with_recursion
option, which makes it possible to test jobs that insert more jobs when they
execute. When with_recursion is enabled the queue will keep executing until no
jobs are available. It even composes with with_scheduled!
In practice, this is especially useful for testing dependent workflows.
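A sketch of what recursive draining looks like in a test, assuming Oban is configured with queues disabled in the test environment. Both worker modules are hypothetical.

```elixir
defmodule MyApp.ChildWorker do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{}), do: :ok
end

defmodule MyApp.ParentWorker do
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"children" => count}}) when count > 0 do
    # Running the parent enqueues `count` child jobs in the same queue.
    for index <- 1..count do
      %{index: index} |> MyApp.ChildWorker.new() |> Oban.insert!()
    end

    :ok
  end
end

# Inside an ExUnit test:
Oban.insert!(MyApp.ParentWorker.new(%{children: 2}))

# Without with_recursion only the parent would run; with it, draining
# continues until the child jobs inserted by the parent have run too.
result = Oban.drain_queue(queue: :default, with_recursion: true)
IO.inspect(result, label: "drain result")
```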
Gossip Plugin for Queue Monitoring
The new gossip plugin uses PubSub to efficiently exchange queue state
information between all interested nodes. This allows Oban instances to
broadcast state information regardless of which engine they are using, and
without storing anything in the database.
See the v2.6 upgrade guide for details on switching to the gossip
plugin.
Added
- [Oban.Job] Inject the current conf into the job struct as a virtual field, making the complete conf available within perform/1.
- [Oban.Notifier] Add unlisten/2, used to stop a process from receiving notifications for one or more channels.
Changed
- [Oban.Telemetry] Stop emitting circuit events for queue producers. As producers no longer poll, the circuit breaker masked real errors more than it guarded against sporadic issues.
- [Oban.Telemetry] Delay [:oban, :supervisor, :init] event until the complete supervision tree finishes initialization.
- [Oban.Migration] Stop creating an oban_beats table entirely.
Fixed
- [Oban.Plugins.Cron] Prevent schedule overflow right before midnight
v2.5.0
Top of the Minute Cron
Rather than scheduling cron evaluation 60 seconds after the server starts, evaluation is now scheduled at the top of the next minute. This yields several improvements:
- More predictable timing for cron jobs now that they are inserted at the top of the minute. Note that jobs may execute later depending on how busy queues are.
- Minimize contention between servers inserting jobs, thanks to the transaction lock acquired by each plugin.
- Prevent duplicate inserts for jobs that omit the completed state (when server start time is staggered the transaction lock has no effect).
Repeater Plugin for Transaction Pooled Databases
Environments that can't make use of PG notifications, i.e. because they use PgBouncer with transaction pooling, won't process available jobs reliably. The new Repeater plugin provides a workaround that simulates polling functionality for producers.
Simply include the Repeater in your plugin list:
config :my_app, Oban,
  plugins: [
    Oban.Plugins.Pruner,
    Oban.Plugins.Stager,
    Oban.Plugins.Repeater
  ],
  ...
Include Perform Result in Job Telemetry Metadata
The metadata for [:oban, :job, :stop] telemetry events now include the job's perform/1 return value. That makes it possible to extract job output from other processes:
def handle_event([:oban, :job, :stop], _timing, meta, _opts) do
  IO.inspect(meta.result, label: "return from #{meta.job.worker}")

  :ok
end
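For the handler to receive events it has to be attached with :telemetry.attach/4. A minimal sketch, assuming the :telemetry package is available (Oban depends on it); the MyApp.ObanResultLogger module and the handler id are hypothetical.

```elixir
defmodule MyApp.ObanResultLogger do
  # Hypothetical module wrapping the handler shown above.
  def handle_event([:oban, :job, :stop], _timing, meta, _opts) do
    IO.inspect(meta.result, label: "return from #{meta.job.worker}")

    :ok
  end
end

# Typically called once at startup, e.g. from Application.start/2.
# The handler id is an arbitrary unique term.
:ok =
  :telemetry.attach(
    "oban-result-logger",
    [:oban, :job, :stop],
    &MyApp.ObanResultLogger.handle_event/4,
    nil
  )
```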
Added
- [Oban] Support a changeset function in addition to a changeset in insert_all/4. Inserting jobs within a multi is even more powerful.
- [Oban.Queue.Executor] Stash a timed job's pid to enable inner process messaging, notably via telemetry events.
Changed
- [Oban] Upgrade minimum Elixir version from 1.8 to 1.9.
- [Oban.Plugins.Cron] Individually validate crontab workers and options, providing more descriptive errors at the collection and entry level.
- [Oban] Simplify the job cancelling flow for consistency. Due to race conditions it was always possible for a job to complete before it was cancelled, now that flow is much simpler.
- [Oban.Queue.Producer] Replace dispatch cooldown with a simpler debounce mechanism.
- [Oban.Plugins.Stager] Limit the number of notify events to one per queue, rather than one per available job.
Fixed
- [Oban.Queue.Producer] Handle unclean exits without an error and stack, which prevents "zombie" jobs. This would occur after multiple hot upgrades when the worker module changed, as the VM would forcibly terminate any processes running the old modules.
- [Oban] Specify that the changeset_wrapper type allows keys other than :changesets, fixing a dialyzer type mismatch.
Removed
- [Oban] Remove the undocumented version/0 function in favor of using the standard Application.spec/2.