Skip to content


Merge pull request #304 from rabbitmq/ra-log-fold
Browse files Browse the repository at this point in the history
Refactor reading from log to reduce peak memory use
  • Loading branch information
kjnilsson authored Sep 23, 2022
2 parents 805c1dc + 7e13d38 commit 211331e
Show file tree
Hide file tree
Showing 16 changed files with 481 additions and 515 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dep_aten = hex 0.5.8
dep_seshat = hex 0.3.2
DEPS = aten gen_batch_server seshat

TEST_DEPS = proper meck eunit_formatters looking_glass inet_tcp_proxy
TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy

BUILD_DEPS = elvis_mk

Expand All @@ -24,7 +24,7 @@ dep_inet_tcp_proxy = git

DEP_PLUGINS = elvis_mk

PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten mnesia ssh ssl meck looking_glass gen_batch_server inet_tcp_proxy
PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten mnesia ssh ssl meck gen_batch_server inet_tcp_proxy

EDOC_OPTS = {pretty_printer, erl_pp}, {sort_functions, false}
Expand Down
128 changes: 51 additions & 77 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Expand Down Expand Up @@ -284,29 +284,46 @@ write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId},
[UId, Idx, LastIdx+1])),
{error, {integrity_error, Msg}}.

-spec take(ra_index(), non_neg_integer(), state()) ->
{[log_entry()], NumRead :: non_neg_integer(), state()}.
take(Start, Num, #?MODULE{cfg = Cfg,
first_index = FirstIdx,
reader = Reader0,
last_index = LastIdx} = State)
when Start >= FirstIdx andalso Start =< LastIdx andalso Num > 0 ->
-spec fold(FromIdx :: ra_index(), ToIdx :: ra_index(),
fun((log_entry(), Acc) -> Acc), Acc, state()) ->
{Acc, state()} when Acc :: term().
fold(From0, To00, Fun, Acc0,
#?MODULE{cfg = Cfg,
cache = Cache,
first_index = FirstIdx,
last_index = LastIdx,
reader = Reader0} = State)
when To00 >= From0 ->
From = max(From0, FirstIdx),
To0 = min(To00, LastIdx),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1),
% 0. Check that the request isn't outside of first_index and last_index
% 1. Check the local cache for any unflushed entries, carry remainders
% 2. Check ra_log_open_mem_tables
% 3. Check ra_log_closed_mem_tables in turn
% 4. Check on disk segments in turn
case cache_take(Start, Num, State) of
{Entries, C0, undefined} ->
{Entries, C0, State};
{Entries0, C0, {S, F}} ->
{Entries, C1, Reader} = ra_log_reader:read(S, F, Reader0, Entries0),
{Entries, C0 + C1, State#?MODULE{reader = Reader}}
take(_, _, State) ->
{[], 0, State}.

CacheEntries = get_cache_items(From, To0, Cache, []),
To = case CacheEntries of
[] ->
[{Idx, _, _} | _] ->
NumRead = To0 - Idx + 1,
ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead),
Idx - 1
{Reader, Acc1} = ra_log_reader:fold(From, To, Fun, Acc0, Reader0),
Acc = lists:foldl(Fun, Acc1, CacheEntries),
{Acc, State#?MODULE{reader = Reader}};
fold(_From, _To, _Fun, Acc, State) ->
{Acc, State}.

get_cache_items(From, To, _Cache, Acc)
when From > To ->
get_cache_items(From, To, Cache, Acc) ->
case Cache of
#{To := Entry} ->
get_cache_items(From, To - 1, Cache, [Entry | Acc]);
_ ->

%% read a list of indexes,
%% found indexes be returned in the same order as the input list of indexes
Expand Down Expand Up @@ -517,12 +534,12 @@ next_index(#?MODULE{last_index = LastIdx}) ->
LastIdx + 1.

-spec fetch(ra_index(), state()) ->
{'maybe'(log_entry()), state()}.
{maybe(log_entry()), state()}.
fetch(Idx, State0) ->
case take(Idx, 1, State0) of
{[], _, State} ->
case fold(Idx, Idx, fun(E, Acc) -> [E | Acc] end, [], State0) of
{[], State} ->
{undefined, State};
{[Entry], _, State} ->
{[Entry], State} ->
{Entry, State}

Expand Down Expand Up @@ -771,12 +788,12 @@ log_update_effects(Pids, ReplyPid, #?MODULE{first_index = Idx,

%% deletes all segments where the last index is lower than
%% the Idx argument
delete_segments(Idx, #?MODULE{cfg = #cfg{log_id = LogId,
segment_writer = SegWriter,
uid = UId},
readers = Readers,
reader = Reader0} = State0) ->
case ra_log_reader:update_first_index(Idx, Reader0) of
delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
segment_writer = SegWriter,
uid = UId},
readers = Readers,
reader = Reader0} = State0) ->
case ra_log_reader:update_first_index(SnapIdx + 1, Reader0) of
{Reader, []} ->
State = State0#?MODULE{reader = Reader},
{State, log_update_effects(Readers, undefined, State)};
Expand All @@ -789,7 +806,7 @@ delete_segments(Idx, #?MODULE{cfg = #cfg{log_id = LogId,
Active = ra_log_reader:segment_refs(Reader),
?DEBUG("~s: ~b obsolete segments at ~b - remaining: ~b, pivot ~w",
[LogId, length(Obsolete), Idx, length(Active), Pivot]),
[LogId, length(Obsolete), SnapIdx, length(Active), Pivot]),
State = State0#?MODULE{reader = Reader},
{State, log_update_effects(Readers, Pid, State)}
Expand Down Expand Up @@ -878,37 +895,6 @@ cache_without(Idx, Idx, Cache) ->
cache_without(FromIdx, ToIdx, Cache) ->
cache_without(FromIdx + 1, ToIdx, maps:remove(FromIdx, Cache)).

