diff --git a/lib/eredis_sub.ex b/lib/eredis_sub.ex index 463ae99..9620063 100644 --- a/lib/eredis_sub.ex +++ b/lib/eredis_sub.ex @@ -25,7 +25,7 @@ defmodule EredisSub do Subscribe to a channel: metadata_example = %{subscribed_at: DateTime.utc_now()} - EredisSub.Server.subscribe("my_channel", {MyModule, :handle, metadata_example}) + EredisSub.Server.subscribe("my_channel", MyModule, metadata_example) ### Add the following to your supervision tree: @@ -42,11 +42,13 @@ defmodule EredisSub do @doc """ Publish a message to a channel. + If successfull, returns the number of subscribers that received the message. + It should never error, unless there is a connection problem. ## Examples iex> EredisSub.publish("my_channel", "Hello, world!") - :ok + {:ok, 0} """ def publish(channel, message) do Server.publish(channel, message) @@ -66,11 +68,11 @@ defmodule EredisSub do ...> end ...> end ...> - ...> EredisSub.subscribe(channel, {FooBar, :handle, metadata}) + ...> EredisSub.subscribe(channel, FooBar, metadata) :ok """ - def subscribe(channel, handler) do - Server.subscribe(channel, handler) + def subscribe(channel, handler_module, metadata) do + Server.subscribe(channel, handler_module, metadata) end @doc """ diff --git a/lib/eredis_sub/handler.ex b/lib/eredis_sub/handler.ex index ce2b3e0..f9c153e 100644 --- a/lib/eredis_sub/handler.ex +++ b/lib/eredis_sub/handler.ex @@ -8,5 +8,5 @@ defmodule EredisSub.Handler do Receives a binary message and a metadata that can be customized upon subscription. Return value is ignored. """ - @callback handle(binary, term) :: term + @callback handle(message :: binary, metadata :: term) :: term end diff --git a/lib/eredis_sub/server.ex b/lib/eredis_sub/server.ex index eda0e8b..1cbbef2 100644 --- a/lib/eredis_sub/server.ex +++ b/lib/eredis_sub/server.ex @@ -1,7 +1,11 @@ defmodule EredisSub.Server do @moduledoc """ - Holds internal state for `EredisSub`. - Do not use this module directly. Instead, use `EredisSub`. + Holds internal state for `EredisSub`, which are: a connection to publish, + a connection to subscribe and a map of subscriptions. + + Also, it converts Elixir strings to Erlang charlists used by `eredis`. + + **Do not use this module directly. Instead, use `EredisSub`.** """ use GenServer @@ -17,8 +21,8 @@ defmodule EredisSub.Server do GenServer.call(name, {:publish, channel, message}) end - def subscribe(channel, {mod, fun, metadata}, name \\ __MODULE__) do - GenServer.call(name, {:subscribe, channel, {mod, fun, metadata}}) + def subscribe(channel, handler_module, metadata \\ %{}, name \\ __MODULE__) do + GenServer.call(name, {:subscribe, channel, handler_module, metadata}) end def unsubscribe_all(channel, name \\ __MODULE__) do @@ -44,19 +48,23 @@ defmodule EredisSub.Server do end def handle_call({:publish, channel, message}, _from, state) do - args = for s <- ["PUBLISH", channel, message], do: String.to_charlist(s) - {:ok, _count_subs} = :eredis.q(state.pub_conn, args) - {:reply, :ok, state} + command = Enum.map(["PUBLISH", channel, message], &String.to_charlist/1) + + response = + with {:ok, number_of_subscribers} <- :eredis.q(state.pub_conn, command) do + {:ok, String.to_integer(number_of_subscribers)} + end + + {:reply, response, state} end - # Simplify this for brevity, keep the same functionality - def handle_call({:subscribe, channel, mfa}, _from, state) do + def handle_call({:subscribe, channel, handler_module, metadata}, _from, state) do response = :eredis_sub.subscribe(state.sub_conn, [String.to_charlist(channel)]) subscriptions = if response == :ok do - Map.update(state.subscriptions, channel, [mfa], fn subscriptions -> - [mfa | subscriptions] + Map.update(state.subscriptions, channel, [{handler_module, metadata}], fn subscriptions -> + [{handler_module, metadata} | subscriptions] end) else state.subscriptions @@ -79,10 +87,11 @@ defmodule EredisSub.Server do end def handle_info({:message, channel, msg, _client_pid}, state) do + channel = to_string(channel) subscriptions = Map.get(state.subscriptions, channel, []) - Enum.each(subscriptions, fn {mod, fun, metadata} -> - apply_no_link(channel, {mod, fun, [to_string(msg), metadata]}) + Enum.each(subscriptions, fn {handler_module, metadata} -> + apply_no_link(channel, handler_module, to_string(msg), metadata) end) :eredis_sub.ack_message(state.sub_conn) @@ -104,14 +113,16 @@ defmodule EredisSub.Server do {:noreply, state} end - defp apply_no_link(channel, {mod, fun, args}) do + defp apply_no_link(channel, handler_module, message, metadata) do Task.start(fn -> try do - apply(mod, fun, args) + handler_module.handle(message, metadata) rescue e -> - msg = "Error from channel #{channel} on #{inspect(mod)}:#{inspect(fun)}: #{inspect(e)}." - Logger.error(msg) + error = + "Error from channel #{channel} handler #{inspect(handler_module)}: #{inspect(e)}." + + Logger.error(error) end end) end diff --git a/test/eredis_sub_test.exs b/test/eredis_sub_test.exs index ed5e889..0b504ab 100644 --- a/test/eredis_sub_test.exs +++ b/test/eredis_sub_test.exs @@ -23,13 +23,13 @@ defmodule EredisSubTest do end end - test "subscribes and publishes" do + test "subscribes, publishes and receives a message" do channel = "my super secret channel" metadata = %{test_pid: self()} - EredisSub.subscribe(channel, {PingPong, :handle, metadata}) - EredisSub.publish(channel, "ping") + EredisSub.subscribe(channel, PingPong, metadata) + assert EredisSub.publish(channel, "ping") == {:ok, 1} assert_receive :pong end end