Skip to content

Commit

Permalink
Merge pull request #137 from edgurgel/verk-manager
Browse files Browse the repository at this point in the history
Verk manager
  • Loading branch information
edgurgel authored Sep 3, 2017
2 parents 1145e2a + aa9c3ed commit 12ec50d
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 53 deletions.
6 changes: 3 additions & 3 deletions lib/verk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Verk do
It has an API that provides information about the queues
"""
alias Verk.{Job, Time}
alias Verk.{Job, Time, Manager}

@schedule_key "schedule"

Expand All @@ -17,15 +17,15 @@ defmodule Verk do
"""
@spec add_queue(atom, pos_integer) :: Supervisor.on_start_child
def add_queue(queue, size \\ 25) when is_atom(queue) and size > 0 do
Verk.Supervisor.start_child(queue, size)
Manager.add(queue, size)
end

@doc """
Remove `queue` from the list of queues that are being processed
"""
@spec remove_queue(atom) :: :ok | {:error, :not_found}
def remove_queue(queue) when is_atom(queue) do
Verk.Supervisor.stop_child(queue)
Manager.remove(queue)
end

@doc """
Expand Down
51 changes: 51 additions & 0 deletions lib/verk/manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule Verk.Manager do
@moduledoc """
A process that manages the state of each started queue
"""

use GenServer
require Logger

@table :verk_manager
@ets_options [:ordered_set, :named_table, :public, read_concurrency: true]

@doc false
def start_link(queues), do: GenServer.start_link(__MODULE__, queues, name: __MODULE__)

@doc false
def init(queues) do
ets = :ets.new(@table, @ets_options)
for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size})
{:ok, ets}
end

@doc """
It returns the status of each queue currently
[{:default, 25}, {:low_priority, 10}]
"""
@spec status :: [{atom, pos_integer}]
def status, do: :ets.tab2list(@table)

@doc """
It adds the `queue` running with the amount of `size` of workers
It always returns the child spec
"""
@spec add(atom, pos_integer) :: Supervisor.on_start_child
def add(queue, size) do
unless :ets.insert_new(@table, {queue, size}) do
Logger.error "Queue #{queue} is already running"
end
Verk.Supervisor.start_child(queue, size)
end

@doc """
It removes the `queue`
It returns `:ok` if successful and `{:error, :not_found}` otherwise
"""
@spec remove(atom) :: :ok | {:error, :not_found}
def remove(queue) do
:ets.delete(@table, queue)
Verk.Supervisor.stop_child(queue)
end
end
17 changes: 17 additions & 0 deletions lib/verk/manager_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Verk.Manager.Supervisor do
@moduledoc false
use Supervisor

@doc false
def start_link, do: Supervisor.start_link(__MODULE__, [], name: __MODULE__)

@doc false
def init(_) do
queues = Confex.get_env(:verk, :queues, [])
children = for {queue, size} <- queues, do: Verk.Queue.Supervisor.child_spec(queue, size)

children = [worker(Verk.Manager, [queues], id: Verk.Manager) | children]

supervise(children, strategy: :rest_for_one)
end
end
13 changes: 11 additions & 2 deletions lib/verk/queue_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ defmodule Verk.Queue.Supervisor do

@doc false
def start_link(name, size) do
supervisor_name = String.to_atom("#{name}.supervisor")
Supervisor.start_link(__MODULE__, [name, size], name: supervisor_name)
Supervisor.start_link(__MODULE__, [name, size], name: name(name))
end

@doc false
Expand All @@ -30,4 +29,14 @@ defmodule Verk.Queue.Supervisor do
args = [[name: {:local, pool_name}, worker_module: Verk.Worker, size: pool_size, max_overflow: 0], []]
worker(:poolboy, args, restart: :permanent, shutdown: 5000, id: pool_name)
end

@doc false
def name(queue) do
String.to_atom("#{queue}.supervisor")
end

@doc false
def child_spec(queue, size) when is_atom(queue) do
supervisor(Verk.Queue.Supervisor, [queue, size], id: name(queue))
end
end
29 changes: 10 additions & 19 deletions lib/verk/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,38 @@ defmodule Verk.Supervisor do
"""
use Supervisor

