diff --git a/Makefile b/Makefile index 0235fbd1..39e009d1 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ dep_aten = hex 0.5.8 dep_seshat = hex 0.3.2 DEPS = aten gen_batch_server seshat -TEST_DEPS = proper meck eunit_formatters looking_glass inet_tcp_proxy +TEST_DEPS = proper meck eunit_formatters inet_tcp_proxy BUILD_DEPS = elvis_mk @@ -24,7 +24,7 @@ dep_inet_tcp_proxy = git https://github.com/rabbitmq/inet_tcp_proxy.git DEP_PLUGINS = elvis_mk -PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten mnesia ssh ssl meck looking_glass gen_batch_server inet_tcp_proxy +PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten mnesia ssh ssl meck gen_batch_server inet_tcp_proxy EDOC_OUTPUT = docs EDOC_OPTS = {pretty_printer, erl_pp}, {sort_functions, false} diff --git a/src/ra_log.erl b/src/ra_log.erl index 9d6083f3..a888b446 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -16,7 +16,7 @@ write/2, append_sync/2, write_sync/2, - take/3, + fold/5, sparse_read/2, last_index_term/1, set_last_index/2, @@ -284,29 +284,46 @@ write([{Idx, _, _} | _], #?MODULE{cfg = #cfg{uid = UId}, [UId, Idx, LastIdx+1])), {error, {integrity_error, Msg}}. --spec take(ra_index(), non_neg_integer(), state()) -> - {[log_entry()], NumRead :: non_neg_integer(), state()}. -take(Start, Num, #?MODULE{cfg = Cfg, - first_index = FirstIdx, - reader = Reader0, - last_index = LastIdx} = State) - when Start >= FirstIdx andalso Start =< LastIdx andalso Num > 0 -> +-spec fold(FromIdx :: ra_index(), ToIdx :: ra_index(), + fun((log_entry(), Acc) -> Acc), Acc, state()) -> + {Acc, state()} when Acc :: term(). +fold(From0, To00, Fun, Acc0, + #?MODULE{cfg = Cfg, + cache = Cache, + first_index = FirstIdx, + last_index = LastIdx, + reader = Reader0} = State) + when To00 >= From0 -> + From = max(From0, FirstIdx), + To0 = min(To00, LastIdx), ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1), - % 0. Check that the request isn't outside of first_index and last_index - % 1. Check the local cache for any unflushed entries, carry remainders - % 2. Check ra_log_open_mem_tables - % 3. Check ra_log_closed_mem_tables in turn - % 4. Check on disk segments in turn - case cache_take(Start, Num, State) of - {Entries, C0, undefined} -> - {Entries, C0, State}; - {Entries0, C0, {S, F}} -> - {Entries, C1, Reader} = ra_log_reader:read(S, F, Reader0, Entries0), - {Entries, C0 + C1, State#?MODULE{reader = Reader}} - end; -take(_, _, State) -> - {[], 0, State}. - + CacheEntries = get_cache_items(From, To0, Cache, []), + To = case CacheEntries of + [] -> + To0; + [{Idx, _, _} | _] -> + NumRead = To0 - Idx + 1, + ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead), + Idx - 1 + end, + {Reader, Acc1} = ra_log_reader:fold(From, To, Fun, Acc0, Reader0), + Acc = lists:foldl(Fun, Acc1, CacheEntries), + {Acc, State#?MODULE{reader = Reader}}; +fold(_From, _To, _Fun, Acc, State) -> + {Acc, State}. + + + +get_cache_items(From, To, _Cache, Acc) + when From > To -> + Acc; +get_cache_items(From, To, Cache, Acc) -> + case Cache of + #{To := Entry} -> + get_cache_items(From, To - 1, Cache, [Entry | Acc]); + _ -> + Acc + end. %% read a list of indexes, %% found indexes be returned in the same order as the input list of indexes @@ -517,12 +534,12 @@ next_index(#?MODULE{last_index = LastIdx}) -> LastIdx + 1. -spec fetch(ra_index(), state()) -> - {'maybe'(log_entry()), state()}. + {maybe(log_entry()), state()}. fetch(Idx, State0) -> - case take(Idx, 1, State0) of - {[], _, State} -> + case fold(Idx, Idx, fun(E, Acc) -> [E | Acc] end, [], State0) of + {[], State} -> {undefined, State}; - {[Entry], _, State} -> + {[Entry], State} -> {Entry, State} end. @@ -771,12 +788,12 @@ log_update_effects(Pids, ReplyPid, #?MODULE{first_index = Idx, %% deletes all segments where the last index is lower than %% the Idx argument -delete_segments(Idx, #?MODULE{cfg = #cfg{log_id = LogId, - segment_writer = SegWriter, - uid = UId}, - readers = Readers, - reader = Reader0} = State0) -> - case ra_log_reader:update_first_index(Idx, Reader0) of +delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId, + segment_writer = SegWriter, + uid = UId}, + readers = Readers, + reader = Reader0} = State0) -> + case ra_log_reader:update_first_index(SnapIdx + 1, Reader0) of {Reader, []} -> State = State0#?MODULE{reader = Reader}, {State, log_update_effects(Readers, undefined, State)}; @@ -789,7 +806,7 @@ delete_segments(Idx, #?MODULE{cfg = #cfg{log_id = LogId, end), Active = ra_log_reader:segment_refs(Reader), ?DEBUG("~s: ~b obsolete segments at ~b - remaining: ~b, pivot ~w", - [LogId, length(Obsolete), Idx, length(Active), Pivot]), + [LogId, length(Obsolete), SnapIdx, length(Active), Pivot]), State = State0#?MODULE{reader = Reader}, {State, log_update_effects(Readers, Pid, State)} end. @@ -878,37 +895,6 @@ cache_without(Idx, Idx, Cache) -> cache_without(FromIdx, ToIdx, Cache) -> cache_without(FromIdx + 1, ToIdx, maps:remove(FromIdx, Cache)). -cache_take(Start, Num, #?MODULE{cfg = Cfg, - cache = Cache, - last_index = LastIdx}) -> - Highest = min(LastIdx, Start + Num - 1), - % cache needs to be queried in reverse to ensure - % we can bail out when an item is not found - case cache_take0(Highest, Start, Cache, []) of - [] -> - {[], 0, {Start, Highest}}; - [Last | _] = Entries when element(1, Last) =:= Start -> - NumRead = Highest - Start + 1, - ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead), - % there is no remainder - everything was in the cache - {Entries, NumRead, undefined}; - [Last | _] = Entries -> - LastEntryIdx = element(1, Last), - NumRead = Highest - LastEntryIdx + 1, - ok = incr_counter(Cfg, ?C_RA_LOG_READ_CACHE, NumRead), - {Entries, NumRead, {Start, LastEntryIdx - 1}} - end. - -cache_take0(Next, Last, _Cache, Acc) - when Next < Last -> - Acc; -cache_take0(Next, Last, Cache, Acc) -> - case Cache of - #{Next := Entry} -> - cache_take0(Next-1, Last, Cache, [Entry | Acc]); - _ -> - Acc - end. cache_read_sparse(Indexes, Cache, Acc) -> cache_read_sparse(Indexes, Cache, 0, Acc). @@ -1090,18 +1076,6 @@ maps_with_values(Keys, Map) -> -ifdef(TEST). -cache_take0_test() -> - Cache = #{1 => {1, 9, a}, 2 => {2, 9, b}, 3 => {3, 9, c}}, - State = #?MODULE{cache = Cache, last_index = 3, first_index = 1}, - % no remainder - {[{2, 9, b}], 1, undefined} = cache_take(2, 1, State), - {[{2, 9, b}, {3, 9, c}], 2, undefined} = cache_take(2, 2, State), - {[{1, 9, a}, {2, 9, b}, {3, 9, c}], 3, undefined} = cache_take(1, 3, State), - % small remainder - {[{3, 9, c}], 1, {1, 2}} = cache_take(1, 3, State#?MODULE{cache = #{3 => {3, 9, c}}}), - {[], 0, {1, 3}} = cache_take(1, 3, State#?MODULE{cache = #{4 => {4, 9, d}}}), - ok. - pick_range_test() -> Ranges1 = [{76, 90}, {50, 75}, {1, 100}], {1, 90} = pick_range(Ranges1, undefined), diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index 626d102c..e7d05a9a 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -17,13 +17,13 @@ segment_refs/1, num_open_segments/1, update_first_index/2, - read/3, - read/4, + fold/5, sparse_read/3, fetch_term/2, delete_closed_mem_table_object/2, closed_mem_tables/1, - open_mem_table_lookup/1 + open_mem_table_lookup/1, + range_overlap/4 ]). -include("ra.hrl"). @@ -120,9 +120,10 @@ handle_log_update({ra_log_update, From, FstIdx, SegRefs}, -spec update_first_index(ra_index(), state()) -> {state(), [segment_ref()]}. -update_first_index(Idx, #?STATE{segment_refs = SegRefs0, - open_segments = OpenSegs0} = State) -> - case lists:partition(fun({_, To, _}) when To >= Idx -> true; +update_first_index(FstIdx, #?STATE{segment_refs = SegRefs0, + open_segments = OpenSegs0} = State) -> + case lists:partition(fun({_, To, _}) + when To >= FstIdx -> true; (_) -> false end, SegRefs0) of {_, []} -> @@ -137,7 +138,7 @@ update_first_index(Idx, #?STATE{segment_refs = SegRefs0, end end, OpenSegs0, ObsoleteKeys), {State#?STATE{open_segments = OpenSegs, - first_index = Idx + 1, + first_index = FstIdx, segment_refs = Active}, Obsolete} end. @@ -150,28 +151,43 @@ segment_refs(#?STATE{segment_refs = SegmentRefs}) -> num_open_segments(#?STATE{open_segments = Open}) -> ra_flru:size(Open). --spec read(ra_index(), ra_index(), state()) -> - {[log_entry()], NumRead :: non_neg_integer(), state()}. -read(From, To, State) -> - read(From, To, State, []). - --spec read(ra_index(), ra_index(), state(), [log_entry()]) -> - {[log_entry()], NumRead :: non_neg_integer(), state()}. -read(From, To, State, Entries) when From =< To -> - retry_read(2, From, To, Entries, State); -read(_From, _To, State, Entries) -> - {Entries, 0, State}. +mem_tbl_fold(_Tid, From, To, _Fun, Acc) + when From > To -> + Acc; +mem_tbl_fold(Tid, From, To, Fun, Acc0) -> + [Entry] = ets:lookup(Tid, From), + Acc = Fun(Entry, Acc0), + mem_tbl_fold(Tid, From+1, To, Fun, Acc). + + +-spec fold(ra_index(), ra_index(), fun(), term(), state()) -> + {state(), term()}. +fold(FromIdx, ToIdx, Fun, Acc, + #?STATE{cfg = #cfg{} = Cfg} = State) + when ToIdx >= FromIdx -> + Plan = read_plan(Cfg, FromIdx, ToIdx), + lists:foldl( + fun ({ets, Tid, CIx, From, To}, {S, Ac}) -> + ok = incr_counter(Cfg, CIx, To - From + 1), + {S, mem_tbl_fold(Tid, From, To, Fun, Ac)}; + ({segments, From, To}, {S, Ac}) -> + ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, To - From + 1), + segment_fold(S, From, To, Fun, Ac) + end, {State, Acc}, Plan); +fold(_FromIdx, _ToIdx, _Fun, Acc, + #?STATE{} = State) -> + {State, Acc}. -spec sparse_read(state(), [ra_index()], [log_entry()]) -> {[log_entry()], state()}. sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes0, Entries0) -> - case open_mem_tbl_sparse_read(Cfg, Indexes0, Entries0) of + try open_mem_tbl_sparse_read(Cfg, Indexes0, Entries0) of {Entries1, OpenC, []} -> ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPEN_MEM_TBL, OpenC), {Entries1, State}; {Entries1, OpenC, Rem1} -> ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPEN_MEM_TBL, OpenC), - case closed_mem_tbl_sparse_read(Cfg, Rem1, Entries1) of + try closed_mem_tbl_sparse_read(Cfg, Rem1, Entries1) of {Entries2, ClosedC, []} -> ok = incr_counter(Cfg, ?C_RA_LOG_READ_CLOSED_MEM_TBL, ClosedC), {Entries2, State}; @@ -180,44 +196,16 @@ sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes0, Entries0) -> {Open, _, SegC, Entries} = (catch segment_sparse_read(State, Rem2, Entries2)), ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC), {Entries, State#?MODULE{open_segments = Open}} + catch _:_ -> + sparse_read(State, Indexes0, Entries0) end + catch _:_ -> + %% table was most likely concurrently deleted + %% try again + %% TODO: avoid infinite loop + sparse_read(State, Indexes0, Entries0) end. -retry_read(0, From, To, _Entries0, State) -> - exit({ra_log_reader_reader_retry_exhausted, From, To, State}); -retry_read(N, From, To, Entries0, - #?STATE{cfg = #cfg{uid = UId, - open_mem_tbls = OpenTbl, - closed_mem_tbls = ClosedTbl} = Cfg} = State) -> - % 2. Check open mem table - % 3. Check closed mem tables in turn - % 4. Check on disk segments in turn - case open_mem_tbl_take(OpenTbl, UId, {From, To}, Entries0) of - {Entries1, {_, C} = Counter0, undefined} -> - ok = incr_counter(Cfg, Counter0), - {Entries1, C, State}; - {Entries1, {_, C0} = Counter0, Rem1} -> - ok = incr_counter(Cfg, Counter0), - case catch closed_mem_tbl_take(ClosedTbl, UId, Rem1, Entries1) of - {Entries2, {_, C1} = Counter1, undefined} -> - ok = incr_counter(Cfg, Counter1), - {Entries2, C0 + C1, State}; - {Entries2, {_, C1} = Counter1, {S, E} = Rem2} -> - ok = incr_counter(Cfg, Counter1), - case catch segment_take(State, Rem2, Entries2) of - {Open, undefined, Entries} -> - C = (E - S + 1) + C0 + C1, - incr_counter(Cfg, {?C_RA_LOG_READ_SEGMENT, E - S + 1}), - {Entries, C, State#?MODULE{open_segments = Open}} - end; - {ets_miss, _Index} -> - %% this would happen if a mem table was deleted after - %% an external reader had read the range - retry_read(N-1, From, To, Entries0, State) - end - end. - - -spec fetch_term(ra_index(), state()) -> {ra_index(), state()}. fetch_term(Idx, #?STATE{cfg = #cfg{uid = UId, open_mem_tbls = OpenTbl, @@ -280,7 +268,58 @@ segment_term_query0(Idx, [_ | Tail], Open, Cfg) -> segment_term_query0(_Idx, [], Open, _) -> {undefined, Open}. -open_mem_tbl_sparse_read(#cfg{uid = UId, open_mem_tbls = OpenTbl}, +range_overlap(F, L, S, E) + when E >= F andalso + L >= S andalso + F =< L -> + X = max(F, S), + {X, min(L, E), F, X - 1}; +range_overlap(F, L, _, _) -> + {undefined, F, L}. + +read_plan(#cfg{uid = UId, + open_mem_tbls = OpenTbl, + closed_mem_tbls = ClosedTbl}, + FromIdx, ToIdx) -> + Acc0 = case ets:lookup(OpenTbl, UId) of + [{_, TStart, TEnd, Tid}] -> + case range_overlap(FromIdx, ToIdx, TStart, TEnd) of + {undefined, _, _} -> + {FromIdx, ToIdx, []}; + {S, E, F, T} -> + {F, T, + [{ets, Tid, ?C_RA_LOG_READ_OPEN_MEM_TBL, S, E}]} + end; + _ -> + {FromIdx, ToIdx, []} + end, + + {RemF, RemL, Plan} = + case closed_mem_tables(ClosedTbl, UId) of + [] -> + Acc0; + Tables -> + lists:foldl( + fun({_, _, S, E, Tid}, {F, T, Plan} = Acc) -> + case range_overlap(F, T, S, E) of + {undefined, _, _} -> + Acc; + {S1, E1, F1, T1} -> + {F1, T1, + [{ets, Tid, ?C_RA_LOG_READ_CLOSED_MEM_TBL, S1, E1} + | Plan]} + end + end, Acc0, Tables) + end, + case RemF =< RemL of + true -> + [{segments, RemF, RemL} | Plan]; + false -> + Plan + end. + +open_mem_tbl_sparse_read(#cfg{uid = UId, + open_mem_tbls = OpenTbl}, Indexes, Acc0) -> case ets:lookup(OpenTbl, UId) of [{_, TStart, TEnd, Tid}] -> @@ -307,102 +346,35 @@ mem_tbl_sparse_read([I | Rem], TblStart, TblEnd, Tid, C, Entries0) mem_tbl_sparse_read(Rem, _TblStart, _TblEnd, _Tid, C, Entries0) -> {Entries0, C, Rem}. -open_mem_tbl_take(OpenTbl, Id, {Start0, End}, Acc0) -> - case ets:lookup(OpenTbl, Id) of - [{_, TStart, TEnd, Tid}] -> - {Entries, Count, Rem} = mem_tbl_take({Start0, End}, TStart, TEnd, - Tid, 0, Acc0), - {Entries, {?C_RA_LOG_READ_OPEN_MEM_TBL, Count}, Rem}; - [] -> - {Acc0, {?C_RA_LOG_READ_OPEN_MEM_TBL, 0}, {Start0, End}} - end. - -closed_mem_tbl_take(ClosedTbl, Id, {Start0, End}, Acc0) -> - case closed_mem_tables(ClosedTbl, Id) of - [] -> - {Acc0, {?C_RA_LOG_READ_CLOSED_MEM_TBL, 0}, {Start0, End}}; - Tables -> - {Entries, Count, Rem} = - lists:foldl(fun({_, _, TblSt, TblEnd, Tid}, {Ac, Count, Range}) -> - mem_tbl_take(Range, TblSt, TblEnd, - Tid, Count, Ac) - end, {Acc0, 0, {Start0, End}}, Tables), - {Entries, {?C_RA_LOG_READ_CLOSED_MEM_TBL, Count}, Rem} - end. - -mem_tbl_take(undefined, _TblStart, _TblEnd, _Tid, Count, Acc0) -> - {Acc0, Count, undefined}; -mem_tbl_take({_Start0, End} = Range, TblStart, _TblEnd, _Tid, Count, Acc0) - when TblStart > End -> - % optimisation to bypass request that has no overlap - {Acc0, Count, Range}; -mem_tbl_take({Start0, End}, TblStart, TblEnd, Tid, Count, Acc0) - when TblEnd >= End -> - Start = max(TblStart, Start0), - Entries = lookup_range(Tid, Start, End, Acc0), - Remainder = case Start =:= Start0 of - true -> - % the range was fully covered by the mem table - undefined; - false -> - {Start0, Start-1} - end, - {Entries, Count + (End - Start + 1), Remainder}; -mem_tbl_take({Start0, End}, TblStart, TblEnd, Tid, Count, Acc0) - when TblEnd < End -> - %% defensive case - truncate the read to end at table end - mem_tbl_take({Start0, TblEnd}, TblStart, TblEnd, Tid, Count, Acc0). - -lookup_range(Tid, Start, Start, Acc) -> - try ets:lookup(Tid, Start) of - [Entry] -> - [Entry | Acc] - catch - error:badarg -> - throw({ets_miss, Start}) - end; -lookup_range(Tid, Start, End, Acc) when End > Start -> - try ets:lookup(Tid, End) of - [Entry] -> - lookup_range(Tid, Start, End-1, [Entry | Acc]) - catch - error:badarg -> - throw({ets_miss, Start}) - end. - -segment_take(#?STATE{segment_refs = [], - open_segments = Open}, - _Range, Entries0) -> - {Open, undefined, Entries0}; -segment_take(#?STATE{segment_refs = [{_From, SEnd, _Fn} | _] = SegRefs, +segrefs_to_read(From0, To0, _SegRefs, Acc) + when To0 < From0 -> + Acc; +segrefs_to_read(From0, To0, [{SStart, SEnd, FileName} | SegRefs], Acc) + when SStart =< To0 andalso + SEnd >= From0 -> + From = max(From0, SStart), + To = min(To0, SEnd), + Spec = {From, To, FileName}, + segrefs_to_read(From0, SStart - 1, SegRefs, [Spec | Acc]); +segrefs_to_read(From0, To0, [_ | SegRefs], Acc) -> + segrefs_to_read(From0, To0, SegRefs, Acc). + +segment_fold(#?STATE{segment_refs = SegRefs, open_segments = OpenSegs, - cfg = Cfg}, - {RStart, REnd}, Entries0) -> - Range = {RStart, min(SEnd, REnd)}, - lists:foldl( - fun(_, {_, undefined, _} = Acc) -> - %% we're done reading - throw(Acc); - ({From, _, _}, {_, {_, End}, _} = Acc) - when From > End -> - Acc; - ({From, To, Fn}, {Open0, {Start0, End}, E0}) - when To >= End -> - {Seg, Open} = get_segment(Cfg, Open0, Fn), - % actual start point cannot be prior to first segment - % index - Start = max(Start0, From), - Num = End - Start + 1, - Entries = ra_log_segment:read_cons(Seg, Start, Num, - fun binary_to_term/1, - E0), - Rem = case Start of - Start0 -> undefined; - _ -> - {Start0, Start-1} - end, - {Open, Rem, Entries} - end, {OpenSegs, Range, Entries0}, SegRefs). + cfg = Cfg} = State, + RStart, REnd, Fun, Acc) -> + SegRefsToReadFrom = segrefs_to_read(RStart, REnd, SegRefs, []), + {Op, A} = + lists:foldl( + fun ({From, To, Fn}, {Open0, Ac0}) -> + {Seg, Open} = get_segment(Cfg, Open0, Fn), + {Open, ra_log_segment:fold(Seg, From, To, + fun binary_to_term/1, + Fun, + Ac0)} + end, {OpenSegs, Acc}, SegRefsToReadFrom), + {State#?MODULE{open_segments = Op}, A}. + segment_sparse_read(#?STATE{open_segments = Open}, [], Entries0) -> {Open, [], 0, Entries0}; @@ -420,7 +392,7 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs, sparse_read_split(fun (I) -> I >= From andalso I =< To end, Idxs, []), - {_Cache, ReadSparseCount, Entries} = + {ReadSparseCount, Entries} = ra_log_segment:read_sparse(Seg, ReadIdxs, fun binary_to_term/1, []), {Open, RemIdxs, C + ReadSparseCount, @@ -514,54 +486,73 @@ incr_counter(#cfg{counter = undefined}, _) -> ok. -ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). -open_mem_tbl_take_test() -> - OTbl = ra_log_open_mem_tables, - _ = ets:new(OTbl, [named_table]), - Tid = ets:new(test_id, []), - true = ets:insert(OTbl, {test_id, 3, 7, Tid}), - Entries = [{3, 2, "3"}, {4, 2, "4"}, - {5, 2, "5"}, {6, 2, "6"}, - {7, 2, "7"}], - % seed the mem table - [ets:insert(Tid, E) || E <- Entries], - - {Entries, _, undefined} = open_mem_tbl_take(OTbl, test_id, {3, 7}, []), - EntriesPlus8 = Entries ++ [{8, 2, "8"}], - {EntriesPlus8, _, {1, 2}} = open_mem_tbl_take(OTbl, test_id, {1, 7}, - [{8, 2, "8"}]), - {[{6, 2, "6"}], _, undefined} = open_mem_tbl_take(OTbl, test_id, {6, 6}, []), - {[], _, {1, 2}} = open_mem_tbl_take(OTbl, test_id, {1, 2}, []), - - ets:delete(Tid), - ets:delete(OTbl), +compact_seg_refs_test() -> + % {From, To, File} + Refs = [{10, 100, "2"}, {10, 75, "2"}, {10, 50, "2"}, {1, 9, "1"}], + [{10, 100, "2"}, {1, 9, "1"}] = compact_seg_refs(Refs), + ok. +range_overlap_test() -> + {undefined, 1, 10} = range_overlap(1, 10, 20, 30), + {undefined, 21, 30} = range_overlap(21, 30, 10, 20), + {20, 20, 20, 19} = range_overlap(20, 30, 10, 20), + ?assertEqual({79, 99, 79, 78}, range_overlap(79, 99, 75, 111)), + ?assertEqual({undefined, 79, 78}, range_overlap(79, 78, 50, 176)), + ?assertEqual({undefined, 79, 78}, range_overlap(79, 78, 25, 49)), + % {10, 10} = range_overlap(1, 10, 10, 30), + % {5, 10} = range_overlap(1, 10, 5, 30), + % {7, 10} = range_overlap(7, 10, 5, 30), ok. -closed_mem_tbl_take_test() -> +read_plan_test() -> + UId = <<"this_uid">>, + OTbl = ra_log_open_mem_tables, + OpnTbl = ets:new(OTbl, []), CTbl = ra_log_closed_mem_tables, - _ = ets:new(CTbl, [named_table, bag]), - Tid1 = ets:new(test_id, []), - Tid2 = ets:new(test_id, []), + ClsdTbl = ets:new(CTbl, [bag]), M1 = erlang:unique_integer([monotonic, positive]), M2 = erlang:unique_integer([monotonic, positive]), - true = ets:insert(CTbl, {test_id, M1, 5, 7, Tid1}), - true = ets:insert(CTbl, {test_id, M2, 8, 10, Tid2}), - Entries1 = [{5, 2, "5"}, {6, 2, "6"}, {7, 2, "7"}], - Entries2 = [{8, 2, "8"}, {9, 2, "9"}, {10, 2, "10"}], - % seed the mem tables - [ets:insert(Tid1, E) || E <- Entries1], - [ets:insert(Tid2, E) || E <- Entries2], - - {Entries1, _, undefined} = closed_mem_tbl_take(CTbl, test_id, {5, 7}, []), - {Entries2, _, undefined} = closed_mem_tbl_take(CTbl, test_id, {8, 10}, []), - {[{9, 2, "9"}], _, undefined} = closed_mem_tbl_take(CTbl, test_id, {9, 9}, []), + + true = ets:insert(OpnTbl, {UId, 75, 111, OTbl}), + true = ets:insert(ClsdTbl, {UId, M2, 50, 176, CTbl}), + true = ets:insert(ClsdTbl, {UId, M1, 25, 49, CTbl}), + %% segments 0 - 24 + Cfg = #cfg{uid = UId, + open_mem_tbls = OpnTbl, + closed_mem_tbls = ClsdTbl}, + ?debugFmt("Read Plan: ~p~n", [read_plan(Cfg, 0, 100)]), + ?assertMatch([{segments, 0, 24}, + {ets, _, _, 25, 49}, + {ets, _, _, 50, 74}, + {ets, _, _, 75, 100}], + read_plan(Cfg, 0, 100)), + + ?debugFmt("Read Plan: ~p~n", [read_plan(Cfg, 10, 55)]), + ?assertMatch([{segments, 10, 24}, + {ets, _, _, 25, 49}, + {ets, _, _, 50, 55}], + read_plan(Cfg, 10, 55)), + ?assertMatch([ + {ets, _, _, 79, 99} + ], + read_plan(Cfg, 79, 99)), ok. -compact_seg_refs_test() -> - % {From, To, File} - Refs = [{10, 100, "2"}, {10, 75, "2"}, {10, 50, "2"}, {1, 9, "1"}], - [{10, 100, "2"}, {1, 9, "1"}] = compact_seg_refs(Refs), +segrefs_to_read_test() -> + SegRefs = [{412,499,"00000005.segment"}, + {284,411,"00000004.segment"}, + {284,310,"00000004b.segment"}, + {200,285,"00000003.segment"}, + {128,255,"00000002.segment"}, + {0,127,"00000001.segment"}], + + ?assertEqual([{199,199,"00000002.segment"}, + {200,283,"00000003.segment"}, + {284,411,"00000004.segment"}, + {412,499,"00000005.segment"}], + segrefs_to_read(199, 499, SegRefs, [])), ok. -endif. diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index e5543164..1a4482e7 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -11,8 +11,7 @@ open/2, append/4, sync/1, - read/3, - read_cons/5, + fold/6, read_sparse/4, term_query/2, close/1, @@ -62,7 +61,7 @@ data_start :: pos_integer(), data_offset :: pos_integer(), data_write_offset :: pos_integer(), - index = undefined :: 'maybe'(ra_segment_index()), + index = undefined :: maybe(ra_segment_index()), range :: 'maybe'({ra_index(), ra_index()}), pending_data = [] :: iodata(), pending_index = [] :: iodata(), @@ -257,30 +256,30 @@ flush(#state{cfg = #cfg{fd = Fd}, Err end. --spec read(state(), Idx :: ra_index(), Num :: non_neg_integer()) -> - [{ra_index(), ra_term(), binary()}]. -read(State, Idx, Num) -> - read_cons(State, Idx, Num, fun ra_lib:id/1, []). - - --spec read_cons(state(), ra_index(), Num :: non_neg_integer(), - fun((binary()) -> term()), Acc) -> - Acc when Acc :: [{ra_index(), ra_term(), binary()}]. -read_cons(#state{cfg = #cfg{mode = read} = Cfg, - cache = Cache, - index = Index}, Idx, Num, Fun, Acc) -> - pread_cons(Cfg, Cache, Idx, Idx + Num - 1, Index, Fun, Acc). - +-spec fold(state(), + FromIdx :: ra_index(), + ToIdx :: ra_index(), + fun((binary()) -> term()), + fun(({ra_index(), ra_term(), term()}, Acc) -> Acc), Acc) -> + Acc when Acc :: term(). +fold(#state{cfg = #cfg{mode = read} = Cfg, + cache = Cache, + index = Index}, + FromIdx, ToIdx, Fun, AccFun, Acc) -> + fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc). + +-spec read_sparse(state(), [ra_index()], + fun((binary()) -> term()), term()) -> + {non_neg_integer(), term()}. read_sparse(#state{index = Index, - cfg = Cfg, - cache = _Cache0}, Indexes, Fun, Acc) -> + cfg = Cfg}, Indexes, Fun, Acc) -> Cache0 = prepare_cache(Cfg, Indexes, Index), - Entries = read_sparse0(Cfg, Indexes, Index, Cache0, Fun, Acc), - {undefined, length(Entries), Entries}. + read_sparse0(Cfg, Indexes, Index, Cache0, Fun, Acc, 0). -read_sparse0(_Cfg, [], _Index, _Cache, _Fun, Acc) -> - Acc; -read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc) -> +read_sparse0(_Cfg, [], _Index, _Cache, _Fun, Acc, Num) -> + {Num, Acc}; +read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc, Num) + when is_map_key(NextIdx, Index) -> {Term, Offset, Length, _} = map_get(NextIdx, Index), case cache_read(Cache0, Offset, Length) of false -> @@ -288,14 +287,16 @@ read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc) -> undefined -> {ok, Data, _} = pread(Cfg, undefined, Offset, Length), read_sparse0(Cfg, Rem, Index, undefined, Fun, - [{NextIdx, Term, Fun(Data)} | Acc]); + [{NextIdx, Term, Fun(Data)} | Acc], Num+1); Cache -> - read_sparse0(Cfg, Indexes, Index, Cache, Fun, Acc) + read_sparse0(Cfg, Indexes, Index, Cache, Fun, Acc, Num+1) end; Data -> read_sparse0(Cfg, Rem, Index, Cache0, Fun, - [{NextIdx, Term, Fun(Data)} | Acc]) - end. + [{NextIdx, Term, Fun(Data)} | Acc], Num+1) + end; +read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Fun, _Acc, _Num) -> + exit({missing_key, NextIdx}). cache_read({CPos, CLen, Bin}, Pos, Length) when Pos >= CPos andalso @@ -313,18 +314,20 @@ prepare_cache(#cfg{fd = Fd} = _Cfg, [FirstIdx | Rem], SegIndex) -> %% no run, no cache; undefined; {FirstIdx, LastIdx} -> - {_, FstPos, FstLength, _} = map_get(FirstIdx, SegIndex), - {_, LastPos, LastLength, _} = map_get(LastIdx, SegIndex), - % MaxCacheLen = LastPos + LastLength - FstPos, + {_, FstPos, FstLength, _} = map_get_(FirstIdx, SegIndex), + {_, LastPos, LastLength, _} = map_get_(LastIdx, SegIndex), % %% read at least the remainder of the block from % %% the first position or the length of the first record - % MinCacheLen = max(FstLength, ?BLOCK_SIZE - (FstPos rem ?BLOCK_SIZE)), - % CacheLen = max(MinCacheLen, min(MaxCacheLen, ?READ_AHEAD_B)), CacheLen = cache_length(FstPos, FstLength, LastPos, LastLength), {ok, CacheData} = ra_file_handle:pread(Fd, FstPos, CacheLen), {FstPos, byte_size(CacheData), CacheData} end. +map_get_(Key, Map) when is_map_key(Key, Map) -> + map_get(Key, Map); +map_get_(Key, _Map) -> + exit({missing_key, Key}). + -spec term_query(state(), Idx :: ra_index()) -> 'maybe'(ra_term()). term_query(#state{index = Index}, Idx) -> case Index of @@ -333,11 +336,10 @@ term_query(#state{index = Index}, Idx) -> _ -> undefined end. -pread_cons(_Cfg, _Cache, Idx, FinalIdx, _, _Fun, Acc) +fold0(_Cfg, _Cache, Idx, FinalIdx, _, _Fun, _AccFun, Acc) when Idx > FinalIdx -> Acc; -pread_cons(Cfg, Cache0, Idx, - FinalIdx, Index, Fun, Acc) -> +fold0(Cfg, Cache0, Idx, FinalIdx, Index, Fun, AccFun, Acc0) -> case Index of #{Idx := {Term, Offset, Length, Crc} = IdxRec} -> case pread(Cfg, Cache0, Offset, Length) of @@ -345,8 +347,8 @@ pread_cons(Cfg, Cache0, Idx, %% performc crc check case validate_checksum(Crc, Data) of true -> - [{Idx, Term, Fun(Data)} | - pread_cons(Cfg, Cache, Idx+1, FinalIdx, Index, Fun, Acc)]; + Acc = AccFun({Idx, Term, Fun(Data)}, Acc0), + fold0(Cfg, Cache, Idx+1, FinalIdx, Index, Fun, AccFun, Acc); false -> %% CRC check failures are irrecoverable exit({ra_log_segment_crc_check_failure, Idx, IdxRec, @@ -358,10 +360,11 @@ pread_cons(Cfg, Cache0, Idx, Cfg#cfg.filename}) end; _ -> - pread_cons(Cfg, Cache0, Idx+1, FinalIdx, Index, Fun, Acc) + exit({ra_log_segment_unexpected_eof, Idx, + Cfg#cfg.filename}) end. --spec range(state()) -> 'maybe'({ra_index(), ra_index()}). +-spec range(state()) -> maybe({ra_index(), ra_index()}). range(#state{range = Range}) -> Range. @@ -446,9 +449,10 @@ dump_index(File) -> dump(File) -> {ok, S0} = open(File, #{mode => read}), {Idx, Last} = range(S0), - L = read_cons(S0, Idx, Last - Idx + 1, fun erlang:binary_to_term/1, []), + L = fold(S0, Idx, Last - Idx + 1, fun erlang:binary_to_term/1, + fun (E, A) -> [E | A] end, []), close(S0), - L. + lists:reverse(L). dump_index_data(< @@ -294,11 +294,12 @@ init(#{id := Id, machine_version := MacVersion}, MacSt} -> Clu = make_cluster(Id, ClusterNodes), %% the snapshot is the last index before the first index - {Idx, Clu, MacVersion, MacSt, {Idx, Term}} + %% TODO: should this be Idx + 1? + {Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}} end, MacMod = ra_machine:which_module(Machine, MacVer), - CommitIndex = max(LastApplied, FirstIndex), + CommitIndex = max(LastApplied, SnapshotIdx), Cfg = #cfg{id = Id, uid = UId, log_id = LogId, @@ -325,7 +326,7 @@ init(#{id := Id, commit_index => CommitIndex, %% set this to the first index so that we can apply all entries %% up to the commit index during recovery - last_applied => FirstIndex, + last_applied => SnapshotIdx, persisted_last_applied => LastApplied, log => Log0, machine_state => MacState, @@ -1683,12 +1684,15 @@ make_append_entries_rpc(PeerId, PrevIdx, PrevTerm, Num, #{log := Log0, current_term := Term, cfg := #cfg{id = Id}, commit_index := CommitIndex} = State) -> - Next = PrevIdx + 1, - {Entries, NumRead, Log} = ra_log:take(Next, Num, Log0), - NextIndex = Next + NumRead, - {NextIndex, + {LastIndex, _} = ra_log:last_index_term(Log0), + From = PrevIdx + 1, + To = min(LastIndex, PrevIdx + Num), + {Entries, Log} = ra_log:fold(From, To, + fun (E, A) -> [E | A] end, + [], Log0), + {To + 1, {send_rpc, PeerId, - #append_entries_rpc{entries = Entries, + #append_entries_rpc{entries = lists:reverse(Entries), term = Term, leader_id = Id, prev_log_index = PrevIdx, @@ -1846,11 +1850,11 @@ log_fold(#{log := Log} = RaState, Fun, State) -> undefined -> 1 end, - case fold_log_from(Idx, Fun, {State, Log}) of + try fold_log_from(Idx, Fun, {State, Log}) of {ok, {State1, Log1}} -> - {ok, State1, RaState#{log => Log1}}; - {error, Reason, Log1} -> - {error, Reason, RaState#{log => Log1}} + {ok, State1, RaState#{log => Log1}} + catch _:Err -> + {error, Err, RaState} end. %% reads user commands at the specified index @@ -2091,10 +2095,6 @@ fetch_term(Idx, #{log := Log0} = State) -> {Term, State#{log => Log}} end. -fetch_entries(From, To, #{log := Log0} = State) -> - {Entries, _, Log} = ra_log:take(From, To - From + 1, Log0), - {Entries, State#{log => Log}}. - make_cluster(Self, Nodes) -> case lists:foldl(fun(N, Acc) -> Acc#{N => new_peer()} @@ -2127,32 +2127,29 @@ apply_to(ApplyTo, ApplyFun, Notifys0, Effects0, cfg := #cfg{machine_version = MacVer, effective_machine_module = MacMod, effective_machine_version = EffMacVer}, - machine_state := MacState0} = State0) + machine_state := MacState0, + log := Log0} = State0) when ApplyTo > LastApplied andalso MacVer >= EffMacVer -> From = LastApplied + 1, - To = min(From + ?MAX_FETCH_ENTRIES, ApplyTo), - case fetch_entries(From, To, State0) of - {[], State} -> - FinalEffs = lists:reverse(make_notify_effects(Notifys0, Effects0)), - {State, FinalEffs}; - %% assert first item read is from - {[{From, _, _} | _] = Entries, State1} -> - {_, AppliedTo, State, MacState, Effects, Notifys, LastTs} = - lists:foldl(ApplyFun, {MacMod, LastApplied, State1, MacState0, - Effects0, Notifys0, undefined}, - Entries), - CommitLatency = case LastTs of - undefined -> - 0; - _ when is_integer(LastTs) -> - erlang:system_time(millisecond) - LastTs - end, - %% due to machine versioning all entries may not have been applied - apply_to(ApplyTo, ApplyFun, Notifys, Effects, - State#{last_applied => AppliedTo, - commit_latency => CommitLatency, - machine_state => MacState}) - end; + {LastIdx, _} = ra_log:last_index_term(Log0), + To = min(LastIdx, ApplyTo), + FoldState = {MacMod, LastApplied, State0, MacState0, + Effects0, Notifys0, undefined}, + {{_, AppliedTo, State, MacState, Effects, Notifys, LastTs}, + Log} = ra_log:fold(From, To, ApplyFun, FoldState, Log0), + CommitLatency = case LastTs of + undefined -> + 0; + _ when is_integer(LastTs) -> + erlang:system_time(millisecond) - LastTs + end, + %% due to machine versioning all entries may not have been applied + %% + FinalEffs = lists:reverse(make_notify_effects(Notifys, Effects)), + {State#{last_applied => AppliedTo, + log => Log, + commit_latency => CommitLatency, + machine_state => MacState}, FinalEffs}; apply_to(_ApplyTo, _, Notifys, Effects, State) when is_list(Effects) -> FinalEffs = lists:reverse(make_notify_effects(Notifys, Effects)), @@ -2481,17 +2478,10 @@ log_unhandled_msg(RaState, Msg, #{cfg := #cfg{log_id = LogId}}) -> ?DEBUG("~s: ~w received unhandled msg: ~W", [LogId, RaState, Msg, 6]). fold_log_from(From, Folder, {St, Log0}) -> - case ra_log:take(From, ?FOLD_LOG_BATCH_SIZE, Log0) of - {[], _, Log} -> - {ok, {St, Log}}; - {Entries, _, Log} -> - try - St1 = lists:foldl(Folder, St, Entries), - fold_log_from(From + ?FOLD_LOG_BATCH_SIZE, Folder, {St1, Log}) - catch - _:Reason -> - {error, Reason, Log} - end + {To, _} = ra_log:last_index_term(Log0), + case ra_log:fold(From, To, Folder, St, Log0) of + {St1, Log} -> + {ok, {St1, Log}} end. drop_existing({Log0, []}) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index a95ddb8e..90825689 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1494,15 +1494,11 @@ send(To, Msg, Conf) -> Res end. - fold_log(From, Fun, Term, State) -> case ra_server:log_fold(State#state.server_state, Fun, Term) of {ok, Result, ServerState} -> {keep_state, State#state{server_state = ServerState}, - [{reply, From, {ok, Result}}]}; - {error, Reason, ServerState} -> - {keep_state, State#state{server_state = ServerState}, - [{reply, From, {error, Reason}}]} + [{reply, From, {ok, Result}}]} end. send_snapshots(Me, Id, Term, {_, ToNode} = To, ChunkSize, diff --git a/test/erlang_node_helpers.erl b/test/erlang_node_helpers.erl index ba841175..c60cb116 100644 --- a/test/erlang_node_helpers.erl +++ b/test/erlang_node_helpers.erl @@ -25,9 +25,10 @@ start_erlang_node(Node, Config) -> code:where_is_file(DistModS ++ ".beam"))), DistArg = re:replace(DistModS, "_dist$", "", [{return, list}]), - "-pa \"" ++ DistModPath ++ "\" -proto_dist " ++ DistArg + "-pa \"" ++ DistModPath ++ "\" -proto_dist " ++ DistArg ++ + " -kernel prevent_overlapping_partitions false" end, - {ok, _} = ct_slave:start(Node, [{erl_flags, StartArgs}]), + _ = ct_slave:start(Node, [{erl_flags, StartArgs}]), wait_for_distribution(Node, 50), add_lib_dir(Node), Node. diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl index bf1a042e..7e24cb9e 100644 --- a/test/partitions_SUITE.erl +++ b/test/partitions_SUITE.erl @@ -25,7 +25,15 @@ groups() -> ], [{tests, [], Tests}]. -init_per_group(_, Config) -> Config. +init_per_suite(Config) -> + application:set_env(kernel, prevent_overlapping_partitions, false), + Config. + +end_per_suite(Config) -> + Config. + +init_per_group(_, Config) -> + Config. end_per_group(_, _Config) -> ok. diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl index 2b21791d..f03546da 100644 --- a/test/ra_2_SUITE.erl +++ b/test/ra_2_SUITE.erl @@ -570,7 +570,7 @@ external_reader(Config) -> receive {ra_event, _, {machine, {ra_log_update, _, _, _} = E}} -> R1 = ra_log_reader:handle_log_update(E, R0), - {Entries, _, _R2} = ra_log_reader:read(0, 1026, R1), + {Entries, _R2} = ra_log_reader:sparse_read(R1, lists:seq(0, 1026), []), ct:pal("read ~w ~w", [length(Entries), lists:last(Entries)]), %% read all entries ok diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index dc951301..dec43bf6 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -23,7 +23,7 @@ all_tests() -> receive_segment, read_one, take_after_overwrite_and_init, - validate_sequential_reads, + validate_sequential_fold, validate_reads_for_overlapped_writes, cache_overwrite_then_take, last_written_overwrite, @@ -141,8 +141,8 @@ receive_segment(Config) -> [] = ets:tab2list(ra_log_open_mem_tables), [] = ets:tab2list(ra_log_closed_mem_tables), % validate reads - {Entries, C, FinalLog} = ra_log:take(1, 3, Log3), - ?assertEqual(length(Entries), C), + {Entries, FinalLog} = ra_log_take(1, 3, Log3), + ?assertEqual(length(Entries), 3), ra_log:close(FinalLog), ok. @@ -153,9 +153,8 @@ read_one(Config) -> % Log1 = ra_log:append({1, 1, <<1:64/integer>>}, Log0), % ensure the written event is delivered Log2 = deliver_all_log_events(Log1, 200), - {[_], 1, Log} = ra_log:take(1, 5, Log2), + {[_], Log} = ra_log_take(1, 1, Log2), % read out of range - {[], 0, Log} = ra_log:take(5, 5, Log2), #{?FUNCTION_NAME := #{read_cache := M1, read_open_mem_tbl := M2, read_closed_mem_tbl := M3, @@ -169,67 +168,47 @@ take_after_overwrite_and_init(Config) -> Log0 = ra_log_init(Config), Log1 = write_and_roll_no_deliver(1, 5, 1, Log0), Log2 = deliver_written_log_events(Log1, 200), - {[_, _, _, _], 4, Log3} = ra_log:take(1, 5, Log2), + {[_, _, _, _], Log3} = ra_log_take(1, 4, Log2), Log4 = write_and_roll_no_deliver(1, 2, 2, Log3), % fake lost segments event Log5 = deliver_written_log_events(Log4, 200), % ensure we cannot take stale entries - {[{1, 2, _}], 1, Log6} = ra_log:take(1, 5, Log5), + {[{1, 2, _}], Log6} = ra_log_take(1, 4, Log5), _ = ra_log:close(Log6), Log = ra_log_init(Config), - {[{1, 2, _}], _, _} = ra_log:take(1, 5, Log), + {[{1, 2, _}], _} = ra_log_take(1, 4, Log), ok. -validate_sequential_reads(Config) -> +validate_sequential_fold(Config) -> ra_counters:new(?FUNCTION_NAME, ?RA_COUNTER_FIELDS), Log0 = ra_log_init(Config, #{counter => ra_counters:fetch(?FUNCTION_NAME), max_open_segments => 100}), - % write a few entries + % write 1000 entries Log1 = append_and_roll(1, 500, 1, Log0), - Log2 = append_and_roll(500, 1001, 1, Log1), + Log2 = append_n(500, 999, 1, Log1), %% need to ensure the segments are delivered - Log = deliver_all_log_events(Log2, 200), + Log3 = deliver_all_log_events(Log2, 200), + %% write two to be held in cache + Log = append_n(999, 1001, 1, Log3), _ = erlang:statistics(exact_reductions), {ColdTaken, {ColdReds, FinLog}} = timer:tc(fun () -> {_, Reds0} = erlang:statistics(exact_reductions), - L = validate_read(1, 1001, 1, Log), + L = validate_fold(1, 1000, 1, Log), {_, Reds} = erlang:statistics(exact_reductions), {Reds - Reds0, L} end), + ra_log:close(FinLog), + ct:pal("validate_sequential_fold COLD took ~pms Reductions: ~p~nMetrics: ", + [ColdTaken/1000, ColdReds]), #{?FUNCTION_NAME := #{read_cache := M1, read_open_mem_tbl := M2, read_closed_mem_tbl := M3, read_segment := M4} = O} = ra_counters:overview(), + ct:pal("counters ~p", [O]), ?assertEqual(1000, M1 + M2 + M3 + M4), - - ct:pal("validate_sequential_reads COLD took ~pms Reductions: ~p~nMetrics: ~p", - [ColdTaken/1000, ColdReds, O]), - % we'd like to know if we regress beyond this - % some of the reductions are spent validating the reads - % NB: in OTP 21.1 reduction counts shot up mostly probably due to lists:reverse - % not previously using up enough reductions - ?assert(ColdReds < 200000), - _ = erlang:statistics(exact_reductions), - {WarmTaken, {WarmReds, FinLog2}} = - timer:tc(fun () -> - {_, R0} = erlang:statistics(exact_reductions), - % start_profile(Config, [lists, ra_log, ra_flru, - % file, ra_file_handle, - % ra_log_segment]), - L = validate_read(1, 1001, 1, FinLog), - % stop_profile(Config), - {_, R} = erlang:statistics(exact_reductions), - {R - R0, L} - end), - ct:pal("validate_sequential_reads WARM took ~pms Reductions: ~p", - [WarmTaken/1000, WarmReds]), - % warm reductions should always be less than cold - % NB: this isn't necessarily always the case with OTP 24 so commenting out assertion - % ?assert(WarmReds < ColdReds), - ra_log:close(FinLog2), ok. validate_reads_for_overlapped_writes(Config) -> @@ -247,8 +226,8 @@ validate_reads_for_overlapped_writes(Config) -> Log5 = write_n(500, 551, 2, Log4), Log6 = deliver_all_log_events(Log5, 200), - Log7 = validate_read(1, 200, 1, Log6), - Log8 = validate_read(200, 551, 2, Log7), + Log7 = validate_fold(1, 199, 1, Log6), + Log8 = validate_fold(200, 550, 2, Log7), #{?FUNCTION_NAME := #{read_cache := M1, read_open_mem_tbl := M2, @@ -266,11 +245,11 @@ read_opt(Config) -> Log1 = write_and_roll(1, Num, 1, Log0, 50), Log2 = wait_for_segments(Log1, 5000), %% read small batch of the latest entries - {_, _, Log} = ra_log:take(Num - 5, 5, Log2), + {_, Log} = ra_log_take(Num - 5, Num, Log2), %% measure the time it takes to read the first index {Time, _} = timer:tc(fun () -> _ = erlang:statistics(exact_reductions), - ra_log:take(1, 1, Log) + ra_log_take(1, 1, Log) end), {_, Reds} = erlang:statistics(exact_reductions), ct:pal("read took ~wms Reduction ~w", [Time / 1000, Reds]), @@ -284,7 +263,7 @@ read_opt(Config) -> {Time3, _} = timer:tc(fun () -> _ = erlang:statistics(exact_reductions), - ra_log:take(1, Num, Log) + ra_log_take(1, Num, Log) end), {_, Reds3} = erlang:statistics(exact_reductions), ct:pal("read all took ~wms Reduction ~w", [Time3 / 1000, Reds3]), @@ -333,19 +312,19 @@ sparse_read(Config) -> Log1 = wait_for_segments(Log0, 5000), Log2 = write_no_roll(Num div Div, Num, 1, Log1, 50), %% read small batch of the latest entries - {_, _, Log3} = ra_log:take(Num - 5, 5, Log2), + {_, Log3} = ra_log_take(Num - 5, Num, Log2), ct:pal("log overview ~p", [ra_log:overview(Log3)]), %% warm up run - {_, _, Log4} = ra_log:take(1, Num, Log3), + {_, Log4} = ra_log_take(1, Num, Log3), ra_log:close(Log4), NumDiv2 = Num div 2, %% create a list of indexes with some consecutive and some gaps Indexes = lists:usort(lists:seq(1, Num, 2) ++ lists:seq(1, Num, 5)), LogTake = ra_log_init(Config), - {TimeTake, {_, _, LogTake1}} = + {TimeTake, {_, LogTake1}} = timer:tc(fun () -> _ = erlang:statistics(exact_reductions), - ra_log:take(1, NumDiv2, LogTake) + ra_log_take(1, NumDiv2, LogTake) end), {_, Reds} = erlang:statistics(exact_reductions), ra_log:close(LogTake1), @@ -421,13 +400,13 @@ updated_segment_can_be_read(Config) -> % Log2 = deliver_all_log_events(Log1, 200), %% read some, this will open the segment with the an index of entries %% 1 - 4 - {Entries, C0, Log3} = ra_log:take(1, 25, Log2), - ?assertEqual(length(Entries), C0), + {Entries, Log3} = ra_log_take(1, 25, Log2), + ?assertEqual(4, length(Entries)), %% append a few more itmes and process the segments Log4 = append_and_roll(5, 16, 1, Log3), % this should return all entries - {Entries1, C1, _} = ra_log:take(1, 15, Log4), - ?assertEqual(length(Entries1), C1), + {Entries1, _} = ra_log_take(1, 15, Log4), + ?assertEqual(15, length(Entries1)), ct:pal("Entries: ~p", [Entries]), ct:pal("Entries1: ~p", [Entries1]), ct:pal("Counters ~p", [ra_counters:overview(?FUNCTION_NAME)]), @@ -441,7 +420,7 @@ cache_overwrite_then_take(Config) -> Log1 = write_n(1, 5, 1, Log0), Log2 = write_n(3, 4, 2, Log1), % validate only 3 entries can be read even if requested range is greater - {[_, _, _], 3, _} = ra_log:take(1, 5, Log2), + {[_, _, _], _} = ra_log_take(1, 5, Log2), ok. last_written_overwrite(Config) -> @@ -517,9 +496,9 @@ recovery(Config) -> Log5 = ra_log_init(Config), {20, 3} = ra_log:last_index_term(Log5), - Log6 = validate_read(1, 5, 1, Log5), - Log7 = validate_read(5, 15, 2, Log6), - Log8 = validate_read(15, 21, 3, Log7), + Log6 = validate_fold(1, 4, 1, Log5), + Log7 = validate_fold(5, 14, 2, Log6), + Log8 = validate_fold(15, 20, 3, Log7), ra_log:close(Log8), ok. @@ -576,7 +555,7 @@ resend_write(Config) -> Log6 = assert_log_events(Log5, fun (L) -> {13, 2} == ra_log:last_written(L) end), - {[_, _, _, _, _], 5, _} = ra_log:take(9, 5, Log6), + {[_, _, _, _, _], _} = ra_log_take(9, 14, Log6), ra_log:close(Log6), ok. @@ -598,7 +577,7 @@ wal_crash_recover(Config) -> {100, 2} == ra_log:last_written(L) end), {100, 2} = ra_log:last_written(Log), - validate_read(1, 100, 2, Log), + validate_fold(1, 99, 2, Log), ok. wal_down_read_availability(Config) -> @@ -610,7 +589,7 @@ wal_down_read_availability(Config) -> [SupPid] = [P || {ra_log_wal_sup, P, _, _} <- supervisor:which_children(ra_log_sup)], ok = supervisor:terminate_child(SupPid, ra_log_wal), - {Entries, _, _} = ra_log:take(0, 10, Log2), + {Entries, _} = ra_log_take(0, 10, Log2), ?assert(length(Entries) =:= 10), ok. @@ -664,13 +643,12 @@ detect_lost_written_range(Config) -> {19, 2} == ra_log:last_written(L) end), % validate no writes were lost and can be recovered - {Entries, C0, _} = ra_log:take(0, 20, Log5), - ?assertEqual(length(Entries), C0), + {Entries, _} = ra_log_take(0, 20, Log5), + ?assertEqual(20, length(Entries)), ra_log:close(Log5), Log = ra_log_init(Config), {19, 2} = ra_log:last_written(Log5), - {RecoveredEntries, C1, _} = ra_log:take(0, 20, Log), - ?assertEqual(length(RecoveredEntries), C1), + {RecoveredEntries, _} = ra_log_take(0, 20, Log), ?assert(length(Entries) =:= 20), ?assert(length(RecoveredEntries) =:= 20), Entries = RecoveredEntries, @@ -759,19 +737,19 @@ snapshot_installation(Config) -> % after a snapshot we need a "truncating write" that ignores missing % indexes Log5 = write_n(16, 20, 2, Log4), - {[], 0, _} = ra_log:take(1, 9, Log5), - {[_, _], 2, _} = ra_log:take(16, 2, Log5), + {[], _} = ra_log_take(1, 9, Log5), + {[_, _], _} = ra_log_take(16, 17, Log5), Log6 = assert_log_events(Log5, fun (L) -> {19, 2} == ra_log:last_written(L) end), - {[], 0, _} = ra_log:take(1, 9, Log6), - {[_, _], 2, _} = ra_log:take(16, 2, Log6), + {[], _} = ra_log_take(1, 9, Log6), + {[_, _], _} = ra_log_take(16, 17, Log6), ra_log_wal:force_roll_over(ra_log_wal), - {[], 0, _} = ra_log:take(1, 9, Log6), - {[_, _], 2, _} = ra_log:take(16, 2, Log6), + {[], _} = ra_log_take(1, 9, Log6), + {[_, _], _} = ra_log_take(16, 17, Log6), Log = deliver_all_log_events(Log6, 100), - {[], 0, _} = ra_log:take(1, 9, Log), - {[_, _], 2, _} = ra_log:take(16, 2, Log), + {[], _} = ra_log_take(1, 9, Log), + {[_, _], _} = ra_log_take(16, 17, Log), ok. append_after_snapshot_installation(Config) -> @@ -804,8 +782,8 @@ append_after_snapshot_installation(Config) -> Log = assert_log_events(Log3, fun (L) -> {19, 2} == ra_log:last_written(L) end), - {[], 0, _} = ra_log:take(1, 9, Log), - {[_, _], 2, _} = ra_log:take(16, 2, Log), + {[], _} = ra_log_take(1, 9, Log), + {[_, _], _} = ra_log_take(16, 17, Log), ok. written_event_after_snapshot_installation(Config) -> @@ -875,9 +853,10 @@ update_release_cursor(Config) -> [{UId, 127}] = ets:lookup(ra_log_snapshot_state, ?config(uid, Config)), % this should delete a single segment ra_lib:retry(fun () -> - 1 == length(find_segments(Config)) + Segments = find_segments(Config), + 1 == length(Segments) end, 10, 100), - Log3b = validate_read(128, 150, 2, Log3), + Log3b = validate_fold(128, 149, 2, Log3), % update the release cursor all the way {Log4, _} = ra_log:update_release_cursor(149, #{?N1 => new_peer(), ?N2 => new_peer()}, @@ -903,7 +882,7 @@ update_release_cursor(Config) -> end), % Log6 = append_and_roll(150, 155, 2, Log5), % Log = deliver_all_log_events(Log6, 500), - validate_read(150, 155, 2, Log), + validate_fold(150, 154, 2, Log), % assert there is only one segment - the current % snapshot has been confirmed. [_] = find_segments(Config), @@ -968,7 +947,7 @@ missed_closed_tables_are_deleted_at_next_opportunity(Config) -> [] = ets:tab2list(ra_log_open_mem_tables), % TODO: validate reads - Log5 = validate_read(1, 155, 2, Log4), + Log5 = validate_fold(1, 154, 2, Log4), % then update the release cursor {Log6, _} = ra_log:update_release_cursor(154, #{?N1 => new_peer(), @@ -1008,7 +987,7 @@ open_segments_limit(Config) -> %% this should result in a few segments %% validate as this read all of them Log1b = wait_for_segments(Log1, 5000), - Log2 = validate_read(1, 2000, 1, Log1b), + Log2 = validate_fold(1, 1999, 1, Log1b), Segs = find_segments(Config), #{open_segments := Open} = ra_log:overview(Log2), ?assert(length(Segs) > Max), @@ -1033,7 +1012,8 @@ external_reader(Config) -> fun () -> receive {ra_log_reader_state, R1} = Evt -> - {Es, _, R2} = ra_log_reader:read(0, 220, R1), + {Es, R2} = ra_log_reader:sparse_read( + R1, lists:seq(0, 220), []), Len1 = length(Es), ct:pal("Es ~w", [Len1]), Self ! {got, Evt, Es}, @@ -1041,10 +1021,11 @@ external_reader(Config) -> {ra_log_update, _, F, _} = Evt2 -> %% reader before update has been processed %% should work - {Stale, _, _} = ra_log_reader:read(F, 220, R2), + Indexes = lists:seq(F, 220), + {Stale, _} = ra_log_reader:sparse_read(R2, Indexes, []), ?assertEqual(Len1, length(Stale)), R3 = ra_log_reader:handle_log_update(Evt2, R2), - {Es2, _, _R4} = ra_log_reader:read(F, 220, R3), + {Es2, _R4} = ra_log_reader:sparse_read(R3, Indexes, []), ct:pal("Es2 ~w", [length(Es2)]), ?assertEqual(Len1, length(Es2)), Self ! {got, Evt2, Es2} @@ -1095,25 +1076,22 @@ write_config(Config) -> ok. -validate_read(To, To, _Term, Log0) -> - Log0; -validate_read(From, To, Term, Log0) -> - End = min(From + 25, To), - {Entries, C0, Log} = ra_log:take(From, End - From, Log0), - ?assertEqual(length(Entries), C0), +validate_fold(From, To, Term, Log0) -> + {Entries0, Log} = ra_log:fold(From, To, + fun (E, A) -> [E | A] end, + [], Log0), + ?assertEqual(To - From + 1, length(Entries0)), % validate entries are correctly read Expected = [ {I, Term, <>} || - I <- lists:seq(From, End - 1) ], - ?assertEqual(Expected, Entries), - validate_read(End, To, Term, Log). - + I <- lists:seq(To, From, -1) ], + ?assertEqual(Expected, Entries0), + Log. append_and_roll(From, To, Term, Log0) -> Log1 = append_n(From, To, Term, Log0), ok = ra_log_wal:force_roll_over(ra_log_wal), assert_log_events(Log1, fun(L) -> ra_log:last_written(L) == {To-1, Term} - end). append_and_roll_no_deliver(From, To, Term, Log0) -> @@ -1334,3 +1312,7 @@ ra_log_init(Config, Cfg0) -> Cfg0), %% augment with default system config ra_log:init(Cfg#{system_config => ra_system:default_config()}). + +ra_log_take(From, To, Log0) -> + {Acc, Log} = ra_log:fold(From, To, fun (E, Acc) -> [E | Acc] end, [], Log0), + {lists:reverse(Acc), Log}. diff --git a/test/ra_log_SUITE.erl b/test/ra_log_SUITE.erl index dee57326..2192c63a 100644 --- a/test/ra_log_SUITE.erl +++ b/test/ra_log_SUITE.erl @@ -215,18 +215,17 @@ take(Config) -> ra_log:append_sync(Entry, L0) end, Log0, lists:seq(Idx, LastIdx)), % won't work for memory - {[?IDX(1)], 1, Log2} = ra_log:take(1, 1, Log1), - {[?IDX(1), ?IDX(2)], 2, Log3} = ra_log:take(1, 2, Log2), + {[?IDX(1)], Log2} = ra_log_take(1, 1, Log1), + {[?IDX(1), ?IDX(2)], Log3} = ra_log_take(1, 2, Log2), % partly out of range - {[?IDX(9), ?IDX(10)], 2, Log4} = ra_log:take(9, 3, Log3), + {[?IDX(9), ?IDX(10)], Log4} = ra_log_take(9, 11, Log3), % completely out of range - {[], 0, Log5} = ra_log:take(11, 3, Log4), + {[], Log5} = ra_log_take(11, 14, Log4), % take all - {Taken, C0, _} = ra_log:take(1, 10, Log5), - ?assertEqual(length(Taken), C0), + {Taken, _} = ra_log_take(1, 10, Log5), ?assertEqual(10, length(Taken)), %% take 0 - {[], 0, _} = ra_log:take(5, 0, Log5), + {[], _} = ra_log_take(5, 4, Log5), ok. take_cache(Config) -> @@ -236,7 +235,7 @@ take_cache(Config) -> Log1 = ra_log:append_sync({Idx, Term, <<"one">>}, Log0), Idx2 = Idx +1, Log = ra_log:append({Idx2, Term, <<"two">>}, Log1), - {[?IDX(Idx), ?IDX(Idx2)], 2, _Log2} = ra_log:take(1, 2, Log), + {[?IDX(Idx), ?IDX(Idx2)], _Log2} = ra_log_take(1, 2, Log), ok. last(Config) -> @@ -272,3 +271,7 @@ append_in(Term, Data, Log0) -> Idx = ra_log:next_index(Log0), Entry = {Idx, Term, Data}, ra_log:append_sync(Entry, Log0). + +ra_log_take(From, To, Log0) -> + {Acc, Log} = ra_log:fold(From, To, fun (E, Acc) -> [E | Acc] end, [], Log0), + {lists:reverse(Acc), Log}. diff --git a/test/ra_log_memory.erl b/test/ra_log_memory.erl index b125a001..bce65e44 100644 --- a/test/ra_log_memory.erl +++ b/test/ra_log_memory.erl @@ -11,6 +11,7 @@ append/2, write/2, take/3, + fold/5, last_index_term/1, set_last_index/2, handle_event/2, @@ -106,6 +107,12 @@ take(Start, Num, #state{last_index = LastIdx, entries = Log} = State) -> Entries = sparse_take(Start, Log, Num, LastIdx, []), {Entries, length(Entries), State}. +fold(From, To, Fun, Acc0, #state{last_index = LastIdx, + entries = Log} = State) -> + Entries = sparse_take(From, Log, To - From + 1, LastIdx, []), + Acc = lists:foldl(Fun, Acc0, Entries), + {Acc, State}. + % this allows for missing entries in the log sparse_take(Idx, _Log, Num, Max, Res) when length(Res) =:= Num orelse diff --git a/test/ra_log_props_SUITE.erl b/test/ra_log_props_SUITE.erl index e7d43b36..091b97a5 100644 --- a/test/ra_log_props_SUITE.erl +++ b/test/ra_log_props_SUITE.erl @@ -167,7 +167,7 @@ write_prop(TestCase) -> {ok, Log0} = ra_log:write( Entries, ra_log_init(#{uid => TestCase})), - {LogEntries, _, Log} = ra_log:take(1, length(Entries), Log0), + {LogEntries, Log} = ra_log_take(1, length(Entries), Log0), reset(Log), ?WHENFAIL(io:format("Entries taken from the log: ~p~nRa log state: ~p", [LogEntries, Log]), @@ -215,7 +215,7 @@ write_overwrite_entry_prop(TestCase) -> ra_log_init(#{uid => TestCase})), NewEntry = [{Idx, Term, <<"overwrite">>}], {ok, Log} = ra_log:write(NewEntry, Log0), - {LogEntries, _, Log1} = ra_log:take(1, length(Entries), Log), + {LogEntries, Log1} = ra_log_take(1, length(Entries), Log), reset(Log1), ?WHENFAIL(io:format("Head: ~p~n New entry: ~p~n" "Entries taken from the log: ~p~n" @@ -265,7 +265,7 @@ append_missing_entry_prop(TestCase) -> exit:{integrity_error, _} -> true end, - {LogEntries, _, Log} = ra_log:take(1, length(Head), Log0), + {LogEntries, Log} = ra_log_take(1, length(Head), Log0), reset(Log), ?WHENFAIL(io:format("Failed: ~p~nHead: ~p~n Tail: ~p~n" "Entries taken from the log: ~p~n" @@ -307,7 +307,7 @@ append_prop(TestCase) -> Log0 = append_all( Entries, ra_log_init(#{uid => TestCase})), - {LogEntries, _, Log} = ra_log:take(1, length(Entries), Log0), + {LogEntries, Log} = ra_log_take(1, length(Entries), Log0), reset(Log), ?WHENFAIL(io:format("Entries taken from the log: ~p~nRa log state: ~p", [LogEntries, Log]), @@ -372,7 +372,7 @@ take_prop(TestCase) -> {ok, Log0} = ra_log:write( Entries, ra_log_init(#{uid => TestCase})), - {Selected, _, Log} = ra_log:take(Start, Num, Log0), + {Selected, Log} = ra_log_take(Start, Start + Num - 1, Log0), Expected = lists:sublist(Entries, Start, Num), reset(Log), ?WHENFAIL(io:format("Selected: ~p~nExpected: ~p", @@ -393,7 +393,7 @@ take_out_of_range_prop(TestCase) -> {ok, Log0} = ra_log:write( Entries, ra_log_init(#{uid => TestCase})), - {Reply, _, Log} = ra_log:take(Start, Num, Log0), + {Reply, Log} = ra_log_take(Start, Start + Num - 1, Log0), reset(Log), ?WHENFAIL(io:format("Start: ~p Num: ~p~nReply: ~p", [Start, Num, Reply]), Reply == []) @@ -583,7 +583,7 @@ last_written_with_wal_prop(TestCase) -> {Acc, Last0, Idx, St} end, {Log0, {0, 0}, 0, wal_up}, All), Got = ra_log:last_written(Log), - {Written, _, Log1} = ra_log:take(1, LastIdx, Log), + {Written, Log1} = ra_log_take(1, LastIdx, Log), reset(Log1), ?WHENFAIL(io:format("Got: ~p, Expected: ~p Written: ~p~n Actions: ~p", [Got, Last, Written, All]), @@ -632,7 +632,7 @@ last_written_with_segment_writer_prop(TestCase) -> {Acc, Last0, Idx, St} end, {Log0, {0, 0}, 0, sw_up}, All), Got = ra_log:last_written(Log), - {Written, _, Log1} = ra_log:take(1, LastIdx, Log), + {Written, Log1} = ra_log_take(1, LastIdx, Log), reset(Log1), ?WHENFAIL(ct:pal("Got: ~p, Expected: ~p Written: ~p~n Actions: ~p", [Got, Last, Written, All]), @@ -657,7 +657,7 @@ last_written_with_crashing_segment_writer_prop(TestCase) -> _ = supervisor:restart_child(ra_log_sup, ra_log_segment_writer), Log0 = ra_log_init(#{uid => TestCase, resend_window => 2}), - ra_log:take(1, 10, Log0), + ra_log_take(1, 10, Log0), {Log, _Last, Ts} = lists:foldl(fun({wait, Wait}, Acc) -> timer:sleep(Wait), @@ -707,7 +707,7 @@ last_written_with_crashing_segment_writer_prop(TestCase) -> ets:tab2list(ra_log_open_mem_tables), ets:tab2list(ra_log_closed_mem_tables) ]), - {Written, _, Log2} = ra_log:take(1, EIdx, Log1), + {Written, Log2} = ra_log_take(1, EIdx, Log1), %% We got all the data, can reset now basic_reset(Log2), ?WHENFAIL(ct:pal("Last written entry: ~p; actually last idx term: ~p;" @@ -852,3 +852,7 @@ wal_sup() -> ra_log_init(Cfg) -> %% augment with default system config ra_log:init(Cfg#{system_config => ra_system:default_config()}). + +ra_log_take(From, To, Log0) -> + {Acc, Log} = ra_log:fold(From, To, fun (E, Acc) -> [E | Acc] end, [], Log0), + {lists:reverse(Acc), Log}. diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl index 7cdadf48..6d276592 100644 --- a/test/ra_log_segment_SUITE.erl +++ b/test/ra_log_segment_SUITE.erl @@ -87,8 +87,8 @@ corrupted_segment(Config) -> {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), %% for now we are just going to exit when reaching this point %% in the future we can find a strategy for handling this case - ?assertExit({ra_log_segment_unexpected_eof, _, _, _}, - ra_log_segment:read(SegR, 1, 2)), + ?assertExit({missing_key, 2}, + read_sparse(SegR, [1, 2])), ok. @@ -107,7 +107,7 @@ large_segment(Config) -> %% validate all entries can be read {ok, Seg1} = ra_log_segment:open(Fn, #{mode => read}), [begin - [{Idx, 1, _B}] = ra_log_segment:read(Seg1, Idx, 1) + [{Idx, 1, _B}] = read_sparse(Seg1, [Idx]) end || Idx <- lists:seq(1, 4096)], ct:pal("Index ~p", [lists:last(ra_log_segment:dump_index(Fn))]), @@ -141,7 +141,7 @@ versions_v1(Config) -> ok = file:sync(Fd), ok = file:close(Fd), {ok, R0} = ra_log_segment:open(Fn, #{mode => read}), - [{Idx, Term, Data}] = ra_log_segment:read(R0, Idx, 1), + [{Idx, Term, Data}] = read_sparse(R0, [Idx]), ok = ra_log_segment:close(R0), %% append as v1 @@ -150,8 +150,7 @@ versions_v1(Config) -> ok = ra_log_segment:close(W), %% read again {ok, R1} = ra_log_segment:open(Fn, #{mode => read}), - [{Idx, Term, Data}, - {2, Term, Data}] = ra_log_segment:read(R1, Idx, 2), + [{Idx, Term, Data}] = read_sparse(R1, [Idx]), ok = ra_log_segment:close(R1), ok. @@ -214,7 +213,7 @@ write_close_open_write(Config) -> {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), {1, 3} = ra_log_segment:range(SegR), [{1, 2, <<"data1">>}, {2, 2, <<"data2">>}, {3, 2, <<"data3">>}] = - ra_log_segment:read(SegR, 1, 3), + read_sparse(SegR, [1, 2, 3]), ok = ra_log_segment:close(SegA), ok = ra_log_segment:close(SegR), ok. @@ -232,11 +231,8 @@ write_then_read(Config) -> % read two consecutive entries from index 1 {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), - [{1, 2, Data}, {2, 2, Data}] = ra_log_segment:read(SegR, 1, 2), - %% validate a larger range still returns results - [{1, 2, Data}, {2, 2, Data}] = ra_log_segment:read(SegR, 1, 5), - %% out of range returns nothing - [{2, 2, Data}] = ra_log_segment:read(SegR, 2, 2), + [{1, 2, Data}, {2, 2, Data}] = read_sparse(SegR, [1, 2]), + [{2, 2, Data}] = read_sparse(SegR, [2]), {1, 2} = ra_log_segment:range(SegR), ok = ra_log_segment:close(SegR), ok. @@ -254,14 +250,12 @@ write_then_read_no_checksums(Config) -> % read two consecutive entries from index 1 {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), - [{1, 2, Data}, {2, 2, Data}] = ra_log_segment:read(SegR, 1, 2), - %% validate a larger range still returns results - [{1, 2, Data}, {2, 2, Data}] = ra_log_segment:read(SegR, 1, 5), - %% out of range returns nothing - [{2, 2, Data}] = ra_log_segment:read(SegR, 2, 2), + [{1, 2, Data}, {2, 2, Data}] = read_sparse(SegR, [1, 2]), + [{2, 2, Data}] = read_sparse(SegR, [2]), {1, 2} = ra_log_segment:range(SegR), ok = ra_log_segment:close(SegR), ok. + read_cons(Config) -> Dir = ?config(data_dir, Config), Fn = filename:join(Dir, "seg1.seg"), @@ -275,10 +269,13 @@ read_cons(Config) -> %% end of setup {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), - [{3, 2, Data}] = Read = ra_log_segment:read(SegR, 3, 1), + [{1, 2, Data}] = Read = read_sparse(SegR, [1]), %% validate a larger range still returns results - [{1, 2, Data}, {2, 2, Data}, {3, 2, Data}] = ra_log_segment:read_cons(SegR, 1, 2, - fun ra_lib:id/1, Read), + [{1, 2, Data}, {2, 2, Data}, {3, 2, Data}] = + lists:reverse( + ra_log_segment:fold(SegR, 2, 3, fun ra_lib:id/1, + fun (E, A) -> [E | A] end, + Read)), ok = ra_log_segment:close(SegR), ok. @@ -294,7 +291,7 @@ try_read_missing(Config) -> ok = ra_log_segment:close(Seg), {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), - [] = ra_log_segment:read(SegR, 2, 2), + [_] = read_sparse(SegR, [1]), ok. overwrite(Config) -> @@ -309,8 +306,8 @@ overwrite(Config) -> {ok, Seg} = ra_log_segment:sync(Seg2), {ok, SegR} = ra_log_segment:open(Fn, #{mode => read}), {2, 2} = ra_log_segment:range(Seg), - [] = ra_log_segment:read(SegR, 5, 1), - [{2, 2, Data}] = ra_log_segment:read(SegR, 2, 1), + ?assertExit({missing_key, 5}, read_sparse(SegR, [5])), + [{2, 2, Data}] = read_sparse(SegR, [2]), ok = ra_log_segment:close(Seg), ok. @@ -402,3 +399,7 @@ stop_profile(Config) -> Name = filename:join([Dir, "lg_" ++ atom_to_list(Case)]), lg_callgrind:profile_many(Name ++ ".gz.*", Name ++ ".out",#{}), ok. + +read_sparse(R, Idxs) -> + {_, Entries} = ra_log_segment:read_sparse(R, Idxs, fun ra_lib:id/1, []), + lists:reverse(Entries). diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 4e96e60d..39a92f65 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -97,7 +97,7 @@ accept_mem_tables(Config) -> {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred Entries = [{I, T, binary_to_term(B)} - || {I, T, B} <- ra_log_segment:read(Seg, 1, 3)] + || {I, T, B} <- read_sparse(Seg, [1, 2, 3])] after 3000 -> throw(ra_log_event_timeout) end, @@ -300,7 +300,8 @@ skip_entries_lower_than_snapshot_index(Config) -> % assert only entries with a higher index than the snapshot % have been written ok = gen_server:stop(TblWriterPid), - [{4, _, _}, {5, _, _}] = ra_log_segment:read(Seg, 1, 5) + ?assertExit({missing_key, 1}, read_sparse(Seg, [1,2, 3])), + [{4, _, _}, {5, _, _}] = read_sparse(Seg, [4, 5]) after 3000 -> ok = gen_server:stop(TblWriterPid), throw(ra_log_event_timeout) @@ -355,7 +356,7 @@ accept_mem_tables_append(Config) -> {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred AllEntries = [{I, T, binary_to_term(B)} - || {I, T, B} <- ra_log_segment:read(Seg, 1, 5)] + || {I, T, B} <- read_sparse(Seg, lists:seq(1, 5))] after 3000 -> throw(ra_log_event_timeout) end, @@ -382,9 +383,9 @@ accept_mem_tables_overwrite(Config) -> SegmentFile = filename:join(?config(server_dir, Config), Fn), {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), C2 = term_to_binary(c2), - [{1, 43, _}, {2, 43, _}] = ra_log_segment:read(Seg, 1, 2), - [{3, 43, C2}] = ra_log_segment:read(Seg, 3, 1), - [] = ra_log_segment:read(Seg, 4, 2) + [{1, 43, _}, {2, 43, _}] = read_sparse(Seg, [1, 2]), + [{3, 43, C2}] = read_sparse(Seg, [3]), + ?assertExit({missing_key, 4}, read_sparse(Seg, [4])) after 3000 -> throw(ra_log_event_timeout) end, @@ -441,7 +442,7 @@ accept_mem_tables_for_down_server(Config) -> {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred Entries = [{I, T, binary_to_term(B)} - || {I, T, B} <- ra_log_segment:read(Seg, 1, 3)] + || {I, T, B} <- read_sparse(Seg, [1, 2, 3])] after 3000 -> throw(ra_log_event_timeout) end, @@ -496,7 +497,7 @@ accept_mem_tables_with_corrupt_segment(Config) -> {ok, Seg} = ra_log_segment:open(SegmentFile, #{mode => read}), % assert Entries have been fully transferred Entries = [{I, T, binary_to_term(B)} - || {I, T, B} <- ra_log_segment:read(Seg, 1, 3)] + || {I, T, B} <- read_sparse(Seg, [1, 2, 3])] after 3000 -> flush(), throw(ra_log_event_timeout) @@ -597,3 +598,7 @@ segments_for(UId, DataDir) -> Dir = filename:join(DataDir, ra_lib:to_list(UId)), SegFiles = lists:sort(filelib:wildcard(filename:join(Dir, "*.segment"))), SegFiles. + +read_sparse(R, Idxs) -> + {_, Entries} = ra_log_segment:read_sparse(R, Idxs, fun ra_lib:id/1, []), + lists:reverse(Entries). diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index b4f7776f..a6919e8c 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -149,7 +149,7 @@ setup_log() -> meck:expect(ra_log, install_snapshot, fun (_, _, Log) -> {Log, []} end), meck:expect(ra_log, recover_snapshot, fun ra_log_memory:recover_snapshot/1), meck:expect(ra_log, snapshot_index_term, fun ra_log_memory:snapshot_index_term/1), - meck:expect(ra_log, take, fun ra_log_memory:take/3), + meck:expect(ra_log, fold, fun ra_log_memory:fold/5), meck:expect(ra_log, release_resources, fun ra_log_memory:release_resources/3), meck:expect(ra_log, append_sync, fun({Idx, Term, _} = E, L) ->