Skip to content

Commit

Permalink
Change QueueStats to hold the current status of all queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduardo Gurgel authored and edgurgel committed Sep 6, 2017
1 parent 240970a commit b1b97cf
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 48 deletions.
68 changes: 50 additions & 18 deletions lib/verk/queue_stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,22 @@ defmodule Verk.QueueStats do
* Amount of failed jobs
It will persist to redis from time to time
It also holds information about the current status of queus. They can be:
* running
* idle
* pausing
* paused
"""
use GenStage
require Logger
alias Verk.QueueStatsCounters

defmodule State do
@moduledoc false
defstruct queues: %{}
end

@persist_interval 10_000

@doc false
Expand All @@ -24,48 +35,69 @@ defmodule Verk.QueueStats do
"""
@spec all(binary) :: Map.t
def all(prefix \\ "") do
for {queue, running, finished, failed} <- QueueStatsCounters.all(prefix), is_list(queue) do
%{queue: to_string(queue), running_counter: running, finished_counter: finished, failed_counter: failed}
end
GenServer.call(__MODULE__, {:all, prefix})
end

@doc """
Requests to reset started counter for a `queue`
"""
@spec reset_started(binary) :: :ok
def reset_started(queue) do
GenStage.call(__MODULE__, {:reset_started, to_string(queue)})
defp status(queue, queues, running_counter) do
status = queues[queue] || Verk.Manager.status(queue)
if status == :running and running_counter == 0 do
:idle
else
status
end
end

@doc false
def init(_) do
QueueStatsCounters.init
Process.send_after(self(), :persist_stats, @persist_interval)
{:consumer, :ok, subscribe_to: [Verk.EventProducer]}
{:consumer, %State{}, subscribe_to: [Verk.EventProducer]}
end

def handle_call({:all, prefix}, _from, state) do
result = for {queue, running, finished, failed} <- QueueStatsCounters.all(prefix), is_list(queue) do
queue = to_string(queue)
%{queue: queue, status: status(queue, state.queues, running),
running_counter: running, finished_counter: finished, failed_counter: failed}
end
queues = for %{queue: queue, status: status} <- result, into: state.queues, do: {queue, status}
{:reply, result, [], %State{queues: queues}}
end

def handle_events(events, _from, state) do
for event <- events, do: handle_event(event)
{:noreply, [], state}
new_state = Enum.reduce(events, state, fn event, state ->
handle_event(event, state)
end)
{:noreply, [], new_state}
end

@doc false
defp handle_event(%Verk.Events.JobStarted{job: job}) do
defp handle_event(%Verk.Events.JobStarted{job: job}, state) do
QueueStatsCounters.register(:started, job.queue)
state
end

defp handle_event(%Verk.Events.JobFinished{job: job}) do
defp handle_event(%Verk.Events.JobFinished{job: job}, state) do
QueueStatsCounters.register(:finished, job.queue)
state
end

defp handle_event(%Verk.Events.JobFailed{job: job}) do
defp handle_event(%Verk.Events.JobFailed{job: job}, state) do
QueueStatsCounters.register(:failed, job.queue)
state
end

@doc false
def handle_call({:reset_started, queue}, _from, state) do
defp handle_event(%Verk.Events.QueueRunning{queue: queue}, state) do
QueueStatsCounters.reset_started(queue)
{:reply, :ok, [], state}
%{state | queues: Map.put(state.queues, to_string(queue), :running)}
end

defp handle_event(%Verk.Events.QueuePausing{queue: queue}, state) do
%{state | queues: Map.put(state.queues, to_string(queue), :pausing)}
end

defp handle_event(%Verk.Events.QueuePaused{queue: queue}, state) do
%{state | queues: Map.put(state.queues, to_string(queue), :paused)}
end

@doc false
Expand Down
106 changes: 76 additions & 30 deletions test/queue_stats_test.exs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
defmodule Verk.QueueStatsTest do
use ExUnit.Case
import Verk.QueueStats
import :meck
alias Verk.QueueStatsCounters
alias Verk.QueueStats.State

@table :queue_stats

