From fe8296852ff43cfef91e10a8c8971007c0f97ee4 Mon Sep 17 00:00:00 2001 From: Fred Wu Date: Sun, 2 Jul 2023 16:08:59 +1000 Subject: [PATCH] Add graceful exit in the default worker --- CHANGELOG.md | 2 ++ lib/opq/worker.ex | 5 ++++- test/lib/opq_test.exs | 25 +++++++++++++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a5adef2..90f6d46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- [Added] Graceful handling of exit signals in the default worker + ## v4.0.2 [2023-06-13] - [Improved] Updated all the dependencies diff --git a/lib/opq/worker.ex b/lib/opq/worker.ex index a6f6f7d..a4f93da 100644 --- a/lib/opq/worker.ex +++ b/lib/opq/worker.ex @@ -4,7 +4,10 @@ defmodule OPQ.Worker do """ def start_link(item) do - Task.start_link(fn -> process_item(item) end) + Task.start_link(fn -> + Process.flag(:trap_exit, true) + process_item(item) + end) end defp process_item({mod, fun, args}), do: apply(mod, fun, args) diff --git a/test/lib/opq_test.exs b/test/lib/opq_test.exs index d2b475d..f864c8e 100644 --- a/test/lib/opq_test.exs +++ b/test/lib/opq_test.exs @@ -3,6 +3,8 @@ defmodule OPQTest do doctest OPQ + @moduletag capture_log: true + test "enqueue items - child_spec/1" do Supervisor.start_link([{OPQ, name: :opq}], strategy: :one_for_one) @@ -182,6 +184,29 @@ defmodule OPQTest do end) end + test "graceful handle of exit signals" do + Process.flag(:trap_exit, true) + + Agent.start_link(fn -> [] end, name: ExitBucket) + + {:ok, opq} = OPQ.init(workers: 1) + + OPQ.enqueue(opq, fn -> + Process.sleep(10) + Agent.update(ExitBucket, &[:a | &1]) + end) + + Process.sleep(1) + + Process.exit(opq, :SIGTERM) + + refute Process.alive?(opq) + + wait(fn -> + assert Kernel.length(Agent.get(ExitBucket, & &1)) == 1 + end) + end + defp assert_empty_queue(queue_name) do assert queue_name |> OPQ.queue()