Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revival of #9620 Pass the message to rabbit_backing_queue:discard callback #12743

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ discard(#delivery{confirm = Confirm,
true -> confirm_messages([MsgId], MTC, QName);
false -> MTC
end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
BQS1 = BQ:discard(Msg, SenderPid, BQS),
{BQS1, MTC1}.

run_message_queue(ActiveConsumersChanged, State) ->
Expand Down Expand Up @@ -815,7 +815,7 @@ send_reject_publish(#delivery{confirm = true,
amqqueue:get_name(Q), MsgSeqNo),

MTC1 = maps:remove(MsgId, MTC),
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
BQS1 = BQ:discard(Msg, SenderPid, BQS),
State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 };
send_reject_publish(#delivery{confirm = false}, State) ->
State.
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@

%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
-callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
-callback discard(rabbit_types:basic_message(), pid(), state()) -> state().

%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
Expand Down
22 changes: 6 additions & 16 deletions deps/rabbit/src/rabbit_priority_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,12 @@ publish_delivered(Msg, MsgProps, ChPid,
State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(publish_delivered(Msg, MsgProps, ChPid, BQS)).

%% TODO this is a hack. The BQ api does not give us enough information
%% here - if we had the Msg we could look at its priority and forward
%% to the appropriate sub-BQ. But we don't so we are stuck.
%%
%% But fortunately VQ ignores discard/4, so we can too, *assuming we
%% are talking to VQ*. discard/4 is used by HA, but that's "above" us
%% (if in use) so we don't break that either, just some hypothetical
%% alternate BQ implementation.
discard(_MsgId, _ChPid, State = #state{}) ->
State;
%% We should have something a bit like this here:
%% pick1(fun (_P, BQSN) ->
%% BQ:discard(MsgId, ChPid, BQSN)
%% end, Msg, State);
discard(MsgId, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(MsgId, ChPid, BQS)).
discard(Msg, ChPid, State = #state{bq = BQ}) ->
pick1(fun (_P, BQSN) ->
BQ:discard(Msg, ChPid, BQSN)
end, Msg, State);
discard(Msg, ChPid, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough1(discard(Msg, ChPid, BQS)).

drain_confirmed(State = #state{bq = BQ}) ->
fold_append2(fun (_P, BQSN) -> BQ:drain_confirmed(BQSN) end, State);
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ publish_delivered(Msg, MsgProps, ChPid, State) ->
State),
{SeqId, a(maybe_update_rates(State1))}.

discard(_MsgId, _ChPid, State) -> State.
discard(_Msg, _ChPid, State) -> State.

drain_confirmed(State = #vqstate { confirmed = C }) ->
case sets:is_empty(C) of
Expand Down
Loading