Skip to content

Commit

Permalink
Implement new log effects type: log_ext that only reads any entries…
Browse files Browse the repository at this point in the history
… that

exists in the mem table and returns an opaque "read plan" that can be
executed by any process on the same node by using the `ra_log_read_plan:execute/2`
function.

This PR also refactors follower effect filtering to be done in the ra_server_proc
only in order to keep this logic in one place.

Other minor refactorings and fixes and some improvements to effect documentation.
  • Loading branch information
kjnilsson committed Nov 25, 2024
1 parent 9e4cbb0 commit df18788
Show file tree
Hide file tree
Showing 18 changed files with 377 additions and 135 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -440,14 +440,17 @@ logger:set_primary_config(level, debug).
Ra attempts to follow [Semantic Versioning](https://semver.org/).

The modules that form part of the public API are:

* `ra`
* `ra_machine` (behaviour callbacks only)
* `ra_aux`
* `ra_system`
* `ra_counters`
* `ra_counters` (counter keys may vary between minors)
* `ra_leaderboard`
* `ra_env`
* `ra_directory`
* `ra_flru`
* `ra_log_read_plan`

## Copyright and License

Expand Down
17 changes: 17 additions & 0 deletions docs/internals/STATE_MACHINE_TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,23 @@ Only the leader that first applies an entry will attempt the effect.
Followers process the same set of commands but simply throw away any effects
returned by the state machine unless specific effect provide the `local` option.

### Machine Effects table

| Spec | Executed on |
| -----| ----------- |
| `{send_msg, pid(), Msg :: term()}` | leader |
| `{send_msg, pid(), Msg :: term(), [local]}` | on member local to `pid()` else leader |
| `{monitor \| demonitor, process \| node, pid() \| node()}` | leader |
| `{mod_call, mfa()}` | leader |
| `{timer, Name :: term(), Time :: non_neg_integer() \| infinity}` | leader |
| `{append, term()}` | leader |
| `{append, term(), ra_server:command_reply_mode()}` | leader |
| `{log, [ra_index()], fun(([user_command()]) -> effects())}` | leader |
| `{log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}}` | on member local to `node()` else leader |
| `{log_ext, [ra_index()], fun(([ra_log:read_plan()]) -> effects()), {local, node()}}` | on member local to `node()` else leader |
| `{release_cursor \| checkpoint, ra_index(), term()}` | all members |
| `{aux, term()}` | every member |


### Send a message

Expand Down
1 change: 0 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1262,7 +1262,6 @@ key_metrics({Name, N} = ServerId, _Timeout) when N == node() ->
key_metrics({_, N} = ServerId, Timeout) ->
erpc:call(N, ?MODULE, ?FUNCTION_NAME, [ServerId], Timeout).


%% internal

-spec usr(UserCommand, ReplyMode) -> Command when
Expand Down
1 change: 0 additions & 1 deletion src/ra_aux.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,3 @@ log_fetch(Idx, #{log := Log0} = State)
-spec log_stats(ra_aux:internal_state()) -> ra_log:overview().
log_stats(#{log := Log}) ->
ra_log:overview(Log).

7 changes: 6 additions & 1 deletion src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
is_dir/1,
is_file/1,
ensure_dir/1,
consult/1
consult/1,
cons/2
]).

-type file_err() :: file:posix() | badarg | terminated | system_limit.
Expand Down Expand Up @@ -454,6 +455,10 @@ consult(Path) ->
Err
end.

cons(Item, List)
when is_list(List) ->
[Item | List].

tokens(Str) ->
case erl_scan:string(Str) of
{ok, Tokens, _EndLoc} ->
Expand Down
76 changes: 71 additions & 5 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
write_sync/2,
fold/5,
sparse_read/2,
partial_read/3,
execute_read_plan/3,
read_plan_info/1,
last_index_term/1,
set_last_index/2,
handle_event/2,
Expand Down Expand Up @@ -70,6 +73,7 @@
{down, pid(), term()}.

-type event() :: {ra_log_event, event_body()}.
-type transform_fun() :: fun ((ra_index(), ra_term(), ra_server:command()) -> term()).

-type effect() ::
{delete_snapshot, Dir :: file:filename(), ra_idxterm()} |
Expand Down Expand Up @@ -110,7 +114,11 @@
tx = false :: boolean()
}).

