diff --git a/include/rabbit_routing_prefixes.hrl b/include/rabbit_routing_prefixes.hrl new file mode 100644 index 00000000..c32b8fbf --- /dev/null +++ b/include/rabbit_routing_prefixes.hrl @@ -0,0 +1,24 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved. +%% + +-define(QUEUE_PREFIX, "/queue"). +-define(TOPIC_PREFIX, "/topic"). +-define(EXCHANGE_PREFIX, "/exchange"). +-define(AMQQUEUE_PREFIX, "/amq/queue"). +-define(TEMP_QUEUE_PREFIX, "/temp-queue"). +%% reply queues names can have slashes in the content so no further +%% parsing happens. +-define(REPLY_QUEUE_PREFIX, "/reply-queue/"). diff --git a/rabbit_common.app.in b/rabbit_common.app.in index 5e91d879..2df650e7 100644 --- a/rabbit_common.app.in +++ b/rabbit_common.app.in @@ -25,6 +25,7 @@ rabbit_msg_store_index, rabbit_net, rabbit_nodes, + rabbit_policy_validator, rabbit_reader, rabbit_writer, rabbit_event, diff --git a/src/amqp_channel.erl b/src/amqp_channel.erl index 758de231..5aa74d2b 100644 --- a/src/amqp_channel.erl +++ b/src/amqp_channel.erl @@ -214,7 +214,7 @@ next_publish_seqno(Channel) -> %% Channel = pid() %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker. Note, when called on a -%% non-Confirm channel, waitForConfirms returns true immediately. +%% non-Confirm channel, waitForConfirms returns an error. wait_for_confirms(Channel) -> wait_for_confirms(Channel, infinity). @@ -224,10 +224,13 @@ wait_for_confirms(Channel) -> %% Timeout = non_neg_integer() | 'infinity' %% @doc Wait until all messages published since the last call have %% been either ack'd or nack'd by the broker or the timeout expires. -%% Note, when called on a non-Confirm channel, waitForConfirms returns -%% true immediately. +%% Note, when called on a non-Confirm channel, waitForConfirms throws +%% an exception. wait_for_confirms(Channel, Timeout) -> - gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity). + case gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity) of + {error, Reason} -> throw(Reason); + Other -> Other + end. %% @spec (Channel) -> true %% where @@ -709,23 +712,19 @@ handle_method_from_server1( {noreply, State}; handle_method_from_server1(#'basic.ack'{} = BasicAck, none, #state{confirm_handler = none} = State) -> - ?LOG_WARN("Channel (~p): received ~p but there is no " - "confirm handler registered~n", [self(), BasicAck]), {noreply, update_confirm_set(BasicAck, State)}; -handle_method_from_server1( - #'basic.ack'{} = BasicAck, none, - #state{confirm_handler = {ConfirmHandler, _Ref}} = State) -> - ConfirmHandler ! BasicAck, +handle_method_from_server1(#'basic.ack'{} = BasicAck, none, + #state{confirm_handler = {CH, _Ref}} = State) -> + CH ! BasicAck, {noreply, update_confirm_set(BasicAck, State)}; handle_method_from_server1(#'basic.nack'{} = BasicNack, none, #state{confirm_handler = none} = State) -> ?LOG_WARN("Channel (~p): received ~p but there is no " "confirm handler registered~n", [self(), BasicNack]), {noreply, update_confirm_set(BasicNack, State)}; -handle_method_from_server1( - #'basic.nack'{} = BasicNack, none, - #state{confirm_handler = {ConfirmHandler, _Ref}} = State) -> - ConfirmHandler ! BasicNack, +handle_method_from_server1(#'basic.nack'{} = BasicNack, none, + #state{confirm_handler = {CH, _Ref}} = State) -> + CH ! BasicNack, {noreply, update_confirm_set(BasicNack, State)}; handle_method_from_server1(Method, none, State) -> @@ -884,6 +883,8 @@ notify_confirm_waiters(State = #state{waiting_set = WSet, State#state{waiting_set = gb_trees:empty(), only_acks_received = true}. +handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) -> + {reply, {error, not_in_confirm_mode}, State}; handle_wait_for_confirms(From, Timeout, State = #state{unconfirmed_set = USet, waiting_set = WSet}) -> diff --git a/src/amqp_connection.erl b/src/amqp_connection.erl index c2b09d4c..60c50c64 100644 --- a/src/amqp_connection.erl +++ b/src/amqp_connection.erl @@ -73,6 +73,7 @@ -export([start/1, close/1, close/2, close/3]). -export([error_atom/1]). -export([info/2, info_keys/1, info_keys/0]). +-export([socket_adapter_info/2]). -define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}). @@ -332,3 +333,8 @@ info_keys(ConnectionPid) -> %% atoms that can be used for a certain connection, use info_keys/1. info_keys() -> amqp_gen_connection:info_keys(). + +%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{} +%% based on the socket for the protocol given. +socket_adapter_info(Sock, Protocol) -> + amqp_direct_connection:socket_adapter_info(Sock, Protocol). diff --git a/src/amqp_direct_connection.erl b/src/amqp_direct_connection.erl index 88e56c4c..c003aa61 100644 --- a/src/amqp_direct_connection.erl +++ b/src/amqp_direct_connection.erl @@ -26,6 +26,8 @@ -export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2, info_keys/0, handle_message/2, closing/3, channels_terminated/1]). +-export([socket_adapter_info/2]). + -record(state, {node, user, vhost, @@ -153,3 +155,52 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) -> ensure_adapter_info(A#amqp_adapter_info{name = Name}); ensure_adapter_info(Info) -> Info. + +socket_adapter_info(Sock, Protocol) -> + {PeerHost, PeerPort, Host, Port} = + case rabbit_net:socket_ends(Sock, inbound) of + {ok, Res} -> Res; + _ -> {unknown, unknown, unknown, unknown} + end, + Name = case rabbit_net:connection_string(Sock, inbound) of + {ok, Res1} -> Res1; + _ -> unknown + end, + #amqp_adapter_info{protocol = Protocol, + name = list_to_binary(Name), + host = Host, + port = Port, + peer_host = PeerHost, + peer_port = PeerPort, + additional_info = maybe_ssl_info(Sock)}. + +maybe_ssl_info(Sock) -> + case rabbit_net:is_ssl(Sock) of + true -> [{ssl, true}] ++ ssl_info(Sock) ++ ssl_cert_info(Sock); + false -> [{ssl, false}] + end. + +ssl_info(Sock) -> + {Protocol, KeyExchange, Cipher, Hash} = + case rabbit_net:ssl_info(Sock) of + {ok, {P, {K, C, H}}} -> {P, K, C, H}; + {ok, {P, {K, C, H, _}}} -> {P, K, C, H}; + _ -> {unknown, unknown, unknown, unknown} + end, + [{ssl_protocol, Protocol}, + {ssl_key_exchange, KeyExchange}, + {ssl_cipher, Cipher}, + {ssl_hash, Hash}]. + +ssl_cert_info(Sock) -> + case rabbit_net:peercert(Sock) of + {ok, Cert} -> + [{peer_cert_issuer, list_to_binary( + rabbit_ssl:peer_cert_issuer(Cert))}, + {peer_cert_subject, list_to_binary( + rabbit_ssl:peer_cert_subject(Cert))}, + {peer_cert_validity, list_to_binary( + rabbit_ssl:peer_cert_validity(Cert))}]; + _ -> + [] + end. diff --git a/src/amqp_gen_consumer.erl b/src/amqp_gen_consumer.erl index b937a488..2fd471dd 100644 --- a/src/amqp_gen_consumer.erl +++ b/src/amqp_gen_consumer.erl @@ -34,7 +34,7 @@ -export([start_link/2, call_consumer/2, call_consumer/3]). -export([behaviour_info/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, prioritise_info/2]). + handle_info/2, prioritise_info/3]). -record(state, {module, module_state}). @@ -193,8 +193,8 @@ init([ConsumerModule, ExtraParams]) -> ignore end. -prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1; -prioritise_info(_, _State) -> 0. +prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _Len, _State) -> 1; +prioritise_info(_, _Len, _State) -> 0. handle_call({consumer_call, Msg}, From, State = #state{module = ConsumerModule, diff --git a/src/rabbit_routing_util.erl b/src/rabbit_routing_util.erl new file mode 100644 index 00000000..c8f49549 --- /dev/null +++ b/src/rabbit_routing_util.erl @@ -0,0 +1,196 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2013-2013 VMware, Inc. All rights reserved. +%% + +-module(rabbit_routing_util). + +-export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). +-export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]). +-export([parse_endpoint/1, parse_endpoint/2]). +-export([parse_routing/1, dest_temp_queue/1]). + +-include("amqp_client.hrl"). +-include("rabbit_routing_prefixes.hrl"). + +%%---------------------------------------------------------------------------- + +init_state() -> sets:new(). + +dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX, + ?AMQQUEUE_PREFIX, ?REPLY_QUEUE_PREFIX]. + +all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()]. + +%% -------------------------------------------------------------------------- + +parse_endpoint(Destination) -> + parse_endpoint(Destination, false). + +parse_endpoint(undefined, AllowAnonymousQueue) -> + parse_endpoint("/queue", AllowAnonymousQueue); + +parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) -> + parse_endpoint(unicode:characters_to_list(Destination), + AllowAnonymousQueue); +parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) -> + case re:split(Destination, "/", [{return, list}]) of + [Name] -> + {ok, {queue, unescape(Name)}}; + ["", Type | Rest] + when Type =:= "exchange" orelse Type =:= "queue" orelse + Type =:= "topic" orelse Type =:= "temp-queue" -> + parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue); + ["", "amq", "queue" | Rest] -> + parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue); + ["", "reply-queue" = Prefix | [_|_]] -> + parse_endpoint0(reply_queue, + [lists:nthtail(2 + length(Prefix), Destination)], + AllowAnonymousQueue); + _ -> + {error, {unknown_destination, Destination}} + end. + +parse_endpoint0(exchange, ["" | _] = Rest, _) -> + {error, {invalid_destination, exchange, to_url(Rest)}}; +parse_endpoint0(exchange, [Name], _) -> + {ok, {exchange, {unescape(Name), undefined}}}; +parse_endpoint0(exchange, [Name, Pattern], _) -> + {ok, {exchange, {unescape(Name), unescape(Pattern)}}}; +parse_endpoint0(queue, [], false) -> + {error, {invalid_destination, queue, []}}; +parse_endpoint0(queue, [], true) -> + {ok, {queue, undefined}}; +parse_endpoint0(Type, [[_|_]] = [Name], _) -> + {ok, {Type, unescape(Name)}}; +parse_endpoint0(Type, Rest, _) -> + {error, {invalid_destination, Type, to_url(Rest)}}. + +%% -------------------------------------------------------------------------- + +ensure_endpoint(Dir, Channel, EndPoint, State) -> + ensure_endpoint(Dir, Channel, EndPoint, [], State). + +ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) -> + check_exchange(Name, Channel, + proplists:get_value(check_exchange, Params, false)), + Method = queue_declare_method(#'queue.declare'{}, exchange, Params), + #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), + {ok, Queue, State}; + +ensure_endpoint(source, Channel, {topic, _}, Params, State) -> + Method = queue_declare_method(#'queue.declare'{}, topic, Params), + #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method), + {ok, Queue, State}; + +ensure_endpoint(_Dir, _Channel, {queue, undefined}, _Params, State) -> + {ok, undefined, State}; + +ensure_endpoint(_, Channel, {queue, Name}, Params, State) -> + Params1 = rabbit_misc:pset(durable, true, Params), + Queue = list_to_binary(Name), + State1 = case sets:is_element(Queue, State) of + true -> State; + _ -> Method = queue_declare_method( + #'queue.declare'{queue = Queue, + nowait = true}, + queue, Params1), + amqp_channel:cast(Channel, Method), + sets:add_element(Queue, State) + end, + {ok, Queue, State1}; + +ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) -> + check_exchange(Name, Channel, + proplists:get_value(check_exchange, Params, false)), + {ok, undefined, State}; + +ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) -> + {ok, undefined, State}; + +ensure_endpoint(_, _Ch, {Type, Name}, _Params, State) + when Type =:= reply_queue orelse Type =:= amqqueue -> + {ok, list_to_binary(Name), State}; + +ensure_endpoint(_Direction, _Ch, _Endpoint, _Params, _State) -> + {error, invalid_endpoint}. + +%% -------------------------------------------------------------------------- + +ensure_binding(QueueBin, {"", Queue}, _Channel) -> + %% i.e., we should only be asked to bind to the default exchange a + %% queue with its own name + QueueBin = list_to_binary(Queue), + ok; +ensure_binding(Queue, {Exchange, RoutingKey}, Channel) -> + #'queue.bind_ok'{} = + amqp_channel:call(Channel, + #'queue.bind'{ + queue = Queue, + exchange = list_to_binary(Exchange), + routing_key = list_to_binary(RoutingKey)}), + ok. + +%% -------------------------------------------------------------------------- + +parse_routing({exchange, {Name, undefined}}) -> + {Name, ""}; +parse_routing({exchange, {Name, Pattern}}) -> + {Name, Pattern}; +parse_routing({topic, Name}) -> + {"amq.topic", Name}; +parse_routing({Type, Name}) + when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue -> + {"", Name}. + +dest_temp_queue({temp_queue, Name}) -> Name; +dest_temp_queue(_) -> none. + +%% -------------------------------------------------------------------------- + +check_exchange(_, _, false) -> + ok; +check_exchange(ExchangeName, Channel, true) -> + XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName), + passive = true }, + #'exchange.declare_ok'{} = amqp_channel:call(Channel, XDecl), + ok. + +queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> + Method1 = case proplists:get_value(durable, Params, false) of + true -> Method#'queue.declare'{durable = true}; + false -> Method#'queue.declare'{auto_delete = true, + exclusive = true} + end, + case {Type, proplists:get_value(subscription_queue_name_gen, Params)} of + {topic, SQNG} when is_function(SQNG) -> + Method1#'queue.declare'{queue = SQNG()}; + _ -> + Method1 + end. + +%% -------------------------------------------------------------------------- + +to_url([]) -> []; +to_url(Lol) -> "/" ++ string:join(Lol, "/"). + +atomise(Name) when is_list(Name) -> + list_to_atom(re:replace(Name, "-", "_", [{return,list}, global])). + +unescape(Str) -> unescape(Str, []). + +unescape("%2F" ++ Str, Acc) -> unescape(Str, [$/ | Acc]); +unescape([C | Str], Acc) -> unescape(Str, [C | Acc]); +unescape([], Acc) -> lists:reverse(Acc). + diff --git a/test/amqp_client_SUITE.erl b/test/amqp_client_SUITE.erl index 222efdb3..f4479fdc 100644 --- a/test/amqp_client_SUITE.erl +++ b/test/amqp_client_SUITE.erl @@ -37,6 +37,7 @@ %%--------------------------------------------------------------------------- amqp_uri_parse_test_() -> ?RUN([]). +route_destination_test_() -> ?RUN([]). basic_get_test_() -> ?RUN([]). basic_get_ipv6_test_() -> ?RUN([]). basic_return_test_() -> ?RUN([]). @@ -65,7 +66,7 @@ pub_and_close_test_() -> ?RUN([]). channel_tune_negotiation_test_() -> ?RUN([]). confirm_test_() -> ?RUN([]). confirm_barrier_test_() -> ?RUN([]). -confirm_barrier_nop_test_() -> ?RUN([]). +confirm_select_before_wait_test_() -> ?RUN([]). confirm_barrier_timeout_test_() -> ?RUN([]). confirm_barrier_die_timeout_test_() -> ?RUN([]). default_consumer_test_() -> ?RUN([]). diff --git a/test/test_util.erl b/test/test_util.erl index 419791b6..bc8bb9c9 100644 --- a/test/test_util.erl +++ b/test/test_util.erl @@ -140,6 +140,93 @@ amqp_uri_parse_test() -> ok. +%%-------------------------------------------------------------------- +%% Destination Parsing Tests +%%-------------------------------------------------------------------- + +route_destination_test() -> + %% valid queue + ?assertMatch({ok, {queue, "test"}}, parse_dest("/queue/test")), + + %% valid topic + ?assertMatch({ok, {topic, "test"}}, parse_dest("/topic/test")), + + %% valid exchange + ?assertMatch({ok, {exchange, {"test", undefined}}}, parse_dest("/exchange/test")), + + %% valid temp queue + ?assertMatch({ok, {temp_queue, "test"}}, parse_dest("/temp-queue/test")), + + %% valid reply queue + ?assertMatch({ok, {reply_queue, "test"}}, parse_dest("/reply-queue/test")), + ?assertMatch({ok, {reply_queue, "test/2"}}, parse_dest("/reply-queue/test/2")), + + %% valid exchange with pattern + ?assertMatch({ok, {exchange, {"test", "pattern"}}}, + parse_dest("/exchange/test/pattern")), + + %% valid pre-declared queue + ?assertMatch({ok, {amqqueue, "test"}}, parse_dest("/amq/queue/test")), + + %% queue without name + ?assertMatch({error, {invalid_destination, queue, ""}}, parse_dest("/queue")), + ?assertMatch({ok, {queue, undefined}}, parse_dest("/queue", true)), + + %% topic without name + ?assertMatch({error, {invalid_destination, topic, ""}}, parse_dest("/topic")), + + %% exchange without name + ?assertMatch({error, {invalid_destination, exchange, ""}}, + parse_dest("/exchange")), + + %% exchange default name + ?assertMatch({error, {invalid_destination, exchange, "//foo"}}, + parse_dest("/exchange//foo")), + + %% amqqueue without name + ?assertMatch({error, {invalid_destination, amqqueue, ""}}, + parse_dest("/amq/queue")), + + %% queue without name with trailing slash + ?assertMatch({error, {invalid_destination, queue, "/"}}, parse_dest("/queue/")), + + %% topic without name with trailing slash + ?assertMatch({error, {invalid_destination, topic, "/"}}, parse_dest("/topic/")), + + %% exchange without name with trailing slash + ?assertMatch({error, {invalid_destination, exchange, "/"}}, + parse_dest("/exchange/")), + + %% queue with invalid name + ?assertMatch({error, {invalid_destination, queue, "/foo/bar"}}, + parse_dest("/queue/foo/bar")), + + %% topic with invalid name + ?assertMatch({error, {invalid_destination, topic, "/foo/bar"}}, + parse_dest("/topic/foo/bar")), + + %% exchange with invalid name + ?assertMatch({error, {invalid_destination, exchange, "/foo/bar/baz"}}, + parse_dest("/exchange/foo/bar/baz")), + + %% unknown destination + ?assertMatch({error, {unknown_destination, "/blah/boo"}}, + parse_dest("/blah/boo")), + + %% queue with escaped name + ?assertMatch({ok, {queue, "te/st"}}, parse_dest("/queue/te%2Fst")), + + %% valid exchange with escaped name and pattern + ?assertMatch({ok, {exchange, {"te/st", "pa/tt/ern"}}}, + parse_dest("/exchange/te%2Fst/pa%2Ftt%2Fern")), + + ok. + +parse_dest(Destination, Params) -> + rabbit_routing_util:parse_endpoint(Destination, Params). +parse_dest(Destination) -> + rabbit_routing_util:parse_endpoint(Destination). + %%%% %% %% This is an example of how the client interaction should work @@ -640,13 +727,14 @@ confirm_barrier_test() -> true = amqp_channel:wait_for_confirms(Channel), teardown(Connection, Channel). -confirm_barrier_nop_test() -> +confirm_select_before_wait_test() -> {ok, Connection} = new_connection(), {ok, Channel} = amqp_connection:open_channel(Connection), - true = amqp_channel:wait_for_confirms(Channel), - amqp_channel:call(Channel, #'basic.publish'{routing_key = <<"whoosh">>}, - #amqp_msg{payload = <<"foo">>}), - true = amqp_channel:wait_for_confirms(Channel), + try amqp_channel:wait_for_confirms(Channel) of + _ -> exit(success_despite_lack_of_confirm_mode) + catch + not_in_confirm_mode -> ok + end, teardown(Connection, Channel). confirm_barrier_timeout_test() ->