Skip to content

Commit

Permalink
Support supervision tree
Browse files Browse the repository at this point in the history
  • Loading branch information
fredwu committed Oct 12, 2021
1 parent b108123 commit d63a719
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 19 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master

## v3.3.0 [2021-10-12]

- [Added] The ability to start as part of a supervision tree

## v3.2.0 [2021-10-11]

- [Improved] Updated to the new `ConsumerSupervisor` syntax
Expand Down
44 changes: 27 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@ end

## Usage

A simple example:
### A simple example:

```elixir
{:ok, opq} = OPQ.init
{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
```

Specify module, function and arguments:
### Specify module, function and arguments:

```elixir
{:ok, opq} = OPQ.init
{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, IO, :inspect, ["hello"])
OPQ.enqueue(opq, IO, :inspect, ["world"])
```

Specify a custom name for the queue:
### Specify a custom name for the queue:

```elixir
OPQ.init(name: :items)
Expand All @@ -55,14 +55,24 @@ OPQ.enqueue(:items, fn -> IO.inspect("hello") end)
OPQ.enqueue(:items, fn -> IO.inspect("world") end)
```

Specify a custom worker to process items in the queue:
### Start as part of a supervision tree:

Note, when starting as part of a supervision tree, the `:name` option must be provided.

```elixir
children = [
{OPQ, name: :items}
]
```

### Specify a custom worker to process items in the queue:

```elixir
defmodule CustomWorker do
def start_link(item) do
Task.start_link fn ->
Task.start_link(fn ->
Agent.update(:bucket, &[item | &1])
end
end)
end
end

Expand All @@ -76,23 +86,23 @@ OPQ.enqueue(opq, "world")
Agent.get(:bucket, & &1) # => ["world", "hello"]
```

Rate limit:
### Rate limit:

```elixir
{:ok, opq} = OPQ.init(workers: 1, interval: 1000)

Task.async fn ->
Task.async(fn ->
OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
end
end)
```

If no interval is supplied, the ratelimiter will be bypassed.

Check the queue and number of available workers:
### Check the queue and number of available workers:

```elixir
{:ok, opq} = OPQ.init
{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> Process.sleep(1000) end)

Expand All @@ -103,20 +113,20 @@ Process.sleep(1200)
{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 10}
```

Stop the queue:
### Stop the queue:

```elixir
{:ok, opq} = OPQ.init
{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.stop(opq)
OPQ.enqueue(opq, fn -> IO.inspect("world") end) # => (EXIT) no process...
```

Pause and resume the queue:
### Pause and resume the queue:

```elixir
{:ok, opq} = OPQ.init
{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end) # => "hello"
OPQ.pause(opq)
Expand Down
9 changes: 9 additions & 0 deletions lib/opq.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ defmodule OPQ do
alias OPQ.{Options, Feeder, RateLimiter, WorkerSupervisor}
alias OPQ.OptionsHandler, as: Opt

def child_spec(opts \\ []) do
%{
id: opts[:name],
start: {OPQ, :start_link, [opts]}
}
end

def start_link(opts \\ []), do: init(opts)

def init(opts \\ []) do
opts
|> Options.assign_defaults()
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule OPQ.Mixfile do
def project do
[
app: :opq,
version: "3.2.0",
version: "3.3.0",
elixir: "~> 1.5",
elixirc_paths: elixirc_paths(Mix.env()),
package: package(),
Expand Down
28 changes: 27 additions & 1 deletion test/lib/opq_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,33 @@ defmodule OPQTest do

doctest OPQ

test "enqueue items" do
test "enqueue items - child_spec/1" do
Supervisor.start_link([{OPQ, name: :opq}], strategy: :one_for_one)

OPQ.enqueue(:opq, :a)
OPQ.enqueue(:opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(:opq)

assert :queue.len(queue) == 0
end)
end

test "enqueue items - start_link/1" do
{:ok, opq} = OPQ.start_link()

OPQ.enqueue(opq, :a)
OPQ.enqueue(opq, :b)

wait(fn ->
{_status, queue, _demand} = OPQ.info(opq)

assert :queue.len(queue) == 0
end)
end

test "enqueue items - init/1" do
{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, :a)
Expand Down

0 comments on commit d63a719

Please sign in to comment.