Skip to content
This repository has been archived by the owner on Nov 18, 2020. It is now read-only.

Commit

Permalink
Merging bug22994 into default
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Sackman committed Jul 22, 2010
2 parents 369e2e2 + ea81f1b commit 33d9555
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 76 deletions.
18 changes: 4 additions & 14 deletions src/amqp_channel_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,12 @@ start_channel_infrastructure(network, ChannelNumber, {Sock, MainReader}) ->
FramingPid = rabbit_framing_channel:start_link(fun(X) -> X end, [self()]),
WriterPid = rabbit_writer:start_link(Sock, ChannelNumber, ?FRAME_MIN_SIZE),
case MainReader of
none ->
ok;
_ ->
MainReader ! {register_framing_channel, ChannelNumber, FramingPid,
self()},
MonitorRef = erlang:monitor(process, MainReader),
receive
registered_framing_channel ->
erlang:demonitor(MonitorRef), ok;
{'DOWN', MonitorRef, process, MainReader, _Info} ->
erlang:error(main_reader_died_while_registering_framing)
end
none -> ok;
_ -> amqp_main_reader:register_framing_channel(
MainReader, ChannelNumber, FramingPid)
end,
{FramingPid, WriterPid};
start_channel_infrastructure(
direct, ChannelNumber, {User, VHost, Collector}) ->
start_channel_infrastructure(direct, ChannelNumber, {User, VHost, Collector}) ->
Peer = rabbit_channel:start_link(ChannelNumber, self(), self(), User, VHost,
Collector),
{Peer, Peer}.
Expand Down
137 changes: 78 additions & 59 deletions src/amqp_main_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,47 +27,76 @@

-include("amqp_client.hrl").

-export([start/2]).
-behaviour(gen_server).

