Skip to content

Commit

Permalink
Merge pull request rabbitmq#4522 from rabbitmq/loic-cq-dont-reduce-memory-usage
Browse files Browse the repository at this point in the history

CQ: Merge lazy/default behavior into a unified mode
  • Loading branch information
michaelklishin authored Oct 1, 2022
2 parents 7136ad0 + 1eb1710 commit 69b06d3
Show file tree
Hide file tree
Showing 20 changed files with 1,054 additions and 1,732 deletions.
2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ suites = [
PACKAGE,
name = "classic_queue_prop_SUITE",
size = "large",
shard_count = 5,
shard_count = 3,
sharding_method = "case",
deps = [
"@proper//:erlang_app",
Expand Down
8 changes: 0 additions & 8 deletions deps/rabbit/src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1274,9 +1274,6 @@ prioritise_cast(Msg, _Len, State) ->
%% stack are optimised for that) and to make things easier to reason
%% about. Finally, we prioritise ack over resume since it should
%% always reduce memory use.
%% bump_reduce_memory_use is prioritised over publishes, because sending
%% credit to self is hard to reason about. Consumers can continue while
%% reduce_memory_use is in progress.

consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
case BQ:msg_rates(BQS) of
Expand All @@ -1294,7 +1291,6 @@ prioritise_info(Msg, _Len, #q{q = Q}) ->
{drop_expired, _Version} -> 8;
emit_stats -> 7;
sync_timeout -> 6;
bump_reduce_memory_use -> 1;
_ -> 0
end.

Expand Down Expand Up @@ -1776,10 +1772,6 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
%% rabbit_variable_queue:msg_store_write/4.
credit_flow:handle_bump_msg(Msg),
noreply(State#q{backing_queue_state = BQ:resume(BQS)});
handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
backing_queue_state = BQS0}) ->
BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0),
noreply(State#q{backing_queue_state = BQ:resume(BQS1)});

handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
Expand Down
5 changes: 0 additions & 5 deletions deps/rabbit/src/rabbit_backing_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,6 @@
[ack()], Acc, state())
-> Acc.

%% Called when rabbit_amqqueue_process receives a message via
%% handle_info and it should be processed by the backing
%% queue
-callback handle_info(term(), state()) -> state().

-spec info_keys() -> rabbit_types:info_keys().

info_keys() -> ?INFO_KEYS.
31 changes: 17 additions & 14 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

-export([erase/1, init/3, reset_state/1, recover/7,
terminate/3, delete_and_terminate/1,
publish/7, ack/2, read/3]).
publish/7, publish/8, ack/2, read/3]).

%% Recovery. Unlike other functions in this module, these
%% apply to all queues all at once.
Expand Down Expand Up @@ -111,7 +111,7 @@
%% and there are outstanding unconfirmed messages.
%% In that case the buffer is flushed to disk when
%% the queue requests a sync (after a timeout).
confirms = gb_sets:new() :: gb_sets:set(),
confirms = sets:new([{version,2}]) :: sets:set(),

%% Segments we currently know of along with the
%% number of unacked messages remaining in the
Expand Down Expand Up @@ -156,7 +156,7 @@

%% Types copied from rabbit_queue_index.

-type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
-type on_sync_fun() :: fun ((sets:set()) -> ok).
-type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
-type shutdown_terms() :: list() | 'non_clean_shutdown'.

