diff --git a/README.md b/README.md index 05ba25eb..8b45fa7b 100644 --- a/README.md +++ b/README.md @@ -440,14 +440,17 @@ logger:set_primary_config(level, debug). Ra attempts to follow [Semantic Versioning](https://semver.org/). The modules that form part of the public API are: + * `ra` * `ra_machine` (behaviour callbacks only) * `ra_aux` * `ra_system` -* `ra_counters` +* `ra_counters` (counter keys may vary between minors) * `ra_leaderboard` * `ra_env` * `ra_directory` +* `ra_flru` +* `ra_log_read_plan` ## Copyright and License diff --git a/docs/internals/STATE_MACHINE_TUTORIAL.md b/docs/internals/STATE_MACHINE_TUTORIAL.md index ea6ec1b3..7ddaaf7f 100644 --- a/docs/internals/STATE_MACHINE_TUTORIAL.md +++ b/docs/internals/STATE_MACHINE_TUTORIAL.md @@ -181,6 +181,23 @@ Only the leader that first applies an entry will attempt the effect. Followers process the same set of commands but simply throw away any effects returned by the state machine unless specific effect provide the `local` option. +### Machine Effects table + +| Spec | Executed on | +| -----| ----------- | +| `{send_msg, pid(), Msg :: term()}` | leader | +| `{send_msg, pid(), Msg :: term(), [local]}` | on member local to `pid()` else leader | +| `{monitor \| demonitor, process \| node, pid() \| node()}` | leader | +| `{mod_call, mfa()}` | leader | +| `{timer, Name :: term(), Time :: non_neg_integer() \| infinity}` | leader | +| `{append, term()}` | leader | +| `{append, term(), ra_server:command_reply_mode()}` | leader | +| `{log, [ra_index()], fun(([user_command()]) -> effects())}` | leader | +| `{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}}` | on member local to `node()` else leader | +| `{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}}` | on member local to `node()` else leader | +| `{release_cursor \| checkpoint, ra_index(), term()}` | all members | +| `{aux, term()}` | every member | + ### Send a message diff --git a/src/ra.erl b/src/ra.erl index 2fe35e5a..91e7124e 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -1262,7 +1262,6 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() -> key_metrics({_, N} = ServerId, Timeout) -> erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout). - %% internal -spec usr(UserCommand, ReplyMode) -> Command when diff --git a/src/ra_aux.erl b/src/ra_aux.erl index e28b37ee..e41beb8a 100644 --- a/src/ra_aux.erl +++ b/src/ra_aux.erl @@ -76,4 +76,3 @@ log_fetch(Idx, #{log := Log0} = State) -spec log_stats(ra_aux:internal_state()) -> ra_log:overview(). log_stats(#{log := Log}) -> ra_log:overview(Log). - diff --git a/src/ra_lib.erl b/src/ra_lib.erl index a2cfe514..f24d0248 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -49,7 +49,8 @@ is_dir/1, is_file/1, ensure_dir/1, - consult/1 + consult/1, + cons/2 ]). -type file_err() :: file:posix() | badarg | terminated | system_limit. @@ -454,6 +455,10 @@ consult(Path) -> Err end. +cons(Item, List) + when is_list(List) -> + [Item | List]. + tokens(Str) -> case erl_scan:string(Str) of {ok, Tokens, _EndLoc} -> diff --git a/src/ra_log.erl b/src/ra_log.erl index 97f4e3ce..7523ca6d 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -20,6 +20,9 @@ write_sync/2, fold/5, sparse_read/2, + partial_read/3, + execute_read_plan/3, + read_plan_info/1, last_index_term/1, set_last_index/2, handle_event/2, @@ -70,6 +73,7 @@ {down, pid(), term()}. -type event() :: {ra_log_event, event_body()}. +-type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()). -type effect() :: {delete_snapshot, Dir :: file:filename(), ra_idxterm()} | @@ -110,7 +114,11 @@ tx = false :: boolean() }). +-record(read_plan, {dir :: file:filename_all(), + read :: #{ra_index() := log_entry()}, + plan :: ra_log_reader:read_plan()}). +-opaque read_plan() :: #read_plan{}. -opaque state() :: #?MODULE{}. -type ra_log_init_args() :: #{uid := ra_uid(), @@ -145,6 +153,7 @@ atom() => term()}. -export_type([state/0, + read_plan/0, ra_log_init_args/0, ra_meta_key/0, segment_ref/0, @@ -303,7 +312,6 @@ init(#{uid := UId, {SnapIdx, SnapTerm}, State#?MODULE.last_written_index_term ]), - ?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]), element(1, delete_segments(SnapIdx, State)). -spec close(state()) -> ok. @@ -488,8 +496,8 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg, %% drop any indexes that are larger than the last index available Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1), - {Entries0, CacheNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt), - ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, CacheNumRead), + {Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt), + ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead), {Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0), %% here we recover the original order of indexes Entries = case Sort of @@ -507,6 +515,65 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg, end, {Entries, State#?MODULE{reader = Reader}}. + +%% read a list of indexes, +%% found indexes be returned in the same order as the input list of indexes +-spec partial_read([ra_index()], state(), + fun ((ra_index(), ra_term(), ra_server:command()) -> term()) + ) -> + read_plan(). +partial_read(Indexes0, #?MODULE{cfg = Cfg, + reader = Reader0, + last_index = LastIdx, + mem_table = Mt}, + TransformFun) -> + ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1), + %% indexes need to be sorted high -> low for correct and efficient reading + Sort = ra_lib:lists_detect_sort(Indexes0), + Indexes1 = case Sort of + unsorted -> + lists:sort(fun erlang:'>'/2, Indexes0); + ascending -> + lists:reverse(Indexes0); + _ -> + % descending or undefined + Indexes0 + end, + + %% drop any indexes that are larger than the last index available + Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1), + {Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt), + ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead), + Read = lists:foldl(fun ({I, T, Cmd}, Acc) -> + maps:put(I, TransformFun(I, T, Cmd), Acc) + end, #{}, Entries0), + + Plan = ra_log_reader:read_plan(Reader0, Indexes), + #read_plan{dir = Cfg#cfg.directory, + read = Read, + plan = Plan}. + + +-spec execute_read_plan(read_plan(), undefined | ra_flru:state(), + TransformFun :: transform_fun()) -> + {#{ra_index() => Command :: term()}, ra_flru:state()}. +execute_read_plan(#read_plan{dir = Dir, + read = Read, + plan = Plan}, Flru0, TransformFun) -> + ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read). + +-spec read_plan_info(read_plan()) -> map(). +read_plan_info(#read_plan{read = Read, + plan = Plan}) -> + NumSegments = length(Plan), + NumInSegments = lists:foldl(fun ({_, Idxs}, Acc) -> + Acc + length(Idxs) + end, 0, Plan), + #{num_read => map_size(Read), + num_in_segments => NumInSegments, + num_segments => NumSegments}. + + -spec last_index_term(state()) -> ra_idxterm(). last_index_term(#?MODULE{last_index = LastIdx, last_term = LastTerm}) -> {LastIdx, LastTerm}. @@ -1309,8 +1376,7 @@ put_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. server_data_dir(Dir, UId) -> - Me = ra_lib:to_list(UId), - filename:join(Dir, Me). + filename:join(Dir, UId). maps_with_values(Keys, Map) -> lists:foldr( diff --git a/src/ra_log_read_plan.erl b/src/ra_log_read_plan.erl new file mode 100644 index 00000000..c99d2801 --- /dev/null +++ b/src/ra_log_read_plan.erl @@ -0,0 +1,20 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +%% +-module(ra_log_read_plan). + + +-export([execute/2, + info/1]). + +-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) -> + {#{ra:index() => Command :: ra_server:command()}, ra_flru:state()}. +execute(Plan, Flru) -> + ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3). + +-spec info(ra_log:read_plan()) -> map(). +info(Plan) -> + ra_log:read_plan_info(Plan). diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index 902c51bf..d19575de 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -19,6 +19,8 @@ update_first_index/2, fold/5, sparse_read/3, + read_plan/2, + exec_read_plan/5, fetch_term/2 ]). @@ -44,10 +46,12 @@ }). -opaque state() :: #?STATE{}. +-type read_plan() :: [{BaseName :: file:filename_all(), [ra:index()]}]. -export_type([ - state/0 + state/0, + read_plan/0 ]). %% PUBLIC @@ -170,6 +174,31 @@ sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes, Entries0) -> ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC), {Entries, State#?MODULE{open_segments = Open}}. +-spec read_plan(state(), [ra_index()]) -> read_plan(). +read_plan(#?STATE{segment_refs = SegRefs}, Indexes) -> + %% TODO: add counter for number of read plans requested + segment_read_plan(SegRefs, Indexes, []). + +-spec exec_read_plan(file:filename_all(), read_plan(), undefined | ra_flru:state(), + TransformFun :: fun(), + #{ra_index() => Command :: term()}) -> + {#{ra_index() => Command :: term()}, ra_flru:state()}. +exec_read_plan(Dir, Plan, undefined, TransformFun, Acc0) -> + Open = ra_flru:new(1, fun({_, Seg}) -> ra_log_segment:close(Seg) end), + exec_read_plan(Dir, Plan, Open, TransformFun, Acc0); +exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0) + when is_list(Plan) -> + Fun = fun (I, T, B, Acc) -> + E = TransformFun(I, T, binary_to_term(B)), + Acc#{I => E} + end, + lists:foldl( + fun ({Idxs, BaseName}, {Acc1, Open1}) -> + {Seg, Open} = get_segment_ext(Dir, Open1, BaseName), + {_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1), + {Acc, Open} + end, {Acc0, Open0}, Plan). + -spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}. fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) -> incr_counter(Cfg, ?C_RA_LOG_FETCH_TERM, 1), @@ -177,6 +206,23 @@ fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) -> %% LOCAL +segment_read_plan(_RegRefs, [], Acc) -> + lists:reverse(Acc); +segment_read_plan([], _Indexes, Acc) -> + %% TODO: not all indexes were found + lists:reverse(Acc); +segment_read_plan([{To, From, Fn} | SegRefs], Indexes, Acc) -> + %% TODO: address unnecessary allocation here + Range = {To, From}, + case sparse_read_split(fun (I) -> + ra_range:in(I, Range) + end, Indexes, []) of + {[], _} -> + segment_read_plan(SegRefs, Indexes, Acc); + {Idxs, Rem} -> + segment_read_plan(SegRefs, Rem, [{Idxs, Fn} | Acc]) + end. + segment_term_query(Idx, #?MODULE{segment_refs = SegRefs, cfg = Cfg, open_segments = OpenSegs} = State) -> @@ -255,7 +301,10 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs, end, Idxs, []), {ReadSparseCount, Entries} = ra_log_segment:read_sparse(Seg, ReadIdxs, - fun binary_to_term/1, []), + fun (I, T, B, Acc) -> + [{I, T, binary_to_term(B)} | Acc] + end, + []), {Open, RemIdxs, C + ReadSparseCount, lists:reverse(Entries, En0)}; (_Segref, Acc) -> @@ -294,6 +343,24 @@ get_segment(#cfg{directory = Dir, end end. +get_segment_ext(Dir, Open0, Fn) -> + case ra_flru:fetch(Fn, Open0) of + {ok, S, Open1} -> + {S, Open1}; + error -> + AbsFn = filename:join(Dir, Fn), + case ra_log_segment:open(AbsFn, + #{mode => read, + access_pattern => random}) + of + {ok, S} -> + {S, ra_flru:insert(Fn, S, Open0)}; + {error, Err} -> + exit({ra_log_failed_to_open_segment, Err, + AbsFn}) + end + end. + compact_seg_refs([], PreviousSegRefs) -> PreviousSegRefs; compact_seg_refs(NewSegRefs, []) -> diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index 10e61017..84b436e3 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -272,16 +272,18 @@ fold(#state{cfg = #cfg{mode = read} = Cfg, fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc). -spec read_sparse(state(), [ra_index()], - fun((binary()) -> term()), term()) -> - {non_neg_integer(), term()}. + fun((ra:index(), ra_term(), binary(), Acc) -> Acc), + Acc) -> + {NumRead :: non_neg_integer(), Acc} + when Acc :: term(). read_sparse(#state{index = Index, - cfg = Cfg}, Indexes, Fun, Acc) -> + cfg = Cfg}, Indexes, AccFun, Acc) -> Cache0 = prepare_cache(Cfg, Indexes, Index), - read_sparse0(Cfg, Indexes, Index, Cache0, Fun, Acc, 0). + read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0). -read_sparse0(_Cfg, [], _Index, _Cache, _Fun, Acc, Num) -> +read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) -> {Num, Acc}; -read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc, Num) +read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num) when is_map_key(NextIdx, Index) -> {Term, Offset, Length, _} = map_get(NextIdx, Index), case cache_read(Cache0, Offset, Length) of @@ -289,16 +291,18 @@ read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc, Num) case prepare_cache(Cfg, Indexes, Index) of undefined -> {ok, Data, _} = pread(Cfg, undefined, Offset, Length), - read_sparse0(Cfg, Rem, Index, undefined, Fun, - [{NextIdx, Term, Fun(Data)} | Acc], Num+1); + read_sparse0(Cfg, Rem, Index, undefined, + AccFun(NextIdx, Term, Data, Acc), + AccFun, Num+1); Cache -> - read_sparse0(Cfg, Indexes, Index, Cache, Fun, Acc, Num+1) + read_sparse0(Cfg, Indexes, Index, Cache, + Acc, AccFun, Num+1) end; Data -> - read_sparse0(Cfg, Rem, Index, Cache0, Fun, - [{NextIdx, Term, Fun(Data)} | Acc], Num+1) + read_sparse0(Cfg, Rem, Index, Cache0, + AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1) end; -read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Fun, _Acc, _Num) -> +read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) -> exit({missing_key, NextIdx}). cache_read({CPos, CLen, Bin}, Pos, Length) diff --git a/src/ra_machine.erl b/src/ra_machine.erl index e7106ba4..63e97fcc 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -119,26 +119,31 @@ -type version() :: non_neg_integer(). -type effect() :: + %% These effects are only executed on the leader {send_msg, To :: locator(), Msg :: term()} | - %% @TODO: with local deliveries is it theoretically possible for a follower - %% to apply entries but not know who the current leader is? - %% If so, `To' must also include undefined - {send_msg, To :: locator(), Msg :: term(), Options :: send_msg_opts()} | {mod_call, module(), Function :: atom(), [term()]} | %% appends a user command to the raft log - {append, term()} | - {append, term(), ra_server:command_reply_mode()} | + {append, Cmd :: term()} | + {append, Cmd :: term(), ra_server:command_reply_mode()} | {monitor, process, pid()} | {monitor, node, node()} | {demonitor, process, pid()} | {demonitor, node, node()} | {timer, term(), non_neg_integer() | infinity} | {log, [ra_index()], fun(([user_command()]) -> effects())} | + + %% these are either conditional on the local configuration or + %% will always be evaluated when seen by members in any raft state + {send_msg, To :: locator(), Msg :: term(), Options :: send_msg_opts()} | {log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} | + {log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}} | {release_cursor, ra_index(), state()} | {release_cursor, ra_index()} | {checkpoint, ra_index(), state()} | {aux, term()} | + %% like append/3 but a special backwards compatible function + %% that tries to execute in any raft state + {try_append, term(), ra_server:command_reply_mode()} | garbage_collection. %% Effects are data structures that can be returned by {@link apply/3} to ask diff --git a/src/ra_server.erl b/src/ra_server.erl index 4d3a2461..1cf44eb0 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -58,11 +58,13 @@ terminate/2, log_fold/3, log_read/2, + log_partial_read/2, get_membership/1, get_condition_timeout/2, recover/1, state_query/2, - fetch_term/2 + fetch_term/2, + transform_for_partial_read/3 ]). -type ra_await_condition_fun() :: @@ -155,8 +157,12 @@ -type effect() :: ra_machine:effect() | ra_log:effect() | + %% this is used for replies for immedate requests {reply, ra_reply_body()} | - {reply, term(), ra_reply_body()} | + %% this is used by the leader only + {reply, from(), ra_reply_body()} | + {reply, from(), ra_reply_body(), + Replier :: leader | local | {member, ra_server_id()}} | {cast, ra_server_id(), term()} | {send_vote_requests, [{ra_server_id(), #request_vote_rpc{} | #pre_vote_rpc{}}]} | @@ -1801,10 +1807,8 @@ evaluate_commit_index_follower(#{commit_index := CommitIndex, {delete_and_terminate, State1, Effects} -> Reply = append_entries_reply(Term, true, State1), {delete_and_terminate, State1, - [cast_reply(Id, LeaderId, Reply) | - filter_follower_effects(Effects)]}; - {#{last_applied := LastApplied} = State, Effects1} -> - Effects = filter_follower_effects(Effects1), + [cast_reply(Id, LeaderId, Reply) | Effects]}; + {#{last_applied := LastApplied} = State, Effects} -> case LastApplied > LastApplied0 of true -> %% entries were applied, append eval_aux effect @@ -1818,51 +1822,6 @@ evaluate_commit_index_follower(State, Effects) -> %% when no leader is known {follower, State, Effects}. -filter_follower_effects(Effects) -> - lists:foldr(fun ({release_cursor, _, _} = C, Acc) -> - [C | Acc]; - ({release_cursor, _} = C, Acc) -> - [C | Acc]; - ({checkpoint, _, _} = C, Acc) -> - [C | Acc]; - ({record_leader_msg, _} = C, Acc) -> - [C | Acc]; - ({aux, _} = C, Acc) -> - [C | Acc]; - (garbage_collection = C, Acc) -> - [C | Acc]; - ({delete_snapshot, _} = C, Acc) -> - [C | Acc]; - ({send_msg, _, _, _Opts} = C, Acc) -> - %% send_msg effects _may_ have the local option - %% and will be evaluated properly during - %% effect processing - [C | Acc]; - ({log, _, _, _Opts} = C, Acc) -> - [C | Acc]; - ({reply, _, _, leader}, Acc) -> - Acc; - ({reply, _, _, _} = C, Acc) -> - %% If the reply-from is not `leader', the follower - %% might be the replier. - [C | Acc]; - ({monitor, _ProcOrNode, Comp, _} = C, Acc) - when Comp =/= machine -> - %% only machine monitors should not be emitted - %% by followers - [C | Acc]; - (L, Acc) when is_list(L) -> - %% nested case - recurse - case filter_follower_effects(L) of - [] -> Acc; - Filtered -> - [Filtered | Acc] - end; - (_, Acc) -> - Acc - end, [], Effects). - - make_pipelined_rpc_effects(#{cfg := #cfg{id = Id, max_append_entries_rpc_batch_size = MaxBatchSize, @@ -2200,6 +2159,15 @@ log_read(Indexes, #{log := Log0} = State) -> || {_Idx, _Term, {'$usr', _, Data, _}} <- Entries], State#{log => Log}}. +%% reads user commands at the specified index +-spec log_partial_read([ra_index()], ra_server_state()) -> + ra_log:read_plan(). +log_partial_read(Indexes, #{log := Log0}) -> + %% strip the ra internal command wrapper away to leave the original + %% machine command written + ra_log:partial_read(Indexes, Log0, + fun transform_for_partial_read/3). + %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -2518,6 +2486,15 @@ fetch_term(Idx, #{log := Log0} = State) -> {Term, State#{log => Log}} end. + +transform_for_partial_read(_Idx, _Term, {'$usr', _, Cmd, _}) -> + %% stip usr commands of ra internals + Cmd; +transform_for_partial_read(_Idx, _Term, Cmd) -> + %% Other commands leave + Cmd. + + -spec make_cluster(ra_server_id(), ra_cluster_snapshot() | [ra_server_id()]) -> ra_cluster(). make_cluster(Self, Nodes0) when is_list(Nodes0) -> @@ -2596,11 +2573,11 @@ make_notify_effects(Nots, Prior) when map_size(Nots) > 0 -> make_notify_effects(_Nots, Prior) -> Prior. -append_app_effects([], Effs) -> +append_machine_effects([], Effs) -> Effs; -append_app_effects([AppEff], Effs) -> +append_machine_effects([AppEff], Effs) -> [AppEff | Effs]; -append_app_effects(AppEffs, Effs) -> +append_machine_effects(AppEffs, Effs) -> [AppEffs | Effs]. cluster_scan_fun({Idx, Term, {'$ra_cluster_change', _Meta, NewCluster, _}}, @@ -2631,9 +2608,9 @@ apply_with({Idx, Term, {'$usr', CmdMeta, Cmd, ReplyMode}}, Meta = augment_command_meta(Idx, Term, MacVer, ReplyMode, CmdMeta), Ts = maps:get(ts, CmdMeta, LastTs), case ra_machine:apply(Module, Meta, Cmd, MacSt) of - {NextMacSt, Reply, AppEffs} -> + {NextMacSt, Reply, MacEffs} -> {Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyMode, - append_app_effects(AppEffs, Effects0), + append_machine_effects(MacEffs, Effects0), Notifys0), {Module, Idx, State, NextMacSt, Effects, Notifys, Ts}; diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 1de8a1a6..9f3113f6 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1318,7 +1318,7 @@ handle_effects(RaftState, Effects0, EvtType, State0, Actions0) -> end, {State0, Actions0}, Effects0), {State, lists:reverse(Actions)}. -handle_effect(_, {send_rpc, To, Rpc}, _, +handle_effect(leader, {send_rpc, To, Rpc}, _, #state{conf = Conf} = State0, Actions) -> % fully qualified use only so that we can mock it for testing % TODO: review / refactor to remove the mod call here @@ -1347,7 +1347,7 @@ handle_effect(_, {next_event, Evt}, EvtType, State, Actions) -> {State, [{next_event, EvtType, Evt} | Actions]}; handle_effect(_, {next_event, _, _} = Next, _, State, Actions) -> {State, [Next | Actions]}; -handle_effect(_, {send_msg, To, Msg}, _, State, Actions) -> +handle_effect(leader, {send_msg, To, Msg}, _, State, Actions) -> %% default is to send without any wrapping %% TODO: handle send failure? how? _ = send(To, Msg, State#state.conf), @@ -1370,18 +1370,26 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, ok end, {State, Actions}; -handle_effect(RaftState, {log, Idxs, Fun, {local, Node}}, EvtType, - State, Actions) -> +handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType, + State, Actions) + when LogOrLogExt == log orelse LogOrLogExt == log_ext -> case can_execute_locally(RaftState, Node, State) of true -> - handle_effect(RaftState, {log, Idxs, Fun}, EvtType, State, Actions); + handle_effect(RaftState, {LogOrLogExt, Idxs, Fun}, EvtType, + State, Actions); false -> {State, Actions} end; -handle_effect(_RaftState, {append, Cmd}, _EvtType, State, Actions) -> +handle_effect(leader, {append, Cmd}, _EvtType, State, Actions) -> Evt = {command, normal, {'$usr', Cmd, noreply}}, {State, [{next_event, cast, Evt} | Actions]}; -handle_effect(_RaftState, {append, Cmd, ReplyMode}, _EvtType, State, Actions) -> +handle_effect(leader, {append, Cmd, ReplyMode}, _EvtType, State, Actions) -> + Evt = {command, normal, {'$usr', Cmd, ReplyMode}}, + {State, [{next_event, cast, Evt} | Actions]}; +handle_effect(_RaftState, {try_append, Cmd, ReplyMode}, _EvtType, State, Actions) -> + %% this is a special mode to retain the backwards compatibility of + %% certain prior uses of {append, when it wasn't (accidentally) + %% limited to the leader Evt = {command, normal, {'$usr', Cmd, ReplyMode}}, {State, [{next_event, cast, Evt} | Actions]}; handle_effect(RaftState, {log, Idxs, Fun}, EvtType, @@ -1399,6 +1407,18 @@ handle_effect(RaftState, {log, Idxs, Fun}, EvtType, handle_effects(RaftState, Effects, EvtType, State#state{server_state = SS}, Actions) end; +handle_effect(RaftState, {log_ext, Idxs, Fun}, EvtType, + State = #state{server_state = SS0}, Actions) + when is_list(Idxs) -> + ReadState = ra_server:log_partial_read(Idxs, SS0), + case Fun(ReadState) of + [] -> + {State, Actions}; + Effects -> + %% recurse with the new effects + handle_effects(RaftState, Effects, EvtType, + State, Actions) + end; handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) -> {_, ServerState, Effects} = ra_server:handle_aux(RaftState, cast, Cmd, State0#state.server_state), @@ -1406,19 +1426,21 @@ handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) -> handle_effects(RaftState, Effects, EventType, State0#state{server_state = ServerState}), {State, Actions0 ++ Actions}; -handle_effect(_, {notify, Nots}, _, #state{} = State0, Actions) -> +handle_effect(leader, {notify, Nots}, _, #state{} = State0, Actions) -> %% should only be done by leader State = send_applied_notifications(State0, Nots), {State, Actions}; -handle_effect(_, {cast, To, Msg}, _, State, Actions) -> +handle_effect(_AnyState, {cast, To, Msg}, _, State, Actions) -> %% TODO: handle send failure _ = gen_cast(To, Msg, State), {State, Actions}; handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _, State, Actions) -> case Replier of - leader -> + leader when RaftState == leader -> ok = gen_statem:reply(From, Reply); + leader -> + ok; local -> case can_execute_locally(RaftState, node(Pid), State) of true -> @@ -1437,15 +1459,17 @@ handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _, ok end, {State, Actions}; -handle_effect(_, {reply, From, Reply}, _, State, Actions) -> - % reply directly +handle_effect(leader, {reply, From, Reply}, _, State, Actions) -> + % reply directly, this is only done from the leader + % this is like reply/4 above with the Replier=leader ok = gen_statem:reply(From, Reply), {State, Actions}; -handle_effect(_, {reply, Reply}, {call, From}, State, Actions) -> - % reply directly +handle_effect(_RaftState, {reply, Reply}, {call, From}, State, Actions) -> + % this is the reply effect for replying to the current call, any state + % can use this ok = gen_statem:reply(From, Reply), {State, Actions}; -handle_effect(_, {reply, _Reply}, _EvtType, State, Actions) -> +handle_effect(_RaftState, {reply, _From, _Reply}, _EvtType, State, Actions) -> {State, Actions}; handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _, #state{server_state = SS0, @@ -1534,10 +1558,21 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) -> true = erlang:garbage_collect(), incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1), {State, Actions}; -handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _, +handle_effect(leader, {monitor, _ProcOrNode, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> + %% this effect type is only emitted by state machines and thus will + %% only be monitored from the leader {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, Actions0}; +handle_effect(leader, {monitor, _ProcOrNode, machine, PidOrNode}, _, + #state{monitors = Monitors} = State, Actions0) -> + {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, + Actions0}; +handle_effect(_RaftState, {monitor, _ProcOrNode, machine, _PidOrNode}, _, + #state{} = State, Actions0) -> + %% AFAIK: there is nothing emitting this effect type but we have to + %% guard against it being actioned on the follower anyway + {State, Actions0}; handle_effect(_, {monitor, _ProcOrNode, Component, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> {State#state{monitors = ra_monitors:add(PidOrNode, Component, Monitors)}, @@ -1550,9 +1585,9 @@ handle_effect(_, {demonitor, _ProcOrNode, Component, PidOrNode}, _, #state{monitors = Monitors0} = State, Actions) -> Monitors = ra_monitors:remove(PidOrNode, Component, Monitors0), {State#state{monitors = Monitors}, Actions}; -handle_effect(_, {timer, Name, T}, _, State, Actions) -> +handle_effect(leader, {timer, Name, T}, _, State, Actions) -> {State, [{{timeout, Name}, T, machine_timeout} | Actions]}; -handle_effect(_, {mod_call, Mod, Fun, Args}, _, +handle_effect(leader, {mod_call, Mod, Fun, Args}, _, State, Actions) -> %% TODO: catch and log failures or rely on calling function never crashing _ = erlang:apply(Mod, Fun, Args), @@ -1569,6 +1604,8 @@ handle_effect(follower, {record_leader_msg, _LeaderId}, _, State0, Actions) -> {State, [{state_timeout, infinity, undefined} | Actions]}; handle_effect(_, {record_leader_msg, _LeaderId}, _, State0, Actions) -> %% non follower states don't need to reset state timeout after an effect + {State0, Actions}; +handle_effect(_, _, _, State0, Actions) -> {State0, Actions}. send_rpcs(State0) -> @@ -1941,7 +1978,8 @@ can_execute_locally(RaftState, TargetNode, #state{server_state = ServerState} = State) -> Membership = ra_server:get_membership(ServerState), case RaftState of - follower when Membership == voter -> + _ when RaftState =/= leader andalso + Membership == voter -> TargetNode == node(); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 98e90994..66ab47c7 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -487,16 +487,16 @@ disconnected_node_catches_up(Config) -> Self = self(), SPid = erlang:spawn(DownNode, - fun () -> - erlang:register(snapshot_installed_proc, self()), - receive - {snapshot_installed, _Meta} = Evt -> - Self ! Evt, - ok - after 10000 -> - ok - end - end), + fun () -> + erlang:register(snapshot_installed_proc, self()), + receive + {snapshot_installed, _Meta} = Evt -> + Self ! Evt, + ok + after 10000 -> + ok + end + end), await_condition( fun () -> ok == ra:restart_server(?SYS, DownServerId) @@ -512,10 +512,11 @@ disconnected_node_catches_up(Config) -> receive {snapshot_installed, Meta} -> - ct:pal("snapshot installed receive ~p", [Meta]), + ct:pal("snapshot installed received ~p", [Meta]), ok after 10000 -> erlang:exit(SPid, kill), + flush(), ct:fail("snapshot_installed not received"), ok end, @@ -1344,7 +1345,7 @@ snapshot_installed(#{machine_version := _, undefined -> []; Pid -> - [{send_msg, Pid, {snapshot_installed, Meta}}] + [{send_msg, Pid, {snapshot_installed, Meta}, local}] end. node_setup(DataDir) -> diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index c3d71e8b..ddbf33a6 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -102,8 +102,18 @@ restart_ra(DataDir) -> init_per_group(_G, Config) -> PrivDir = ?config(priv_dir, Config), DataDir = filename:join([PrivDir, "data"]), + ra_env:configure_logger(logger), + ok = logger:set_primary_config(level, debug), + LogFile = filename:join(?config(priv_dir, Config), "ra.log"), + SaslFile = filename:join(?config(priv_dir, Config), "ra_sasl.log"), + logger:add_handler(ra_handler, logger_std_h, #{config => #{file => LogFile}}), + application:load(sasl), + application:set_env(sasl, sasl_error_logger, {file, SaslFile}), + application:stop(sasl), + application:start(sasl), + _ = error_logger:tty(false), ok = restart_ra(DataDir), - % ok = logger:set_application_level(ra, all), + ok = logger:set_application_level(ra, all), Config. end_per_group(_, Config) -> @@ -297,6 +307,7 @@ start_servers(Config) -> ra:overview(?SYS), % issue command to confirm n3 joined the cluster successfully {ok, _, _} = ra:process_command(N3, 5, ?PROCESS_COMMAND_TIMEOUT), + flush(), terminate_cluster([N1, N2, N3] -- [element(1, Target)]). @@ -894,16 +905,6 @@ server_catches_up(Config) -> terminate_cluster([N3 | InitialNodes]). snapshot_installation(Config) -> - ra_env:configure_logger(logger), - ok = logger:set_primary_config(level, debug), - LogFile = filename:join(?config(priv_dir, Config), "ra.log"), - SaslFile = filename:join(?config(priv_dir, Config), "ra_sasl.log"), - logger:add_handler(ra_handler, logger_std_h, #{config => #{file => LogFile}}), - application:load(sasl), - application:set_env(sasl, sasl_error_logger, {file, SaslFile}), - application:stop(sasl), - application:start(sasl), - _ = error_logger:tty(false), N1 = nth_server_name(Config, 1), N2 = nth_server_name(Config, 2), N3 = nth_server_name(Config, 3), diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 007cca2f..1a4a8784 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -55,6 +55,7 @@ all_tests() -> transient_writer_is_handled, read_opt, sparse_read, + read_plan, sparse_read_out_of_range, sparse_read_out_of_range_2, written_event_after_snapshot, @@ -443,6 +444,33 @@ sparse_read(Config) -> {99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2), ok. +read_plan(Config) -> + Num = 256 * 2, + Div = 2, + Log0 = write_and_roll(1, Num div Div, 1, ra_log_init(Config), 50), + Log1 = wait_for_segments(Log0, 5000), + Log2 = write_no_roll(Num div Div, Num, 1, Log1, 50), + %% read small batch of the latest entries + {_, Log3} = ra_log_take(Num - 5, Num, Log2), + %% ensure cache is empty as this indicates all enties have at least + %% been written to the WAL and thus will be available in mem tables. + Log4 = deliver_log_events_cond(Log3, + fun (L) -> + ra_log:last_written(L) == + ra_log:last_index_term(L) + end, 100), + %% create a list of indexes with some consecutive and some gaps + Indexes = lists:usort(lists:seq(1, Num, 2) ++ lists:seq(1, Num, 5)), + %% make sure that the ETS deletes have been finished before we re-init + gen_server:call(ra_log_ets, ok), + ReadPlan = ra_log:partial_read(Indexes, Log4, fun (_, _, Cmd) -> Cmd end), + ?assert(is_map(ra_log_read_plan:info(ReadPlan))), + {EntriesOut, _} = ra_log_read_plan:execute(ReadPlan, undefined), + ?assertEqual(length(Indexes), maps:size(EntriesOut)), + %% assert the indexes requestd were all returned in order + [] = Indexes -- [I || I <- maps:keys(EntriesOut)], + ok. + written_event_after_snapshot(Config) -> Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}), Log1 = ra_log:append({1, 1, <<"one">>}, Log0), @@ -1384,7 +1412,6 @@ external_reader(Config) -> {Es, R2} = ra_log_reader:sparse_read( R1, lists:seq(0, 220), []), Len1 = length(Es), - ct:pal("Es ~w", [Len1]), Self ! {got, Evt, Es}, receive {ra_log_update, _, F, _} = Evt2 -> diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl index f8d0c300..63079fe3 100644 --- a/test/ra_log_segment_SUITE.erl +++ b/test/ra_log_segment_SUITE.erl @@ -412,5 +412,8 @@ make_data(Size) -> term_to_binary(crypto:strong_rand_bytes(Size)). read_sparse(R, Idxs) -> - {_, Entries} = ra_log_segment:read_sparse(R, Idxs, fun ra_lib:id/1, []), + {_, Entries} = ra_log_segment:read_sparse(R, Idxs, + fun (I, T, B, Acc) -> + [{I, T, B} | Acc] + end, []), lists:reverse(Entries). diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 3acb77a9..25d4e1c6 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -732,7 +732,10 @@ segments_for(UId, DataDir) -> SegFiles. read_sparse(R, Idxs) -> - {_, Entries} = ra_log_segment:read_sparse(R, Idxs, fun ra_lib:id/1, []), + {_, Entries} = ra_log_segment:read_sparse(R, Idxs, + fun(I, T, B, Acc) -> + [{I, T, B} | Acc] + end, []), lists:reverse(Entries). get_names(System) when is_atom(System) -> diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl index 3158742f..518bfca9 100644 --- a/test/ra_machine_int_SUITE.erl +++ b/test/ra_machine_int_SUITE.erl @@ -374,8 +374,8 @@ machine_state_enter_effects(Config) -> end), meck:expect(Mod, state_enter, fun (RaftState, _State) -> - [{mod_call, erlang, send, - [Self, {state_enter, RaftState}]}] + Self ! {state_enter, RaftState}, + [] end), ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]), ra:delete_cluster([ServerId]), @@ -471,8 +471,12 @@ append_effect_with_notify(Config) -> end), ClusterName = ?config(cluster_name, Config), ServerId = ?config(server_id, Config), - ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]), - {ok, _, ServerId} = ra:process_command(ServerId, cmd), + ServerId2 = ?config(server_id2, Config), + ServerId3 = ?config(server_id3, Config), + + ok = start_cluster(ClusterName, {module, Mod, #{}}, + [ServerId, ServerId2, ServerId3]), + {ok, _, _Leader} = ra:process_command(ServerId, cmd), receive {ra_event, _, {applied, [{42, ok}]}} = Evt -> ct:pal("Got ~p", [Evt]) @@ -487,6 +491,7 @@ append_effect_with_notify(Config) -> flush(), exit(cmd2_timeout) end, + flush(), ok. append_effect_follower(Config) -> @@ -499,10 +504,12 @@ append_effect_follower(Config) -> (_, {cmd2, "yo"}, State) -> {State, ok, [{send_msg, Self, got_cmd2}]} end), + %% have to use the special try_append here as {append, should only be + %% applied to the leader meck:expect(Mod, handle_aux, fun (_, _, {cmd, ReplyMode}, Aux, Log, _MacState) -> {no_reply, Aux, Log, - [{append, {cmd2, "yo"}, ReplyMode}]}; + [{try_append, {cmd2, "yo"}, ReplyMode}]}; (_, _, _Evt, Aux, Log, _MacState) -> {no_reply, Aux, Log} end),