A transactional job queue built with Ecto, PostgreSQL and GenStage
- Transactional job processing
- Retries
- Scheduled jobs
- Multiple queues
- Low latency concurrent processing
- Avoid frequent database polling
- Library of functions, not a full OTP application
Add :ecto_job to the dependencies in your mix.exs:
{:ecto_job, "~> 0.2"}
Add a migration to install the notification function and create a job queue table:
mix ecto.gen.migration create_job_queue
defmodule MyApp.Repo.Migrations.CreateJobQueue do
  use Ecto.Migration
  def up do
    EctoJob.Migrations.Install.up()
    EctoJob.Migrations.CreateJobTable.up("jobs")
  end
  def down do
    EctoJob.Migrations.CreateJobTable.down("jobs")
    EctoJob.Migrations.Install.down()
  end
end
Add a module for the queue and mix in EctoJob.JobQueue.
This will declare an Ecto.Schema to use with the table created in the migration, and a start_link function that lets the worker supervision tree be started conveniently.
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"
end
Add a perform/2 function to the job queue module; this is where jobs from the queue will be dispatched.
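defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"
  def perform(multi = %Ecto.Multi{}, job = %{}) do
    # ... job logic here: add the operations for `job` to `multi` ...
    # The multi must be run in a transaction to complete (delete) the job.
    MyApp.Repo.transaction(multi)
  end
end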
Add your new JobQueue module as a supervisor to the application supervision tree to run the worker supervisor:
def start(_type, _args) do
  import Supervisor.Spec
  children = [
    supervisor(MyApp.Repo, []),
    supervisor(MyApp.JobQueue, [[repo: MyApp.Repo, max_demand: 100]])
  ]
  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end
If you want to run the workers on a separate node from the enqueuers, leave your JobQueue module out of the supervision tree on the nodes that only enqueue jobs.
Jobs are Ecto schemas, with each queue backed by a different table.
A job can be inserted into the Repo directly by constructing it with the new/2 function:
%{"type" => "SendEmail", "address" => "joe@example.com", "body" => "Welcome!"}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()
The primary benefit of EctoJob is the ability to enqueue and process jobs transactionally.
To achieve this, a job can be added to an Ecto.Multi, along with other application updates, using the enqueue/3 function:
Ecto.Multi.new()
|> Ecto.Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "joe@example.com"}))
|> MyApp.JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "joe@example.com", "body" => "Welcome!"})
|> MyApp.Repo.transaction()
All jobs sent to a queue are eventually dispatched to the perform/2 function defined in the queue module.
The first argument supplied is an Ecto.Multi which has been initialized with a delete operation, marking the job as complete.
The Ecto.Multi struct must be passed to the Ecto.Repo.transaction function to complete the job, along with any other application updates.
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"
  # Matches the "address" key used when the job was enqueued.
  def perform(multi = %Ecto.Multi{}, %{"type" => "SendEmail", "address" => recipient, "body" => body}) do
    multi
    # EmailService.send/2 must return {:ok, _} or {:error, _}.
    # Ecto 2.x passes the changes so far; Ecto 3 passes (repo, changes).
    |> Ecto.Multi.run(:send, fn _ -> EmailService.send(recipient, body) end)
    |> Ecto.Multi.insert(:stats, %EmailSendStats{recipient: recipient})
    |> MyApp.Repo.transaction()
  end
end
When a queue handles multiple job types, it is useful to pattern match on the job and delegate to separate modules:
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"
  def perform(multi = %Ecto.Multi{}, job = %{"type" => "SendEmail"}), do: MyApp.SendEmail.perform(multi, job)
  def perform(multi = %Ecto.Multi{}, job = %{"type" => "CustomerReport"}), do: MyApp.CustomerReport.perform(multi, job)
  def perform(multi = %Ecto.Multi{}, job = %{"type" => "SyncWithCRM"}), do: MyApp.CRMSync.perform(multi, job)
  # ... one clause per job type ...
end
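As a self-contained illustration of the delegation pattern above, the sketch below replaces the Ecto.Multi with an arbitrary term so it runs without a database. The modules Sketch.JobQueue, Sketch.SendEmail and Sketch.CustomerReport are hypothetical stand-ins, and the catch-all clause that raises on an unknown "type" is an assumption of this sketch, not something EctoJob requires.

```elixir
# Hypothetical handler modules standing in for MyApp.SendEmail etc.
# They return tagged tuples so the dispatch can be observed without a Repo.
defmodule Sketch.SendEmail do
  def perform(multi, job), do: {:send_email, multi, job["address"]}
end

defmodule Sketch.CustomerReport do
  def perform(multi, job), do: {:customer_report, multi, job["customer_id"]}
end

defmodule Sketch.JobQueue do
  # Each clause matches on the "type" key of the job params and delegates.
  # In EctoJob the first argument would be an %Ecto.Multi{}; here it is any term.
  def perform(multi, %{"type" => "SendEmail"} = job), do: Sketch.SendEmail.perform(multi, job)
  def perform(multi, %{"type" => "CustomerReport"} = job), do: Sketch.CustomerReport.perform(multi, job)

  # Assumed catch-all: fail with a descriptive error instead of a FunctionClauseError.
  def perform(_multi, job), do: raise(ArgumentError, "unknown job type: #{inspect(job["type"])}")
end
```

Either way, a job with an unexpected type never reaches Repo.transaction, so, as with any unsuccessful job, it stays in the queue until it expires.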
You can customize how often the table is polled for scheduled jobs. The default is 60_000 ms.
config :ecto_job, :poll_interval, 15_000
The base_expiry_seconds setting controls both how long a job stays reserved by a waiting worker before it is started and the base retry interval for failed or timed-out jobs. Keep in mind that, for jobs that are expected to retry quickly, a job is only retried as quickly as the poll_interval allows, whatever base_expiry_seconds is set to. The first retry may be an exception: it may not be bounded in the same way if the initial job trigger dispatched the first attempt mid-interval.
config :ecto_job, :base_expiry_seconds, 60
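To make the interaction between base_expiry_seconds and poll_interval concrete, here is a back-of-the-envelope model. It assumes, for illustration only, that an attempt expires base_expiry_seconds times the attempt counter after it starts (as in the job lifecycle described later) and that polls run at fixed multiples of poll_interval starting from time 0. Sketch.RetryTiming is a hypothetical module, not part of EctoJob.

```elixir
defmodule Sketch.RetryTiming do
  # Hypothetical model, not EctoJob's implementation. All times are in milliseconds.
  # Assumption: polls happen at t = 0, poll_interval_ms, 2 * poll_interval_ms, ...

  # An attempt expires base_expiry_seconds * attempt after it starts.
  def expires_at(started_at_ms, base_expiry_seconds, attempt) do
    started_at_ms + base_expiry_seconds * 1000 * attempt
  end

  # An expired job only becomes AVAILABLE again at the first poll at or after its expiry.
  def available_again_at(started_at_ms, base_expiry_seconds, attempt, poll_interval_ms)
      when poll_interval_ms > 0 do
    expiry = expires_at(started_at_ms, base_expiry_seconds, attempt)
    # Integer ceiling division rounds the expiry up to the next poll tick.
    div(expiry + poll_interval_ms - 1, poll_interval_ms) * poll_interval_ms
  end
end
```

For example, with the default 60_000 ms poll interval and base_expiry_seconds set to 5, Sketch.RetryTiming.available_again_at(0, 5, 1, 60_000) gives 60_000: the job expires after 5 s but waits for the next poll before it can be retried.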
You can control whether logging is on or off, and the log level. The defaults are true and :info.
config :ecto_job, log: true,
                  log_level: :debug
See EctoJob.Config for configuration details.
Each job queue is represented as a PostgreSQL table and Ecto schema.
Jobs are added to the queue by inserting into the table, using Ecto.Repo.transaction to transactionally enqueue jobs with other application updates.
A GenStage producer responds to demand for jobs by efficiently pulling jobs from the queue in batches.
When there are not enough jobs in the queue, the unmet demand is buffered.
As jobs are inserted into the queue, pg_notify notifies the producer that new work is available, allowing the producer to dispatch jobs immediately if there is pending demand.
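The demand buffering described above can be modelled with a small pure-data sketch. Sketch.Producer is hypothetical: it only tracks unmet demand against an in-memory list standing in for the jobs table, whereas the real producer is a GenStage process that queries PostgreSQL and reacts to pg_notify messages.

```elixir
defmodule Sketch.Producer do
  # Hypothetical model of buffered demand, not EctoJob's GenStage producer.
  defstruct pending_demand: 0

  # Consumers ask for `demand` more jobs; add it to any demand not yet met.
  def handle_demand(%__MODULE__{} = state, demand, queue) do
    dispatch(%{state | pending_demand: state.pending_demand + demand}, queue)
  end

  # A pg_notify-style message says new rows exist: try to satisfy buffered demand.
  def handle_notification(%__MODULE__{} = state, queue), do: dispatch(state, queue)

  # Take up to `pending_demand` jobs from the head of the queue as one batch.
  # Returns {jobs_to_dispatch, remaining_queue, new_state}.
  defp dispatch(%__MODULE__{pending_demand: pending} = state, queue) do
    {batch, rest} = Enum.split(queue, pending)
    {batch, rest, %{state | pending_demand: pending - length(batch)}}
  end
end
```

With demand for 5 jobs and only 2 queued, both are dispatched and a demand of 3 is remembered; the next notification then dispatches up to 3 newly inserted jobs without waiting for a poll.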
A GenStage ConsumerSupervisor subscribes to the producer and spawns a new Task for each job.
The callback for each job receives an Ecto.Multi structure, pre-populated with a delete command to remove the job from the queue.
Application code then adds further commands to the Ecto.Multi and submits it to the Repo with a call to transaction, ensuring that application updates are performed atomically with the job removal.
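To see why the pre-populated delete matters, the toy model below mimics the all-or-nothing behaviour of Ecto.Multi with Repo.transaction without needing a database. Sketch.MiniMulti is hypothetical and only loosely modelled on Ecto's API: operations run in order against an in-memory store, the new store is kept only if every operation returns {:ok, value, store}, and on the first {:error, reason} the original store is handed back, much like a rolled-back transaction.

```elixir
defmodule Sketch.MiniMulti do
  # Toy stand-in for Ecto.Multi + Repo.transaction; not Ecto's implementation.
  # An operation is fun.(changes_so_far, store) returning
  # {:ok, value, new_store} or {:error, reason}.

  def new, do: []

  # Append a named operation (loosely like Ecto.Multi.run/3).
  def run(ops, name, fun) when is_function(fun, 2), do: ops ++ [{name, fun}]

  # Run operations in order. On success: {:ok, changes, new_store}.
  # On the first error: {:error, name, reason, changes_so_far, original_store},
  # so nothing done by earlier operations is kept.
  def transaction(ops, store) do
    Enum.reduce_while(ops, {:ok, %{}, store}, fn {name, fun}, {:ok, changes, current} ->
      case fun.(changes, current) do
        {:ok, value, next} -> {:cont, {:ok, Map.put(changes, name, value), next}}
        {:error, reason} -> {:halt, {:error, name, reason, changes, store}}
      end
    end)
  end
end
```

If an operation added after the delete fails (for example, sending an email), the returned store still contains the job row, so the job is not lost and becomes eligible for retry once it expires.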
Scheduled jobs and jobs whose attempts have expired are reactivated by polling the database, once per minute by default (see poll_interval).
Jobs scheduled to run at a future time start in the "SCHEDULED" state. Scheduled jobs transition to "AVAILABLE" after the scheduled time has passed.
Jobs that are intended to run immediately start in an "AVAILABLE" state.
The producer updates a batch of jobs, setting their state to "RESERVED" with an expiry of 5 minutes by default (configurable via base_expiry_seconds).
Once a consumer is given a job, it increments the attempt counter and updates the state to "IN_PROGRESS", with an expiry of 5 minutes multiplied by the attempt counter, so each retry is given a longer expiry than the last.
If successful, the consumer will delete the job from the queue. If unsuccessful, the job remains in the "IN_PROGRESS" state until it expires.
Jobs in the "RESERVED" or "IN_PROGRESS" state past the expiry time will be returned to the "AVAILABLE" state.
Expired jobs in the "IN_PROGRESS" state with attempts >= MAX_ATTEMPTS move to a "FAILED" state. Failed jobs are kept in the database so that application developers can handle the failure.
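The lifecycle above can be summarised as a set of state transitions. In the sketch below, Sketch.JobLifecycle is hypothetical, times are integer seconds, the 300 s base expiry mirrors the 5 minutes in the text, and @max_attempts 5 is a made-up stand-in for MAX_ATTEMPTS.

```elixir
defmodule Sketch.JobLifecycle do
  # Illustrative model of the documented lifecycle, not EctoJob's code.
  @base_expiry 300
  # Made-up value standing in for MAX_ATTEMPTS.
  @max_attempts 5

  # Jobs scheduled in the future start SCHEDULED; others start AVAILABLE.
  def new(now, scheduled_at \\ nil) do
    if scheduled_at && scheduled_at > now do
      %{state: "SCHEDULED", attempt: 0, scheduled_at: scheduled_at, expires: nil}
    else
      %{state: "AVAILABLE", attempt: 0, scheduled_at: nil, expires: nil}
    end
  end

  # The producer reserves an available job with the base expiry.
  def reserve(%{state: "AVAILABLE"} = job, now),
    do: %{job | state: "RESERVED", expires: now + @base_expiry}

  # A consumer starts it: bump the attempt counter; expiry grows with attempts.
  def start(%{state: "RESERVED", attempt: a} = job, now),
    do: %{job | state: "IN_PROGRESS", attempt: a + 1, expires: now + @base_expiry * (a + 1)}

  # Success deletes the job from the queue.
  def succeed(%{state: "IN_PROGRESS"}), do: :deleted

  # Periodic poll: activate due scheduled jobs, fail or recycle expired ones.
  def poll(%{state: "SCHEDULED", scheduled_at: at} = job, now) when at <= now,
    do: %{job | state: "AVAILABLE"}

  def poll(%{state: "IN_PROGRESS", expires: exp, attempt: a} = job, now)
      when exp < now and a >= @max_attempts,
      do: %{job | state: "FAILED"}

  def poll(%{state: s, expires: exp} = job, now)
      when s in ["RESERVED", "IN_PROGRESS"] and exp < now,
      do: %{job | state: "AVAILABLE", expires: nil}

  def poll(job, _now), do: job
end
```

The FAILED clause is checked before the generic expiry clause, so an expired job that has used up its attempts is kept for inspection instead of being made available again.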