From e04e945b7da030a3bb2e6fad75290bf12ec2ff6f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 8 Nov 2012 12:53:58 +0000 Subject: [PATCH 01/44] API change --- src/amqp_gen_consumer.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/amqp_gen_consumer.erl b/src/amqp_gen_consumer.erl index 0c950b7d..99026d96 100644 --- a/src/amqp_gen_consumer.erl +++ b/src/amqp_gen_consumer.erl @@ -34,7 +34,7 @@ -export([start_link/2, call_consumer/2, call_consumer/3]). -export([behaviour_info/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, prioritise_info/2]). + handle_info/2, prioritise_info/3]). -record(state, {module, module_state}). @@ -193,8 +193,8 @@ init([ConsumerModule, ExtraParams]) -> ignore end. -prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1; -prioritise_info(_, _State) -> 0. +prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _Len, _State) -> 1; +prioritise_info(_, _Len, _State) -> 0. handle_call({consumer_call, Msg}, From, State = #state{module = ConsumerModule, From d461d7836d2e9fcf4db4f4fc7fd81b5810a6e269 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 19 Nov 2012 11:08:28 +0000 Subject: [PATCH 02/44] Added tag rabbitmq_v3_0_0 for changeset eed5df227e09 From 5207a226513da302bdd9bc35c12cfd3d5a382494 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 20 Dec 2012 15:00:24 +0000 Subject: [PATCH 03/44] Prevent waiting for confirms unless selected --- src/amqp_channel.erl | 13 +++++++++---- test/test_util.erl | 12 ++++++------ 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index e806798f..0766ceda 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -98,7 +98,8 @@ start_writer_fun, unconfirmed_set = gb_sets:new(), waiting_set = gb_trees:empty(), - only_acks_received = true + only_acks_received = true, + confirms_selected = false }). %%--------------------------------------------------------------------------- @@ -214,7 +215,7 @@ next_publish_seqno(Channel) -> %% Channel = pid() %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker. Note, when called on a -%% non-Confirm channel, waitForConfirms returns true immediately. +%% non-Confirm channel, waitForConfirms returns an error. wait_for_confirms(Channel) -> wait_for_confirms(Channel, infinity). @@ -225,7 +226,7 @@ wait_for_confirms(Channel) -> %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker or the timeout expires. %% Note, when called on a non-Confirm channel, waitForConfirms returns -%% true immediately. +%% an error. wait_for_confirms(Channel, Timeout) -> gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity). @@ -520,7 +521,8 @@ handle_method_to_server(Method, AmqpMsg, From, Sender, Flow, {ok, _, ok} -> State1 = case {Method, State#state.next_pub_seqno} of {#'confirm.select'{}, _} -> - State#state{next_pub_seqno = 1}; + State#state{next_pub_seqno = 1, + confirms_selected = true}; {#'basic.publish'{}, 0} -> State; {#'basic.publish'{}, SeqNo} -> @@ -884,6 +886,9 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, State#state{waiting_set = gb_trees:empty(), only_acks_received = true}. +handle_wait_for_confirms(_From, _Timeout, + State = #state{confirms_selected = false}) -> + handle_shutdown({invalid_state, "wait requires confirms selected"}, State); handle_wait_for_confirms(From, Timeout, State = #state{unconfirmed_set = USet, waiting_set = WSet}) -> diff --git a/test/test_util.erl b/test/test_util.erl index 72f841ff..e79e9e1c 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -640,14 +640,14 @@ confirm_barrier_test() -> true = amqp_channel:wait_for_confirms(Channel), teardown(Connection, Channel). -confirm_barrier_nop_test() -> +confirm_select_before_wait_test() -> {ok, Connection} = new_connection(), {ok, Channel} = amqp_connection:open_channel(Connection), - true = amqp_channel:wait_for_confirms(Channel), - amqp_channel:call(Channel, #'basic.publish'{routing_key = <<"whoosh">>}, - #amqp_msg{payload = <<"foo">>}), - true = amqp_channel:wait_for_confirms(Channel), - teardown(Connection, Channel). + try amqp_channel:wait_for_confirms(Channel) of + _ -> fail + catch + exit:{{shutdown,{invalid_state, _}}, _} -> ok + end. confirm_barrier_timeout_test() -> {ok, Connection} = new_connection(), From ca22caa621ee7f291118b74022be0a2a19e752fc Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 20 Dec 2012 16:16:32 +0000 Subject: [PATCH 04/44] Use less state --- src/amqp_channel.erl | 8 +++----- test/amqp_client_SUITE.erl | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index 0766ceda..2d682b01 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -98,8 +98,7 @@ start_writer_fun, unconfirmed_set = gb_sets:new(), waiting_set = gb_trees:empty(), - only_acks_received = true, - confirms_selected = false + only_acks_received = true }). %%--------------------------------------------------------------------------- @@ -521,8 +520,7 @@ handle_method_to_server(Method, AmqpMsg, From, Sender, Flow, {ok, _, ok} -> State1 = case {Method, State#state.next_pub_seqno} of {#'confirm.select'{}, _} -> - State#state{next_pub_seqno = 1, - confirms_selected = true}; + State#state{next_pub_seqno = 1}; {#'basic.publish'{}, 0} -> State; {#'basic.publish'{}, SeqNo} -> @@ -887,7 +885,7 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, only_acks_received = true}. handle_wait_for_confirms(_From, _Timeout, - State = #state{confirms_selected = false}) -> + State = #state{next_pub_seqno = 0}) -> handle_shutdown({invalid_state, "wait requires confirms selected"}, State); handle_wait_for_confirms(From, Timeout, State = #state{unconfirmed_set = USet, diff --git a/test/amqp_client_SUITE.erl b/test/amqp_client_SUITE.erl index ff8b1823..af03381d 100644 --- a/test/amqp_client_SUITE.erl +++ b/test/amqp_client_SUITE.erl @@ -65,7 +65,7 @@ pub_and_close_test_() -> ?RUN([]). channel_tune_negotiation_test_() -> ?RUN([]). confirm_test_() -> ?RUN([]). confirm_barrier_test_() -> ?RUN([]). -confirm_barrier_nop_test_() -> ?RUN([]). +confirm_select_before_wait_test() -> ?RUN([]). confirm_barrier_timeout_test_() -> ?RUN([]). confirm_barrier_die_timeout_test_() -> ?RUN([]). default_consumer_test() -> ?RUN([]). From bbfa29d8c28f3df20f128ebf66c998116889311b Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 20 Dec 2012 16:19:25 +0000 Subject: [PATCH 05/44] Cosmetic --- src/amqp_channel.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index 2d682b01..284cbfae 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -884,8 +884,7 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, State#state{waiting_set = gb_trees:empty(), only_acks_received = true}. -handle_wait_for_confirms(_From, _Timeout, - State = #state{next_pub_seqno = 0}) -> +handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) -> handle_shutdown({invalid_state, "wait requires confirms selected"}, State); handle_wait_for_confirms(From, Timeout, State = #state{unconfirmed_set = USet, From 2829507ca80197282d3d6d33024d3f960cd78dad Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 4 Jan 2013 00:31:40 +0000 Subject: [PATCH 06/44] silence annoying warnings, and some tidying up in the area --- src/amqp_channel.erl | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index e806798f..295d9532 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -709,23 +709,19 @@ handle_method_from_server1( {noreply, State}; handle_method_from_server1(#'basic.ack'{} = BasicAck, none, #state{confirm_handler = none} = State) -> - ?LOG_WARN("Channel (~p): received ~p but there is no " - "confirm handler registered~n", [self(), BasicAck]), {noreply, update_confirm_set(BasicAck, State)}; -handle_method_from_server1( - #'basic.ack'{} = BasicAck, none, - #state{confirm_handler = {ConfirmHandler, _Ref}} = State) -> - ConfirmHandler ! BasicAck, +handle_method_from_server1(#'basic.ack'{} = BasicAck, none, + #state{confirm_handler = {CH, _Ref}} = State) -> + CH ! BasicAck, {noreply, update_confirm_set(BasicAck, State)}; handle_method_from_server1(#'basic.nack'{} = BasicNack, none, #state{confirm_handler = none} = State) -> ?LOG_WARN("Channel (~p): received ~p but there is no " "confirm handler registered~n", [self(), BasicNack]), {noreply, update_confirm_set(BasicNack, State)}; -handle_method_from_server1( - #'basic.nack'{} = BasicNack, none, - #state{confirm_handler = {ConfirmHandler, _Ref}} = State) -> - ConfirmHandler ! BasicNack, +handle_method_from_server1(#'basic.nack'{} = BasicNack, none, + #state{confirm_handler = {CH, _Ref}} = State) -> + CH ! BasicNack, {noreply, update_confirm_set(BasicNack, State)}; handle_method_from_server1(Method, none, State) -> From a7e18a0b9e4a7003478e49576f9e80f6798021b0 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 7 Jan 2013 17:03:35 +0000 Subject: [PATCH 07/44] Less severe failure when waiting on unselected channels --- src/amqp_channel.erl | 11 +++++++---- test/test_util.erl | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index 284cbfae..e122383d 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -224,10 +224,13 @@ wait_for_confirms(Channel) -> %% Timeout = non_neg_integer() | 'infinity' %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker or the timeout expires. -%% Note, when called on a non-Confirm channel, waitForConfirms returns -%% an error. +%% Note, when called on a non-Confirm channel, waitForConfirms throws +%% an exception. wait_for_confirms(Channel, Timeout) -> - gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity). + case gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity) of + Normal when is_boolean(Normal) -> Normal; + {error, Reason} -> throw(Reason) + end. %% @spec (Channel) -> true %% where @@ -885,7 +888,7 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, only_acks_received = true}. handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) -> - handle_shutdown({invalid_state, "wait requires confirms selected"}, State); + {reply, {error, not_in_confirm_mode}, State}; handle_wait_for_confirms(From, Timeout, State = #state{unconfirmed_set = USet, waiting_set = WSet}) -> diff --git a/test/test_util.erl b/test/test_util.erl index e79e9e1c..032b8251 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -646,8 +646,9 @@ confirm_select_before_wait_test() -> try amqp_channel:wait_for_confirms(Channel) of _ -> fail catch - exit:{{shutdown,{invalid_state, _}}, _} -> ok - end. + exit:not_in_confirm_mode -> ok + end, + teardown(Connection, Channel). confirm_barrier_timeout_test() -> {ok, Connection} = new_connection(), From 02f56fa9b42b645c04e98877a210640c7a466c80 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Mon, 7 Jan 2013 18:08:40 +0000 Subject: [PATCH 08/44] fix bug in test it's a throw, not an exit --- test/test_util.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_util.erl b/test/test_util.erl index 032b8251..a5c53c25 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -646,7 +646,7 @@ confirm_select_before_wait_test() -> try amqp_channel:wait_for_confirms(Channel) of _ -> fail catch - exit:not_in_confirm_mode -> ok + not_in_confirm_mode -> ok end, teardown(Connection, Channel). From 1d3e84d2d44a4cbd8b47bd2ad67c432b01d816c0 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 8 Jan 2013 01:10:35 +0000 Subject: [PATCH 09/44] fix another bug: timeouts were broken --- src/amqp_channel.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index e122383d..e23689ff 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -228,8 +228,8 @@ wait_for_confirms(Channel) -> %% an exception. wait_for_confirms(Channel, Timeout) -> case gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity) of - Normal when is_boolean(Normal) -> Normal; - {error, Reason} -> throw(Reason) + {error, Reason} -> throw(Reason); + Other -> Other end. %% @spec (Channel) -> true From 8a1b07cf77e1f402646cfeb387cc4f0ac04d8144 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 8 Jan 2013 11:36:57 +0000 Subject: [PATCH 10/44] fail properly --- test/test_util.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_util.erl b/test/test_util.erl index a5c53c25..9e196f6f 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -644,7 +644,7 @@ confirm_select_before_wait_test() -> {ok, Connection} = new_connection(), {ok, Channel} = amqp_connection:open_channel(Connection), try amqp_channel:wait_for_confirms(Channel) of - _ -> fail + _ -> exit(success_despite_lack_of_confirm_mode) catch not_in_confirm_mode -> ok end, From bd50a873152dcc218b9000aa3f50078f9e5af256 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Tue, 8 Jan 2013 11:39:30 +0000 Subject: [PATCH 11/44] actually enable the test --- test/amqp_client_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/amqp_client_SUITE.erl b/test/amqp_client_SUITE.erl index af03381d..5a894081 100644 --- a/test/amqp_client_SUITE.erl +++ b/test/amqp_client_SUITE.erl @@ -65,7 +65,7 @@ pub_and_close_test_() -> ?RUN([]). channel_tune_negotiation_test_() -> ?RUN([]). confirm_test_() -> ?RUN([]). confirm_barrier_test_() -> ?RUN([]). -confirm_select_before_wait_test() -> ?RUN([]). +confirm_select_before_wait_test_() -> ?RUN([]). confirm_barrier_timeout_test_() -> ?RUN([]). confirm_barrier_die_timeout_test_() -> ?RUN([]). default_consumer_test() -> ?RUN([]). From a82b402178e633db9eb1af3c8669001fc3cd0ae3 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 8 Jan 2013 16:23:40 +0000 Subject: [PATCH 12/44] Move adapter_info(Sock) to the Erlang client. --- src/amqp_direct_connection.erl | 51 ++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/src/amqp_direct_connection.erl b/src/amqp_direct_connection.erl index d05972dd..e62ebaec 100644 --- a/src/amqp_direct_connection.erl +++ b/src/amqp_direct_connection.erl @@ -26,6 +26,8 @@ -export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2, info_keys/0, handle_message/2, closing/3, channels_terminated/1]). +-export([socket_adapter_info/1]). + -record(state, {node, user, vhost, @@ -153,3 +155,52 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) -> ensure_adapter_info(A#amqp_adapter_info{name = Name}); ensure_adapter_info(Info) -> Info. + +socket_adapter_info(Sock) -> + {PeerHost, PeerPort, Host, Port} = + case rabbit_net:socket_ends(Sock, inbound) of + {ok, Res} -> Res; + _ -> {unknown, unknown} + end, + Name = case rabbit_net:connection_string(Sock, inbound) of + {ok, Res3} -> Res3; + _ -> unknown + end, + #amqp_adapter_info{protocol = unknown, + name = list_to_binary(Name), + host = Host, + port = Port, + peer_host = PeerHost, + peer_port = PeerPort, + additional_info = maybe_ssl_info(Sock)}. + +maybe_ssl_info(Sock) -> + case rabbit_net:is_ssl(Sock) of + true -> [{ssl, true}] ++ ssl_info(Sock) ++ ssl_cert_info(Sock); + false -> [{ssl, false}] + end. + +ssl_info(Sock) -> + {Protocol, KeyExchange, Cipher, Hash} = + case rabbit_net:ssl_info(Sock) of + {ok, {P, {K, C, H}}} -> {P, K, C, H}; + {ok, {P, {K, C, H, _}}} -> {P, K, C, H}; + _ -> {unknown, unknown, unknown, unknown} + end, + [{ssl_protocol, Protocol}, + {ssl_key_exchange, KeyExchange}, + {ssl_cipher, Cipher}, + {ssl_hash, Hash}]. + +ssl_cert_info(Sock) -> + case rabbit_net:peercert(Sock) of + {ok, Cert} -> + [{peer_cert_issuer, list_to_binary( + rabbit_ssl:peer_cert_issuer(Cert))}, + {peer_cert_subject, list_to_binary( + rabbit_ssl:peer_cert_subject(Cert))}, + {peer_cert_validity, list_to_binary( + rabbit_ssl:peer_cert_validity(Cert))}]; + _ -> + [] + end. From 3af90e095245ac20ec40bce3d1ed7a9b9d63e681 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 9 Jan 2013 11:14:09 +0000 Subject: [PATCH 13/44] Various tweaks. --- src/amqp_direct_connection.erl | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/amqp_direct_connection.erl b/src/amqp_direct_connection.erl index e62ebaec..427c1c39 100644 --- a/src/amqp_direct_connection.erl +++ b/src/amqp_direct_connection.erl @@ -160,10 +160,10 @@ socket_adapter_info(Sock) -> {PeerHost, PeerPort, Host, Port} = case rabbit_net:socket_ends(Sock, inbound) of {ok, Res} -> Res; - _ -> {unknown, unknown} + _ -> {unknown, unknown, unknown, unknown} end, Name = case rabbit_net:connection_string(Sock, inbound) of - {ok, Res3} -> Res3; + {ok, Res1} -> Res1; _ -> unknown end, #amqp_adapter_info{protocol = unknown, @@ -187,10 +187,10 @@ ssl_info(Sock) -> {ok, {P, {K, C, H, _}}} -> {P, K, C, H}; _ -> {unknown, unknown, unknown, unknown} end, - [{ssl_protocol, Protocol}, - {ssl_key_exchange, KeyExchange}, - {ssl_cipher, Cipher}, - {ssl_hash, Hash}]. + [{ssl_protocol, Protocol}, + {ssl_key_exchange, KeyExchange}, + {ssl_cipher, Cipher}, + {ssl_hash, Hash}]. ssl_cert_info(Sock) -> case rabbit_net:peercert(Sock) of From 4b7c45a737b0ef6b5010ae83146d10887cb9efe2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 9 Jan 2013 11:18:21 +0000 Subject: [PATCH 14/44] API change --- src/amqp_direct_connection.erl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/amqp_direct_connection.erl b/src/amqp_direct_connection.erl index 427c1c39..08060128 100644 --- a/src/amqp_direct_connection.erl +++ b/src/amqp_direct_connection.erl @@ -26,7 +26,7 @@ -export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2, info_keys/0, handle_message/2, closing/3, channels_terminated/1]). --export([socket_adapter_info/1]). +-export([socket_adapter_info/2]). -record(state, {node, user, @@ -156,7 +156,7 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) -> ensure_adapter_info(Info) -> Info. -socket_adapter_info(Sock) -> +socket_adapter_info(Sock, Protocol) -> {PeerHost, PeerPort, Host, Port} = case rabbit_net:socket_ends(Sock, inbound) of {ok, Res} -> Res; @@ -166,7 +166,7 @@ socket_adapter_info(Sock) -> {ok, Res1} -> Res1; _ -> unknown end, - #amqp_adapter_info{protocol = unknown, + #amqp_adapter_info{protocol = Protocol, name = list_to_binary(Name), host = Host, port = Port, From e2536b7af70a0fd5f9b29258f1b2b079bb52eb71 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 9 Jan 2013 11:23:59 +0000 Subject: [PATCH 15/44] Move API (sort of, I didn't want to dump a load of code here). --- src/amqp_connection.erl | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/amqp_connection.erl b/src/amqp_connection.erl index 8bf7c33b..08ce840f 100644 --- a/src/amqp_connection.erl +++ b/src/amqp_connection.erl @@ -73,6 +73,7 @@ -export([start/1, close/1, close/2, close/3]). -export([error_atom/1]). -export([info/2, info_keys/1, info_keys/0]). +-export([socket_adapter_info/2]). -define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}). @@ -332,3 +333,8 @@ info_keys(ConnectionPid) -> %% atoms that can be used for a certain connection, use info_keys/1. info_keys() -> amqp_gen_connection:info_keys(). + +%% @doc Returns an #amqp_adapter_info{} based on the underlying socket +%% for a non-AMQP network protocol. +socket_adapter_info(Sock, Protocol) -> + amqp_direct_connection:socket_adapter_info(Sock, Protocol). From 7a0476fec210b07d73369219e84cceb18e39309f Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 9 Jan 2013 13:52:14 +0000 Subject: [PATCH 16/44] Make input and output clearer. --- src/amqp_connection.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/amqp_connection.erl b/src/amqp_connection.erl index 08ce840f..d3fc10fa 100644 --- a/src/amqp_connection.erl +++ b/src/amqp_connection.erl @@ -334,7 +334,7 @@ info_keys(ConnectionPid) -> info_keys() -> amqp_gen_connection:info_keys(). -%% @doc Returns an #amqp_adapter_info{} based on the underlying socket -%% for a non-AMQP network protocol. +%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{} +%% based on the socket for the protocol given. socket_adapter_info(Sock, Protocol) -> amqp_direct_connection:socket_adapter_info(Sock, Protocol). From a5c1a4cf5ca3f435061694540ff37fb46e49d22f Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 15 Feb 2013 16:09:36 +0000 Subject: [PATCH 17/44] Destination semantics for STOMP and AMQP 1.0 --- src/routing_util.erl | 150 +++++++++++++++++++++++++++++++++++++ test/amqp_client_SUITE.erl | 1 + test/test_util.erl | 83 ++++++++++++++++++++ 3 files changed, 234 insertions(+) create mode 100644 src/routing_util.erl diff --git a/src/routing_util.erl b/src/routing_util.erl new file mode 100644 index 00000000..5f6a9d96 --- /dev/null +++ b/src/routing_util.erl @@ -0,0 +1,150 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2013-2013 VMware, Inc. All rights reserved. +%% + +-module(routing_util). + +-export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). +-export([ensure_endpoint/4, ensure_binding/3]). +-export([parse_endpoint/1, parse_endpoint/2, parse_routing/1]). + +-include("amqp_client.hrl"). +-include("routing_prefixes.hrl"). + +%%---------------------------------------------------------------------------- + +init_state() -> sets:new(). + +dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX, + ?AMQQUEUE_PREFIX, ?REPLY_QUEUE_PREFIX]. + +all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()]. + + +ensure_endpoint(source, Channel, {exchange, _}, State) -> + #'queue.declare_ok'{queue = Queue} = + amqp_channel:call(Channel, #'queue.declare'{auto_delete = true, + exclusive = true}), + {ok, Queue, State}; + +ensure_endpoint(source, Channel, {topic, Name}, State) -> + ensure_endpoint(source, Channel, {topic, Name, true}, State); +ensure_endpoint(source, Channel, {topic, Name, Durable}, State) -> + Method = + case Durable of + true -> + Q = list_to_binary(Name), + #'queue.declare'{durable = true, queue = Q}; + false -> + #'queue.declare'{auto_delete = true, exclusive = true} + end, + #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), + {ok, Queue, State}; + +ensure_endpoint(_, Channel, {queue, Name}, State) -> + Queue = list_to_binary(Name), + State1 = case sets:is_element(Queue, State) of + true -> State; + _ -> amqp_channel:cast(Channel, + #'queue.declare'{durable = true, + queue = Queue, + nowait = true}), + sets:add_element(Queue, State) + end, + {ok, Queue, State1}; + +ensure_endpoint(dest, _Channel, {exchange, _}, State) -> + {ok, undefined, State}; + +ensure_endpoint(dest, _Ch, {topic, _}, State) -> + {ok, undefined, State}; + +ensure_endpoint(_, _Ch, {Type, Name}, State) + when Type =:= reply_queue orelse Type =:= amqqueue -> + {ok, list_to_binary(Name), State}. + +%% -------------------------------------------------------------------------- + +ensure_binding(QueueBin, {"", Queue}, _Channel) -> + %% i.e., we should only be asked to bind to the default exchange a + %% queue with its own name + QueueBin = list_to_binary(Queue), + ok; +ensure_binding(Queue, {Exchange, RoutingKey}, Channel) -> + #'queue.bind_ok'{} = + amqp_channel:call(Channel, + #'queue.bind'{ + queue = Queue, + exchange = list_to_binary(Exchange), + routing_key = list_to_binary(RoutingKey)}), + ok. + +%% -------------------------------------------------------------------------- + +parse_endpoint(Destination, Enc) when is_binary(Destination) -> + parse_endpoint(unicode:characters_to_list(Destination, Enc)). + +parse_endpoint(Destination) when is_list(Destination) -> + case re:split(Destination, "/", [{return, list}]) of + [Name] -> + {ok, {queue, unescape(Name)}}; + ["", Type | Rest] + when Type =:= "exchange"; Type =:= "queue"; Type =:= "topic"; + Type =:= "temp-queue"; Type =:= "reply-queue" -> + parse_endpoint0(atomise(Type), Rest); + ["", "amq", "queue" | Rest] -> + parse_endpoint0(amqqueue, Rest); + _ -> + {error, {unknown_destination, Destination}} + end. + +parse_endpoint0(exchange, ["" | _] = Rest) -> + {error, {invalid_destination, exchange, to_url(Rest)}}; +parse_endpoint0(exchange, [Name]) -> + {ok, {exchange, {unescape(Name), undefined}}}; +parse_endpoint0(exchange, [Name, Pattern]) -> + {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; +parse_endpoint0(Type, [[_|_]] = [Name]) -> + {ok, {Type, unescape(Name)}}; +parse_endpoint0(Type, Rest) -> + {error, {invalid_destination, Type, to_url(Rest)}}. + +%% -------------------------------------------------------------------------- + +parse_routing({exchange, {Name, undefined}}) -> + {Name, ""}; +parse_routing({exchange, {Name, Pattern}}) -> + {Name, Pattern}; +parse_routing({topic, Name}) -> + {"amq.topic", Name}; +parse_routing({Type, Name}) + when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue -> + {"", Name}. + +%%---------------------------------------------------------------------------- + +to_url([]) -> []; +to_url(Lol) -> "/" ++ string:join(Lol, "/"). + +atomise(Name) when is_list(Name) -> + list_to_atom(re:replace(Name, "-", "_", [{return,list}, global])). + +unescape_all(Lol) -> [unescape(L) || L <- Lol]. +unescape(Str) -> unescape(Str, []). + +unescape("%2F" ++ Str, Acc) -> unescape(Str, [$/ | Acc]); +unescape([C | Str], Acc) -> unescape(Str, [C | Acc]); +unescape([], Acc) -> lists:reverse(Acc). + diff --git a/test/amqp_client_SUITE.erl b/test/amqp_client_SUITE.erl index 33267914..f4479fdc 100644 --- a/test/amqp_client_SUITE.erl +++ b/test/amqp_client_SUITE.erl @@ -37,6 +37,7 @@ %%--------------------------------------------------------------------------- amqp_uri_parse_test_() -> ?RUN([]). +route_destination_test_() -> ?RUN([]). basic_get_test_() -> ?RUN([]). basic_get_ipv6_test_() -> ?RUN([]). basic_return_test_() -> ?RUN([]). diff --git a/test/test_util.erl b/test/test_util.erl index d892d210..f326cc58 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -140,6 +140,89 @@ amqp_uri_parse_test() -> ok. +%%-------------------------------------------------------------------- +%% Destination Parsing Tests +%%-------------------------------------------------------------------- + +route_destination_test() -> + %% valid queue + ?assertMatch({ok, {queue, "test"}}, parse_dest("/queue/test")), + + %% valid topic + ?assertMatch({ok, {topic, "test"}}, parse_dest("/topic/test")), + + %% valid exchange + ?assertMatch({ok, {exchange, {"test", undefined}}}, parse_dest("/exchange/test")), + + %% valid temp queue + ?assertMatch({ok, {temp_queue, "test"}}, parse_dest("/temp-queue/test")), + + %% valid reply queue + ?assertMatch({ok, {reply_queue, "test"}}, parse_dest("/reply-queue/test")), + + %% valid exchange with pattern + ?assertMatch({ok, {exchange, {"test", "pattern"}}}, + parse_dest("/exchange/test/pattern")), + + %% valid pre-declared queue + ?assertMatch({ok, {amqqueue, "test"}}, parse_dest("/amq/queue/test")), + + %% queue without name + ?assertMatch({error, {invalid_destination, queue, ""}}, parse_dest("/queue")), + + %% topic without name + ?assertMatch({error, {invalid_destination, topic, ""}}, parse_dest("/topic")), + + %% exchange without name + ?assertMatch({error, {invalid_destination, exchange, ""}}, + parse_dest("/exchange")), + + %% exchange default name + ?assertMatch({error, {invalid_destination, exchange, "//foo"}}, + parse_dest("/exchange//foo")), + + %% amqqueue without name + ?assertMatch({error, {invalid_destination, amqqueue, ""}}, + parse_dest("/amq/queue")), + + %% queue without name with trailing slash + ?assertMatch({error, {invalid_destination, queue, "/"}}, parse_dest("/queue/")), + + %% topic without name with trailing slash + ?assertMatch({error, {invalid_destination, topic, "/"}}, parse_dest("/topic/")), + + %% exchange without name with trailing slash + ?assertMatch({error, {invalid_destination, exchange, "/"}}, + parse_dest("/exchange/")), + + %% queue with invalid name + ?assertMatch({error, {invalid_destination, queue, "/foo/bar"}}, + parse_dest("/queue/foo/bar")), + + %% topic with invalid name + ?assertMatch({error, {invalid_destination, topic, "/foo/bar"}}, + parse_dest("/topic/foo/bar")), + + %% exchange with invalid name + ?assertMatch({error, {invalid_destination, exchange, "/foo/bar/baz"}}, + parse_dest("/exchange/foo/bar/baz")), + + %% unknown destination + ?assertMatch({error, {unknown_destination, "/blah/boo"}}, + parse_dest("/blah/boo")), + + %% queue with escaped name + ?assertMatch({ok, {queue, "te/st"}}, parse_dest("/queue/te%2Fst")), + + %% valid exchange with escaped name and pattern + ?assertMatch({ok, {exchange, {"te/st", "pa/tt/ern"}}}, + parse_dest("/exchange/te%2Fst/pa%2Ftt%2Fern")), + + ok. + +parse_dest(Destination) -> + routing_util:parse_endpoint(Destination). + %%%% %% %% This is an example of how the client interaction should work From da8efed65118c9c86f436ec514a376d5e9ca085b Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 15 Feb 2013 17:51:20 +0000 Subject: [PATCH 18/44] Permit parameters when parsing endpoints and support dynamic queues --- src/routing_util.erl | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index 5f6a9d96..481b0ddd 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -41,6 +41,7 @@ ensure_endpoint(source, Channel, {exchange, _}, State) -> ensure_endpoint(source, Channel, {topic, Name}, State) -> ensure_endpoint(source, Channel, {topic, Name, true}, State); + ensure_endpoint(source, Channel, {topic, Name, Durable}, State) -> Method = case Durable of @@ -53,6 +54,11 @@ ensure_endpoint(source, Channel, {topic, Name, Durable}, State) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; +ensure_endpoint(_, Channel, {queue, undefined}, State) -> + #'queue.declare_ok'{queue = Queue} = + amqp_channel:call(Channel, #'queue.declare'{durable = true}), + {ok, Queue, State}; + ensure_endpoint(_, Channel, {queue, Name}, State) -> Queue = list_to_binary(Name), State1 = case sets:is_element(Queue, State) of @@ -93,32 +99,43 @@ ensure_binding(Queue, {Exchange, RoutingKey}, Channel) -> %% -------------------------------------------------------------------------- -parse_endpoint(Destination, Enc) when is_binary(Destination) -> - parse_endpoint(unicode:characters_to_list(Destination, Enc)). +parse_endpoint(Destination) -> + parse_endpoint(Destination, []). + +parse_endpoint(Destination, Params) when is_binary(Destination) -> + parse_endpoint( + unicode:characters_to_list( + Destination, proplists:get_value(encoding, Params)), Params); -parse_endpoint(Destination) when is_list(Destination) -> +parse_endpoint(Destination, Params) when is_list(Destination) -> case re:split(Destination, "/", [{return, list}]) of [Name] -> {ok, {queue, unescape(Name)}}; ["", Type | Rest] when Type =:= "exchange"; Type =:= "queue"; Type =:= "topic"; Type =:= "temp-queue"; Type =:= "reply-queue" -> - parse_endpoint0(atomise(Type), Rest); + parse_endpoint0(atomise(Type), Rest, Params); ["", "amq", "queue" | Rest] -> - parse_endpoint0(amqqueue, Rest); + parse_endpoint0(amqqueue, Rest, Params); _ -> {error, {unknown_destination, Destination}} end. -parse_endpoint0(exchange, ["" | _] = Rest) -> +parse_endpoint0(exchange, ["" | _] = Rest, _Params) -> {error, {invalid_destination, exchange, to_url(Rest)}}; -parse_endpoint0(exchange, [Name]) -> +parse_endpoint0(exchange, [Name], Params) -> {ok, {exchange, {unescape(Name), undefined}}}; -parse_endpoint0(exchange, [Name, Pattern]) -> +parse_endpoint0(exchange, [Name, Pattern], _Params) -> {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; -parse_endpoint0(Type, [[_|_]] = [Name]) -> +parse_endpoint0(queue, [], Params) -> + case {proplists:get_value(direction, Params), + proplists:get_value(dynamic, Params)} of + {dest, true} -> {ok, {queue, undefined}}; + _ -> {error, {invalid_destination, queue, []}} + end; +parse_endpoint0(Type, [[_|_]] = [Name], _Params) -> {ok, {Type, unescape(Name)}}; -parse_endpoint0(Type, Rest) -> +parse_endpoint0(Type, Rest, _Params) -> {error, {invalid_destination, Type, to_url(Rest)}}. %% -------------------------------------------------------------------------- From 71476ebc9efd9bf03ef511cd96edc6d628deb8ef Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 18 Feb 2013 10:26:04 +0000 Subject: [PATCH 19/44] Test for dynamic queues --- test/test_util.erl | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/test_util.erl b/test/test_util.erl index f326cc58..b1998d29 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -169,6 +169,8 @@ route_destination_test() -> %% queue without name ?assertMatch({error, {invalid_destination, queue, ""}}, parse_dest("/queue")), + ?assertMatch({ok, {queue, undefined}}, + parse_dest("/queue", [{direction, dest}, {dynamic, true}])), %% topic without name ?assertMatch({error, {invalid_destination, topic, ""}}, parse_dest("/topic")), @@ -220,6 +222,8 @@ route_destination_test() -> ok. +parse_dest(Destination, Params) -> + routing_util:parse_endpoint(Destination, Params). parse_dest(Destination) -> routing_util:parse_endpoint(Destination). From 4e182e1e1c4f50b2c57a58ae531aad3243d005dc Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 18 Feb 2013 15:01:20 +0000 Subject: [PATCH 20/44] Check exchange existence --- src/routing_util.erl | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index 481b0ddd..b80bd398 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -33,7 +33,8 @@ dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX, all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()]. -ensure_endpoint(source, Channel, {exchange, _}, State) -> +ensure_endpoint(source, Channel, {exchange, {Name, _}}, State) -> + check_exchange(Name, Channel), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{auto_delete = true, exclusive = true}), @@ -71,7 +72,8 @@ ensure_endpoint(_, Channel, {queue, Name}, State) -> end, {ok, Queue, State1}; -ensure_endpoint(dest, _Channel, {exchange, _}, State) -> +ensure_endpoint(dest, Channel, {exchange, {Name, _}}, State) -> + check_exchange(Name, Channel), {ok, undefined, State}; ensure_endpoint(dest, _Ch, {topic, _}, State) -> @@ -152,6 +154,20 @@ parse_routing({Type, Name}) %%---------------------------------------------------------------------------- +check_exchange(ExchangeName, Channel) -> + #'queue.declare_ok'{queue = Queue} = + amqp_channel:call(Channel, #'queue.declare'{auto_delete = true}), + #'queue.bind_ok'{} = + amqp_channel:call(Channel, + #'queue.bind'{ + queue = Queue, + exchange = list_to_binary(ExchangeName)}), + #'queue.delete_ok'{} = + amqp_channel:call(Channel, #'queue.delete'{queue = Queue}), + ok. + +%%---------------------------------------------------------------------------- + to_url([]) -> []; to_url(Lol) -> "/" ++ string:join(Lol, "/"). From c9ab71f1a376a53b58f3816710eded994a3af7dc Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Tue, 19 Feb 2013 18:19:31 +0000 Subject: [PATCH 21/44] Better support for topic exchanges and checking exchange existence --- src/routing_util.erl | 65 ++++++++++++++++++++++++++------------------ 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index b80bd398..ab125eb4 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -17,7 +17,7 @@ -module(routing_util). -export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). --export([ensure_endpoint/4, ensure_binding/3]). +-export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]). -export([parse_endpoint/1, parse_endpoint/2, parse_routing/1]). -include("amqp_client.hrl"). @@ -32,35 +32,26 @@ dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX, all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()]. +ensure_endpoint(Dir, Channel, EndPoint, State) -> + ensure_endpoint(Dir, Channel, EndPoint, [], State). -ensure_endpoint(source, Channel, {exchange, {Name, _}}, State) -> +ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> check_exchange(Name, Channel), - #'queue.declare_ok'{queue = Queue} = - amqp_channel:call(Channel, #'queue.declare'{auto_delete = true, - exclusive = true}), + Method = queue_declare_method(#'queue.declare'{}, Params), + #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(source, Channel, {topic, Name}, State) -> - ensure_endpoint(source, Channel, {topic, Name, true}, State); - -ensure_endpoint(source, Channel, {topic, Name, Durable}, State) -> - Method = - case Durable of - true -> - Q = list_to_binary(Name), - #'queue.declare'{durable = true, queue = Q}; - false -> - #'queue.declare'{auto_delete = true, exclusive = true} - end, +ensure_endpoint(source, Channel, {topic, Name}, Params, State) -> + Method = queue_declare_method(#'queue.declare'{}, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(_, Channel, {queue, undefined}, State) -> +ensure_endpoint(_, Channel, {queue, undefined}, _Params, State) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{durable = true}), {ok, Queue, State}; -ensure_endpoint(_, Channel, {queue, Name}, State) -> +ensure_endpoint(_, Channel, {queue, Name}, _Params, State) -> Queue = list_to_binary(Name), State1 = case sets:is_element(Queue, State) of true -> State; @@ -72,14 +63,15 @@ ensure_endpoint(_, Channel, {queue, Name}, State) -> end, {ok, Queue, State1}; -ensure_endpoint(dest, Channel, {exchange, {Name, _}}, State) -> +ensure_endpoint(dest, Channel, {exchange, {Name, _}}, _Params, State) -> check_exchange(Name, Channel), {ok, undefined, State}; -ensure_endpoint(dest, _Ch, {topic, _}, State) -> +ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> {ok, undefined, State}; -ensure_endpoint(_, _Ch, {Type, Name}, State) +%TODO: restrict direction +ensure_endpoint(_, _Ch, {Type, Name}, _Params, State) when Type =:= reply_queue orelse Type =:= amqqueue -> {ok, list_to_binary(Name), State}. @@ -131,7 +123,7 @@ parse_endpoint0(exchange, [Name, Pattern], _Params) -> {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; parse_endpoint0(queue, [], Params) -> case {proplists:get_value(direction, Params), - proplists:get_value(dynamic, Params)} of + proplists:get_value(anonymous, Params)} of {dest, true} -> {ok, {queue, undefined}}; _ -> {error, {invalid_destination, queue, []}} end; @@ -154,18 +146,37 @@ parse_routing({Type, Name}) %%---------------------------------------------------------------------------- +check_exchange("amq." ++ _, Channel) -> + ok; check_exchange(ExchangeName, Channel) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{auto_delete = true}), + #'basic.consume_ok'{consumer_tag = Tag} = + amqp_channel:call(Channel, #'basic.consume'{queue = Queue}), #'queue.bind_ok'{} = - amqp_channel:call(Channel, - #'queue.bind'{ - queue = Queue, - exchange = list_to_binary(ExchangeName)}), + amqp_channel:call(Channel, + #'queue.bind'{ + queue = Queue, + exchange = list_to_binary(ExchangeName)}), + #'basic.cancel_ok'{} = amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}), #'queue.delete_ok'{} = amqp_channel:call(Channel, #'queue.delete'{queue = Queue}), ok. + +queue_declare_method(#'queue.declare'{} = Method, Params) -> + Durable = proplists:get_value(durable, Params, false), + Method1 = Method#'queue.declare'{durable = Durable, + auto_delete = not Durable, + exclusive = not Durable}, + queue_declare_name(Method1, Params). + +queue_declare_name(#'queue.declare'{} = Method, Params) -> + case proplists:get_value(queue_name_gen, Params) of + undefined -> Method; + QG -> Method#'queue.declare'{queue = QG()} + end. + %%---------------------------------------------------------------------------- to_url([]) -> []; From 2c371dfddd082c5675eec7f39693a4305e5eb3a0 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Wed, 20 Feb 2013 14:44:53 +0000 Subject: [PATCH 22/44] Added include/routing_prefixes.hrl --- include/routing_prefixes.hrl | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 include/routing_prefixes.hrl diff --git a/include/routing_prefixes.hrl b/include/routing_prefixes.hrl new file mode 100644 index 00000000..c32b8fbf --- /dev/null +++ b/include/routing_prefixes.hrl @@ -0,0 +1,24 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. +%% + +-define(QUEUE_PREFIX, "/queue"). +-define(TOPIC_PREFIX, "/topic"). +-define(EXCHANGE_PREFIX, "/exchange"). +-define(AMQQUEUE_PREFIX, "/amq/queue"). +-define(TEMP_QUEUE_PREFIX, "/temp-queue"). +%% reply queues names can have slashes in the content so no further +%% parsing happens. +-define(REPLY_QUEUE_PREFIX, "/reply-queue/"). From d951d65426fdb2aa9db641fbf075b06949f21ece Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Wed, 20 Feb 2013 15:14:01 +0000 Subject: [PATCH 23/44] More consistent declaration parameters --- src/routing_util.erl | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index ab125eb4..7ce616a4 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -46,19 +46,21 @@ ensure_endpoint(source, Channel, {topic, Name}, Params, State) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(_, Channel, {queue, undefined}, _Params, State) -> +ensure_endpoint(_, Channel, {queue, undefined}, Params, State) -> + Method = queue_declare_method(#'queue.declare'{}, Params), #'queue.declare_ok'{queue = Queue} = - amqp_channel:call(Channel, #'queue.declare'{durable = true}), + amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(_, Channel, {queue, Name}, _Params, State) -> +ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> + Params1 = rabbit_misc:pset(durable, true, Params), Queue = list_to_binary(Name), State1 = case sets:is_element(Queue, State) of true -> State; - _ -> amqp_channel:cast(Channel, - #'queue.declare'{durable = true, - queue = Queue, - nowait = true}), + _ -> Method = queue_declare_method( + #'queue.declare'{queue = Queue, + nowait = true}, Params1), + amqp_channel:cast(Channel, Method), sets:add_element(Queue, State) end, {ok, Queue, State1}; @@ -158,23 +160,21 @@ check_exchange(ExchangeName, Channel) -> #'queue.bind'{ queue = Queue, exchange = list_to_binary(ExchangeName)}), - #'basic.cancel_ok'{} = amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}), + #'basic.cancel_ok'{} = + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}), #'queue.delete_ok'{} = amqp_channel:call(Channel, #'queue.delete'{queue = Queue}), ok. - queue_declare_method(#'queue.declare'{} = Method, Params) -> - Durable = proplists:get_value(durable, Params, false), - Method1 = Method#'queue.declare'{durable = Durable, - auto_delete = not Durable, - exclusive = not Durable}, - queue_declare_name(Method1, Params). - -queue_declare_name(#'queue.declare'{} = Method, Params) -> + Method1 = case proplists:get_value(durable, Params, false) of + true -> Method#'queue.declare'{durable = true}; + false -> Method#'queue.declare'{auto_delete = true, + exclusive = true} + end, case proplists:get_value(queue_name_gen, Params) of - undefined -> Method; - QG -> Method#'queue.declare'{queue = QG()} + undefined -> Method1; + QG -> Method1#'queue.declare'{queue = QG()} end. %%---------------------------------------------------------------------------- From 8c451912f709133c4621f1a0a8a1f9e607649aa4 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Wed, 20 Feb 2013 15:58:55 +0000 Subject: [PATCH 24/44] Indent --- src/routing_util.erl | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index 7ce616a4..da805e8a 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -53,16 +53,16 @@ ensure_endpoint(_, Channel, {queue, undefined}, Params, State) -> {ok, Queue, State}; ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> - Params1 = rabbit_misc:pset(durable, true, Params), - Queue = list_to_binary(Name), - State1 = case sets:is_element(Queue, State) of - true -> State; - _ -> Method = queue_declare_method( - #'queue.declare'{queue = Queue, - nowait = true}, Params1), - amqp_channel:cast(Channel, Method), - sets:add_element(Queue, State) - end, + Params1 = rabbit_misc:pset(durable, true, Params), + Queue = list_to_binary(Name), + State1 = case sets:is_element(Queue, State) of + true -> State; + _ -> Method = queue_declare_method( + #'queue.declare'{queue = Queue, + nowait = true}, Params1), + amqp_channel:cast(Channel, Method), + sets:add_element(Queue, State) + end, {ok, Queue, State1}; ensure_endpoint(dest, Channel, {exchange, {Name, _}}, _Params, State) -> From 1c503a6ba8f9586d5395597dd4d211a21dbec1fe Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Wed, 20 Feb 2013 17:39:07 +0000 Subject: [PATCH 25/44] Remove TODO, direction cannot be restricted --- src/routing_util.erl | 1 - 1 file changed, 1 deletion(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index da805e8a..b322ecf3 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -72,7 +72,6 @@ ensure_endpoint(dest, Channel, {exchange, {Name, _}}, _Params, State) -> ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> {ok, undefined, State}; -%TODO: restrict direction ensure_endpoint(_, _Ch, {Type, Name}, _Params, State) when Type =:= reply_queue orelse Type =:= amqqueue -> {ok, list_to_binary(Name), State}. From 1b9ccccc878e2384a4438a3af9a2a06225944eab Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 11:07:05 +0000 Subject: [PATCH 26/44] Support more selective endpoint validation for exchanges And silence some compilation warnings --- src/routing_util.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index b322ecf3..4e68d2ec 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -36,12 +36,12 @@ ensure_endpoint(Dir, Channel, EndPoint, State) -> ensure_endpoint(Dir, Channel, EndPoint, [], State). ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel), + check_exchange(Name, Channel, proplists:get_value(validate, Params)), Method = queue_declare_method(#'queue.declare'{}, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(source, Channel, {topic, Name}, Params, State) -> +ensure_endpoint(source, Channel, {topic, _}, Params, State) -> Method = queue_declare_method(#'queue.declare'{}, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; @@ -65,8 +65,8 @@ ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> end, {ok, Queue, State1}; -ensure_endpoint(dest, Channel, {exchange, {Name, _}}, _Params, State) -> - check_exchange(Name, Channel), +ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) -> + check_exchange(Name, Channel, proplists:get_value(validate, Params)), {ok, undefined, State}; ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> @@ -118,7 +118,7 @@ parse_endpoint(Destination, Params) when is_list(Destination) -> parse_endpoint0(exchange, ["" | _] = Rest, _Params) -> {error, {invalid_destination, exchange, to_url(Rest)}}; -parse_endpoint0(exchange, [Name], Params) -> +parse_endpoint0(exchange, [Name], _Params) -> {ok, {exchange, {unescape(Name), undefined}}}; parse_endpoint0(exchange, [Name, Pattern], _Params) -> {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; @@ -147,9 +147,12 @@ parse_routing({Type, Name}) %%---------------------------------------------------------------------------- -check_exchange("amq." ++ _, Channel) -> +check_exchange(_, _, Validation) + when Validation == false orelse Validation == undefined -> ok; -check_exchange(ExchangeName, Channel) -> +check_exchange("amq." ++ _, _Channel, _Validation) -> + ok; +check_exchange(ExchangeName, Channel, true) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, #'queue.declare'{auto_delete = true}), #'basic.consume_ok'{consumer_tag = Tag} = @@ -184,7 +187,6 @@ to_url(Lol) -> "/" ++ string:join(Lol, "/"). atomise(Name) when is_list(Name) -> list_to_atom(re:replace(Name, "-", "_", [{return,list}, global])). -unescape_all(Lol) -> [unescape(L) || L <- Lol]. unescape(Str) -> unescape(Str, []). unescape("%2F" ++ Str, Acc) -> unescape(Str, [$/ | Acc]); From b8abd6c87d6ba531965e94a7570026cb0a64f4f3 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 11:55:58 +0000 Subject: [PATCH 27/44] Check exchanges existence using passive declare --- src/routing_util.erl | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/src/routing_util.erl b/src/routing_util.erl index 4e68d2ec..bc810095 100644 --- a/src/routing_util.erl +++ b/src/routing_util.erl @@ -153,19 +153,8 @@ check_exchange(_, _, Validation) check_exchange("amq." ++ _, _Channel, _Validation) -> ok; check_exchange(ExchangeName, Channel, true) -> - #'queue.declare_ok'{queue = Queue} = - amqp_channel:call(Channel, #'queue.declare'{auto_delete = true}), - #'basic.consume_ok'{consumer_tag = Tag} = - amqp_channel:call(Channel, #'basic.consume'{queue = Queue}), - #'queue.bind_ok'{} = - amqp_channel:call(Channel, - #'queue.bind'{ - queue = Queue, - exchange = list_to_binary(ExchangeName)}), - #'basic.cancel_ok'{} = - amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}), - #'queue.delete_ok'{} = - amqp_channel:call(Channel, #'queue.delete'{queue = Queue}), + XDecl = #'exchange.declare'{ exchange = ExchangeName, passive = true }, + #'exchange.declare_ok'{} = rabbit_amqp1_0_channel:call(Channel, XDecl), ok. queue_declare_method(#'queue.declare'{} = Method, Params) -> From 6e21047892c9ee58fb815df54faad44c5ef041b3 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 12:15:22 +0000 Subject: [PATCH 28/44] Rename --- include/{routing_prefixes.hrl => rabbit_routing_prefixes.hrl} | 0 src/{routing_util.erl => rabbit_routing_util.erl} | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename include/{routing_prefixes.hrl => rabbit_routing_prefixes.hrl} (100%) rename src/{routing_util.erl => rabbit_routing_util.erl} (99%) diff --git a/include/routing_prefixes.hrl b/include/rabbit_routing_prefixes.hrl similarity index 100% rename from include/routing_prefixes.hrl rename to include/rabbit_routing_prefixes.hrl diff --git a/src/routing_util.erl b/src/rabbit_routing_util.erl similarity index 99% rename from src/routing_util.erl rename to src/rabbit_routing_util.erl index bc810095..fb70ac49 100644 --- a/src/routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -14,14 +14,14 @@ %% Copyright (c) 2013-2013 VMware, Inc. All rights reserved. %% --module(routing_util). +-module(rabbit_routing_util). -export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). -export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]). -export([parse_endpoint/1, parse_endpoint/2, parse_routing/1]). -include("amqp_client.hrl"). --include("routing_prefixes.hrl"). +-include("rabbit_routing_prefixes.hrl"). %%---------------------------------------------------------------------------- From 36c0d5ea8711ce561cc81104ed15e91e37613b67 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 12:22:46 +0000 Subject: [PATCH 29/44] Remove support for encoding parameter --- src/rabbit_routing_util.erl | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index fb70ac49..a41609d1 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -98,9 +98,7 @@ parse_endpoint(Destination) -> parse_endpoint(Destination, []). parse_endpoint(Destination, Params) when is_binary(Destination) -> - parse_endpoint( - unicode:characters_to_list( - Destination, proplists:get_value(encoding, Params)), Params); + parse_endpoint(unicode:characters_to_list(Destination), Params); parse_endpoint(Destination, Params) when is_list(Destination) -> case re:split(Destination, "/", [{return, list}]) of From 46cb6fceec3e30c0efd16bfdc15206c36650cf3c Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 12:31:31 +0000 Subject: [PATCH 30/44] Simplify parse parameters --- src/rabbit_routing_util.erl | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index a41609d1..f8ad7a71 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -95,40 +95,38 @@ ensure_binding(Queue, {Exchange, RoutingKey}, Channel) -> %% -------------------------------------------------------------------------- parse_endpoint(Destination) -> - parse_endpoint(Destination, []). + parse_endpoint(Destination, false). -parse_endpoint(Destination, Params) when is_binary(Destination) -> - parse_endpoint(unicode:characters_to_list(Destination), Params); - -parse_endpoint(Destination, Params) when is_list(Destination) -> +parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) -> + parse_endpoint(unicode:characters_to_list(Destination), + AllowAnonymousQueue); +parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> case re:split(Destination, "/", [{return, list}]) of [Name] -> {ok, {queue, unescape(Name)}}; ["", Type | Rest] when Type =:= "exchange"; Type =:= "queue"; Type =:= "topic"; Type =:= "temp-queue"; Type =:= "reply-queue" -> - parse_endpoint0(atomise(Type), Rest, Params); + parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue); ["", "amq", "queue" | Rest] -> - parse_endpoint0(amqqueue, Rest, Params); + parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue); _ -> {error, {unknown_destination, Destination}} end. -parse_endpoint0(exchange, ["" | _] = Rest, _Params) -> +parse_endpoint0(exchange, ["" | _] = Rest, _) -> {error, {invalid_destination, exchange, to_url(Rest)}}; -parse_endpoint0(exchange, [Name], _Params) -> +parse_endpoint0(exchange, [Name], _) -> {ok, {exchange, {unescape(Name), undefined}}}; -parse_endpoint0(exchange, [Name, Pattern], _Params) -> +parse_endpoint0(exchange, [Name, Pattern], _) -> {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; -parse_endpoint0(queue, [], Params) -> - case {proplists:get_value(direction, Params), - proplists:get_value(anonymous, Params)} of - {dest, true} -> {ok, {queue, undefined}}; - _ -> {error, {invalid_destination, queue, []}} - end; -parse_endpoint0(Type, [[_|_]] = [Name], _Params) -> +parse_endpoint0(queue, [], false) -> + {error, {invalid_destination, queue, []}}; +parse_endpoint0(queue, [], true) -> + {ok, {queue, undefined}}; +parse_endpoint0(Type, [[_|_]] = [Name], _) -> {ok, {Type, unescape(Name)}}; -parse_endpoint0(Type, Rest, _Params) -> +parse_endpoint0(Type, Rest, _) -> {error, {invalid_destination, Type, to_url(Rest)}}. %% -------------------------------------------------------------------------- From 50a6d71082ab6393f3fd4b7f5ef27151e12867b2 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 14:00:22 +0000 Subject: [PATCH 31/44] Transparent subscriptions to topic destinations --- src/rabbit_routing_util.erl | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index f8ad7a71..361bf71c 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -37,17 +37,17 @@ ensure_endpoint(Dir, Channel, EndPoint, State) -> ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> check_exchange(Name, Channel, proplists:get_value(validate, Params)), - Method = queue_declare_method(#'queue.declare'{}, Params), + Method = queue_declare_method(#'queue.declare'{}, exchange, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; ensure_endpoint(source, Channel, {topic, _}, Params, State) -> - Method = queue_declare_method(#'queue.declare'{}, Params), + Method = queue_declare_method(#'queue.declare'{}, topic, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; ensure_endpoint(_, Channel, {queue, undefined}, Params, State) -> - Method = queue_declare_method(#'queue.declare'{}, Params), + Method = queue_declare_method(#'queue.declare'{}, queue, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; @@ -59,7 +59,8 @@ ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> true -> State; _ -> Method = queue_declare_method( #'queue.declare'{queue = Queue, - nowait = true}, Params1), + nowait = true}, + queue, Params1), amqp_channel:cast(Channel, Method), sets:add_element(Queue, State) end, @@ -153,15 +154,20 @@ check_exchange(ExchangeName, Channel, true) -> #'exchange.declare_ok'{} = rabbit_amqp1_0_channel:call(Channel, XDecl), ok. -queue_declare_method(#'queue.declare'{} = Method, Params) -> +queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> Method1 = case proplists:get_value(durable, Params, false) of true -> Method#'queue.declare'{durable = true}; false -> Method#'queue.declare'{auto_delete = true, exclusive = true} end, - case proplists:get_value(queue_name_gen, Params) of - undefined -> Method1; - QG -> Method1#'queue.declare'{queue = QG()} + case {Type, proplists:get_value(topic_queue_name_gen, Params)} of + {topic, DTQNG} when is_function(DTQNG) -> + Method1#'queue.declare'{queue = DTQNG()}; + _ -> + case proplists:get_value(queue_name_gen, Params) of + undefined -> Method1; + QG -> Method1#'queue.declare'{queue = QG()} + end end. %%---------------------------------------------------------------------------- From 984e5bdeff94bbf6e608f7b1393e711348f0af42 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 14:16:35 +0000 Subject: [PATCH 32/44] Further transparency enhancement --- src/rabbit_routing_util.erl | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 361bf71c..699e39c2 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -18,7 +18,8 @@ -export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). -export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]). --export([parse_endpoint/1, parse_endpoint/2, parse_routing/1]). +-export([parse_endpoint/0, parse_endpoint/1, parse_endpoint/2]). +-export([parse_routing/1, dest_temp_queue/1]). -include("amqp_client.hrl"). -include("rabbit_routing_prefixes.hrl"). @@ -95,6 +96,8 @@ ensure_binding(Queue, {Exchange, RoutingKey}, Channel) -> %% -------------------------------------------------------------------------- +parse_endpoint() -> {queue, undefined}. + parse_endpoint(Destination) -> parse_endpoint(Destination, false). @@ -142,6 +145,9 @@ parse_routing({Type, Name}) when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue -> {"", Name}. +dest_temp_queue({temp_queue, Name}) -> Name; +dest_temp_queue(_) -> none. + %%---------------------------------------------------------------------------- check_exchange(_, _, Validation) From abd3c89ab0f50087490ec3425af0b7ef45f542a1 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 14:22:05 +0000 Subject: [PATCH 33/44] Re-order --- src/rabbit_routing_util.erl | 84 +++++++++++++++++++------------------ 1 file changed, 43 insertions(+), 41 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 699e39c2..2d7aff08 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -33,6 +33,47 @@ dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX, all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()]. +%% -------------------------------------------------------------------------- + +parse_endpoint() -> {queue, undefined}. + +parse_endpoint(Destination) -> + parse_endpoint(Destination, false). + +parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) -> + parse_endpoint(unicode:characters_to_list(Destination), + AllowAnonymousQueue); +parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> + case re:split(Destination, "/", [{return, list}]) of + [Name] -> + {ok, {queue, unescape(Name)}}; + ["", Type | Rest] + when Type =:= "exchange"; Type =:= "queue"; Type =:= "topic"; + Type =:= "temp-queue"; Type =:= "reply-queue" -> + parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue); + ["", "amq", "queue" | Rest] -> + parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue); + _ -> + {error, {unknown_destination, Destination}} + end. + +parse_endpoint0(exchange, ["" | _] = Rest, _) -> + {error, {invalid_destination, exchange, to_url(Rest)}}; +parse_endpoint0(exchange, [Name], _) -> + {ok, {exchange, {unescape(Name), undefined}}}; +parse_endpoint0(exchange, [Name, Pattern], _) -> + {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; +parse_endpoint0(queue, [], false) -> + {error, {invalid_destination, queue, []}}; +parse_endpoint0(queue, [], true) -> + {ok, {queue, undefined}}; +parse_endpoint0(Type, [[_|_]] = [Name], _) -> + {ok, {Type, unescape(Name)}}; +parse_endpoint0(Type, Rest, _) -> + {error, {invalid_destination, Type, to_url(Rest)}}. + +%% -------------------------------------------------------------------------- + ensure_endpoint(Dir, Channel, EndPoint, State) -> ensure_endpoint(Dir, Channel, EndPoint, [], State). @@ -96,45 +137,6 @@ ensure_binding(Queue, {Exchange, RoutingKey}, Channel) -> %% -------------------------------------------------------------------------- -parse_endpoint() -> {queue, undefined}. - -parse_endpoint(Destination) -> - parse_endpoint(Destination, false). - -parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) -> - parse_endpoint(unicode:characters_to_list(Destination), - AllowAnonymousQueue); -parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> - case re:split(Destination, "/", [{return, list}]) of - [Name] -> - {ok, {queue, unescape(Name)}}; - ["", Type | Rest] - when Type =:= "exchange"; Type =:= "queue"; Type =:= "topic"; - Type =:= "temp-queue"; Type =:= "reply-queue" -> - parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue); - ["", "amq", "queue" | Rest] -> - parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue); - _ -> - {error, {unknown_destination, Destination}} - end. - -parse_endpoint0(exchange, ["" | _] = Rest, _) -> - {error, {invalid_destination, exchange, to_url(Rest)}}; -parse_endpoint0(exchange, [Name], _) -> - {ok, {exchange, {unescape(Name), undefined}}}; -parse_endpoint0(exchange, [Name, Pattern], _) -> - {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; -parse_endpoint0(queue, [], false) -> - {error, {invalid_destination, queue, []}}; -parse_endpoint0(queue, [], true) -> - {ok, {queue, undefined}}; -parse_endpoint0(Type, [[_|_]] = [Name], _) -> - {ok, {Type, unescape(Name)}}; -parse_endpoint0(Type, Rest, _) -> - {error, {invalid_destination, Type, to_url(Rest)}}. - -%% -------------------------------------------------------------------------- - parse_routing({exchange, {Name, undefined}}) -> {Name, ""}; parse_routing({exchange, {Name, Pattern}}) -> @@ -148,7 +150,7 @@ parse_routing({Type, Name}) dest_temp_queue({temp_queue, Name}) -> Name; dest_temp_queue(_) -> none. -%%---------------------------------------------------------------------------- +%% -------------------------------------------------------------------------- check_exchange(_, _, Validation) when Validation == false orelse Validation == undefined -> @@ -176,7 +178,7 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> end end. -%%---------------------------------------------------------------------------- +%% -------------------------------------------------------------------------- to_url([]) -> []; to_url(Lol) -> "/" ++ string:join(Lol, "/"). From b40e1a66e39be0df40b9716e8f70f06f04bce358 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Fri, 22 Feb 2013 14:40:54 +0000 Subject: [PATCH 34/44] Remove dependency on AMQP 1.0 --- src/rabbit_routing_util.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 2d7aff08..45527ed2 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -159,7 +159,7 @@ check_exchange("amq." ++ _, _Channel, _Validation) -> ok; check_exchange(ExchangeName, Channel, true) -> XDecl = #'exchange.declare'{ exchange = ExchangeName, passive = true }, - #'exchange.declare_ok'{} = rabbit_amqp1_0_channel:call(Channel, XDecl), + #'exchange.declare_ok'{} = rabbit_channel:call(Channel, XDecl), ok. queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> From 84aeac98ff1fc919c59b217877e5fdd5721c5fe2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 22 Feb 2013 16:05:43 +0000 Subject: [PATCH 35/44] Cosmetic. --- src/rabbit_routing_util.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 45527ed2..0f8520f2 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -78,7 +78,7 @@ ensure_endpoint(Dir, Channel, EndPoint, State) -> ensure_endpoint(Dir, Channel, EndPoint, [], State). ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel, proplists:get_value(validate, Params)), + check_exchange(Name, Channel, proplists:get_value(validate, Params, false)), Method = queue_declare_method(#'queue.declare'{}, exchange, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; @@ -109,7 +109,7 @@ ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> {ok, Queue, State1}; ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel, proplists:get_value(validate, Params)), + check_exchange(Name, Channel, proplists:get_value(validate, Params, false)), {ok, undefined, State}; ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> @@ -152,8 +152,7 @@ dest_temp_queue(_) -> none. %% -------------------------------------------------------------------------- -check_exchange(_, _, Validation) - when Validation == false orelse Validation == undefined -> +check_exchange(_, _, false) -> ok; check_exchange("amq." ++ _, _Channel, _Validation) -> ok; From 466cd3edd0d347578e80fdadfe8a52516922eb50 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 22 Feb 2013 16:14:15 +0000 Subject: [PATCH 36/44] Cosmetic --- src/rabbit_routing_util.erl | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 0f8520f2..1516711b 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -57,19 +57,19 @@ parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> {error, {unknown_destination, Destination}} end. -parse_endpoint0(exchange, ["" | _] = Rest, _) -> +parse_endpoint0(exchange, ["" | _] = Rest, _) -> {error, {invalid_destination, exchange, to_url(Rest)}}; -parse_endpoint0(exchange, [Name], _) -> +parse_endpoint0(exchange, [Name], _) -> {ok, {exchange, {unescape(Name), undefined}}}; -parse_endpoint0(exchange, [Name, Pattern], _) -> +parse_endpoint0(exchange, [Name, Pattern], _) -> {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; -parse_endpoint0(queue, [], false) -> +parse_endpoint0(queue, [], false) -> {error, {invalid_destination, queue, []}}; -parse_endpoint0(queue, [], true) -> +parse_endpoint0(queue, [], true) -> {ok, {queue, undefined}}; -parse_endpoint0(Type, [[_|_]] = [Name], _) -> +parse_endpoint0(Type, [[_|_]] = [Name], _) -> {ok, {Type, unescape(Name)}}; -parse_endpoint0(Type, Rest, _) -> +parse_endpoint0(Type, Rest, _) -> {error, {invalid_destination, Type, to_url(Rest)}}. %% -------------------------------------------------------------------------- From bd7ce33737940db9682c867f0328c1267dfd4ca1 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 22 Feb 2013 16:42:45 +0000 Subject: [PATCH 37/44] Eliminate parse_endpoint/0. --- src/rabbit_routing_util.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 1516711b..86f150f3 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -18,7 +18,7 @@ -export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). -export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]). --export([parse_endpoint/0, parse_endpoint/1, parse_endpoint/2]). +-export([parse_endpoint/1, parse_endpoint/2]). -export([parse_routing/1, dest_temp_queue/1]). -include("amqp_client.hrl"). @@ -35,11 +35,12 @@ all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()]. %% -------------------------------------------------------------------------- -parse_endpoint() -> {queue, undefined}. - parse_endpoint(Destination) -> parse_endpoint(Destination, false). +parse_endpoint(undefined, AllowAnonymousQueue) -> + parse_endpoint("/queue", AllowAnonymousQueue); + parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) -> parse_endpoint(unicode:characters_to_list(Destination), AllowAnonymousQueue); From 3eabf9d9eb8d0eadec8890fd10008d651e102d49 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 25 Feb 2013 14:30:25 +0000 Subject: [PATCH 38/44] Mostly renaming --- src/rabbit_routing_util.erl | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 86f150f3..b1f297da 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -79,7 +79,8 @@ ensure_endpoint(Dir, Channel, EndPoint, State) -> ensure_endpoint(Dir, Channel, EndPoint, [], State). ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel, proplists:get_value(validate, Params, false)), + check_exchange(Name, Channel, + proplists:get_value(check_exchange, Params, false)), Method = queue_declare_method(#'queue.declare'{}, exchange, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; @@ -110,7 +111,8 @@ ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> {ok, Queue, State1}; ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) -> - check_exchange(Name, Channel, proplists:get_value(validate, Params, false)), + check_exchange(Name, Channel, + proplists:get_value(check_exchange, Params, false)), {ok, undefined, State}; ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> @@ -118,7 +120,10 @@ ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> ensure_endpoint(_, _Ch, {Type, Name}, _Params, State) when Type =:= reply_queue orelse Type =:= amqqueue -> - {ok, list_to_binary(Name), State}. + {ok, list_to_binary(Name), State}; + +ensure_endpoint(_Direction, _Ch, Endpoint, _Params, State) -> + throw(invalid_endpoint). %% -------------------------------------------------------------------------- @@ -158,8 +163,9 @@ check_exchange(_, _, false) -> check_exchange("amq." ++ _, _Channel, _Validation) -> ok; check_exchange(ExchangeName, Channel, true) -> - XDecl = #'exchange.declare'{ exchange = ExchangeName, passive = true }, - #'exchange.declare_ok'{} = rabbit_channel:call(Channel, XDecl), + XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName), + passive = true }, + #'exchange.declare_ok'{} = amqp_channel:call(Channel, XDecl), ok. queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> @@ -168,14 +174,11 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> false -> Method#'queue.declare'{auto_delete = true, exclusive = true} end, - case {Type, proplists:get_value(topic_queue_name_gen, Params)} of - {topic, DTQNG} when is_function(DTQNG) -> - Method1#'queue.declare'{queue = DTQNG()}; + case {Type, proplists:get_value(subscription_queue_name_gen, Params)} of + {topic, SQNG} when is_function(SQNG) -> + Method1#'queue.declare'{queue = SQNG()}; _ -> - case proplists:get_value(queue_name_gen, Params) of - undefined -> Method1; - QG -> Method1#'queue.declare'{queue = QG()} - end + Method1 end. %% -------------------------------------------------------------------------- From ec4bc6b2ab2bc059f1e81839108c55e5b276ce10 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 25 Feb 2013 16:53:50 +0000 Subject: [PATCH 39/44] Treat built-in exchanges like any other and update tests --- src/rabbit_routing_util.erl | 2 -- test/test_util.erl | 7 +++---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index b1f297da..bcf52d4c 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -160,8 +160,6 @@ dest_temp_queue(_) -> none. check_exchange(_, _, false) -> ok; -check_exchange("amq." ++ _, _Channel, _Validation) -> - ok; check_exchange(ExchangeName, Channel, true) -> XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName), passive = true }, diff --git a/test/test_util.erl b/test/test_util.erl index b1998d29..087ab453 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -169,8 +169,7 @@ route_destination_test() -> %% queue without name ?assertMatch({error, {invalid_destination, queue, ""}}, parse_dest("/queue")), - ?assertMatch({ok, {queue, undefined}}, - parse_dest("/queue", [{direction, dest}, {dynamic, true}])), + ?assertMatch({ok, {queue, undefined}}, parse_dest("/queue", true)), %% topic without name ?assertMatch({error, {invalid_destination, topic, ""}}, parse_dest("/topic")), @@ -223,9 +222,9 @@ route_destination_test() -> ok. parse_dest(Destination, Params) -> - routing_util:parse_endpoint(Destination, Params). + rabbit_routing_util:parse_endpoint(Destination, Params). parse_dest(Destination) -> - routing_util:parse_endpoint(Destination). + rabbit_routing_util:parse_endpoint(Destination). %%%% %% From f781e73f2fe8124ce6893c93a3fafe016ecf1d02 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Tue, 26 Feb 2013 13:47:23 +0000 Subject: [PATCH 40/44] Reply-queues with slashes and changes to dynamic queues --- src/rabbit_routing_util.erl | 13 ++++++++++--- test/test_util.erl | 1 + 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index bcf52d4c..46087a4c 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -49,11 +49,15 @@ parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> [Name] -> {ok, {queue, unescape(Name)}}; ["", Type | Rest] - when Type =:= "exchange"; Type =:= "queue"; Type =:= "topic"; - Type =:= "temp-queue"; Type =:= "reply-queue" -> + when Type =:= "exchange" orelse Type =:= "queue" orelse + Type =:= "topic" orelse Type =:= "temp-queue" -> parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue); ["", "amq", "queue" | Rest] -> parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue); + ["", "reply-queue" = Prefix | [_|_]] -> + parse_endpoint0(reply_queue, + [lists:nthtail(2 + length(Prefix), Destination)], + AllowAnonymousQueue); _ -> {error, {unknown_destination, Destination}} end. @@ -90,12 +94,15 @@ ensure_endpoint(source, Channel, {topic, _}, Params, State) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(_, Channel, {queue, undefined}, Params, State) -> +ensure_endpoint(source, Channel, {queue, undefined}, Params, State) -> Method = queue_declare_method(#'queue.declare'{}, queue, Params), #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; +ensure_endpoint(dest, Channel, {queue, undefined}, Params, State) -> + {ok, undefined, State}; + ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> Params1 = rabbit_misc:pset(durable, true, Params), Queue = list_to_binary(Name), diff --git a/test/test_util.erl b/test/test_util.erl index 087ab453..bc8bb9c9 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -159,6 +159,7 @@ route_destination_test() -> %% valid reply queue ?assertMatch({ok, {reply_queue, "test"}}, parse_dest("/reply-queue/test")), + ?assertMatch({ok, {reply_queue, "test/2"}}, parse_dest("/reply-queue/test/2")), %% valid exchange with pattern ?assertMatch({ok, {exchange, {"test", "pattern"}}}, From bcf9114f94c54d864429c557d96c040794d7279d Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Tue, 26 Feb 2013 14:42:19 +0000 Subject: [PATCH 41/44] Change dynamic subscriptions --- src/rabbit_routing_util.erl | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 46087a4c..85ac6558 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -94,13 +94,7 @@ ensure_endpoint(source, Channel, {topic, _}, Params, State) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(source, Channel, {queue, undefined}, Params, State) -> - Method = queue_declare_method(#'queue.declare'{}, queue, Params), - #'queue.declare_ok'{queue = Queue} = - amqp_channel:call(Channel, Method), - {ok, Queue, State}; - -ensure_endpoint(dest, Channel, {queue, undefined}, Params, State) -> +ensure_endpoint(_Dir, Channel, {queue, undefined}, _Params, State) -> {ok, undefined, State}; ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> From 56f45ca16cb1ed6985d58d2bfc2a120612cbfe53 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Tue, 26 Feb 2013 14:47:06 +0000 Subject: [PATCH 42/44] Warnings --- src/rabbit_routing_util.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index 85ac6558..bd0519a7 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -94,7 +94,7 @@ ensure_endpoint(source, Channel, {topic, _}, Params, State) -> #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), {ok, Queue, State}; -ensure_endpoint(_Dir, Channel, {queue, undefined}, _Params, State) -> +ensure_endpoint(_Dir, _Channel, {queue, undefined}, _Params, State) -> {ok, undefined, State}; ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> @@ -123,7 +123,7 @@ ensure_endpoint(_, _Ch, {Type, Name}, _Params, State) when Type =:= reply_queue orelse Type =:= amqqueue -> {ok, list_to_binary(Name), State}; -ensure_endpoint(_Direction, _Ch, Endpoint, _Params, State) -> +ensure_endpoint(_Direction, _Ch, _Endpoint, _Params, _State) -> throw(invalid_endpoint). %% -------------------------------------------------------------------------- From 771cf29c4dccba2929dcf3b5751c1761d1a1cf5b Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 7 Mar 2013 12:14:49 +0000 Subject: [PATCH 43/44] API change --- src/rabbit_routing_util.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl index bd0519a7..c8f49549 100644 --- a/src/rabbit_routing_util.erl +++ b/src/rabbit_routing_util.erl @@ -124,7 +124,7 @@ ensure_endpoint(_, _Ch, {Type, Name}, _Params, State) {ok, list_to_binary(Name), State}; ensure_endpoint(_Direction, _Ch, _Endpoint, _Params, _State) -> - throw(invalid_endpoint). + {error, invalid_endpoint}. %% -------------------------------------------------------------------------- From 7833caa4cdc98dba48307902265f67522631dbb0 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 14 Mar 2013 11:04:34 +0000 Subject: [PATCH 44/44] Add rabbit_policy_validator --- rabbit_common.app.in | 1 + 1 file changed, 1 insertion(+) diff --git a/rabbit_common.app.in b/rabbit_common.app.in index 5e91d879..2df650e7 100644 --- a/rabbit_common.app.in +++ b/rabbit_common.app.in @@ -25,6 +25,7 @@ rabbit_msg_store_index, rabbit_net, rabbit_nodes, + rabbit_policy_validator, rabbit_reader, rabbit_writer, rabbit_event,