Skip to content

Commit

Permalink
Remove max_frame_size from AMQP writer
Browse files Browse the repository at this point in the history
because the session process already splits frames that are too large
into smaller frames
  • Loading branch information
ansd committed Jul 26, 2024
1 parent dde8e69 commit d3109e9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 19 deletions.
5 changes: 2 additions & 3 deletions deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,9 @@ handle_connection_frame(#'v1_0.close'{}, State0) ->
close(undefined, State).

start_writer(#v1{helper_sup = SupPid,
sock = Sock,
connection = #v1_connection{outgoing_max_frame_size = MaxFrame}} = State) ->
sock = Sock} = State) ->
ChildSpec = #{id => writer,
start => {rabbit_amqp_writer, start_link, [Sock, MaxFrame, self()]},
start => {rabbit_amqp_writer, start_link, [Sock, self()]},
restart => transient,
significant => true,
shutdown => ?WORKER_WAIT,
Expand Down
26 changes: 10 additions & 16 deletions deps/rabbit/src/rabbit_amqp_writer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
-include("rabbit_amqp.hrl").

%% client API
-export([start_link/3,
-export([start_link/2,
send_command/3,
send_command/4,
send_command_sync/3,
Expand All @@ -27,7 +27,6 @@

-record(state, {
sock :: rabbit_net:socket(),
max_frame_size :: unlimited | pos_integer(),
reader :: rabbit_types:connection(),
pending :: iolist(),
%% This field is just an optimisation to minimize the cost of erlang:iolist_size/1
Expand All @@ -46,10 +45,10 @@
%%% client API %%%
%%%%%%%%%%%%%%%%%%

-spec start_link (rabbit_net:socket(), non_neg_integer(), pid()) ->
-spec start_link (rabbit_net:socket(), pid()) ->
rabbit_types:ok(pid()).
start_link(Sock, MaxFrame, ReaderPid) ->
Args = {Sock, MaxFrame, ReaderPid},
start_link(Sock, ReaderPid) ->
Args = {Sock, ReaderPid},
Opts = [{hibernate_after, ?HIBERNATE_AFTER}],
gen_server:start_link(?MODULE, Args, Opts).

Expand Down Expand Up @@ -96,9 +95,8 @@ internal_send_command(Sock, Performative, Protocol) ->
%%% gen_server callbacks %%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%

init({Sock, MaxFrame, ReaderPid}) ->
init({Sock, ReaderPid}) ->
State = #state{sock = Sock,
max_frame_size = MaxFrame,
reader = ReaderPid,
pending = [],
pending_size = 0,
Expand Down Expand Up @@ -142,12 +140,10 @@ format_status(Status) ->
maps:update_with(
state,
fun(#state{sock = Sock,
max_frame_size = MaxFrame,
reader = Reader,
pending = Pending,
pending_size = PendingSize}) ->
#{socket => Sock,
max_frame_size => MaxFrame,
reader => Reader,
%% Below 2 fields should always have the same value.
pending => iolist_size(Pending),
Expand Down Expand Up @@ -189,12 +185,11 @@ internal_send_command_async(Channel, Performative,
pending_size = PendingSize + iolist_size(Frame)}).

internal_send_command_async(Channel, Performative, Payload,
State = #state{max_frame_size = MaxFrame,
pending = Pending,
State = #state{pending = Pending,
pending_size = PendingSize}) ->
Frames = assemble_frame(Channel, Performative, Payload, MaxFrame),
maybe_flush(State#state{pending = [Frames | Pending],
pending_size = PendingSize + iolist_size(Frames)}).
Frame = assemble_frame_with_payload(Channel, Performative, Payload),
maybe_flush(State#state{pending = [Frame | Pending],
pending_size = PendingSize + iolist_size(Frame)}).

assemble_frame(Channel, Performative) ->
assemble_frame(Channel, Performative, amqp10_framing).
Expand All @@ -210,8 +205,7 @@ assemble_frame(Channel, Performative, rabbit_amqp_sasl) ->
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).

%%TODO respect MaxFrame
assemble_frame(Channel, Performative, Payload, _MaxFrame) ->
assemble_frame_with_payload(Channel, Performative, Payload) ->
?TRACE("channel ~b <-~n ~tp~n followed by ~tb bytes of payload",
[Channel, amqp10_framing:pprint(Performative), iolist_size(Payload)]),
PerfIoData = amqp10_framing:encode_bin(Performative),
Expand Down

0 comments on commit d3109e9

Please sign in to comment.