From 85cc6e359268b166a814622131ae526611aff375 Mon Sep 17 00:00:00 2001 From: Vlad Alexandru Ionescu Date: Tue, 14 Sep 2010 17:49:18 +0100 Subject: [PATCH 1/5] amqp_client is now a proper Erlang/OTP application --- ebin/amqp_client.app.in | 1 + src/amqp_client.erl | 50 +++++++++++++++++++++++++++++++ src/amqp_connection.erl | 4 ++- src/amqp_connection_sup.erl | 1 - src/amqp_network_connection.erl | 4 --- src/amqp_sup.erl | 52 +++++++++++++++++++++++++++++++++ 6 files changed, 106 insertions(+), 6 deletions(-) create mode 100644 src/amqp_client.erl create mode 100644 src/amqp_sup.erl diff --git a/ebin/amqp_client.app.in b/ebin/amqp_client.app.in index 551392c5..fc31a625 100644 --- a/ebin/amqp_client.app.in +++ b/ebin/amqp_client.app.in @@ -13,4 +13,5 @@ ]}, {registered, []}, {env, []}, + {mod, {amqp_client, []}}, {applications, [kernel, stdlib]}]}. diff --git a/src/amqp_client.erl b/src/amqp_client.erl new file mode 100644 index 00000000..aa1339e7 --- /dev/null +++ b/src/amqp_client.erl @@ -0,0 +1,50 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is the RabbitMQ Erlang Client. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial +%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C) +%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ____________________. + +%% @private +-module(amqp_client). + +-behaviour(application). + +-export([start/0]). +-export([start/2, stop/1]). + +%%--------------------------------------------------------------------------- +%% Interface +%%--------------------------------------------------------------------------- + +start() -> + application:start(amqp_client). + +%%--------------------------------------------------------------------------- +%% application callbacks +%%--------------------------------------------------------------------------- + +start(_StartType, _StartArgs) -> + {ok, AmqpSup} = amqp_sup:start_link(), + register(amqp_sup, AmqpSup), + {ok, AmqpSup}. + +stop(_State) -> + ok. diff --git a/src/amqp_connection.erl b/src/amqp_connection.erl index 25535825..b7640733 100644 --- a/src/amqp_connection.erl +++ b/src/amqp_connection.erl @@ -99,7 +99,9 @@ start(Type) -> %% a RabbitMQ server, assuming that the server is running in the same process %% space. start(Type, AmqpParams) -> - {ok, _Sup, Connection} = amqp_connection_sup:start_link(Type, AmqpParams), + amqp_client:start(), + {ok, _Sup, Connection} = + amqp_sup:start_connection_sup(amqp_sup, Type, AmqpParams), Module = case Type of direct -> amqp_direct_connection; network -> amqp_network_connection end, diff --git a/src/amqp_connection_sup.erl b/src/amqp_connection_sup.erl index ef26c508..be63f251 100644 --- a/src/amqp_connection_sup.erl +++ b/src/amqp_connection_sup.erl @@ -38,7 +38,6 @@ start_link(Type, AmqpParams) -> {ok, Sup} = supervisor2:start_link(?MODULE, []), - unlink(Sup), {ok, ChSupSup} = supervisor2:start_child(Sup, {channel_sup_sup, {amqp_channel_sup_sup, start_link, [Type]}, diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index dcad19b8..ed07d26e 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -393,10 +393,6 @@ start_ok(#state{params = #amqp_params{username = Username, response = rabbit_binary_generator:generate_table(LoginTable)}. client_properties(UserProperties) -> - %% TODO This eagerly starts the amqp_client application in order to - %% to get the version from the app descriptor, which may be - %% overkill - maybe there is a more suitable point to boot the app - rabbit_misc:start_applications([amqp_client]), {ok, Vsn} = application:get_key(amqp_client, vsn), Default = [{<<"product">>, longstr, <<"RabbitMQ">>}, {<<"version">>, longstr, list_to_binary(Vsn)}, diff --git a/src/amqp_sup.erl b/src/amqp_sup.erl new file mode 100644 index 00000000..672c15d0 --- /dev/null +++ b/src/amqp_sup.erl @@ -0,0 +1,52 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is the RabbitMQ Erlang Client. +%% +%% The Initial Developers of the Original Code are LShift Ltd., +%% Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd., Cohesive Financial +%% Technologies LLC., and Rabbit Technologies Ltd. are Copyright (C) +%% 2007 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit +%% Technologies Ltd.; +%% +%% All Rights Reserved. +%% +%% Contributor(s): ____________________. + +%% @private +-module(amqp_sup). + +-include("amqp_client.hrl"). + +-behaviour(supervisor2). + +-export([start_link/0, start_connection_sup/3]). +-export([init/1]). + +%%--------------------------------------------------------------------------- +%% Interface +%%--------------------------------------------------------------------------- + +start_link() -> + supervisor2:start_link(?MODULE, []). + +start_connection_sup(Sup, Type, AmqpParams) -> + supervisor2:start_child(Sup, [Type, AmqpParams]). + +%%--------------------------------------------------------------------------- +%% supervisor2 callbacks +%%--------------------------------------------------------------------------- + +init([]) -> + {ok, {{simple_one_for_one_terminate, 0, 1}, + [{connection_sup, {amqp_connection_sup, start_link, []}, + temporary, infinity, supervisor, [amqp_connection_sup]}]}}. From 5db719381b2e970f0175c45d394e5b37c0de9dd6 Mon Sep 17 00:00:00 2001 From: Vlad Alexandru Ionescu Date: Tue, 14 Sep 2010 18:29:53 +0100 Subject: [PATCH 2/5] registering amqp_sup properly; closing connections nicely before stopping amqp_client --- src/amqp_client.erl | 14 ++++++++++---- src/amqp_connection.erl | 2 +- src/amqp_sup.erl | 8 ++++---- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/amqp_client.erl b/src/amqp_client.erl index aa1339e7..d36555e1 100644 --- a/src/amqp_client.erl +++ b/src/amqp_client.erl @@ -28,7 +28,7 @@ -behaviour(application). -export([start/0]). --export([start/2, stop/1]). +-export([start/2, prep_stop/1, stop/1]). %%--------------------------------------------------------------------------- %% Interface @@ -42,9 +42,15 @@ start() -> %%--------------------------------------------------------------------------- start(_StartType, _StartArgs) -> - {ok, AmqpSup} = amqp_sup:start_link(), - register(amqp_sup, AmqpSup), - {ok, AmqpSup}. + amqp_sup:start_link(). + +prep_stop(State) -> + ConnectionSups = supervisor2:find_child(amqp_sup, undefined), + lists:foreach(fun(CSup) -> + [Connection] = supervisor2:find_child(CSup, connection), + amqp_connection:close(Connection) + end, ConnectionSups), + State. stop(_State) -> ok. diff --git a/src/amqp_connection.erl b/src/amqp_connection.erl index b7640733..f76ddf96 100644 --- a/src/amqp_connection.erl +++ b/src/amqp_connection.erl @@ -101,7 +101,7 @@ start(Type) -> start(Type, AmqpParams) -> amqp_client:start(), {ok, _Sup, Connection} = - amqp_sup:start_connection_sup(amqp_sup, Type, AmqpParams), + amqp_sup:start_connection_sup(Type, AmqpParams), Module = case Type of direct -> amqp_direct_connection; network -> amqp_network_connection end, diff --git a/src/amqp_sup.erl b/src/amqp_sup.erl index 672c15d0..871e800d 100644 --- a/src/amqp_sup.erl +++ b/src/amqp_sup.erl @@ -29,7 +29,7 @@ -behaviour(supervisor2). --export([start_link/0, start_connection_sup/3]). +-export([start_link/0, start_connection_sup/2]). -export([init/1]). %%--------------------------------------------------------------------------- @@ -37,10 +37,10 @@ %%--------------------------------------------------------------------------- start_link() -> - supervisor2:start_link(?MODULE, []). + supervisor2:start_link({local, amqp_sup}, ?MODULE, []). -start_connection_sup(Sup, Type, AmqpParams) -> - supervisor2:start_child(Sup, [Type, AmqpParams]). +start_connection_sup(Type, AmqpParams) -> + supervisor2:start_child(amqp_sup, [Type, AmqpParams]). %%--------------------------------------------------------------------------- %% supervisor2 callbacks From 738c1ae9a18aa7b0a1f1482dab0606a9c0fbfe6f Mon Sep 17 00:00:00 2001 From: Vlad Alexandru Ionescu Date: Thu, 16 Sep 2010 14:45:01 +0100 Subject: [PATCH 3/5] fixing closing the socket properly --- src/amqp_main_reader.erl | 11 ++--------- src/amqp_network_connection.erl | 10 +++++----- 2 files changed, 7 insertions(+), 14 deletions(-) diff --git a/src/amqp_main_reader.erl b/src/amqp_main_reader.erl index d6a1176d..cf9a7273 100644 --- a/src/amqp_main_reader.erl +++ b/src/amqp_main_reader.erl @@ -56,15 +56,8 @@ init([Sock, Connection, ChMgr, Framing0]) -> {ok, #state{sock = Sock, connection = Connection, channels_manager = ChMgr, framing0 = Framing0}}. -terminate(Reason, #state{sock = Sock}) -> - Nice = case Reason of normal -> true; - shutdown -> true; - {shutdown, _} -> true; - _ -> false - end, - ok = case Nice of true -> rabbit_net:close(Sock); - false -> ok - end. +terminate(_Reason, _State) -> + ok. code_change(_OldVsn, State, _Extra) -> State. diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index dcad19b8..1730e060 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -149,13 +149,13 @@ handle_method(#'connection.close'{} = Close, none, State) -> #closing{reason = server_initiated_close, close = Close}, State)}; -handle_method(#'connection.close_ok'{}, none, - State = #state{closing = Closing}) -> +handle_method(#'connection.close_ok'{}, none, State = #state{closing = Closing, + sock = Sock}) -> + ok = rabbit_net:close(Sock), #closing{from = From, close = #'connection.close'{reply_code = ReplyCode}} = Closing, - case From of - none -> ok; - _ -> gen_server:reply(From, ok) + case From of none -> ok; + _ -> gen_server:reply(From, ok) end, if ReplyCode =:= 200 -> {stop, normal, State}; true -> {stop, closing_to_reason(Closing), State} From 6cd550bfe8dbfe9b7b83bbad31c9d880a9cde762 Mon Sep 17 00:00:00 2001 From: Vlad Alexandru Ionescu Date: Mon, 4 Oct 2010 12:44:09 +0100 Subject: [PATCH 4/5] sleeping before executing rabbitmqctl, so that rabbit has enough time to start; do not fail if can't remove user after tests --- test.mk | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) mode change 100755 => 100644 test.mk diff --git a/test.mk b/test.mk old mode 100755 new mode 100644 index 53e261ef..5b20e66d --- a/test.mk +++ b/test.mk @@ -61,11 +61,12 @@ run_test_broker: start_test_broker_node unboot_broker $$OK start_test_broker_node: boot_broker + sleep 1 $(RABBITMQCTL) delete_user test_user_no_perm 2>/dev/null || true $(RABBITMQCTL) add_user test_user_no_perm test_user_no_perm stop_test_broker_node: - $(RABBITMQCTL) delete_user test_user_no_perm + $(RABBITMQCTL) delete_user test_user_no_perm || true $(MAKE) unboot_broker boot_broker: From 6f0aaf326d4565e2a244cb350d79a195af419818 Mon Sep 17 00:00:00 2001 From: Vlad Alexandru Ionescu Date: Tue, 5 Oct 2010 17:46:22 +0100 Subject: [PATCH 5/5] don't do nice connection close if the app stops --- src/amqp_client.erl | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/amqp_client.erl b/src/amqp_client.erl index d36555e1..ba4ca6b8 100644 --- a/src/amqp_client.erl +++ b/src/amqp_client.erl @@ -28,7 +28,7 @@ -behaviour(application). -export([start/0]). --export([start/2, prep_stop/1, stop/1]). +-export([start/2, stop/1]). %%--------------------------------------------------------------------------- %% Interface @@ -44,13 +44,5 @@ start() -> start(_StartType, _StartArgs) -> amqp_sup:start_link(). -prep_stop(State) -> - ConnectionSups = supervisor2:find_child(amqp_sup, undefined), - lists:foreach(fun(CSup) -> - [Connection] = supervisor2:find_child(CSup, connection), - amqp_connection:close(Connection) - end, ConnectionSups), - State. - stop(_State) -> ok.