Skip to content

Commit

Permalink
address feedback #2
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Sep 20, 2023
1 parent d854c56 commit 831be88
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 23 deletions.
7 changes: 4 additions & 3 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -477,13 +477,14 @@ start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) ->
start_server(System, ClusterName, #{id => ServerId}, Machine, ServerIds);
start_server(System, ClusterName, #{id := {_, _}} = Conf0, Machine, ServerIds)
when is_atom(System) ->
UId = new_uid(ra_lib:to_binary(ClusterName)),
UId = maps:get(uid, Conf0,
new_uid(ra_lib:to_binary(ClusterName))),
Conf = #{cluster_name => ClusterName,
uid => UId,
initial_members => ServerIds,
log_init_args => #{uid => UId},
machine => Machine},
start_server(System, maps:merge(Conf, Conf0)).
start_server(System, maps:merge(Conf0, Conf)).

%% @doc Starts a ra server in the default system
%% @param Conf a ra_server_config() configuration map.
Expand Down Expand Up @@ -1134,7 +1135,7 @@ key_metrics({Name, N} = ServerId) when N == node() ->
case whereis(Name) of
undefined ->
Counters#{state => noproc,
non_voter => noproc};
non_voter => unknown};
_ ->
case ets:lookup(ra_state, Name) of
[] ->
Expand Down
10 changes: 6 additions & 4 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,17 @@
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
voter_status := ra_voter_status(),
voter_status => ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
status := ra_peer_status()}.

-type ra_cluster() :: #{ra_server_id() => ra_peer_state()}.

-type ra_cluster_servers() :: [ra_server_id()].
%% Dehydrated cluster: the reduced forms of ra_cluster() that are used for
%% the `cluster' field of snapshot_meta().
%% Deprecated form: a bare list of member ids with no per-peer data.
-type ra_cluster_servers() :: [ra_server_id()]. % Deprecated
%% Per-member data kept in a snapshot: only the (optional) voter_status.
-type ra_peer_snapshot() :: #{voter_status => ra_voter_status()}.
%% Maps every member id to the peer data kept for it.
-type ra_cluster_snapshot() :: #{ra_server_id() => ra_peer_snapshot()}.

%% represent a unique entry in the ra log
-type log_entry() :: {ra_index(), ra_term(), term()}.
Expand Down Expand Up @@ -154,8 +157,7 @@

%% Metadata stored together with a snapshot.
%%
%% `cluster' holds the dehydrated membership at the snapshot index. It is a
%% ra_cluster_snapshot() map (member id => retained peer fields) when built
%% from a live cluster, but the deprecated list form ra_cluster_servers() is
%% still accepted when a cluster is rebuilt from a snapshot, so both shapes
%% are part of the type.
-type snapshot_meta() :: #{index := ra_index(),
                           term := ra_term(),
                           cluster := ra_cluster_snapshot() |
                                      ra_cluster_servers(),
                           machine_version := ra_machine:version()}.

-record(install_snapshot_rpc,
Expand Down
5 changes: 3 additions & 2 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -637,14 +637,15 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState,
#?MODULE{cfg = #cfg{snapshot_interval = SnapInter},
reader = Reader,
snapshot_state = SnapState} = State0) ->
ClusterServerIds = maps:keys(Cluster),
ClusterServerIds = maps:map(fun (_, V) ->
maps:with([voter_status], V)
end, Cluster),
SnapLimit = case ra_snapshot:current(SnapState) of
undefined -> SnapInter;
{I, _} -> I + SnapInter
end,
Meta = #{index => Idx,
cluster => ClusterServerIds,
cluster_state => Cluster,
machine_version => MacVersion},
% The release cursor index is the last entry _not_ contributing
% to the current state. I.e. the last entry that can be discarded.
Expand Down
34 changes: 20 additions & 14 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,8 @@ init(#{id := Id,
0, InitialMachineState, {0, 0}};
{#{index := Idx,
term := Term,
cluster := ClusterNodes0,
machine_version := MacVersion} = Snapshot, MacSt} ->
ClusterNodes = maps:get(cluster_state, Snapshot, ClusterNodes0),
cluster := ClusterNodes,
machine_version := MacVersion}, MacSt} ->
Clu = make_cluster(Id, ClusterNodes),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
Expand Down Expand Up @@ -1295,16 +1294,14 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
Cfg0
end,

