diff --git a/Makefile b/Makefile
index da1e9587..e72f13f3 100644
--- a/Makefile
+++ b/Makefile
@@ -72,18 +72,6 @@ $(DOC_DIR)/index.html: $(DEPS_DIR)/$(COMMON_PACKAGE_DIR) $(DOC_DIR)/overview.edo
include test.mk
-test_common_package: $(DIST_DIR)/$(COMMON_PACKAGE_EZ) package prepare_tests
- $(MAKE) start_test_broker_node
- OK=true && \
- TMPFILE=$(MKTEMP) && \
- { $(LIBS_PATH) erl -noshell -pa $(TEST_DIR) \
- -eval 'error_logger:tty(false), network_client_SUITE:test(), halt().' 2>&1 | \
- tee $$TMPFILE || OK=false; } && \
- { egrep "All .+ tests (successful|passed)." $$TMPFILE || OK=false; } && \
- rm $$TMPFILE && \
- $(MAKE) stop_test_broker_node && \
- $$OK
-
compile_tests: $(TEST_TARGETS) $(EBIN_DIR)/$(PACKAGE).app
$(TEST_TARGETS): $(TEST_DIR)
@@ -98,10 +86,8 @@ $(TEST_DIR): $(DEPS_DIR)/$(COMMON_PACKAGE_DIR)
COPY=cp -pR
-$(DIST_DIR)/$(COMMON_PACKAGE_EZ): $(DIST_DIR)/$(COMMON_PACKAGE_DIR) | $(DIST_DIR)
- (cd $(DIST_DIR); zip -r $(COMMON_PACKAGE_EZ) $(COMMON_PACKAGE_DIR))
-
-$(DIST_DIR)/$(COMMON_PACKAGE_DIR): $(BROKER_DEPS) $(COMMON_PACKAGE).app | $(DIST_DIR)
+$(DIST_DIR)/$(COMMON_PACKAGE_EZ): $(BROKER_DEPS) $(COMMON_PACKAGE).app | $(DIST_DIR)
+ rm -f $@
$(MAKE) -C $(BROKER_DIR)
rm -rf $(DIST_DIR)/$(COMMON_PACKAGE_DIR)
mkdir -p $(DIST_DIR)/$(COMMON_PACKAGE_DIR)/$(INCLUDE_DIR)
@@ -111,6 +97,7 @@ $(DIST_DIR)/$(COMMON_PACKAGE_DIR): $(BROKER_DEPS) $(COMMON_PACKAGE).app | $(DIST
( cp $(BROKER_DIR)/ebin/$(DEP).beam $(DIST_DIR)/$(COMMON_PACKAGE_DIR)/$(EBIN_DIR)/ \
);)
cp $(BROKER_DIR)/include/*.hrl $(DIST_DIR)/$(COMMON_PACKAGE_DIR)/$(INCLUDE_DIR)/
+ (cd $(DIST_DIR); zip -r $(COMMON_PACKAGE_EZ) $(COMMON_PACKAGE_DIR))
source_tarball: $(DIST_DIR)/$(COMMON_PACKAGE_EZ) $(EBIN_DIR)/$(PACKAGE).app | $(DIST_DIR)
mkdir -p $(DIST_DIR)/$(SOURCE_PACKAGE_DIR)/$(DIST_DIR)
diff --git a/Makefile.in b/Makefile.in
index d32a1e70..864444a3 100644
--- a/Makefile.in
+++ b/Makefile.in
@@ -23,4 +23,4 @@ clean: common_clean
test: compile
$(MAKE) -C test VERSION=$(VERSION)
$(LIBS_PATH) erl -pa ebin test -noinput -eval \
- 'error_logger:tty(false), io:format("Testing in progress. Please wait...~n"), network_client_SUITE:test(), halt()'
+ 'error_logger:tty(false), io:format("Testing in progress. Please wait...~n"), network_client_SUITE:test(), init:stop()'
diff --git a/common.mk b/common.mk
index 5d3f7cd8..8635ad6e 100644
--- a/common.mk
+++ b/common.mk
@@ -63,7 +63,7 @@ export COMMON_PACKAGE_DIR=$(COMMON_PACKAGE)$(if $(APPEND_VERSION),-$(VERSION),)
COMMON_PACKAGE_EZ=$(COMMON_PACKAGE_DIR).ez
DEPS=$(shell erl -noshell -eval '{ok,[{_,_,[_,_,{modules, Mods},_,_,_]}]} = \
- file:consult("$(COMMON_PACKAGE).app"), \
+ file:consult("$(COMMON_PACKAGE).app.in"), \
[io:format("~p ",[M]) || M <- Mods], halt().')
INCLUDES=$(wildcard $(INCLUDE_DIR)/*.hrl)
@@ -80,10 +80,9 @@ else
LIBS_PATH=ERL_LIBS=$(LIBS_PATH_UNIX)
endif
-LOAD_PATH=$(EBIN_DIR) $(BROKER_DIR)/ebin $(TEST_DIR) $(ERL_PATH)
+LOAD_PATH=$(EBIN_DIR) $(TEST_DIR) $(ERL_PATH)
-COVER_START := -s cover start -s rabbit_misc enable_cover ../rabbitmq-erlang-client
-COVER_STOP := -s rabbit_misc report_cover ../rabbitmq-erlang-client -s cover stop
+RUN:=$(LIBS_PATH) erl -pa $(LOAD_PATH) -sname amqp_client
MKTEMP=$$(mktemp $(TMPDIR)/tmp.XXXXXXXXXX)
@@ -142,19 +141,17 @@ common_clean:
rm -f $(DEPS_FILE)
$(MAKE) -C $(TEST_DIR) clean
-compile: $(TARGETS)
+compile: $(TARGETS) $(EBIN_DIR)/$(PACKAGE).app
-run: compile $(EBIN_DIR)/$(PACKAGE).app
- $(LIBS_PATH) erl -pa $(LOAD_PATH)
+run: compile
+ $(RUN)
###############################################################################
## Packaging
###############################################################################
-$(DIST_DIR)/$(PACKAGE_NAME_EZ): $(DIST_DIR)/$(PACKAGE_DIR) | $(DIST_DIR)
- (cd $(DIST_DIR); zip -r $(PACKAGE_NAME_EZ) $(PACKAGE_DIR))
-
-$(DIST_DIR)/$(PACKAGE_DIR): $(TARGETS) $(EBIN_DIR)/$(PACKAGE).app | $(DIST_DIR)
+$(DIST_DIR)/$(PACKAGE_NAME_EZ): $(TARGETS) $(EBIN_DIR)/$(PACKAGE).app | $(DIST_DIR)
+ rm -f $@
rm -rf $(DIST_DIR)/$(PACKAGE_DIR)
mkdir -p $(DIST_DIR)/$(PACKAGE_DIR)/$(EBIN_DIR)
mkdir -p $(DIST_DIR)/$(PACKAGE_DIR)/$(INCLUDE_DIR)
@@ -162,6 +159,7 @@ $(DIST_DIR)/$(PACKAGE_DIR): $(TARGETS) $(EBIN_DIR)/$(PACKAGE).app | $(DIST_DIR)
cp -r $(EBIN_DIR)/*.app $(DIST_DIR)/$(PACKAGE_DIR)/$(EBIN_DIR)
mkdir -p $(DIST_DIR)/$(PACKAGE_DIR)/$(INCLUDE_DIR)
cp -r $(INCLUDE_DIR)/* $(DIST_DIR)/$(PACKAGE_DIR)/$(INCLUDE_DIR)
+ (cd $(DIST_DIR); zip -r $(PACKAGE_NAME_EZ) $(PACKAGE_DIR))
package: $(DIST_DIR)/$(PACKAGE_NAME_EZ)
diff --git a/include/amqp_client.hrl b/include/amqp_client.hrl
index 367c2102..6aeb71cd 100644
--- a/include/amqp_client.hrl
+++ b/include/amqp_client.hrl
@@ -31,6 +31,7 @@
virtual_host = <<"/">>,
host = "localhost",
port = ?PROTOCOL_PORT,
+ node = node(),
channel_max = 0,
frame_max = 0,
heartbeat = 0,
diff --git a/rabbit_common.app.in b/rabbit_common.app.in
index 97e86163..68cbf62e 100644
--- a/rabbit_common.app.in
+++ b/rabbit_common.app.in
@@ -3,6 +3,7 @@
{vsn, "%%VSN%%"},
{modules, [
gen_server2,
+ delegate,
priority_queue,
rabbit_backing_queue,
rabbit_basic,
@@ -21,6 +22,8 @@
rabbit_net,
rabbit_reader,
rabbit_writer,
+ rabbit_queue_collector,
+ rabbit_amqqueue,
supervisor2
]},
{registered, []},
diff --git a/src/amqp_channel_sup.erl b/src/amqp_channel_sup.erl
index 6a5da223..5573d43c 100644
--- a/src/amqp_channel_sup.erl
+++ b/src/amqp_channel_sup.erl
@@ -43,40 +43,27 @@ start_link(Type, InfraArgs, ChNumber) ->
%% Internal plumbing
%%---------------------------------------------------------------------------
-start_writer_fun(Sup, direct, [User, VHost, Collector], ChNumber) ->
- fun() ->
- ChPid = self(),
- {ok, _} = supervisor2:start_child(
- Sup,
- {rabbit_channel, {rabbit_channel, start_link,
- [ChNumber, ChPid, ChPid, User, VHost,
- Collector, start_limiter_fun(Sup)]},
- transient, ?MAX_WAIT, worker, [rabbit_channel]})
+start_writer_fun(_Sup, direct, [Node, User, VHost, Collector], ChNumber) ->
+ fun () ->
+ {ok, RabbitCh} =
+ rpc:call(Node, rabbit_direct, start_channel,
+ [ChNumber, self(), User, VHost, Collector]),
+ link(RabbitCh),
+ {ok, RabbitCh}
end;
start_writer_fun(Sup, network, [Sock], ChNumber) ->
- fun() ->
- ChPid = self(),
- {ok, _} = supervisor2:start_child(
- Sup,
- {writer, {rabbit_writer, start_link,
- [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
- ChPid]},
- transient, ?MAX_WAIT, worker, [rabbit_writer]})
+ fun () ->
+ {ok, _} = supervisor2:start_child(
+ Sup,
+ {writer, {rabbit_writer, start_link,
+ [Sock, ChNumber, ?FRAME_MIN_SIZE, ?PROTOCOL,
+ self()]},
+ transient, ?MAX_WAIT, worker, [rabbit_writer]})
end.
init_command_assembler(direct) -> {ok, none};
init_command_assembler(network) -> rabbit_command_assembler:init(?PROTOCOL).
-start_limiter_fun(Sup) ->
- fun (UnackedCount) ->
- Parent = self(),
- {ok, _} = supervisor2:start_child(
- Sup,
- {limiter, {rabbit_limiter, start_link,
- [Parent, UnackedCount]},
- transient, ?MAX_WAIT, worker, [rabbit_limiter]})
- end.
-
%%---------------------------------------------------------------------------
%% supervisor2 callbacks
%%---------------------------------------------------------------------------
diff --git a/src/amqp_connection.erl b/src/amqp_connection.erl
index e232de6b..9835e969 100644
--- a/src/amqp_connection.erl
+++ b/src/amqp_connection.erl
@@ -46,17 +46,18 @@
%%
virtual_host :: binary() - The name of a virtual host in the broker,
%% defaults to <<"/">>
%% host :: string() - The hostname of the broker,
-%% defaults to "localhost"
+%% defaults to "localhost" (network only)
%% port :: integer() - The port the broker is listening on,
-%% defaults to 5672
+%% defaults to 5672 (network only)
+%% node :: atom() - The node the broker runs on (direct only)
%% channel_max :: non_neg_integer() - The channel_max handshake parameter,
%% defaults to 0
%% frame_max :: non_neg_integer() - The frame_max handshake parameter,
-%% defaults to 0
+%% defaults to 0 (network only)
%% heartbeat :: non_neg_integer() - The hearbeat interval in seconds,
-%% defaults to 0 (turned off)
+%% defaults to 0 (turned off) (network only)
%% ssl_options :: term() - The second parameter to be used with the
-%% ssl:connect/2 function, defaults to 'none'
+%% ssl:connect/2 function, defaults to 'none' (network only)
%% client_properties :: [{binary(), atom(), binary()}] - A list of extra
%% client properties to be sent to the server, defaults to []
%%
diff --git a/src/amqp_direct_connection.erl b/src/amqp_direct_connection.erl
index 12b59b44..d6297b6c 100644
--- a/src/amqp_direct_connection.erl
+++ b/src/amqp_direct_connection.erl
@@ -24,7 +24,8 @@
-export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).
--record(state, {user,
+-record(state, {node,
+ user,
vhost,
collector,
closing_reason %% undefined | Reason
@@ -37,8 +38,11 @@
init([]) ->
{ok, #state{}}.
-open_channel_args(#state{user = User, vhost = VHost, collector = Collector}) ->
- [User, VHost, Collector].
+open_channel_args(#state{node = Node,
+ user = User,
+ vhost = VHost,
+ collector = Collector}) ->
+ [Node, User, VHost, Collector].
do(_Method, _State) ->
ok.
@@ -63,29 +67,19 @@ i(Item, _State) -> throw({bad_argument, Item}).
info_keys() ->
?INFO_KEYS.
-connect(AmqpParams, SIF, _ChMgr, State) ->
- try do_connect(AmqpParams, SIF, State) of
- Return -> Return
- catch _:Reason -> {error, Reason}
+connect(#amqp_params{username = Username,
+ password = Pass,
+ node = Node,
+ virtual_host = VHost}, SIF, _ChMgr, State) ->
+ case rpc:call(Node, rabbit_direct, connect, [Username, Pass, VHost]) of
+ {ok, {User, ServerProperties}} ->
+ {ok, Collector} = SIF(),
+ {ok, {ServerProperties, 0, State#state{node = Node,
+ user = User,
+ vhost = VHost,
+ collector = Collector}}};
+ {error, _} = E ->
+ E;
+ {badrpc, nodedown} ->
+ {error, {nodedown, Node}}
end.
-
-do_connect(#amqp_params{username = Username, password = Pass,
- virtual_host = VHost},
- SIF, State) ->
- case lists:keymember(rabbit, 1, application:which_applications()) of
- true -> ok;
- false -> exit(broker_not_found_in_vm)
- end,
- User = try rabbit_access_control:user_pass_login(Username, Pass) of
- User1 -> User1
- catch exit:#amqp_error{name = access_refused} -> exit(auth_failure)
- end,
- try rabbit_access_control:check_vhost_access(User, VHost) of
- _ -> ok
- catch exit:#amqp_error{name = access_refused} -> exit(access_refused)
- end,
- {ok, Collector} = SIF(),
- {ok, {rabbit_reader:server_properties(), 0,
- State#state{user = User,
- vhost = VHost,
- collector = Collector}}}.
diff --git a/test.mk b/test.mk
index 6d7bc2fe..3a3ed4b4 100644
--- a/test.mk
+++ b/test.mk
@@ -14,18 +14,22 @@
# Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
#
+IS_SUCCESS:=egrep "All .+ tests (successful|passed)."
+TESTING_MESSAGE:=-eval 'error_logger:tty(false), io:format("~nTesting in progress. Please wait...~n~n").'
+
prepare_tests: compile compile_tests
all_tests: prepare_tests
OK=true && \
{ $(MAKE) test_suites || OK=false; } && \
{ $(MAKE) test_common_package || OK=false; } && \
+ { $(MAKE) test_direct || OK=false; } && \
$$OK
test_suites: prepare_tests
OK=true && \
{ $(MAKE) test_network || OK=false; } && \
- { $(MAKE) test_direct || OK=false; } && \
+ { $(MAKE) test_remote_direct || OK=false; } && \
$(ALL_SSL) && \
$$OK
@@ -36,21 +40,31 @@ test_suites_coverage: prepare_tests
$(ALL_SSL_COVERAGE) && \
$$OK
-## This performs test setup and teardown procedures to ensure that
-## that the correct users are configured in the test instance
-run_test_broker: start_test_broker_node unboot_broker
+## Starts a broker, configures users and runs the tests on the same node
+run_test_in_broker: start_test_broker_node unboot_broker
OK=true && \
TMPFILE=$(MKTEMP) && \
{ $(MAKE) -C $(BROKER_DIR) run-node \
RABBITMQ_SERVER_START_ARGS="$(PA_LOAD_PATH) $(SSL_BROKER_ARGS) \
- -noshell -s rabbit $(RUN_TEST_BROKER_ARGS) -s init stop" 2>&1 | \
+ -noshell -s rabbit $(RUN_TEST_ARGS) -s init stop" 2>&1 | \
tee $$TMPFILE || OK=false; } && \
- { egrep "All .+ tests (successful|passed)." $$TMPFILE || OK=false; } && \
+ { $(IS_SUCCESS) $$TMPFILE || OK=false; } && \
rm $$TMPFILE && \
$(MAKE) boot_broker && \
$(MAKE) stop_test_broker_node && \
$$OK
+## Starts a broker, configures users and runs the tests from a different node
+run_test_detached: start_test_broker_node
+ OK=true && \
+ TMPFILE=$(MKTEMP) && \
+ { $(RUN) -noinput $(TESTING_MESSAGE) $(RUN_TEST_ARGS) \
+ -s init stop 2>&1 | tee $$TMPFILE || OK=false; } && \
+ { $(IS_SUCCESS) $$TMPFILE || OK=false; } && \
+ rm $$TMPFILE && \
+ $(MAKE) stop_test_broker_node && \
+ $$OK
+
start_test_broker_node: boot_broker
sleep 1
- $(RABBITMQCTL) delete_user test_user_no_perm
@@ -72,22 +86,31 @@ ssl:
$(SSL)
test_ssl: prepare_tests ssl
- $(MAKE) run_test_broker RUN_TEST_BROKER_ARGS="-s ssl_client_SUITE test"
+ $(MAKE) run_test_detached RUN_TEST_ARGS="-s ssl_client_SUITE test"
test_network: prepare_tests
- $(MAKE) run_test_broker RUN_TEST_BROKER_ARGS="-s network_client_SUITE test"
+ $(MAKE) run_test_detached RUN_TEST_ARGS="-s network_client_SUITE test"
test_direct: prepare_tests
- $(MAKE) run_test_broker RUN_TEST_BROKER_ARGS="-s direct_client_SUITE test"
+ $(MAKE) run_test_in_broker RUN_TEST_ARGS="-s direct_client_SUITE test"
+
+test_remote_direct: prepare_tests
+ $(MAKE) run_test_detached RUN_TEST_ARGS="-s direct_client_SUITE test"
+
+test_common_package: $(DIST_DIR)/$(COMMON_PACKAGE_EZ) package prepare_tests
+ $(MAKE) run_test_detached RUN="$(LIBS_PATH) erl -pa $(TEST_DIR)" \
+ RUN_TEST_ARGS="-s network_client_SUITE test"
+ $(MAKE) run_test_detached RUN="$(LIBS_PATH) erl -pa $(TEST_DIR) -sname amqp_client" \
+ RUN_TEST_ARGS="-s direct_client_SUITE test"
test_ssl_coverage: prepare_tests ssl
- $(MAKE) run_test_broker \
- RUN_TEST_BROKER_ARGS="$(COVER_START) -s ssl_client_SUITE test $(COVER_STOP)"
+ $(MAKE) run_test_detached RUN_TEST_ARGS="-s ssl_client_SUITE test_coverage"
test_network_coverage: prepare_tests
- $(MAKE) run_test_broker \
- RUN_TEST_BROKER_ARGS="$(COVER_START) -s network_client_SUITE test $(COVER_STOP)"
+ $(MAKE) run_test_detached RUN_TEST_ARGS="-s network_client_SUITE test_coverage"
test_direct_coverage: prepare_tests
- $(MAKE) run_test_broker \
- RUN_TEST_BROKER_ARGS="$(COVER_START) -s direct_client_SUITE test $(COVER_STOP)"
+ $(MAKE) run_test_in_broker RUN_TEST_ARGS="-s direct_client_SUITE test_coverage"
+
+test_remote_direct_coverage: prepare_tests
+ $(MAKE) run_test_detached RUN_TEST_ARGS="-s direct_client_SUITE test_coverage"
diff --git a/test/direct_client_SUITE.erl b/test/direct_client_SUITE.erl
index 1afa692b..52d9e294 100644
--- a/test/direct_client_SUITE.erl
+++ b/test/direct_client_SUITE.erl
@@ -132,12 +132,14 @@ new_connection() ->
new_connection(#amqp_params{}).
new_connection(AmqpParams) ->
- case amqp_connection:start(direct, AmqpParams) of {ok, Conn} -> Conn;
- {error, _} = E -> E
+ case amqp_connection:start(
+ direct,
+ AmqpParams#amqp_params{node = rabbit_misc:makenode(rabbit)}) of
+ {ok, Conn} -> Conn;
+ {error, _} = E -> E
end.
test_coverage() ->
rabbit_misc:enable_cover(),
test(),
rabbit_misc:report_cover().
-
diff --git a/test/test_util.erl b/test/test_util.erl
index d0ef37b8..9b61ef0e 100644
--- a/test/test_util.erl
+++ b/test/test_util.erl
@@ -632,8 +632,8 @@ rpc_test(Connection) ->
setup_publish(Channel) ->
Publish = #publish{routing_key = <<"a.b.c.d">>,
- q = <<"a.b.c">>,
- x = <<"x">>,
+ q = uuid(),
+ x = uuid(),
bind_key = <<"a.b.c.*">>,
payload = <<"foobar">>},
setup_publish(Channel, Publish).