Skip to content

Commit

Permalink
Merge pull request #462 from akira/re-enqueue-dead
Browse files Browse the repository at this point in the history
Add api to re-enqueue dead job
  • Loading branch information
ananthakumaran authored Dec 12, 2021
2 parents aa77ce3 + 742631a commit 617a589
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 3 deletions.
15 changes: 15 additions & 0 deletions lib/exq/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,21 @@ defmodule Exq.Api do
GenServer.call(pid, :clear_failed)
end

@doc """
Re Enqueue jobs from dead queue.
Expected args:
* `pid` - Exq.Api process
* `raw_job` - raw json encoded job value
Returns:
* `{:ok, num_enqueued}`
"""
def dequeue_failed_jobs(pid, raw_jobs) do
GenServer.call(pid, {:dequeue_failed_jobs, raw_jobs})
end

@doc """
Number of jobs that have failed and exceeded their retry count.
Expand Down
5 changes: 5 additions & 0 deletions lib/exq/api/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ defmodule Exq.Api.Server do
{:reply, :ok, state}
end

def handle_call({:dequeue_failed_jobs, raw_jobs}, _from, state) do
result = JobQueue.dequeue_failed_jobs(state.redis, state.namespace, raw_jobs)
{:reply, result, state}
end

def handle_call(:clear_processes, _from, state) do
JobStat.clear_processes(state.redis, state.namespace)
{:reply, :ok, state}
Expand Down
8 changes: 6 additions & 2 deletions lib/exq/redis/job_queue.ex
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,16 @@ defmodule Exq.Redis.JobQueue do
Connection.zrem!(redis, scheduled_queue_key(namespace), raw_jobs)
end

def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
end

def remove_failed_jobs(redis, namespace, raw_jobs) do
Connection.zrem!(redis, failed_queue_key(namespace), raw_jobs)
end

def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
def dequeue_failed_jobs(redis, namespace, raw_jobs) do
dequeue_scheduled_jobs(redis, namespace, failed_queue_key(namespace), raw_jobs)
end

def list_queues(redis, namespace) do
Expand Down
12 changes: 11 additions & 1 deletion test/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ defmodule ApiTest do
{:ok, [raw_job]} = Exq.Api.retries(Exq.Api, raw: true)
assert {:ok, 1} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)

assert {:ok, 0} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
assert {:ok, [^raw_job]} = Exq.Api.jobs(Exq.Api, "test", raw: true)
end

test "remove job in scheduled queue" do
Expand All @@ -289,6 +289,7 @@ defmodule ApiTest do
{:ok, 1} = Exq.Api.dequeue_scheduled_jobs(Exq.Api, [raw_job])
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
{:ok, 0} = Exq.Api.dequeue_scheduled_jobs(Exq.Api, [raw_job])
assert {:ok, [^raw_job]} = Exq.Api.jobs(Exq.Api, "custom", raw: true)
end

test "remove job in failed queue" do
Expand All @@ -304,6 +305,15 @@ defmodule ApiTest do
{:ok, nil} = Exq.Api.find_failed(Exq.Api, "1234")
end

test "enqueue jobs in failed queue" do
JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234", queue: "test"}, "this is an error")
{:ok, [raw_job]} = Exq.Api.failed(Exq.Api, raw: true)
{:ok, 1} = Exq.Api.dequeue_failed_jobs(Exq.Api, [raw_job])
assert {:ok, nil} = Exq.Api.find_failed(Exq.Api, "1234")
{:ok, 0} = Exq.Api.dequeue_failed_jobs(Exq.Api, [raw_job])
assert {:ok, [^raw_job]} = Exq.Api.jobs(Exq.Api, "test", raw: true)
end

test "clear job queue" do
{:ok, jid} = Exq.enqueue(Exq, 'custom', Bogus, [])
Exq.Api.remove_queue(Exq.Api, 'custom')
Expand Down

0 comments on commit 617a589

Please sign in to comment.