From 15c2ced9413869bdde345fb409c9e488e6b56596 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Wed, 13 Nov 2024 14:00:25 +0000 Subject: [PATCH 1/2] Implement new log effects type: `log_ext` that only reads any entries that exists in the mem table and returns an opaque "read plan" that can be executed by any process on the same node by using the `ra_log_read_plan:execute/2` function. This PR also refactors follower effect filtering to be done in the ra_server_proc only in order to keep this logic in one place. Other minor refactorings and fixes and some improvements to effect documentation. --- README.md | 5 +- docs/internals/STATE_MACHINE_TUTORIAL.md | 17 +++++ src/ra.erl | 1 - src/ra_lib.erl | 7 +- src/ra_log.erl | 83 ++++++++++++++++++--- src/ra_log_read_plan.erl | 20 ++++++ src/ra_log_reader.erl | 71 +++++++++++++++++- src/ra_log_segment.erl | 28 ++++---- src/ra_machine.erl | 17 +++-- src/ra_server.erl | 91 +++++++++--------------- src/ra_server_proc.erl | 76 +++++++++++++++----- test/coordination_SUITE.erl | 25 +++---- test/ra_SUITE.erl | 23 +++--- test/ra_log_2_SUITE.erl | 29 +++++++- test/ra_log_segment_SUITE.erl | 5 +- test/ra_log_segment_writer_SUITE.erl | 5 +- test/ra_machine_int_SUITE.erl | 17 +++-- 17 files changed, 383 insertions(+), 137 deletions(-) create mode 100644 src/ra_log_read_plan.erl diff --git a/README.md b/README.md index 05ba25eb..8b45fa7b 100644 --- a/README.md +++ b/README.md @@ -440,14 +440,17 @@ logger:set_primary_config(level, debug). Ra attempts to follow [Semantic Versioning](https://semver.org/). The modules that form part of the public API are: + * `ra` * `ra_machine` (behaviour callbacks only) * `ra_aux` * `ra_system` -* `ra_counters` +* `ra_counters` (counter keys may vary between minors) * `ra_leaderboard` * `ra_env` * `ra_directory` +* `ra_flru` +* `ra_log_read_plan` ## Copyright and License diff --git a/docs/internals/STATE_MACHINE_TUTORIAL.md b/docs/internals/STATE_MACHINE_TUTORIAL.md index ea6ec1b3..7ddaaf7f 100644 --- a/docs/internals/STATE_MACHINE_TUTORIAL.md +++ b/docs/internals/STATE_MACHINE_TUTORIAL.md @@ -181,6 +181,23 @@ Only the leader that first applies an entry will attempt the effect. Followers process the same set of commands but simply throw away any effects returned by the state machine unless specific effect provide the `local` option. +### Machine Effects table + +| Spec | Executed on | +| -----| ----------- | +| `{send_msg, pid(), Msg :: term()}` | leader | +| `{send_msg, pid(), Msg :: term(), [local]}` | on member local to `pid()` else leader | +| `{monitor \| demonitor, process \| node, pid() \| node()}` | leader | +| `{mod_call, mfa()}` | leader | +| `{timer, Name :: term(), Time :: non_neg_integer() \| infinity}` | leader | +| `{append, term()}` | leader | +| `{append, term(), ra_server:command_reply_mode()}` | leader | +| `{log, [ra_index()], fun(([user_command()]) -> effects())}` | leader | +| `{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}}` | on member local to `node()` else leader | +| `{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}}` | on member local to `node()` else leader | +| `{release_cursor \| checkpoint, ra_index(), term()}` | all members | +| `{aux, term()}` | every member | + ### Send a message diff --git a/src/ra.erl b/src/ra.erl index 2fe35e5a..91e7124e 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -1262,7 +1262,6 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() -> key_metrics({_, N} = ServerId, Timeout) -> erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout). - %% internal -spec usr(UserCommand, ReplyMode) -> Command when diff --git a/src/ra_lib.erl b/src/ra_lib.erl index a2cfe514..f24d0248 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -49,7 +49,8 @@ is_dir/1, is_file/1, ensure_dir/1, - consult/1 + consult/1, + cons/2 ]). -type file_err() :: file:posix() | badarg | terminated | system_limit. @@ -454,6 +455,10 @@ consult(Path) -> Err end. +cons(Item, List) + when is_list(List) -> + [Item | List]. + tokens(Str) -> case erl_scan:string(Str) of {ok, Tokens, _EndLoc} -> diff --git a/src/ra_log.erl b/src/ra_log.erl index 97f4e3ce..57e995c8 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -20,6 +20,9 @@ write_sync/2, fold/5, sparse_read/2, + partial_read/3, + execute_read_plan/3, + read_plan_info/1, last_index_term/1, set_last_index/2, handle_event/2, @@ -70,6 +73,7 @@ {down, pid(), term()}. -type event() :: {ra_log_event, event_body()}. +-type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()). -type effect() :: {delete_snapshot, Dir :: file:filename(), ra_idxterm()} | @@ -84,7 +88,7 @@ -record(cfg, {uid :: ra_uid(), log_id :: unicode:chardata(), - directory :: file:filename(), + directory :: file:filename_all(), min_snapshot_interval = ?MIN_SNAPSHOT_INTERVAL :: non_neg_integer(), min_checkpoint_interval = ?MIN_CHECKPOINT_INTERVAL :: non_neg_integer(), snapshot_module :: module(), @@ -110,7 +114,11 @@ tx = false :: boolean() }). +-record(read_plan, {dir :: file:filename_all(), + read :: #{ra_index() := log_entry()}, + plan :: ra_log_reader:read_plan()}). +-opaque read_plan() :: #read_plan{}. -opaque state() :: #?MODULE{}. -type ra_log_init_args() :: #{uid := ra_uid(), @@ -145,6 +153,7 @@ atom() => term()}. -export_type([state/0, + read_plan/0, ra_log_init_args/0, ra_meta_key/0, segment_ref/0, @@ -303,7 +312,6 @@ init(#{uid := UId, {SnapIdx, SnapTerm}, State#?MODULE.last_written_index_term ]), - ?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]), element(1, delete_segments(SnapIdx, State)). -spec close(state()) -> ok. @@ -465,8 +473,9 @@ fold(From0, To0, Fun, Acc0, fold(_From, _To, _Fun, Acc, State) -> {Acc, State}. -%% read a list of indexes, -%% found indexes be returned in the same order as the input list of indexes +%% @doc Reads a list of indexes. +%% Found indexes are returned in the same order as the input list of indexes +%% @end -spec sparse_read([ra_index()], state()) -> {[log_entry()], state()}. sparse_read(Indexes0, #?MODULE{cfg = Cfg, @@ -488,8 +497,8 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg, %% drop any indexes that are larger than the last index available Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1), - {Entries0, CacheNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt), - ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, CacheNumRead), + {Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt), + ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead), {Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0), %% here we recover the original order of indexes Entries = case Sort of @@ -507,6 +516,65 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg, end, {Entries, State#?MODULE{reader = Reader}}. + +%% read a list of indexes, +%% found indexes be returned in the same order as the input list of indexes +-spec partial_read([ra_index()], state(), + fun ((ra_index(), ra_term(), ra_server:command()) -> term()) + ) -> + read_plan(). +partial_read(Indexes0, #?MODULE{cfg = Cfg, + reader = Reader0, + last_index = LastIdx, + mem_table = Mt}, + TransformFun) -> + ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1), + %% indexes need to be sorted high -> low for correct and efficient reading + Sort = ra_lib:lists_detect_sort(Indexes0), + Indexes1 = case Sort of + unsorted -> + lists:sort(fun erlang:'>'/2, Indexes0); + ascending -> + lists:reverse(Indexes0); + _ -> + % descending or undefined + Indexes0 + end, + + %% drop any indexes that are larger than the last index available + Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1), + {Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt), + ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead), + Read = lists:foldl(fun ({I, T, Cmd}, Acc) -> + maps:put(I, TransformFun(I, T, Cmd), Acc) + end, #{}, Entries0), + + Plan = ra_log_reader:read_plan(Reader0, Indexes), + #read_plan{dir = Cfg#cfg.directory, + read = Read, + plan = Plan}. + + +-spec execute_read_plan(read_plan(), undefined | ra_flru:state(), + TransformFun :: transform_fun()) -> + {#{ra_index() => Command :: term()}, ra_flru:state()}. +execute_read_plan(#read_plan{dir = Dir, + read = Read, + plan = Plan}, Flru0, TransformFun) -> + ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read). + +-spec read_plan_info(read_plan()) -> map(). +read_plan_info(#read_plan{read = Read, + plan = Plan}) -> + NumSegments = length(Plan), + NumInSegments = lists:foldl(fun ({_, Idxs}, Acc) -> + Acc + length(Idxs) + end, 0, Plan), + #{num_read => map_size(Read), + num_in_segments => NumInSegments, + num_segments => NumSegments}. + + -spec last_index_term(state()) -> ra_idxterm(). last_index_term(#?MODULE{last_index = LastIdx, last_term = LastTerm}) -> {LastIdx, LastTerm}. @@ -1309,8 +1377,7 @@ put_counter(#cfg{counter = undefined}, _Ix, _N) -> ok. server_data_dir(Dir, UId) -> - Me = ra_lib:to_list(UId), - filename:join(Dir, Me). + filename:join(Dir, UId). maps_with_values(Keys, Map) -> lists:foldr( diff --git a/src/ra_log_read_plan.erl b/src/ra_log_read_plan.erl new file mode 100644 index 00000000..c99d2801 --- /dev/null +++ b/src/ra_log_read_plan.erl @@ -0,0 +1,20 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. +%% +-module(ra_log_read_plan). + + +-export([execute/2, + info/1]). + +-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) -> + {#{ra:index() => Command :: ra_server:command()}, ra_flru:state()}. +execute(Plan, Flru) -> + ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3). + +-spec info(ra_log:read_plan()) -> map(). +info(Plan) -> + ra_log:read_plan_info(Plan). diff --git a/src/ra_log_reader.erl b/src/ra_log_reader.erl index 902c51bf..ddc249c5 100644 --- a/src/ra_log_reader.erl +++ b/src/ra_log_reader.erl @@ -19,6 +19,8 @@ update_first_index/2, fold/5, sparse_read/3, + read_plan/2, + exec_read_plan/5, fetch_term/2 ]). @@ -44,10 +46,12 @@ }). -opaque state() :: #?STATE{}. +-type read_plan() :: [{BaseName :: file:filename_all(), [ra:index()]}]. -export_type([ - state/0 + state/0, + read_plan/0 ]). %% PUBLIC @@ -170,6 +174,31 @@ sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes, Entries0) -> ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC), {Entries, State#?MODULE{open_segments = Open}}. +-spec read_plan(state(), [ra_index()]) -> read_plan(). +read_plan(#?STATE{segment_refs = SegRefs}, Indexes) -> + %% TODO: add counter for number of read plans requested + segment_read_plan(SegRefs, Indexes, []). + +-spec exec_read_plan(file:filename_all(), read_plan(), undefined | ra_flru:state(), + TransformFun :: fun(), + #{ra_index() => Command :: term()}) -> + {#{ra_index() => Command :: term()}, ra_flru:state()}. +exec_read_plan(Dir, Plan, undefined, TransformFun, Acc0) -> + Open = ra_flru:new(1, fun({_, Seg}) -> ra_log_segment:close(Seg) end), + exec_read_plan(Dir, Plan, Open, TransformFun, Acc0); +exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0) + when is_list(Plan) -> + Fun = fun (I, T, B, Acc) -> + E = TransformFun(I, T, binary_to_term(B)), + Acc#{I => E} + end, + lists:foldl( + fun ({Idxs, BaseName}, {Acc1, Open1}) -> + {Seg, Open} = get_segment_ext(Dir, Open1, BaseName), + {_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1), + {Acc, Open} + end, {Acc0, Open0}, Plan). + -spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}. fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) -> incr_counter(Cfg, ?C_RA_LOG_FETCH_TERM, 1), @@ -177,6 +206,23 @@ fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) -> %% LOCAL +segment_read_plan(_RegRefs, [], Acc) -> + lists:reverse(Acc); +segment_read_plan([], _Indexes, Acc) -> + %% not all indexes were found + lists:reverse(Acc); +segment_read_plan([{To, From, Fn} | SegRefs], Indexes, Acc) -> + %% TODO: address unnecessary allocation here + Range = {To, From}, + case sparse_read_split(fun (I) -> + ra_range:in(I, Range) + end, Indexes, []) of + {[], _} -> + segment_read_plan(SegRefs, Indexes, Acc); + {Idxs, Rem} -> + segment_read_plan(SegRefs, Rem, [{Idxs, Fn} | Acc]) + end. + segment_term_query(Idx, #?MODULE{segment_refs = SegRefs, cfg = Cfg, open_segments = OpenSegs} = State) -> @@ -255,7 +301,10 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs, end, Idxs, []), {ReadSparseCount, Entries} = ra_log_segment:read_sparse(Seg, ReadIdxs, - fun binary_to_term/1, []), + fun (I, T, B, Acc) -> + [{I, T, binary_to_term(B)} | Acc] + end, + []), {Open, RemIdxs, C + ReadSparseCount, lists:reverse(Entries, En0)}; (_Segref, Acc) -> @@ -294,6 +343,24 @@ get_segment(#cfg{directory = Dir, end end. +get_segment_ext(Dir, Open0, Fn) -> + case ra_flru:fetch(Fn, Open0) of + {ok, S, Open1} -> + {S, Open1}; + error -> + AbsFn = filename:join(Dir, Fn), + case ra_log_segment:open(AbsFn, + #{mode => read, + access_pattern => random}) + of + {ok, S} -> + {S, ra_flru:insert(Fn, S, Open0)}; + {error, Err} -> + exit({ra_log_failed_to_open_segment, Err, + AbsFn}) + end + end. + compact_seg_refs([], PreviousSegRefs) -> PreviousSegRefs; compact_seg_refs(NewSegRefs, []) -> diff --git a/src/ra_log_segment.erl b/src/ra_log_segment.erl index 10e61017..84b436e3 100644 --- a/src/ra_log_segment.erl +++ b/src/ra_log_segment.erl @@ -272,16 +272,18 @@ fold(#state{cfg = #cfg{mode = read} = Cfg, fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc). -spec read_sparse(state(), [ra_index()], - fun((binary()) -> term()), term()) -> - {non_neg_integer(), term()}. + fun((ra:index(), ra_term(), binary(), Acc) -> Acc), + Acc) -> + {NumRead :: non_neg_integer(), Acc} + when Acc :: term(). read_sparse(#state{index = Index, - cfg = Cfg}, Indexes, Fun, Acc) -> + cfg = Cfg}, Indexes, AccFun, Acc) -> Cache0 = prepare_cache(Cfg, Indexes, Index), - read_sparse0(Cfg, Indexes, Index, Cache0, Fun, Acc, 0). + read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0). -read_sparse0(_Cfg, [], _Index, _Cache, _Fun, Acc, Num) -> +read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) -> {Num, Acc}; -read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc, Num) +read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num) when is_map_key(NextIdx, Index) -> {Term, Offset, Length, _} = map_get(NextIdx, Index), case cache_read(Cache0, Offset, Length) of @@ -289,16 +291,18 @@ read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc, Num) case prepare_cache(Cfg, Indexes, Index) of undefined -> {ok, Data, _} = pread(Cfg, undefined, Offset, Length), - read_sparse0(Cfg, Rem, Index, undefined, Fun, - [{NextIdx, Term, Fun(Data)} | Acc], Num+1); + read_sparse0(Cfg, Rem, Index, undefined, + AccFun(NextIdx, Term, Data, Acc), + AccFun, Num+1); Cache -> - read_sparse0(Cfg, Indexes, Index, Cache, Fun, Acc, Num+1) + read_sparse0(Cfg, Indexes, Index, Cache, + Acc, AccFun, Num+1) end; Data -> - read_sparse0(Cfg, Rem, Index, Cache0, Fun, - [{NextIdx, Term, Fun(Data)} | Acc], Num+1) + read_sparse0(Cfg, Rem, Index, Cache0, + AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1) end; -read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Fun, _Acc, _Num) -> +read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) -> exit({missing_key, NextIdx}). cache_read({CPos, CLen, Bin}, Pos, Length) diff --git a/src/ra_machine.erl b/src/ra_machine.erl index e7106ba4..63e97fcc 100644 --- a/src/ra_machine.erl +++ b/src/ra_machine.erl @@ -119,26 +119,31 @@ -type version() :: non_neg_integer(). -type effect() :: + %% These effects are only executed on the leader {send_msg, To :: locator(), Msg :: term()} | - %% @TODO: with local deliveries is it theoretically possible for a follower - %% to apply entries but not know who the current leader is? - %% If so, `To' must also include undefined - {send_msg, To :: locator(), Msg :: term(), Options :: send_msg_opts()} | {mod_call, module(), Function :: atom(), [term()]} | %% appends a user command to the raft log - {append, term()} | - {append, term(), ra_server:command_reply_mode()} | + {append, Cmd :: term()} | + {append, Cmd :: term(), ra_server:command_reply_mode()} | {monitor, process, pid()} | {monitor, node, node()} | {demonitor, process, pid()} | {demonitor, node, node()} | {timer, term(), non_neg_integer() | infinity} | {log, [ra_index()], fun(([user_command()]) -> effects())} | + + %% these are either conditional on the local configuration or + %% will always be evaluated when seen by members in any raft state + {send_msg, To :: locator(), Msg :: term(), Options :: send_msg_opts()} | {log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} | + {log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}} | {release_cursor, ra_index(), state()} | {release_cursor, ra_index()} | {checkpoint, ra_index(), state()} | {aux, term()} | + %% like append/3 but a special backwards compatible function + %% that tries to execute in any raft state + {try_append, term(), ra_server:command_reply_mode()} | garbage_collection. %% Effects are data structures that can be returned by {@link apply/3} to ask diff --git a/src/ra_server.erl b/src/ra_server.erl index 4d3a2461..3b06af9f 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -58,11 +58,13 @@ terminate/2, log_fold/3, log_read/2, + log_partial_read/2, get_membership/1, get_condition_timeout/2, recover/1, state_query/2, - fetch_term/2 + fetch_term/2, + transform_for_partial_read/3 ]). -type ra_await_condition_fun() :: @@ -155,8 +157,12 @@ -type effect() :: ra_machine:effect() | ra_log:effect() | + %% this is used for replies for immedate requests {reply, ra_reply_body()} | - {reply, term(), ra_reply_body()} | + %% this is used by the leader only + {reply, from(), ra_reply_body()} | + {reply, from(), ra_reply_body(), + Replier :: leader | local | {member, ra_server_id()}} | {cast, ra_server_id(), term()} | {send_vote_requests, [{ra_server_id(), #request_vote_rpc{} | #pre_vote_rpc{}}]} | @@ -1801,10 +1807,8 @@ evaluate_commit_index_follower(#{commit_index := CommitIndex, {delete_and_terminate, State1, Effects} -> Reply = append_entries_reply(Term, true, State1), {delete_and_terminate, State1, - [cast_reply(Id, LeaderId, Reply) | - filter_follower_effects(Effects)]}; - {#{last_applied := LastApplied} = State, Effects1} -> - Effects = filter_follower_effects(Effects1), + [cast_reply(Id, LeaderId, Reply) | Effects]}; + {#{last_applied := LastApplied} = State, Effects} -> case LastApplied > LastApplied0 of true -> %% entries were applied, append eval_aux effect @@ -1818,51 +1822,6 @@ evaluate_commit_index_follower(State, Effects) -> %% when no leader is known {follower, State, Effects}. -filter_follower_effects(Effects) -> - lists:foldr(fun ({release_cursor, _, _} = C, Acc) -> - [C | Acc]; - ({release_cursor, _} = C, Acc) -> - [C | Acc]; - ({checkpoint, _, _} = C, Acc) -> - [C | Acc]; - ({record_leader_msg, _} = C, Acc) -> - [C | Acc]; - ({aux, _} = C, Acc) -> - [C | Acc]; - (garbage_collection = C, Acc) -> - [C | Acc]; - ({delete_snapshot, _} = C, Acc) -> - [C | Acc]; - ({send_msg, _, _, _Opts} = C, Acc) -> - %% send_msg effects _may_ have the local option - %% and will be evaluated properly during - %% effect processing - [C | Acc]; - ({log, _, _, _Opts} = C, Acc) -> - [C | Acc]; - ({reply, _, _, leader}, Acc) -> - Acc; - ({reply, _, _, _} = C, Acc) -> - %% If the reply-from is not `leader', the follower - %% might be the replier. - [C | Acc]; - ({monitor, _ProcOrNode, Comp, _} = C, Acc) - when Comp =/= machine -> - %% only machine monitors should not be emitted - %% by followers - [C | Acc]; - (L, Acc) when is_list(L) -> - %% nested case - recurse - case filter_follower_effects(L) of - [] -> Acc; - Filtered -> - [Filtered | Acc] - end; - (_, Acc) -> - Acc - end, [], Effects). - - make_pipelined_rpc_effects(#{cfg := #cfg{id = Id, max_append_entries_rpc_batch_size = MaxBatchSize, @@ -2200,6 +2159,13 @@ log_read(Indexes, #{log := Log0} = State) -> || {_Idx, _Term, {'$usr', _, Data, _}} <- Entries], State#{log => Log}}. +%% reads user commands at the specified index +-spec log_partial_read([ra_index()], ra_server_state()) -> + ra_log:read_plan(). +log_partial_read(Indexes, #{log := Log0}) -> + ra_log:partial_read(Indexes, Log0, + fun transform_for_partial_read/3). + %%%=================================================================== %%% Internal functions %%%=================================================================== @@ -2518,6 +2484,19 @@ fetch_term(Idx, #{log := Log0} = State) -> {Term, State#{log => Log}} end. + +%% @doc Strips the Ra internal command wrapper away for user commands +%% to return the original machine command written +%% @end +transform_for_partial_read(_Idx, _Term, {'$usr', _, Cmd, _}) -> + Cmd; +transform_for_partial_read(_Idx, _Term, Cmd) -> + %% Other commands leave as is. + %% It would be quite unusual for these to be read externally + %% but you never know. + Cmd. + + -spec make_cluster(ra_server_id(), ra_cluster_snapshot() | [ra_server_id()]) -> ra_cluster(). make_cluster(Self, Nodes0) when is_list(Nodes0) -> @@ -2596,11 +2575,11 @@ make_notify_effects(Nots, Prior) when map_size(Nots) > 0 -> make_notify_effects(_Nots, Prior) -> Prior. -append_app_effects([], Effs) -> +append_machine_effects([], Effs) -> Effs; -append_app_effects([AppEff], Effs) -> +append_machine_effects([AppEff], Effs) -> [AppEff | Effs]; -append_app_effects(AppEffs, Effs) -> +append_machine_effects(AppEffs, Effs) -> [AppEffs | Effs]. cluster_scan_fun({Idx, Term, {'$ra_cluster_change', _Meta, NewCluster, _}}, @@ -2631,9 +2610,9 @@ apply_with({Idx, Term, {'$usr', CmdMeta, Cmd, ReplyMode}}, Meta = augment_command_meta(Idx, Term, MacVer, ReplyMode, CmdMeta), Ts = maps:get(ts, CmdMeta, LastTs), case ra_machine:apply(Module, Meta, Cmd, MacSt) of - {NextMacSt, Reply, AppEffs} -> + {NextMacSt, Reply, MacEffs} -> {Effects, Notifys} = add_reply(CmdMeta, Reply, ReplyMode, - append_app_effects(AppEffs, Effects0), + append_machine_effects(MacEffs, Effects0), Notifys0), {Module, Idx, State, NextMacSt, Effects, Notifys, Ts}; diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 1de8a1a6..9f3113f6 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1318,7 +1318,7 @@ handle_effects(RaftState, Effects0, EvtType, State0, Actions0) -> end, {State0, Actions0}, Effects0), {State, lists:reverse(Actions)}. -handle_effect(_, {send_rpc, To, Rpc}, _, +handle_effect(leader, {send_rpc, To, Rpc}, _, #state{conf = Conf} = State0, Actions) -> % fully qualified use only so that we can mock it for testing % TODO: review / refactor to remove the mod call here @@ -1347,7 +1347,7 @@ handle_effect(_, {next_event, Evt}, EvtType, State, Actions) -> {State, [{next_event, EvtType, Evt} | Actions]}; handle_effect(_, {next_event, _, _} = Next, _, State, Actions) -> {State, [Next | Actions]}; -handle_effect(_, {send_msg, To, Msg}, _, State, Actions) -> +handle_effect(leader, {send_msg, To, Msg}, _, State, Actions) -> %% default is to send without any wrapping %% TODO: handle send failure? how? _ = send(To, Msg, State#state.conf), @@ -1370,18 +1370,26 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff, ok end, {State, Actions}; -handle_effect(RaftState, {log, Idxs, Fun, {local, Node}}, EvtType, - State, Actions) -> +handle_effect(RaftState, {LogOrLogExt, Idxs, Fun, {local, Node}}, EvtType, + State, Actions) + when LogOrLogExt == log orelse LogOrLogExt == log_ext -> case can_execute_locally(RaftState, Node, State) of true -> - handle_effect(RaftState, {log, Idxs, Fun}, EvtType, State, Actions); + handle_effect(RaftState, {LogOrLogExt, Idxs, Fun}, EvtType, + State, Actions); false -> {State, Actions} end; -handle_effect(_RaftState, {append, Cmd}, _EvtType, State, Actions) -> +handle_effect(leader, {append, Cmd}, _EvtType, State, Actions) -> Evt = {command, normal, {'$usr', Cmd, noreply}}, {State, [{next_event, cast, Evt} | Actions]}; -handle_effect(_RaftState, {append, Cmd, ReplyMode}, _EvtType, State, Actions) -> +handle_effect(leader, {append, Cmd, ReplyMode}, _EvtType, State, Actions) -> + Evt = {command, normal, {'$usr', Cmd, ReplyMode}}, + {State, [{next_event, cast, Evt} | Actions]}; +handle_effect(_RaftState, {try_append, Cmd, ReplyMode}, _EvtType, State, Actions) -> + %% this is a special mode to retain the backwards compatibility of + %% certain prior uses of {append, when it wasn't (accidentally) + %% limited to the leader Evt = {command, normal, {'$usr', Cmd, ReplyMode}}, {State, [{next_event, cast, Evt} | Actions]}; handle_effect(RaftState, {log, Idxs, Fun}, EvtType, @@ -1399,6 +1407,18 @@ handle_effect(RaftState, {log, Idxs, Fun}, EvtType, handle_effects(RaftState, Effects, EvtType, State#state{server_state = SS}, Actions) end; +handle_effect(RaftState, {log_ext, Idxs, Fun}, EvtType, + State = #state{server_state = SS0}, Actions) + when is_list(Idxs) -> + ReadState = ra_server:log_partial_read(Idxs, SS0), + case Fun(ReadState) of + [] -> + {State, Actions}; + Effects -> + %% recurse with the new effects + handle_effects(RaftState, Effects, EvtType, + State, Actions) + end; handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) -> {_, ServerState, Effects} = ra_server:handle_aux(RaftState, cast, Cmd, State0#state.server_state), @@ -1406,19 +1426,21 @@ handle_effect(RaftState, {aux, Cmd}, EventType, State0, Actions0) -> handle_effects(RaftState, Effects, EventType, State0#state{server_state = ServerState}), {State, Actions0 ++ Actions}; -handle_effect(_, {notify, Nots}, _, #state{} = State0, Actions) -> +handle_effect(leader, {notify, Nots}, _, #state{} = State0, Actions) -> %% should only be done by leader State = send_applied_notifications(State0, Nots), {State, Actions}; -handle_effect(_, {cast, To, Msg}, _, State, Actions) -> +handle_effect(_AnyState, {cast, To, Msg}, _, State, Actions) -> %% TODO: handle send failure _ = gen_cast(To, Msg, State), {State, Actions}; handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _, State, Actions) -> case Replier of - leader -> + leader when RaftState == leader -> ok = gen_statem:reply(From, Reply); + leader -> + ok; local -> case can_execute_locally(RaftState, node(Pid), State) of true -> @@ -1437,15 +1459,17 @@ handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _, ok end, {State, Actions}; -handle_effect(_, {reply, From, Reply}, _, State, Actions) -> - % reply directly +handle_effect(leader, {reply, From, Reply}, _, State, Actions) -> + % reply directly, this is only done from the leader + % this is like reply/4 above with the Replier=leader ok = gen_statem:reply(From, Reply), {State, Actions}; -handle_effect(_, {reply, Reply}, {call, From}, State, Actions) -> - % reply directly +handle_effect(_RaftState, {reply, Reply}, {call, From}, State, Actions) -> + % this is the reply effect for replying to the current call, any state + % can use this ok = gen_statem:reply(From, Reply), {State, Actions}; -handle_effect(_, {reply, _Reply}, _EvtType, State, Actions) -> +handle_effect(_RaftState, {reply, _From, _Reply}, _EvtType, State, Actions) -> {State, Actions}; handle_effect(leader, {send_snapshot, {_, ToNode} = To, {SnapState, Id, Term}}, _, #state{server_state = SS0, @@ -1534,10 +1558,21 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) -> true = erlang:garbage_collect(), incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1), {State, Actions}; -handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _, +handle_effect(leader, {monitor, _ProcOrNode, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> + %% this effect type is only emitted by state machines and thus will + %% only be monitored from the leader {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, Actions0}; +handle_effect(leader, {monitor, _ProcOrNode, machine, PidOrNode}, _, + #state{monitors = Monitors} = State, Actions0) -> + {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, + Actions0}; +handle_effect(_RaftState, {monitor, _ProcOrNode, machine, _PidOrNode}, _, + #state{} = State, Actions0) -> + %% AFAIK: there is nothing emitting this effect type but we have to + %% guard against it being actioned on the follower anyway + {State, Actions0}; handle_effect(_, {monitor, _ProcOrNode, Component, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> {State#state{monitors = ra_monitors:add(PidOrNode, Component, Monitors)}, @@ -1550,9 +1585,9 @@ handle_effect(_, {demonitor, _ProcOrNode, Component, PidOrNode}, _, #state{monitors = Monitors0} = State, Actions) -> Monitors = ra_monitors:remove(PidOrNode, Component, Monitors0), {State#state{monitors = Monitors}, Actions}; -handle_effect(_, {timer, Name, T}, _, State, Actions) -> +handle_effect(leader, {timer, Name, T}, _, State, Actions) -> {State, [{{timeout, Name}, T, machine_timeout} | Actions]}; -handle_effect(_, {mod_call, Mod, Fun, Args}, _, +handle_effect(leader, {mod_call, Mod, Fun, Args}, _, State, Actions) -> %% TODO: catch and log failures or rely on calling function never crashing _ = erlang:apply(Mod, Fun, Args), @@ -1569,6 +1604,8 @@ handle_effect(follower, {record_leader_msg, _LeaderId}, _, State0, Actions) -> {State, [{state_timeout, infinity, undefined} | Actions]}; handle_effect(_, {record_leader_msg, _LeaderId}, _, State0, Actions) -> %% non follower states don't need to reset state timeout after an effect + {State0, Actions}; +handle_effect(_, _, _, State0, Actions) -> {State0, Actions}. send_rpcs(State0) -> @@ -1941,7 +1978,8 @@ can_execute_locally(RaftState, TargetNode, #state{server_state = ServerState} = State) -> Membership = ra_server:get_membership(ServerState), case RaftState of - follower when Membership == voter -> + _ when RaftState =/= leader andalso + Membership == voter -> TargetNode == node(); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 98e90994..66ab47c7 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -487,16 +487,16 @@ disconnected_node_catches_up(Config) -> Self = self(), SPid = erlang:spawn(DownNode, - fun () -> - erlang:register(snapshot_installed_proc, self()), - receive - {snapshot_installed, _Meta} = Evt -> - Self ! Evt, - ok - after 10000 -> - ok - end - end), + fun () -> + erlang:register(snapshot_installed_proc, self()), + receive + {snapshot_installed, _Meta} = Evt -> + Self ! Evt, + ok + after 10000 -> + ok + end + end), await_condition( fun () -> ok == ra:restart_server(?SYS, DownServerId) @@ -512,10 +512,11 @@ disconnected_node_catches_up(Config) -> receive {snapshot_installed, Meta} -> - ct:pal("snapshot installed receive ~p", [Meta]), + ct:pal("snapshot installed received ~p", [Meta]), ok after 10000 -> erlang:exit(SPid, kill), + flush(), ct:fail("snapshot_installed not received"), ok end, @@ -1344,7 +1345,7 @@ snapshot_installed(#{machine_version := _, undefined -> []; Pid -> - [{send_msg, Pid, {snapshot_installed, Meta}}] + [{send_msg, Pid, {snapshot_installed, Meta}, local}] end. node_setup(DataDir) -> diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index c3d71e8b..ddbf33a6 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -102,8 +102,18 @@ restart_ra(DataDir) -> init_per_group(_G, Config) -> PrivDir = ?config(priv_dir, Config), DataDir = filename:join([PrivDir, "data"]), + ra_env:configure_logger(logger), + ok = logger:set_primary_config(level, debug), + LogFile = filename:join(?config(priv_dir, Config), "ra.log"), + SaslFile = filename:join(?config(priv_dir, Config), "ra_sasl.log"), + logger:add_handler(ra_handler, logger_std_h, #{config => #{file => LogFile}}), + application:load(sasl), + application:set_env(sasl, sasl_error_logger, {file, SaslFile}), + application:stop(sasl), + application:start(sasl), + _ = error_logger:tty(false), ok = restart_ra(DataDir), - % ok = logger:set_application_level(ra, all), + ok = logger:set_application_level(ra, all), Config. end_per_group(_, Config) -> @@ -297,6 +307,7 @@ start_servers(Config) -> ra:overview(?SYS), % issue command to confirm n3 joined the cluster successfully {ok, _, _} = ra:process_command(N3, 5, ?PROCESS_COMMAND_TIMEOUT), + flush(), terminate_cluster([N1, N2, N3] -- [element(1, Target)]). @@ -894,16 +905,6 @@ server_catches_up(Config) -> terminate_cluster([N3 | InitialNodes]). snapshot_installation(Config) -> - ra_env:configure_logger(logger), - ok = logger:set_primary_config(level, debug), - LogFile = filename:join(?config(priv_dir, Config), "ra.log"), - SaslFile = filename:join(?config(priv_dir, Config), "ra_sasl.log"), - logger:add_handler(ra_handler, logger_std_h, #{config => #{file => LogFile}}), - application:load(sasl), - application:set_env(sasl, sasl_error_logger, {file, SaslFile}), - application:stop(sasl), - application:start(sasl), - _ = error_logger:tty(false), N1 = nth_server_name(Config, 1), N2 = nth_server_name(Config, 2), N3 = nth_server_name(Config, 3), diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index 007cca2f..1a4a8784 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -55,6 +55,7 @@ all_tests() -> transient_writer_is_handled, read_opt, sparse_read, + read_plan, sparse_read_out_of_range, sparse_read_out_of_range_2, written_event_after_snapshot, @@ -443,6 +444,33 @@ sparse_read(Config) -> {99, _, _}], _LogO3} = ra_log:sparse_read([1000,5,99], LogO2), ok. +read_plan(Config) -> + Num = 256 * 2, + Div = 2, + Log0 = write_and_roll(1, Num div Div, 1, ra_log_init(Config), 50), + Log1 = wait_for_segments(Log0, 5000), + Log2 = write_no_roll(Num div Div, Num, 1, Log1, 50), + %% read small batch of the latest entries + {_, Log3} = ra_log_take(Num - 5, Num, Log2), + %% ensure cache is empty as this indicates all enties have at least + %% been written to the WAL and thus will be available in mem tables. + Log4 = deliver_log_events_cond(Log3, + fun (L) -> + ra_log:last_written(L) == + ra_log:last_index_term(L) + end, 100), + %% create a list of indexes with some consecutive and some gaps + Indexes = lists:usort(lists:seq(1, Num, 2) ++ lists:seq(1, Num, 5)), + %% make sure that the ETS deletes have been finished before we re-init + gen_server:call(ra_log_ets, ok), + ReadPlan = ra_log:partial_read(Indexes, Log4, fun (_, _, Cmd) -> Cmd end), + ?assert(is_map(ra_log_read_plan:info(ReadPlan))), + {EntriesOut, _} = ra_log_read_plan:execute(ReadPlan, undefined), + ?assertEqual(length(Indexes), maps:size(EntriesOut)), + %% assert the indexes requestd were all returned in order + [] = Indexes -- [I || I <- maps:keys(EntriesOut)], + ok. + written_event_after_snapshot(Config) -> Log0 = ra_log_init(Config, #{min_snapshot_interval => 1}), Log1 = ra_log:append({1, 1, <<"one">>}, Log0), @@ -1384,7 +1412,6 @@ external_reader(Config) -> {Es, R2} = ra_log_reader:sparse_read( R1, lists:seq(0, 220), []), Len1 = length(Es), - ct:pal("Es ~w", [Len1]), Self ! {got, Evt, Es}, receive {ra_log_update, _, F, _} = Evt2 -> diff --git a/test/ra_log_segment_SUITE.erl b/test/ra_log_segment_SUITE.erl index f8d0c300..63079fe3 100644 --- a/test/ra_log_segment_SUITE.erl +++ b/test/ra_log_segment_SUITE.erl @@ -412,5 +412,8 @@ make_data(Size) -> term_to_binary(crypto:strong_rand_bytes(Size)). read_sparse(R, Idxs) -> - {_, Entries} = ra_log_segment:read_sparse(R, Idxs, fun ra_lib:id/1, []), + {_, Entries} = ra_log_segment:read_sparse(R, Idxs, + fun (I, T, B, Acc) -> + [{I, T, B} | Acc] + end, []), lists:reverse(Entries). diff --git a/test/ra_log_segment_writer_SUITE.erl b/test/ra_log_segment_writer_SUITE.erl index 3acb77a9..25d4e1c6 100644 --- a/test/ra_log_segment_writer_SUITE.erl +++ b/test/ra_log_segment_writer_SUITE.erl @@ -732,7 +732,10 @@ segments_for(UId, DataDir) -> SegFiles. read_sparse(R, Idxs) -> - {_, Entries} = ra_log_segment:read_sparse(R, Idxs, fun ra_lib:id/1, []), + {_, Entries} = ra_log_segment:read_sparse(R, Idxs, + fun(I, T, B, Acc) -> + [{I, T, B} | Acc] + end, []), lists:reverse(Entries). get_names(System) when is_atom(System) -> diff --git a/test/ra_machine_int_SUITE.erl b/test/ra_machine_int_SUITE.erl index 3158742f..518bfca9 100644 --- a/test/ra_machine_int_SUITE.erl +++ b/test/ra_machine_int_SUITE.erl @@ -374,8 +374,8 @@ machine_state_enter_effects(Config) -> end), meck:expect(Mod, state_enter, fun (RaftState, _State) -> - [{mod_call, erlang, send, - [Self, {state_enter, RaftState}]}] + Self ! {state_enter, RaftState}, + [] end), ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]), ra:delete_cluster([ServerId]), @@ -471,8 +471,12 @@ append_effect_with_notify(Config) -> end), ClusterName = ?config(cluster_name, Config), ServerId = ?config(server_id, Config), - ok = start_cluster(ClusterName, {module, Mod, #{}}, [ServerId]), - {ok, _, ServerId} = ra:process_command(ServerId, cmd), + ServerId2 = ?config(server_id2, Config), + ServerId3 = ?config(server_id3, Config), + + ok = start_cluster(ClusterName, {module, Mod, #{}}, + [ServerId, ServerId2, ServerId3]), + {ok, _, _Leader} = ra:process_command(ServerId, cmd), receive {ra_event, _, {applied, [{42, ok}]}} = Evt -> ct:pal("Got ~p", [Evt]) @@ -487,6 +491,7 @@ append_effect_with_notify(Config) -> flush(), exit(cmd2_timeout) end, + flush(), ok. append_effect_follower(Config) -> @@ -499,10 +504,12 @@ append_effect_follower(Config) -> (_, {cmd2, "yo"}, State) -> {State, ok, [{send_msg, Self, got_cmd2}]} end), + %% have to use the special try_append here as {append, should only be + %% applied to the leader meck:expect(Mod, handle_aux, fun (_, _, {cmd, ReplyMode}, Aux, Log, _MacState) -> {no_reply, Aux, Log, - [{append, {cmd2, "yo"}, ReplyMode}]}; + [{try_append, {cmd2, "yo"}, ReplyMode}]}; (_, _, _Evt, Aux, Log, _MacState) -> {no_reply, Aux, Log} end), From 7841faeebbff7df7d1680c759c3d82b955447781 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Tue, 26 Nov 2024 12:27:32 +0000 Subject: [PATCH 2/2] Make directories file:filename_all() types. --- src/ra_log.erl | 21 ++++++++++++--------- src/ra_log_wal.erl | 15 ++++++++------- src/ra_snapshot.erl | 28 ++++++++++++++-------------- 3 files changed, 34 insertions(+), 30 deletions(-) diff --git a/src/ra_log.erl b/src/ra_log.erl index 57e995c8..ab6406e7 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -76,7 +76,7 @@ -type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()). -type effect() :: - {delete_snapshot, Dir :: file:filename(), ra_idxterm()} | + {delete_snapshot, Dir :: file:filename_all(), ra_idxterm()} | {monitor, process, log, pid()} | ra_snapshot:effect() | ra_server:effect(). @@ -163,13 +163,16 @@ overview/0 ]). +-define(SNAPSHOTS_DIR, <<"snapshots">>). +-define(CHECKPOINTS_DIR, <<"checkpoints">>). + pre_init(#{uid := UId, system_config := #{data_dir := DataDir}} = Conf) -> Dir = server_data_dir(DataDir, UId), SnapModule = maps:get(snapshot_module, Conf, ?DEFAULT_SNAPSHOT_MODULE), MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS), - SnapshotsDir = filename:join(Dir, "snapshots"), - CheckpointsDir = filename:join(Dir, "checkpoints"), + SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR), + CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR), _ = ra_snapshot:init(UId, SnapModule, SnapshotsDir, CheckpointsDir, undefined, MaxCheckpoints), ok. @@ -192,8 +195,8 @@ init(#{uid := UId, CPInterval = maps:get(min_checkpoint_interval, Conf, ?MIN_CHECKPOINT_INTERVAL), MaxCheckpoints = maps:get(max_checkpoints, Conf, ?DEFAULT_MAX_CHECKPOINTS), - SnapshotsDir = filename:join(Dir, "snapshots"), - CheckpointsDir = filename:join(Dir, "checkpoints"), + SnapshotsDir = filename:join(Dir, ?SNAPSHOTS_DIR), + CheckpointsDir = filename:join(Dir, ?CHECKPOINTS_DIR), Counter = maps:get(counter, Conf, undefined), %% ensure directories are there @@ -1048,8 +1051,8 @@ overview(#?MODULE{last_index = LastIndex, -spec write_config(ra_server:config(), state()) -> ok. write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) -> - ConfigPath = filename:join(Dir, "config"), - TmpConfigPath = filename:join(Dir, "config.tmp"), + ConfigPath = filename:join(Dir, <<"config">>), + TmpConfigPath = filename:join(Dir, <<"config.tmp">>), % clean config of potentially unserialisable data Config = maps:without([parent, counter, @@ -1062,12 +1065,12 @@ write_config(Config0, #?MODULE{cfg = #cfg{directory = Dir}}) -> ok = prim_file:rename(TmpConfigPath, ConfigPath), ok. --spec read_config(state() | file:filename()) -> +-spec read_config(state() | file:filename_all()) -> {ok, ra_server:config()} | {error, term()}. read_config(#?MODULE{cfg = #cfg{directory = Dir}}) -> read_config(Dir); read_config(Dir) -> - ConfigPath = filename:join(Dir, "config"), + ConfigPath = filename:join(Dir, <<"config">>), ra_lib:consult(ConfigPath). -spec delete_everything(state()) -> ok. diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index 1048a503..d2a40a6d 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -80,7 +80,7 @@ -type writer_name_cache() :: {NextIntId :: non_neg_integer(), #{writer_id() => binary()}}. --record(conf, {dir :: string(), +-record(conf, {dir :: file:filename_all(), segment_writer = ra_log_segment_writer :: atom() | pid(), compute_checksums = false :: boolean(), max_size_bytes :: non_neg_integer(), @@ -370,21 +370,22 @@ recover_wal(Dir, #conf{segment_writer = SegWriter, ok = ra_log_segment_writer:await(SegWriter), post_boot end, - WalFiles = lists:sort(filelib:wildcard(filename:join(Dir, "*.wal"))), + {ok, Files} = file:list_dir(Dir), + WalFiles = lists:sort([F || F <- Files, + filename:extension(F) == ".wal"]), AllWriters = [begin - FBase = filename:basename(F), - ?DEBUG("wal: recovering ~ts, Mode ~s", [FBase, Mode]), - Fd = open_at_first_record(F), + ?DEBUG("wal: recovering ~ts, Mode ~s", [F, Mode]), + Fd = open_at_first_record(filename:join(Dir, F)), {Time, #recovery{ranges = Ranges, writers = Writers}} = timer:tc(fun () -> recover_wal_chunks(Conf, Fd, Mode) end), - ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, FBase), + ok = ra_log_segment_writer:accept_mem_tables(SegWriter, Ranges, F), close_existing(Fd), ?DEBUG("wal: recovered ~ts time taken ~bms - Writer state recovered ~p", - [FBase, Time div 1000, Writers]), + [F, Time div 1000, Writers]), Writers end || F <- WalFiles], diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index 69502c4d..be6ce2af 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -74,11 +74,11 @@ %% typically /snapshots %% snapshot subdirs are store below %% this as /snapshots/Term_Index - snapshot_directory :: file:filename(), + snapshot_directory :: file:filename_all(), %% /checkpoints %% like snapshots, these are also stored in subdirs %% as /checkpoints/Term_Index - checkpoint_directory :: file:filename(), + checkpoint_directory :: file:filename_all(), pending :: option({pid(), ra_idxterm(), kind()}), accepting :: option(#accept{}), current :: option(ra_idxterm()), @@ -103,7 +103,7 @@ %% Runs in a separate process. %% External storage should be available to read %% `Sync' suggests whether the file should be synchronized with `fsync(1)'. --callback write(Location :: file:filename(), +-callback write(Location :: file:filename_all(), Meta :: meta(), Ref :: term(), Sync :: boolean()) -> @@ -112,7 +112,7 @@ {error, file_err() | term()}. %% Synchronizes the snapshot to disk. --callback sync(Location :: file:filename()) -> +-callback sync(Location :: file:filename_all()) -> ok | {error, file_err() | term()}. @@ -121,7 +121,7 @@ %% The read state should contain all the information required to read a chunk %% The Context is the map returned by the context/0 callback %% This can be used to inform the sender of receive capabilities. --callback begin_read(Location :: file:filename(), Context :: map()) -> +-callback begin_read(Location :: file:filename_all(), Context :: map()) -> {ok, Meta :: meta(), ReadState :: term()} | {error, term()}. @@ -129,11 +129,11 @@ %% Returns a binary chunk of data and a continuation state -callback read_chunk(ReadState, ChunkSizeBytes :: non_neg_integer(), - Location :: file:filename()) -> + Location :: file:filename_all()) -> {ok, Chunk :: term(), {next, ReadState} | last} | {error, term()}. %% begin a stateful snapshot acceptance process --callback begin_accept(SnapDir :: file:filename(), +-callback begin_accept(SnapDir :: file:filename_all(), Meta :: meta()) -> {ok, AcceptState :: term()} | {error, term()}. @@ -149,15 +149,15 @@ %% Side-effect function %% Recover machine state from file --callback recover(Location :: file:filename()) -> +-callback recover(Location :: file:filename_all()) -> {ok, Meta :: meta(), State :: term()} | {error, term()}. %% validate the integrity of the snapshot --callback validate(Location :: file:filename()) -> +-callback validate(Location :: file:filename_all()) -> ok | {error, term()}. %% Only read meta data from snapshot --callback read_meta(Location :: file:filename()) -> +-callback read_meta(Location :: file:filename_all()) -> {ok, meta()} | {error, invalid_format | {invalid_version, integer()} | @@ -167,7 +167,7 @@ -callback context() -> map(). --spec init(ra_uid(), module(), file:filename(), file:filename(), +-spec init(ra_uid(), module(), file:filename_all(), file:filename_all(), undefined | counters:counters_ref(), pos_integer()) -> state(). init(UId, Module, SnapshotsDir, CheckpointDir, Counter, MaxCheckpoints) -> @@ -339,7 +339,7 @@ accepting(#?MODULE{accepting = undefined}) -> accepting(#?MODULE{accepting = #accept{idxterm = Accepting}}) -> Accepting. --spec directory(state(), kind()) -> file:filename(). +-spec directory(state(), kind()) -> file:filename_all(). directory(#?MODULE{snapshot_directory = Dir}, snapshot) -> Dir; directory(#?MODULE{checkpoint_directory = Dir}, checkpoint) -> Dir. @@ -602,7 +602,7 @@ recover(#?MODULE{module = Mod, end, Mod:recover(Dir). --spec read_meta(Module :: module(), Location :: file:filename()) -> +-spec read_meta(Module :: module(), Location :: file:filename_all()) -> {ok, meta()} | {error, invalid_format | {invalid_version, integer()} | @@ -613,7 +613,7 @@ read_meta(Module, Location) -> Module:read_meta(Location). -spec current_snapshot_dir(state()) -> - option(file:filename()). + option(file:filename_all()). current_snapshot_dir(#?MODULE{snapshot_directory = Dir, current = {Idx, Term}}) -> make_snapshot_dir(Dir, Idx, Term);