setup do
on_exit fn -> unload() end
{ :ok, _ } = Confex.get_env(:verk, :redis_url)
|> Redix.start_link([name: Verk.Redis])
|> Redix.start_link([name: Verk.Redis])
Redix.pipeline!(Verk.Redis, [["DEL",
"stat:failed", "stat:processed",
"stat:failed:queue_1", "stat:processed:queue_1",
Expand All @@ -19,22 +23,51 @@ defmodule Verk.QueueStatsTest do
:ok
end

describe "all/1" do
describe "handle_call/3" do
test "list counters" do
init([]) # create table
QueueStatsCounters.init

handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state)
handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state)
handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_2" } }], :from, :state)
handle_events([%Verk.Events.JobFinished{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state)
handle_events([%Verk.Events.JobFailed{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state)

assert all() == [%{ queue: "queue_1", running_counter: 0, finished_counter: 1, failed_counter: 1 },
%{ queue: "queue_2", running_counter: 1, finished_counter: 0, failed_counter: 0 } ]
state = %State{ queues: %{ "queue_1" => :running, "queue_2" => :pausing }}
new_state = %State{ queues: %{ "queue_1" => :idle, "queue_2" => :pausing }}

assert handle_call({:all, ""}, :from, state) ==
{
:reply,
[%{ queue: "queue_1", running_counter: 0, finished_counter: 1, failed_counter: 1, status: :idle },
%{ queue: "queue_2", running_counter: 1, finished_counter: 0, failed_counter: 0, status: :pausing } ],
[],
new_state
}
end

test "list counters having no status of a queue" do
QueueStatsCounters.init

handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "default" } }], :from, :state)

state = %State{ queues: %{} }
new_state = %State{ queues: %{ "default" => :running } }

expect(Verk.Manager, :status, ["default"], :running)

assert handle_call({:all, ""}, :from, state) ==
{
:reply,
[%{ queue: "default", running_counter: 1, finished_counter: 0, failed_counter: 0, status: :running }],
[],
new_state
}
assert validate(Verk.Manager)
end

test "list counters searching for a prefix" do
init([]) # create table
QueueStatsCounters.init

handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "default" } }], :from, :state)
handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "default" } }], :from, :state)
Expand All @@ -43,41 +76,54 @@ defmodule Verk.QueueStatsTest do
handle_events([%Verk.Events.JobFailed{ job: %Verk.Job{ queue: "default-something" } }], :from, :state)
handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "priority" } }], :from, :state)

assert all("def") == [%{ queue: "default", running_counter: 1, finished_counter: 1, failed_counter: 0 },
%{ queue: "default-something", running_counter: 0, finished_counter: 0, failed_counter: 1 } ]
end
end

describe "handle_call/3" do
test "reset_started with no element" do
init([]) # create table

assert handle_call({ :reset_started, "queue" }, :from, :state) == { :reply, :ok, [], :state }
assert :ets.tab2list(@table) == [{ 'queue', 0, 0, 0, 0, 0 }]
end

test "reset_started with existing element" do
init([]) # create table
:ets.insert_new(@table, { 'queue', 1, 2, 3, 4, 5 })
state = %State{ queues: %{ "default" => :running, "default-something" => :pausing }}

assert handle_call({ :reset_started, "queue" }, :from, :state) == { :reply, :ok, [], :state }
assert :ets.tab2list(@table) == [{ 'queue', 0, 2, 3, 4, 5 }]
assert handle_call({:all, "def"}, :from, state) ==
{
:reply,
[%{ queue: "default", running_counter: 1, finished_counter: 1, failed_counter: 0, status: :running },
%{ queue: "default-something", running_counter: 0, finished_counter: 0, failed_counter: 1, status: :pausing } ],
[],
state
}
end
end

describe "init/1" do
test "creates an ETS table" do
assert :ets.info(@table) == :undefined

assert init([]) == { :consumer, :ok, subscribe_to: [Verk.EventProducer] }
assert init([]) == { :consumer, %State{}, subscribe_to: [Verk.EventProducer] }

assert :ets.info(@table) != :undefined
end
end

describe "handle_event/2" do
test "with queue running and existing element" do
QueueStatsCounters.init
:ets.insert_new(@table, { 'default', 1, 2, 3, 4, 5 })

event = %Verk.Events.QueueRunning{ queue: :default}
state = %State{queues: %{}}

assert handle_events([event], :from, state) == { :noreply, [], %State{queues: %{ "default" => :running}}}

assert :ets.tab2list(@table) == [{ 'default', 0, 2, 3, 4, 5 }]
end

test "with queue running" do
QueueStatsCounters.init
event = %Verk.Events.QueueRunning{ queue: :default}
state = %State{queues: %{}}

assert handle_events([event], :from, state) == { :noreply, [], %State{queues: %{ "default" => :running}}}

assert :ets.tab2list(@table) == [{ 'default', 0, 0, 0, 0, 0 }]
end

test "with started event" do
init([]) # create table
QueueStatsCounters.init
event = %Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue" } }

assert handle_events([event], :from, :state) == { :noreply, [], :state }
Expand All @@ -86,7 +132,7 @@ defmodule Verk.QueueStatsTest do
end

test "with finished event" do
init([]) # create table
QueueStatsCounters.init
event = %Verk.Events.JobFinished{ job: %Verk.Job{ queue: "queue" } }

assert handle_events([event], :from, :state) == { :noreply, [], :state }
Expand All @@ -95,7 +141,7 @@ defmodule Verk.QueueStatsTest do
end

test "with failed event" do
init([]) # create table
QueueStatsCounters.init
event = %Verk.Events.JobFailed{ job: %Verk.Job{ queue: "queue" } }

assert handle_events([event], :from, :state) == { :noreply, [], :state }
Expand All @@ -106,7 +152,7 @@ defmodule Verk.QueueStatsTest do

describe "handle_info/2" do
test "persist processed and failed counts" do
init([])
QueueStatsCounters.init

handle_events([%Verk.Events.JobStarted{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state)
handle_events([%Verk.Events.JobFailed{ job: %Verk.Job{ queue: "queue_1" } }], :from, :state)
Expand Down Expand Up @@ -149,7 +195,7 @@ defmodule Verk.QueueStatsTest do
end

test 'test with unexpected message' do
init([])
QueueStatsCounters.init
assert handle_info(:pretty_sweet, :state) == {:noreply, [], :state}
end
end
Expand Down

0 comments on commit b1b97cf

Please sign in to comment.