@doc false
@doc """
It starts the main supervisor
"""
def start_link do
Supervisor.start_link(__MODULE__, [], name: __MODULE__)
end

@doc false
def init(_) do
queues = Confex.get_env(:verk, :queues, [])
children = for {queue, size} <- queues, do: queue_child(queue, size)

redis_url = Confex.get_env(:verk, :redis_url)

redis = worker(Redix, [redis_url, [name: Verk.Redis]], id: Verk.Redis)
event_producer = worker(Verk.EventProducer, [], id: Verk.EventProducer)
queue_stats = worker(Verk.QueueStats, [], id: Verk.QueueStats)
schedule_manager = worker(Verk.ScheduleManager, [], id: Verk.ScheduleManager)
event_producer = worker(Verk.EventProducer, [])

queue_stats = worker(Verk.QueueStats, [])
redis = worker(Redix, [redis_url, [name: Verk.Redis]], id: Verk.Redis)
manager_sup = supervisor(Verk.Manager.Supervisor, [], id: Verk.Manager.Supervisor)

children = [redis, event_producer, queue_stats, schedule_manager] ++ children
children = [redis, event_producer, queue_stats, schedule_manager, manager_sup]
supervise(children, strategy: :one_for_one)
end

@doc false
def start_child(queue, size \\ 25) when is_atom(queue) and size > 0 do
Supervisor.start_child(__MODULE__, queue_child(queue, size))
Supervisor.start_child(__MODULE__, Verk.Queue.Supervisor.child_spec(queue, size))
end

@doc false
def stop_child(queue) when is_atom(queue) do
name = supervisor_name(queue)
name = Verk.Queue.Supervisor.name(queue)
case Supervisor.terminate_child(__MODULE__, name) do
:ok -> Supervisor.delete_child(__MODULE__, name)
error = {:error, :not_found} -> error
end
end

defp queue_child(queue, size) when is_atom(queue) do
supervisor(Verk.Queue.Supervisor, [queue, size], id: supervisor_name(queue))
end

defp supervisor_name(queue) do
String.to_atom("#{queue}.supervisor")
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule Verk.Mixfile do

