diff --git a/src/mqtt/n2o_auth.erl b/src/mqtt/n2o_auth.erl index 9756a260..c8d2a7fa 100644 --- a/src/mqtt/n2o_auth.erl +++ b/src/mqtt/n2o_auth.erl @@ -1,8 +1,6 @@ -module(n2o_auth). -description('N2O MQTT Auth'). -include("emqttd.hrl"). -%-behaviour(emqttd_auth_mod). --compile(export_all). -export([init/1, check/3, description/0]). init([Listeners]) -> {ok, Listeners}. @@ -24,27 +22,3 @@ check(#mqtt_client{client_id = ClientId, end; check(_Client, _Password, _Opts) -> ignore. -ttl() -> (application:get_env(n2o, auth_ttl, 60*15)). -gen_token([], _Data) -> - Now = now_msec(), - Expiration = Now+ttl()*1000, - {'Token', n2o_secret:pickle(term_to_binary(Expiration))}; -gen_token(ClientSessionToken, Data) -> - Now = now_msec(), - case bin_to_term(n2o_secret:depickle(ClientSessionToken)) of - <<>> -> {error, invalid_token}; - Expiration when Expiration > Now -> {'Token', ClientSessionToken}; - _Expiration -> gen_token([], Data) - end. - -bin_to_term(<<>>) -> <<>>; -bin_to_term(Bin) -> binary_to_term(Bin). - -gen_sid(Time) -> - nitro_conv:hex(binary:part(crypto:hmac(application:get_env(n2o,hmac,sha256), - n2o_secret:secret(),term_to_binary(Time)),0,16)). - -now_msec() -> now_msec(os:timestamp()). -now_msec({Mega,Sec,Micro}) -> (Mega*1000000 + Sec)*1000 + round(Micro/1000). -msec_now(A) -> A0 = A/1000, S = trunc(A0), Mega = S div 1000000, - Sec = S - Mega*1000000, Micro = round((A0 - S)*1000000), {Mega,Sec,Micro}. diff --git a/src/mqtt/n2o_ring.erl b/src/mqtt/n2o_ring.erl index d0162977..8155d7df 100644 --- a/src/mqtt/n2o_ring.erl +++ b/src/mqtt/n2o_ring.erl @@ -1,7 +1,7 @@ -module(n2o_ring). -description('N2O Ring'). -include("n2o.hrl"). --compile(export_all). +-export([ring/0,init/1,send/1,lookup/1,add/1,delete/1]). -record(state, { ring, nodes }). send(Msg) -> @@ -84,7 +84,7 @@ init(Peers) -> ] ), Ring = array:from_list(assemble_ring([], lists:reverse(RawRing), [], length(Peers))), - n2o:info(?MODULE,"Created a ring with ~b points in it.\r~n", [array:sparse_size(Ring)]), + n2o:info(?MODULE,"RING: ~p~n", [array:sparse_size(Ring)]), application:set_env(n2o,ring,Ring), application:set_env(n2o,nodes,Peers), {ok, #state{ring=Ring,nodes=Peers}}. diff --git a/src/mqtt/n2o_vnode.erl b/src/mqtt/n2o_vnode.erl index 61648b10..bb429134 100644 --- a/src/mqtt/n2o_vnode.erl +++ b/src/mqtt/n2o_vnode.erl @@ -1,7 +1,29 @@ -module(n2o_vnode). -description('N2O MQTT Backend'). -include("n2o.hrl"). --compile(export_all). +-include("emqttd.hrl"). +-export([proc/2]). +-export([get_vnode/1,get_vnode/2,validate/1,send_reply/3,send_reply/4,send/3,send/4]). +-export([subscribe/3,subscribe/2,unsubscribe/3,unsubscribe/2,subscribe_cli/2,unsubscribe_cli/2]). +-export([load/1,unload/0]). +-export([on_client_connected/3, on_client_disconnected/3, on_client_subscribe/4, + on_client_unsubscribe/4, on_session_created/3, on_session_subscribed/4, + on_session_unsubscribed/4, on_session_terminated/4, on_message_publish/2, + on_message_delivered/4, on_message_acked/4 ]). + +%% N2O-MQTT Topic Format + +%% Client: 1. actions/:vsn/:module/:client +%% Server: 2. events/:vsn/:node/:module/:user/:client/:token + +%% server and client sends + +send_reply(ClientId, Topic, Message) -> send_reply(ClientId, 0, Topic, Message). +send_reply(ClientId, QoS, Topic, Message) -> + emqttd:publish(emqttd_message:make(ClientId, QoS, Topic, Message)). + +send(C,T,M) -> send(C, T, M, [{qos,2}]). +send(C,T,M,Opts) -> emqttc:publish(C, T, M, Opts). % N2O VNODE SERVER for MQTT @@ -17,8 +39,6 @@ debug(Name,Topic,BERT,Address,Return) -> ok; _ -> skip end. -send(C,T,M) -> send(C, T, M, [{qos,2}]). -send(C,T,M,Opts) -> emqttc:publish(C, T, M, Opts). sid() -> application:get_env(n2o,token_as_sid,false). fix('') -> index; @@ -78,3 +98,98 @@ proc({mqttc, C, connected}, State=#handler{name=Name,state=C,seq=S}) -> proc(Unknown,#handler{seq=S}=Async) -> {reply,{uknown,Unknown,S},Async#handler{seq=S+1}}. + +% MQTT HELPERS + +subscribe(X,Y) -> subscribe(X,Y,[{qos,2}]). +subscribe(X,Y,[{qos,Z}]) -> subscribe_cli(X,[{Y,Z}]). + +unsubscribe(X,Y) -> unsubscribe(X,Y,[{qos,2}]). +unsubscribe(X,Y,[{qos,Z}]) -> unsubscribe_cli(X,[{Y,Z}]). + +subscribe_cli(ClientId, TopicTable) -> + [ begin + kvs:put({mqtt_subscription, ClientId, Topic}), + kvs:put({mqtt_subproperty, {Topic, ClientId}, [{qos,Qos}]}), + emqttd_pubsub:add_subscriber(Topic,ClientId,[{qos,Qos}]) + end || {Topic,Qos} <- TopicTable ]. + +unsubscribe_cli(ClientId, TopicTable)-> + DelFun = fun() -> [ begin + mnesia:delete_object({mqtt_subscription, ClientId, Topic}), + kvs:delete(mqtt_subproperty, {Topic, ClientId}), + mnesia:delete_object({mqtt_subscriber, Topic, ClientId}) + end || {Topic,_Qos} <- TopicTable ] end, + case mnesia:is_transaction() of + true -> DelFun(); + false -> mnesia:transaction(DelFun) + end, + [(not ets:member(mqtt_subscriber, Topic)) andalso + emqttd_router:del_route(Topic) || {Topic,_Qos} <- TopicTable ]. + +% MQTT HOOKS + +load(Env) -> + emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), + emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]), + emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]), + emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]), + emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]), + emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]), + emqttd:hook('session.unsubscribed',fun ?MODULE:on_session_unsubscribed/4, [Env]), + emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]), + emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]), + emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]), + emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]). + +unload() -> + emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), + emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3), + emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4), + emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4), + emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4), + emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4), + emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2), + emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4), + emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4). + +on_client_connected(_ConnAck, Client=#mqtt_client{client_id= <<"emqttc",_/bytes>>}, _) -> {ok, Client}; +on_client_connected(_ConnAck, Client = #mqtt_client{}, _Env) -> {ok, Client}. +on_client_disconnected(_Reason, _Client = #mqtt_client{}, _Env) -> ok. +on_client_subscribe(_ClientId, _Username, TopicTable, _Env) -> {ok, TopicTable}. +on_client_unsubscribe(_ClientId, _Username, TopicTable, _Env) -> {ok, TopicTable}. +on_session_created(_ClientId, _Username, _Env) -> ok. +on_session_subscribed(<<"emqttd",_/binary>>,_,{<<"actions/",_, "/",_/binary>> =Topic,Opts},_) -> {ok,{Topic,Opts}}; +on_session_subscribed(_ClientId, _Username, {Topic, Opts}, _Env) -> {ok, {Topic,Opts}}. +on_session_unsubscribed(_ClientId, _Username, {_Topic, _Opts}, _Env) -> ok. +on_session_terminated(_ClientId, _Username, _Reason, _Env) -> ok. +on_message_delivered(_ClientId, _Username, Message, _Env) -> {ok,Message}. +on_message_acked(_ClientId, _Username, Message, _Env) -> {ok,Message}. +on_message_publish(Message = #mqtt_message{topic = <<"actions/", _/binary>>, from=_From}, _Env) -> {ok, Message}; +on_message_publish(#mqtt_message{topic = <<"events/", _TopicTail/binary>> = Topic, qos=Qos, + from={ClientId,_},payload = Payload}=Message, _Env) -> + {Module, ValidateFun} = application:get_env(?MODULE, validate, {?MODULE, validate}), + Res = case emqttd_topic:words(Topic) of + [E,V,'',M,U,_C,T] -> {Mod,F} = application:get_env(?MODULE, vnode, {?MODULE, get_vnode}), + NewTopic = emqttd_topic:join([E,V,Mod:F(ClientId,Payload),M,U,ClientId,T]), + emqttd:publish(emqttd_message:make(ClientId, Qos, NewTopic, Payload)), skip; + %% @NOTE redirect to vnode + [_E,_V,_N,_M,_U,ClientId,_T] -> + case Module:ValidateFun(Payload) of ok -> {ok, Message}; _ -> skip end; + [E,V,N,M,U,_C,T] -> NewTopic = emqttd_topic:join([E,V,N,M,U,ClientId,T]), + emqttd:publish(emqttd_message:make(ClientId, Qos, NewTopic, Payload)), skip; + %% @NOTE redirects to event topic with correct ClientId + _ -> case Module:ValidateFun(Payload) of ok -> {ok, Message}; _ -> skip end + end, + case Res of + {ok, _} -> case Module:ValidateFun(Payload) of ok -> Res; _ -> skip end; + _ -> Res + end; +on_message_publish(Message, _) -> {ok,Message}. + +get_vnode(ClientId) -> get_vnode(ClientId, []). +get_vnode(ClientId, _) -> + [H|_] = binary_to_list(erlang:md5(ClientId)), + integer_to_binary(H rem (length(n2o:ring())) + 1). + +validate(_Payload) -> ok. diff --git a/src/n2o.erl b/src/n2o.erl index 225c0de3..4d9fe349 100644 --- a/src/n2o.erl +++ b/src/n2o.erl @@ -3,41 +3,38 @@ -behaviour(supervisor). -behaviour(application). -include("n2o.hrl"). --include("emqttd.hrl"). --compile(export_all). --export([start/2, stop/1, init/1, proc/2]). +-export([start/2, stop/1, init/1, proc/2, version/0, ring/0, to_binary/1, bench/0]). -%% N2O-MQTT Topic Format -%% -%% Client: 1. actions/:vsn/:module/:client -%% Server: 2. events/:vsn/:node/:module/:user/:client/:token - -load(Env) -> - emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), - emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]), - emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]), - emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]), - emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]), - emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]), - emqttd:hook('session.unsubscribed',fun ?MODULE:on_session_unsubscribed/4, [Env]), - emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]), - emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]), - emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]), - emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]). - -stop(_) -> unload(), ok. -start(_,_) -> catch load([]), X = supervisor:start_link({local,n2o},n2o, []), +% SERVICES + +-export([send/2,reg/1,unreg/1,reg/2]). % mq +-export([pickle/1,depickle/1]). % pickle +-export([encode/1,decode/1]). % format +-export([info/3,warning/3,error/3]). % log +-export([session/1,session/2,user/1,user/0]). % session + +% BUILT-IN BACKENDS + +-export([cache/2,cache/3,cache/4,invalidate_cache/1]). % cache +-export([erroring/0,stack/2,erroring/2,stack_trace/2,error_page/2]). % error + +% START VNODE HASH RING + +stop(_) -> catch n2o_vnode:unload(), ok. +start(_,_) -> catch n2o_vnode:load([]), X = supervisor:start_link({local,n2o},n2o, []), n2o_async:start(#handler{module=?MODULE,class=caching,group=n2o,state=[],name="timer"}), [ n2o_async:start(#handler{module=n2o_vnode,class=ring,group=n2o,state=[],name=Pos}) || {{_,_},Pos} <- lists:zip(ring(),lists:seq(1,length(ring()))) ], X. -ring() -> n2o_ring:ring_list(). -ring_max() -> length(ring()). -rand_vnode() -> rand:uniform(ring_max()). -init([]) -> storage_init(), - n2o_ring:init([{node(),1,4}]), - { ok, { { one_for_one, 1000, 10 }, [] } }. +ring() -> array:to_list(n2o_ring:ring()). +rand_vnode() -> rand:uniform(length(ring())). +opt() -> [ set, named_table, { keypos, 1 }, public ]. +tables() -> application:get_env(n2o,tables,[ cookies, file, caching, ring, async ]). +storage_init() -> [ ets:new(X,opt()) || X <- tables() ]. +init([]) -> storage_init(), + n2o_ring:init([{node(),1,4}]), + { ok, { { one_for_one, 1000, 10 }, [] } }. % MQTT vs OTP benchmarks @@ -45,7 +42,7 @@ bench() -> [bench_mqtt(),bench_otp()]. run() -> 10000. bench_mqtt() -> N = run(), {T,_} = timer:tc(fun() -> [ begin Y = lists:concat([X rem 16]), - n2o:send_reply(<<"clientId">>,n2o:to_binary(["events/1/",Y]),term_to_binary(X)) + n2o_vnode:send_reply(<<"clientId">>,n2o:to_binary(["events/1/",Y]),term_to_binary(X)) end || X <- lists:seq(1,N) ], ok end), {mqtt,trunc(N*1000000/T),"msgs/s"}. @@ -55,56 +52,7 @@ bench_otp() -> N = run(), {T,_} = timer:tc(fun() -> term_to_binary(X)}) || X <- lists:seq(1,N) ], ok end), {otp,trunc(N*1000000/T),"msgs/s"}. -on_client_connected(_ConnAck, Client=#mqtt_client{client_id= <<"emqttc",_/bytes>>}, _) -> {ok, Client}; -on_client_connected(_ConnAck, Client = #mqtt_client{}, _Env) -> {ok, Client}. -on_client_disconnected(_Reason, _Client = #mqtt_client{}, _Env) -> ok. -on_client_subscribe(_ClientId, _Username, TopicTable, _Env) -> {ok, TopicTable}. -on_client_unsubscribe(_ClientId, _Username, TopicTable, _Env) -> {ok, TopicTable}. -on_session_created(_ClientId, _Username, _Env) -> ok. -on_session_subscribed(<<"emqttd",_/binary>>,_,{<<"actions/",_, "/",_/binary>> =Topic,Opts},_) -> {ok,{Topic,Opts}}; -on_session_subscribed(_ClientId, _Username, {Topic, Opts}, _Env) -> {ok, {Topic,Opts}}. -on_session_unsubscribed(_ClientId, _Username, {_Topic, _Opts}, _Env) -> ok. -on_session_terminated(_ClientId, _Username, _Reason, _Env) -> ok. -on_message_delivered(_ClientId, _Username, Message, _Env) -> {ok,Message}. -on_message_acked(_ClientId, _Username, Message, _Env) -> {ok,Message}. -on_message_publish(Message = #mqtt_message{topic = <<"actions/", _/binary>>, from=_From}, _Env) -> {ok, Message}; -on_message_publish(#mqtt_message{topic = <<"events/", _TopicTail/binary>> = Topic, qos=Qos, - from={ClientId,_},payload = Payload}=Message, _Env) -> - {Module, ValidateFun} = application:get_env(?MODULE, validate, {?MODULE, validate}), - Res = case emqttd_topic:words(Topic) of - [E,V,'',M,U,_C,T] -> {Mod,F} = application:get_env(?MODULE, vnode, {?MODULE, get_vnode}), - NewTopic = emqttd_topic:join([E,V,Mod:F(ClientId,Payload),M,U,ClientId,T]), - emqttd:publish(emqttd_message:make(ClientId, Qos, NewTopic, Payload)), skip; - %% @NOTE redirect to vnode - [_E,_V,_N,_M,_U,ClientId,_T] -> - case Module:ValidateFun(Payload) of ok -> {ok, Message}; _ -> skip end; - [E,V,N,M,U,_C,T] -> NewTopic = emqttd_topic:join([E,V,N,M,U,ClientId,T]), - emqttd:publish(emqttd_message:make(ClientId, Qos, NewTopic, Payload)), skip; - %% @NOTE redirects to event topic with correct ClientId - _ -> case Module:ValidateFun(Payload) of ok -> {ok, Message}; _ -> skip end - end, - case Res of - {ok, _} -> case Module:ValidateFun(Payload) of ok -> Res; _ -> skip end; - _ -> Res - end; -on_message_publish(Message, _) -> {ok,Message}. - -unload() -> - emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3), - emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3), - emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4), - emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4), - emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4), - emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4), - emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2), - emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4), - emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4). - -send_reply(ClientId, Topic, Message) -> send_reply(ClientId, 0, Topic, Message). -send_reply(ClientId, QoS, Topic, Message) -> - emqttd:publish(emqttd_message:make(ClientId, QoS, Topic, Message)). - -% Pickling n2o:send/1 +% MQ mq() -> application:get_env(n2o,mq,n2o_gproc). send(X,Y) -> (mq()):send(X,Y). @@ -112,50 +60,38 @@ reg(X) -> (mq()):reg(X). unreg(X) -> (mq()):unreg(X). reg(X,Y) -> (mq()):reg(X,Y). -% Pickling n2o:pickle/1 +% PICKLE pickler() -> application:get_env(n2o,pickler,n2o_secret). pickle(Data) -> (pickler()):pickle(Data). depickle(SerializedData) -> (pickler()):depickle(SerializedData). -% Error handler n2o:error/2 n2o:stack/2 +% ERROR erroring() -> application:get_env(n2o,erroring,n2o). stack(Error, Reason) -> (erroring()):stack_trace(Error, Reason). erroring(Class, Error) -> (erroring()):error_page(Class, Error). -% Formatter +% SESSION + +session() -> application:get_env(n2o,session,n2o_session). +session(Key) -> #cx{session=SID}=get(context), (session()):get_value(SID, Key, []). +session(Key, Value) -> #cx{session=SID}=get(context), (session()):set_value(SID, Key, Value). +user() -> case session(user) of undefined -> []; E -> lists:concat([E]) end. +user(User) -> session(user,User). + +% FORMAT formatter() -> application:get_env(n2o,formatter,n2o_bert). encode(Term) -> (formatter()):encode(Term). decode(Term) -> (formatter()):decode(Term). -% Cache facilities n2o:cache/[1,2,3] +% CACHE -proc(init,#handler{}=Async) -> - n2o:info(?MODULE,"Proc Init: ~p\r~n",[init]), - Timer = timer_restart(ping()), - {ok,Async#handler{state=Timer}}; - -proc({timer,ping},#handler{state=Timer}=Async) -> - erlang:cancel_timer(Timer), - n2o:info(?MODULE,"n2o Timer: ~p\r~n",[ping]), - n2o:invalidate_cache(caching), - n2o_session:invalidate_sessions(), - {reply,ok,Async#handler{state=timer_restart(ping())}}. - -invalidate_cache(Table) -> ets:foldl(fun(X,_) -> n2o:cache(Table,element(1,X)) end, 0, Table). -timer_restart(Diff) -> {X,Y,Z} = Diff, erlang:send_after(1000*(Z+60*Y+60*60*X),self(),{timer,ping}). -ping() -> application:get_env(n2o,timer,{0,1,0}). -ttl() -> application:get_env(n2o,ttl,60*1). -till(Now,TTL) -> calendar:gregorian_seconds_to_datetime(calendar:datetime_to_gregorian_seconds(Now) + TTL), infinity. - -opt() -> [ set, named_table, { keypos, 1 }, public ]. -tables() -> application:get_env(n2o,tables,[ cookies, file, caching, ring, async ]). -storage_init() -> [ ets:new(X,opt()) || X <- tables() ]. -cache(Tab, Key, undefined) -> ets:delete(Tab,Key); -cache(Tab, Key, Value) -> ets:insert(Tab,{Key,till(calendar:local_time(), ttl()),Value}), Value. cache(Tab, Key, Value, Till) -> ets:insert(Tab,{Key,Till,Value}), Value. +cache(Tab, Key, undefined) -> ets:delete(Tab,Key); +cache(Tab, Key, Value) -> ets:insert(Tab,{Key,n2o_session:till(calendar:local_time(), + n2o_session:ttl()),Value}), Value. cache(Tab, Key) -> Res = ets:lookup(Tab,Key), Val = case Res of [] -> []; [Value] -> Value; Values -> Values end, @@ -165,40 +101,7 @@ cache(Tab, Key) -> true -> ets:delete(Tab,Key), []; false -> X end end. -% Context Variables and URL Query Strings from ?REQ and ?CTX n2o:q/1 n2o:qc/[1,2] - -q(Key) -> Val = get(Key), case Val of undefined -> qc(Key); A -> A end. -qc(Key) -> CX = get(context), qc(Key,CX). -qc(Key,Ctx) -> proplists:get_value(n2o:to_binary([Key]),Ctx#cx.params). - -atom(List) when is_list(List) -> list_to_atom(string:join([ lists:concat([L]) || L <- List],"_")); -atom(Scalar) -> list_to_atom(lists:concat([Scalar])). - -% These api are not really API - -temp_id() -> "auto" ++ integer_to_list(erlang:unique_integer() rem 1000000). -append(List, Key, Value) -> case Value of undefined -> List; _A -> [{Key, Value}|List] end. -render(X) -> wf_render:render(X). - -version() -> proplists:get_value(vsn,element(2,application:get_all_key(n2o))). - -keyset(Name,Pos,List,New) -> - case lists:keyfind(Name,Pos,List) of - false -> [New|List]; - _Element -> lists:keyreplace(Name,Pos,List,New) end. - -actions() -> get(actions). -actions(Ac) -> put(actions,Ac). - -clear_actions() -> put(actions,[]). -add_action(Action) -> - Actions = case get(actions) of undefined -> []; E -> E end, - put(actions,Actions++[Action]). - -fold(Fun,Handlers,Ctx) -> - lists:foldl(fun({_,Module},Ctx1) -> - {ok,_,NewCtx} = Module:Fun([],Ctx1), - NewCtx end,Ctx,Handlers). +% ERROR stack_trace(Error, Reason) -> Stacktrace = [case A of @@ -215,46 +118,26 @@ error_page(Class,Error) -> [ Module,Function,Arity,proplists:get_value(line, Location) ]) || { Module,Function,Arity,Location} <- erlang:get_stacktrace() ]. -session() -> application:get_env(n2o,session,n2o_session). -session(Key) -> #cx{session=SID}=get(context), (session()):get_value(SID, Key, []). -session(Key, Value) -> #cx{session=SID}=get(context), (session()):set_value(SID, Key, Value). -user() -> case session(user) of undefined -> []; E -> lists:concat([E]) end. -user(User) -> session(user,User). -subscribe(X,Y) -> subscribe(X,Y,[{qos,2}]). -subscribe(X,Y,[{qos,Z}]) -> subscribe_cli(X,[{Y,Z}]). - -unsubscribe(X,Y) -> unsubscribe(X,Y,[{qos,2}]). -unsubscribe(X,Y,[{qos,Z}]) -> unsubscribe_cli(X,[{Y,Z}]). - -subscribe_cli(ClientId, TopicTable) -> - [ begin - kvs:put({mqtt_subscription, ClientId, Topic}), - kvs:put({mqtt_subproperty, {Topic, ClientId}, [{qos,Qos}]}), - emqttd_pubsub:add_subscriber(Topic,ClientId,[{qos,Qos}]) - end || {Topic,Qos} <- TopicTable ]. - -unsubscribe_cli(ClientId, TopicTable)-> - DelFun = fun() -> [ begin - mnesia:delete_object({mqtt_subscription, ClientId, Topic}), - kvs:delete(mqtt_subproperty, {Topic, ClientId}), - mnesia:delete_object({mqtt_subscriber, Topic, ClientId}) - end || {Topic,_Qos} <- TopicTable ] end, - case mnesia:is_transaction() of - true -> DelFun(); - false -> mnesia:transaction(DelFun) - end, - [(not ets:member(mqtt_subscriber, Topic)) andalso - emqttd_router:del_route(Topic) || {Topic,_Qos} <- TopicTable ]. - -get_vnode(ClientId) -> get_vnode(ClientId, []). -get_vnode(ClientId, _) -> - [H|_] = binary_to_list(erlang:md5(ClientId)), - integer_to_binary(H rem ring_max() + 1). - -validate(_Payload) -> ok. - -% Tiny Logging Framework +% TIMER + +proc(init,#handler{}=Async) -> + n2o:info(?MODULE,"Proc Init: ~p\r~n",[init]), + Timer = timer_restart(ping()), + {ok,Async#handler{state=Timer}}; + +proc({timer,ping},#handler{state=Timer}=Async) -> + erlang:cancel_timer(Timer), + n2o:info(?MODULE,"n2o Timer: ~p\r~n",[ping]), + n2o:invalidate_cache(caching), + (n2o_session:storage()):invalidate_sessions(), + {reply,ok,Async#handler{state=timer_restart(ping())}}. + +invalidate_cache(Table) -> ets:foldl(fun(X,_) -> n2o:cache(Table,element(1,X)) end, 0, Table). +timer_restart(Diff) -> {X,Y,Z} = Diff, erlang:send_after(1000*(Z+60*Y+60*60*X),self(),{timer,ping}). +ping() -> application:get_env(n2o,timer,{0,1,0}). + +% LOG logger() -> application:get_env(?MODULE,logger,n2o_io). log_modules() -> application:get_env(?MODULE,log_modules,[]). @@ -288,3 +171,9 @@ to_binary(I) when is_integer(I) -> to_binary(integer_to_list(I)); to_binary(F) when is_float(F) -> float_to_binary(F,[{decimals,9},compact]); to_binary(L) when is_list(L) -> iolist_to_binary(L). +% + +version() -> proplists:get_value(vsn, + element(2,application:get_all_key(n2o))). + +% END diff --git a/src/n2o_async.erl b/src/n2o_async.erl index 85ac87ae..ad6b8e8d 100644 --- a/src/n2o_async.erl +++ b/src/n2o_async.erl @@ -1,10 +1,10 @@ -module(n2o_async). --description('N2O Async Processes'). +-description('N2O Process'). -include("n2o.hrl"). -behaviour(gen_server). -export([start_link/1]). -export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,code_change/3]). --compile(export_all). +-export([start/1,stop/2,send/2,send/3,pid/2,restart/2]). start(#handler{class=Class,name=Name,module=Module,group=Group} = Async) -> ChildSpec = {{Class,Name},{?MODULE,start_link,[Async]}, diff --git a/src/n2o_proto.erl b/src/n2o_proto.erl index 6edbae08..9479a2ad 100644 --- a/src/n2o_proto.erl +++ b/src/n2o_proto.erl @@ -1,10 +1,8 @@ -module(n2o_proto). -description('N2O Proto Loop'). -include("n2o.hrl"). --compile(export_all). --export([info/3, stream/3, push/5]). +-export([info/3, stream/3, push/5, init/4, terminate/2]). -formatter(O) -> case lists:keyfind(formatter,1,O) of {formatter,F} -> F; X -> X end. protocols() -> application:get_env(n2o,protocols,[ n2o_nitro ]). info(M,R,S) -> filter(M,R,S,protocols(),[]). filter(M,R,S,P,A) -> {Mod,Fun} = (application:get_env(n2o,filter,{?MODULE,push})), diff --git a/src/protocols/n2o_ftp.erl b/src/protocols/n2o_ftp.erl index 4028b1af..2683f338 100644 --- a/src/protocols/n2o_ftp.erl +++ b/src/protocols/n2o_ftp.erl @@ -2,10 +2,10 @@ -description('N2O File Protocol'). -include("n2o.hrl"). -include_lib("kernel/include/file.hrl"). --compile(export_all). +-export([info/3,proc/2,filename/1]). -define(ROOT, filename:join(mad_utils:cwd(),application:get_env(n2o,upload,code:priv_dir(n2o)))). --define(NEXT, 5*1024). % 256K chunks for best 25MB/s speed +-define(NEXT, 2*1024). % 256K chunks for best 25MB/s speed -define(STOP, 0). root() -> ?ROOT. @@ -17,26 +17,25 @@ filename(#ftp{sid=_Sid,filename=FileName}) -> FileName. %filename:join(lists:con % File Transfer Protocol info(#ftp{status = {event, _}}=FTP, Req, State) -> - n2o:info(?MODULE,"Event Message: ~p~n", [ FTP#ftp{data = <<>>} ]), Module = case State#cx.module of [] -> index; M -> M end, Reply = try Module:event(FTP) catch E:R -> - Error = n2o:stack_trace(E,R), + Error = n2o:stack(E,R), n2o:error(?MODULE,"Catch: ~p~n",[Error]), Error end, - {reply, {bert, {io,n2o_nitro:render_actions(n2o:actions()), Reply}}, Req, State}; + {reply, {bert, {io,n2o_nitro:render_actions(nitro:actions()), Reply}}, Req, State}; info(#ftp{id = Link, status = <<"init">>, block = Block, offset = Offset}=FTP, Req, State) -> Root=?ROOT, RelPath=(application:get_env(n2o,filename,n2o_ftp)):filename(FTP), - n2o:info(?MODULE,"RelPath: ~p~n",[ RelPath ]), FilePath = filename:join(Root, RelPath), ok = filelib:ensure_dir(FilePath), FileSize = case file:read_file_info(FilePath) of {ok, Fi} -> Fi#file_info.size; {error, _} -> 0 end, - n2o:info(?MODULE,"Info Init: ~p Offset: ~p Block: ~p~n",[ FilePath, FileSize, Block ]), + + n2o:info(?MODULE,"FTP INFO INIT: ~p~n",[ FTP#ftp{data = <<>>, sid = <<>>} ]), Block2 = case Block of 0 -> ?STOP; _ -> ?NEXT end, Offset2 = case FileSize >= Offset of true -> FileSize; false -> 0 end, @@ -48,48 +47,47 @@ info(#ftp{id = Link, status = <<"init">>, block = Block, offset = Offset}=FTP, R {reply, {bert, FTP2}, Req, State}; info(#ftp{id = Link, status = <<"send">>}=FTP, Req, State) -> - n2o:info(?MODULE,"Info Send: ~p~n",[ FTP#ftp{data = <<>>} ]), + n2o:info(?MODULE,"FTP INFO SEND: ~p~n",[ FTP#ftp{data = <<>>, sid = <<>>} ]), Reply = try - gen_server:call(n2o_async:pid(file, Link), FTP) + n2o_async:send(file, Link, FTP) catch E:R -> - n2o:error(?MODULE,"Info Error call the sync: ~p~n",[ {E,R} ]), - FTP#ftp{data = <<>>,block = ?STOP} + n2o:error(?MODULE,"FTP ERROR: ~p~n",[ {E,R} ]), + FTP#ftp{data = <<>>,sid = <<>>, block = ?STOP} end, - n2o:info(?MODULE,"Send reply ~p~n",[ Reply#ftp{ data = <<>> }]), {reply, {bert, Reply}, Req, State}; info(Message, Req, State) -> {unknown, Message, Req, State}. % n2o Handlers -proc(init, #handler{state = #ftp{sid = Sid, meta = ClientId} = FTP} = Async) -> - n2o:info(?MODULE,"Proc Init: ~p~n Sid: ~p ClientId: ~p~n",[ FTP#ftp{data = <<>>}, Sid, ClientId ]), - FTP2 = FTP#ftp{data = <<>>, status = {event, init}}, - n2o_ring:send({publish, <<"events/1//index/anon/",ClientId/binary,"/",Sid/binary>>, term_to_binary(FTP2)}), +proc(init, #handler{name = Link, state = #ftp{sid = Sid, meta = ClientId} = FTP} = Async) -> {ok, Async}; -proc(#ftp{id = Link, sid = Sid, data = Data, status = <<"send">>, block = Block, meta = ClientId} = FTP, - #handler{state = #ftp{size = TotalSize, offset = Offset, filename = RelPath}} = Async) when Offset + Block >= TotalSize -> - n2o:info(?MODULE,"Proc Stop ~p, last piece size: ~p: ClientId: ~p~n", [ FTP#ftp{data = <<>>}, byte_size(Data), ClientId ]), +proc(#ftp{sid = Token, data = Data, status = <<"send">>, block = Block, meta = ClientId} = FTP, + #handler{name = Link, state = #ftp{size = TotalSize, offset = Offset, filename = RelPath}} = Async) + when Offset + Block >= TotalSize -> + n2o:info(?MODULE,"FTP PROC FINALE: ~p~n", [ Link ]), case file:write_file(filename:join(?ROOT,RelPath), <>, [append,raw]) of {error, Reason} -> n2o:error(?MODULE,"WRITE ERROR: ~p~n", [ filename:join(?ROOT, RelPath) ]), {reply, {error, Reason}, Async}; ok -> - n2o:info(?MODULE,"WRITE FINAL: ~p~n", [ filename:join(?ROOT, RelPath) ]), - FTP2 = FTP#ftp{data = <<>>, offset = TotalSize, block = ?STOP}, + FTP2 = FTP#ftp{data = <<>>, sid = <<>>,offset = TotalSize, block = ?STOP}, FTP3 = FTP2#ftp{status = {event, stop}, filename = RelPath}, - n2o_ring:send({publish, <<"events/1//index/anon/",ClientId/binary,"/",Sid/binary>>, term_to_binary(FTP3)}), + spawn(fun() -> catch n2o_ring:send({publish, <<"events/1//index/anon/",ClientId/binary,"/",Token/binary>>, + term_to_binary(FTP3)}), + Sid = case n2o:depickle(Token) of {{S,_},_} -> S; X -> X end, + catch n2o:send(Sid,FTP3) end), spawn(fun() -> n2o_async:stop(file, Link) end), {stop, normal, FTP2, Async#handler{state = FTP2}} end; -proc(#ftp{data = Data, block = Block} = FTP, #handler{state = #ftp{offset = Offset, filename = RelPath}}=Async) -> - n2o:info(?MODULE,"WRITE: >>~p ~p ~p~n", [ Offset, Block, filename:join(?ROOT,RelPath) ]), +proc(#ftp{data = Data, block = Block} = FTP, + #handler{state = #ftp{offset = Offset, filename = RelPath}}=Async) -> FTP2 = FTP#ftp{status = <<"send">>, offset = Offset + Block }, - n2o:info(?MODULE,"Proc Process ~p~n",[ FTP2#ftp{data = <<>>} ]), case file:write_file(filename:join(?ROOT, RelPath), <>, [append,raw]) of {error, Reason} -> {reply, {error, Reason}, Async}; - ok -> {reply, FTP2#ftp{data = <<>>}, Async#handler{state = FTP2#ftp{filename = RelPath}}} end; + ok -> {reply, FTP2#ftp{data = <<>>}, + Async#handler{state = FTP2#ftp{data = <<>>, filename = RelPath}}} end; proc(_,Async) -> {reply, #ftpack{}, Async}. diff --git a/src/protocols/n2o_nitro.erl b/src/protocols/n2o_nitro.erl index 5f475830..9420d780 100644 --- a/src/protocols/n2o_nitro.erl +++ b/src/protocols/n2o_nitro.erl @@ -1,7 +1,7 @@ -module(n2o_nitro). --description('N2O Nitro Protocol'). +-description('N2O Nitrogen Web Framework Protocol'). -include("n2o.hrl"). --compile(export_all). +-export([info/3,render_actions/1]). % Nitrogen pickle handler @@ -18,12 +18,12 @@ info(#init{token=Auth}, Req, State = #cx{module = Module}) -> nitro:render(Elements), {ok,[]} catch Err:Rea -> - StackMain = n2o:stack_trace(Err,Rea), + StackMain = n2o:stack(Err,Rea), n2o:error(?MODULE,"Catch:~p~n",[StackMain]), {error,StackMain} end of {ok, _} -> Actions = try Module:event(init), render_actions(nitro:actions()) - catch Err1:Rea1 -> StackInit = n2o:stack_trace(Err1,Rea1), + catch Err1:Rea1 -> StackInit = n2o:stack(Err1,Rea1), n2o:error(?MODULE,"Catch:~p~n",[StackInit]), {stack,StackInit} end, {reply, {bert,{io,Actions,{'Token',Token}}},Req,New}; @@ -34,7 +34,7 @@ info(#client{data=Message}, Req, State) -> n2o:info(?MODULE,"Client Message: ~p",[Message]), Module = State#cx.module, Reply = try Module:event(#client{data=Message}) - catch Err:Rea -> Stack = n2o:stack_trace(Err,Rea), + catch Err:Rea -> Stack = n2o:stack(Err,Rea), n2o:error(?MODULE,"Catch:~p~n",[Stack]), {error,Stack} end, {reply,{bert,{io,render_actions(nitro:actions()),Reply}},Req,State}; @@ -42,7 +42,7 @@ info(#client{data=Message}, Req, State) -> info(#pickle{}=Event, Req, State) -> nitro:actions([]), Result = try html_events(Event,State) - catch E:R -> Stack = n2o:stack_trace(E,R), + catch E:R -> Stack = n2o:stack(E,R), n2o:error(?MODULE,"Catch: ~p:~p~n~p", Stack), {io,render_actions(nitro:actions()),Stack} end, {reply,{bert,Result}, Req,State}; @@ -55,7 +55,7 @@ info(#direct{data=Message}, Req, State) -> nitro:actions([]), Module = State#cx.module, Result = try Res = Module:event(Message), {direct,Res} - catch E:R -> Stack = n2o:stack_trace(E, R), + catch E:R -> Stack = n2o:stack(E, R), n2o:error(?MODULE,"Catch: ~p:~p~n~p", Stack), {stack,Stack} end, {reply,{bert,{io,render_actions(nitro:actions()),Result}}, Req,State}; diff --git a/src/services/n2o_secret.erl b/src/services/n2o_secret.erl index 8a646606..ebc59729 100644 --- a/src/services/n2o_secret.erl +++ b/src/services/n2o_secret.erl @@ -1,7 +1,7 @@ -module(n2o_secret). -description('N2O HMAC AES/CBC-128'). -include("n2o.hrl"). --export([pickle/1,depickle/1,secret/0,hex/1,unhex/1]). +-export([pickle/1,depickle/1,hex/1,unhex/1,sid/1]). pickle(Data) -> Message = term_to_binary(Data), @@ -24,3 +24,5 @@ hex(Bin) -> << << (digit(A1)),(digit(A2)) >> || <> <= Bin >>. unhex(Hex) -> << << (erlang:list_to_integer([H1,H2], 16)) >> || <> <= Hex >>. digit(X) when X >= 0 andalso X =< 9 -> X + 48; digit(X) when X >= 10 andalso X =< 15 -> X + 87. +sid(Seed) -> hex(binary:part(crypto:hmac(application:get_env(n2o,hmac,sha256), + secret(),term_to_binary(Seed)),0,10)). diff --git a/src/services/n2o_session.erl b/src/services/n2o_session.erl index dbafd7db..bcf232ef 100644 --- a/src/services/n2o_session.erl +++ b/src/services/n2o_session.erl @@ -1,7 +1,8 @@ -module(n2o_session). -include_lib("stdlib/include/ms_transform.hrl"). -description('N2O Session'). --compile(export_all). +-export([authenticate/2, get_value/3, set_value/3, storage/0, prolongate/0, from/1, ttl/0, till/2]). +-export([clear/1, delete/1, update/1, lookup/1, invalidate_sessions/0]). % PRELUDE @@ -12,12 +13,13 @@ cut(Bin) -> binary:part(Bin,0,20). expired(Till) -> Till < to(calendar:local_time()). expire() -> to(till(calendar:local_time(), ttl())). auth(Sid,Exp) -> {{Sid,'auth'},{Exp,[]}}. -token(Auth) -> ets:insert(cookies,Auth), {'Token',n2o:pickle(Auth)}. +storage() -> application:get_env(n2o,session_storage,n2o_session). +token(A) -> (storage()):update(A), {'Token',n2o:pickle(A)}. +token(A,P) -> (storage()):update(A), {'Token',P}. ttl() -> application:get_env(n2o,ttl,60*15). till(Now,TTL) -> from(to(Now)+TTL). prolongate() -> application:get_env(n2o,nitro_prolongate,false). -sid(Seed) -> nitro_conv:hex(binary:part(crypto:hmac(application:get_env(n2o,hmac,sha256), - n2o_secret:secret(),term_to_binary(Seed)),0,10)). +sid(Seed) -> n2o_secret:sid(Seed). % API @@ -25,31 +27,36 @@ authenticate([], Pickle) -> case n2o:depickle(Pickle) of <<>> -> token(auth(sid(os:timestamp()),expire())); {{Sid,'auth'},{Till,[]}} = Auth -> - case expired(Till) orelse prolongate() of - false -> ets:insert(cookies,Auth), - {'Token', Pickle}; - true -> delete_auth({Sid,auth}), - token(auth(Sid,expire())) end end. + case {expired(Till), prolongate()} of + {false,false} -> token(Auth,Pickle); + {false,true} -> token(auth(Sid,expire())); + {true,_} -> (storage()):delete({Sid,auth}), + token(auth(sid(os:timestamp()),expire())) + end + end. get_value(Session, Key, Default) -> - case lookup_ets({Session,Key}) of + case (storage()):lookup({Session,Key}) of [] -> Default; {{Session,Key},{Till,Value}} -> case expired(Till) of false -> Value; - true -> ets:delete(cookies,{Session,Key}), Default end end. + true -> (storage()):delete({Session,Key}), Default end end. set_value(Session, Key, Value) -> - ets:insert(cookies,{{Session,Key},{expire(),Value}}), Value. + (storage()):update({{Session,Key},{expire(),Value}}), Value. clear(Session) -> [ ets:delete(cookies,X) || X <- ets:select(cookies, ets:fun2ms(fun(A) when (element(1,element(1,A)) == Session) -> element(1,A) end)) ], ok. -lookup_ets(Key) -> - case ets:lookup(cookies,Key) of [Value] -> Value; Values -> Values end. +lookup({Session,Key}) -> + case ets:lookup(cookies,{Session,Key}) of [Value] -> Value; Values -> Values end. -delete_auth(Session) -> - case lookup_ets(Session) of [] -> ok; X -> ets:delete_object(cookies, X) end. +update(Token) -> + ets:insert(cookies,Token). + +delete({Session,Key}) -> + ets:delete(cookies,{Session,Key}). invalidate_sessions() -> ets:foldl(fun(X,A) -> {Sid,Key} = element(1,X), @@ -63,7 +70,7 @@ positive_test() -> {{SID,Key},{Till,[]}} = n2o:depickle(B), {'Token',C}=n2o_session:authenticate("",B), {{SID,Key},{Till,[]}} = n2o:depickle(C), - delete_auth({SID,'auth'}), + delete({SID,'auth'}), true=(C==B). negative_test() -> @@ -79,9 +86,9 @@ negative_test() -> {'Token', TokenC} = n2o_session:authenticate("", TokenB), {{SID2,_},{_,[]}} = n2o:depickle(TokenC), NewTokenIsValid = TokenB == TokenC, - delete_auth({SID0,auth}), - delete_auth({SID1,auth}), - delete_auth({SID2,auth}), + delete({SID0,auth}), + delete({SID1,auth}), + delete({SID2,auth}), TokenWasChanged == NewTokenIsValid. test_set_get_value() -> @@ -90,5 +97,5 @@ test_set_get_value() -> Key = base64:encode(crypto:strong_rand_bytes(8)), set_value(SID, Key, InputValue), ResultValue = get_value(SID, Key, []), - delete_auth({SID,Key}), + delete({SID,Key}), InputValue == ResultValue. diff --git a/src/ws/n2o_cowboy.erl b/src/ws/n2o_cowboy.erl index 3dd6ced2..a9c13225 100644 --- a/src/ws/n2o_cowboy.erl +++ b/src/ws/n2o_cowboy.erl @@ -1,7 +1,6 @@ -module(n2o_cowboy). -include("n2o.hrl"). -description('N2O Cowboy HTTP Backend'). -%-behaviour(cowboy_http_handler). -export([init/3, handle/2, terminate/3]). -compile(export_all). -record(state, {headers, body}). diff --git a/src/ws/n2o_cowboy2.erl b/src/ws/n2o_cowboy2.erl index dc83ff8b..d2b132f0 100644 --- a/src/ws/n2o_cowboy2.erl +++ b/src/ws/n2o_cowboy2.erl @@ -1,4 +1,5 @@ -module(n2o_cowboy2). +-description('N2O Cowboy2 WebSocket Backend'). -compile(export_all). init(Req,_Opts) -> {cowboy_websocket, Req, Req}.