From 831be889dfd72d1d176ad3e3de5d2adb72ba97ac Mon Sep 17 00:00:00 2001
From: Alex Valiushko
Date: Wed, 20 Sep 2023 10:26:59 -0700
Subject: [PATCH] address feedback #2

---
Reviewer note: the leftover ct:pal/2 debug call in make_cluster/2 was dropped
(ct is the Common Test framework and should not be called from src/), and the
list-guarded get_non_voter/4 clause no longer reads an underscore-prefixed
variable in its guard. Hunk offsets and the diffstat were adjusted to match.
The post-image blob id on the src/ra_server.erl "index" line was not
regenerated, and indentation was reconstructed from a whitespace-collapsed copy.

 src/ra.erl                 |  7 ++++---
 src/ra.hrl                 | 10 ++++++----
 src/ra_log.erl             |  5 +++--
 src/ra_server.erl          | 33 +++++++++++++++++++--------------
 test/ra_snapshot_SUITE.erl | 29 +++++++++++++++++++++++++++++
 5 files changed, 61 insertions(+), 23 deletions(-)

diff --git a/src/ra.erl b/src/ra.erl
index a7995aa9..f1750473 100644
--- a/src/ra.erl
+++ b/src/ra.erl
@@ -477,13 +477,14 @@ start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) ->
     start_server(System, ClusterName, #{id => ServerId}, Machine, ServerIds);
 start_server(System, ClusterName, #{id := {_, _}} = Conf0, Machine, ServerIds)
   when is_atom(System) ->
-    UId = new_uid(ra_lib:to_binary(ClusterName)),
+    UId = maps:get(uid, Conf0,
+                   new_uid(ra_lib:to_binary(ClusterName))),
     Conf = #{cluster_name => ClusterName,
              uid => UId,
              initial_members => ServerIds,
              log_init_args => #{uid => UId},
              machine => Machine},
-    start_server(System, maps:merge(Conf, Conf0)).
+    start_server(System, maps:merge(Conf0, Conf)).
 
 %% @doc Starts a ra server in the default system
 %% @param Conf a ra_server_config() configuration map.
@@ -1134,7 +1135,7 @@ key_metrics({Name, N} = ServerId) when N == node() ->
     case whereis(Name) of
         undefined ->
             Counters#{state => noproc,
-                      non_voter => noproc};
+                      non_voter => unknown};
         _ ->
             case ets:lookup(ra_state, Name) of
                 [] ->
diff --git a/src/ra.hrl b/src/ra.hrl
index 7ad277d0..64c7022e 100644
--- a/src/ra.hrl
+++ b/src/ra.hrl
@@ -65,14 +65,17 @@
       % used for evaluating pipeline status
       commit_index_sent := non_neg_integer(),
       %% whether the peer is part of the consensus
-      voter_status := ra_voter_status(),
+      voter_status => ra_voter_status(),
       %% indicates that a snapshot is being sent
       %% to the peer
       status := ra_peer_status()}.
 
 -type ra_cluster() :: #{ra_server_id() => ra_peer_state()}.
 
--type ra_cluster_servers() :: [ra_server_id()].
+%% Dehydrated cluster:
+-type ra_cluster_servers() :: [ra_server_id()]. % Deprecated
+-type ra_peer_snapshot() :: #{voter_status => ra_voter_status()}.
+-type ra_cluster_snapshot() :: #{ra_server_id() => ra_peer_snapshot()}.
 
 %% represent a unique entry in the ra log
 -type log_entry() :: {ra_index(), ra_term(), term()}.
@@ -154,8 +157,7 @@
 -type snapshot_meta() ::
     #{index := ra_index(),
       term := ra_term(),
-      cluster := ra_cluster_servers(),
-      cluster_state => ra_cluster(), %% TODO replace `cluster`
+      cluster := ra_cluster_snapshot(),
       machine_version := ra_machine:version()}.
 
 -record(install_snapshot_rpc,
diff --git a/src/ra_log.erl b/src/ra_log.erl
index 5bd897e8..421546de 100644
--- a/src/ra_log.erl
+++ b/src/ra_log.erl
@@ -637,14 +637,15 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState,
                        #?MODULE{cfg = #cfg{snapshot_interval = SnapInter},
                                 reader = Reader,
                                 snapshot_state = SnapState} = State0) ->
-    ClusterServerIds = maps:keys(Cluster),
+    ClusterServerIds = maps:map(fun (_, V) ->
+                                        maps:with([voter_status], V)
+                                end, Cluster),
     SnapLimit = case ra_snapshot:current(SnapState) of
                     undefined -> SnapInter;
                     {I, _} -> I + SnapInter
                 end,
     Meta = #{index => Idx,
              cluster => ClusterServerIds,
-             cluster_state => Cluster,
              machine_version => MacVersion},
     % The release cursor index is the last entry _not_ contributing
     % to the current state. I.e. the last entry that can be discarded.
diff --git a/src/ra_server.erl b/src/ra_server.erl
index 11326910..b4ff0d58 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -306,9 +306,8 @@ init(#{id := Id,
                     0, InitialMachineState, {0, 0}};
                 {#{index := Idx,
                    term := Term,
-                   cluster := ClusterNodes0,
-                   machine_version := MacVersion} = Snapshot, MacSt} ->
-                    ClusterNodes = maps:get(cluster_state, Snapshot, ClusterNodes0),
+                   cluster := ClusterNodes,
+                   machine_version := MacVersion}, MacSt} ->
                     Clu = make_cluster(Id, ClusterNodes),
                     %% the snapshot is the last index before the first index
                     %% TODO: should this be Idx + 1?