cache_take(Start, Num, #?MODULE{cfg = Cfg,
cache = Cache,
last_index = LastIdx}) ->
Highest = min(LastIdx, Start + Num - 1),
% cache needs to be queried in reverse to ensure
% we can bail out when an item is not found
case cache_take0(Highest, Start, Cache, []) of
[] ->
{[], 0, {Start, Highest}};
[Last | _] = Entries when element(1, Last) =:= Start ->
NumRead = Highest - Start + 1,
ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead),
% there is no remainder - everything was in the cache
{Entries, NumRead, undefined};
[Last | _] = Entries ->
LastEntryIdx = element(1, Last),
NumRead = Highest - LastEntryIdx + 1,
ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead),
{Entries, NumRead, {Start, LastEntryIdx - 1}}

cache_take0(Next, Last, _Cache, Acc)
when Next < Last ->
cache_take0(Next, Last, Cache, Acc) ->
case Cache of
#{Next := Entry} ->
cache_take0(Next-1, Last, Cache, [Entry | Acc]);
_ ->

cache_read_sparse(Indexes, Cache, Acc) ->
cache_read_sparse(Indexes, Cache, 0, Acc).
Expand Down Expand Up @@ -1090,18 +1076,6 @@ maps_with_values(Keys, Map) ->


cache_take0_test() ->
Cache = #{1 => {1, 9, a}, 2 => {2, 9, b}, 3 => {3, 9, c}},
State = #?MODULE{cache = Cache, last_index = 3, first_index = 1},
% no remainder
{[{2, 9, b}], 1, undefined} = cache_take(2, 1, State),
{[{2, 9, b}, {3, 9, c}], 2, undefined} = cache_take(2, 2, State),
{[{1, 9, a}, {2, 9, b}, {3, 9, c}], 3, undefined} = cache_take(1, 3, State),
% small remainder
{[{3, 9, c}], 1, {1, 2}} = cache_take(1, 3, State#?MODULE{cache = #{3 => {3, 9, c}}}),
{[], 0, {1, 3}} = cache_take(1, 3, State#?MODULE{cache = #{4 => {4, 9, d}}}),

pick_range_test() ->
Ranges1 = [{76, 90}, {50, 75}, {1, 100}],
{1, 90} = pick_range(Ranges1, undefined),
Expand Down

0 comments on commit 211331e

Please sign in to comment.