From bfa293ab3b5e2e3b69703288d222d38f79205308 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 13 Nov 2024 09:07:40 +0000 Subject: [PATCH] QQ: reduce memory use when dropping many messages at once. As may happen when a max_length configuration change is made when there are many messages on the queue. --- deps/rabbit/src/rabbit_fifo.erl | 21 ++++++++++++++++++- deps/rabbit/test/rabbit_fifo_SUITE.erl | 28 ++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 6a61d1d2e87f..bed02ecda30b 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -1596,11 +1596,30 @@ drop_head(#?STATE{ra_indexes = Indexes0} = State0, Effects) -> #?STATE{cfg = #cfg{dead_letter_handler = DLH}, dlx = DlxState} = State = State3, {_, DlxEffects} = rabbit_fifo_dlx:discard([Msg], maxlen, DLH, DlxState), - {State, DlxEffects ++ Effects}; + {State, combine_effects(DlxEffects, Effects)}; empty -> {State0, Effects} end. +%% combine global counter update effects to avoid bulding a huge list of +%% effects if many messages are dropped at the same time as could happen +%% when the `max_length' is changed via a configuration update. +combine_effects([{mod_call, + rabbit_global_counters, + messages_dead_lettered, + [Reason, rabbit_quorum_queue, Type, NewLen]}], + [{mod_call, + rabbit_global_counters, + messages_dead_lettered, + [Reason, rabbit_quorum_queue, Type, PrevLen]} | Rem]) -> + [{mod_call, + rabbit_global_counters, + messages_dead_lettered, + [Reason, rabbit_quorum_queue, Type, PrevLen + NewLen]} | Rem]; +combine_effects(New, Old) -> + New ++ Old. + + maybe_set_msg_ttl(Msg, RaCmdTs, Header, #?STATE{cfg = #cfg{msg_ttl = MsgTTL}}) -> case mc:is(Msg) of diff --git a/deps/rabbit/test/rabbit_fifo_SUITE.erl b/deps/rabbit/test/rabbit_fifo_SUITE.erl index e14b9406eee8..45f3f2cd12cd 100644 --- a/deps/rabbit/test/rabbit_fifo_SUITE.erl +++ b/deps/rabbit/test/rabbit_fifo_SUITE.erl @@ -2185,6 +2185,34 @@ update_config_delivery_limit_test(Config) -> ok. +update_config_max_length_test(Config) -> + QName = rabbit_misc:r("/", queue, ?FUNCTION_NAME_B), + InitConf = #{name => ?FUNCTION_NAME, + queue_resource => QName, + delivery_limit => 20 + }, + State0 = init(InitConf), + ?assertMatch(#{config := #{delivery_limit := 20}}, + rabbit_fifo:overview(State0)), + + State1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Config, Num, Num, Num, FS0), + FS + end, State0, lists:seq(1, 100)), + Conf = #{name => ?FUNCTION_NAME, + queue_resource => QName, + max_length => 2, + dead_letter_handler => undefined}, + %% assert only one global counter effect is generated rather than 1 per + %% dropped message + {State, ok, Effects} = apply(meta(Config, ?LINE), + rabbit_fifo:make_update_config(Conf), State1), + ?assertMatch([{mod_call, rabbit_global_counters, messages_dead_lettered, + [maxlen, rabbit_quorum_queue,disabled, 98]}], Effects), + ?assertMatch(#{config := #{max_length := 2}, + num_ready_messages := 2}, rabbit_fifo:overview(State)), + ok. + purge_nodes_test(Config) -> Node = purged@node, ThisNode = node(),