def project do
[app: :verk,
version: "1.0.2",
version: "1.1.0",
elixir: "~> 1.4",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
Expand Down
14 changes: 14 additions & 0 deletions test/manager_supervisor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule Verk.ManagerSupervisorTest do
use ExUnit.Case
import Verk.Manager.Supervisor

describe "init/1" do
test "defines tree" do
{:ok, {_, children}} = init([])
[manager, default] = children

assert {Verk.Manager, _, _, _, :worker, [Verk.Manager]} = manager
assert {:"default.supervisor", _, _, _, :supervisor, [Verk.Queue.Supervisor]} = default
end
end
end
61 changes: 61 additions & 0 deletions test/manager_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
defmodule Verk.ManagerTest do
use ExUnit.Case
import :meck
import Verk.Manager

setup do
new Verk.Supervisor
on_exit fn -> unload() end
:ok
end

describe "init/1" do
test "creates an ETS table with queues" do
queues = [default: 25, low_priority: 10]
init(queues)
assert :ets.tab2list(:verk_manager) == queues
end
end

describe "status/0" do
test "returns running queues" do
queues = [default: 25, low_priority: 10]
init(queues)
assert status() == queues
end
end

describe "add/2" do
test "adds queue to supervisor if not already there" do
init([])

expect(Verk.Supervisor, :start_child, [:default, 25], {:ok, :child})

assert add(:default, 25) == {:ok, :child}
assert :ets.tab2list(:verk_manager) == [default: 25]
assert validate Verk.Supervisor
end
end

describe "remove/1" do
test "removes queue from supervisor if queue is running" do
queues = [default: 25]
init(queues)

expect(Verk.Supervisor, :stop_child, [:default], :ok)

assert remove(:default) == :ok
assert validate Verk.Supervisor
end

test "does nothing if queue is not running" do
queues = [default: 25]
init(queues)

expect(Verk.Supervisor, :stop_child, [:default], {:error, :not_found})

assert remove(:default) == {:error, :not_found}
assert validate Verk.Supervisor
end
end
end
29 changes: 29 additions & 0 deletions test/queue_supervisor_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule Verk.QueueSupervisorTest do
use ExUnit.Case
import Verk.Queue.Supervisor

describe "init/1" do
test "defines tree" do
{:ok, {_, children}} = init([:default, 25])
[queue_manager, pool, workers_manager] = children

assert {:"default.queue_manager", _, _, _, :worker, [Verk.QueueManager]} = queue_manager
assert {:"default.pool", _, _, _, :worker, [:poolboy]} = pool
assert {:"default.workers_manager", _, _, _, :worker, [Verk.WorkersManager]} = workers_manager
end
end

describe "name/1" do
test "returns supervisor name" do
assert name("default") == :"default.supervisor"
end
end

describe "child_spec/2" do
test "returns supervisor spec" do
assert {:"default.supervisor",
{Verk.Queue.Supervisor, :start_link, [:default, 25]}, _,
_, :supervisor, [Verk.Queue.Supervisor]} = child_spec(:default, 25)
end
end
end
61 changes: 58 additions & 3 deletions test/supervisor_test.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,71 @@
defmodule Verk.SupervisorTest do
use ExUnit.Case
import :meck
import Verk.Supervisor

setup do
new Supervisor
on_exit fn -> unload() end
:ok
end

describe "init/1" do
test "defines tree" do
{:ok, {_, children}} = Verk.Supervisor.init([])
[redix, producer, stats, schedule_manager, default] = children
{:ok, {_, children}} = init([])
[redix, producer, stats, schedule_manager, manager_sup] = children

assert {Verk.Redis, _, _, _, :worker, [Redix]} = redix
assert {Verk.EventProducer, _, _, _, :worker, [Verk.EventProducer]} = producer
assert {Verk.QueueStats, _, _, _, :worker, [Verk.QueueStats]} = stats
assert {Verk.ScheduleManager, _, _, _, :worker, [Verk.ScheduleManager]} = schedule_manager
assert {:"default.supervisor", _, _, _, :supervisor, [Verk.Queue.Supervisor]} = default
assert {Verk.Manager.Supervisor, _, _, _, :supervisor, [Verk.Manager.Supervisor]} = manager_sup
end
end

describe "start_child/2" do
test "add a new queue" do
queue = :test_queue

child = { :"test_queue.supervisor", { Verk.Queue.Supervisor, :start_link, [:test_queue, 30] }, :permanent, :infinity, :supervisor, [ Verk.Queue.Supervisor ] }
expect(Supervisor, :start_child, [Verk.Supervisor, child], :ok)

assert start_child(queue, 30) == :ok

assert validate Supervisor
end
end

describe "stop_child/1" do
test "a queue successfully" do
queue = :test_queue

expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok)
expect(Supervisor, :delete_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok)

assert stop_child(queue) == :ok

assert validate Supervisor
end

test "a queue unsuccessfully terminating child" do
queue = :test_queue

expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], { :error, :not_found })

assert stop_child(queue) == { :error, :not_found }

assert validate Supervisor
end

test "a queue unsuccessfully deleting child" do
queue = :test_queue

expect(Supervisor, :terminate_child, [Verk.Supervisor, :"test_queue.supervisor"], :ok)
expect(Supervisor, :delete_child, [Verk.Supervisor, :"test_queue.supervisor"], { :error, :not_found })

assert stop_child(queue) == { :error, :not_found }

assert validate Supervisor
end
end
end
Loading

0 comments on commit 12ec50d

Please sign in to comment.