From 1f7a27c51d0a46dbebafcbd48da24ff788eb18b7 Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Sun, 17 Nov 2024 00:41:53 +0200 Subject: [PATCH 1/2] rabbit_backing_queue: pass the whole message to discard callback The previous behaviour was passing solely the message ID making queue implementations such as, for example, the priority one hard to fulfil. Signed-off-by: Matteo Cafasso --- deps/rabbit/src/rabbit_backing_queue.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_backing_queue.erl b/deps/rabbit/src/rabbit_backing_queue.erl index ffa0a791f1b5..504ea3d0ecf8 100644 --- a/deps/rabbit/src/rabbit_backing_queue.erl +++ b/deps/rabbit/src/rabbit_backing_queue.erl @@ -105,7 +105,7 @@ %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). +-callback discard(rabbit_types:basic_message(), pid(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). From facddb363f2515395388fc4289ed1936c66809fd Mon Sep 17 00:00:00 2001 From: Matteo Cafasso Date: Sun, 17 Nov 2024 00:42:08 +0200 Subject: [PATCH 2/2] Adopt new rabbit_backing_queue:discard implementation Signed-off-by: Matteo Cafasso --- deps/rabbit/src/rabbit_amqqueue_process.erl | 4 ++-- deps/rabbit/src/rabbit_priority_queue.erl | 22 ++++++--------------- deps/rabbit/src/rabbit_variable_queue.erl | 2 +- 3 files changed, 9 insertions(+), 19 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 63f886bd3763..f3ce1515b94f 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -648,7 +648,7 @@ discard(#delivery{confirm = Confirm, true -> confirm_messages([MsgId], MTC, QName); false -> MTC end, - BQS1 = BQ:discard(MsgId, SenderPid, BQS), + BQS1 = BQ:discard(Msg, SenderPid, BQS), {BQS1, MTC1}. run_message_queue(ActiveConsumersChanged, State) -> @@ -815,7 +815,7 @@ send_reject_publish(#delivery{confirm = true, amqqueue:get_name(Q), MsgSeqNo), MTC1 = maps:remove(MsgId, MTC), - BQS1 = BQ:discard(MsgId, SenderPid, BQS), + BQS1 = BQ:discard(Msg, SenderPid, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, State) -> State. diff --git a/deps/rabbit/src/rabbit_priority_queue.erl b/deps/rabbit/src/rabbit_priority_queue.erl index 6e08a44f565f..bf025ab7aedb 100644 --- a/deps/rabbit/src/rabbit_priority_queue.erl +++ b/deps/rabbit/src/rabbit_priority_queue.erl @@ -218,22 +218,12 @@ publish_delivered(Msg, MsgProps, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)). -%% TODO this is a hack. The BQ api does not give us enough information -%% here - if we had the Msg we could look at its priority and forward -%% to the appropriate sub-BQ. But we don't so we are stuck. -%% -%% But fortunately VQ ignores discard/4, so we can too, *assuming we -%% are talking to VQ*. discard/4 is used by HA, but that's "above" us -%% (if in use) so we don't break that either, just some hypothetical -%% alternate BQ implementation. -discard(_MsgId, _ChPid, State = #state{}) -> - State; - %% We should have something a bit like this here: - %% pick1(fun (_P, BQSN) -> - %% BQ:discard(MsgId, ChPid, BQSN) - %% end, Msg, State); -discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) -> - ?passthrough1(discard(MsgId, ChPid, BQS)). +discard(Msg, ChPid, State = #state{bq = BQ}) -> + pick1(fun (_P, BQSN) -> + BQ:discard(Msg, ChPid, BQSN) + end, Msg, State); +discard(Msg, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(discard(Msg, ChPid, BQS)). drain_confirmed(State = #state{bq = BQ}) -> fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State); diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index b9d5bb8480ac..ec362d8657c7 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -544,7 +544,7 @@ publish_delivered(Msg, MsgProps, ChPid, State) -> State), {SeqId, a(maybe_update_rates(State1))}. -discard(_MsgId, _ChPid, State) -> State. +discard(_Msg, _ChPid, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> case sets:is_empty(C) of