Skip to content

Commit

Permalink
feat: Add job priority (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramondelemos authored Jul 14, 2019
1 parent 2b0fd50 commit 9d5dea8
Show file tree
Hide file tree
Showing 21 changed files with 507 additions and 4 deletions.
44 changes: 43 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ A transactional job queue built with Ecto, PostgreSQL and GenStage
Add `:ecto_job` to your `dependencies`

```elixir
{:ecto_job, "~> 2.1"}
{:ecto_job, "~> 3.0"}
```

## Installation
Expand All @@ -51,6 +51,28 @@ defmodule MyApp.Repo.Migrations.CreateJobQueue do
end
```

### Upgrading pre-existent Ecto Job to 3.x.x version

To upgrade your project to 3.x.x version of ecto job you must add a migration to update the pre-existent job queue tables:

```
mix ecto.gen.migration update_job_queue
```

```elixir
defmodule MyApp.Repo.Migrations.UpdateJobQueue do
use Ecto.Migration

def up do
EctoJob.Migrations.AddPriorityToJobTable.up("jobs")
end

def down do
EctoJob.Migrations.AddPriorityToJobTable.down("jobs")
end
end
```

Add a module for the queue, mix in `EctoJob.JobQueue`.
This will declare an `Ecto.Schema` to use with the table created in the migration, and a `start_link` function allowing the worker supervision tree to be started conveniently.

Expand Down Expand Up @@ -103,6 +125,26 @@ A job can be inserted into the Repo directly by constructing a job with the `new
|> MyApp.Repo.insert()
```

A job can be inserted with optional params:

- `:schedule` : runs the job at the given `%DateTime{}`. The default value is `DateTime.utc_now()`.
- `:max_attempts` : the maximum attempts for this job. The default value is `5`.
- `:priority` : the priority of this work, the default value is `0`. Increase this value to decrease priority.

```elixir
%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(max_attempts: 10)
|> MyApp.Repo.insert()

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(priority: 1)
|> MyApp.Repo.insert()

%{"type" => "SendEmail", "address" => "[email protected]", "body" => "Welcome!"}
|> MyApp.JobQueue.new(priority: 2, max_attempts: 2)
|> MyApp.Repo.insert()
```

The primary benefit of `EctoJob` is the ability to enqueue and process jobs transactionally.
To achieve this, a job can be added to an `Ecto.Multi`, along with other application updates, using the `enqueue/3` function:

Expand Down
4 changes: 4 additions & 0 deletions examples/ecto_job_priority_demo/.formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
24 changes: 24 additions & 0 deletions examples/ecto_job_priority_demo/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
ecto_job_priority_demo-*.tar

8 changes: 8 additions & 0 deletions examples/ecto_job_priority_demo/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
start_db:
docker-compose -f docker-compose-test.yml up -d

migrate:
mix do ecto.create, ecto.migrate

run:
iex -S mix
38 changes: 38 additions & 0 deletions examples/ecto_job_priority_demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# EctoJobPriorityDemo

This demo application shows EctoJob in action with different priorities in the same queue.

## How it works

There are three GenServers adding the same quantity of Ecto Jobs with different priorities after every 5 seconds. The Job with higher priority takes more time to execute than the others to make clear that the priority is respected.

```elixir
high_priority = 100 - (0 * 50) = 100 |> Process.sleep()
regular_priority = 100 - (1 * 50) = 50 |> Process.sleep()
low_priority = 100 - (2 * 50) = 0 |> Process.sleep()
```

The default value of priority is `0`. To decrease the priority you must increase its value.

As you will see even the faster jobs are executed later if configured with low priority.

### Setup Postgresql

To start up the docker-compose postgresql service:
```bash
make start_db
```

### Setup Database

To run the project migration:
```bash
make migrate
```

### Running the application

To run the project:
```bash
make run
```
20 changes: 20 additions & 0 deletions examples/ecto_job_priority_demo/config/config.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# This file is responsible for configuring your application
# and its dependencies with the aid of the Mix.Config module.
use Mix.Config

config :logger, :level, :info

config :ecto_job_priority_demo, ecto_repos: [EctoJobPriorityDemo.Repo]

config :ecto_job_priority_demo, EctoJobPriorityDemo.Repo,
adapter: Ecto.Adapters.Postgres,
username: "postgres",
password: "password",
database: "ecto_job_test",
hostname: "localhost",
pool_size: 30

config :ecto_job,
repo: EctoJobPriorityDemo.Repo,
always_dispatch_jobs_on_poll: true,
log: false
11 changes: 11 additions & 0 deletions examples/ecto_job_priority_demo/docker-compose-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: '2'

services:
ecto-job-test:
image: postgres:9.6-alpine
ports:
- "5432:5432"
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=password
- POSTGRES_DB=ecto_job_test
109 changes: 109 additions & 0 deletions examples/ecto_job_priority_demo/lib/ecto_job_priority_demo.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
defmodule EctoJobPriorityDemo do
@moduledoc false
use GenServer

alias EctoJobPriorityDemo.JobMonitor
alias EctoJobPriorityDemo.JobQueue
alias EctoJobPriorityDemo.Repo

require Logger

@high_priority 0
@regular_priority 1
@low_priority 2

def start_link(jobs \\ %{}) do
GenServer.start_link(__MODULE__, jobs, name: __MODULE__)
end

def init(_) do
period = 5000
count = 500

{:ok, low_priority} =
JobMonitor.start_link(
%{count: count, priority: @low_priority, period: period},
:low_priority
)

{:ok, regular_priority} =
JobMonitor.start_link(
%{count: count, priority: @regular_priority, period: period},
:regular_priority
)

{:ok, high_priority} =
JobMonitor.start_link(
%{count: count, priority: @high_priority, period: period},
:high_priority
)

state = %{
high_priority: high_priority,
regular_priority: regular_priority,
low_priority: low_priority
}

Logger.info("Jobs monitor started")

send(self(), {:notify_jobs})
{:ok, state}
end

def resolve(priority) do
GenServer.cast(__MODULE__, {:resolve, priority})
end

# Server

def handle_info({:notify_jobs}, state) do
%{
high_priority: high_priority,
regular_priority: regular_priority,
low_priority: low_priority
} = state

high_priority_value =
high_priority
|> JobMonitor.count()

regular_priority_value =
regular_priority
|> JobMonitor.count()

low_priority_value =
low_priority
|> JobMonitor.count()

Logger.info(
"high_priority: #{inspect(high_priority_value)}, regular_priority: #{
inspect(regular_priority_value)
}, low_priority: #{inspect(low_priority_value)}"
)

Process.send_after(self(), {:notify_jobs}, 100)
{:noreply, state}
end

def handle_cast({:resolve, priority}, state) do
state
|> update_jobs(priority, -1)

{:noreply, state}
end

defp update_jobs(%{high_priority: high_priority}, @high_priority, value) do
high_priority
|> JobMonitor.update(value)
end

defp update_jobs(%{regular_priority: regular_priority}, @regular_priority, value) do
regular_priority
|> JobMonitor.update(value)
end

defp update_jobs(%{low_priority: low_priority}, @low_priority, value) do
low_priority
|> JobMonitor.update(value)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule EctoJobPriorityDemo.Application do
# See https://hexdocs.pm/elixir/Application.html
# for more information on OTP Applications
@moduledoc false

use Application

def start(_type, _args) do
# List all child processes to be supervised
children = [
{EctoJobPriorityDemo.Repo, []},
{EctoJobPriorityDemo.JobQueue, [repo: EctoJobPriorityDemo.Repo, max_demand: 100]},
{EctoJobPriorityDemo, %{}}
]

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: EctoJobPriorityDemo.Supervisor]
Supervisor.start_link(children, opts)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule EctoJobPriorityDemo.JobMonitor do
@moduledoc false
use GenServer

alias EctoJobPriorityDemo.JobQueue
alias EctoJobPriorityDemo.Repo

def start_link(jobs \\ %{count: 1, priority: 0, period: 1000}, server) do
GenServer.start_link(__MODULE__, jobs, name: server)
end

def init(%{count: count, priority: priority, period: period}) do
send(self(), {:produce_jobs, count, priority, period})
{:ok, 0}
end

def update(server, value) do
GenServer.cast(server, {:update, value})
end

def count(server) do
GenServer.call(server, :count)
end

# Server

def handle_cast({:update, value}, state) do
{:noreply, state + value}
end

def handle_call(:count, _from, state) do
{:reply, state, state}
end

def handle_info({:produce_jobs, count, priority, period}, state) do
jobs =
Enum.map(1..count, fn _ ->
%{
state: "AVAILABLE",
expires: nil,
schedule: DateTime.utc_now(),
attempt: 0,
max_attempts: 5,
params: %{priority: priority},
notify: nil,
priority: priority,
updated_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second),
inserted_at: NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
}
end)

JobQueue
|> Repo.insert_all(jobs)

Process.send_after(self(), {:produce_jobs, count, priority, period}, period)

{:noreply, state + count}
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule EctoJobPriorityDemo.JobQueue do
use EctoJob.JobQueue, table_name: "jobs"

def perform(multi, %{"priority" => priority}) do
multi
|> Ecto.Multi.run(:resolve, fn _repo, _changes ->
make_work_heavy(priority)
{:ok, EctoJobPriorityDemo.resolve(priority)}
end)
|> EctoJobPriorityDemo.Repo.transaction()
end

@doc """
This method make high priority jobs heavy
high_priority = 100 - (0 * 50) = 100
regular_priority = 100 - (1 * 50) = 50
low_priority = 100 - (2 * 50) = 0
"""
defp make_work_heavy(priority) do
(100 - priority * 50)
|> Process.sleep()
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
defmodule EctoJobPriorityDemo.Repo do
use Ecto.Repo, otp_app: :ecto_job_priority_demo, adapter: Ecto.Adapters.Postgres
end
Loading

0 comments on commit 9d5dea8

Please sign in to comment.