Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Feature multi apps #43

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/poxa.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Poxa do
def start(_type, _args) do
dispatch = :cowboy_router.compile([
{:_, [ { '/ping', Poxa.PingHandler, [] },
{ '/console', Poxa.Console.WSHandler, [] },
{ '/apps/:app_id/console', Poxa.Console.WSHandler, [] },
{ '/', :cowboy_static, {:priv_file, :poxa, 'index.html'} },
{ '/static/[...]', :cowboy_static, {:priv_dir, :poxa, 'static'} },
{ '/apps/:app_id/events', Poxa.EventHandler, [] },
Expand Down
89 changes: 89 additions & 0 deletions lib/poxa/app.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
defmodule Poxa.App do
use GenServer

@doc """
Given the app_key it returns the related app_id
"""
def id(key) do
case :ets.match(Poxa.App, {:'$1', key, :'_'}) do
[[id]] -> {:ok, id}
_ -> {:error, {:key_not_found, key}}
end
end

@doc """
Given the app_id it returns the related app_key
"""
@spec key(binary) :: {:ok, binary} | {:error, term}
def key(id) do
case :ets.lookup(Poxa.App, id) do
[{^id, key, _}] -> {:ok, key}
_ -> {:error, {:id_not_found, id}}
end
end

@doc """
Given the app_key it returns the related app_secret
"""
def secret(key) do
case :ets.match(Poxa.App, {:'_', key, :'$1'}) do
[[secret]] -> {:ok, secret}
_ -> {:error, {:key_not_found, key}}
end
end

@doc """
Given the app_id it returns the related app_key and secret
"""
def key_secret(id) do
case :ets.lookup(Poxa.App, id) do
[{^id, key, secret}] -> {:ok, {key, secret}}
_ -> {:error, {:id_not_found, id}}
end
end

@doc """
Reload table content using Application environment
"""
@spec reload :: :ok
def reload do
GenServer.call(Poxa.App, :reload)
end

@doc false
def start_link(args \\ []) do
GenServer.start_link(__MODULE__, args, [name: Poxa.App])
end

@doc """
Create ETS table to fetch app_id, app_key and secret
"""
def init(_) do
ets_options = [:ordered_set, :named_table, :protected, { :read_concurrency, true }]
:ets.new(Poxa.App, ets_options)
populate_table
{ :ok, :ok }
end

def handle_call(:reload, _from, _) do
populate_table
{:reply, :ok, :ok}
end

defp populate_table do
apps = Application.get_env(:poxa, :apps, [])
app = fetch_standalone_app
for {app_id, app_key, secret} <- [app | apps] do
:ets.insert(Poxa.App, {app_id, app_key, secret})
end
end

defp fetch_standalone_app do
app_id = Application.get_env(:poxa, :app_id)
app_key = Application.get_env(:poxa, :app_key)
app_secret = Application.get_env(:poxa, :app_secret)
if app_id && app_key && app_secret do
{app_id, app_key, app_secret}
end
end
end
26 changes: 11 additions & 15 deletions lib/poxa/auth_signature.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,21 @@ defmodule Poxa.AuthSignature do
http://pusher.com/docs/auth_signatures
"""

require Logger
alias Poxa.CryptoHelper

@doc """
Validate auth signature as described at: http://pusher.com/docs/auth_signatures .
Returns true or false
"""
@spec valid?(binary, binary) :: boolean
def valid?(to_sign, auth) do
case String.split(auth, ":", parts: 2) do
[app_key, remote_signed_data] ->
signed_data = sign_data(to_sign)
Poxa.Authentication.check_key(app_key) and signed_data == remote_signed_data
_ -> false
@spec valid?(binary, binary, binary) :: boolean
def valid?(app_id, to_sign, auth) do
case Poxa.App.key_secret(app_id) do
{:ok, {app_key, app_secret}} ->
case String.split(auth, ":", parts: 2) do
[^app_key, remote_signed_data] ->
signed_data = Poxa.CryptoHelper.hmac256_to_string(app_secret, to_sign)
signed_data == remote_signed_data
_ -> false
end
{:error, _} -> false
end
end

defp sign_data(data) do
{:ok, app_secret} = :application.get_env(:poxa, :app_secret)
CryptoHelper.hmac256_to_string(app_secret, data)
end
end
22 changes: 7 additions & 15 deletions lib/poxa/authentication.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,15 @@ defmodule Poxa.Authentication do
Returns true if authentication process is validated correctly, false otherwise
More info at: http://pusher.com/docs/rest_api#authentication
"""
@spec check(binary, binary, binary, [{binary, binary}]) :: boolean
def check(method, path, body, qs_vals) do
@spec check(binary, binary, binary, binary, [{binary, binary}]) :: boolean
def check(app_id, method, path, body, qs_vals) do
qs_vals = Enum.into(qs_vals, %{})
body_md5 = qs_vals["body_md5"]
{:ok, app_key} = Application.fetch_env(:poxa, :app_key)
{:ok, secret} = Application.fetch_env(:poxa, :app_secret)

valid_body?(body, body_md5) and Signaturex.validate(app_key, secret, method, path, qs_vals, 600)
end

@doc """
Returns true if the `app_key` is the same as `auth_key`, false otherwise
"""
@spec check_key(binary) :: boolean
def check_key(auth_key) do
{:ok, app_key} = Application.fetch_env(:poxa, :app_key)
auth_key == app_key
case Poxa.App.key_secret(app_id) do
{:ok, {app_key, secret}} ->
valid_body?(body, body_md5) and Signaturex.validate(app_key, secret, method, path, qs_vals, 600)
{:error, _} -> false
end
end

defp valid_body?("", nil), do: true
Expand Down
10 changes: 8 additions & 2 deletions lib/poxa/authorization_helper.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
defmodule Poxa.AuthorizationHelper do
alias Poxa.Authentication
@moduledoc """
Helper to keep the cowboy rest is_authorized dry for Pusher authentication
"""

@doc """
A helper to share authentication process between handlers. It uses the cowboy `req` to retrieve the necessary data.
"""
@spec is_authorized(:cowboy_req.req, any) :: {true, :cowboy_req.req, any} | {{false, binary}, :cowboy_req.req, nil}
def is_authorized(req, state) do
{:ok, body, req} = :cowboy_req.body(req)
{method, req} = :cowboy_req.method(req)
{qs_vals, req} = :cowboy_req.qs_vals(req)
{path, req} = :cowboy_req.path(req)
if Authentication.check(method, path, body, qs_vals) do
{app_id, req} = :cowboy_req.binding(:app_id, req)
if Poxa.Authentication.check(app_id, method, path, body, qs_vals) do
{true, req, state}
else
{{false, "authentication failed"}, req, nil}
Expand Down
24 changes: 12 additions & 12 deletions lib/poxa/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,36 +86,36 @@ defmodule Poxa.Channel do
@doc """
Returns true if the channel has at least 1 subscription
"""
@spec occupied?(binary) :: boolean
def occupied?(channel) do
match = {{:p, :l, {:pusher, channel}}, :_, :_}
@spec occupied?(binary, binary) :: boolean
def occupied?(channel, app_id) do
match = {{:p, :l, {:pusher, app_id, channel}}, :_, :_}
:gproc.select_count([{match, [], [true]}]) != 0
end

@doc """
Returns the list of channels the `pid` is subscribed
"""
@spec all(pid | :_) :: [binary]
def all(pid \\ :_) do
match = {{:p, :l, {:pusher, :'$1'}}, pid, :_}
@spec all(binary, pid | :_) :: [binary]
def all(app_id, pid \\ :_) do
match = {{:p, :l, {:pusher, app_id, :'$1'}}, pid, :_}
:gproc.select([{match, [], [:'$1']}]) |> Enum.uniq
end

@doc """
Returns true if `pid` is subscribed to `channel` and false otherwise
"""
@spec subscribed?(binary, pid) :: boolean
def subscribed?(channel, pid) do
match = {{:p, :l, {:pusher, channel}}, pid, :_}
@spec subscribed?(binary, binary, pid) :: boolean
def subscribed?(channel, app_id, pid) do
match = {{:p, :l, {:pusher, app_id, channel}}, pid, :_}
:gproc.select_count([{match, [], [true]}]) != 0
end

@doc """
Returns how many connections are opened on the `channel`
"""
@spec subscription_count(binary) :: non_neg_integer
def subscription_count(channel) do
match = {{:p, :l, {:pusher, channel}}, :_, :_}
@spec subscription_count(binary, binary) :: non_neg_integer
def subscription_count(channel, app_id) do
match = {{:p, :l, {:pusher, app_id, channel}}, :_, :_}
:gproc.select_count([{match, [], [true]}])
end
end
35 changes: 18 additions & 17 deletions lib/poxa/channels_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ defmodule Poxa.ChannelsHandler do
attributes = String.split(info, ",")
|> Enum.reject(&(&1 == ""))
{channel, req} = :cowboy_req.binding(:channel_name, req, nil)
{app_id, req} = :cowboy_req.binding(:app_id, req)
if channel do
{malformed_request_one_channel?(attributes, channel), req, {:one, channel, attributes}}
{malformed_request_one_channel?(attributes, channel), req, {:one, app_id, channel, attributes}}
else
{filter, req} = :cowboy_req.qs_val("filter_by_prefix", req, nil)
{malformed_request_all_channels?(attributes, filter), req, {:all, filter, attributes}}
{malformed_request_all_channels?(attributes, filter), req, {:all, app_id, filter, attributes}}
end
end

Expand Down Expand Up @@ -71,42 +72,42 @@ defmodule Poxa.ChannelsHandler do
end

@doc false
def get_json(req, {:one, channel, attributes}) do
show(channel, attributes, req, nil)
def get_json(req, {:one, app_id, channel, attributes}) do
show(app_id, channel, attributes, req, nil)
end
def get_json(req, {:all, filter, attributes}) do
index(filter, attributes, req, nil)
def get_json(req, {:all, app_id, filter, attributes}) do
index(app_id, filter, attributes, req, nil)
end

defp show(channel, attributes, req, state) do
occupied = Channel.occupied?(channel)
attribute_list = mount_attribute_list(attributes, channel)
defp show(app_id, channel, attributes, req, state) do
occupied = Channel.occupied?(channel, app_id)
attribute_list = mount_attribute_list(attributes, app_id, channel)
{JSX.encode!([occupied: occupied] ++ attribute_list), req, state}
end

defp mount_attribute_list(attributes, channel) do
defp mount_attribute_list(attributes, app_id, channel) do
attribute_list =
if Enum.member?(attributes, "subscription_count") do
[subscription_count: Channel.subscription_count(channel)]
[subscription_count: Channel.subscription_count(app_id, channel)]
else
[]
end
attribute_list ++
if Enum.member?(attributes, "user_count") do
[user_count: PresenceChannel.user_count(channel)]
[user_count: PresenceChannel.user_count(app_id, channel)]
else
[]
end
end

defp index(filter, attributes, req, state) do
channels = channels(filter, attributes)
defp index(app_id, filter, attributes, req, state) do
channels = channels(app_id, filter, attributes)
{JSX.encode!(channels: channels), req, state}
end

defp channels(filter, attributes) do
for channel <- Channel.all, filter_channel(channel, filter) do
{channel, mount_attribute_list(attributes, channel)}
defp channels(app_id, filter, attributes) do
for channel <- Channel.all(app_id), filter_channel(channel, filter) do
{channel, mount_attribute_list(attributes, app_id, channel)}
end
end

Expand Down
28 changes: 15 additions & 13 deletions lib/poxa/console/console.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,41 @@ defmodule Poxa.Console do
use GenEvent

@doc false
def init(pid), do: {:ok, pid}
def init([pid, app_id]), do: {:ok, {pid, app_id}}

@doc false
def handle_event(%{event: :connected, socket_id: socket_id, origin: origin}, pid) do
def handle_event(%{event: :connected, app_id: app_id, socket_id: socket_id, origin: origin}, {pid, app_id}) do
send_message("Connection", socket_id, "Origin: #{origin}", pid)
{:ok, pid}
{:ok, {pid, app_id}}
end

def handle_event(%{event: :disconnected, socket_id: socket_id, channels: channels, duration: duration}, pid) do
def handle_event(%{event: :disconnected, app_id: app_id, socket_id: socket_id, channels: channels, duration: duration}, {pid, app_id}) do
send_message("Disconnection", socket_id, "Channels: #{inspect channels}, Lifetime: #{duration}s", pid)
{:ok, pid}
{:ok, {pid, app_id}}
end

def handle_event(%{event: :subscribed, socket_id: socket_id, channel: channel}, pid) do
def handle_event(%{event: :subscribed, app_id: app_id, socket_id: socket_id, channel: channel}, {pid, app_id}) do
send_message("Subscribed", socket_id, "Channel: #{channel}", pid)
{:ok, pid}
{:ok, {pid, app_id}}
end

def handle_event(%{event: :unsubscribed, socket_id: socket_id, channel: channel}, pid) do
def handle_event(%{event: :unsubscribed, app_id: app_id, socket_id: socket_id, channel: channel}, {pid, app_id}) do
send_message("Unsubscribed", socket_id, "Channel: #{channel}", pid)
{:ok, pid}
{:ok, {pid, app_id}}
end

def handle_event(%{event: :api_message, channels: channels, name: name}, pid) do
def handle_event(%{event: :api_message, app_id: app_id, channels: channels, name: name}, {pid, app_id}) do
send_message("API Message", "", "Channels: #{inspect channels}, Event: #{name}", pid)
{:ok, pid}
{:ok, {pid, app_id}}
end

def handle_event(%{event: :client_event_message, socket_id: socket_id, channels: channels, name: name}, pid) do
def handle_event(%{event: :client_event_message, app_id: app_id, socket_id: socket_id, channels: channels, name: name}, {pid, app_id}) do
send_message("Client Event Message", socket_id, "Channels: #{inspect channels}, Event: #{name}", pid)
{:ok, pid}
{:ok, {pid, app_id}}
end

def handle_event(_data, state), do: {:ok, state}

defp send_message(type, socket_id, details, pid) do
msg = message(type, socket_id, details) |> encode!
send(pid, msg)
Expand Down
7 changes: 4 additions & 3 deletions lib/poxa/console/ws_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ defmodule Poxa.Console.WSHandler do
{method, req} = :cowboy_req.method(req)
{path, req} = :cowboy_req.path(req)
{qs_vals, req} = :cowboy_req.qs_vals(req)
{app_id, req} = :cowboy_req.binding(:app_id, req)

if Authentication.check(method, path, "", qs_vals) do
:ok = Poxa.Event.add_handler({Poxa.Console, self}, self)
if Authentication.check(app_id, method, path, "", qs_vals) do
:ok = Poxa.Event.add_handler({Poxa.Console, self}, [self, app_id])
{:ok, req, nil}
else
Logger.error "Failed to authenticate on Console Websocket"
Logger.error "Failed to authenticate on Console Websocket for app with id: #{app_id}"
{:shutdown, req}
end
end
Expand Down
5 changes: 4 additions & 1 deletion lib/poxa/event.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
defmodule Poxa.Event do
def notify(event, data), do: :gen_event.sync_notify(Poxa.Event, Map.put(data, :event, event))
def notify(event, app_id, data) do
poxa_event = data |> Map.put(:event, event) |> Map.put(:app_id, app_id)
:gen_event.sync_notify(Poxa.Event, poxa_event)
end

def add_handler(handler, pid) do
GenEvent.add_mon_handler(Poxa.Event, handler, pid)
Expand Down
Loading