From 127f558d7d897a392d6e2f9d826310396ee93e64 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 31 Jan 2013 13:26:43 +0000 Subject: [PATCH 1/7] Added tag rabbitmq_v3_0_2 for changeset 23d6ca5c9601 From 42b5a423dbc227834573f040a5472655bb3c7945 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 6 Mar 2013 10:45:49 +0000 Subject: [PATCH 2/7] Added tag rabbitmq_v3_0_3 for changeset 08d1d6c2ef52 From 5e953bada3a8a3c082e192b4710bbab0662a477b Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 12 Mar 2013 15:26:47 +0000 Subject: [PATCH 3/7] Added tag rabbitmq_v3_0_4 for changeset 90c2fdb532b3 From 2d80c21283b928f30e0be26e65d76c175aaecf0c Mon Sep 17 00:00:00 2001 From: Tim Watson Date: Thu, 11 Apr 2013 12:06:49 +0100 Subject: [PATCH 4/7] Do not negotiate frame sizes above 64Mb --- src/amqp_network_connection.erl | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index e5e98216..7232876c 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -26,6 +26,7 @@ -define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]). -define(SOCKET_CLOSING_TIMEOUT, 1000). -define(HANDSHAKE_RECEIVE_TIMEOUT, 60000). +-define(TCP_MAX_PACKET_SIZE, 16#4000000). -record(state, {sock, heartbeat, @@ -212,9 +213,13 @@ tune(#'connection.tune'{channel_max = ServerChannelMax, [ChannelMax, Heartbeat, FrameMax] = lists:zipwith(fun (Client, Server) when Client =:= 0; Server =:= 0 -> lists:max([Client, Server]); + ({ClientFrame, TcpMaxPacketSz}, ServerFrame) -> + lists:min([ClientFrame, ServerFrame, + TcpMaxPacketSz]); (Client, Server) -> lists:min([Client, Server]) - end, [ClientChannelMax, ClientHeartbeat, ClientFrameMax], + end, [ClientChannelMax, ClientHeartbeat, + {ClientFrameMax, ?TCP_MAX_PACKET_SIZE}], [ServerChannelMax, ServerHeartbeat, ServerFrameMax]), NewState = State#state{heartbeat = Heartbeat, frame_max = FrameMax}, start_heartbeat(SHF, NewState), From 5b2b66cc988b83ea7edd37b56264d2f91c82db90 Mon Sep 17 00:00:00 2001 From: Tim Watson Date: Thu, 11 Apr 2013 15:11:46 +0100 Subject: [PATCH 5/7] Handle unbounded frame size negotiation --- src/amqp_network_connection.erl | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index 7232876c..d71c677c 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -27,6 +27,7 @@ -define(SOCKET_CLOSING_TIMEOUT, 1000). -define(HANDSHAKE_RECEIVE_TIMEOUT, 60000). -define(TCP_MAX_PACKET_SIZE, 16#4000000). +-define(FRAME_SIZE_UNBOUNDED, 0). -record(state, {sock, heartbeat, @@ -213,20 +214,26 @@ tune(#'connection.tune'{channel_max = ServerChannelMax, [ChannelMax, Heartbeat, FrameMax] = lists:zipwith(fun (Client, Server) when Client =:= 0; Server =:= 0 -> lists:max([Client, Server]); - ({ClientFrame, TcpMaxPacketSz}, ServerFrame) -> - lists:min([ClientFrame, ServerFrame, - TcpMaxPacketSz]); (Client, Server) -> lists:min([Client, Server]) end, [ClientChannelMax, ClientHeartbeat, - {ClientFrameMax, ?TCP_MAX_PACKET_SIZE}], - [ServerChannelMax, ServerHeartbeat, ServerFrameMax]), + cap_max_supported_frame_size(ClientFrameMax)], + [ServerChannelMax, ServerHeartbeat, + cap_max_supported_frame_size(ServerFrameMax)]), NewState = State#state{heartbeat = Heartbeat, frame_max = FrameMax}, start_heartbeat(SHF, NewState), {#'connection.tune_ok'{channel_max = ChannelMax, frame_max = FrameMax, heartbeat = Heartbeat}, ChannelMax, NewState}. +%% if we attempt to recv > 64Mb, inet_drv will return enomem, +%% so we cap the max negotiated frame size accordingly +cap_max_supported_frame_size(MaxFrameSize) + when MaxFrameSize > ?FRAME_SIZE_UNBOUNDED -> + lists:min([MaxFrameSize, ?TCP_MAX_PACKET_SIZE]); +cap_max_supported_frame_size(_UnboundedMaxFrame) -> + ?TCP_MAX_PACKET_SIZE. + start_heartbeat(SHF, #state{sock = Sock, heartbeat = Heartbeat}) -> Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun () -> catch rabbit_net:send(Sock, Frame) end, From c997e1c4e73097b79933c63e2487ecbb7f707e47 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 11 Apr 2013 15:29:48 +0100 Subject: [PATCH 6/7] simplify --- src/amqp_network_connection.erl | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index d71c677c..8f00ec90 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -27,7 +27,6 @@ -define(SOCKET_CLOSING_TIMEOUT, 1000). -define(HANDSHAKE_RECEIVE_TIMEOUT, 60000). -define(TCP_MAX_PACKET_SIZE, 16#4000000). --define(FRAME_SIZE_UNBOUNDED, 0). -record(state, {sock, heartbeat, @@ -216,24 +215,21 @@ tune(#'connection.tune'{channel_max = ServerChannelMax, lists:max([Client, Server]); (Client, Server) -> lists:min([Client, Server]) - end, [ClientChannelMax, ClientHeartbeat, - cap_max_supported_frame_size(ClientFrameMax)], - [ServerChannelMax, ServerHeartbeat, - cap_max_supported_frame_size(ServerFrameMax)]), - NewState = State#state{heartbeat = Heartbeat, frame_max = FrameMax}, + end, + [ClientChannelMax, ClientHeartbeat, ClientFrameMax], + [ServerChannelMax, ServerHeartbeat, ServerFrameMax]), + %% if we attempt to recv > 64Mb, inet_drv will return enomem, so + %% we cap the max negotiated frame size accordingly + CappedFrameMax = case FrameMax of + 0 -> ?TCP_MAX_PACKET_SIZE; + _ -> lists:min([FrameMax, ?TCP_MAX_PACKET_SIZE]) + end, + NewState = State#state{heartbeat = Heartbeat, frame_max = CappedFrameMax}, start_heartbeat(SHF, NewState), {#'connection.tune_ok'{channel_max = ChannelMax, - frame_max = FrameMax, + frame_max = CappedFrameMax, heartbeat = Heartbeat}, ChannelMax, NewState}. -%% if we attempt to recv > 64Mb, inet_drv will return enomem, -%% so we cap the max negotiated frame size accordingly -cap_max_supported_frame_size(MaxFrameSize) - when MaxFrameSize > ?FRAME_SIZE_UNBOUNDED -> - lists:min([MaxFrameSize, ?TCP_MAX_PACKET_SIZE]); -cap_max_supported_frame_size(_UnboundedMaxFrame) -> - ?TCP_MAX_PACKET_SIZE. - start_heartbeat(SHF, #state{sock = Sock, heartbeat = Heartbeat}) -> Frame = rabbit_binary_generator:build_heartbeat_frame(), SendFun = fun () -> catch rabbit_net:send(Sock, Frame) end, From bdd7aa129fe3ff9d4170f496e07b9e497f423636 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 12 Apr 2013 12:31:34 +0100 Subject: [PATCH 7/7] eek out some more bytes --- src/amqp_network_connection.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/amqp_network_connection.erl b/src/amqp_network_connection.erl index 8f00ec90..69fd71c8 100644 --- a/src/amqp_network_connection.erl +++ b/src/amqp_network_connection.erl @@ -26,7 +26,7 @@ -define(RABBIT_TCP_OPTS, [binary, {packet, 0}, {active,false}, {nodelay, true}]). -define(SOCKET_CLOSING_TIMEOUT, 1000). -define(HANDSHAKE_RECEIVE_TIMEOUT, 60000). --define(TCP_MAX_PACKET_SIZE, 16#4000000). +-define(TCP_MAX_PACKET_SIZE, (16#4000000 + ?EMPTY_FRAME_SIZE - 1)). -record(state, {sock, heartbeat, @@ -218,8 +218,10 @@ tune(#'connection.tune'{channel_max = ServerChannelMax, end, [ClientChannelMax, ClientHeartbeat, ClientFrameMax], [ServerChannelMax, ServerHeartbeat, ServerFrameMax]), - %% if we attempt to recv > 64Mb, inet_drv will return enomem, so - %% we cap the max negotiated frame size accordingly + %% If we attempt to recv > 64Mb, inet_drv will return enomem, so + %% we cap the max negotiated frame size accordingly. Note that + %% since we receive the frame header separately, we can actually + %% cope with frame sizes of 64M + ?EMPTY_FRAME_SIZE - 1. CappedFrameMax = case FrameMax of 0 -> ?TCP_MAX_PACKET_SIZE; _ -> lists:min([FrameMax, ?TCP_MAX_PACKET_SIZE])