From f7ed56e7ed0a6c9622691897750c4a5cb7b1dc61 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 15 Apr 2013 11:27:48 +0100 Subject: [PATCH 1/3] Fail predictably when connecting to a refusing server --- src/amqp_main_reader.erl | 45 ++++++++++++++++++++++++++++----- src/amqp_network_connection.erl | 2 ++ 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/src/amqp_main_reader.erl b/src/amqp_main_reader.erl index 7953e4d2..1d073fd9 100644 --- a/src/amqp_main_reader.erl +++ b/src/amqp_main_reader.erl @@ -29,9 +29,13 @@ connection, channels_manager, astate, - message = none %% none | {Type, Channel, Length} + message, %% opening, none | {Type, Channel, Length} + requested = 0, + received }). +-define(FRAME_HEADERLEN, 7). + %%--------------------------------------------------------------------------- %% Interface %%--------------------------------------------------------------------------- @@ -44,8 +48,9 @@ start_link(Sock, Connection, ChMgr, AState) -> %%--------------------------------------------------------------------------- init([Sock, Connection, ChMgr, AState]) -> - case next(7, #state{sock = Sock, connection = Connection, - channels_manager = ChMgr, astate = AState}) of + case next(?FRAME_HEADERLEN, #state{sock = Sock, connection = Connection, + channels_manager = ChMgr, astate = AState, + message = opening}) of {noreply, State} -> {ok, State}; {stop, Reason, _State} -> {stop, Reason} end. @@ -62,13 +67,36 @@ handle_call(Call, From, State) -> handle_cast(Cast, State) -> {stop, {unexpected_cast, Cast}, State}. +handle_info({inet_async, Sock, _, {ok, <<"AMQP", _:24>> = Data}}, + State = #state{sock = Sock, + message = opening, + requested = ?FRAME_HEADERLEN}) -> + next(1, State#state{received = Data}); +handle_info({inet_async, Sock, _, {ok, <>}}, + State = #state{sock = Sock, + message = opening, + requested = 1, + received = <<"AMQP", A, B, C>>}) -> + handle_error({refused, {A, B, C, D}}, + State#state{requested = 0, received = undefined}); +handle_info({inet_async, Sock, _R, {ok, <<_D:(?FRAME_HEADERLEN * 8)>>}} = Arg, + State = #state{sock = Sock, + message = opening, + requested = ?FRAME_HEADERLEN}) -> + handle_info(Arg, State#state{message = none}); handle_info({inet_async, Sock, _, {ok, <>}}, - State = #state{sock = Sock, message = none}) -> + State = #state{sock = Sock, + message = none, + requested = ?FRAME_HEADERLEN}) -> next(Length + 1, State#state{message = {Type, Channel, Length}}); handle_info({inet_async, Sock, _, {ok, Data}}, - State = #state{sock = Sock, message = {Type, Channel, L}}) -> + State = #state{sock = Sock, + message = {Type, Channel, L}, + requested = Req}) when size(Data) == Req -> <> = Data, - next(7, process_frame(Type, Channel, Payload, State#state{message = none})); + next(?FRAME_HEADERLEN, + process_frame(Type, Channel, Payload, + State#state{message = none})); handle_info({inet_async, Sock, _, {error, Reason}}, State = #state{sock = Sock}) -> handle_error(Reason, State). @@ -101,13 +129,16 @@ process_frame(Type, ChNumber, Payload, next(Length, State = #state{sock = Sock}) -> case rabbit_net:async_recv(Sock, Length, infinity) of - {ok, _} -> {noreply, State}; + {ok, _} -> {noreply, State#state{requested = Length}}; {error, Reason} -> handle_error(Reason, State) end. handle_error(closed, State = #state{connection = Conn}) -> Conn ! socket_closed, {noreply, State}; +handle_error({refused, Version}, State = #state{connection = Conn}) -> + Conn ! {refused, Version}, + {noreply, State}; handle_error(Reason, State = #state{connection = Conn}) -> Conn ! {socket_error, Reason}, {stop, {socket_error, Reason}, State}. diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index 69fd71c8..13eab79a 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -309,6 +309,8 @@ handshake_recv(Expecting) -> end; {socket_error, _} = SocketError -> exit({SocketError, {expecting, Expecting}}); + {refused, Version} -> + exit({server_refused_connection, Version}); heartbeat_timeout -> exit(heartbeat_timeout); Other -> From 2632bb0ffe33f380320e99af1d35f47b189af092 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Thu, 25 Apr 2013 13:15:21 +0100 Subject: [PATCH 2/3] Reader changes --- src/amqp_main_reader.erl | 46 ++++++++++------------------------------ 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/src/amqp_main_reader.erl b/src/amqp_main_reader.erl index 1d073fd9..b794a997 100644 --- a/src/amqp_main_reader.erl +++ b/src/amqp_main_reader.erl @@ -29,13 +29,9 @@ connection, channels_manager, astate, - message, %% opening, none | {Type, Channel, Length} - requested = 0, - received + message = none %% none | {Type, Channel, Length} }). --define(FRAME_HEADERLEN, 7). - %%--------------------------------------------------------------------------- %% Interface %%--------------------------------------------------------------------------- @@ -48,9 +44,8 @@ start_link(Sock, Connection, ChMgr, AState) -> %%--------------------------------------------------------------------------- init([Sock, Connection, ChMgr, AState]) -> - case next(?FRAME_HEADERLEN, #state{sock = Sock, connection = Connection, - channels_manager = ChMgr, astate = AState, - message = opening}) of + case next(7, #state{sock = Sock, connection = Connection, + channels_manager = ChMgr, astate = AState}) of {noreply, State} -> {ok, State}; {stop, Reason, _State} -> {stop, Reason} end. @@ -67,36 +62,17 @@ handle_call(Call, From, State) -> handle_cast(Cast, State) -> {stop, {unexpected_cast, Cast}, State}. -handle_info({inet_async, Sock, _, {ok, <<"AMQP", _:24>> = Data}}, - State = #state{sock = Sock, - message = opening, - requested = ?FRAME_HEADERLEN}) -> - next(1, State#state{received = Data}); -handle_info({inet_async, Sock, _, {ok, <>}}, - State = #state{sock = Sock, - message = opening, - requested = 1, - received = <<"AMQP", A, B, C>>}) -> - handle_error({refused, {A, B, C, D}}, - State#state{requested = 0, received = undefined}); -handle_info({inet_async, Sock, _R, {ok, <<_D:(?FRAME_HEADERLEN * 8)>>}} = Arg, - State = #state{sock = Sock, - message = opening, - requested = ?FRAME_HEADERLEN}) -> - handle_info(Arg, State#state{message = none}); +handle_info({inet_async, Sock, _, {ok, <<"AMQP", A, B, C>>}}, + State = #state{sock = Sock, message = none}) -> + {ok, <>} = rabbit_net:sync_recv(Sock, 1), + handle_error({refused, {A, B, C, D}}, State); handle_info({inet_async, Sock, _, {ok, <>}}, - State = #state{sock = Sock, - message = none, - requested = ?FRAME_HEADERLEN}) -> + State = #state{sock = Sock, message = none}) -> next(Length + 1, State#state{message = {Type, Channel, Length}}); handle_info({inet_async, Sock, _, {ok, Data}}, - State = #state{sock = Sock, - message = {Type, Channel, L}, - requested = Req}) when size(Data) == Req -> + State = #state{sock = Sock, message = {Type, Channel, L}}) -> <> = Data, - next(?FRAME_HEADERLEN, - process_frame(Type, Channel, Payload, - State#state{message = none})); + next(7, process_frame(Type, Channel, Payload, State#state{message = none})); handle_info({inet_async, Sock, _, {error, Reason}}, State = #state{sock = Sock}) -> handle_error(Reason, State). @@ -129,7 +105,7 @@ process_frame(Type, ChNumber, Payload, next(Length, State = #state{sock = Sock}) -> case rabbit_net:async_recv(Sock, Length, infinity) of - {ok, _} -> {noreply, State#state{requested = Length}}; + {ok, _} -> {noreply, State}; {error, Reason} -> handle_error(Reason, State) end. From df1e2050e33ce3caf2b6eafff8cfcfa53bd596c2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 25 Apr 2013 14:10:33 +0100 Subject: [PATCH 3/3] Also blow up if we get a completely malformed header --- src/amqp_main_reader.erl | 9 ++++++++- src/amqp_network_connection.erl | 2 ++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/src/amqp_main_reader.erl b/src/amqp_main_reader.erl index b794a997..b1de33c6 100644 --- a/src/amqp_main_reader.erl +++ b/src/amqp_main_reader.erl @@ -67,8 +67,12 @@ handle_info({inet_async, Sock, _, {ok, <<"AMQP", A, B, C>>}}, {ok, <>} = rabbit_net:sync_recv(Sock, 1), handle_error({refused, {A, B, C, D}}, State); handle_info({inet_async, Sock, _, {ok, <>}}, - State = #state{sock = Sock, message = none}) -> + State = #state{sock = Sock, message = none}) when + Type =:= 1; Type =:= 2; Type =:= 3; Type =:= 4 -> next(Length + 1, State#state{message = {Type, Channel, Length}}); +handle_info({inet_async, Sock, _, {ok, <> = All}}, + State = #state{sock = Sock, message = none}) -> + handle_error({malformed_header, All}, State); handle_info({inet_async, Sock, _, {ok, Data}}, State = #state{sock = Sock, message = {Type, Channel, L}}) -> <> = Data, @@ -115,6 +119,9 @@ handle_error(closed, State = #state{connection = Conn}) -> handle_error({refused, Version}, State = #state{connection = Conn}) -> Conn ! {refused, Version}, {noreply, State}; +handle_error({malformed_header, Version}, State = #state{connection = Conn}) -> + Conn ! {malformed_header, Version}, + {noreply, State}; handle_error(Reason, State = #state{connection = Conn}) -> Conn ! {socket_error, Reason}, {stop, {socket_error, Reason}, State}. diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index 13eab79a..2ab561e1 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -311,6 +311,8 @@ handshake_recv(Expecting) -> exit({SocketError, {expecting, Expecting}}); {refused, Version} -> exit({server_refused_connection, Version}); + {malformed_header, All} -> + exit({server_sent_malformed_header, All}); heartbeat_timeout -> exit(heartbeat_timeout); Other ->