From 7c05e09f02f59b77c4124b877dc3914a4d44f322 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sat, 2 Sep 2017 18:56:20 +1200 Subject: [PATCH 1/5] Bump version 1.1.0 --- mix.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mix.exs b/mix.exs index 4b9f798..84c8130 100644 --- a/mix.exs +++ b/mix.exs @@ -7,7 +7,7 @@ defmodule Verk.Mixfile do def project do [app: :verk, - version: "1.0.2", + version: "1.1.0", elixir: "~> 1.4", build_embedded: Mix.env == :prod, start_permanent: Mix.env == :prod, From ee87d5d6a9859ed9755bbd63b6bb6bb00dc28e34 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sat, 2 Sep 2017 18:56:31 +1200 Subject: [PATCH 2/5] Add Verk.Manager to control the state of queues --- lib/verk/manager.ex | 51 ++++++++++++++++++++++++++++++++++++ test/manager_test.exs | 61 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+) create mode 100644 lib/verk/manager.ex create mode 100644 test/manager_test.exs diff --git a/lib/verk/manager.ex b/lib/verk/manager.ex new file mode 100644 index 0000000..3f57dcd --- /dev/null +++ b/lib/verk/manager.ex @@ -0,0 +1,51 @@ +defmodule Verk.Manager do + @moduledoc """ + A process that manages the state of each started queue + """ + + use GenServer + require Logger + + @table :verk_manager + @ets_options [:ordered_set, :named_table, :public, read_concurrency: true] + + @doc false + def start_link(queues), do: GenServer.start_link(__MODULE__, queues, name: __MODULE__) + + @doc false + def init(queues) do + ets = :ets.new(@table, @ets_options) + for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size}) + {:ok, ets} + end + + @doc """ + It returns the status of each queue currently + + [{:default, 25}, {:low_priority, 10}] + """ + @spec status :: [{atom, pos_integer}] + def status, do: :ets.tab2list(@table) + + @doc """ + It adds the `queue` running with the amount of `size` of workers + It always returns the child spec + """ + @spec add(atom, pos_integer) :: Supervisor.on_start_child + def add(queue, size) do + unless :ets.insert_new(@table, {queue, size}) do + Logger.error "Queue #{queue} is already running" + end + Verk.Supervisor.start_child(queue, size) + end + + @doc """ + It removes the `queue` + It returns `:ok` if successful and `{:error, :not_found}` otherwise + """ + @spec remove(atom) :: :ok | {:error, :not_found} + def remove(queue) do + :ets.delete(@table, queue) + Verk.Supervisor.stop_child(queue) + end +end diff --git a/test/manager_test.exs b/test/manager_test.exs new file mode 100644 index 0000000..7fafe1a --- /dev/null +++ b/test/manager_test.exs @@ -0,0 +1,61 @@ +defmodule Verk.ManagerTest do + use ExUnit.Case + import :meck + import Verk.Manager + + setup do + new Verk.Supervisor + on_exit fn -> unload() end + :ok + end + + describe "init/1" do + test "creates an ETS table with queues" do + queues = [default: 25, low_priority: 10] + init(queues) + assert :ets.tab2list(:verk_manager) == queues + end + end + + describe "status/0" do + test "returns running queues" do + queues = [default: 25, low_priority: 10] + init(queues) + assert status() == queues + end + end + + describe "add/2" do + test "adds queue to supervisor if not already there" do + init([]) + + expect(Verk.Supervisor, :start_child, [:default, 25], {:ok, :child}) + + assert add(:default, 25) == {:ok, :child} + assert :ets.tab2list(:verk_manager) == [default: 25] + assert validate Verk.Supervisor + end + end + + describe "remove/1" do + test "removes queue from supervisor if queue is running" do + queues = [default: 25] + init(queues) + + expect(Verk.Supervisor, :stop_child, [:default], :ok) + + assert remove(:default) == :ok + assert validate Verk.Supervisor + end + + test "does nothing if queue is not running" do + queues = [default: 25] + init(queues) + + expect(Verk.Supervisor, :stop_child, [:default], {:error, :not_found}) + + assert remove(:default) == {:error, :not_found} + assert validate Verk.Supervisor + end + end +end From bc6f08a23e0ac1245a4465e2a44a0d2ae7a485ed Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sat, 2 Sep 2017 18:57:10 +1200 Subject: [PATCH 3/5] Change Verk.add_queue/remove_queue to use Verk.Manager --- lib/verk.ex | 6 +++--- test/verk_test.exs | 37 ++++++++++++------------------------- 2 files changed, 15 insertions(+), 28 deletions(-) diff --git a/lib/verk.ex b/lib/verk.ex index 0bebed6..cd94591 100644 --- a/lib/verk.ex +++ b/lib/verk.ex @@ -8,7 +8,7 @@ defmodule Verk do It has an API that provides information about the queues """ - alias Verk.{Job, Time} + alias Verk.{Job, Time, Manager} @schedule_key "schedule" @@ -17,7 +17,7 @@ defmodule Verk do """ @spec add_queue(atom, pos_integer) :: Supervisor.on_start_child def add_queue(queue, size \\ 25) when is_atom(queue) and size > 0 do - Verk.Supervisor.start_child(queue, size) + Manager.add(queue, size) end @doc """ @@ -25,7 +25,7 @@ defmodule Verk do """ @spec remove_queue(atom) :: :ok | {:error, :not_found} def remove_queue(queue) when is_atom(queue) do - Verk.Supervisor.stop_child(queue) + Manager.remove(queue) end @doc """ diff --git a/test/verk_test.exs b/test/verk_test.exs index 39a5cb8..1059f00 100644 --- a/test/verk_test.exs +++ b/test/verk_test.exs @@ -2,7 +2,7 @@ defmodule VerkTest do use ExUnit.Case import :meck import Verk - alias Verk.Time + alias Verk.{Time, Manager} setup do on_exit fn -> unload() end @@ -10,49 +10,36 @@ defmodule VerkTest do end describe "add_queue/2" do - test "add a new queue" do + test "returns the child spec" do queue = :test_queue - child = { :"test_queue.supervisor", { Verk.Queue.Supervisor, :start_link, [:test_queue, 30] }, :permanent, :infinity, :supervisor, [ Verk.Queue.Supervisor ] } - expect(Supervisor, :start_child, [Verk.Supervisor, child], :ok) + expect(Manager, :add, [:test_queue, 30], {:ok, :child}) - assert add_queue(queue, 30) == :ok + assert add_queue(queue, 30) == {:ok, :child} - assert validate Supervisor + assert validate Manager end end describe "remove_queue/1" do - test "a queue successfully" do + test "returns true if successfully removed" do queue = :test_queue - expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok) - expect(Supervisor, :delete_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok) + expect(Manager, :remove, [:test_queue], :ok) assert remove_queue(queue) == :ok - assert validate Supervisor + assert validate Manager end - test "a queue unsuccessfully terminating child" do + test "returns false if failed to remove" do queue = :test_queue - expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], { :error, :not_found }) + expect(Manager, :remove, [:test_queue], {:error, :not_found}) - assert remove_queue(queue) == { :error, :not_found } + assert remove_queue(queue) == {:error, :not_found} - assert validate Supervisor - end - - test "a queue unsuccessfully deleting child" do - queue = :test_queue - - expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok) - expect(Supervisor, :delete_child, [Verk.Supervisor, :"test_queue.supervisor"], { :error, :not_found }) - - assert remove_queue(queue) == { :error, :not_found } - - assert validate Supervisor + assert validate Manager end end From 0c6801a618d0053947cd5e34e42f40b80f3568b5 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sat, 2 Sep 2017 19:23:51 +1200 Subject: [PATCH 4/5] Add Verk.QueueSupervisor.name & child_spec --- lib/verk/queue_supervisor.ex | 13 +++++++++++-- test/queue_supervisor_test.exs | 29 +++++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) create mode 100644 test/queue_supervisor_test.exs diff --git a/lib/verk/queue_supervisor.ex b/lib/verk/queue_supervisor.ex index e7dced8..7f8e3b8 100644 --- a/lib/verk/queue_supervisor.ex +++ b/lib/verk/queue_supervisor.ex @@ -10,8 +10,7 @@ defmodule Verk.Queue.Supervisor do @doc false def start_link(name, size) do - supervisor_name = String.to_atom("#{name}.supervisor") - Supervisor.start_link(__MODULE__, [name, size], name: supervisor_name) + Supervisor.start_link(__MODULE__, [name, size], name: name(name)) end @doc false @@ -30,4 +29,14 @@ defmodule Verk.Queue.Supervisor do args = [[name: {:local, pool_name}, worker_module: Verk.Worker, size: pool_size, max_overflow: 0], []] worker(:poolboy, args, restart: :permanent, shutdown: 5000, id: pool_name) end + + @doc false + def name(queue) do + String.to_atom("#{queue}.supervisor") + end + + @doc false + def child_spec(queue, size) when is_atom(queue) do + supervisor(Verk.Queue.Supervisor, [queue, size], id: name(queue)) + end end diff --git a/test/queue_supervisor_test.exs b/test/queue_supervisor_test.exs new file mode 100644 index 0000000..70e04be --- /dev/null +++ b/test/queue_supervisor_test.exs @@ -0,0 +1,29 @@ +defmodule Verk.QueueSupervisorTest do + use ExUnit.Case + import Verk.Queue.Supervisor + + describe "init/1" do + test "defines tree" do + {:ok, {_, children}} = init([:default, 25]) + [queue_manager, pool, workers_manager] = children + + assert {:"default.queue_manager", _, _, _, :worker, [Verk.QueueManager]} = queue_manager + assert {:"default.pool", _, _, _, :worker, [:poolboy]} = pool + assert {:"default.workers_manager", _, _, _, :worker, [Verk.WorkersManager]} = workers_manager + end + end + + describe "name/1" do + test "returns supervisor name" do + assert name("default") == :"default.supervisor" + end + end + + describe "child_spec/2" do + test "returns supervisor spec" do + assert {:"default.supervisor", + {Verk.Queue.Supervisor, :start_link, [:default, 25]}, _, + _, :supervisor, [Verk.Queue.Supervisor]} = child_spec(:default, 25) + end + end +end From aa9c3edeccf662f05ac1b87b87697b556cf092b5 Mon Sep 17 00:00:00 2001 From: Eduardo Gurgel Date: Sat, 2 Sep 2017 19:24:14 +1200 Subject: [PATCH 5/5] Change Verk.Supervisor to start new ManagerSupervisor The ManagerSupervisor will guarantee that the Manager exists before the queues --- lib/verk/manager_supervisor.ex | 17 +++++++++ lib/verk/supervisor.ex | 29 ++++++--------- test/manager_supervisor_test.exs | 14 ++++++++ test/supervisor_test.exs | 61 ++++++++++++++++++++++++++++++-- 4 files changed, 99 insertions(+), 22 deletions(-) create mode 100644 lib/verk/manager_supervisor.ex create mode 100644 test/manager_supervisor_test.exs diff --git a/lib/verk/manager_supervisor.ex b/lib/verk/manager_supervisor.ex new file mode 100644 index 0000000..9a34b26 --- /dev/null +++ b/lib/verk/manager_supervisor.ex @@ -0,0 +1,17 @@ +defmodule Verk.Manager.Supervisor do + @moduledoc false + use Supervisor + + @doc false + def start_link, do: Supervisor.start_link(__MODULE__, [], name: __MODULE__) + + @doc false + def init(_) do + queues = Confex.get_env(:verk, :queues, []) + children = for {queue, size} <- queues, do: Verk.Queue.Supervisor.child_spec(queue, size) + + children = [worker(Verk.Manager, [queues], id: Verk.Manager) | children] + + supervise(children, strategy: :rest_for_one) + end +end diff --git a/lib/verk/supervisor.ex b/lib/verk/supervisor.ex index 5ea20c1..cbaf5ae 100644 --- a/lib/verk/supervisor.ex +++ b/lib/verk/supervisor.ex @@ -9,47 +9,38 @@ defmodule Verk.Supervisor do """ use Supervisor - @doc false + @doc """ + It starts the main supervisor + """ def start_link do Supervisor.start_link(__MODULE__, [], name: __MODULE__) end @doc false def init(_) do - queues = Confex.get_env(:verk, :queues, []) - children = for {queue, size} <- queues, do: queue_child(queue, size) - redis_url = Confex.get_env(:verk, :redis_url) + redis = worker(Redix, [redis_url, [name: Verk.Redis]], id: Verk.Redis) + event_producer = worker(Verk.EventProducer, [], id: Verk.EventProducer) + queue_stats = worker(Verk.QueueStats, [], id: Verk.QueueStats) schedule_manager = worker(Verk.ScheduleManager, [], id: Verk.ScheduleManager) - event_producer = worker(Verk.EventProducer, []) - - queue_stats = worker(Verk.QueueStats, []) - redis = worker(Redix, [redis_url, [name: Verk.Redis]], id: Verk.Redis) + manager_sup = supervisor(Verk.Manager.Supervisor, [], id: Verk.Manager.Supervisor) - children = [redis, event_producer, queue_stats, schedule_manager] ++ children + children = [redis, event_producer, queue_stats, schedule_manager, manager_sup] supervise(children, strategy: :one_for_one) end @doc false def start_child(queue, size \\ 25) when is_atom(queue) and size > 0 do - Supervisor.start_child(__MODULE__, queue_child(queue, size)) + Supervisor.start_child(__MODULE__, Verk.Queue.Supervisor.child_spec(queue, size)) end @doc false def stop_child(queue) when is_atom(queue) do - name = supervisor_name(queue) + name = Verk.Queue.Supervisor.name(queue) case Supervisor.terminate_child(__MODULE__, name) do :ok -> Supervisor.delete_child(__MODULE__, name) error = {:error, :not_found} -> error end end - - defp queue_child(queue, size) when is_atom(queue) do - supervisor(Verk.Queue.Supervisor, [queue, size], id: supervisor_name(queue)) - end - - defp supervisor_name(queue) do - String.to_atom("#{queue}.supervisor") - end end diff --git a/test/manager_supervisor_test.exs b/test/manager_supervisor_test.exs new file mode 100644 index 0000000..bc52c7c --- /dev/null +++ b/test/manager_supervisor_test.exs @@ -0,0 +1,14 @@ +defmodule Verk.ManagerSupervisorTest do + use ExUnit.Case + import Verk.Manager.Supervisor + + describe "init/1" do + test "defines tree" do + {:ok, {_, children}} = init([]) + [manager, default] = children + + assert {Verk.Manager, _, _, _, :worker, [Verk.Manager]} = manager + assert {:"default.supervisor", _, _, _, :supervisor, [Verk.Queue.Supervisor]} = default + end + end +end diff --git a/test/supervisor_test.exs b/test/supervisor_test.exs index d7feca0..fa56147 100644 --- a/test/supervisor_test.exs +++ b/test/supervisor_test.exs @@ -1,16 +1,71 @@ defmodule Verk.SupervisorTest do use ExUnit.Case + import :meck + import Verk.Supervisor + + setup do + new Supervisor + on_exit fn -> unload() end + :ok + end describe "init/1" do test "defines tree" do - {:ok, {_, children}} = Verk.Supervisor.init([]) - [redix, producer, stats, schedule_manager, default] = children + {:ok, {_, children}} = init([]) + [redix, producer, stats, schedule_manager, manager_sup] = children assert {Verk.Redis, _, _, _, :worker, [Redix]} = redix assert {Verk.EventProducer, _, _, _, :worker, [Verk.EventProducer]} = producer assert {Verk.QueueStats, _, _, _, :worker, [Verk.QueueStats]} = stats assert {Verk.ScheduleManager, _, _, _, :worker, [Verk.ScheduleManager]} = schedule_manager - assert {:"default.supervisor", _, _, _, :supervisor, [Verk.Queue.Supervisor]} = default + assert {Verk.Manager.Supervisor, _, _, _, :supervisor, [Verk.Manager.Supervisor]} = manager_sup + end + end + + describe "start_child/2" do + test "add a new queue" do + queue = :test_queue + + child = { :"test_queue.supervisor", { Verk.Queue.Supervisor, :start_link, [:test_queue, 30] }, :permanent, :infinity, :supervisor, [ Verk.Queue.Supervisor ] } + expect(Supervisor, :start_child, [Verk.Supervisor, child], :ok) + + assert start_child(queue, 30) == :ok + + assert validate Supervisor end end + + describe "stop_child/1" do + test "a queue successfully" do + queue = :test_queue + + expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok) + expect(Supervisor, :delete_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok) + + assert stop_child(queue) == :ok + + assert validate Supervisor + end + + test "a queue unsuccessfully terminating child" do + queue = :test_queue + + expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], { :error, :not_found }) + + assert stop_child(queue) == { :error, :not_found } + + assert validate Supervisor + end + + test "a queue unsuccessfully deleting child" do + queue = :test_queue + + expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok) + expect(Supervisor, :delete_child, [Verk.Supervisor, :"test_queue.supervisor"], { :error, :not_found }) + + assert stop_child(queue) == { :error, :not_found } + + assert validate Supervisor + end +end end