Skip to content

Commit

Permalink
OPQ.Queue wraps :queue and implements Enumerable
Browse files Browse the repository at this point in the history
  • Loading branch information
fredwu committed Oct 14, 2021
1 parent 2086059 commit b408883
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 34 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- [Added] `OPQ.Queue` wraps `:queue` and implements `Enumerable`

## v3.3.0 [2021-10-12]

- [Added] The ability to start as part of a supervision tree
Expand Down
23 changes: 21 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,30 @@ If no interval is supplied, the ratelimiter will be bypassed.

OPQ.enqueue(opq, fn -> Process.sleep(1000) end)

{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 9}
{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 9}

Process.sleep(1200)

{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 10}
{status, queue, available_workers} = OPQ.info(opq) # => {:normal, #OPQ.Queue<[]>, 10}
```

If you just need to get the queue itself:

```elixir
OPQ.queue(opq) # => #OPQ.Queue<[]>
```

### Queue

OPQ implements `Enumerable`, so you can perform enumerable functions on the queue:

```elixir
{:ok, opq} = OPQ.init()

queue = OPQ.queue(opq)

Enum.count(queue) # => 0
Enum.empty?(queue) # => true
```

### Stop the queue:
Expand Down
1 change: 1 addition & 0 deletions lib/opq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ defmodule OPQ do
def pause(feeder), do: GenStage.cast(feeder, :pause)
def resume(feeder), do: GenStage.cast(feeder, :resume)
def info(feeder), do: GenStage.call(feeder, :info, Opt.timeout(feeder))
def queue(feeder), do: GenStage.call(feeder, :queue, Opt.timeout(feeder))

defp start_links(opts) do
{:ok, feeder} = Feeder.start_link(opts[:name])
Expand Down
24 changes: 14 additions & 10 deletions lib/opq/feeder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule OPQ.Feeder do
def start_link(name), do: GenStage.start_link(__MODULE__, :ok, name: name)

def init(:ok) do
{:producer, {:normal, :queue.new(), 0}}
{:producer, {:normal, %OPQ.Queue{}, 0}}
end

def handle_cast(:stop, state) do
Expand All @@ -24,16 +24,20 @@ defmodule OPQ.Feeder do
dispatch_events(:normal, queue, demand, [])
end

def handle_cast({:enqueue, event}, {status, queue, pending_demand}) do
queue = :queue.in(event, queue)
def handle_cast({:enqueue, event}, {status, %OPQ.Queue{data: data} = queue, pending_demand}) do
data = :queue.in(event, data)

dispatch_or_pause(status, queue, pending_demand)
dispatch_or_pause(status, %OPQ.Queue{data: data}, pending_demand)
end

def handle_call(:info, _from, state) do
{:reply, state, [], state}
end

def handle_call(:queue, _from, {_status, queue, _demand} = state) do
{:reply, queue, [], state}
end

defp dispatch_or_pause(:normal, queue, demand) do
dispatch_events(:normal, queue, demand, [])
end
Expand All @@ -54,13 +58,13 @@ defmodule OPQ.Feeder do
{:noreply, Enum.reverse(events), {status, queue, 0}}
end

defp dispatch_events(status, queue, demand, events) do
case :queue.out(queue) do
{{:value, event}, queue} ->
dispatch_events(status, queue, demand - 1, [event | events])
defp dispatch_events(status, %OPQ.Queue{data: data}, demand, events) do
case :queue.out(data) do
{{:value, event}, data} ->
dispatch_events(status, %OPQ.Queue{data: data}, demand - 1, [event | events])

{:empty, queue} ->
{:noreply, Enum.reverse(events), {status, queue, demand}}
{:empty, data} ->
{:noreply, Enum.reverse(events), {status, %OPQ.Queue{data: data}, demand}}
end
end
end
9 changes: 9 additions & 0 deletions lib/opq/queue.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defmodule OPQ.Queue do
@moduledoc """
A `:queue` wrapper so that protocols like `Enumerable` can be implemented.
"""

@opaque t() :: %__MODULE__{data: :queue.queue()}

defstruct data: :queue.new()
end
17 changes: 17 additions & 0 deletions lib/opq/queue/enumerable.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defimpl Enumerable, for: OPQ.Queue do
@moduledoc """
Implementation based on https://github.com/princemaple/elixir-queue
"""

def count(%OPQ.Queue{data: q}), do: {:ok, :queue.len(q)}

def member?(%OPQ.Queue{data: q}, item) do
{:ok, :queue.member(item, q)}
end

def reduce(%OPQ.Queue{data: q}, acc, fun) do
Enumerable.List.reduce(:queue.to_list(q), acc, fun)
end

def slice(%OPQ.Queue{}), do: {:error, __MODULE__}
end
11 changes: 11 additions & 0 deletions lib/opq/queue/inspect.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defimpl Inspect, for: OPQ.Queue do
@moduledoc """
Implementation based on https://github.com/princemaple/elixir-queue
"""

import Inspect.Algebra

def inspect(%OPQ.Queue{} = q, opts) do
concat(["#OPQ.Queue<", to_doc(Enum.to_list(q), opts), ">"])
end
end
36 changes: 14 additions & 22 deletions test/lib/opq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ defmodule OPQTest do
OPQ.enqueue(:opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(:opq)

assert :queue.len(queue) == 0
assert_empty_queue(:opq)
end)
end

Expand All @@ -23,9 +21,7 @@ defmodule OPQTest do
OPQ.enqueue(opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)

assert :queue.len(queue) == 0
assert_empty_queue(opq)
end)
end

Expand All @@ -36,9 +32,7 @@ defmodule OPQTest do
OPQ.enqueue(opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)

assert :queue.len(queue) == 0
assert_empty_queue(opq)
end)
end

Expand All @@ -51,9 +45,8 @@ defmodule OPQTest do
OPQ.enqueue(opq, fn -> Agent.update(Bucket, &[:b | &1]) end)

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)
assert_empty_queue(opq)

assert :queue.len(queue) == 0
assert Kernel.length(Agent.get(Bucket, & &1)) == 2
end)
end
Expand All @@ -67,9 +60,8 @@ defmodule OPQTest do
OPQ.enqueue(opq, Agent, :update, [MfaBucket, &[:b | &1]])

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)
assert_empty_queue(opq)

assert :queue.len(queue) == 0
assert Kernel.length(Agent.get(MfaBucket, & &1)) == 2
end)
end
Expand All @@ -81,9 +73,7 @@ defmodule OPQTest do
OPQ.enqueue(:items, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(:items)

assert :queue.len(queue) == 0
assert_empty_queue(:items)
end)
end

Expand All @@ -94,9 +84,7 @@ defmodule OPQTest do
OPQ.enqueue(opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)

assert :queue.len(queue) == 0
assert_empty_queue(opq)
end)
end

Expand All @@ -107,9 +95,7 @@ defmodule OPQTest do
OPQ.enqueue(opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)

assert :queue.len(queue) == 0
assert_empty_queue(opq)
end)
end

Expand Down Expand Up @@ -195,4 +181,10 @@ defmodule OPQTest do
assert Kernel.length(Agent.get(PauseBucket, & &1)) == 3
end)
end

defp assert_empty_queue(queue_name) do
assert queue_name
|> OPQ.queue()
|> Enum.empty?()
end
end

0 comments on commit b408883

Please sign in to comment.