Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Commit

Permalink
stable to default
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon MacMullen committed Apr 25, 2013
2 parents aaf0801 + ceb2520 commit 4845004
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 23 deletions.
24 changes: 24 additions & 0 deletions include/rabbit_routing_prefixes.hrl
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2011-2013 VMware, Inc. All rights reserved.
%%

-define(QUEUE_PREFIX, "/queue").
-define(TOPIC_PREFIX, "/topic").
-define(EXCHANGE_PREFIX, "/exchange").
-define(AMQQUEUE_PREFIX, "/amq/queue").
-define(TEMP_QUEUE_PREFIX, "/temp-queue").
%% reply queues names can have slashes in the content so no further
%% parsing happens.
-define(REPLY_QUEUE_PREFIX, "/reply-queue/").
1 change: 1 addition & 0 deletions rabbit_common.app.in
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
rabbit_msg_store_index,
rabbit_net,
rabbit_nodes,
rabbit_policy_validator,
rabbit_reader,
rabbit_writer,
rabbit_event,
Expand Down
29 changes: 15 additions & 14 deletions src/amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ next_publish_seqno(Channel) ->
%% Channel = pid()
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker. Note, when called on a
%% non-Confirm channel, waitForConfirms returns true immediately.
%% non-Confirm channel, waitForConfirms returns an error.
wait_for_confirms(Channel) ->
wait_for_confirms(Channel, infinity).

Expand All @@ -224,10 +224,13 @@ wait_for_confirms(Channel) ->
%% Timeout = non_neg_integer() | 'infinity'
%% @doc Wait until all messages published since the last call have
%% been either ack'd or nack'd by the broker or the timeout expires.
%% Note, when called on a non-Confirm channel, waitForConfirms returns
%% true immediately.
%% Note, when called on a non-Confirm channel, waitForConfirms throws
%% an exception.
wait_for_confirms(Channel, Timeout) ->
gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity).
case gen_server:call(Channel, {wait_for_confirms, Timeout}, infinity) of
{error, Reason} -> throw(Reason);
Other -> Other
end.

