Skip to content

Commit

Permalink
use :poolboy.status to get free workers
Browse files Browse the repository at this point in the history
  • Loading branch information
mitchellhenke committed May 13, 2016
1 parent 140c26d commit dc42370
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 5 deletions.
10 changes: 7 additions & 3 deletions lib/verk/workers_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ defmodule Verk.WorkersManager do
end

def handle_info(:timeout, state) do
workers = free_workers(state.monitors, state.pool_size)
workers = free_workers(state.pool_name)
if workers != 0 do
case QueueManager.dequeue(state.queue_manager_name, workers) do
jobs when is_list(jobs) ->
Expand Down Expand Up @@ -193,6 +193,8 @@ defmodule Verk.WorkersManager do
Log.start(job, worker)
Verk.Worker.perform_async(worker, self, job)
notify!(%Events.JobStarted{job: job, started_at: Timex.DateTime.now})
:full ->
Verk.enqueue(job)
end
end

Expand All @@ -217,8 +219,10 @@ defmodule Verk.WorkersManager do
:ok = GenEvent.ack_notify(Verk.EventManager, event)
end

defp free_workers(monitors, size) do
size - :ets.info(monitors, :size)
defp free_workers(pool_name) do
{_, free, _, _} = :poolboy.status(pool_name)

free
end

defp random_timeout(timeout) do
Expand Down
9 changes: 7 additions & 2 deletions test/workers_manager_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,14 @@ defmodule Verk.WorkersManagerTest do
end

test "handle info timeout with no free workers", %{ monitors: monitors } do
pool_name = "pool_name"
new Verk.QueueManager
state = %State{ monitors: monitors, pool_name: "pool_name", pool_size: 1 }
state = %State{ monitors: monitors, pool_name: pool_name, pool_size: 1 }

row = { self, "job_id", "job", make_ref, "start_time" }
:ets.insert(monitors, row)

expect(:poolboy, :status, ["pool_name"], {nil, 0, nil, nil})
assert handle_info(:timeout, state) == { :noreply, state }

assert validate Verk.QueueManager
Expand All @@ -128,9 +130,11 @@ defmodule Verk.WorkersManagerTest do
:rand.seed(:exs64, {1,2,3})
queue_manager_name = :queue_manager_name
timeout = 1000
state = %State{ monitors: monitors, pool_name: "pool_name",
pool_name = "pool_name"
state = %State{ monitors: monitors, pool_name: pool_name,
pool_size: 1, queue_manager_name: queue_manager_name, timeout: timeout }

expect(:poolboy, :status, ["pool_name"], {nil, 1, nil, nil})
expect(Verk.QueueManager, :dequeue, [queue_manager_name, 1], [])

assert handle_info(:timeout, state) == { :noreply, state, 1350 }
Expand All @@ -153,6 +157,7 @@ defmodule Verk.WorkersManagerTest do

expect(Verk.QueueManager, :dequeue, [queue_manager_name, 1], [:encoded_job])
expect(Verk.Job, :decode!, [:encoded_job], job)
expect(:poolboy, :status, [pool_name], {nil, 1, nil, nil})
expect(:poolboy, :checkout, [pool_name, false], worker)
expect(Verk.Worker, :perform_async, [worker, worker, job], :ok)

Expand Down

0 comments on commit dc42370

Please sign in to comment.