diff --git a/lib/poxa.ex b/lib/poxa.ex index 8d272ed..9f8b6c2 100644 --- a/lib/poxa.ex +++ b/lib/poxa.ex @@ -9,7 +9,7 @@ defmodule Poxa do def start(_type, _args) do dispatch = :cowboy_router.compile([ {:_, [ { '/ping', Poxa.PingHandler, [] }, - { '/console', Poxa.Console.WSHandler, [] }, + { '/apps/:app_id/console', Poxa.Console.WSHandler, [] }, { '/', :cowboy_static, {:priv_file, :poxa, 'index.html'} }, { '/static/[...]', :cowboy_static, {:priv_dir, :poxa, 'static'} }, { '/apps/:app_id/events', Poxa.EventHandler, [] }, diff --git a/lib/poxa/app.ex b/lib/poxa/app.ex new file mode 100644 index 0000000..4513233 --- /dev/null +++ b/lib/poxa/app.ex @@ -0,0 +1,89 @@ +defmodule Poxa.App do + use GenServer + + @doc """ + Given the app_key it returns the related app_id + """ + def id(key) do + case :ets.match(Poxa.App, {:'$1', key, :'_'}) do + [[id]] -> {:ok, id} + _ -> {:error, {:key_not_found, key}} + end + end + + @doc """ + Given the app_id it returns the related app_key + """ + @spec key(binary) :: {:ok, binary} | {:error, term} + def key(id) do + case :ets.lookup(Poxa.App, id) do + [{^id, key, _}] -> {:ok, key} + _ -> {:error, {:id_not_found, id}} + end + end + + @doc """ + Given the app_key it returns the related app_secret + """ + def secret(key) do + case :ets.match(Poxa.App, {:'_', key, :'$1'}) do + [[secret]] -> {:ok, secret} + _ -> {:error, {:key_not_found, key}} + end + end + + @doc """ + Given the app_id it returns the related app_key and secret + """ + def key_secret(id) do + case :ets.lookup(Poxa.App, id) do + [{^id, key, secret}] -> {:ok, {key, secret}} + _ -> {:error, {:id_not_found, id}} + end + end + + @doc """ + Reload table content using Application environment + """ + @spec reload :: :ok + def reload do + GenServer.call(Poxa.App, :reload) + end + + @doc false + def start_link(args \\ []) do + GenServer.start_link(__MODULE__, args, [name: Poxa.App]) + end + + @doc """ + Create ETS table to fetch app_id, app_key and secret + """ + def init(_) do + ets_options = [:ordered_set, :named_table, :protected, { :read_concurrency, true }] + :ets.new(Poxa.App, ets_options) + populate_table + { :ok, :ok } + end + + def handle_call(:reload, _from, _) do + populate_table + {:reply, :ok, :ok} + end + + defp populate_table do + apps = Application.get_env(:poxa, :apps, []) + app = fetch_standalone_app + for {app_id, app_key, secret} <- [app | apps] do + :ets.insert(Poxa.App, {app_id, app_key, secret}) + end + end + + defp fetch_standalone_app do + app_id = Application.get_env(:poxa, :app_id) + app_key = Application.get_env(:poxa, :app_key) + app_secret = Application.get_env(:poxa, :app_secret) + if app_id && app_key && app_secret do + {app_id, app_key, app_secret} + end + end +end diff --git a/lib/poxa/auth_signature.ex b/lib/poxa/auth_signature.ex index 74c98cf..e100e82 100644 --- a/lib/poxa/auth_signature.ex +++ b/lib/poxa/auth_signature.ex @@ -4,25 +4,21 @@ defmodule Poxa.AuthSignature do http://pusher.com/docs/auth_signatures """ - require Logger - alias Poxa.CryptoHelper - @doc """ Validate auth signature as described at: http://pusher.com/docs/auth_signatures . Returns true or false """ - @spec valid?(binary, binary) :: boolean - def valid?(to_sign, auth) do - case String.split(auth, ":", parts: 2) do - [app_key, remote_signed_data] -> - signed_data = sign_data(to_sign) - Poxa.Authentication.check_key(app_key) and signed_data == remote_signed_data - _ -> false + @spec valid?(binary, binary, binary) :: boolean + def valid?(app_id, to_sign, auth) do + case Poxa.App.key_secret(app_id) do + {:ok, {app_key, app_secret}} -> + case String.split(auth, ":", parts: 2) do + [^app_key, remote_signed_data] -> + signed_data = Poxa.CryptoHelper.hmac256_to_string(app_secret, to_sign) + signed_data == remote_signed_data + _ -> false + end + {:error, _} -> false end end - - defp sign_data(data) do - {:ok, app_secret} = :application.get_env(:poxa, :app_secret) - CryptoHelper.hmac256_to_string(app_secret, data) - end end diff --git a/lib/poxa/authentication.ex b/lib/poxa/authentication.ex index f2858f4..4561233 100644 --- a/lib/poxa/authentication.ex +++ b/lib/poxa/authentication.ex @@ -7,23 +7,15 @@ defmodule Poxa.Authentication do Returns true if authentication process is validated correctly, false otherwise More info at: http://pusher.com/docs/rest_api#authentication """ - @spec check(binary, binary, binary, [{binary, binary}]) :: boolean - def check(method, path, body, qs_vals) do + @spec check(binary, binary, binary, binary, [{binary, binary}]) :: boolean + def check(app_id, method, path, body, qs_vals) do qs_vals = Enum.into(qs_vals, %{}) body_md5 = qs_vals["body_md5"] - {:ok, app_key} = Application.fetch_env(:poxa, :app_key) - {:ok, secret} = Application.fetch_env(:poxa, :app_secret) - - valid_body?(body, body_md5) and Signaturex.validate(app_key, secret, method, path, qs_vals, 600) - end - - @doc """ - Returns true if the `app_key` is the same as `auth_key`, false otherwise - """ - @spec check_key(binary) :: boolean - def check_key(auth_key) do - {:ok, app_key} = Application.fetch_env(:poxa, :app_key) - auth_key == app_key + case Poxa.App.key_secret(app_id) do + {:ok, {app_key, secret}} -> + valid_body?(body, body_md5) and Signaturex.validate(app_key, secret, method, path, qs_vals, 600) + {:error, _} -> false + end end defp valid_body?("", nil), do: true diff --git a/lib/poxa/authorization_helper.ex b/lib/poxa/authorization_helper.ex index d135fc3..0b98d6b 100644 --- a/lib/poxa/authorization_helper.ex +++ b/lib/poxa/authorization_helper.ex @@ -1,13 +1,19 @@ defmodule Poxa.AuthorizationHelper do - alias Poxa.Authentication + @moduledoc """ + Helper to keep the cowboy rest is_authorized dry for Pusher authentication + """ + @doc """ + A helper to share authentication process between handlers. It uses the cowboy `req` to retrieve the necessary data. + """ @spec is_authorized(:cowboy_req.req, any) :: {true, :cowboy_req.req, any} | {{false, binary}, :cowboy_req.req, nil} def is_authorized(req, state) do {:ok, body, req} = :cowboy_req.body(req) {method, req} = :cowboy_req.method(req) {qs_vals, req} = :cowboy_req.qs_vals(req) {path, req} = :cowboy_req.path(req) - if Authentication.check(method, path, body, qs_vals) do + {app_id, req} = :cowboy_req.binding(:app_id, req) + if Poxa.Authentication.check(app_id, method, path, body, qs_vals) do {true, req, state} else {{false, "authentication failed"}, req, nil} diff --git a/lib/poxa/channel.ex b/lib/poxa/channel.ex index 46cd807..89083be 100644 --- a/lib/poxa/channel.ex +++ b/lib/poxa/channel.ex @@ -86,36 +86,36 @@ defmodule Poxa.Channel do @doc """ Returns true if the channel has at least 1 subscription """ - @spec occupied?(binary) :: boolean - def occupied?(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :_} + @spec occupied?(binary, binary) :: boolean + def occupied?(channel, app_id) do + match = {{:p, :l, {:pusher, app_id, channel}}, :_, :_} :gproc.select_count([{match, [], [true]}]) != 0 end @doc """ Returns the list of channels the `pid` is subscribed """ - @spec all(pid | :_) :: [binary] - def all(pid \\ :_) do - match = {{:p, :l, {:pusher, :'$1'}}, pid, :_} + @spec all(binary, pid | :_) :: [binary] + def all(app_id, pid \\ :_) do + match = {{:p, :l, {:pusher, app_id, :'$1'}}, pid, :_} :gproc.select([{match, [], [:'$1']}]) |> Enum.uniq end @doc """ Returns true if `pid` is subscribed to `channel` and false otherwise """ - @spec subscribed?(binary, pid) :: boolean - def subscribed?(channel, pid) do - match = {{:p, :l, {:pusher, channel}}, pid, :_} + @spec subscribed?(binary, binary, pid) :: boolean + def subscribed?(channel, app_id, pid) do + match = {{:p, :l, {:pusher, app_id, channel}}, pid, :_} :gproc.select_count([{match, [], [true]}]) != 0 end @doc """ Returns how many connections are opened on the `channel` """ - @spec subscription_count(binary) :: non_neg_integer - def subscription_count(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :_} + @spec subscription_count(binary, binary) :: non_neg_integer + def subscription_count(channel, app_id) do + match = {{:p, :l, {:pusher, app_id, channel}}, :_, :_} :gproc.select_count([{match, [], [true]}]) end end diff --git a/lib/poxa/channels_handler.ex b/lib/poxa/channels_handler.ex index 2212a2d..8f8519c 100644 --- a/lib/poxa/channels_handler.ex +++ b/lib/poxa/channels_handler.ex @@ -32,11 +32,12 @@ defmodule Poxa.ChannelsHandler do attributes = String.split(info, ",") |> Enum.reject(&(&1 == "")) {channel, req} = :cowboy_req.binding(:channel_name, req, nil) + {app_id, req} = :cowboy_req.binding(:app_id, req) if channel do - {malformed_request_one_channel?(attributes, channel), req, {:one, channel, attributes}} + {malformed_request_one_channel?(attributes, channel), req, {:one, app_id, channel, attributes}} else {filter, req} = :cowboy_req.qs_val("filter_by_prefix", req, nil) - {malformed_request_all_channels?(attributes, filter), req, {:all, filter, attributes}} + {malformed_request_all_channels?(attributes, filter), req, {:all, app_id, filter, attributes}} end end @@ -71,42 +72,42 @@ defmodule Poxa.ChannelsHandler do end @doc false - def get_json(req, {:one, channel, attributes}) do - show(channel, attributes, req, nil) + def get_json(req, {:one, app_id, channel, attributes}) do + show(app_id, channel, attributes, req, nil) end - def get_json(req, {:all, filter, attributes}) do - index(filter, attributes, req, nil) + def get_json(req, {:all, app_id, filter, attributes}) do + index(app_id, filter, attributes, req, nil) end - defp show(channel, attributes, req, state) do - occupied = Channel.occupied?(channel) - attribute_list = mount_attribute_list(attributes, channel) + defp show(app_id, channel, attributes, req, state) do + occupied = Channel.occupied?(channel, app_id) + attribute_list = mount_attribute_list(attributes, app_id, channel) {JSX.encode!([occupied: occupied] ++ attribute_list), req, state} end - defp mount_attribute_list(attributes, channel) do + defp mount_attribute_list(attributes, app_id, channel) do attribute_list = if Enum.member?(attributes, "subscription_count") do - [subscription_count: Channel.subscription_count(channel)] + [subscription_count: Channel.subscription_count(app_id, channel)] else [] end attribute_list ++ if Enum.member?(attributes, "user_count") do - [user_count: PresenceChannel.user_count(channel)] + [user_count: PresenceChannel.user_count(app_id, channel)] else [] end end - defp index(filter, attributes, req, state) do - channels = channels(filter, attributes) + defp index(app_id, filter, attributes, req, state) do + channels = channels(app_id, filter, attributes) {JSX.encode!(channels: channels), req, state} end - defp channels(filter, attributes) do - for channel <- Channel.all, filter_channel(channel, filter) do - {channel, mount_attribute_list(attributes, channel)} + defp channels(app_id, filter, attributes) do + for channel <- Channel.all(app_id), filter_channel(channel, filter) do + {channel, mount_attribute_list(attributes, app_id, channel)} end end diff --git a/lib/poxa/console/console.ex b/lib/poxa/console/console.ex index 91ad14c..f73068d 100644 --- a/lib/poxa/console/console.ex +++ b/lib/poxa/console/console.ex @@ -3,39 +3,41 @@ defmodule Poxa.Console do use GenEvent @doc false - def init(pid), do: {:ok, pid} + def init([pid, app_id]), do: {:ok, {pid, app_id}} @doc false - def handle_event(%{event: :connected, socket_id: socket_id, origin: origin}, pid) do + def handle_event(%{event: :connected, app_id: app_id, socket_id: socket_id, origin: origin}, {pid, app_id}) do send_message("Connection", socket_id, "Origin: #{origin}", pid) - {:ok, pid} + {:ok, {pid, app_id}} end - def handle_event(%{event: :disconnected, socket_id: socket_id, channels: channels, duration: duration}, pid) do + def handle_event(%{event: :disconnected, app_id: app_id, socket_id: socket_id, channels: channels, duration: duration}, {pid, app_id}) do send_message("Disconnection", socket_id, "Channels: #{inspect channels}, Lifetime: #{duration}s", pid) - {:ok, pid} + {:ok, {pid, app_id}} end - def handle_event(%{event: :subscribed, socket_id: socket_id, channel: channel}, pid) do + def handle_event(%{event: :subscribed, app_id: app_id, socket_id: socket_id, channel: channel}, {pid, app_id}) do send_message("Subscribed", socket_id, "Channel: #{channel}", pid) - {:ok, pid} + {:ok, {pid, app_id}} end - def handle_event(%{event: :unsubscribed, socket_id: socket_id, channel: channel}, pid) do + def handle_event(%{event: :unsubscribed, app_id: app_id, socket_id: socket_id, channel: channel}, {pid, app_id}) do send_message("Unsubscribed", socket_id, "Channel: #{channel}", pid) - {:ok, pid} + {:ok, {pid, app_id}} end - def handle_event(%{event: :api_message, channels: channels, name: name}, pid) do + def handle_event(%{event: :api_message, app_id: app_id, channels: channels, name: name}, {pid, app_id}) do send_message("API Message", "", "Channels: #{inspect channels}, Event: #{name}", pid) - {:ok, pid} + {:ok, {pid, app_id}} end - def handle_event(%{event: :client_event_message, socket_id: socket_id, channels: channels, name: name}, pid) do + def handle_event(%{event: :client_event_message, app_id: app_id, socket_id: socket_id, channels: channels, name: name}, {pid, app_id}) do send_message("Client Event Message", socket_id, "Channels: #{inspect channels}, Event: #{name}", pid) - {:ok, pid} + {:ok, {pid, app_id}} end + def handle_event(_data, state), do: {:ok, state} + defp send_message(type, socket_id, details, pid) do msg = message(type, socket_id, details) |> encode! send(pid, msg) diff --git a/lib/poxa/console/ws_handler.ex b/lib/poxa/console/ws_handler.ex index fbab025..b82b7d0 100644 --- a/lib/poxa/console/ws_handler.ex +++ b/lib/poxa/console/ws_handler.ex @@ -11,12 +11,13 @@ defmodule Poxa.Console.WSHandler do {method, req} = :cowboy_req.method(req) {path, req} = :cowboy_req.path(req) {qs_vals, req} = :cowboy_req.qs_vals(req) + {app_id, req} = :cowboy_req.binding(:app_id, req) - if Authentication.check(method, path, "", qs_vals) do - :ok = Poxa.Event.add_handler({Poxa.Console, self}, self) + if Authentication.check(app_id, method, path, "", qs_vals) do + :ok = Poxa.Event.add_handler({Poxa.Console, self}, [self, app_id]) {:ok, req, nil} else - Logger.error "Failed to authenticate on Console Websocket" + Logger.error "Failed to authenticate on Console Websocket for app with id: #{app_id}" {:shutdown, req} end end diff --git a/lib/poxa/event.ex b/lib/poxa/event.ex index 87f247c..373e7b0 100644 --- a/lib/poxa/event.ex +++ b/lib/poxa/event.ex @@ -1,5 +1,8 @@ defmodule Poxa.Event do - def notify(event, data), do: :gen_event.sync_notify(Poxa.Event, Map.put(data, :event, event)) + def notify(event, app_id, data) do + poxa_event = data |> Map.put(:event, event) |> Map.put(:app_id, app_id) + :gen_event.sync_notify(Poxa.Event, poxa_event) + end def add_handler(handler, pid) do GenEvent.add_mon_handler(Poxa.Event, handler, pid) diff --git a/lib/poxa/event_handler.ex b/lib/poxa/event_handler.ex index db4f83a..7777f40 100644 --- a/lib/poxa/event_handler.ex +++ b/lib/poxa/event_handler.ex @@ -26,7 +26,8 @@ defmodule Poxa.EventHandler do {:ok, body, req} = :cowboy_req.body(req) case JSX.decode(body) do {:ok, data} -> - case PusherEvent.build(data) do + {app_id, req} = :cowboy_req.binding(:app_id, req) + case PusherEvent.build(app_id, data) do {:ok, event} -> {false, req, %{body: body, event: event}} _ -> req = :cowboy_req.set_resp_body(@invalid_event_json, req) @@ -47,7 +48,8 @@ defmodule Poxa.EventHandler do {qs_vals, req} = :cowboy_req.qs_vals(req) {method, req} = :cowboy_req.method(req) {path, req} = :cowboy_req.path(req) - authorized = Authentication.check(method, path, body, qs_vals) + {app_id, req} = :cowboy_req.binding(:app_id, req) + authorized = Authentication.check(app_id, method, path, body, qs_vals) unless authorized do req = :cowboy_req.set_resp_body(@authentication_error_json, req) end @@ -80,7 +82,7 @@ defmodule Poxa.EventHandler do @doc false def post(req, %{event: event}) do PusherEvent.publish(event) - Event.notify(:api_message, %{channels: event.channels, name: event.name}) + Event.notify(:api_message, event.app_id, %{channels: event.channels, name: event.name}) req = :cowboy_req.set_resp_body("{}", req) {true, req, nil} end diff --git a/lib/poxa/presence_channel.ex b/lib/poxa/presence_channel.ex index aaef41f..a99310d 100644 --- a/lib/poxa/presence_channel.ex +++ b/lib/poxa/presence_channel.ex @@ -8,9 +8,9 @@ defmodule Poxa.PresenceChannel do More info at: http://pusher.com/docs/rest_api#method-get-users """ - @spec users(binary) :: [binary | integer] - def users(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :'$1'} + @spec users(binary, binary) :: [binary | integer] + def users(channel, app_id) do + match = {{:p, :l, {:pusher, app_id, channel}}, :_, :'$1'} :gproc.select([{match, [], [:'$1']}]) |> Enum.uniq(fn {user_id, _} -> user_id end) |> Enum.map(fn {user_id, _} -> user_id end) @@ -19,9 +19,9 @@ defmodule Poxa.PresenceChannel do @doc """ Returns the number of unique users on a presence channel """ - @spec user_count(binary) :: non_neg_integer - def user_count(channel) do - match = {{:p, :l, {:pusher, channel}}, :_, :'$1'} + @spec user_count(binary, binary) :: non_neg_integer + def user_count(channel, app_id) do + match = {{:p, :l, {:pusher, app_id, channel}}, :_, :'$1'} :gproc.select([{match, [], [:'$1']}]) |> Enum.uniq(fn {user_id, _} -> user_id end) |> Enum.count diff --git a/lib/poxa/presence_subscription.ex b/lib/poxa/presence_subscription.ex index 1d9ef2b..2099708 100644 --- a/lib/poxa/presence_subscription.ex +++ b/lib/poxa/presence_subscription.ex @@ -15,31 +15,31 @@ defmodule Poxa.PresenceSubscription do @type t :: %__MODULE__{channel: binary, channel_data: [{user_id, user_info}]} alias Poxa.PusherEvent - import Poxa.Channel, only: [presence?: 1, subscribed?: 2] + import Poxa.Channel, only: [presence?: 1, subscribed?: 3] require Logger @doc """ Returns a PresenceSubscription struct """ - @spec subscribe!(binary, :jsx.json_term) :: __MODULE__.t - def subscribe!(channel, channel_data) do + @spec subscribe!(binary, binary, :jsx.json_term) :: __MODULE__.t + def subscribe!(app_id, channel, channel_data) do decoded_channel_data = JSX.decode!(channel_data) - if subscribed?(channel, self) do + if subscribed?(app_id, channel, self) do Logger.info "Already subscribed #{inspect self} on channel #{channel}" else Logger.info "Registering #{inspect self} to channel #{channel}" {user_id, user_info} = extract_userid_and_userinfo(decoded_channel_data) - unless user_id_already_on_presence_channel(user_id, channel) do + unless user_id_already_on_presence_channel(app_id, user_id, channel) do message = PusherEvent.presence_member_added(channel, user_id, user_info) - :gproc.send({:p, :l, {:pusher, channel}}, {self, message}) + :gproc.send({:p, :l, {:pusher, app_id, channel}}, {self, message}) end - :gproc.reg({:p, :l, {:pusher, channel}}, {user_id, user_info}) + :gproc.reg({:p, :l, {:pusher, app_id, channel}}, {user_id, user_info}) end - %__MODULE__{channel: channel, channel_data: channel_data(channel)} + %__MODULE__{channel: channel, channel_data: channel_data(app_id, channel)} end - defp channel_data(channel) do - for {_pid, {user_id, user_info}} <- :gproc.lookup_values({:p, :l, {:pusher, channel}}) do + defp channel_data(app_id, channel) do + for {_pid, {user_id, user_info}} <- :gproc.lookup_values({:p, :l, {:pusher, app_id, channel}}) do {user_id, user_info} end |> Enum.uniq(fn {user_id, _} -> user_id end) end @@ -53,8 +53,8 @@ defmodule Poxa.PresenceSubscription do defp sanitize_user_id(user_id) when is_binary(user_id), do: user_id defp sanitize_user_id(user_id), do: JSX.encode!(user_id) - defp user_id_already_on_presence_channel(user_id, channel) do - match = {{:p, :l, {:pusher, channel}}, :_, {user_id, :_}} + defp user_id_already_on_presence_channel(app_id, user_id, channel) do + match = {{:p, :l, {:pusher, app_id, channel}}, :_, {user_id, :_}} :gproc.select_count([{match, [], [true]}]) != 0 end @@ -62,12 +62,12 @@ defmodule Poxa.PresenceSubscription do Unsubscribe from a presence channel, possibly triggering presence_member_removed It respects multiple connections from the same user id """ - @spec unsubscribe!(binary) :: {:ok, binary} - def unsubscribe!(channel) do - case :gproc.get_value({:p, :l, {:pusher, channel}}) do + @spec unsubscribe!(binary, binary) :: {:ok, binary} + def unsubscribe!(app_id, channel) do + case :gproc.get_value({:p, :l, {:pusher, app_id, channel}}) do {user_id, _} -> - if only_one_connection_on_user_id?(channel, user_id) do - presence_member_removed(channel, user_id) + if only_one_connection_on_user_id?(app_id, channel, user_id) do + presence_member_removed(app_id, channel, user_id) end _ -> nil end @@ -80,24 +80,24 @@ defmodule Poxa.PresenceSubscription do If the connection is the only one, fire up the member_removed event """ - @spec check_and_remove :: :ok - def check_and_remove do - match = {{:p, :l, {:pusher, :'$1'}}, self, {:'$2', :_}} + @spec check_and_remove(binary) :: :ok + def check_and_remove(app_id) do + match = {{:p, :l, {:pusher, app_id, :'$1'}}, self, {:'$2', :_}} channel_user_id = :gproc.select([{match, [], [[:'$1',:'$2']]}]) for [channel, user_id] <- channel_user_id, - presence?(channel), only_one_connection_on_user_id?(channel, user_id) do - presence_member_removed(channel, user_id) + presence?(channel), only_one_connection_on_user_id?(app_id, channel, user_id) do + presence_member_removed(app_id, channel, user_id) end :ok end - defp presence_member_removed(channel, user_id) do + defp presence_member_removed(app_id, channel, user_id) do message = PusherEvent.presence_member_removed(channel, user_id) - :gproc.send({:p, :l, {:pusher, channel}}, {self, message}) + :gproc.send({:p, :l, {:pusher, app_id, channel}}, {self, message}) end - defp only_one_connection_on_user_id?(channel, user_id) do - match = {{:p, :l, {:pusher, channel}}, :_, {user_id, :_}} + defp only_one_connection_on_user_id?(app_id, channel, user_id) do + match = {{:p, :l, {:pusher, app_id, channel}}, :_, {user_id, :_}} :gproc.select_count([{match, [], [true]}]) == 1 end end diff --git a/lib/poxa/pusher_event.ex b/lib/poxa/pusher_event.ex index bed95c9..279e4a1 100644 --- a/lib/poxa/pusher_event.ex +++ b/lib/poxa/pusher_event.ex @@ -140,42 +140,42 @@ defmodule Poxa.PusherEvent do data: data} |> encode! end - defstruct [:channels, :name, :data, :socket_id] - @type t :: %Poxa.PusherEvent{channels: list, name: binary, + defstruct [:channels, :name, :app_id, :data, :socket_id] + @type t :: %Poxa.PusherEvent{channels: list, name: binary, app_id: binary, data: binary | map, socket_id: nil | binary} @doc """ Builds the struct based on the decoded JSON from /events endpoint """ - @spec build(map) :: {:ok, Poxa.PusherEvent.t} | {:error, atom} - def build(%{"name" => name, "channels" => channels, "data" => data} = event) do - build_event(channels, data, name, event["socket_id"]) + @spec build(binary, map) :: {:ok, Poxa.PusherEvent.t} | {:error, atom} + def build(app_id, %{"name" => name, "channels" => channels, "data" => data} = event) do + build_event(app_id, channels, data, name, event["socket_id"]) end - def build(%{"name" => name, "channel" => channel, "data" => data} = event) do - build_event([channel], data, name, event["socket_id"]) + def build(app_id, %{"name" => name, "channel" => channel, "data" => data} = event) do + build_event(app_id, [channel], data, name, event["socket_id"]) end - def build(_), do: {:error, :invalid_pusher_event} + def build(_, _), do: {:error, :invalid_pusher_event} @doc """ Build client events """ - @spec build_client_event(map, binary) :: {:ok, Poxa.PusherEvent.t} | {:error, atom} - def build_client_event(%{"event" => name, "channel" => channel, "data" => data}, socket_id) do - build_event([channel], data, name, socket_id) + @spec build_client_event(binary, map, binary) :: {:ok, Poxa.PusherEvent.t} | {:error, atom} + def build_client_event(app_id, %{"event" => name, "channel" => channel, "data" => data}, socket_id) do + build_event(app_id, [channel], data, name, socket_id) end - def build_client_event(%{"name" => name, "channel" => channel, "data" => data}, socket_id) do - build_event([channel], data, name, socket_id) + def build_client_event(app_id, %{"name" => name, "channel" => channel, "data" => data}, socket_id) do + build_event(app_id, [channel], data, name, socket_id) end - defp build_event(channels, data, name, socket_id) do - event = %Poxa.PusherEvent{channels: channels, data: data, name: name, socket_id: socket_id} + defp build_event(app_id, channels, data, name, socket_id) do + event = %Poxa.PusherEvent{app_id: app_id, channels: channels, data: data, name: name, socket_id: socket_id} if valid?(event), do: {:ok, event}, else: {:error, :invalid_event} end - defp valid?(%Poxa.PusherEvent{channels: channels, socket_id: socket_id}) do + defp valid?(%Poxa.PusherEvent{channels: channels, socket_id: socket_id, app_id: app_id}) do Enum.all?(channels, &Poxa.Channel.valid?(&1)) and - (!socket_id || Poxa.SocketId.valid?(socket_id)) + (!socket_id || Poxa.SocketId.valid?(socket_id)) and app_id end defp valid?(_), do: false @@ -192,7 +192,7 @@ defmodule Poxa.PusherEvent do defp publish_event_to_channel(event, channel) do message = build_message(event, channel) |> encode! - :gproc.send({:p, :l, {:pusher, channel}}, {self, message, event.socket_id}) + :gproc.send({:p, :l, {:pusher, event.app_id, channel}}, {self, message, event.socket_id}) end defp build_message(event, channel) do diff --git a/lib/poxa/subscription.ex b/lib/poxa/subscription.ex index e51f8a6..5325a8f 100644 --- a/lib/poxa/subscription.ex +++ b/lib/poxa/subscription.ex @@ -14,40 +14,40 @@ defmodule Poxa.Subscription do Returns {:ok, channel} to public and private channels and a PresenceSubscription to a presence channel """ - @spec subscribe!(:jsx.json_term, binary) :: {:ok, binary} + @spec subscribe!(binary, :jsx.json_term, binary) :: {:ok, binary} | PresenceSubscription.t | {:error, binary} - def subscribe!(data, socket_id) do + def subscribe!(app_id, data, socket_id) do channel = data["channel"] cond do Channel.private?(channel) -> - subscribe_private_channel(socket_id, channel, data["auth"], data["channel_data"]) + subscribe_private_channel(app_id, socket_id, channel, data["auth"], data["channel_data"]) Channel.presence?(channel) -> - subscribe_presence_channel(socket_id, channel, data["auth"], data["channel_data"]) + subscribe_presence_channel(app_id, socket_id, channel, data["auth"], data["channel_data"]) is_binary(channel) -> - subscribe_channel(channel) + subscribe_channel(app_id, channel) true -> Logger.info "Missing channel" {:error, PusherEvent.pusher_error("Missing parameter: data.channel")} end end - defp subscribe_presence_channel(socket_id, channel, auth, channel_data) do + defp subscribe_presence_channel(app_id, socket_id, channel, auth, channel_data) do to_sign = socket_id <> ":" <> channel <> ":" <> (channel_data || "") - if AuthSignature.valid?(to_sign, auth) do - PresenceSubscription.subscribe!(channel, channel_data) + if AuthSignature.valid?(app_id, to_sign, auth) do + PresenceSubscription.subscribe!(app_id, channel, channel_data) else {:error, signature_error(to_sign, auth)} end end - defp subscribe_private_channel(socket_id, channel, auth, channel_data) do + defp subscribe_private_channel(app_id, socket_id, channel, auth, channel_data) do to_sign = case channel_data do nil -> socket_id <> ":" <> channel channel_data -> socket_id <> ":" <> channel <> ":" <> channel_data end - if AuthSignature.valid?(to_sign, auth) do - subscribe_channel(channel) + if AuthSignature.valid?(app_id, to_sign, auth) do + subscribe_channel(app_id, channel) else {:error, signature_error(to_sign, auth)} end @@ -58,13 +58,13 @@ defmodule Poxa.Subscription do PusherEvent.pusher_error(msg) end - defp subscribe_channel(channel) do + defp subscribe_channel(app_id, channel) do Logger.info "Subscribing to channel #{channel}" - if Channel.subscribed?(channel, self) do + if Channel.subscribed?(channel, app_id, self) do Logger.info "Already subscribed #{inspect self} on channel #{channel}" else Logger.info "Registering #{inspect self} to channel #{channel}" - :gproc.reg({:p, :l, {:pusher, channel}}) + :gproc.reg({:p, :l, {:pusher, app_id, channel}}) end {:ok, channel} end @@ -72,14 +72,14 @@ defmodule Poxa.Subscription do @doc """ Unsubscribe from a channel always returning :ok """ - @spec unsubscribe!(:jsx.json_term) :: {:ok, binary} - def unsubscribe!(data) do + @spec unsubscribe!(binary, :jsx.json_term) :: {:ok, binary} + def unsubscribe!(app_id, data) do channel = data["channel"] - if Channel.subscribed?(channel, self) do + if Channel.subscribed?(channel, app_id, self) do if Channel.presence?(channel) do - PresenceSubscription.unsubscribe!(channel); + PresenceSubscription.unsubscribe!(app_id, channel); end - :gproc.unreg({:p, :l, {:pusher, channel}}); + :gproc.unreg({:p, :l, {:pusher, app_id, channel}}); else Logger.debug "Not subscribed to" end diff --git a/lib/poxa/supervisor.ex b/lib/poxa/supervisor.ex index a4c4522..cbc2944 100644 --- a/lib/poxa/supervisor.ex +++ b/lib/poxa/supervisor.ex @@ -10,7 +10,8 @@ defmodule Poxa.Supervisor do The supervisor will spawn a GenEvent named Poxa.Event """ def init([]) do - children = [worker(GenEvent, [[name: Poxa.Event]])] + children = [worker(Poxa.App, []), + worker(GenEvent, [[name: Poxa.Event]])] supervise children, strategy: :one_for_one end end diff --git a/lib/poxa/users_handler.ex b/lib/poxa/users_handler.ex index 12d6cc0..35bc09b 100644 --- a/lib/poxa/users_handler.ex +++ b/lib/poxa/users_handler.ex @@ -49,7 +49,8 @@ defmodule Poxa.UsersHandler do """ def get_json(req, channel) do - response = PresenceChannel.users(channel) |> Enum.map(fn(id) -> [id: id] end) + {app_id, req} = :cowboy_req.binding(:app_id, req) + response = PresenceChannel.users(channel, app_id) |> Enum.map(&([id: &1])) {JSX.encode!(users: response), req, nil} end end diff --git a/lib/poxa/websocket_handler.ex b/lib/poxa/websocket_handler.ex index f81bfde..4077499 100644 --- a/lib/poxa/websocket_handler.ex +++ b/lib/poxa/websocket_handler.ex @@ -8,6 +8,7 @@ defmodule Poxa.WebsocketHandler do More info on Pusher protocol at: http://pusher.com/docs/pusher_protocol """ require Logger + alias Poxa.App alias Poxa.PusherEvent alias Poxa.Event alias Poxa.PresenceSubscription @@ -20,7 +21,7 @@ defmodule Poxa.WebsocketHandler do @min_protocol 5 defmodule State do - defstruct [:socket_id, :time] + defstruct [:socket_id, :time, :app_id] end @doc false @@ -33,18 +34,18 @@ defmodule Poxa.WebsocketHandler do def websocket_init(_transport_name, req, _opts) do {app_key, req} = :cowboy_req.binding(:app_key, req) {protocol, req} = :cowboy_req.qs_val("protocol", req, to_string(@max_protocol)) - case :application.get_env(:poxa, :app_key) do - {:ok, ^app_key} -> + case App.id(app_key) do + {:ok, app_id} -> if supported_protocol?(protocol) do send self, :start - {:ok, req, nil} + {:ok, req, %State{app_id: app_id}} else Logger.error "Protocol #{protocol} not supported" send self, :start_error {:ok, req, {4007, "Unsupported protocol version"}} end - {:ok, expected_app_key} -> - Logger.error "Invalid app_key, expected #{expected_app_key}, found #{app_key}" + {:error, _reason} -> + Logger.error "App not found with key #{app_key}" send self, :start_error {:ok, req, {4001, "Application does not exist"}} end @@ -80,20 +81,20 @@ defmodule Poxa.WebsocketHandler do defp handle_pusher_event("pusher:subscribe", decoded_json, req, %State{socket_id: socket_id} = state) do data = decoded_json["data"] - reply = case Subscription.subscribe!(data, socket_id) do + reply = case Subscription.subscribe!(state.app_id, data, socket_id) do {:ok, channel} -> - Event.notify(:subscribed, %{socket_id: socket_id, channel: channel}) + Event.notify(:subscribed, state.app_id, %{socket_id: socket_id, channel: channel}) PusherEvent.subscription_succeeded(channel) subscription = %PresenceSubscription{channel: channel} -> - Event.notify(:subscribed, %{socket_id: socket_id, channel: channel}) + Event.notify(:subscribed, state.app_id, %{socket_id: socket_id, channel: channel}) PusherEvent.subscription_succeeded(subscription) {:error, error} -> error end {:reply, {:text, reply}, req, state} end defp handle_pusher_event("pusher:unsubscribe", decoded_json, req, %State{socket_id: socket_id} = state) do - {:ok, channel} = Subscription.unsubscribe! decoded_json["data"] - Event.notify(:unsubscribed, %{socket_id: socket_id, channel: channel}) + {:ok, channel} = Subscription.unsubscribe!(state.app_id, decoded_json["data"]) + Event.notify(:unsubscribed, state.app_id, %{socket_id: socket_id, channel: channel}) {:ok, req, state} end defp handle_pusher_event("pusher:ping", _decoded_json, req, state) do @@ -102,11 +103,11 @@ defmodule Poxa.WebsocketHandler do end # Client Events defp handle_pusher_event("client-" <> _event_name, decoded_json, req, %State{socket_id: socket_id} = state) do - {:ok, event} = PusherEvent.build_client_event(decoded_json, socket_id) + {:ok, event} = PusherEvent.build_client_event(state.app_id, decoded_json, socket_id) channel = List.first(event.channels) - if Channel.private_or_presence?(channel) and Channel.subscribed?(channel, self) do + if Channel.private_or_presence?(channel) and Channel.subscribed?(channel, state.app_id, self) do PusherEvent.publish(event) - Event.notify(:client_event_message, %{socket_id: socket_id, channels: event.channels, name: event.name}) + Event.notify(:client_event_message, state.app_id, %{socket_id: socket_id, channels: event.channels, name: event.name}) end {:ok, req, state} end @@ -122,15 +123,15 @@ defmodule Poxa.WebsocketHandler do * start_error - Improper start returning useful error code after unsuccessful websocket_init * msg broadcast - Any message generate by events that gets to the connection """ - def websocket_info(:start, req, _state) do + def websocket_info(:start, req, state) do # Unique identifier for the connection socket_id = SocketId.generate! {origin, req} = :cowboy_req.host_url(req) - Event.notify(:connected, %{socket_id: socket_id, origin: origin}) + Event.notify(:connected, state.app_id, %{socket_id: socket_id, origin: origin}) reply = PusherEvent.connection_established(socket_id) - {:reply, {:text, reply}, req, %State{socket_id: socket_id, time: Time.stamp}} + {:reply, {:text, reply}, req, %{state | socket_id: socket_id, time: Time.stamp}} end def websocket_info(:start_error, req, {code, message}) do @@ -156,11 +157,11 @@ defmodule Poxa.WebsocketHandler do member removal if necessary and explicitly unregister tags on gproc """ def websocket_terminate(_reason, _req, nil), do: :ok - def websocket_terminate(_reason, _req, %State{socket_id: socket_id, time: time}) do - duration = Time.stamp - time - channels = Channel.all(self) - Event.notify(:disconnected, %{socket_id: socket_id, channels: channels, duration: duration}) - PresenceSubscription.check_and_remove + def websocket_terminate(_reason, _req, state) do + duration = Time.stamp - state.time + channels = Channel.all(state.app_id, self) + Event.notify(:disconnected, state.app_id, %{socket_id: state.socket_id, channels: channels, duration: duration}) + PresenceSubscription.check_and_remove(state.app_id) :gproc.goodbye :ok end diff --git a/priv/index.html b/priv/index.html index ed5a166..86c7dfa 100644 --- a/priv/index.html +++ b/priv/index.html @@ -15,6 +15,10 @@