Expand Down Expand Up @@ -290,7 +290,7 @@ recover_segments(State0 = #qi { queue_name = Name, dir = Dir }, Terms, IsMsgStor
list_to_integer(filename:basename(F, ?SEGMENT_EXTENSION))
|| F <- SegmentFiles]),
%% We use a temporary store state to check that messages do exist.
StoreState0 = rabbit_classic_queue_store_v2:init(Name, OnSyncMsgFun),
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
{State1, StoreState} = recover_segments(State0, ContainsCheckFun, StoreState0, CountersRef, Segments),
_ = rabbit_classic_queue_store_v2:terminate(StoreState),
State1
Expand Down Expand Up @@ -482,7 +482,7 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean
recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = Dir },
V1State, CountersRef) ->
%% Use a temporary per-queue store state to store embedded messages.
StoreState0 = rabbit_classic_queue_store_v2:init(Name, fun(_, _) -> ok end),
StoreState0 = rabbit_classic_queue_store_v2:init(Name),
%% Go through the v1 index and publish messages to the v2 index.
{LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1State),
%% When resuming after a crash we need to double check the messages that are both
Expand Down Expand Up @@ -564,9 +564,12 @@ delete_and_terminate(State = #qi { dir = Dir,
rabbit_types:message_properties(), boolean(),
non_neg_integer() | infinity, State) -> State when State::state().

%% publish/7: backward-compatible wrapper around publish/8.
%% Passes ShouldConfirm = true so that, when the message properties
%% request confirms (needs_confirming = true), the message id is
%% tracked in the index's unconfirmed set (see maybe_mark_unconfirmed).
publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount, State) ->
publish(MsgId, SeqId, Location, Props, IsPersistent, true, TargetRamCount, State).

%% Because we always persist to the msg_store, the Msg(Or)Id argument
%% here is always a binary, never a record.
publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount,
publish(MsgId, SeqId, Location, Props, IsPersistent, ShouldConfirm, TargetRamCount,
State0 = #qi { write_buffer = WriteBuffer0,
segments = Segments }) ->
?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p ~0p", [MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount, State0]),
Expand All @@ -583,7 +586,7 @@ publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount,
end,
%% When publisher confirms have been requested for this
%% message we mark the message as unconfirmed.
State = maybe_mark_unconfirmed(MsgId, Props, State2),
State = maybe_mark_unconfirmed(MsgId, Props, ShouldConfirm, State2),
maybe_flush_buffer(State, SegmentEntryCount).

new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) ->
Expand Down Expand Up @@ -657,9 +660,9 @@ reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
end.

maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true },
State = #qi { confirms = Confirms }) ->
State#qi{ confirms = gb_sets:add_element(MsgId, Confirms) };
maybe_mark_unconfirmed(_, _, State) ->
true, State = #qi { confirms = Confirms }) ->
State#qi{ confirms = sets:add_element(MsgId, Confirms) };
maybe_mark_unconfirmed(_, _, _, State) ->
State.

maybe_flush_buffer(State = #qi { write_buffer = WriteBuffer,
Expand Down Expand Up @@ -1055,19 +1058,19 @@ sync(State0 = #qi{ confirms = Confirms,
on_sync = OnSyncFun }) ->
?DEBUG("~0p", [State0]),
State = flush_buffer(State0, full, segment_entry_count()),
_ = case gb_sets:is_empty(Confirms) of
_ = case sets:is_empty(Confirms) of
true ->
ok;
false ->
OnSyncFun(Confirms)
end,
State#qi{ confirms = gb_sets:new() }.
State#qi{ confirms = sets:new([{version,2}]) }.

-spec needs_sync(state()) -> 'false'.

needs_sync(State = #qi{ confirms = Confirms }) ->
?DEBUG("~0p", [State]),
case gb_sets:is_empty(Confirms) of
case sets:is_empty(Confirms) of
true -> false;
false -> confirms
end.
Expand Down Expand Up @@ -1183,7 +1186,7 @@ stop(VHost) ->

pre_publish(MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State) ->
?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p ~0p", [MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State]),
publish(MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State).
publish(MsgOrId, SeqId, Location, Props, IsPersistent, false, TargetRamCount, State).

flush_pre_publish_cache(TargetRamCount, State) ->
?DEBUG("~0p ~0p", [TargetRamCount, State]),
Expand Down
Loading

0 comments on commit 69b06d3

Please sign in to comment.