diff --git a/src/ecpool.appup.src b/src/ecpool.appup.src index 5a2a1f25..c871f73d 100644 --- a/src/ecpool.appup.src +++ b/src/ecpool.appup.src @@ -1,6 +1,9 @@ %% -*-: erlang -*- -{"0.5.9", +{"0.5.10", [ + {"0.5.9", [ + {load_module, ecpool_worker, brutal_purge, soft_purge, []} + ]}, {<<"0\\.5\\.[3-8]">>, [ %% NOTE: MUST start ecpool_monitor before any load_module instructions {add_module, ecpool_monitor}, @@ -33,6 +36,9 @@ ]} ], [ + {"0.5.9", [ + {load_module, ecpool_worker, brutal_purge, soft_purge, []} + ]}, {<<"0\\.5\\.[3-8]">>, [ {load_module, ecpool_pool, brutal_purge, soft_purge, []}, {load_module, ecpool_sup, brutal_purge, soft_purge, []}, diff --git a/src/ecpool_worker.erl b/src/ecpool_worker.erl index 24e7c0c8..d05fbbee 100644 --- a/src/ecpool_worker.erl +++ b/src/ecpool_worker.erl @@ -30,6 +30,8 @@ , set_disconnect_callback/2 , add_reconnect_callback/2 , remove_reconnect_callback/2 + , remove_reconnect_callback_by_signature/2 + , get_reconnect_callbacks/1 , add_disconnect_callback/2 ]). @@ -110,6 +112,14 @@ add_reconnect_callback(Pid, OnReconnect) -> remove_reconnect_callback(Pid, OnReconnect) -> gen_server:cast(Pid, {remove_reconn_callbk, OnReconnect}). +-spec(remove_reconnect_callback_by_signature(pid(), term()) -> ok). +remove_reconnect_callback_by_signature(Pid, CallbackSignature) -> + gen_server:cast(Pid, {remove_reconnect_callback_by_signature, CallbackSignature}). + +-spec(get_reconnect_callbacks(pid()) -> [{module(), atom(), list()}]). +get_reconnect_callbacks(Pid) -> + gen_server:call(Pid, get_reconnect_callbacks, infinity). + -spec(add_disconnect_callback(pid(), ecpool:conn_callback()) -> ok). add_disconnect_callback(Pid, OnDisconnect) -> gen_server:cast(Pid, {add_disconn_callbk, OnDisconnect}). @@ -151,6 +161,9 @@ handle_call(client, _From, State = #state{client = Client}) -> handle_call({exec, Action}, _From, State = #state{client = Client}) -> {reply, safe_exec(Action, Client), State}; +handle_call(get_reconnect_callbacks, _From, #state{on_reconnect = OnReconnect} = State) -> + {reply, OnReconnect, State}; + handle_call(Req, _From, State) -> logger:error("[PoolWorker] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -169,9 +182,20 @@ handle_cast({set_reconn_callbk, OnReconnect}, State) -> handle_cast({set_disconn_callbk, OnDisconnect}, State) -> {noreply, State#state{on_disconnect = ensure_callback(OnDisconnect)}}; -handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) -> +handle_cast({add_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect0}) -> + OldOnReconnect = + case reconnect_callback_signature(OnReconnect) of + {ok, Signature} -> + drop_reconnect_callbacks_by_signature(OldOnReconnect0, Signature); + error -> + OldOnReconnect0 + end, {noreply, State#state{on_reconnect = add_conn_callback(OnReconnect, OldOnReconnect)}}; +handle_cast({remove_reconnect_callback_by_signature, CallbackSignature}, State = #state{on_reconnect = OnReconnect0}) -> + OldOnReconnect = drop_reconnect_callbacks_by_signature(OnReconnect0, CallbackSignature), + {noreply, State#state{on_reconnect = OldOnReconnect}}; + handle_cast({remove_reconn_callbk, OnReconnect}, State = #state{on_reconnect = OldOnReconnect}) -> {noreply, State#state{on_reconnect = remove_conn_callback(OnReconnect, OldOnReconnect)}}; @@ -254,8 +278,9 @@ reconnect(Secs, State = #state{client = Client, on_disconnect = Disconnect, supe handle_reconnect(undefined, _) -> ok; handle_reconnect(Client, OnReconnectList) when is_list(OnReconnectList) -> + %% reverse apply the callbacks because the newer ones are cons-ed to the head of the list lists:foreach(fun(OnReconnectCallback) -> safe_exec(OnReconnectCallback, Client) end, - OnReconnectList). + lists:reverse(OnReconnectList)). handle_disconnect(undefined, _) -> ok; @@ -279,6 +304,27 @@ connect_internal(State) -> _C:Reason:ST -> {error, {Reason, ST}} end. +-spec reconnect_callback_signature({module(), atom(), list()}) -> {ok, term()} | error. +reconnect_callback_signature({M, _, A}) -> + try + {ok, M:get_reconnect_callback_signature(A)} + catch + _:_ -> + error + end. + +is_reconnect_calback_signature_match(CB, Sig) -> + case reconnect_callback_signature(CB) of + {ok, Sig2} -> + Sig2 =:= Sig; + _ -> + false + end. + +drop_reconnect_callbacks_by_signature(Callbacks, Signature) -> + Pred = fun(CB) -> not is_reconnect_calback_signature_match(CB, Signature) end, + lists:filter(Pred, Callbacks). + safe_exec({_M, _F, _A} = Action, MainArg) -> try exec(Action, MainArg) catch E:R:ST ->