Skip to content

Commit

Permalink
Add graceful exit in the default worker
Browse files Browse the repository at this point in the history
  • Loading branch information
fredwu committed Jul 2, 2023
1 parent d3c25c8 commit fe82968
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- [Added] Graceful handling of exit signals in the default worker

## v4.0.2 [2023-06-13]

- [Improved] Updated all the dependencies
Expand Down
5 changes: 4 additions & 1 deletion lib/opq/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ defmodule OPQ.Worker do
"""

def start_link(item) do
Task.start_link(fn -> process_item(item) end)
Task.start_link(fn ->
Process.flag(:trap_exit, true)
process_item(item)
end)
end

defp process_item({mod, fun, args}), do: apply(mod, fun, args)
Expand Down
25 changes: 25 additions & 0 deletions test/lib/opq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule OPQTest do

doctest OPQ

@moduletag capture_log: true

test "enqueue items - child_spec/1" do
Supervisor.start_link([{OPQ, name: :opq}], strategy: :one_for_one)

Expand Down Expand Up @@ -182,6 +184,29 @@ defmodule OPQTest do
end)
end

test "graceful handle of exit signals" do
Process.flag(:trap_exit, true)

Agent.start_link(fn -> [] end, name: ExitBucket)

{:ok, opq} = OPQ.init(workers: 1)

OPQ.enqueue(opq, fn ->
Process.sleep(10)
Agent.update(ExitBucket, &[:a | &1])
end)

Process.sleep(1)

Process.exit(opq, :SIGTERM)

refute Process.alive?(opq)

wait(fn ->
assert Kernel.length(Agent.get(ExitBucket, & &1)) == 1
end)
end

defp assert_empty_queue(queue_name) do
assert queue_name
|> OPQ.queue()
Expand Down

0 comments on commit fe82968

Please sign in to comment.