-export([start_link/2, register_framing_channel/3, start_heartbeat/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).

-record(mr_state, {sock,
message = none, %% none | {Type, Channel, Length}
framing_channels = amqp_channel_util:new_channel_dict()}).

start(Sock, Framing0Pid) ->
spawn_link(
fun() ->
State0 = #mr_state{sock = Sock},
State1 = register_framing_channel(0, Framing0Pid, none, State0),
{ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
main_loop(State1)
end).

main_loop(State = #mr_state{sock = Sock}) ->
receive
{inet_async, Sock, _, _} = InetAsync ->
main_loop(handle_inet_async(InetAsync, State));
{heartbeat, Heartbeat} ->
rabbit_heartbeat:start_heartbeat(Sock, Heartbeat),
main_loop(State);
{register_framing_channel, Number, Pid, Caller} ->
main_loop(register_framing_channel(Number, Pid, Caller, State));
timeout ->
?LOG_WARN("Main reader (~p) received timeout from heartbeat, "
"exiting~n", [self()]),
exit(connection_timeout);
socket_closing_timeout ->
?LOG_WARN("Main reader (~p) received socket_closing_timeout, "
"exiting~n", [self()]),
exit(socket_closing_timeout);
close ->
close(State);
{'DOWN', _MonitorRef, process, _Pid, _Info} = Down ->
main_loop(handle_down(Down, State));
Other ->
?LOG_WARN("Main reader (~p) closing: unexpected message ~p",
[self(), Other]),
exit({unexpected_message, Other})
end.
%%---------------------------------------------------------------------------
%% Interface
%%---------------------------------------------------------------------------

start_link(Sock, Framing0Pid) ->
gen_server:start_link(?MODULE, [Sock, Framing0Pid], []).

register_framing_channel(MainReaderPid, Number, FramingPid) ->
gen_server:call(MainReaderPid,
{register_framing_channel, Number, FramingPid}, infinity).

start_heartbeat(MainReaderPid, Heartbeat) ->
gen_server:cast(MainReaderPid, {heartbeat, Heartbeat}).

%%---------------------------------------------------------------------------
%% gen_server callbacks
%%---------------------------------------------------------------------------

init([Sock, Framing0Pid]) ->
State0 = #mr_state{sock = Sock},
State1 = internal_register_framing_channel(0, Framing0Pid, State0),
{ok, _Ref} = rabbit_net:async_recv(Sock, 7, infinity),
{ok, State1}.

terminate(Reason, #mr_state{sock = Sock}) ->
Nice = case Reason of
normal -> true;
shutdown -> true;
{shutdown, _} -> true;
_ -> false
end,
ok = case Nice of
true -> rabbit_net:close(Sock);
false -> ok
end.

code_change(_OldVsn, State, _Extra) ->
State.

handle_call({register_framing_channel, Number, Pid}, _From, State) ->
{reply, ok, internal_register_framing_channel(Number, Pid, State)}.

handle_cast({heartbeat, Heartbeat}, State = #mr_state{sock = Sock}) ->
rabbit_heartbeat:start_heartbeat(Sock, Heartbeat),
{noreply, State}.

handle_info({inet_async, _, _, _} = InetAsync, State) ->
handle_inet_async(InetAsync, State);
handle_info({'DOWN', _, _, _, _} = Down, State) ->
handle_down(Down, State);
handle_info(timeout, State) ->
{stop, connection_timeout, State};
handle_info(socket_closing_timeout, State) ->
{stop, socket_closing_timeout, State};
handle_info(close, State) ->
{stop, normal, State}.

%%---------------------------------------------------------------------------
%% Internal plumbing
%%---------------------------------------------------------------------------

handle_inet_async({inet_async, Sock, _, Msg},
State = #mr_state{sock = Sock,
Expand All @@ -79,19 +108,18 @@ handle_inet_async({inet_async, Sock, _, Msg},
case Msg of
{ok, <<Payload:Length/binary, ?FRAME_END>>} ->
case handle_frame(Type, Channel, Payload, State) of
closed_ok -> close(State);
closed_ok -> {stop, normal, State};
_ -> {ok, _Ref} =
rabbit_net:async_recv(Sock, 7, infinity),
State#mr_state{message = none}
{noreply, State#mr_state{message = none}}
end;
{ok, <<NewType:8, NewChannel:16, NewLength:32>>} ->
{ok, _Ref} = rabbit_net:async_recv(Sock, NewLength + 1, infinity),
State#mr_state{message = {NewType, NewChannel, NewLength}};
{noreply, State#mr_state{message={NewType, NewChannel, NewLength}}};
{error, closed} ->
exit(socket_closed);
{stop, socket_closed, State};
{error, Reason} ->
?LOG_WARN("Socket error: ~p~n", [Reason]),
exit({socket_error, Reason})
{stop, {socket_error, Reason}, State}
end.

handle_frame(Type, Channel, Payload, State) ->
Expand Down Expand Up @@ -122,29 +150,20 @@ pass_frame(Channel, Frame, #mr_state{framing_channels = Channels}) ->
rabbit_framing_channel:process(FramingPid, Frame)
end.

register_framing_channel(Number, Pid, Caller,
State = #mr_state{framing_channels = Channels}) ->
NewChannels = amqp_channel_util:register_channel(Number, Pid, Channels),
erlang:monitor(process, Pid),
case Caller of
none -> ok;
_ -> Caller ! registered_framing_channel
end,
State#mr_state{framing_channels = NewChannels}.

handle_down({'DOWN', _MonitorRef, process, Pid, Info},
State = #mr_state{framing_channels = Channels}) ->
case amqp_channel_util:is_channel_pid_registered(Pid, Channels) of
true ->
NewChannels =
amqp_channel_util:unregister_channel_pid(Pid, Channels),
State#mr_state{framing_channels = NewChannels};
{noreply, State#mr_state{framing_channels = NewChannels}};
false ->
?LOG_WARN("Reader received unexpected DOWN signal from (~p)."
"Info: ~p~n", [Pid, Info]),
exit({unexpected_down, Pid, Info})
{stop, {unexpected_down, Pid, Info}, State}
end.

close(#mr_state{sock = Sock}) ->
rabbit_net:close(Sock),
exit(normal).
internal_register_framing_channel(
Number, Pid, State = #mr_state{framing_channels = Channels}) ->
NewChannels = amqp_channel_util:register_channel(Number, Pid, Channels),
erlang:monitor(process, Pid),
State#mr_state{framing_channels = NewChannels}.

4 changes: 2 additions & 2 deletions src/amqp_network_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -385,12 +385,12 @@ do_handshake(State0 = #nc_state{sock = Sock}) ->
ok = rabbit_net:send(Sock, ?PROTOCOL_HEADER),
{Framing0Pid, Writer0Pid} =
amqp_channel_util:start_channel_infrastructure(network, 0, {Sock, none}),
MainReaderPid = amqp_main_reader:start(Sock, Framing0Pid),
{ok, MainReaderPid} = amqp_main_reader:start_link(Sock, Framing0Pid),
State1 = State0#nc_state{channel0_framing_pid = Framing0Pid,
channel0_writer_pid = Writer0Pid,
main_reader_pid = MainReaderPid},
State2 = network_handshake(State1),
MainReaderPid ! {heartbeat, State2#nc_state.heartbeat},
amqp_main_reader:start_heartbeat(MainReaderPid, State2#nc_state.heartbeat),
State2.

network_handshake(State = #nc_state{channel0_writer_pid = Writer0,
Expand Down
2 changes: 1 addition & 1 deletion test/amqp_dbg.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ all_args() ->
ncl_args() ++ dcl_args() ++ cl_args() ++ m_args().

ncl_args() ->
[{amqp_main_reader, start, return_ms()},
[{amqp_main_reader, start_link, return_ms()},
{amqp_network_connection, set_closing_state, []},
{amqp_network_connection, all_channels_closed_event, []},
{amqp_network_connection, handshake, []},
Expand Down

0 comments on commit 33d9555

Please sign in to comment.