Skip to content

Commit

Permalink
src sync
Browse files Browse the repository at this point in the history
  • Loading branch information
5HT committed Nov 22, 2018
1 parent bbf3cf2 commit e9d9f12
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 272 deletions.
26 changes: 0 additions & 26 deletions src/mqtt/n2o_auth.erl
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
-module(n2o_auth).
-description('N2O MQTT Auth').
-include("emqttd.hrl").
%-behaviour(emqttd_auth_mod).
-compile(export_all).
-export([init/1, check/3, description/0]).

init([Listeners]) -> {ok, Listeners}.
Expand All @@ -24,27 +22,3 @@ check(#mqtt_client{client_id = ClientId,
end;
check(_Client, _Password, _Opts) -> ignore.

ttl() -> (application:get_env(n2o, auth_ttl, 60*15)).
gen_token([], _Data) ->
Now = now_msec(),
Expiration = Now+ttl()*1000,
{'Token', n2o_secret:pickle(term_to_binary(Expiration))};
gen_token(ClientSessionToken, Data) ->
Now = now_msec(),
case bin_to_term(n2o_secret:depickle(ClientSessionToken)) of
<<>> -> {error, invalid_token};
Expiration when Expiration > Now -> {'Token', ClientSessionToken};
_Expiration -> gen_token([], Data)
end.

bin_to_term(<<>>) -> <<>>;
bin_to_term(Bin) -> binary_to_term(Bin).

gen_sid(Time) ->
nitro_conv:hex(binary:part(crypto:hmac(application:get_env(n2o,hmac,sha256),
n2o_secret:secret(),term_to_binary(Time)),0,16)).

now_msec() -> now_msec(os:timestamp()).
now_msec({Mega,Sec,Micro}) -> (Mega*1000000 + Sec)*1000 + round(Micro/1000).
msec_now(A) -> A0 = A/1000, S = trunc(A0), Mega = S div 1000000,
Sec = S - Mega*1000000, Micro = round((A0 - S)*1000000), {Mega,Sec,Micro}.
4 changes: 2 additions & 2 deletions src/mqtt/n2o_ring.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
-module(n2o_ring).
-description('N2O Ring').
-include("n2o.hrl").
-compile(export_all).
-export([ring/0,init/1,send/1,lookup/1,add/1,delete/1]).
-record(state, { ring, nodes }).

send(Msg) ->
Expand Down Expand Up @@ -84,7 +84,7 @@ init(Peers) ->
]
),
Ring = array:from_list(assemble_ring([], lists:reverse(RawRing), [], length(Peers))),
n2o:info(?MODULE,"Created a ring with ~b points in it.\r~n", [array:sparse_size(Ring)]),
n2o:info(?MODULE,"RING: ~p~n", [array:sparse_size(Ring)]),
application:set_env(n2o,ring,Ring),
application:set_env(n2o,nodes,Peers),
{ok, #state{ring=Ring,nodes=Peers}}.
Expand Down
121 changes: 118 additions & 3 deletions src/mqtt/n2o_vnode.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,29 @@
-module(n2o_vnode).
-description('N2O MQTT Backend').
-include("n2o.hrl").
-compile(export_all).
-include("emqttd.hrl").
-export([proc/2]).
-export([get_vnode/1,get_vnode/2,validate/1,send_reply/3,send_reply/4,send/3,send/4]).
-export([subscribe/3,subscribe/2,unsubscribe/3,unsubscribe/2,subscribe_cli/2,unsubscribe_cli/2]).
-export([load/1,unload/0]).
-export([on_client_connected/3, on_client_disconnected/3, on_client_subscribe/4,
on_client_unsubscribe/4, on_session_created/3, on_session_subscribed/4,
on_session_unsubscribed/4, on_session_terminated/4, on_message_publish/2,
on_message_delivered/4, on_message_acked/4 ]).

%% N2O-MQTT Topic Format

%% Client: 1. actions/:vsn/:module/:client
%% Server: 2. events/:vsn/:node/:module/:user/:client/:token

%% server and client sends

send_reply(ClientId, Topic, Message) -> send_reply(ClientId, 0, Topic, Message).
send_reply(ClientId, QoS, Topic, Message) ->
emqttd:publish(emqttd_message:make(ClientId, QoS, Topic, Message)).

send(C,T,M) -> send(C, T, M, [{qos,2}]).
send(C,T,M,Opts) -> emqttc:publish(C, T, M, Opts).

% N2O VNODE SERVER for MQTT

Expand All @@ -17,8 +39,6 @@ debug(Name,Topic,BERT,Address,Return) ->
ok;
_ -> skip end.

send(C,T,M) -> send(C, T, M, [{qos,2}]).
send(C,T,M,Opts) -> emqttc:publish(C, T, M, Opts).

sid() -> application:get_env(n2o,token_as_sid,false).
fix('') -> index;
Expand Down Expand Up @@ -78,3 +98,98 @@ proc({mqttc, C, connected}, State=#handler{name=Name,state=C,seq=S}) ->

proc(Unknown,#handler{seq=S}=Async) ->
{reply,{uknown,Unknown,S},Async#handler{seq=S+1}}.

% MQTT HELPERS

subscribe(X,Y) -> subscribe(X,Y,[{qos,2}]).
subscribe(X,Y,[{qos,Z}]) -> subscribe_cli(X,[{Y,Z}]).

unsubscribe(X,Y) -> unsubscribe(X,Y,[{qos,2}]).
unsubscribe(X,Y,[{qos,Z}]) -> unsubscribe_cli(X,[{Y,Z}]).

subscribe_cli(ClientId, TopicTable) ->
[ begin
kvs:put({mqtt_subscription, ClientId, Topic}),
kvs:put({mqtt_subproperty, {Topic, ClientId}, [{qos,Qos}]}),
emqttd_pubsub:add_subscriber(Topic,ClientId,[{qos,Qos}])
end || {Topic,Qos} <- TopicTable ].

unsubscribe_cli(ClientId, TopicTable)->
DelFun = fun() -> [ begin
mnesia:delete_object({mqtt_subscription, ClientId, Topic}),
kvs:delete(mqtt_subproperty, {Topic, ClientId}),
mnesia:delete_object({mqtt_subscriber, Topic, ClientId})
end || {Topic,_Qos} <- TopicTable ] end,
case mnesia:is_transaction() of
true -> DelFun();
false -> mnesia:transaction(DelFun)
end,
[(not ets:member(mqtt_subscriber, Topic)) andalso
emqttd_router:del_route(Topic) || {Topic,_Qos} <- TopicTable ].

% MQTT HOOKS

load(Env) ->
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed',fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).

unload() ->
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).

on_client_connected(_ConnAck, Client=#mqtt_client{client_id= <<"emqttc",_/bytes>>}, _) -> {ok, Client};
on_client_connected(_ConnAck, Client = #mqtt_client{}, _Env) -> {ok, Client}.
on_client_disconnected(_Reason, _Client = #mqtt_client{}, _Env) -> ok.
on_client_subscribe(_ClientId, _Username, TopicTable, _Env) -> {ok, TopicTable}.
on_client_unsubscribe(_ClientId, _Username, TopicTable, _Env) -> {ok, TopicTable}.
on_session_created(_ClientId, _Username, _Env) -> ok.
on_session_subscribed(<<"emqttd",_/binary>>,_,{<<"actions/",_, "/",_/binary>> =Topic,Opts},_) -> {ok,{Topic,Opts}};
on_session_subscribed(_ClientId, _Username, {Topic, Opts}, _Env) -> {ok, {Topic,Opts}}.
on_session_unsubscribed(_ClientId, _Username, {_Topic, _Opts}, _Env) -> ok.
on_session_terminated(_ClientId, _Username, _Reason, _Env) -> ok.
on_message_delivered(_ClientId, _Username, Message, _Env) -> {ok,Message}.
on_message_acked(_ClientId, _Username, Message, _Env) -> {ok,Message}.
on_message_publish(Message = #mqtt_message{topic = <<"actions/", _/binary>>, from=_From}, _Env) -> {ok, Message};
on_message_publish(#mqtt_message{topic = <<"events/", _TopicTail/binary>> = Topic, qos=Qos,
from={ClientId,_},payload = Payload}=Message, _Env) ->
{Module, ValidateFun} = application:get_env(?MODULE, validate, {?MODULE, validate}),
Res = case emqttd_topic:words(Topic) of
[E,V,'',M,U,_C,T] -> {Mod,F} = application:get_env(?MODULE, vnode, {?MODULE, get_vnode}),
NewTopic = emqttd_topic:join([E,V,Mod:F(ClientId,Payload),M,U,ClientId,T]),
emqttd:publish(emqttd_message:make(ClientId, Qos, NewTopic, Payload)), skip;
%% @NOTE redirect to vnode
[_E,_V,_N,_M,_U,ClientId,_T] ->
case Module:ValidateFun(Payload) of ok -> {ok, Message}; _ -> skip end;
[E,V,N,M,U,_C,T] -> NewTopic = emqttd_topic:join([E,V,N,M,U,ClientId,T]),
emqttd:publish(emqttd_message:make(ClientId, Qos, NewTopic, Payload)), skip;
%% @NOTE redirects to event topic with correct ClientId
_ -> case Module:ValidateFun(Payload) of ok -> {ok, Message}; _ -> skip end
end,
case Res of
{ok, _} -> case Module:ValidateFun(Payload) of ok -> Res; _ -> skip end;
_ -> Res
end;
on_message_publish(Message, _) -> {ok,Message}.

get_vnode(ClientId) -> get_vnode(ClientId, []).
get_vnode(ClientId, _) ->
[H|_] = binary_to_list(erlang:md5(ClientId)),
integer_to_binary(H rem (length(n2o:ring())) + 1).

validate(_Payload) -> ok.
Loading

0 comments on commit e9d9f12

Please sign in to comment.