@@ -1295,16 +1294,14 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
                           Cfg0
                   end,
 
-            {#{cluster := ClusterIds0} = Snapshot, MacState} = ra_log:recover_snapshot(Log),
-            ClusterIds = maps:get(cluster_state, Snapshot, ClusterIds0),
-            Cluster = make_cluster(Id, ClusterIds),
+            {#{cluster := ClusterIds}, MacState} = ra_log:recover_snapshot(Log),
             State = update_term(Term,
                                 State0#{cfg => Cfg,
                                         log => Log,
                                         commit_index => SnapIndex,
                                         last_applied => SnapIndex,
-                                        cluster => Cluster,
-                                        non_voter => get_non_voter(Cluster, State0),
+                                        cluster => make_cluster(Id, ClusterIds),
+                                        non_voter => get_non_voter(ClusterIds, State0),
                                         machine_state => MacState}),
             %% it was the last snapshot chunk so we can revert back to
             %% follower status
@@ -2244,15 +2241,16 @@ fetch_term(Idx, #{log := Log0} = State) ->
             {Term, State#{log => Log}}
     end.
 
+-spec make_cluster(ra_server_id(), ra_cluster_snapshot() | [ra_server_id()]) ->
+    ra_cluster().
 make_cluster(Self, Nodes0) when is_list(Nodes0) ->
     Nodes = lists:foldl(fun(N, Acc) ->
                                 Acc#{N => new_peer()}
                         end, #{}, Nodes0),
     append_self(Self, Nodes);
 make_cluster(Self, Nodes0) when is_map(Nodes0) ->
     Nodes = maps:map(fun(_, Peer0) ->
-                             Peer1 = maps:with([voter_status], Peer0),
-                             new_peer_with(Peer1)
+                             new_peer_with(Peer0)
                      end, Nodes0),
     append_self(Self, Nodes).
 
@@ -2268,8 +2266,8 @@ append_self(Self, Nodes) ->
 
 initialise_peers(State = #{log := Log, cluster := Cluster0}) ->
     NextIdx = ra_log:next_index(Log),
-    Cluster = maps:map(fun (_, Peer) ->
-                               Peer1 = maps:with([voter_status], Peer),
+    Cluster = maps:map(fun (_, Peer0) ->
+                               Peer1 = maps:with([voter_status], Peer0),
                                Peer2 = Peer1#{next_index => NextIdx},
                                new_peer_with(Peer2)
                        end, Cluster0),
@@ -2917,7 +2915,12 @@ ensure_promotion_target(Voter, _) ->
     {ok, Voter}.
 
 %% Get non_voter of a given Id+UId from a (possibly new) cluster.
--spec get_non_voter(ra_cluster(), ra_server_id(), ra_uid(), boolean()) -> boolean().
+-spec get_non_voter(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
+                    ra_server_id(), ra_uid(), boolean()) ->
+    boolean().
+get_non_voter(Cluster, _PeerId, _UId, Default) when is_list(Cluster) ->
+    %% Legacy cluster snapshot does not retain voter_status.
+    Default;
 get_non_voter(Cluster, PeerId, UId, Default) ->
     case maps:get(PeerId, Cluster, undefined) of
         #{voter_status := #{uid := UId} = VoterStatus} ->
@@ -2928,7 +2931,9 @@ get_non_voter(Cluster, PeerId, UId, Default) ->
 
 %% Get this node's non_voter from a (possibly new) cluster.
 %% Defaults to last known-locally value.
--spec get_non_voter(ra_cluster(), ra_state()) -> boolean().
+-spec get_non_voter(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
+                    ra_state()) ->
+    boolean().
 get_non_voter(Cluster, #{cfg := #cfg{id = Id, uid = UId}} = State) ->
     Default = maps:get(non_voter, State, false),
     get_non_voter(Cluster, Id, UId, Default).
diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl
index 36ff7847..32c74cd9 100644
--- a/test/ra_snapshot_SUITE.erl
+++ b/test/ra_snapshot_SUITE.erl
@@ -32,6 +32,7 @@ all_tests() ->
      take_snapshot,
      take_snapshot_crash,
      init_recover,
+     init_recover_voter_status,
      init_recover_multi_corrupt,
      init_recover_corrupt,
      read_snapshot,
@@ -162,6 +163,34 @@ init_recover(Config) ->
     {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
     ok.
 
+init_recover_voter_status(Config) ->
+    UId = ?config(uid, Config),
+    State0 = ra_snapshot:init(UId, ra_log_snapshot,
+                              ?config(snap_dir, Config), undefined),
+    Meta = meta(55, 2, #{node() => #{voter_status => test}}),
+    {State1, [{monitor, process, snapshot_writer, _}]} =
+        ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0),
+    receive
+        {ra_log_event, {snapshot_written, IdxTerm}} ->
+            _ = ra_snapshot:complete_snapshot(IdxTerm, State1),
+            ok
+    after 1000 ->
+              error(snapshot_event_timeout)
+    end,
+
+    %% open a new snapshot state to simulate a restart
+    Recover = ra_snapshot:init(UId, ra_log_snapshot,
+                               ?config(snap_dir, Config), undefined),
+    %% ensure last snapshot is recovered
+    %% it also needs to be validated as could have crashed mid write
+    undefined = ra_snapshot:pending(Recover),
+    {55, 2} = ra_snapshot:current(Recover),
+    55 = ra_snapshot:last_index_for(UId),
+
+    %% recover the meta data and machine state
+    {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover),
+    ok.
+
 init_multi(Config) ->
     UId = ?config(uid, Config),
     State0 = ra_snapshot:init(UId, ra_log_snapshot,