%% @spec (Channel) -> true
%% where
Expand Down Expand Up @@ -709,23 +712,19 @@ handle_method_from_server1(
{noreply, State};
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
#state{confirm_handler = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
"confirm handler registered~n", [self(), BasicAck]),
{noreply, update_confirm_set(BasicAck, State)};
handle_method_from_server1(
#'basic.ack'{} = BasicAck, none,
#state{confirm_handler = {ConfirmHandler, _Ref}} = State) ->
ConfirmHandler ! BasicAck,
handle_method_from_server1(#'basic.ack'{} = BasicAck, none,
#state{confirm_handler = {CH, _Ref}} = State) ->
CH ! BasicAck,
{noreply, update_confirm_set(BasicAck, State)};
handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
#state{confirm_handler = none} = State) ->
?LOG_WARN("Channel (~p): received ~p but there is no "
"confirm handler registered~n", [self(), BasicNack]),
{noreply, update_confirm_set(BasicNack, State)};
handle_method_from_server1(
#'basic.nack'{} = BasicNack, none,
#state{confirm_handler = {ConfirmHandler, _Ref}} = State) ->
ConfirmHandler ! BasicNack,
handle_method_from_server1(#'basic.nack'{} = BasicNack, none,
#state{confirm_handler = {CH, _Ref}} = State) ->
CH ! BasicNack,
{noreply, update_confirm_set(BasicNack, State)};

handle_method_from_server1(Method, none, State) ->
Expand Down Expand Up @@ -884,6 +883,8 @@ notify_confirm_waiters(State = #state{waiting_set = WSet,
State#state{waiting_set = gb_trees:empty(),
only_acks_received = true}.

handle_wait_for_confirms(_From, _Timeout, State = #state{next_pub_seqno = 0}) ->
{reply, {error, not_in_confirm_mode}, State};
handle_wait_for_confirms(From, Timeout,
State = #state{unconfirmed_set = USet,
waiting_set = WSet}) ->
Expand Down
6 changes: 6 additions & 0 deletions src/amqp_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
-export([start/1, close/1, close/2, close/3]).
-export([error_atom/1]).
-export([info/2, info_keys/1, info_keys/0]).
-export([socket_adapter_info/2]).

-define(DEFAULT_CONSUMER, {amqp_selective_consumer, []}).

Expand Down Expand Up @@ -332,3 +333,8 @@ info_keys(ConnectionPid) ->
%% atoms that can be used for a certain connection, use info_keys/1.
info_keys() ->
amqp_gen_connection:info_keys().

%% @doc Takes a socket and a protocol, returns an #amqp_adapter_info{}
%% based on the socket for the protocol given.
socket_adapter_info(Sock, Protocol) ->
amqp_direct_connection:socket_adapter_info(Sock, Protocol).
51 changes: 51 additions & 0 deletions src/amqp_direct_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
-export([init/1, terminate/2, connect/4, do/2, open_channel_args/1, i/2,
info_keys/0, handle_message/2, closing/3, channels_terminated/1]).

-export([socket_adapter_info/2]).

-record(state, {node,
user,
vhost,
Expand Down Expand Up @@ -153,3 +155,52 @@ ensure_adapter_info(A = #amqp_adapter_info{name = unknown}) ->
ensure_adapter_info(A#amqp_adapter_info{name = Name});

ensure_adapter_info(Info) -> Info.

socket_adapter_info(Sock, Protocol) ->
{PeerHost, PeerPort, Host, Port} =
case rabbit_net:socket_ends(Sock, inbound) of
{ok, Res} -> Res;
_ -> {unknown, unknown, unknown, unknown}
end,
Name = case rabbit_net:connection_string(Sock, inbound) of
{ok, Res1} -> Res1;
_ -> unknown
end,
#amqp_adapter_info{protocol = Protocol,
name = list_to_binary(Name),
host = Host,
port = Port,
peer_host = PeerHost,
peer_port = PeerPort,
additional_info = maybe_ssl_info(Sock)}.

maybe_ssl_info(Sock) ->
case rabbit_net:is_ssl(Sock) of
true -> [{ssl, true}] ++ ssl_info(Sock) ++ ssl_cert_info(Sock);
false -> [{ssl, false}]
end.

ssl_info(Sock) ->
{Protocol, KeyExchange, Cipher, Hash} =
case rabbit_net:ssl_info(Sock) of
{ok, {P, {K, C, H}}} -> {P, K, C, H};
{ok, {P, {K, C, H, _}}} -> {P, K, C, H};
_ -> {unknown, unknown, unknown, unknown}
end,
[{ssl_protocol, Protocol},
{ssl_key_exchange, KeyExchange},
{ssl_cipher, Cipher},
{ssl_hash, Hash}].

ssl_cert_info(Sock) ->
case rabbit_net:peercert(Sock) of
{ok, Cert} ->
[{peer_cert_issuer, list_to_binary(
rabbit_ssl:peer_cert_issuer(Cert))},
{peer_cert_subject, list_to_binary(
rabbit_ssl:peer_cert_subject(Cert))},
{peer_cert_validity, list_to_binary(
rabbit_ssl:peer_cert_validity(Cert))}];
_ ->
[]
end.
6 changes: 3 additions & 3 deletions src/amqp_gen_consumer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
-export([start_link/2, call_consumer/2, call_consumer/3]).
-export([behaviour_info/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_info/2]).
handle_info/2, prioritise_info/3]).

-record(state, {module,
module_state}).
Expand Down Expand Up @@ -193,8 +193,8 @@ init([ConsumerModule, ExtraParams]) ->
ignore
end.

prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _State) -> 1;
prioritise_info(_, _State) -> 0.
prioritise_info({'DOWN', _MRef, process, _Pid, _Info}, _Len, _State) -> 1;
prioritise_info(_, _Len, _State) -> 0.

handle_call({consumer_call, Msg}, From,
State = #state{module = ConsumerModule,
Expand Down
196 changes: 196 additions & 0 deletions src/rabbit_routing_util.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License at
%% http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
%% License for the specific language governing rights and limitations
%% under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is VMware, Inc.
%% Copyright (c) 2013-2013 VMware, Inc. All rights reserved.
%%

-module(rabbit_routing_util).

-export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]).
-export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]).
-export([parse_endpoint/1, parse_endpoint/2]).
-export([parse_routing/1, dest_temp_queue/1]).

-include("amqp_client.hrl").
-include("rabbit_routing_prefixes.hrl").

%%----------------------------------------------------------------------------

init_state() -> sets:new().

dest_prefixes() -> [?EXCHANGE_PREFIX, ?TOPIC_PREFIX, ?QUEUE_PREFIX,
?AMQQUEUE_PREFIX, ?REPLY_QUEUE_PREFIX].

all_dest_prefixes() -> [?TEMP_QUEUE_PREFIX | dest_prefixes()].

%% --------------------------------------------------------------------------

parse_endpoint(Destination) ->
parse_endpoint(Destination, false).

parse_endpoint(undefined, AllowAnonymousQueue) ->
parse_endpoint("/queue", AllowAnonymousQueue);

parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) ->
parse_endpoint(unicode:characters_to_list(Destination),
AllowAnonymousQueue);
parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) ->
case re:split(Destination, "/", [{return, list}]) of
[Name] ->
{ok, {queue, unescape(Name)}};
["", Type | Rest]
when Type =:= "exchange" orelse Type =:= "queue" orelse
Type =:= "topic" orelse Type =:= "temp-queue" ->
parse_endpoint0(atomise(Type), Rest, AllowAnonymousQueue);
["", "amq", "queue" | Rest] ->
parse_endpoint0(amqqueue, Rest, AllowAnonymousQueue);
["", "reply-queue" = Prefix | [_|_]] ->
parse_endpoint0(reply_queue,
[lists:nthtail(2 + length(Prefix), Destination)],
AllowAnonymousQueue);
_ ->
{error, {unknown_destination, Destination}}
end.

