From 682e6677112bac7c94d5d34e189f89d72820ce22 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Thu, 29 Jun 2017 14:53:29 +1200 Subject: [PATCH 1/4] Add WorkersManager#pause/resume It sends events for each new status of the queue --- lib/verk/events.ex | 22 +++++++ lib/verk/workers_manager.ex | 64 +++++++++++++++++---- test/workers_manager_test.exs | 105 ++++++++++++++++++++++++++++++++-- 3 files changed, 174 insertions(+), 17 deletions(-) diff --git a/lib/verk/events.ex b/lib/verk/events.ex index c54e05a..cf12f7d 100644 --- a/lib/verk/events.ex +++ b/lib/verk/events.ex @@ -27,4 +27,26 @@ defmodule Verk.Events do stacktrace: [:erlang.stack_item()], exception: Exception.t} defstruct [:job, :started_at, :failed_at, :stacktrace, :exception] end + + defmodule QueueRunning do + @moduledoc """ + When a queue is running + """ + @type t :: %__MODULE__{queue: atom} + defstruct [:queue] + end + defmodule QueuePausing do + @moduledoc """ + When a queue is pausing + """ + @type t :: %__MODULE__{queue: atom} + defstruct [:queue] + end + defmodule QueuePaused do + @moduledoc """ + When a queue paused + """ + @type t :: %__MODULE__{queue: atom} + defstruct [:queue] + end end diff --git a/lib/verk/workers_manager.ex b/lib/verk/workers_manager.ex index 8b3c353..c4fbbd1 100644 --- a/lib/verk/workers_manager.ex +++ b/lib/verk/workers_manager.ex @@ -13,7 +13,8 @@ defmodule Verk.WorkersManager do defmodule State do @moduledoc false - defstruct [:queue_name, :pool_name, :queue_manager_name, :pool_size, :monitors, :timeout] + defstruct queue_name: nil, pool_name: nil, queue_manager_name: nil, + pool_size: nil, monitors: nil, timeout: nil, status: :running end @doc """ @@ -67,27 +68,55 @@ defmodule Verk.WorkersManager do end end + @doc """ + Pauses a `queue` + """ + @spec pause(binary | atom) :: :ok | :already_paused + def pause(queue), do: GenServer.call(name(queue), :pause) + + @doc """ + Resumes a `queue`. + """ + @spec resume(binary | atom) :: :ok | :already_running + def resume(queue), do: GenServer.call(name(queue), :resume) + @doc """ Create a table to monitor workers saving data about the assigned queue/pool """ def init([workers_manager_name, queue_name, queue_manager_name, pool_name, size]) do monitors = :ets.new(workers_manager_name, [:named_table, read_concurrency: true]) - timeout = Confex.get_env(:verk, :workers_manager_timeout, @default_timeout) - state = %State{queue_name: queue_name, - queue_manager_name: queue_manager_name, - pool_name: pool_name, - pool_size: size, - monitors: monitors, - timeout: timeout} - - Logger.info "Workers Manager started for queue #{queue_name}" + timeout = Confex.get_env(:verk, :workers_manager_timeout, @default_timeout) + status = Verk.Manager.status(queue_name) + state = %State{queue_name: queue_name, + queue_manager_name: queue_manager_name, + pool_name: pool_name, + pool_size: size, + monitors: monitors, + timeout: timeout, + status: status} + Logger.info "Workers Manager started for queue #{queue_name} (#{status})" send self(), :enqueue_inprogress - Verk.QueueStats.reset_started(queue_name) + + if status == :running, do: notify!(%Events.QueueRunning{queue: queue_name}) {:ok, state} end + @doc false + def handle_call(:pause, _from, state = %State{status: :running}) do + notify!(%Events.QueuePausing{queue: state.queue_name}) + {:reply, :ok, %{state | status: :pausing}, 0} + end + def handle_call(:pause, _from, state = %State{status: :paused}), do: {:reply, :already_paused, state} + def handle_call(:pause, _from, state), do: {:reply, :ok, state} + + def handle_call(:resume, _from, state = %State{status: :running}), do: {:reply, :already_running, state} + def handle_call(:resume, _from, state) do + notify!(%Events.QueueRunning{queue: state.queue_name}) + {:reply, :ok, %{state | status: :running}, 0} + end + @doc false def handle_info(:enqueue_inprogress, state) do case QueueManager.enqueue_inprogress(state.queue_manager_name) do @@ -99,6 +128,19 @@ defmodule Verk.WorkersManager do end end + def handle_info(:timeout, state = %State{status: :paused}), do: {:noreply, state} + + def handle_info(:timeout, state = %State{status: :pausing}) do + n_running_jobs = :ets.info(state.monitors, :size) + + if n_running_jobs == 0 do + notify!(%Events.QueuePaused{queue: state.queue_name}) + {:noreply, %{state | status: :paused}} + else + {:noreply, state} + end + end + def handle_info(:timeout, state) do workers = free_workers(state.pool_name) if workers != 0 do diff --git a/test/workers_manager_test.exs b/test/workers_manager_test.exs index d6cb271..8edc3df 100644 --- a/test/workers_manager_test.exs +++ b/test/workers_manager_test.exs @@ -88,7 +88,7 @@ defmodule Verk.WorkersManagerTest do end describe "init/1" do - test "inits" do + test "inits and notifies if 'running'" do name = :workers_manager queue_name = "queue_name" queue_manager_name = "queue_manager_name" @@ -97,14 +97,34 @@ defmodule Verk.WorkersManagerTest do timeout = Confex.get_env(:verk, :workers_manager_timeout) state = %State{ queue_name: queue_name, queue_manager_name: queue_manager_name, pool_name: pool_name, pool_size: pool_size, - monitors: :workers_manager, timeout: timeout } - expect(Verk.QueueStats, :reset_started, [queue_name], :ok) + monitors: :workers_manager, timeout: timeout, status: :running } - assert init([name, queue_name, queue_manager_name, pool_name, pool_size]) - == { :ok, state } + expect(Verk.Manager, :status, [queue_name], :running) + + assert init([name, queue_name, queue_manager_name, pool_name, pool_size]) == { :ok, state } assert_received :enqueue_inprogress - assert validate Verk.QueueStats + assert_receive %Verk.Events.QueueRunning{ queue: ^queue_name } + assert validate Verk.Manager + end + + test "inits and does not notify ir paused" do + name = :workers_manager + queue_name = "queue_name" + queue_manager_name = "queue_manager_name" + pool_name = "pool_name" + pool_size = "size" + timeout = Confex.get_env(:verk, :workers_manager_timeout) + state = %State{ queue_name: queue_name, queue_manager_name: queue_manager_name, + pool_name: pool_name, pool_size: pool_size, + monitors: :workers_manager, timeout: timeout, status: :paused } + + expect(Verk.Manager, :status, [queue_name], :paused) + + assert init([name, queue_name, queue_manager_name, pool_name, pool_size]) == { :ok, state } + + assert_received :enqueue_inprogress + assert validate Verk.Manager end end @@ -134,7 +154,80 @@ defmodule Verk.WorkersManagerTest do end end + describe "handle_call/3 pause" do + test "with running status" do + queue_name = "queue_name" + state = %State{status: :running, queue_name: queue_name} + + assert handle_call(:pause, :from, state) == { :reply, :ok, %{ state | status: :pausing }, 0 } + assert_receive %Verk.Events.QueuePausing{ queue: ^queue_name } + end + + test "with pausing status" do + state = %State{status: :pausing} + + assert handle_call(:pause, :from, state) == { :reply, :ok, state } + refute_receive %Verk.Events.QueuePausing{} + end + + test "with paused status" do + state = %State{status: :paused} + + assert handle_call(:pause, :from, state) == { :reply, :already_paused, state } + refute_receive %Verk.Events.QueuePausing{} + end + end + + describe "handle_call/3 resume" do + test "with running status" do + state = %State{status: :running} + + assert handle_call(:resume, :from, state) == { :reply, :already_running, state } + refute_receive %Verk.Events.QueueRunning{} + end + + test "with pausing status" do + queue_name = "queue_name" + state = %State{status: :pausing, queue_name: queue_name} + + assert handle_call(:resume, :from, state) == { :reply, :ok, %{state | status: :running}, 0} + assert_receive %Verk.Events.QueueRunning{ queue: ^queue_name } + end + + test "with paused status" do + queue_name = "queue_name" + state = %State{status: :paused, queue_name: queue_name} + + assert handle_call(:resume, :from, state) == { :reply, :ok, %{state | status: :running}, 0} + assert_receive %Verk.Events.QueueRunning{ queue: ^queue_name } + end + end + describe "handle_info/2 timeout" do + test "timeout with paused status" do + state = %State{ status: :paused } + + assert handle_info(:timeout, state) == { :noreply, state } + end + + test "timeout with pausing status and jobs running", %{ monitors: monitors } do + row = { self(), "job_id", "job", make_ref(), "start_time" } + :ets.insert(monitors, row) + + state = %State{ status: :pausing, monitors: monitors } + + assert handle_info(:timeout, state) == { :noreply, state } + end + + test "timeout with pausing status and no jobs running", %{ monitors: monitors } do + queue_name = "queue_name" + state = %State{ status: :pausing, monitors: monitors, queue_name: queue_name } + + assert handle_info(:timeout, state) == { :noreply, %{ state | status: :paused } } + + assert_receive %Verk.Events.QueuePaused{ queue: ^queue_name } + end + test "timeout with no free workers", %{ monitors: monitors } do pool_name = "pool_name" new Verk.QueueManager From 240970a5aa3dad2c18366bd4655b371257abf00b Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sun, 3 Sep 2017 16:32:11 +1200 Subject: [PATCH 2/4] Add Verk.Manager.{status/1, pause/1, resume/1} Now the manager holds the status of each queue (paused or running) --- lib/verk/manager.ex | 40 ++++++++++++++++++-- test/manager_test.exs | 86 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 111 insertions(+), 15 deletions(-) diff --git a/lib/verk/manager.ex b/lib/verk/manager.ex index 3f57dcd..e3fd4f2 100644 --- a/lib/verk/manager.ex +++ b/lib/verk/manager.ex @@ -5,6 +5,7 @@ defmodule Verk.Manager do use GenServer require Logger + alias Verk.WorkersManager @table :verk_manager @ets_options [:ordered_set, :named_table, :public, read_concurrency: true] @@ -15,25 +16,56 @@ defmodule Verk.Manager do @doc false def init(queues) do ets = :ets.new(@table, @ets_options) - for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size}) + for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size, :running}) {:ok, ets} end @doc """ It returns the status of each queue currently - [{:default, 25}, {:low_priority, 10}] + [{:default, 25, :paused}, {:low_priority, 10, :running}] """ - @spec status :: [{atom, pos_integer}] + @spec status :: [{atom, pos_integer, atom}] def status, do: :ets.tab2list(@table) + @doc """ + It returns the status of each queue currently + + [{:default, 25, :paused}, {:low_priority, 10, :running}] + """ + @spec status(atom) :: :running | :paused + def status(queue) do + [{^queue, _, queue_status}] = :ets.lookup(@table, queue) + queue_status + end + + @spec pause(atom) :: boolean + def pause(queue) do + if :ets.update_element(@table, queue, {3, :paused}) do + WorkersManager.pause(queue) + true + else + false + end + end + + @spec resume(atom) :: boolean + def resume(queue) do + if :ets.update_element(@table, queue, {3, :running}) do + WorkersManager.resume(queue) + true + else + false + end + end + @doc """ It adds the `queue` running with the amount of `size` of workers It always returns the child spec """ @spec add(atom, pos_integer) :: Supervisor.on_start_child def add(queue, size) do - unless :ets.insert_new(@table, {queue, size}) do + unless :ets.insert_new(@table, {queue, size, :running}) do Logger.error "Queue #{queue} is already running" end Verk.Supervisor.start_child(queue, size) diff --git a/test/manager_test.exs b/test/manager_test.exs index 7fafe1a..51fc89b 100644 --- a/test/manager_test.exs +++ b/test/manager_test.exs @@ -4,53 +4,117 @@ defmodule Verk.ManagerTest do import Verk.Manager setup do - new Verk.Supervisor on_exit fn -> unload() end :ok end + defp init_table(queues) do + :ets.new(:verk_manager, [:ordered_set, :named_table, :public, read_concurrency: true]) + for {queue, size, status} <- queues, do: :ets.insert_new(:verk_manager, {queue, size, status}) + end + describe "init/1" do test "creates an ETS table with queues" do queues = [default: 25, low_priority: 10] init(queues) - assert :ets.tab2list(:verk_manager) == queues + assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}, {:low_priority, 10, :running}] end end describe "status/0" do test "returns running queues" do - queues = [default: 25, low_priority: 10] - init(queues) - assert status() == queues + queues = [{:default, 25, :running}, {:low_priority, 10, :running}] + init_table(queues) + assert status() == [{:default, 25, :running}, {:low_priority, 10, :running}] + end + end + + describe "status/1" do + test "returns status of a queue" do + queues = [{:default, 25, :running}, {:low_priority, 10, :paused}] + init_table(queues) + assert status(:default) == :running + assert status(:low_priority) == :paused + end + end + + describe "pause/1" do + test "pauses queue if queue exists" do + queues = [{:default, 25, :running}, {:low_priority, 10, :running}] + init_table(queues) + + queue = :default + + expect(Verk.WorkersManager, :pause, [queue], :ok) + + assert pause(queue) == true + assert :ets.tab2list(:verk_manager) == [{:default, 25, :paused}, {:low_priority, 10, :running}] + assert validate Verk.WorkersManager + end + + test "does nothing if queue does not exist" do + queues = [{:default, 25, :running}, {:low_priority, 10, :running}] + init_table(queues) + + queue = :no_queue + + assert pause(queue) == false + assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}, {:low_priority, 10, :running}] + end + end + + describe "resume/1" do + test "resume queue if queue exists" do + queues = [{:default, 25, :paused}, {:low_priority, 10, :running}] + init_table(queues) + + queue = :default + + expect(Verk.WorkersManager, :resume, [queue], :ok) + + assert resume(queue) == true + assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}, {:low_priority, 10, :running}] + assert validate Verk.WorkersManager + end + + test "does nothing if queue does not exist" do + queues = [{:default, 25, :paused}, {:low_priority, 10, :running}] + init_table(queues) + + queue = :no_queue + + assert pause(queue) == false + assert :ets.tab2list(:verk_manager) == [{:default, 25, :paused}, {:low_priority, 10, :running}] end end describe "add/2" do test "adds queue to supervisor if not already there" do - init([]) + init_table([]) expect(Verk.Supervisor, :start_child, [:default, 25], {:ok, :child}) assert add(:default, 25) == {:ok, :child} - assert :ets.tab2list(:verk_manager) == [default: 25] + assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}] assert validate Verk.Supervisor end end describe "remove/1" do test "removes queue from supervisor if queue is running" do - queues = [default: 25] - init(queues) + queues = [{:default, 25, :paused}, {:low_priority, 10, :running}] + init_table(queues) expect(Verk.Supervisor, :stop_child, [:default], :ok) assert remove(:default) == :ok + assert :ets.tab2list(:verk_manager) == [{:low_priority, 10, :running}] assert validate Verk.Supervisor end test "does nothing if queue is not running" do - queues = [default: 25] - init(queues) + queues = [{:default, 25, :paused}] + init_table(queues) expect(Verk.Supervisor, :stop_child, [:default], {:error, :not_found}) From b1b97cf3cd3b96d6b7b45a0cf7ce6de0d5301f0e Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Thu, 29 Jun 2017 14:55:49 +1200 Subject: [PATCH 3/4] Change QueueStats to hold the current status of all queues --- lib/verk/queue_stats.ex | 68 +++++++++++++++++------- test/queue_stats_test.exs | 106 +++++++++++++++++++++++++++----------- 2 files changed, 126 insertions(+), 48 deletions(-) diff --git a/lib/verk/queue_stats.ex b/lib/verk/queue_stats.ex index 9eef425..149c8a7 100644 --- a/lib/verk/queue_stats.ex +++ b/lib/verk/queue_stats.ex @@ -7,11 +7,22 @@ defmodule Verk.QueueStats do * Amount of failed jobs It will persist to redis from time to time + + It also holds information about the current status of queus. They can be: + * running + * idle + * pausing + * paused """ use GenStage require Logger alias Verk.QueueStatsCounters + defmodule State do + @moduledoc false + defstruct queues: %{} + end + @persist_interval 10_000 @doc false @@ -24,48 +35,69 @@ defmodule Verk.QueueStats do """ @spec all(binary) :: Map.t def all(prefix \\ "") do - for {queue, running, finished, failed} <- QueueStatsCounters.all(prefix), is_list(queue) do - %{queue: to_string(queue), running_counter: running, finished_counter: finished, failed_counter: failed} - end + GenServer.call(__MODULE__, {:all, prefix}) end - @doc """ - Requests to reset started counter for a `queue` - """ - @spec reset_started(binary) :: :ok - def reset_started(queue) do - GenStage.call(__MODULE__, {:reset_started, to_string(queue)}) + defp status(queue, queues, running_counter) do + status = queues[queue] || Verk.Manager.status(queue) + if status == :running and running_counter == 0 do + :idle + else + status + end end @doc false def init(_) do QueueStatsCounters.init Process.send_after(self(), :persist_stats, @persist_interval) - {:consumer, :ok, subscribe_to: [Verk.EventProducer]} + {:consumer, %State{}, subscribe_to: [Verk.EventProducer]} + end + + def handle_call({:all, prefix}, _from, state) do + result = for {queue, running, finished, failed} <- QueueStatsCounters.all(prefix), is_list(queue) do + queue = to_string(queue) + %{queue: queue, status: status(queue, state.queues, running), + running_counter: running, finished_counter: finished, failed_counter: failed} + end + queues = for %{queue: queue, status: status} <- result, into: state.queues, do: {queue, status} + {:reply, result, [], %State{queues: queues}} end def handle_events(events, _from, state) do - for event <- events, do: handle_event(event) - {:noreply, [], state} + new_state = Enum.reduce(events, state, fn event, state -> + handle_event(event, state) + end) + {:noreply, [], new_state} end @doc false - defp handle_event(%Verk.Events.JobStarted{job: job}) do + defp handle_event(%Verk.Events.JobStarted{job: job}, state) do QueueStatsCounters.register(:started, job.queue) + state end - defp handle_event(%Verk.Events.JobFinished{job: job}) do + defp handle_event(%Verk.Events.JobFinished{job: job}, state) do QueueStatsCounters.register(:finished, job.queue) + state end - defp handle_event(%Verk.Events.JobFailed{job: job}) do + defp handle_event(%Verk.Events.JobFailed{job: job}, state) do QueueStatsCounters.register(:failed, job.queue) + state end - @doc false - def handle_call({:reset_started, queue}, _from, state) do + defp handle_event(%Verk.Events.QueueRunning{queue: queue}, state) do QueueStatsCounters.reset_started(queue) - {:reply, :ok, [], state} + %{state | queues: Map.put(state.queues, to_string(queue), :running)} + end + + defp handle_event(%Verk.Events.QueuePausing{queue: queue}, state) do + %{state | queues: Map.put(state.queues, to_string(queue), :pausing)} + end + + defp handle_event(%Verk.Events.QueuePaused{queue: queue}, state) do + %{state | queues: Map.put(state.queues, to_string(queue), :paused)} end @doc false diff --git a/test/queue_stats_test.exs b/test/queue_stats_test.exs index 93c5b3c..4f4353c 100644 --- a/test/queue_stats_test.exs +++ b/test/queue_stats_test.exs @@ -1,12 +1,16 @@ defmodule Verk.QueueStatsTest do use ExUnit.Case import Verk.QueueStats + import :meck + alias Verk.QueueStatsCounters + alias Verk.QueueStats.State @table :queue_stats setup do + on_exit fn -> unload() end { :ok, _ } = Confex.get_env(:verk, :redis_url) - |> Redix.start_link([name: Verk.Redis]) + |> Redix.start_link([name: Verk.Redis]) Redix.pipeline!(Verk.Redis, [["DEL", "stat:failed", "stat:processed", "stat:failed:queue_1", "stat:processed:queue_1", @@ -19,9 +23,9 @@ defmodule Verk.QueueStatsTest do :ok end - describe "all/1" do + describe "handle_call/3" do test "list counters" do - init([]) # create table + QueueStatsCounters.init handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state) handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state) @@ -29,12 +33,41 @@ defmodule Verk.QueueStatsTest do handle_events([%Verk.Events.JobFinished{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state) handle_events([%Verk.Events.JobFailed{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state) - assert all() == [%{ queue: "queue_1", running_counter: 0, finished_counter: 1, failed_counter: 1 }, - %{ queue: "queue_2", running_counter: 1, finished_counter: 0, failed_counter: 0 } ] + state = %State{ queues: %{ "queue_1" => :running, "queue_2" => :pausing }} + new_state = %State{ queues: %{ "queue_1" => :idle, "queue_2" => :pausing }} + + assert handle_call({:all, ""}, :from, state) == + { + :reply, + [%{ queue: "queue_1", running_counter: 0, finished_counter: 1, failed_counter: 1, status: :idle }, + %{ queue: "queue_2", running_counter: 1, finished_counter: 0, failed_counter: 0, status: :pausing } ], + [], + new_state + } + end + + test "list counters having no status of a queue" do + QueueStatsCounters.init + + handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "default" } }], :from, :state) + + state = %State{ queues: %{} } + new_state = %State{ queues: %{ "default" => :running } } + + expect(Verk.Manager, :status, ["default"], :running) + + assert handle_call({:all, ""}, :from, state) == + { + :reply, + [%{ queue: "default", running_counter: 1, finished_counter: 0, failed_counter: 0, status: :running }], + [], + new_state + } + assert validate(Verk.Manager) end test "list counters searching for a prefix" do - init([]) # create table + QueueStatsCounters.init handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "default" } }], :from, :state) handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "default" } }], :from, :state) @@ -43,25 +76,16 @@ defmodule Verk.QueueStatsTest do handle_events([%Verk.Events.JobFailed{ job: %Verk.Job{ queue: "default-something" } }], :from, :state) handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "priority" } }], :from, :state) - assert all("def") == [%{ queue: "default", running_counter: 1, finished_counter: 1, failed_counter: 0 }, - %{ queue: "default-something", running_counter: 0, finished_counter: 0, failed_counter: 1 } ] - end - end - - describe "handle_call/3" do - test "reset_started with no element" do - init([]) # create table - - assert handle_call({ :reset_started, "queue" }, :from, :state) == { :reply, :ok, [], :state } - assert :ets.tab2list(@table) == [{ 'queue', 0, 0, 0, 0, 0 }] - end - - test "reset_started with existing element" do - init([]) # create table - :ets.insert_new(@table, { 'queue', 1, 2, 3, 4, 5 }) + state = %State{ queues: %{ "default" => :running, "default-something" => :pausing }} - assert handle_call({ :reset_started, "queue" }, :from, :state) == { :reply, :ok, [], :state } - assert :ets.tab2list(@table) == [{ 'queue', 0, 2, 3, 4, 5 }] + assert handle_call({:all, "def"}, :from, state) == + { + :reply, + [%{ queue: "default", running_counter: 1, finished_counter: 1, failed_counter: 0, status: :running }, + %{ queue: "default-something", running_counter: 0, finished_counter: 0, failed_counter: 1, status: :pausing } ], + [], + state + } end end @@ -69,15 +93,37 @@ defmodule Verk.QueueStatsTest do test "creates an ETS table" do assert :ets.info(@table) == :undefined - assert init([]) == { :consumer, :ok, subscribe_to: [Verk.EventProducer] } + assert init([]) == { :consumer, %State{}, subscribe_to: [Verk.EventProducer] } assert :ets.info(@table) != :undefined end end describe "handle_event/2" do + test "with queue running and existing element" do + QueueStatsCounters.init + :ets.insert_new(@table, { 'default', 1, 2, 3, 4, 5 }) + + event = %Verk.Events.QueueRunning{ queue: :default} + state = %State{queues: %{}} + + assert handle_events([event], :from, state) == { :noreply, [], %State{queues: %{ "default" => :running}}} + + assert :ets.tab2list(@table) == [{ 'default', 0, 2, 3, 4, 5 }] + end + + test "with queue running" do + QueueStatsCounters.init + event = %Verk.Events.QueueRunning{ queue: :default} + state = %State{queues: %{}} + + assert handle_events([event], :from, state) == { :noreply, [], %State{queues: %{ "default" => :running}}} + + assert :ets.tab2list(@table) == [{ 'default', 0, 0, 0, 0, 0 }] + end + test "with started event" do - init([]) # create table + QueueStatsCounters.init event = %Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue" } } assert handle_events([event], :from, :state) == { :noreply, [], :state } @@ -86,7 +132,7 @@ defmodule Verk.QueueStatsTest do end test "with finished event" do - init([]) # create table + QueueStatsCounters.init event = %Verk.Events.JobFinished{ job: %Verk.Job{ queue: "queue" } } assert handle_events([event], :from, :state) == { :noreply, [], :state } @@ -95,7 +141,7 @@ defmodule Verk.QueueStatsTest do end test "with failed event" do - init([]) # create table + QueueStatsCounters.init event = %Verk.Events.JobFailed{ job: %Verk.Job{ queue: "queue" } } assert handle_events([event], :from, :state) == { :noreply, [], :state } @@ -106,7 +152,7 @@ defmodule Verk.QueueStatsTest do describe "handle_info/2" do test "persist processed and failed counts" do - init([]) + QueueStatsCounters.init handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state) handle_events([%Verk.Events.JobFailed{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state) @@ -149,7 +195,7 @@ defmodule Verk.QueueStatsTest do end test 'test with unexpected message' do - init([]) + QueueStatsCounters.init assert handle_info(:pretty_sweet, :state) == {:noreply, [], :state} end end From 7d082a1124d805468801af280fdfaf726d4d9c3b Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Thu, 29 Jun 2017 15:02:58 +1200 Subject: [PATCH 4/4] Expose Verk.pause_queue & Verk.resume_queue --- lib/verk.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/verk.ex b/lib/verk.ex index cd94591..dce77aa 100644 --- a/lib/verk.ex +++ b/lib/verk.ex @@ -28,6 +28,9 @@ defmodule Verk do Manager.remove(queue) end + defdelegate pause_queue(queue), to: Verk.Manager, as: :pause + defdelegate resume_queue(queue), to: Verk.Manager, as: :resume + @doc """ Enqueues a Job to the specified queue returning the respective job id