From 58b45e14d5c2459ee082b19f8e462edab4c5aae6 Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 11:33:39 +0400 Subject: [PATCH 1/9] Add support for `params` as Erlang/Elixir term --- lib/ecto_job/job_queue.ex | 56 +++++++++++++------ lib/ecto_job/migrations.ex | 5 +- test/job_queue_test.exs | 28 ++++++++++ .../20200603110510_add_job_table_binary.exs | 23 ++++++++ test/support/params_binary_job_queue.ex | 8 +++ 5 files changed, 100 insertions(+), 20 deletions(-) create mode 100644 test/support/migrations/20200603110510_add_job_table_binary.exs create mode 100644 test/support/params_binary_job_queue.ex diff --git a/lib/ecto_job/job_queue.ex b/lib/ecto_job/job_queue.ex index d99b6ef..acbe052 100644 --- a/lib/ecto_job/job_queue.ex +++ b/lib/ecto_job/job_queue.ex @@ -53,7 +53,7 @@ defmodule EctoJob.JobQueue do schedule: DateTime.t() | nil, attempt: integer, max_attempts: integer | nil, - params: map(), + params: map() | any(), notify: String.t() | nil, priority: integer, inserted_at: DateTime.t() | nil, @@ -71,7 +71,7 @@ defmodule EctoJob.JobQueue do def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params) def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params) """ - @callback perform(multi :: Multi.t(), params :: map) :: + @callback perform(multi :: Multi.t(), params :: map() | any()) :: {:ok, any()} | {:error, any()} | {:error, Ecto.Multi.name(), any(), %{required(Ecto.Multi.name()) => any()}} @@ -80,11 +80,13 @@ defmodule EctoJob.JobQueue do table_name = Keyword.fetch!(opts, :table_name) schema_prefix = Keyword.get(opts, :schema_prefix) timestamps_opts = Keyword.get(opts, :timestamps_opts) + params_type = Keyword.get(opts, :params_type, :map) quote bind_quoted: [ table_name: table_name, schema_prefix: schema_prefix, - timestamps_opts: timestamps_opts + timestamps_opts: timestamps_opts, + params_type: params_type ] do use Ecto.Schema @behaviour EctoJob.JobQueue @@ -108,8 +110,8 @@ defmodule EctoJob.JobQueue do field(:attempt, :integer) # Maximum attempts before this job is FAILED field(:max_attempts, :integer) - # Job params, serialized as JSONB - field(:params, :map) + # Job params, serialized as JSONB or Elixir/Erlang term + field(:params, params_type) # Payload used to notify that job has completed field(:notify, :string) # Used to prioritize the job execution @@ -149,7 +151,7 @@ defmodule EctoJob.JobQueue do @doc """ Create a new `#{__MODULE__}` instance with the given job params. - Params will be serialized as JSON, so + Params will be serialized as JSON or Elixir/Erlang term, so Options: @@ -158,15 +160,15 @@ defmodule EctoJob.JobQueue do - `:priority` (integer): lower numbers run first; default is 0 - `:notify` (string): payload to use for Postgres notification upon job completion """ - @spec new(map, Keyword.t()) :: EctoJob.JobQueue.job() - def new(params = %{}, opts \\ []) do + @spec new(map() | any(), Keyword.t()) :: EctoJob.JobQueue.job() + def new(params, opts \\ []) do %__MODULE__{ state: if(opts[:schedule], do: "SCHEDULED", else: "AVAILABLE"), expires: nil, schedule: Keyword.get(opts, :schedule, DateTime.utc_now()), attempt: 0, max_attempts: opts[:max_attempts], - params: params, + params: serialize_params(params, unquote(params_type)), notify: opts[:notify], priority: Keyword.get(opts, :priority, 0) } @@ -210,6 +212,10 @@ defmodule EctoJob.JobQueue do end def requeue(_, _, _), do: {:error, :non_failed_job} + + @spec serialize_params(map() | any(), atom()) :: map() | binary() + defp serialize_params(params, :binary), do: :erlang.term_to_binary(params) + defp serialize_params(params, :map), do: params end end @@ -283,13 +289,16 @@ defmodule EctoJob.JobQueue do """ @spec reserve_available_jobs(repo, schema, integer, DateTime.t(), integer) :: {integer, [job]} def reserve_available_jobs(repo, schema, demand, now = %DateTime{}, timeout_ms) do - schema - |> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand)) - |> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id) - |> Query.select([job], job) - |> repo.update_all( - set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now] - ) + {count, jobs} = + schema + |> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand)) + |> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id) + |> Query.select([job], job) + |> repo.update_all( + set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now] + ) + + {count, deserialize_job_params(jobs)} end @doc """ @@ -355,7 +364,7 @@ defmodule EctoJob.JobQueue do case {count, results} do {0, _} -> {:error, :expired} - {1, [job]} -> {:ok, job} + {1, [job]} -> {:ok, deserialize_job_params(job)} end end @@ -393,7 +402,7 @@ defmodule EctoJob.JobQueue do {1, [job]} -> notify_failed(repo, job, updates) - {:ok, job} + {:ok, deserialize_job_params(job)} end end @@ -458,4 +467,15 @@ defmodule EctoJob.JobQueue do repo.query("SELECT pg_notify($1, $2)", [topic, payload]) :ok end + + defp deserialize_job_params(jobs) when is_list(jobs) do + for job <- jobs, do: deserialize_job_params(job) + end + + defp deserialize_job_params(job = %schema{}) do + case schema.__schema__(:type, :params) do + :map -> job + :binary -> %{job | params: :erlang.binary_to_term(job.params)} + end + end end diff --git a/lib/ecto_job/migrations.ex b/lib/ecto_job/migrations.ex index efadfc6..1fa70be 100644 --- a/lib/ecto_job/migrations.ex +++ b/lib/ecto_job/migrations.ex @@ -73,6 +73,7 @@ defmodule EctoJob.Migrations do prefix = Keyword.get(opts, :prefix) timestamp_opts = Keyword.get(opts, :timestamps, []) version = Keyword.get(opts, :version, 2) + params_type = Keyword.get(opts, :params_type, :map) _ = create table(name, opts) do @@ -87,7 +88,7 @@ defmodule EctoJob.Migrations do add(:attempt, :integer, null: false, default: 0) add(:max_attempts, :integer, null: false, default: 5) - add(:params, :map, null: false) + add(:params, params_type, null: false) add(:notify, :string) if version >= 3 do @@ -106,7 +107,7 @@ defmodule EctoJob.Migrations do create(index(name, [:schedule, :id])) 3 -> - create(index(name, [:priority, :schedule, :id])) + create(index(name, [:priority, :schedule, :id], prefix: prefix)) end execute(""" diff --git a/test/job_queue_test.exs b/test/job_queue_test.exs index 8e2c27d..e5b8cf7 100644 --- a/test/job_queue_test.exs +++ b/test/job_queue_test.exs @@ -32,6 +32,14 @@ defmodule EctoJob.JobQueueTest do :updated_at ] end + + test "params type is :map by default" do + assert EctoJob.Test.JobQueue.__schema__(:type, :params) == :map + end + + test "params type is :binary for `params_type: :binary`" do + assert EctoJob.Test.ParamsBinaryJobQueue.__schema__(:type, :params) == :binary + end end describe "JobQueue.new" do @@ -60,6 +68,11 @@ defmodule EctoJob.JobQueueTest do job = EctoJob.Test.JobQueue.new(%{}, priority: 1) assert job.priority == 1 end + + test "Converts params to binary for `params_type: :binary`" do + job = EctoJob.Test.ParamsBinaryJobQueue.new(%{a: 1}) + assert job.params == <<131, 116, 0, 0, 0, 1, 100, 0, 1, 97, 97, 1>> + end end describe "JobQueue.enqueue" do @@ -322,6 +335,21 @@ defmodule EctoJob.JobQueueTest do assert job.priority == low_priority end) end + + test "RESERVES available jobs with `params_type: :binary`" do + Repo.insert!(EctoJob.Test.ParamsBinaryJobQueue.new("binary")) + + {1, [job]} = + EctoJob.JobQueue.reserve_available_jobs( + Repo, + EctoJob.Test.ParamsBinaryJobQueue, + 1, + DateTime.utc_now(), + 5_000 + ) + + assert job.params == "binary" + end end describe "JobQueue.update_job_in_progress" do diff --git a/test/support/migrations/20200603110510_add_job_table_binary.exs b/test/support/migrations/20200603110510_add_job_table_binary.exs new file mode 100644 index 0000000..c1ef5e4 --- /dev/null +++ b/test/support/migrations/20200603110510_add_job_table_binary.exs @@ -0,0 +1,23 @@ +defmodule EctoJob.Test.Repo.Migrations.AddJobTableBinary do + use Ecto.Migration + alias EctoJob.Migrations.{CreateJobTable, Install} + + @ecto_job_version 3 + + def up do + execute("CREATE SCHEMA \"params_binary\";") + Install.up(prefix: "params_binary") + + CreateJobTable.up("jobs", + version: @ecto_job_version, + prefix: "params_binary", + params_type: :binary + ) + end + + def down do + CreateJobTable.down("jobs", prefix: "parasm_binary") + Intall.down(prefix: "params_binary") + execute("DROP SCHEMA \"params_binary\";") + end +end diff --git a/test/support/params_binary_job_queue.ex b/test/support/params_binary_job_queue.ex new file mode 100644 index 0000000..df66779 --- /dev/null +++ b/test/support/params_binary_job_queue.ex @@ -0,0 +1,8 @@ +defmodule EctoJob.Test.ParamsBinaryJobQueue do + @moduledoc false + use EctoJob.JobQueue, table_name: "jobs", schema_prefix: "params_binary", params_type: :binary + + def perform(multi, _params) do + EctoJob.Test.Repo.transaction(multi) + end +end From 18a64f844114e1b0d9b74ecffcda69fae9e92b4f Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 12:00:51 +0400 Subject: [PATCH 2/9] Update README.md --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index 940145b..9177efe 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,26 @@ A job can be inserted into the Repo directly by constructing a job with the `new |> MyApp.Repo.insert() ``` +By default, the job is a map, which corresponds to the `params` field of the type `:map` in the queue schema +(`jsonb` if seen in the PostgreSQL table). It can also be any Elixir/Erlang term: + +```elixir +{"SendEmail", "joe@gmail.com", "Welcome!"} +|> MyApp.JobQueue.new() +|> MyApp.Repo.insert() +``` + +or + +```elixir +|> %MyStruct{} +|> MyApp.JobQueue.new() +|> MyApp.Repo.insert() +``` + +In this case the `params` field in the schema will by of the `:binary` type (`bytea` in the PostgreSQL table). + + A job can be inserted with optional params: - `:schedule` : runs the job at the given `%DateTime{}`. The default value is `DateTime.utc_now()`. From 8d305485794e4fafef598c66ae6b59aa466d54b2 Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 12:05:48 +0400 Subject: [PATCH 3/9] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9177efe..ff75ab2 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ A transactional job queue built with Ecto, PostgreSQL and GenStage Add `:ecto_job` to your `dependencies` ```elixir - {:ecto_job, "~> 3.0"} + {:ecto_job, "~> 3.1"} ``` ## Installation @@ -144,7 +144,7 @@ or |> MyApp.Repo.insert() ``` -In this case the `params` field in the schema will by of the `:binary` type (`bytea` in the PostgreSQL table). +In this case the `params` field in the schema will be of the `:binary` type (`bytea` in the PostgreSQL table). A job can be inserted with optional params: From f64017f882e8ac2fd148bf0ff1eb6f318bf6d905 Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 12:17:04 +0400 Subject: [PATCH 4/9] Update CHANGELOG.md --- CHANGELOG.md | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 372ab0f..c2126c5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,37 @@ +# 3.1.0 + +Version 3.1.0 adds support for job params as an Elixir/Erlang term. +You can insert any arbitrary Elixir/Erlang term into the queue: + +```elixir +{"SendEmail", "jonas@gmail.com", "Welcome!"} +|> MyApp.JobQueue.new() +|> MyApp.Repo.insert() +``` + +You should use the option `:params_type` when defining your queue module: + +```elixir +defmodule MyJobQueue do + use EctoJob.JobQueue, table_name: "jobs", params_type: :binary + # ... +end +``` + +Possible values of the option are: `:map` (default) and `:binary` (for storing Elixir/Erlang terms). + +You should use the same option when setting up the migration: + +```elixir +@ecto_job_version 3 + +def up do + EctoJob.Migrations.Install.up() + EctoJob.Migrations.up("jobs", version: @ecto_job_version, params_type: :binary) +end +``` + + # 3.0.0 Version 3.0 adds support for prioritizing jobs within each job queue. From 720af8464886ba9b4a57b3014d5b8e36ca31d74c Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 12:27:41 +0400 Subject: [PATCH 5/9] Update README.md --- README.md | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ff75ab2..4b9b572 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,17 @@ defmodule MyApp.Repo.Migrations.CreateJobQueue do end ``` +By default, a job holds a map of arbitrary data (which corresponds to a `jsonb` field in the table). +If you want to store an arbitrary Elixir/Erlang term in the job (`bytea` in the table), +you can set up the `params_type` option: + +``` + def up do + EctoJob.Migrations.Install.up() + EctoJob.Migrations.CreateJobTable.up("jobs", version: @ecto_job_version, params_type: :binary) + end +``` + ### Upgrading to version 3.0 To upgrade your project to 3.0 version of `ecto_job` you must add a migration to update the pre-existent job queue tables: @@ -84,6 +95,14 @@ defmodule MyApp.JobQueue do end ``` +For jobs being Elixir/Erlang terms, you should add the `:params_type` option: + +```elixir +defmodule MyApp.JobQueue do + use EctoJob.JobQueue, table_name: "jobs", params_type: :binary +end +``` + Add `perform/2` function to the job queue module, this is where jobs from the queue will be dispatched. ```elixir @@ -127,8 +146,7 @@ A job can be inserted into the Repo directly by constructing a job with the `new |> MyApp.Repo.insert() ``` -By default, the job is a map, which corresponds to the `params` field of the type `:map` in the queue schema -(`jsonb` if seen in the PostgreSQL table). It can also be any Elixir/Erlang term: +For inserting any arbitrary Elixir/Erlang term: ```elixir {"SendEmail", "joe@gmail.com", "Welcome!"} @@ -144,9 +162,6 @@ or |> MyApp.Repo.insert() ``` -In this case the `params` field in the schema will be of the `:binary` type (`bytea` in the PostgreSQL table). - - A job can be inserted with optional params: - `:schedule` : runs the job at the given `%DateTime{}`. The default value is `DateTime.utc_now()`. From 4087d50f5855624e6a7ecc7e3644b590f67d2708 Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 12:06:00 +0400 Subject: [PATCH 6/9] Bump version --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 1548971..2a2559d 100644 --- a/mix.exs +++ b/mix.exs @@ -1,7 +1,7 @@ defmodule EctoJob.Mixfile do use Mix.Project - @version "3.0.0" + @version "3.1.0" @url "https://github.com/mbuhot/ecto_job" def project do From 85fcdf0476615f49320ba533f3061ba44bdedc6a Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Wed, 3 Jun 2020 12:29:18 +0400 Subject: [PATCH 7/9] Update README.md --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 4b9b572..3b6f120 100644 --- a/README.md +++ b/README.md @@ -86,6 +86,8 @@ defmodule MyApp.Repo.Migrations.UpdateJobQueue do end ``` + + Add a module for the queue, mix in `EctoJob.JobQueue`. This will declare an `Ecto.Schema` to use with the table created in the migration, and a `start_link` function allowing the worker supervision tree to be started conveniently. From 98412615d2edcc2c6b9e75460a174b8cac6df693 Mon Sep 17 00:00:00 2001 From: Yuri Lukyanov Date: Thu, 4 Jun 2020 11:33:22 +0400 Subject: [PATCH 8/9] Fix typos --- test/support/migrations/20170817092638_add_job_table.exs | 2 +- test/support/migrations/20200603110510_add_job_table_binary.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/support/migrations/20170817092638_add_job_table.exs b/test/support/migrations/20170817092638_add_job_table.exs index f37c3a4..a5ae9db 100644 --- a/test/support/migrations/20170817092638_add_job_table.exs +++ b/test/support/migrations/20170817092638_add_job_table.exs @@ -11,6 +11,6 @@ defmodule EctoJob.Test.Repo.Migrations.AddJobTable do def down do CreateJobTable.down("jobs") - Intall.down() + Install.down() end end diff --git a/test/support/migrations/20200603110510_add_job_table_binary.exs b/test/support/migrations/20200603110510_add_job_table_binary.exs index c1ef5e4..e34e74d 100644 --- a/test/support/migrations/20200603110510_add_job_table_binary.exs +++ b/test/support/migrations/20200603110510_add_job_table_binary.exs @@ -16,7 +16,7 @@ defmodule EctoJob.Test.Repo.Migrations.AddJobTableBinary do end def down do - CreateJobTable.down("jobs", prefix: "parasm_binary") + CreateJobTable.down("jobs", prefix: "params_binary") Intall.down(prefix: "params_binary") execute("DROP SCHEMA \"params_binary\";") end From 6bf85a3b528288286a0637123c3604139163b02a Mon Sep 17 00:00:00 2001 From: Jean Parpaillon Date: Thu, 10 Sep 2020 15:00:04 +0200 Subject: [PATCH 9/9] jobs binary test: use same db but different table name Creating DB (or schema) differs from postgresql to mysql For testing, we just need separate table --- lib/ecto_job/job_queue.ex | 21 ++++++++++--------- test/supervisor_test.exs | 14 ++++++------- .../20200603110510_add_job_table_binary.exs | 10 ++------- test/support/params_binary_job_queue.ex | 2 +- 4 files changed, 21 insertions(+), 26 deletions(-) diff --git a/lib/ecto_job/job_queue.ex b/lib/ecto_job/job_queue.ex index 45f533b..1465e0f 100644 --- a/lib/ecto_job/job_queue.ex +++ b/lib/ecto_job/job_queue.ex @@ -527,16 +527,16 @@ defmodule EctoJob.JobQueue do now = %DateTime{}, timeout_ms ) do - {count, jobs} = - schema - |> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand)) - |> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id) - |> Query.select([job], job) - |> repo.update_all( - set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now] - ) + {count, jobs} = + schema + |> Query.with_cte("available_jobs", as: ^available_jobs(schema, demand)) + |> Query.join(:inner, [job], a in "available_jobs", on: job.id == a.id) + |> Query.select([job], job) + |> repo.update_all( + set: [state: "RESERVED", expires: reservation_expiry(now, timeout_ms), updated_at: now] + ) - {count, deserialize_job_params(jobs)} + {count, deserialize_job_params(jobs)} end defp do_reserve_available_jobs( @@ -580,8 +580,9 @@ defmodule EctoJob.JobQueue do end) {count, deserialize_job_params(jobs)} + end - defp deserialize_job_params(jobs) when is_list(jobs) do + defp deserialize_job_params(jobs) when is_list(jobs) do for job <- jobs, do: deserialize_job_params(job) end diff --git a/test/supervisor_test.exs b/test/supervisor_test.exs index f60903f..998585f 100644 --- a/test/supervisor_test.exs +++ b/test/supervisor_test.exs @@ -8,10 +8,10 @@ defmodule EctoJob.SupervisorTest do case EctoJob.Test.Repo.__adapter__() do Ecto.Adapters.Postgres -> assert [ - {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, - {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]}, - {Postgrex.Notifications, notifications_pid, :worker, [Postgrex.Notifications]} - ] = Supervisor.which_children(pid) + {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, + {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]}, + {Postgrex.Notifications, notifications_pid, :worker, [Postgrex.Notifications]} + ] = Supervisor.which_children(pid) assert Process.whereis(JobQueue.Supervisor) == pid assert Process.whereis(JobQueue.Notifier) == notifications_pid @@ -19,9 +19,9 @@ defmodule EctoJob.SupervisorTest do Ecto.Adapters.MyXQL -> assert [ - {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, - {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]} - ] = Supervisor.which_children(pid) + {EctoJob.WorkerSupervisor, _, :supervisor, [EctoJob.WorkerSupervisor]}, + {EctoJob.Producer, producer_pid, :worker, [EctoJob.Producer]} + ] = Supervisor.which_children(pid) assert Process.whereis(JobQueue.Supervisor) == pid assert Process.whereis(JobQueue.Producer) == producer_pid diff --git a/test/support/migrations/20200603110510_add_job_table_binary.exs b/test/support/migrations/20200603110510_add_job_table_binary.exs index e34e74d..e64954b 100644 --- a/test/support/migrations/20200603110510_add_job_table_binary.exs +++ b/test/support/migrations/20200603110510_add_job_table_binary.exs @@ -5,19 +5,13 @@ defmodule EctoJob.Test.Repo.Migrations.AddJobTableBinary do @ecto_job_version 3 def up do - execute("CREATE SCHEMA \"params_binary\";") - Install.up(prefix: "params_binary") - - CreateJobTable.up("jobs", + CreateJobTable.up("jobs_binary", version: @ecto_job_version, - prefix: "params_binary", params_type: :binary ) end def down do - CreateJobTable.down("jobs", prefix: "params_binary") - Intall.down(prefix: "params_binary") - execute("DROP SCHEMA \"params_binary\";") + CreateJobTable.down("jobs_binary") end end diff --git a/test/support/params_binary_job_queue.ex b/test/support/params_binary_job_queue.ex index df66779..4187b43 100644 --- a/test/support/params_binary_job_queue.ex +++ b/test/support/params_binary_job_queue.ex @@ -1,6 +1,6 @@ defmodule EctoJob.Test.ParamsBinaryJobQueue do @moduledoc false - use EctoJob.JobQueue, table_name: "jobs", schema_prefix: "params_binary", params_type: :binary + use EctoJob.JobQueue, table_name: "jobs_binary", params_type: :binary def perform(multi, _params) do EctoJob.Test.Repo.transaction(multi)