{#{cluster := ClusterIds0} = Snapshot, MacState} = ra_log:recover_snapshot(Log),
ClusterIds = maps:get(cluster_state, Snapshot, ClusterIds0),
Cluster = make_cluster(Id, ClusterIds),
{#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log),
State = update_term(Term,
State0#{cfg => Cfg,
log => Log,
commit_index => SnapIndex,
last_applied => SnapIndex,
cluster => Cluster,
non_voter => get_non_voter(Cluster, State0),
cluster => make_cluster(Id, ClusterIds),
non_voter => get_non_voter(ClusterIds, State0),
machine_state => MacState}),
%% it was the last snapshot chunk so we can revert back to
%% follower status
Expand Down Expand Up @@ -2244,15 +2241,17 @@ fetch_term(Idx, #{log := Log0} = State) ->
{Term, State#{log => Log}}
end.

%% Builds the cluster map for server `Self' from a dehydrated membership.
%%
%% `Nodes0' is either
%%   * a list of server ids (legacy form): every member is given a default
%%     peer via new_peer/0, or
%%   * a ra_cluster_snapshot() map: each member's retained fields are handed
%%     to new_peer_with/1.
%% In both cases the resulting map is passed through append_self/2 before
%% being returned.
-spec make_cluster(ra_server_id(), ra_cluster_snapshot() | [ra_server_id()]) ->
    ra_cluster().
make_cluster(Self, Nodes0) when is_list(Nodes0) ->
    Nodes = lists:foldl(fun(N, Acc) ->
                                Acc#{N => new_peer()}
                        end, #{}, Nodes0),
    append_self(Self, Nodes);
make_cluster(Self, Nodes0) when is_map(Nodes0) ->
    %% Snapshot peers only carry the fields retained by ra_peer_snapshot(),
    %% so they are passed on without further filtering.
    Nodes = maps:map(fun(_, Peer0) ->
                             new_peer_with(Peer0)
                     end, Nodes0),
    append_self(Self, Nodes).

Expand All @@ -2268,8 +2267,8 @@ append_self(Self, Nodes) ->

initialise_peers(State = #{log := Log, cluster := Cluster0}) ->
NextIdx = ra_log:next_index(Log),
Cluster = maps:map(fun (_, Peer) ->
Peer1 = maps:with([voter_status], Peer),
Cluster = maps:map(fun (_, Peer0) ->
Peer1 = maps:with([voter_status], Peer0),
Peer2 = Peer1#{next_index => NextIdx},
new_peer_with(Peer2)
end, Cluster0),
Expand Down Expand Up @@ -2917,7 +2916,12 @@ ensure_promotion_target(Voter, _) ->
{ok, Voter}.

%% Get non_voter of a given Id+UId from a (possibly new) cluster.
-spec get_non_voter(ra_cluster(), ra_server_id(), ra_uid(), boolean()) -> boolean().
-spec get_non_voter(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
ra_server_id(), ra_uid(), boolean()) ->
boolean().
get_non_voter(_Cluster, _PeerId, _UId, Default) when is_list(_Cluster) ->
%% Legacy cluster snapshot does not retain voter_status.
Default;
get_non_voter(Cluster, PeerId, UId, Default) ->
case maps:get(PeerId, Cluster, undefined) of
#{voter_status := #{uid := UId} = VoterStatus} ->
Expand All @@ -2928,7 +2932,9 @@ get_non_voter(Cluster, PeerId, UId, Default) ->

%% Get this node's non_voter from a (possibly new) cluster.
%% Defaults to last known-locally value.
-spec get_non_voter(ra_cluster(), ra_state()) -> boolean().
%% Two-argument form: reads this server's id and uid from the `cfg' record
%% held in `State' and uses the `non_voter' value already present in `State'
%% (`false' when the key is absent) as the fallback for get_non_voter/4.
-spec get_non_voter(Cluster, State) -> boolean() when
      Cluster :: ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
      State :: ra_state().
get_non_voter(Cluster, #{cfg := #cfg{id = Id, uid = UId}} = State) ->
    LastKnown = maps:get(non_voter, State, false),
    get_non_voter(Cluster, Id, UId, LastKnown).
Expand Down
29 changes: 29 additions & 0 deletions test/ra_snapshot_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ all_tests() ->
take_snapshot,
take_snapshot_crash,
init_recover,
init_recover_voter_status,
init_recover_multi_corrupt,
init_recover_corrupt,
read_snapshot,
Expand Down Expand Up @@ -162,6 +163,34 @@ init_recover(Config) ->
{ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
ok.

%% Checks that per-member data in the snapshot meta's cluster map (here a
%% `voter_status' entry) survives a write / re-init / recover cycle.
init_recover_voter_status(Config) ->
    UId = ?config(uid, Config),
    SnapDir = ?config(snap_dir, Config),
    Fresh = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined),
    Cluster = #{node() => #{voter_status => test}},
    Meta = meta(55, 2, Cluster),
    {Writing, [{monitor, process, snapshot_writer, _}]} =
        ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, Fresh),
    %% block until the snapshot_written event arrives, then complete the
    %% snapshot; fail the test if nothing shows up within a second
    receive
        {ra_log_event, {snapshot_written, IdxTerm}} ->
            _ = ra_snapshot:complete_snapshot(IdxTerm, Writing),
            ok
    after 1000 ->
            error(snapshot_event_timeout)
    end,

    %% a second init over the same directory stands in for a restart
    Restarted = ra_snapshot:init(UId, ra_log_snapshot, SnapDir, undefined),
    %% nothing may be pending and the snapshot written above must be the
    %% current one
    undefined = ra_snapshot:pending(Restarted),
    {55, 2} = ra_snapshot:current(Restarted),
    55 = ra_snapshot:last_index_for(UId),

    %% the recovered meta (cluster map included) and machine state must be
    %% exactly what was written
    {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Restarted),
    ok.

init_multi(Config) ->
UId = ?config(uid, Config),
State0 = ra_snapshot:init(UId, ra_log_snapshot,
Expand Down

0 comments on commit 831be88

Please sign in to comment.