diff --git a/CHANGELOG.md b/CHANGELOG.md index e9607dc..88a38a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- [Added] `OPQ.Queue` wraps `:queue` and implements `Enumerable` + ## v3.3.0 [2021-10-12] - [Added] The ability to start as part of a supervision tree diff --git a/README.md b/README.md index d91ce33..c37b047 100644 --- a/README.md +++ b/README.md @@ -113,11 +113,30 @@ If no interval is supplied, the ratelimiter will be bypassed. OPQ.enqueue(opq, fn -> Process.sleep(1000) end) -{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 9} +{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 9} Process.sleep(1200) -{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 10} +{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 10} +``` + +If you just need to get the queue itself: + +```elixir +OPQ.queue(opq) # => #OPQ.Queue<[]> +``` + +### Queue + +OPQ implements `Enumerable`, so you can perform enumerable functions on the queue: + +```elixir +{:ok, opq} = OPQ.init() + +queue = OPQ.queue(opq) + +Enum.count(queue) # => 0 +Enum.empty?(queue) # => true ``` ### Stop the queue: diff --git a/lib/opq.ex b/lib/opq.ex index 0483a91..947530e 100644 --- a/lib/opq.ex +++ b/lib/opq.ex @@ -41,6 +41,7 @@ defmodule OPQ do def pause(feeder), do: GenStage.cast(feeder, :pause) def resume(feeder), do: GenStage.cast(feeder, :resume) def info(feeder), do: GenStage.call(feeder, :info, Opt.timeout(feeder)) + def queue(feeder), do: GenStage.call(feeder, :queue, Opt.timeout(feeder)) defp start_links(opts) do {:ok, feeder} = Feeder.start_link(opts[:name]) diff --git a/lib/opq/feeder.ex b/lib/opq/feeder.ex index a6d0c3e..bb78832 100644 --- a/lib/opq/feeder.ex +++ b/lib/opq/feeder.ex @@ -9,7 +9,7 @@ defmodule OPQ.Feeder do def start_link(name), do: GenStage.start_link(__MODULE__, :ok, name: name) def init(:ok) do - {:producer, {:normal, :queue.new(), 0}} + {:producer, {:normal, %OPQ.Queue{}, 0}} end def handle_cast(:stop, state) do @@ -24,16 +24,20 @@ defmodule OPQ.Feeder do dispatch_events(:normal, queue, demand, []) end - def handle_cast({:enqueue, event}, {status, queue, pending_demand}) do - queue = :queue.in(event, queue) + def handle_cast({:enqueue, event}, {status, %OPQ.Queue{data: data} = queue, pending_demand}) do + data = :queue.in(event, data) - dispatch_or_pause(status, queue, pending_demand) + dispatch_or_pause(status, %OPQ.Queue{data: data}, pending_demand) end def handle_call(:info, _from, state) do {:reply, state, [], state} end + def handle_call(:queue, _from, {_status, queue, _demand} = state) do + {:reply, queue, [], state} + end + defp dispatch_or_pause(:normal, queue, demand) do dispatch_events(:normal, queue, demand, []) end @@ -54,13 +58,13 @@ defmodule OPQ.Feeder do {:noreply, Enum.reverse(events), {status, queue, 0}} end - defp dispatch_events(status, queue, demand, events) do - case :queue.out(queue) do - {{:value, event}, queue} -> - dispatch_events(status, queue, demand - 1, [event | events]) + defp dispatch_events(status, %OPQ.Queue{data: data}, demand, events) do + case :queue.out(data) do + {{:value, event}, data} -> + dispatch_events(status, %OPQ.Queue{data: data}, demand - 1, [event | events]) - {:empty, queue} -> - {:noreply, Enum.reverse(events), {status, queue, demand}} + {:empty, data} -> + {:noreply, Enum.reverse(events), {status, %OPQ.Queue{data: data}, demand}} end end end diff --git a/lib/opq/queue.ex b/lib/opq/queue.ex new file mode 100644 index 0000000..af4a332 --- /dev/null +++ b/lib/opq/queue.ex @@ -0,0 +1,9 @@ +defmodule OPQ.Queue do + @moduledoc """ + A `:queue` wrapper so that protocols like `Enumerable` can be implemented. + """ + + @opaque t() :: %__MODULE__{data: :queue.queue()} + + defstruct data: :queue.new() +end diff --git a/lib/opq/queue/enumerable.ex b/lib/opq/queue/enumerable.ex new file mode 100644 index 0000000..8942501 --- /dev/null +++ b/lib/opq/queue/enumerable.ex @@ -0,0 +1,17 @@ +defimpl Enumerable, for: OPQ.Queue do + @moduledoc """ + Implementation based on https://github.com/princemaple/elixir-queue + """ + + def count(%OPQ.Queue{data: q}), do: {:ok, :queue.len(q)} + + def member?(%OPQ.Queue{data: q}, item) do + {:ok, :queue.member(item, q)} + end + + def reduce(%OPQ.Queue{data: q}, acc, fun) do + Enumerable.List.reduce(:queue.to_list(q), acc, fun) + end + + def slice(%OPQ.Queue{}), do: {:error, __MODULE__} +end diff --git a/lib/opq/queue/inspect.ex b/lib/opq/queue/inspect.ex new file mode 100644 index 0000000..990a04c --- /dev/null +++ b/lib/opq/queue/inspect.ex @@ -0,0 +1,11 @@ +defimpl Inspect, for: OPQ.Queue do + @moduledoc """ + Implementation based on https://github.com/princemaple/elixir-queue + """ + + import Inspect.Algebra + + def inspect(%OPQ.Queue{} = q, opts) do + concat(["#OPQ.Queue<", to_doc(Enum.to_list(q), opts), ">"]) + end +end diff --git a/test/lib/opq_test.exs b/test/lib/opq_test.exs index 9df5cfa..d2b475d 100644 --- a/test/lib/opq_test.exs +++ b/test/lib/opq_test.exs @@ -10,9 +10,7 @@ defmodule OPQTest do OPQ.enqueue(:opq, :b) wait(fn -> - {_status, queue, _demand} = OPQ.info(:opq) - - assert :queue.len(queue) == 0 + assert_empty_queue(:opq) end) end @@ -23,9 +21,7 @@ defmodule OPQTest do OPQ.enqueue(opq, :b) wait(fn -> - {_status, queue, _demand} = OPQ.info(opq) - - assert :queue.len(queue) == 0 + assert_empty_queue(opq) end) end @@ -36,9 +32,7 @@ defmodule OPQTest do OPQ.enqueue(opq, :b) wait(fn -> - {_status, queue, _demand} = OPQ.info(opq) - - assert :queue.len(queue) == 0 + assert_empty_queue(opq) end) end @@ -51,9 +45,8 @@ defmodule OPQTest do OPQ.enqueue(opq, fn -> Agent.update(Bucket, &[:b | &1]) end) wait(fn -> - {_status, queue, _demand} = OPQ.info(opq) + assert_empty_queue(opq) - assert :queue.len(queue) == 0 assert Kernel.length(Agent.get(Bucket, & &1)) == 2 end) end @@ -67,9 +60,8 @@ defmodule OPQTest do OPQ.enqueue(opq, Agent, :update, [MfaBucket, &[:b | &1]]) wait(fn -> - {_status, queue, _demand} = OPQ.info(opq) + assert_empty_queue(opq) - assert :queue.len(queue) == 0 assert Kernel.length(Agent.get(MfaBucket, & &1)) == 2 end) end @@ -81,9 +73,7 @@ defmodule OPQTest do OPQ.enqueue(:items, :b) wait(fn -> - {_status, queue, _demand} = OPQ.info(:items) - - assert :queue.len(queue) == 0 + assert_empty_queue(:items) end) end @@ -94,9 +84,7 @@ defmodule OPQTest do OPQ.enqueue(opq, :b) wait(fn -> - {_status, queue, _demand} = OPQ.info(opq) - - assert :queue.len(queue) == 0 + assert_empty_queue(opq) end) end @@ -107,9 +95,7 @@ defmodule OPQTest do OPQ.enqueue(opq, :b) wait(fn -> - {_status, queue, _demand} = OPQ.info(opq) - - assert :queue.len(queue) == 0 + assert_empty_queue(opq) end) end @@ -195,4 +181,10 @@ defmodule OPQTest do assert Kernel.length(Agent.get(PauseBucket, & &1)) == 3 end) end + + defp assert_empty_queue(queue_name) do + assert queue_name + |> OPQ.queue() + |> Enum.empty?() + end end