parse_endpoint0(exchange, ["" | _] = Rest, _) ->
{error, {invalid_destination, exchange, to_url(Rest)}};
parse_endpoint0(exchange, [Name], _) ->
{ok, {exchange, {unescape(Name), undefined}}};
parse_endpoint0(exchange, [Name, Pattern], _) ->
{ok, {exchange, {unescape(Name), unescape(Pattern)}}};
parse_endpoint0(queue, [], false) ->
{error, {invalid_destination, queue, []}};
parse_endpoint0(queue, [], true) ->
{ok, {queue, undefined}};
parse_endpoint0(Type, [[_|_]] = [Name], _) ->
{ok, {Type, unescape(Name)}};
parse_endpoint0(Type, Rest, _) ->
{error, {invalid_destination, Type, to_url(Rest)}}.

%% --------------------------------------------------------------------------

ensure_endpoint(Dir, Channel, EndPoint, State) ->
ensure_endpoint(Dir, Channel, EndPoint, [], State).

ensure_endpoint(source, Channel, {exchange, {Name, _}}, Params, State) ->
check_exchange(Name, Channel,
proplists:get_value(check_exchange, Params, false)),
Method = queue_declare_method(#'queue.declare'{}, exchange, Params),
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
{ok, Queue, State};

ensure_endpoint(source, Channel, {topic, _}, Params, State) ->
Method = queue_declare_method(#'queue.declare'{}, topic, Params),
#'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, Method),
{ok, Queue, State};

ensure_endpoint(_Dir, _Channel, {queue, undefined}, _Params, State) ->
{ok, undefined, State};

ensure_endpoint(_, Channel, {queue, Name}, Params, State) ->
Params1 = rabbit_misc:pset(durable, true, Params),
Queue = list_to_binary(Name),
State1 = case sets:is_element(Queue, State) of
true -> State;
_ -> Method = queue_declare_method(
#'queue.declare'{queue = Queue,
nowait = true},
queue, Params1),
amqp_channel:cast(Channel, Method),
sets:add_element(Queue, State)
end,
{ok, Queue, State1};

ensure_endpoint(dest, Channel, {exchange, {Name, _}}, Params, State) ->
check_exchange(Name, Channel,
proplists:get_value(check_exchange, Params, false)),
{ok, undefined, State};

ensure_endpoint(dest, _Ch, {topic, _}, _Params, State) ->
{ok, undefined, State};

ensure_endpoint(_, _Ch, {Type, Name}, _Params, State)
when Type =:= reply_queue orelse Type =:= amqqueue ->
{ok, list_to_binary(Name), State};

ensure_endpoint(_Direction, _Ch, _Endpoint, _Params, _State) ->
{error, invalid_endpoint}.

%% --------------------------------------------------------------------------

ensure_binding(QueueBin, {"", Queue}, _Channel) ->
%% i.e., we should only be asked to bind to the default exchange a
%% queue with its own name
QueueBin = list_to_binary(Queue),
ok;
ensure_binding(Queue, {Exchange, RoutingKey}, Channel) ->
#'queue.bind_ok'{} =
amqp_channel:call(Channel,
#'queue.bind'{
queue = Queue,
exchange = list_to_binary(Exchange),
routing_key = list_to_binary(RoutingKey)}),
ok.

%% --------------------------------------------------------------------------

parse_routing({exchange, {Name, undefined}}) ->
{Name, ""};
parse_routing({exchange, {Name, Pattern}}) ->
{Name, Pattern};
parse_routing({topic, Name}) ->
{"amq.topic", Name};
parse_routing({Type, Name})
when Type =:= queue orelse Type =:= reply_queue orelse Type =:= amqqueue ->
{"", Name}.

dest_temp_queue({temp_queue, Name}) -> Name;
dest_temp_queue(_) -> none.

%% --------------------------------------------------------------------------

check_exchange(_, _, false) ->
ok;
check_exchange(ExchangeName, Channel, true) ->
XDecl = #'exchange.declare'{ exchange = list_to_binary(ExchangeName),
passive = true },
#'exchange.declare_ok'{} = amqp_channel:call(Channel, XDecl),
ok.

queue_declare_method(#'queue.declare'{} = Method, Type, Params) ->
Method1 = case proplists:get_value(durable, Params, false) of
true -> Method#'queue.declare'{durable = true};
false -> Method#'queue.declare'{auto_delete = true,
exclusive = true}
end,
case {Type, proplists:get_value(subscription_queue_name_gen, Params)} of
{topic, SQNG} when is_function(SQNG) ->
Method1#'queue.declare'{queue = SQNG()};
_ ->
Method1
end.

%% --------------------------------------------------------------------------

to_url([]) -> [];
to_url(Lol) -> "/" ++ string:join(Lol, "/").

atomise(Name) when is_list(Name) ->
list_to_atom(re:replace(Name, "-", "_", [{return,list}, global])).

unescape(Str) -> unescape(Str, []).

unescape("%2F" ++ Str, Acc) -> unescape(Str, [$/ | Acc]);
unescape([C | Str], Acc) -> unescape(Str, [C | Acc]);
unescape([], Acc) -> lists:reverse(Acc).

Loading

0 comments on commit 4845004

Please sign in to comment.