Skip to content

Commit

Permalink
Merge pull request #138 from edgurgel/pause-2
Browse files Browse the repository at this point in the history
Pause: Take 2
edgurgel authored Sep 9, 2017
2 parents 12ec50d + 7d082a1 commit 0b4b80c
Showing 8 changed files with 414 additions and 80 deletions.
3 changes: 3 additions & 0 deletions lib/verk.ex
Original file line number Diff line number Diff line change
@@ -28,6 +28,9 @@ defmodule Verk do
Manager.remove(queue)
end

defdelegate pause_queue(queue), to: Verk.Manager, as: :pause
defdelegate resume_queue(queue), to: Verk.Manager, as: :resume

@doc """
Enqueues a Job to the specified queue returning the respective job id
22 changes: 22 additions & 0 deletions lib/verk/events.ex
Original file line number Diff line number Diff line change
@@ -27,4 +27,26 @@ defmodule Verk.Events do
stacktrace: [:erlang.stack_item()], exception: Exception.t}
defstruct [:job, :started_at, :failed_at, :stacktrace, :exception]
end

defmodule QueueRunning do
@moduledoc """
When a queue is running
"""
@type t :: %__MODULE__{queue: atom}
defstruct [:queue]
end
defmodule QueuePausing do
@moduledoc """
When a queue is pausing
"""
@type t :: %__MODULE__{queue: atom}
defstruct [:queue]
end
defmodule QueuePaused do
@moduledoc """
When a queue paused
"""
@type t :: %__MODULE__{queue: atom}
defstruct [:queue]
end
end
40 changes: 36 additions & 4 deletions lib/verk/manager.ex
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ defmodule Verk.Manager do

use GenServer
require Logger
alias Verk.WorkersManager

@table :verk_manager
@ets_options [:ordered_set, :named_table, :public, read_concurrency: true]
@@ -15,25 +16,56 @@ defmodule Verk.Manager do
@doc false
def init(queues) do
ets = :ets.new(@table, @ets_options)
for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size})
for {queue, size} <- queues, do: :ets.insert_new(@table, {queue, size, :running})
{:ok, ets}
end

@doc """
It returns the status of each queue currently
[{:default, 25}, {:low_priority, 10}]
[{:default, 25, :paused}, {:low_priority, 10, :running}]
"""
@spec status :: [{atom, pos_integer}]
@spec status :: [{atom, pos_integer, atom}]
def status, do: :ets.tab2list(@table)

@doc """
It returns the status of each queue currently
[{:default, 25, :paused}, {:low_priority, 10, :running}]
"""
@spec status(atom) :: :running | :paused
def status(queue) do
[{^queue, _, queue_status}] = :ets.lookup(@table, queue)
queue_status
end

@spec pause(atom) :: boolean
def pause(queue) do
if :ets.update_element(@table, queue, {3, :paused}) do
WorkersManager.pause(queue)
true
else
false
end
end

@spec resume(atom) :: boolean
def resume(queue) do
if :ets.update_element(@table, queue, {3, :running}) do
WorkersManager.resume(queue)
true
else
false
end
end

@doc """
It adds the `queue` running with the amount of `size` of workers
It always returns the child spec
"""
@spec add(atom, pos_integer) :: Supervisor.on_start_child
def add(queue, size) do
unless :ets.insert_new(@table, {queue, size}) do
unless :ets.insert_new(@table, {queue, size, :running}) do
Logger.error "Queue #{queue} is already running"
end
Verk.Supervisor.start_child(queue, size)
68 changes: 50 additions & 18 deletions lib/verk/queue_stats.ex
Original file line number Diff line number Diff line change
@@ -7,11 +7,22 @@ defmodule Verk.QueueStats do
* Amount of failed jobs
It will persist to redis from time to time
It also holds information about the current status of queus. They can be:
* running
* idle
* pausing
* paused
"""
use GenStage
require Logger
alias Verk.QueueStatsCounters

defmodule State do
@moduledoc false
defstruct queues: %{}
end

@persist_interval 10_000

