Skip to content

Commit

Permalink
Merge pull request #76 from mbuhot/lukyanov-params-as-erlang-term
Browse files Browse the repository at this point in the history
Lukyanov params as erlang term
  • Loading branch information
jeanparpaillon authored Sep 11, 2020
2 parents c68300d + 6bf85a3 commit 6e26eea
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 32 deletions.
34 changes: 34 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
# 3.1.0

Version 3.1.0 adds support for job params as an Elixir/Erlang term.
You can insert any arbitrary Elixir/Erlang term into the queue:

```elixir
{"SendEmail", "[email protected]", "Welcome!"}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()
```

You should use the option `:params_type` when defining your queue module:

```elixir
defmodule MyJobQueue do
use EctoJob.JobQueue, table_name: "jobs", params_type: :binary
# ...
end
```

Possible values of the option are: `:map` (default) and `:binary` (for storing Elixir/Erlang terms).

You should use the same option when setting up the migration:

```elixir
@ecto_job_version 3

def up do
EctoJob.Migrations.Install.up()
EctoJob.Migrations.up("jobs", version: @ecto_job_version, params_type: :binary)
end
```


# 3.0.0

Version 3.0 adds support for prioritizing jobs within each job queue.
Expand Down
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ It is compatible with Postgresql and Mysql with a major difference:
Add `:ecto_job` to your `dependencies`

```elixir
{:ecto_job, "~> 3.0"}
{:ecto_job, "~> 3.1"}
```

## Installation
Expand Down Expand Up @@ -58,6 +58,17 @@ defmodule MyApp.Repo.Migrations.CreateJobQueue do
end
```

By default, a job holds a map of arbitrary data (which corresponds to a `jsonb` field in the table).
If you want to store an arbitrary Elixir/Erlang term in the job (`bytea` in the table),
you can set up the `params_type` option:

```
def up do
EctoJob.Migrations.Install.up()
EctoJob.Migrations.CreateJobTable.up("jobs", version: @ecto_job_version, params_type: :binary)
end
```

### Compatibility

`EctoJob` leverages specific PostgreSQL features, like notification mechanism
Expand Down Expand Up @@ -91,6 +102,8 @@ defmodule MyApp.Repo.Migrations.UpdateJobQueue do
end
```



Add a module for the queue, mix in `EctoJob.JobQueue`.
This will declare an `Ecto.Schema` to use with the table created in the migration, and a `start_link` function allowing the worker supervision tree to be started conveniently.

Expand All @@ -100,6 +113,14 @@ defmodule MyApp.JobQueue do
end
```

For jobs being Elixir/Erlang terms, you should add the `:params_type` option:

```elixir
defmodule MyApp.JobQueue do
use EctoJob.JobQueue, table_name: "jobs", params_type: :binary
end
```

Add `perform/2` function to the job queue module, this is where jobs from the queue will be dispatched.

```elixir
Expand Down Expand Up @@ -143,6 +164,22 @@ A job can be inserted into the Repo directly by constructing a job with the `new
|> MyApp.Repo.insert()
```

For inserting any arbitrary Elixir/Erlang term:

```elixir
{"SendEmail", "[email protected]", "Welcome!"}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()
```

or

```elixir
|> %MyStruct{}
|> MyApp.JobQueue.new()
|> MyApp.Repo.insert()
```

A job can be inserted with optional params:

