Skip to content

Commit

Permalink
Add Verk.Manager.{status/1, pause/1, resume/1}
Browse files Browse the repository at this point in the history
Now the manager holds the status of each queue (paused or running)
  • Loading branch information
edgurgel committed Sep 6, 2017
1 parent 682e667 commit 240970a
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 15 deletions.
40 changes: 36 additions & 4 deletions lib/verk/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Verk.Manager do

use GenServer
require Logger
alias Verk.WorkersManager

@table :verk_manager
@ets_options [:ordered_set, :named_table, :public, read_concurrency: true]
Expand All @@ -15,25 +16,56 @@ defmodule Verk.Manager do
@doc false
def init(queues) do
ets = :ets.new(@table, @ets_options)
for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size})
for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size, :running})
{:ok, ets}
end

@doc """
It returns the status of each queue currently
[{:default, 25}, {:low_priority, 10}]
[{:default, 25, :paused}, {:low_priority, 10, :running}]
"""
@spec status :: [{atom, pos_integer}]
@spec status :: [{atom, pos_integer, atom}]
def status, do: :ets.tab2list(@table)

@doc """
It returns the status of each queue currently
[{:default, 25, :paused}, {:low_priority, 10, :running}]
"""
@spec status(atom) :: :running | :paused
def status(queue) do
[{^queue, _, queue_status}] = :ets.lookup(@table, queue)
queue_status
end

@spec pause(atom) :: boolean
def pause(queue) do
if :ets.update_element(@table, queue, {3, :paused}) do
WorkersManager.pause(queue)
true
else
false
end
end

@spec resume(atom) :: boolean
def resume(queue) do
if :ets.update_element(@table, queue, {3, :running}) do
WorkersManager.resume(queue)
true
else
false
end
end

@doc """
It adds the `queue` running with the amount of `size` of workers
It always returns the child spec
"""
@spec add(atom, pos_integer) :: Supervisor.on_start_child
def add(queue, size) do
unless :ets.insert_new(@table, {queue, size}) do
unless :ets.insert_new(@table, {queue, size, :running}) do
Logger.error "Queue #{queue} is already running"
end
Verk.Supervisor.start_child(queue, size)
Expand Down
86 changes: 75 additions & 11 deletions test/manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,117 @@ defmodule Verk.ManagerTest do
import Verk.Manager

setup do
new Verk.Supervisor
on_exit fn -> unload() end
:ok
end

defp init_table(queues) do
:ets.new(:verk_manager, [:ordered_set, :named_table, :public, read_concurrency: true])
for {queue, size, status} <- queues, do: :ets.insert_new(:verk_manager, {queue, size, status})
end

describe "init/1" do
test "creates an ETS table with queues" do
queues = [default: 25, low_priority: 10]
init(queues)
assert :ets.tab2list(:verk_manager) == queues
assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}, {:low_priority, 10, :running}]
end
end

describe "status/0" do
test "returns running queues" do
queues = [default: 25, low_priority: 10]
init(queues)
assert status() == queues
queues = [{:default, 25, :running}, {:low_priority, 10, :running}]
init_table(queues)
assert status() == [{:default, 25, :running}, {:low_priority, 10, :running}]
end
end

describe "status/1" do
test "returns status of a queue" do
queues = [{:default, 25, :running}, {:low_priority, 10, :paused}]
init_table(queues)
assert status(:default) == :running
assert status(:low_priority) == :paused
end
end

describe "pause/1" do
test "pauses queue if queue exists" do
queues = [{:default, 25, :running}, {:low_priority, 10, :running}]
init_table(queues)

queue = :default

expect(Verk.WorkersManager, :pause, [queue], :ok)

assert pause(queue) == true
assert :ets.tab2list(:verk_manager) == [{:default, 25, :paused}, {:low_priority, 10, :running}]
assert validate Verk.WorkersManager
end

test "does nothing if queue does not exist" do
queues = [{:default, 25, :running}, {:low_priority, 10, :running}]
init_table(queues)

queue = :no_queue

assert pause(queue) == false
assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}, {:low_priority, 10, :running}]
end
end

describe "resume/1" do
test "resume queue if queue exists" do
queues = [{:default, 25, :paused}, {:low_priority, 10, :running}]
init_table(queues)

queue = :default

expect(Verk.WorkersManager, :resume, [queue], :ok)

assert resume(queue) == true
assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}, {:low_priority, 10, :running}]
assert validate Verk.WorkersManager
end

test "does nothing if queue does not exist" do
queues = [{:default, 25, :paused}, {:low_priority, 10, :running}]
init_table(queues)

queue = :no_queue

assert pause(queue) == false
assert :ets.tab2list(:verk_manager) == [{:default, 25, :paused}, {:low_priority, 10, :running}]
end
end

describe "add/2" do
test "adds queue to supervisor if not already there" do
init([])
init_table([])

expect(Verk.Supervisor, :start_child, [:default, 25], {:ok, :child})

assert add(:default, 25) == {:ok, :child}
assert :ets.tab2list(:verk_manager) == [default: 25]
assert :ets.tab2list(:verk_manager) == [{:default, 25, :running}]
assert validate Verk.Supervisor
end
end

describe "remove/1" do
test "removes queue from supervisor if queue is running" do
queues = [default: 25]
init(queues)
queues = [{:default, 25, :paused}, {:low_priority, 10, :running}]
init_table(queues)

expect(Verk.Supervisor, :stop_child, [:default], :ok)

assert remove(:default) == :ok
assert :ets.tab2list(:verk_manager) == [{:low_priority, 10, :running}]
assert validate Verk.Supervisor
end

test "does nothing if queue is not running" do
queues = [default: 25]
init(queues)
queues = [{:default, 25, :paused}]
init_table(queues)

expect(Verk.Supervisor, :stop_child, [:default], {:error, :not_found})

Expand Down

0 comments on commit 240970a

Please sign in to comment.