-
Notifications
You must be signed in to change notification settings - Fork 36
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
32 additions
and
24 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,8 +48,8 @@ defmodule MyApp.Repo.Migrations.CreateJobQueue do | |
end | ||
``` | ||
|
||
Add a module for the queue. This will declare an `Ecto.Schema` to use with the table created in the migration and a | ||
start_link function allowing the supervisor to be started conveniently. | ||
Add a module for the queue, mix in `EctoJob.JobQueue`. | ||
This will declare an `Ecto.Schema` to use with the table created in the migration, and a `start_link` function allowing the worker supervision tree to be started conveniently. | ||
|
||
```elixir | ||
defmodule MyApp.JobQueue do | ||
|
@@ -69,7 +69,7 @@ defmodule MyApp.JobQueue do | |
end | ||
``` | ||
|
||
Add your new JobQueue module as a supervisor to the application supervision tree: | ||
Add your new `JobQueue` module as a supervisor to the application supervision tree to run the worker supervisor: | ||
|
||
```elixir | ||
def start(_type, _args) do | ||
|
@@ -83,12 +83,37 @@ def start(_type, _args) do | |
opts = [strategy: :one_for_one, name: MyApp.Supervisor] | ||
Supervisor.start_link(children, opts) | ||
end | ||
|
||
``` | ||
|
||
If you want to run the workers on a separate node to the enqueuers, just leave your `JobQueue` module out of the supervision tree. | ||
|
||
## Usage | ||
|
||
All jobs sent to a queue are eventually dispatched to the queue modules `perform/2` function. | ||
### Enqueueing jobs | ||
|
||
Jobs are Ecto schemas, with each queue backed by a different table. | ||
A job can be inserted into the Repo directly by constructing a job with the `new/2` function: | ||
|
||
```elixir | ||
%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"} | ||
|> MyApp.JobQueue.new() | ||
|> MyApp.Repo.insert() | ||
``` | ||
|
||
The primary benefit of `EctoJob` is the ability to enqueue and process jobs transactionally. | ||
To achieve this, a job can be added to an `Ecto.Multi`, along with other application updates, using the `enqueue/3` function: | ||
|
||
```elixir | ||
Ecto.Multi.new() | ||
|> Ecto.Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "[email protected]"})) | ||
|> MyApp.JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}) | ||
|> MyApp.Repo.transaction() | ||
``` | ||
|
||
### Handling Jobs | ||
|
||
All jobs sent to a queue are eventually dispatched to the `perform/2` function defined in the queue module. | ||
The first argument supplied is an `Ecto.Multi` which has been initialized with a `delete` operation, marking the job as complete. | ||
The `Ecto.Multi` struct must be passed to the `Ecto.Repo.transaction` function to complete the job, along with any other application updates. | ||
|
||
```elixir | ||
|
@@ -104,36 +129,19 @@ defmodule MyApp.JobQueue do | |
end | ||
``` | ||
|
||
When a queue handles multiple job types, it is useful to pattern match on the job and delegate to a separate module: | ||
When a queue handles multiple job types, it is useful to pattern match on the job and delegate to separate modules: | ||
|
||
```elixir | ||
defmodule MyApp.JobQueue do | ||
use EctoJob.JobQueue, table_name: "jobs" | ||
|
||
def perform(multi = %Ecto.Multi{}, job = %{"type" => "SendEmail"), do: MyApp.SendEmail.perform(multi, job) | ||
def perform(multi = %Ecto.Multi{}, job = %{"type" => "CustomerReport"), do: MyApp.CustomerReport.perform(multi, job) | ||
def perform(multi = %Ecto.Multi{}, job = %{"type" => "SyncWithCRM"), do: MyApp.CRMSync.run(multi, job) | ||
def perform(multi = %Ecto.Multi{}, job = %{"type" => "SyncWithCRM"), do: MyApp.CRMSync.perform(multi, job) | ||
... | ||
end | ||
``` | ||
|
||
Enqueue jobs: | ||
|
||
Directly: | ||
```elixir | ||
%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"} | ||
|> MyApp.JobQueue.new() | ||
|> MyApp.Repo.insert() | ||
``` | ||
|
||
As part of a Multi: | ||
```elixir | ||
Ecto.Multi.new() | ||
|> Ecto.Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "[email protected]"})) | ||
|> MyApp.JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}) | ||
|> MyApp.Repo.transaction() | ||
``` | ||
|
||
|
||
## How it works | ||
|
||
|