- `:schedule` : runs the job at the given `%DateTime{}`. The default value is `DateTime.utc_now()`.
Expand Down
64 changes: 42 additions & 22 deletions lib/ecto_job/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ defmodule EctoJob.JobQueue do
schedule: DateTime.t() | nil,
attempt: integer,
max_attempts: integer | nil,
params: map(),
params: map() | any(),
notify: String.t() | nil,
priority: integer,
inserted_at: DateTime.t() | nil,
Expand All @@ -71,7 +71,7 @@ defmodule EctoJob.JobQueue do
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)
"""
@callback perform(multi :: Multi.t(), params :: map) ::
@callback perform(multi :: Multi.t(), params :: map() | any()) ::
{:ok, any()}
| {:error, any()}
| {:error, Ecto.Multi.name(), any(), %{required(Ecto.Multi.name()) => any()}}
Expand All @@ -80,11 +80,13 @@ defmodule EctoJob.JobQueue do
table_name = Keyword.fetch!(opts, :table_name)
schema_prefix = Keyword.get(opts, :schema_prefix)
timestamps_opts = Keyword.get(opts, :timestamps_opts)
params_type = Keyword.get(opts, :params_type, :map)

quote bind_quoted: [
table_name: table_name,
schema_prefix: schema_prefix,
timestamps_opts: timestamps_opts
timestamps_opts: timestamps_opts,
params_type: params_type
] do
use Ecto.Schema
@behaviour EctoJob.JobQueue
Expand All @@ -108,8 +110,8 @@ defmodule EctoJob.JobQueue do
field(:attempt, :integer)
# Maximum attempts before this job is FAILED
field(:max_attempts, :integer)
# Job params, serialized as JSONB
field(:params, :map)
# Job params, serialized as JSONB or Elixir/Erlang term
field(:params, params_type)
# Payload used to notify that job has completed
field(:notify, :string)
# Used to prioritize the job execution
Expand Down Expand Up @@ -149,7 +151,7 @@ defmodule EctoJob.JobQueue do
@doc """
Create a new `#{__MODULE__}` instance with the given job params.
Params will be serialized as JSON, so
Params will be serialized as JSON or Elixir/Erlang term, so
Options:
Expand All @@ -158,15 +160,15 @@ defmodule EctoJob.JobQueue do
- `:priority` (integer): lower numbers run first; default is 0
- `:notify` (string): payload to use for Postgres notification upon job completion
"""
@spec new(map, Keyword.t()) :: EctoJob.JobQueue.job()
def new(params = %{}, opts \\ []) do
@spec new(map() | any(), Keyword.t()) :: EctoJob.JobQueue.job()
def new(params, opts \\ []) do
%__MODULE__{
state: if(opts[:schedule], do: "SCHEDULED", else: "AVAILABLE"),
expires: nil,
schedule: Keyword.get(opts, :schedule, DateTime.utc_now()),
attempt: 0,
max_attempts: opts[:max_attempts],
params: params,
params: serialize_params(params, unquote(params_type)),
notify: opts[:notify],
priority: Keyword.get(opts, :priority, 0)
}
Expand Down Expand Up @@ -210,6 +212,10 @@ defmodule EctoJob.JobQueue do
end

def requeue(_, _, _), do: {:error, :non_failed_job}

@spec serialize_params(map() | any(), atom()) :: map() | binary()
defp serialize_params(params, :binary), do: :erlang.term_to_binary(params)
defp serialize_params(params, :map), do: params
end
end

Expand Down Expand Up @@ -331,7 +337,7 @@ defmodule EctoJob.JobQueue do
def update_job_in_progress(repo, job, now, timeout_ms) do
case do_update_job_in_progress(repo.__adapter__(), repo, job, now, timeout_ms) do
{0, _} -> {:error, :expired}
{1, [job]} -> {:ok, job}
{1, [job]} -> {:ok, deserialize_job_params(job)}
end
end

Expand Down Expand Up @@ -359,7 +365,7 @@ defmodule EctoJob.JobQueue do

{1, [job]} ->
notify_failed(repo, job, updates)
{:ok, job}
{:ok, deserialize_job_params(job)}
end
end

Expand Down Expand Up @@ -521,13 +527,16 @@ defmodule EctoJob.JobQueue do
now = %DateTime{},
timeout_ms
) do
schema
|> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand))
|> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id)
|> Query.select([job], job)
|> repo.update_all(
set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now]
)
{count, jobs} =
schema
|> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand))
|> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id)
|> Query.select([job], job)
|> repo.update_all(
set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now]
)

{count, deserialize_job_params(jobs)}
end

defp do_reserve_available_jobs(
Expand All @@ -538,7 +547,7 @@ defmodule EctoJob.JobQueue do
now = %DateTime{},
timeout_ms
) do
{:ok, {count, results}} =
{:ok, {count, jobs}} =
repo.transaction(fn ->
ids =
schema
Expand All @@ -559,17 +568,28 @@ defmodule EctoJob.JobQueue do
]
)

results =
jobs =
Query.from(
j in schema,
where: j.id in ^ids,
select: j
)
|> repo.all()

{count, results}
{count, jobs}
end)

{count, results}
{count, deserialize_job_params(jobs)}
end

defp deserialize_job_params(jobs) when is_list(jobs) do
for job <- jobs, do: deserialize_job_params(job)
end

defp deserialize_job_params(job = %schema{}) do
case schema.__schema__(:type, :params) do
:map -> job
:binary -> %{job | params: :erlang.binary_to_term(job.params)}
end
end
end
3 changes: 2 additions & 1 deletion lib/ecto_job/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ defmodule EctoJob.Migrations do

timestamp_opts = Keyword.get(opts, :timestamps, [])
version = Keyword.get(opts, :version, 2)
params_type = Keyword.get(opts, :params_type, :map)

_ =
create table(name, opts) do
Expand All @@ -100,7 +101,7 @@ defmodule EctoJob.Migrations do

add(:attempt, :integer, null: false, default: 0)
add(:max_attempts, :integer, null: false, default: 5)
add(:params, :map, null: false)
add(:params, params_type, null: false)
add(:notify, :string)

if version >= 3 do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule EctoJob.Mixfile do
use Mix.Project

@version "3.0.0"
@version "3.1.0"
@url "https://github.com/mbuhot/ecto_job"

def project do
Expand Down
28 changes: 28 additions & 0 deletions test/job_queue_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ defmodule EctoJob.JobQueueTest do
:updated_at
]
end

test "params type is :map by default" do
assert EctoJob.Test.JobQueue.__schema__(:type, :params) == :map
end

test "params type is :binary for `params_type: :binary`" do
assert EctoJob.Test.ParamsBinaryJobQueue.__schema__(:type, :params) == :binary
end
end

describe "JobQueue.new" do
Expand Down Expand Up @@ -60,6 +68,11 @@ defmodule EctoJob.JobQueueTest do
job = EctoJob.Test.JobQueue.new(%{}, priority: 1)
assert job.priority == 1
end

test "Converts params to binary for `params_type: :binary`" do
job = EctoJob.Test.ParamsBinaryJobQueue.new(%{a: 1})
assert job.params == <<131, 116, 0, 0, 0, 1, 100, 0, 1, 97, 97, 1>>
end
end

describe "JobQueue.enqueue" do
Expand Down Expand Up @@ -322,6 +335,21 @@ defmodule EctoJob.JobQueueTest do
assert job.priority == low_priority
end)
end

test "RESERVES available jobs with `params_type: :binary`" do
Repo.insert!(EctoJob.Test.ParamsBinaryJobQueue.new("binary"))

{1, [job]} =
EctoJob.JobQueue.reserve_available_jobs(
Repo,
EctoJob.Test.ParamsBinaryJobQueue,
1,
DateTime.utc_now(),
5_000
)

assert job.params == "binary"
end
end

describe "JobQueue.update_job_in_progress" do
Expand Down
14 changes: 7 additions & 7 deletions test/supervisor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ defmodule EctoJob.SupervisorTest do
case EctoJob.Test.Repo.__adapter__() do
Ecto.Adapters.Postgres ->
assert [
{EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]},
{EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]},
{Postgrex.Notifications, notifications_pid, :worker, [Postgrex.Notifications]}
] = Supervisor.which_children(pid)
{EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]},
{EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]},
{Postgrex.Notifications, notifications_pid, :worker, [Postgrex.Notifications]}
] = Supervisor.which_children(pid)

assert Process.whereis(JobQueue.Supervisor) == pid
assert Process.whereis(JobQueue.Notifier) == notifications_pid
assert Process.whereis(JobQueue.Producer) == producer_pid

Ecto.Adapters.MyXQL ->
assert [
{EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]},
{EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]}
] = Supervisor.which_children(pid)
{EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]},
{EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]}
] = Supervisor.which_children(pid)

assert Process.whereis(JobQueue.Supervisor) == pid
assert Process.whereis(JobQueue.Producer) == producer_pid
Expand Down
17 changes: 17 additions & 0 deletions test/support/migrations/20200603110510_add_job_table_binary.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule EctoJob.Test.Repo.Migrations.AddJobTableBinary do
use Ecto.Migration
alias EctoJob.Migrations.{CreateJobTable, Install}

@ecto_job_version 3

def up do
CreateJobTable.up("jobs_binary",
version: @ecto_job_version,
params_type: :binary
)
end

def down do
CreateJobTable.down("jobs_binary")
end
end
Loading

0 comments on commit 6e26eea

Please sign in to comment.