-record(read_plan, {dir :: file:filename_all(),
read :: #{ra_index() := log_entry()},
plan :: ra_log_reader:read_plan()}).

-opaque read_plan() :: #read_plan{}.
-opaque state() :: #?MODULE{}.

-type ra_log_init_args() :: #{uid := ra_uid(),
Expand Down Expand Up @@ -145,6 +153,7 @@
atom() => term()}.

-export_type([state/0,
read_plan/0,
ra_log_init_args/0,
ra_meta_key/0,
segment_ref/0,
Expand Down Expand Up @@ -303,7 +312,6 @@ init(#{uid := UId,
{SnapIdx, SnapTerm},
State#?MODULE.last_written_index_term
]),
?DEBUG("~ts: ra_log:init overview ~p", [overview(State)]),
element(1, delete_segments(SnapIdx, State)).

-spec close(state()) -> ok.
Expand Down Expand Up @@ -488,8 +496,8 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,

%% drop any indexes that are larger than the last index available
Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
{Entries0, CacheNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, CacheNumRead),
{Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
{Entries1, Reader} = ra_log_reader:sparse_read(Reader0, Indexes, Entries0),
%% here we recover the original order of indexes
Entries = case Sort of
Expand All @@ -507,6 +515,65 @@ sparse_read(Indexes0, #?MODULE{cfg = Cfg,
end,
{Entries, State#?MODULE{reader = Reader}}.


%% read a list of indexes,
%% found indexes be returned in the same order as the input list of indexes
-spec partial_read([ra_index()], state(),
fun ((ra_index(), ra_term(), ra_server:command()) -> term())
) ->
read_plan().
partial_read(Indexes0, #?MODULE{cfg = Cfg,
reader = Reader0,
last_index = LastIdx,
mem_table = Mt},
TransformFun) ->
ok = incr_counter(Cfg, ?C_RA_LOG_READ_OPS, 1),
%% indexes need to be sorted high -> low for correct and efficient reading
Sort = ra_lib:lists_detect_sort(Indexes0),
Indexes1 = case Sort of
unsorted ->
lists:sort(fun erlang:'>'/2, Indexes0);
ascending ->
lists:reverse(Indexes0);
_ ->
% descending or undefined
Indexes0
end,

%% drop any indexes that are larger than the last index available
Indexes2 = lists:dropwhile(fun (I) -> I > LastIdx end, Indexes1),
{Entries0, MemTblNumRead, Indexes} = ra_mt:get_items(Indexes2, Mt),
ok = incr_counter(Cfg, ?C_RA_LOG_READ_MEM_TBL, MemTblNumRead),
Read = lists:foldl(fun ({I, T, Cmd}, Acc) ->
maps:put(I, TransformFun(I, T, Cmd), Acc)
end, #{}, Entries0),

Plan = ra_log_reader:read_plan(Reader0, Indexes),
#read_plan{dir = Cfg#cfg.directory,
read = Read,
plan = Plan}.


-spec execute_read_plan(read_plan(), undefined | ra_flru:state(),
TransformFun :: transform_fun()) ->
{#{ra_index() => Command :: term()}, ra_flru:state()}.
execute_read_plan(#read_plan{dir = Dir,
read = Read,
plan = Plan}, Flru0, TransformFun) ->
ra_log_reader:exec_read_plan(Dir, Plan, Flru0, TransformFun, Read).

-spec read_plan_info(read_plan()) -> map().
read_plan_info(#read_plan{read = Read,
plan = Plan}) ->
NumSegments = length(Plan),
NumInSegments = lists:foldl(fun ({_, Idxs}, Acc) ->
Acc + length(Idxs)
end, 0, Plan),
#{num_read => map_size(Read),
num_in_segments => NumInSegments,
num_segments => NumSegments}.


-spec last_index_term(state()) -> ra_idxterm().
last_index_term(#?MODULE{last_index = LastIdx, last_term = LastTerm}) ->
{LastIdx, LastTerm}.
Expand Down Expand Up @@ -1309,8 +1376,7 @@ put_counter(#cfg{counter = undefined}, _Ix, _N) ->
ok.

server_data_dir(Dir, UId) ->
Me = ra_lib:to_list(UId),
filename:join(Dir, Me).
filename:join(Dir, UId).

maps_with_values(Keys, Map) ->
lists:foldr(
Expand Down
20 changes: 20 additions & 0 deletions src/ra_log_read_plan.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%%
-module(ra_log_read_plan).


-export([execute/2,
info/1]).

-spec execute(ra_log:read_plan(), undefined | ra_flru:state()) ->
{#{ra:index() => Command :: ra_server:command()}, ra_flru:state()}.
execute(Plan, Flru) ->
ra_log:execute_read_plan(Plan, Flru, fun ra_server:transform_for_partial_read/3).

-spec info(ra_log:read_plan()) -> map().
info(Plan) ->
ra_log:read_plan_info(Plan).
71 changes: 69 additions & 2 deletions src/ra_log_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
update_first_index/2,
fold/5,
sparse_read/3,
read_plan/2,
exec_read_plan/5,
fetch_term/2
]).

Expand All @@ -44,10 +46,12 @@
}).

-opaque state() :: #?STATE{}.
-type read_plan() :: [{BaseName :: file:filename_all(), [ra:index()]}].


-export_type([
state/0
state/0,
read_plan/0
]).

%% PUBLIC
Expand Down Expand Up @@ -170,13 +174,55 @@ sparse_read(#?STATE{cfg = #cfg{} = Cfg} = State, Indexes, Entries0) ->
ok = incr_counter(Cfg, ?C_RA_LOG_READ_SEGMENT, SegC),
{Entries, State#?MODULE{open_segments = Open}}.

-spec read_plan(state(), [ra_index()]) -> read_plan().
read_plan(#?STATE{segment_refs = SegRefs}, Indexes) ->
%% TODO: add counter for number of read plans requested
segment_read_plan(SegRefs, Indexes, []).

-spec exec_read_plan(file:filename_all(), read_plan(), undefined | ra_flru:state(),
TransformFun :: fun(),
#{ra_index() => Command :: term()}) ->
{#{ra_index() => Command :: term()}, ra_flru:state()}.
exec_read_plan(Dir, Plan, undefined, TransformFun, Acc0) ->
Open = ra_flru:new(1, fun({_, Seg}) -> ra_log_segment:close(Seg) end),
exec_read_plan(Dir, Plan, Open, TransformFun, Acc0);
exec_read_plan(Dir, Plan, Open0, TransformFun, Acc0)
when is_list(Plan) ->
Fun = fun (I, T, B, Acc) ->
E = TransformFun(I, T, binary_to_term(B)),
Acc#{I => E}
end,
lists:foldl(
fun ({Idxs, BaseName}, {Acc1, Open1}) ->
{Seg, Open} = get_segment_ext(Dir, Open1, BaseName),
{_, Acc} = ra_log_segment:read_sparse(Seg, Idxs, Fun, Acc1),
{Acc, Open}
end, {Acc0, Open0}, Plan).

-spec fetch_term(ra_index(), state()) -> {option(ra_index()), state()}.
fetch_term(Idx, #?STATE{cfg = #cfg{} = Cfg} = State0) ->
incr_counter(Cfg, ?C_RA_LOG_FETCH_TERM, 1),
segment_term_query(Idx, State0).

%% LOCAL

segment_read_plan(_RegRefs, [], Acc) ->
lists:reverse(Acc);
segment_read_plan([], _Indexes, Acc) ->
%% TODO: not all indexes were found
lists:reverse(Acc);
segment_read_plan([{To, From, Fn} | SegRefs], Indexes, Acc) ->
%% TODO: address unnecessary allocation here
Range = {To, From},
case sparse_read_split(fun (I) ->
ra_range:in(I, Range)
end, Indexes, []) of
{[], _} ->
segment_read_plan(SegRefs, Indexes, Acc);
{Idxs, Rem} ->
segment_read_plan(SegRefs, Rem, [{Idxs, Fn} | Acc])
end.

segment_term_query(Idx, #?MODULE{segment_refs = SegRefs,
cfg = Cfg,
open_segments = OpenSegs} = State) ->
Expand Down Expand Up @@ -255,7 +301,10 @@ segment_sparse_read(#?STATE{segment_refs = SegRefs,
end, Idxs, []),
{ReadSparseCount, Entries} =
ra_log_segment:read_sparse(Seg, ReadIdxs,
fun binary_to_term/1, []),
fun (I, T, B, Acc) ->
[{I, T, binary_to_term(B)} | Acc]
end,
[]),
{Open, RemIdxs, C + ReadSparseCount,
lists:reverse(Entries, En0)};
(_Segref, Acc) ->
Expand Down Expand Up @@ -294,6 +343,24 @@ get_segment(#cfg{directory = Dir,
end
end.

get_segment_ext(Dir, Open0, Fn) ->
case ra_flru:fetch(Fn, Open0) of
{ok, S, Open1} ->
{S, Open1};
error ->
AbsFn = filename:join(Dir, Fn),
case ra_log_segment:open(AbsFn,
#{mode => read,
access_pattern => random})
of
{ok, S} ->
{S, ra_flru:insert(Fn, S, Open0)};
{error, Err} ->
exit({ra_log_failed_to_open_segment, Err,
AbsFn})
end
end.

compact_seg_refs([], PreviousSegRefs) ->
PreviousSegRefs;
compact_seg_refs(NewSegRefs, []) ->
Expand Down
28 changes: 16 additions & 12 deletions src/ra_log_segment.erl
Original file line number Diff line number Diff line change
Expand Up @@ -272,33 +272,37 @@ fold(#state{cfg = #cfg{mode = read} = Cfg,
fold0(Cfg, Cache, FromIdx, ToIdx, Index, Fun, AccFun, Acc).

-spec read_sparse(state(), [ra_index()],
fun((binary()) -> term()), term()) ->
{non_neg_integer(), term()}.
fun((ra:index(), ra_term(), binary(), Acc) -> Acc),
Acc) ->
{NumRead :: non_neg_integer(), Acc}
when Acc :: term().
read_sparse(#state{index = Index,
cfg = Cfg}, Indexes, Fun, Acc) ->
cfg = Cfg}, Indexes, AccFun, Acc) ->
Cache0 = prepare_cache(Cfg, Indexes, Index),
read_sparse0(Cfg, Indexes, Index, Cache0, Fun, Acc, 0).
read_sparse0(Cfg, Indexes, Index, Cache0, Acc, AccFun, 0).

read_sparse0(_Cfg, [], _Index, _Cache, _Fun, Acc, Num) ->
read_sparse0(_Cfg, [], _Index, _Cache, Acc, _AccFun, Num) ->
{Num, Acc};
read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Fun, Acc, Num)
read_sparse0(Cfg, [NextIdx | Rem] = Indexes, Index, Cache0, Acc, AccFun, Num)
when is_map_key(NextIdx, Index) ->
{Term, Offset, Length, _} = map_get(NextIdx, Index),
case cache_read(Cache0, Offset, Length) of
false ->
case prepare_cache(Cfg, Indexes, Index) of
undefined ->
{ok, Data, _} = pread(Cfg, undefined, Offset, Length),
read_sparse0(Cfg, Rem, Index, undefined, Fun,
[{NextIdx, Term, Fun(Data)} | Acc], Num+1);
read_sparse0(Cfg, Rem, Index, undefined,
AccFun(NextIdx, Term, Data, Acc),
AccFun, Num+1);
Cache ->
read_sparse0(Cfg, Indexes, Index, Cache, Fun, Acc, Num+1)
read_sparse0(Cfg, Indexes, Index, Cache,
Acc, AccFun, Num+1)
end;
Data ->
read_sparse0(Cfg, Rem, Index, Cache0, Fun,
[{NextIdx, Term, Fun(Data)} | Acc], Num+1)
read_sparse0(Cfg, Rem, Index, Cache0,
AccFun(NextIdx, Term, Data, Acc), AccFun, Num+1)
end;
read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Fun, _Acc, _Num) ->
read_sparse0(_Cfg, [NextIdx | _], _Index, _Cache, _Acc, _AccFun, _Num) ->
exit({missing_key, NextIdx}).

cache_read({CPos, CLen, Bin}, Pos, Length)
Expand Down
Loading

0 comments on commit df18788

Please sign in to comment.