diff --git a/CHANGELOG.md b/CHANGELOG.md index 372ab0f..c2126c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,37 @@ +# 3.1.0 + +Version 3.1.0 adds support for job params as an Elixir/Erlang term. +You can insert any arbitrary Elixir/Erlang term into the queue: + +```elixir +{"SendEmail", "jonas@gmail.com", "Welcome!"} +|> MyApp.JobQueue.new() +|> MyApp.Repo.insert() +``` + +You should use the option `:params_type` when defining your queue module: + +```elixir +defmodule MyJobQueue do + use EctoJob.JobQueue, table_name: "jobs", params_type: :binary + # ... +end +``` + +Possible values of the option are: `:map` (default) and `:binary` (for storing Elixir/Erlang terms). + +You should use the same option when setting up the migration: + +```elixir +@ecto_job_version 3 + +def up do + EctoJob.Migrations.Install.up() + EctoJob.Migrations.up("jobs", version: @ecto_job_version, params_type: :binary) +end +``` + + # 3.0.0 Version 3.0 adds support for prioritizing jobs within each job queue. diff --git a/README.md b/README.md index 42b44c1..cbb1930 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ It is compatible with Postgresql and Mysql with a major difference: Add `:ecto_job` to your `dependencies` ```elixir - {:ecto_job, "~> 3.0"} + {:ecto_job, "~> 3.1"} ``` ## Installation @@ -58,6 +58,17 @@ defmodule MyApp.Repo.Migrations.CreateJobQueue do end ``` +By default, a job holds a map of arbitrary data (which corresponds to a `jsonb` field in the table). +If you want to store an arbitrary Elixir/Erlang term in the job (`bytea` in the table), +you can set up the `params_type` option: + +``` + def up do + EctoJob.Migrations.Install.up() + EctoJob.Migrations.CreateJobTable.up("jobs", version: @ecto_job_version, params_type: :binary) + end +``` + ### Compatibility `EctoJob` leverages specific PostgreSQL features, like notification mechanism @@ -91,6 +102,8 @@ defmodule MyApp.Repo.Migrations.UpdateJobQueue do end ``` + + Add a module for the queue, mix in `EctoJob.JobQueue`. This will declare an `Ecto.Schema` to use with the table created in the migration, and a `start_link` function allowing the worker supervision tree to be started conveniently. @@ -100,6 +113,14 @@ defmodule MyApp.JobQueue do end ``` +For jobs being Elixir/Erlang terms, you should add the `:params_type` option: + +```elixir +defmodule MyApp.JobQueue do + use EctoJob.JobQueue, table_name: "jobs", params_type: :binary +end +``` + Add `perform/2` function to the job queue module, this is where jobs from the queue will be dispatched. ```elixir @@ -143,6 +164,22 @@ A job can be inserted into the Repo directly by constructing a job with the `new |> MyApp.Repo.insert() ``` +For inserting any arbitrary Elixir/Erlang term: + +```elixir +{"SendEmail", "joe@gmail.com", "Welcome!"} +|> MyApp.JobQueue.new() +|> MyApp.Repo.insert() +``` + +or + +```elixir +|> %MyStruct{} +|> MyApp.JobQueue.new() +|> MyApp.Repo.insert() +``` + A job can be inserted with optional params: - `:schedule` : runs the job at the given `%DateTime{}`. The default value is `DateTime.utc_now()`. diff --git a/lib/ecto_job/job_queue.ex b/lib/ecto_job/job_queue.ex index 3bc11c8..1465e0f 100644 --- a/lib/ecto_job/job_queue.ex +++ b/lib/ecto_job/job_queue.ex @@ -53,7 +53,7 @@ defmodule EctoJob.JobQueue do schedule: DateTime.t() | nil, attempt: integer, max_attempts: integer | nil, - params: map(), + params: map() | any(), notify: String.t() | nil, priority: integer, inserted_at: DateTime.t() | nil, @@ -71,7 +71,7 @@ defmodule EctoJob.JobQueue do def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params) def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params) """ - @callback perform(multi :: Multi.t(), params :: map) :: + @callback perform(multi :: Multi.t(), params :: map() | any()) :: {:ok, any()} | {:error, any()} | {:error, Ecto.Multi.name(), any(), %{required(Ecto.Multi.name()) => any()}} @@ -80,11 +80,13 @@ defmodule EctoJob.JobQueue do table_name = Keyword.fetch!(opts, :table_name) schema_prefix = Keyword.get(opts, :schema_prefix) timestamps_opts = Keyword.get(opts, :timestamps_opts) + params_type = Keyword.get(opts, :params_type, :map) quote bind_quoted: [ table_name: table_name, schema_prefix: schema_prefix, - timestamps_opts: timestamps_opts + timestamps_opts: timestamps_opts, + params_type: params_type ] do use Ecto.Schema @behaviour EctoJob.JobQueue @@ -108,8 +110,8 @@ defmodule EctoJob.JobQueue do field(:attempt, :integer) # Maximum attempts before this job is FAILED field(:max_attempts, :integer) - # Job params, serialized as JSONB - field(:params, :map) + # Job params, serialized as JSONB or Elixir/Erlang term + field(:params, params_type) # Payload used to notify that job has completed field(:notify, :string) # Used to prioritize the job execution @@ -149,7 +151,7 @@ defmodule EctoJob.JobQueue do @doc """ Create a new `#{__MODULE__}` instance with the given job params. - Params will be serialized as JSON, so + Params will be serialized as JSON or Elixir/Erlang term, so Options: @@ -158,15 +160,15 @@ defmodule EctoJob.JobQueue do - `:priority` (integer): lower numbers run first; default is 0 - `:notify` (string): payload to use for Postgres notification upon job completion """ - @spec new(map, Keyword.t()) :: EctoJob.JobQueue.job() - def new(params = %{}, opts \\ []) do + @spec new(map() | any(), Keyword.t()) :: EctoJob.JobQueue.job() + def new(params, opts \\ []) do %__MODULE__{ state: if(opts[:schedule], do: "SCHEDULED", else: "AVAILABLE"), expires: nil, schedule: Keyword.get(opts, :schedule, DateTime.utc_now()), attempt: 0, max_attempts: opts[:max_attempts], - params: params, + params: serialize_params(params, unquote(params_type)), notify: opts[:notify], priority: Keyword.get(opts, :priority, 0) } @@ -210,6 +212,10 @@ defmodule EctoJob.JobQueue do end def requeue(_, _, _), do: {:error, :non_failed_job} + + @spec serialize_params(map() | any(), atom()) :: map() | binary() + defp serialize_params(params, :binary), do: :erlang.term_to_binary(params) + defp serialize_params(params, :map), do: params end end @@ -331,7 +337,7 @@ defmodule EctoJob.JobQueue do def update_job_in_progress(repo, job, now, timeout_ms) do case do_update_job_in_progress(repo.__adapter__(), repo, job, now, timeout_ms) do {0, _} -> {:error, :expired} - {1, [job]} -> {:ok, job} + {1, [job]} -> {:ok, deserialize_job_params(job)} end end @@ -359,7 +365,7 @@ defmodule EctoJob.JobQueue do {1, [job]} -> notify_failed(repo, job, updates) - {:ok, job} + {:ok, deserialize_job_params(job)} end end @@ -521,13 +527,16 @@ defmodule EctoJob.JobQueue do now = %DateTime{}, timeout_ms ) do - schema - |> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand)) - |> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id) - |> Query.select([job], job) - |> repo.update_all( - set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now] - ) + {count, jobs} = + schema + |> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand)) + |> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id) + |> Query.select([job], job) + |> repo.update_all( + set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now] + ) + + {count, deserialize_job_params(jobs)} end defp do_reserve_available_jobs( @@ -538,7 +547,7 @@ defmodule EctoJob.JobQueue do now = %DateTime{}, timeout_ms ) do - {:ok, {count, results}} = + {:ok, {count, jobs}} = repo.transaction(fn -> ids = schema @@ -559,7 +568,7 @@ defmodule EctoJob.JobQueue do ] ) - results = + jobs = Query.from( j in schema, where: j.id in ^ids, @@ -567,9 +576,20 @@ defmodule EctoJob.JobQueue do ) |> repo.all() - {count, results} + {count, jobs} end) - {count, results} + {count, deserialize_job_params(jobs)} + end + + defp deserialize_job_params(jobs) when is_list(jobs) do + for job <- jobs, do: deserialize_job_params(job) + end + + defp deserialize_job_params(job = %schema{}) do + case schema.__schema__(:type, :params) do + :map -> job + :binary -> %{job | params: :erlang.binary_to_term(job.params)} + end end end diff --git a/lib/ecto_job/migrations.ex b/lib/ecto_job/migrations.ex index 400799f..bff9869 100644 --- a/lib/ecto_job/migrations.ex +++ b/lib/ecto_job/migrations.ex @@ -89,6 +89,7 @@ defmodule EctoJob.Migrations do timestamp_opts = Keyword.get(opts, :timestamps, []) version = Keyword.get(opts, :version, 2) + params_type = Keyword.get(opts, :params_type, :map) _ = create table(name, opts) do @@ -100,7 +101,7 @@ defmodule EctoJob.Migrations do add(:attempt, :integer, null: false, default: 0) add(:max_attempts, :integer, null: false, default: 5) - add(:params, :map, null: false) + add(:params, params_type, null: false) add(:notify, :string) if version >= 3 do diff --git a/mix.exs b/mix.exs index d8d9c32..7144696 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule EctoJob.Mixfile do use Mix.Project - @version "3.0.0" + @version "3.1.0" @url "https://github.com/mbuhot/ecto_job" def project do diff --git a/test/job_queue_test.exs b/test/job_queue_test.exs index 8e2c27d..e5b8cf7 100644 --- a/test/job_queue_test.exs +++ b/test/job_queue_test.exs @@ -32,6 +32,14 @@ defmodule EctoJob.JobQueueTest do :updated_at ] end + + test "params type is :map by default" do + assert EctoJob.Test.JobQueue.__schema__(:type, :params) == :map + end + + test "params type is :binary for `params_type: :binary`" do + assert EctoJob.Test.ParamsBinaryJobQueue.__schema__(:type, :params) == :binary + end end describe "JobQueue.new" do @@ -60,6 +68,11 @@ defmodule EctoJob.JobQueueTest do job = EctoJob.Test.JobQueue.new(%{}, priority: 1) assert job.priority == 1 end + + test "Converts params to binary for `params_type: :binary`" do + job = EctoJob.Test.ParamsBinaryJobQueue.new(%{a: 1}) + assert job.params == <<131, 116, 0, 0, 0, 1, 100, 0, 1, 97, 97, 1>> + end end describe "JobQueue.enqueue" do @@ -322,6 +335,21 @@ defmodule EctoJob.JobQueueTest do assert job.priority == low_priority end) end + + test "RESERVES available jobs with `params_type: :binary`" do + Repo.insert!(EctoJob.Test.ParamsBinaryJobQueue.new("binary")) + + {1, [job]} = + EctoJob.JobQueue.reserve_available_jobs( + Repo, + EctoJob.Test.ParamsBinaryJobQueue, + 1, + DateTime.utc_now(), + 5_000 + ) + + assert job.params == "binary" + end end describe "JobQueue.update_job_in_progress" do diff --git a/test/supervisor_test.exs b/test/supervisor_test.exs index f60903f..998585f 100644 --- a/test/supervisor_test.exs +++ b/test/supervisor_test.exs @@ -8,10 +8,10 @@ defmodule EctoJob.SupervisorTest do case EctoJob.Test.Repo.__adapter__() do Ecto.Adapters.Postgres -> assert [ - {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, - {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]}, - {Postgrex.Notifications, notifications_pid, :worker, [Postgrex.Notifications]} - ] = Supervisor.which_children(pid) + {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, + {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]}, + {Postgrex.Notifications, notifications_pid, :worker, [Postgrex.Notifications]} + ] = Supervisor.which_children(pid) assert Process.whereis(JobQueue.Supervisor) == pid assert Process.whereis(JobQueue.Notifier) == notifications_pid @@ -19,9 +19,9 @@ defmodule EctoJob.SupervisorTest do Ecto.Adapters.MyXQL -> assert [ - {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, - {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]} - ] = Supervisor.which_children(pid) + {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, + {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]} + ] = Supervisor.which_children(pid) assert Process.whereis(JobQueue.Supervisor) == pid assert Process.whereis(JobQueue.Producer) == producer_pid diff --git a/test/support/migrations/20200603110510_add_job_table_binary.exs b/test/support/migrations/20200603110510_add_job_table_binary.exs new file mode 100644 index 0000000..e64954b --- /dev/null +++ b/test/support/migrations/20200603110510_add_job_table_binary.exs @@ -0,0 +1,17 @@ +defmodule EctoJob.Test.Repo.Migrations.AddJobTableBinary do + use Ecto.Migration + alias EctoJob.Migrations.{CreateJobTable, Install} + + @ecto_job_version 3 + + def up do + CreateJobTable.up("jobs_binary", + version: @ecto_job_version, + params_type: :binary + ) + end + + def down do + CreateJobTable.down("jobs_binary") + end +end diff --git a/test/support/params_binary_job_queue.ex b/test/support/params_binary_job_queue.ex new file mode 100644 index 0000000..4187b43 --- /dev/null +++ b/test/support/params_binary_job_queue.ex @@ -0,0 +1,8 @@ +defmodule EctoJob.Test.ParamsBinaryJobQueue do + @moduledoc false + use EctoJob.JobQueue, table_name: "jobs_binary", params_type: :binary + + def perform(multi, _params) do + EctoJob.Test.Repo.transaction(multi) + end +end