From 3aa01e50bbbeefa14797e99fd6ea9e024a778378 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 13 Nov 2024 17:06:21 +0000 Subject: [PATCH 1/4] wip --- deps/rabbit/src/rabbit_fifo.erl | 78 +++++++++++++++++++------- deps/rabbit/src/rabbit_fifo_client.erl | 13 ++++- rabbitmq-components.mk | 3 +- 3 files changed, 70 insertions(+), 24 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 6a61d1d2e87f..04c6bfa0d417 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -89,7 +89,10 @@ make_purge/0, make_purge_nodes/1, make_update_config/1, - make_garbage_collection/0 + make_garbage_collection/0, + + exec_read/3 + ]). -ifdef(TEST). @@ -2058,31 +2061,57 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}], {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]}, ?DELIVERY_SEND_MSG_OPTS}; delivery_effect(ConsumerKey, Msgs, - #?STATE{cfg = #cfg{resource = QR}} = State) -> + #?STATE{cfg = #cfg{resource = _QR}} = State) -> {CTag, CPid} = consumer_id(ConsumerKey, State), - {RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) -> + {RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) -> {[I | Acc], N+1} end, {[], 0}, Msgs), - {log, RaftIdxs, - fun (Commands) - when length(Commands) < Num -> - %% the mandatory length/1 guard is a bit :( - rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b " - "indexes ~w but only ~b were returned. " - "This is most likely a stale read request " - "and can be ignored", - [rabbit_misc:rs(QR), CTag, Num, RaftIdxs, - length(Commands)]), - []; - (Commands) -> - DelMsgs = lists:zipwith( - fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) -> - {MsgId, {Header, get_msg(Cmd)}} - end, Commands, Msgs), - [{send_msg, CPid, {delivery, CTag, DelMsgs}, + {log_ext, RaftIdxs, + fun + % (Commands) + % when length(Commands) < Num -> + % %% the mandatory length/1 guard is a bit :( + % rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b " + % "indexes ~w but only ~b were returned. " + % "This is most likely a stale read request " + % "and can be ignored", + % [rabbit_misc:rs(QR), CTag, Num, RaftIdxs, + % length(Commands)]), + % []; + (ReadPlan) -> + % Fun = fun (Flru0) -> + % {Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0), + % %% pretend entries is a map + % {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) -> + % {_, _, Cmd} = maps:get(Idx, Entries), + % %% hacky + % {MsgId, {Header, get_msg(element(3, Cmd))}} + % end, Msgs), Flru} + % end, + [{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs}, ?DELIVERY_SEND_MSG_OPTS}] end, {local, node(CPid)}}. + % {log, RaftIdxs, + % fun (Commands) + % when length(Commands) < Num -> + % %% the mandatory length/1 guard is a bit :( + % rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b " + % "indexes ~w but only ~b were returned. " + % "This is most likely a stale read request " + % "and can be ignored", + % [rabbit_misc:rs(QR), CTag, Num, RaftIdxs, + % length(Commands)]), + % []; + % (Commands) -> + % DelMsgs = lists:zipwith( + % fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) -> + % {MsgId, {Header, get_msg(Cmd)}} + % end, Commands, Msgs), + % [{send_msg, CPid, {delivery, CTag, DelMsgs}, + % ?DELIVERY_SEND_MSG_OPTS}] + % end, + % {local, node(CPid)}}. reply_log_effect(RaftIdx, MsgId, Header, Ready, From) -> {log, [RaftIdx], @@ -2995,3 +3024,12 @@ incr_msg(Msg0, DelFailed, Anns) -> false -> Msg2 end. + +exec_read(Flru0, ReadPlan, Msgs) -> + {Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0), + %% pretend entries is a map + {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) -> + {_, _, Cmd} = maps:get(Idx, Entries), + %% hacky + {MsgId, {Header, get_msg(element(3, Cmd))}} + end, Msgs), Flru}. diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 20d57d89577f..9259e816f1c7 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -69,7 +69,8 @@ pending = #{} :: #{seq() => {term(), rabbit_fifo:command()}}, consumers = #{} :: #{rabbit_types:ctag() => #consumer{}}, - timer_state :: term() + timer_state :: term(), + cached_segments :: undefined | ra_flru:state() }). -opaque state() :: #state{}. @@ -633,7 +634,8 @@ handle_ra_event(QName, From, {applied, Seqs}, _ -> {ok, State1, Actions} end; -handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> +handle_ra_event(QName, From, {machine, Del}, State0) + when element(1, Del) == delivery -> handle_delivery(QName, From, Del, State0); handle_ra_event(_QName, _From, {machine, Action}, State) when element(1, Action) =:= credit_reply orelse @@ -835,7 +837,12 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs}, %% we should return all messages. MsgIntIds = [Id || {Id, _} <- IdMsgs], {State1, Deliveries} = return(Tag, MsgIntIds, State0), - {ok, State1, Deliveries}. + {ok, State1, Deliveries}; +handle_delivery(QName, Leader, {delivery, Tag, ReadState, Msgs}, + #state{cached_segments = Cached0} = State) -> + {MsgIds, Cached} = rabbit_fifo:exec_read(Cached0, ReadState, Msgs), + handle_delivery(QName, Leader, {delivery, Tag, MsgIds}, + State#state{cached_segments = Cached}). transform_msgs(QName, QRef, Msgs) -> lists:map( diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index a420191e91be..fb94911f406e 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -50,7 +50,8 @@ dep_khepri = hex 0.16.0 dep_khepri_mnesia_migration = hex 0.7.0 dep_osiris = git https://github.com/rabbitmq/osiris v1.8.4 dep_prometheus = hex 4.11.0 -dep_ra = hex 2.14.0 +dep_ra = git https://github.com/rabbitmq/ra partial-read-api +# dep_ra = hex 2.14.0 dep_ranch = hex 2.1.0 dep_recon = hex 2.5.6 dep_redbug = hex 2.0.7 From ef9260ec700b5bdc09fb20746641132649d0aec1 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 19 Nov 2024 12:53:25 +0000 Subject: [PATCH 2/4] try --- deps/rabbit/src/rabbit_channel.erl | 4 +++- deps/rabbit/src/rabbit_fifo.erl | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 0d7bd5bf45d7..b1418b870a34 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -564,8 +564,10 @@ prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; {reject_publish, _MsgSeqNos, _QPid} -> 5; - {queue_event, _, {confirm, _MsgSeqNos, _QPid}} -> 5; + % {queue_event, _, {confirm, _MsgSeqNos, _QPid}} -> 5; {queue_event, _, {reject_publish, _MsgSeqNos, _QPid}} -> 5; + {method, #'basic.ack'{}, _Content, _Flow} -> 5; + % {queue_event, _, {delivery, _, _}} -> 0; _ -> 0 end. diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 04c6bfa0d417..28bd2ecb8fe1 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -2064,8 +2064,8 @@ delivery_effect(ConsumerKey, Msgs, #?STATE{cfg = #cfg{resource = _QR}} = State) -> {CTag, CPid} = consumer_id(ConsumerKey, State), {RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) -> - {[I | Acc], N+1} - end, {[], 0}, Msgs), + {[I | Acc], N+1} + end, {[], 0}, Msgs), {log_ext, RaftIdxs, fun % (Commands) From 1b973d2789f375b92cea3bde2dd2ebd118908b44 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 21 Nov 2024 16:37:54 +0000 Subject: [PATCH 3/4] wip --- deps/rabbit/src/rabbit_fifo.erl | 19 +++++++------------ deps/rabbit/src/rabbit_fifo_client.erl | 2 +- deps/rabbit/test/quorum_queue_SUITE.erl | 1 + 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 28bd2ecb8fe1..1f4cb72d40e1 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -2066,6 +2066,7 @@ delivery_effect(ConsumerKey, Msgs, {RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) -> {[I | Acc], N+1} end, {[], 0}, Msgs), + rabbit_log:debug("DELIVERY EFFECT created for ~s ~p!", [CTag, RaftIdxs]), {log_ext, RaftIdxs, fun % (Commands) @@ -2079,15 +2080,9 @@ delivery_effect(ConsumerKey, Msgs, % length(Commands)]), % []; (ReadPlan) -> - % Fun = fun (Flru0) -> - % {Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0), - % %% pretend entries is a map - % {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) -> - % {_, _, Cmd} = maps:get(Idx, Entries), - % %% hacky - % {MsgId, {Header, get_msg(element(3, Cmd))}} - % end, Msgs), Flru} - % end, + rabbit_log:debug("READPLAN created for ~s ~p!", [CTag, RaftIdxs]), + %% TODO: check if CPid is local or not + %% TODO: could consider introducing a leader local proxy process [{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs}, ?DELIVERY_SEND_MSG_OPTS}] end, @@ -3026,10 +3021,10 @@ incr_msg(Msg0, DelFailed, Anns) -> end. exec_read(Flru0, ReadPlan, Msgs) -> - {Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0), + {Entries, Flru} = ra:execute_read_plan(ReadPlan, Flru0), %% pretend entries is a map {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) -> - {_, _, Cmd} = maps:get(Idx, Entries), + {_, _, Cmd, _} = maps:get(Idx, Entries), %% hacky - {MsgId, {Header, get_msg(element(3, Cmd))}} + {MsgId, {Header, get_msg(Cmd)}} end, Msgs), Flru}. diff --git a/deps/rabbit/src/rabbit_fifo_client.erl b/deps/rabbit/src/rabbit_fifo_client.erl index 9259e816f1c7..082948554f1d 100644 --- a/deps/rabbit/src/rabbit_fifo_client.erl +++ b/deps/rabbit/src/rabbit_fifo_client.erl @@ -168,7 +168,7 @@ enqueue(QName, Correlation, Msg, %% @param QueueName Name of the queue. %% @param Msg an arbitrary erlang term representing the message. %% @param State the current {@module} state. -%% @returns +%% @return's %% `{ok, State, Actions}' if the command was successfully sent. %% {@module} assigns a sequence number to every raft command it issues. The %% SequenceNumber can be correlated to the applied sequence numbers returned diff --git a/deps/rabbit/test/quorum_queue_SUITE.erl b/deps/rabbit/test/quorum_queue_SUITE.erl index 8cdb18dc045c..691d7b0235b0 100644 --- a/deps/rabbit/test/quorum_queue_SUITE.erl +++ b/deps/rabbit/test/quorum_queue_SUITE.erl @@ -3632,6 +3632,7 @@ receive_and_ack(Ch) -> amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, multiple = false}) after 5000 -> + flush(1), ct:fail("receive_and_ack timed out", []) end. From a751df9d5c7f0c8bcd64cd9abf593beb7050f99a Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Mon, 25 Nov 2024 14:43:03 +0000 Subject: [PATCH 4/4] wip --- deps/rabbit/src/rabbit_fifo.erl | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 1f4cb72d40e1..855969b63727 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -3021,10 +3021,9 @@ incr_msg(Msg0, DelFailed, Anns) -> end. exec_read(Flru0, ReadPlan, Msgs) -> - {Entries, Flru} = ra:execute_read_plan(ReadPlan, Flru0), - %% pretend entries is a map + {Entries, Flru} = ra_log_read_plan:execute(ReadPlan, Flru0), + %% return a list in original order {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) -> - {_, _, Cmd, _} = maps:get(Idx, Entries), - %% hacky + Cmd = maps:get(Idx, Entries), {MsgId, {Header, get_msg(Cmd)}} end, Msgs), Flru}.