@doc false
@@ -24,48 +35,69 @@ defmodule Verk.QueueStats do
"""
@spec all(binary) :: Map.t
def all(prefix \\ "") do
for {queue, running, finished, failed} <- QueueStatsCounters.all(prefix), is_list(queue) do
%{queue: to_string(queue), running_counter: running, finished_counter: finished, failed_counter: failed}
end
GenServer.call(__MODULE__, {:all, prefix})
end

@doc """
Requests to reset started counter for a `queue`
"""
@spec reset_started(binary) :: :ok
def reset_started(queue) do
GenStage.call(__MODULE__, {:reset_started, to_string(queue)})
defp status(queue, queues, running_counter) do
status = queues[queue] || Verk.Manager.status(queue)
if status == :running and running_counter == 0 do
:idle
else
status
end
end

@doc false
def init(_) do
QueueStatsCounters.init
Process.send_after(self(), :persist_stats, @persist_interval)
{:consumer, :ok, subscribe_to: [Verk.EventProducer]}
{:consumer, %State{}, subscribe_to: [Verk.EventProducer]}
end

def handle_call({:all, prefix}, _from, state) do
result = for {queue, running, finished, failed} <- QueueStatsCounters.all(prefix), is_list(queue) do
queue = to_string(queue)
%{queue: queue, status: status(queue, state.queues, running),
running_counter: running, finished_counter: finished, failed_counter: failed}
end
queues = for %{queue: queue, status: status} <- result, into: state.queues, do: {queue, status}
{:reply, result, [], %State{queues: queues}}
end

def handle_events(events, _from, state) do
for event <- events, do: handle_event(event)
{:noreply, [], state}
new_state = Enum.reduce(events, state, fn event, state ->
handle_event(event, state)
end)
{:noreply, [], new_state}
end

@doc false
defp handle_event(%Verk.Events.JobStarted{job: job}) do
defp handle_event(%Verk.Events.JobStarted{job: job}, state) do
QueueStatsCounters.register(:started, job.queue)
state
end

defp handle_event(%Verk.Events.JobFinished{job: job}) do
defp handle_event(%Verk.Events.JobFinished{job: job}, state) do
QueueStatsCounters.register(:finished, job.queue)
state
end

defp handle_event(%Verk.Events.JobFailed{job: job}) do
defp handle_event(%Verk.Events.JobFailed{job: job}, state) do
QueueStatsCounters.register(:failed, job.queue)
state
end

@doc false
def handle_call({:reset_started, queue}, _from, state) do
defp handle_event(%Verk.Events.QueueRunning{queue: queue}, state) do
QueueStatsCounters.reset_started(queue)
{:reply, :ok, [], state}
%{state | queues: Map.put(state.queues, to_string(queue), :running)}
end

defp handle_event(%Verk.Events.QueuePausing{queue: queue}, state) do
%{state | queues: Map.put(state.queues, to_string(queue), :pausing)}
end

defp handle_event(%Verk.Events.QueuePaused{queue: queue}, state) do
%{state | queues: Map.put(state.queues, to_string(queue), :paused)}
end

@doc false
64 changes: 53 additions & 11 deletions lib/verk/workers_manager.ex
Original file line number Diff line number Diff line change
@@ -13,7 +13,8 @@ defmodule Verk.WorkersManager do

defmodule State do
@moduledoc false
defstruct [:queue_name, :pool_name, :queue_manager_name, :pool_size, :monitors, :timeout]
defstruct queue_name: nil, pool_name: nil, queue_manager_name: nil,
pool_size: nil, monitors: nil, timeout: nil, status: :running
end

@doc """
@@ -67,27 +68,55 @@ defmodule Verk.WorkersManager do
end
end

@doc """
Pauses a `queue`
"""
@spec pause(binary | atom) :: :ok | :already_paused
def pause(queue), do: GenServer.call(name(queue), :pause)

@doc """
Resumes a `queue`.
"""
@spec resume(binary | atom) :: :ok | :already_running
def resume(queue), do: GenServer.call(name(queue), :resume)

@doc """
Create a table to monitor workers saving data about the assigned queue/pool
"""
def init([workers_manager_name, queue_name, queue_manager_name, pool_name, size]) do
monitors = :ets.new(workers_manager_name, [:named_table, read_concurrency: true])
timeout = Confex.get_env(:verk, :workers_manager_timeout, @default_timeout)
state = %State{queue_name: queue_name,
queue_manager_name: queue_manager_name,
pool_name: pool_name,
pool_size: size,
monitors: monitors,
timeout: timeout}

Logger.info "Workers Manager started for queue #{queue_name}"
timeout = Confex.get_env(:verk, :workers_manager_timeout, @default_timeout)
status = Verk.Manager.status(queue_name)
state = %State{queue_name: queue_name,
queue_manager_name: queue_manager_name,
pool_name: pool_name,
pool_size: size,
monitors: monitors,
timeout: timeout,
status: status}
Logger.info "Workers Manager started for queue #{queue_name} (#{status})"

send self(), :enqueue_inprogress
Verk.QueueStats.reset_started(queue_name)

if status == :running, do: notify!(%Events.QueueRunning{queue: queue_name})

{:ok, state}
end

@doc false
def handle_call(:pause, _from, state = %State{status: :running}) do
notify!(%Events.QueuePausing{queue: state.queue_name})
{:reply, :ok, %{state | status: :pausing}, 0}
end
def handle_call(:pause, _from, state = %State{status: :paused}), do: {:reply, :already_paused, state}
def handle_call(:pause, _from, state), do: {:reply, :ok, state}

def handle_call(:resume, _from, state = %State{status: :running}), do: {:reply, :already_running, state}
def handle_call(:resume, _from, state) do
notify!(%Events.QueueRunning{queue: state.queue_name})
{:reply, :ok, %{state | status: :running}, 0}
end

@doc false
def handle_info(:enqueue_inprogress, state) do
case QueueManager.enqueue_inprogress(state.queue_manager_name) do
@@ -99,6 +128,19 @@ defmodule Verk.WorkersManager do
end
end

def handle_info(:timeout, state = %State{status: :paused}), do: {:noreply, state}

def handle_info(:timeout, state = %State{status: :pausing}) do
n_running_jobs = :ets.info(state.monitors, :size)

if n_running_jobs == 0 do
notify!(%Events.QueuePaused{queue: state.queue_name})
{:noreply, %{state | status: :paused}}
else
{:noreply, state}
end
end

def handle_info(:timeout, state) do
workers = free_workers(state.pool_name)
if workers != 0 do
Loading

0 comments on commit 0b4b80c

Please sign in to comment.