diff --git a/src/ra.erl b/src/ra.erl index 286bb46b..159628fc 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -455,7 +455,7 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs, %% @doc Starts a new distributed ra cluster. %% @param ClusterName the name of the cluster. -%% @param ServerId the ra_server_id() of the server +%% @param ServerId the ra_server_id() of the server, or a map with server id and settings. %% @param Machine The {@link ra_machine:machine/0} configuration. %% @param ServerIds a list of initial (seed) server configurations %% @returns @@ -470,19 +470,21 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs, %% forcefully deleted. %% @see start_server/1 %% @end --spec start_server(atom(), ra_cluster_name(), ra_server_id(), +-spec start_server(atom(), ra_cluster_name(), ra_server_id() | ra_new_server(), ra_server:machine_conf(), [ra_server_id()]) -> ok | {error, term()}. -start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) +start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) -> + start_server(System, ClusterName, #{id => ServerId}, Machine, ServerIds); +start_server(System, ClusterName, #{id := {_, _}} = Conf0, Machine, ServerIds) when is_atom(System) -> - UId = new_uid(ra_lib:to_binary(ClusterName)), + UId = maps:get(uid, Conf0, + new_uid(ra_lib:to_binary(ClusterName))), Conf = #{cluster_name => ClusterName, - id => ServerId, uid => UId, initial_members => ServerIds, log_init_args => #{uid => UId}, machine => Machine}, - start_server(System, Conf). + start_server(System, maps:merge(Conf0, Conf)). %% @doc Starts a ra server in the default system %% @param Conf a ra_server_config() configuration map. @@ -558,9 +560,10 @@ delete_cluster(ServerIds, Timeout) -> %% affect said cluster's availability characteristics (by increasing quorum node count). %% %% @param ServerLoc the ra server or servers to try to send the command to -%% @param ServerId the ra server id of the new server. +%% @param ServerId the ra server id of the new server, or a map with server id and settings. %% @end --spec add_member(ra_server_id() | [ra_server_id()], ra_server_id()) -> +-spec add_member(ra_server_id() | [ra_server_id()], + ra_server_id() | ra_new_server()) -> ra_cmd_ret() | {error, already_member} | {error, cluster_change_not_permitted}. @@ -571,7 +574,8 @@ add_member(ServerLoc, ServerId) -> %% @see add_member/2 %% @end -spec add_member(ra_server_id() | [ra_server_id()], - ra_server_id(), timeout()) -> + ra_server_id() | ra_new_server(), + timeout()) -> ra_cmd_ret() | {error, already_member} | {error, cluster_change_not_permitted}. @@ -580,7 +584,6 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). - %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change %% command to the log. @@ -716,7 +719,6 @@ new_uid(Source) when is_binary(Source) -> Prefix = ra_lib:derive_safe_string(Source, 6), ra_lib:make_uid(string:uppercase(Prefix)). - %% @doc Returns a map of overview data of the default Ra system on the current Erlang %% node. 
%% DEPRECATED: user overview/1 @@ -1132,13 +1134,16 @@ key_metrics({Name, N} = ServerId) when N == node() -> end, case whereis(Name) of undefined -> - Counters#{state => noproc}; + Counters#{state => noproc, + membership => unknown}; _ -> case ets:lookup(ra_state, Name) of [] -> - Counters#{state => unknown}; - [{_, State}] -> - Counters#{state => State} + Counters#{state => unknown, + membership => unknown}; + [{_, State, Membership}] -> + Counters#{state => State, + membership => Membership} end end; key_metrics({_, N} = ServerId) -> diff --git a/src/ra.hrl b/src/ra.hrl index 08d5c5b3..8f1538cf 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -39,24 +39,45 @@ %% after node restart). Pids are not stable in this sense. -type ra_server_id() :: {Name :: atom(), Node :: node()}. +%% Specifies server configuration for a new cluster member. +%% Subset of ra_server:ra_server_config(). +%% Both `ra:add_member` and `ra:start_server` must be called with the same values. +-type ra_new_server() :: #{id := ra_server_id(), + % Defaults to `voter` if absent. + membership => ra_membership(), + % Required for `promotable` in the above. + uid => ra_uid()}. + -type ra_peer_status() :: normal | {sending_snapshot, pid()} | suspended | disconnected. +-type ra_membership() :: voter | promotable | non_voter | unknown. + +-type ra_voter_status() :: #{membership => ra_membership(), + uid => ra_uid(), + target => ra_index()}. + -type ra_peer_state() :: #{next_index := non_neg_integer(), match_index := non_neg_integer(), query_index := non_neg_integer(), % the commit index last sent % used for evaluating pipeline status commit_index_sent := non_neg_integer(), + %% Whether the peer is part of the consensus. + %% Defaults to "yes" if absent. + voter_status => ra_voter_status(), %% indicates that a snapshot is being sent %% to the peer status := ra_peer_status()}. -type ra_cluster() :: #{ra_server_id() => ra_peer_state()}. --type ra_cluster_servers() :: [ra_server_id()]. +%% Dehydrated cluster: +-type ra_cluster_servers() :: [ra_server_id()]. % Deprecated +-type ra_peer_snapshot() :: #{voter_status => ra_voter_status()}. +-type ra_cluster_snapshot() :: #{ra_server_id() => ra_peer_snapshot()}. %% represent a unique entry in the ra log -type log_entry() :: {ra_index(), ra_term(), term()}. @@ -138,7 +159,7 @@ -type snapshot_meta() :: #{index := ra_index(), term := ra_term(), - cluster := ra_cluster_servers(), + cluster := ra_cluster_snapshot(), machine_version := ra_machine:version()}. 
-record(install_snapshot_rpc, diff --git a/src/ra_directory.erl b/src/ra_directory.erl index 0a393fea..de841475 100644 --- a/src/ra_directory.erl +++ b/src/ra_directory.erl @@ -175,14 +175,20 @@ overview(System) when is_atom(System) -> #{directory := Tbl, directory_rev := _TblRev} = get_names(System), Dir = ets:tab2list(Tbl), - States = maps:from_list(ets:tab2list(ra_state)), + Rows = lists:map(fun({K, S, V}) -> + {K, {S, V}} + end, + ets:tab2list(ra_state)), + States = maps:from_list(Rows), Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)), lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) -> + {S, V} = maps:get(ServerName, States, {undefined, undefined}), Acc#{ServerName => #{uid => UId, pid => Pid, parent => Parent, - state => maps:get(ServerName, States, undefined), + state => S, + membership => V, cluster_name => ClusterName, snapshot_state => maps:get(UId, Snaps, undefined)}} diff --git a/src/ra_log.erl b/src/ra_log.erl index d8371396..421546de 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -637,7 +637,9 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState, #?MODULE{cfg = #cfg{snapshot_interval = SnapInter}, reader = Reader, snapshot_state = SnapState} = State0) -> - ClusterServerIds = maps:keys(Cluster), + ClusterServerIds = maps:map(fun (_, V) -> + maps:with([voter_status], V) + end, Cluster), SnapLimit = case ra_snapshot:current(SnapState) of undefined -> SnapInter; {I, _} -> I + SnapInter diff --git a/src/ra_server.erl b/src/ra_server.erl index 4a67bff5..405b44d9 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -55,6 +55,7 @@ terminate/2, log_fold/3, log_read/2, + get_membership/1, recover/1 ]). @@ -72,6 +73,7 @@ log := term(), voted_for => 'maybe'(ra_server_id()), % persistent votes => non_neg_integer(), + membership => ra_membership(), commit_index := ra_index(), last_applied := ra_index(), persisted_last_applied => ra_index(), @@ -195,7 +197,10 @@ max_pipeline_count => non_neg_integer(), ra_event_formatter => {module(), atom(), [term()]}, counter => counters:counters_ref(), - system_config => ra_system:config()}. + membership => ra_membership(), + system_config => ra_system:config(), + has_changed => boolean() + }. 
-type mutable_config() :: #{cluster_name => ra_cluster_name(), metrics_key => term(), @@ -328,6 +333,9 @@ init(#{id := Id, put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx), put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm), + NonVoter = get_membership(Cluster0, Id, UId, + maps:get(membership, Config, voter)), + #{cfg => Cfg, current_term => CurrentTerm, cluster => Cluster0, @@ -338,6 +346,7 @@ init(#{id := Id, cluster_change_permitted => false, cluster_index_term => SnapshotIndexTerm, voted_for => VotedFor, + membership => NonVoter, commit_index => CommitIndex, %% set this to the first index so that we can apply all entries %% up to the commit index during recovery @@ -400,8 +409,8 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true, Peer = Peer0#{match_index => max(MI, LastIdx), next_index => max(NI, NextIdx)}, State1 = put_peer(PeerId, Peer, State0), - {State2, Effects0} = evaluate_quorum(State1, []), - + Effects00 = maybe_promote_peer(PeerId, State1, []), + {State2, Effects0} = evaluate_quorum(State1, Effects00), {State, Effects1} = process_pending_consistent_queries(State2, Effects0), Effects = [{next_event, info, pipeline_rpcs} | Effects1], @@ -782,7 +791,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true}, NewVotes = Votes + 1, ?DEBUG("~ts: vote granted for term ~b votes ~b", [LogId, Term, NewVotes]), - case trunc(maps:size(Nodes) / 2) + 1 of + case required_quorum(Nodes) of NewVotes -> {State1, Effects} = make_all_rpcs(initialise_peers(State0)), Noop = {noop, #{ts => erlang:system_time(millisecond)}, @@ -928,7 +937,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true, [LogId, Token, Term, Votes + 1]), NewVotes = Votes + 1, State = update_term(Term, State0), - case trunc(maps:size(Nodes) / 2) + 1 of + case required_quorum(Nodes) of NewVotes -> call_for_election(candidate, State); _ -> @@ -1110,8 +1119,20 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) -> % simply forward all other events to ra_log {Log, Effects} = ra_log:handle_event(Evt, Log0), {follower, State#{log => Log}, Effects}; +handle_follower(#pre_vote_rpc{}, + #{cfg := #cfg{log_id = LogId}, + membership := Membership} = State) when Membership =/= voter -> + ?DEBUG("~s: follower ignored pre_vote_rpc, non-voter: ~p0", + [LogId, Membership]), + {follower, State, []}; handle_follower(#pre_vote_rpc{} = PreVote, State) -> process_pre_vote(follower, PreVote, State); +handle_follower(#request_vote_rpc{}, + #{cfg := #cfg{log_id = LogId}, + membership := Membership} = State) when Membership =/= voter -> + ?DEBUG("~s: follower ignored request_vote_rpc, non-voter: ~p0", + [LogId, Membership]), + {follower, State, []}; handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term}, #{current_term := Term, voted_for := VotedFor, cfg := #cfg{log_id = LogId}} = State) @@ -1208,6 +1229,12 @@ handle_follower(#append_entries_reply{}, State) -> %% handle to avoid logging as unhandled %% could receive a lot of these shortly after standing down as leader {follower, State, []}; +handle_follower(election_timeout, + #{cfg := #cfg{log_id = LogId}, + membership := Membership} = State) when Membership =/= voter -> + ?DEBUG("~s: follower ignored election_timeout, non-voter: ~p0", + [LogId, Membership]), + {follower, State, []}; handle_follower(election_timeout, State) -> call_for_election(pre_vote, State); handle_follower(try_become_leader, State) -> @@ -1274,6 +1301,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, commit_index => 
SnapIndex, last_applied => SnapIndex, cluster => make_cluster(Id, ClusterIds), + membership => get_membership(ClusterIds, State0), machine_state => MacState}), %% it was the last snapshot chunk so we can revert back to %% follower status @@ -1376,6 +1404,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg, cluster, leader_id, voted_for, + membership, cluster_change_permitted, cluster_index_term, query_index @@ -2212,10 +2241,21 @@ fetch_term(Idx, #{log := Log0} = State) -> {Term, State#{log => Log}} end. -make_cluster(Self, Nodes) -> - case lists:foldl(fun(N, Acc) -> - Acc#{N => new_peer()} - end, #{}, Nodes) of +-spec make_cluster(ra_server_id(), ra_cluster_snapshot() | [ra_server_id()]) -> + ra_cluster(). +make_cluster(Self, Nodes0) when is_list(Nodes0) -> + Nodes = lists:foldl(fun(N, Acc) -> + Acc#{N => new_peer()} + end, #{}, Nodes0), + append_self(Self, Nodes); +make_cluster(Self, Nodes0) when is_map(Nodes0) -> + Nodes = maps:map(fun(_, Peer0) -> + new_peer_with(Peer0) + end, Nodes0), + append_self(Self, Nodes). + +append_self(Self, Nodes) -> + case Nodes of #{Self := _} = Cluster -> % current server is already in cluster - do nothing Cluster; @@ -2225,12 +2265,12 @@ make_cluster(Self, Nodes) -> end. initialise_peers(State = #{log := Log, cluster := Cluster0}) -> - PeerIds = peer_ids(State), NextIdx = ra_log:next_index(Log), - Cluster = lists:foldl(fun(PeerId, Acc) -> - Acc#{PeerId => - new_peer_with(#{next_index => NextIdx})} - end, Cluster0, PeerIds), + Cluster = maps:map(fun (_, Peer0) -> + Peer1 = maps:with([voter_status], Peer0), + Peer2 = Peer1#{next_index => NextIdx}, + new_peer_with(Peer2) + end, Cluster0), State#{cluster => Cluster}. apply_to(ApplyTo, State, Effs) -> @@ -2326,6 +2366,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, + membership => get_membership(NewCluster, State0), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2458,16 +2499,42 @@ append_log_leader({CmdTag, _, _, _}, when CmdTag == '$ra_join' orelse CmdTag == '$ra_leave' -> {not_appended, cluster_change_not_permitted, State}; +append_log_leader({'$ra_join', From, + #{id := JoiningNode, voter_status := Voter0}, + ReplyMode}, + State = #{cluster := OldCluster}) -> + case ensure_promotion_target(Voter0, State) of + {error, Reason} -> + {not_appended, Reason, State}; + {ok, Voter} -> + case OldCluster of + #{JoiningNode := #{voter_status := Voter}} -> + already_member(State); + #{JoiningNode := Peer} -> + % Update member status. + Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}}, + append_cluster_change(Cluster, From, ReplyMode, State); + _ -> + % Insert new member. + Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})}, + append_cluster_change(Cluster, From, ReplyMode, State) + end + end; +append_log_leader({'$ra_join', From, #{id := JoiningNode} = Config, ReplyMode}, + State) -> + append_log_leader({'$ra_join', From, + #{id => JoiningNode, + voter_status => maps:with([membership, uid, target], + Config)}, + ReplyMode}, State); append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, State = #{cluster := OldCluster}) -> + % Legacy $ra_join, join as voter if no such member in the cluster. case OldCluster of #{JoiningNode := _} -> - % already a member do nothing - % TODO: reply? 
If we don't reply the caller may block until timeout - {not_appended, already_member, State}; + already_member(State); _ -> - Cluster = OldCluster#{JoiningNode => new_peer()}, - append_cluster_change(Cluster, From, ReplyMode, State) + append_log_leader({'$ra_join', From, #{id => JoiningNode}, ReplyMode}, State) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, State = #{cfg := #cfg{log_id = LogId}, @@ -2509,6 +2576,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, + membership => get_membership(Cluster, State), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. @@ -2587,6 +2655,8 @@ query_indexes(#{cfg := #cfg{id = Id}, query_index := QueryIndex}) -> maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; + (_K, #{voter_status := #{membership := promotable}}, Acc) -> + Acc; (_K, #{query_index := Idx}, Acc) -> [Idx | Acc] end, [QueryIndex], Cluster). @@ -2597,6 +2667,8 @@ match_indexes(#{cfg := #cfg{id = Id}, {LWIdx, _} = ra_log:last_written(Log), maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; + (_K, #{voter_status := #{membership := promotable}}, Acc) -> + Acc; (_K, #{match_index := Idx}, Acc) -> [Idx | Acc] end, [LWIdx], Cluster). @@ -2818,6 +2890,96 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> Name; meta_name(#{names := #{log_meta := Name}}) -> Name. + +already_member(State) -> + % already a member do nothing + % TODO: reply? If we don't reply the caller may block until timeout + {not_appended, already_member, State}. + +%%% ==================== +%%% Voter status helpers +%%% ==================== + +-spec ensure_promotion_target(ra_voter_status(), ra_server_state()) -> + {ok, ra_voter_status()} | {error, term()}. +ensure_promotion_target(#{membership := promotable, target := _, uid := _} = Status, + _) -> + {ok, Status}; +ensure_promotion_target(#{membership := promotable, uid := _} = Status, + #{log := Log}) -> + Target = ra_log:next_index(Log), + {ok, Status#{target => Target}}; +ensure_promotion_target(#{membership := promotable}, _) -> + {error, missing_uid}; +ensure_promotion_target(Voter, _) -> + {ok, Voter}. + +%% Get membership of a given Id+UId from a (possibly new) cluster. +-spec get_membership(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(), + ra_server_id(), ra_uid(), ra_membership()) -> + ra_membership(). +get_membership(_Cluster, _PeerId, _UId, Default) when is_list(_Cluster) -> + %% Legacy cluster snapshot does not retain voter_status. + Default; +get_membership(Cluster, PeerId, UId, Default) -> + case maps:get(PeerId, Cluster, undefined) of + #{voter_status := #{uid := UId} = VoterStatus} -> + maps:get(membership, VoterStatus, Default); + _ -> + Default + end. + +%% Get this node's membership from a (possibly new) cluster. +%% Defaults to last known-locally value. +-spec get_membership(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(), + ra_server_state()) -> + ra_membership(). +get_membership(Cluster, #{cfg := #cfg{id = Id, uid = UId}} = State) -> + Default = maps:get(membership, State, voter), + get_membership(Cluster, Id, UId, Default). + +%% Get this node's membership. +%% Defaults to last known-locally value. +-spec get_membership(ra_server_state()) -> ra_membership(). +get_membership(#{cfg := #cfg{id = Id, uid = UId}, cluster := Cluster} = State) -> + Default = maps:get(membership, State, voter), + get_membership(Cluster, Id, UId, Default). 
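%% Editorial sketch (not part of the patch): how the membership helpers above
%% and required_quorum/1 defined just below behave for a two-voter cluster with
%% one promotable peer. Peer ids and uids are made up for illustration, and the
%% function is meant to be read as if it lived inside ra_server.erl, where
%% get_membership/4 and required_quorum/1 are local.
membership_and_quorum_sketch() ->
    N1 = {n1, node()}, N2 = {n2, node()}, N3 = {n3, node()},
    Cluster = #{N1 => #{next_index => 5, match_index => 4},
                N2 => #{next_index => 5, match_index => 4},
                N3 => #{next_index => 1, match_index => 0,
                        voter_status => #{membership => promotable,
                                          uid => <<"uid3">>,
                                          target => 4}}},
    %% n3 resolves to promotable because the recorded uid matches its own
    promotable = get_membership(Cluster, N3, <<"uid3">>, voter),
    %% a voter_status recorded for a different incarnation (uid mismatch)
    %% is ignored and the supplied default wins
    voter = get_membership(Cluster, N3, <<"stale-uid">>, voter),
    %% peers without any voter_status are plain voters
    voter = get_membership(Cluster, N1, <<"uid1">>, voter),
    %% promotable peers do not count towards quorum:
    %% two voters -> trunc(2 / 2) + 1 = 2 acknowledgements required
    2 = required_quorum(Cluster),
    ok.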
+ +-spec maybe_promote_peer(ra_server_id(), ra_server_state(), effects()) -> + effects(). +maybe_promote_peer(PeerId, #{cluster := Cluster}, Effects) -> + case Cluster of + #{PeerId := #{match_index := MI, + voter_status := #{membership := promotable, + target := Target} = OldStatus}} when + MI >= Target -> + Promote = {next_event, + {command, {'$ra_join', + #{ts => os:system_time(millisecond)}, + #{id => PeerId, + voter_status => OldStatus#{ + membership => voter + }}, + noreply}}}, + [Promote | Effects]; + _ -> + Effects + end. + +-spec required_quorum(ra_cluster()) -> pos_integer(). +required_quorum(Cluster) -> + Voters = count_voters(Cluster), + trunc(Voters / 2) + 1. + +count_voters(Cluster) -> + maps:fold( + fun (_, #{voter_status := #{membership := promotable}}, Count) -> + Count; + (_, _, Count) -> + Count + 1 + end, + 0, Cluster). + %%% =================== %%% Internal unit tests %%% =================== diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 49e8e4d9..c985aa4a 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -183,6 +183,7 @@ log_fold(ServerId, Fun, InitialState, Timeout) -> -spec state_query(server_loc(), all | overview | + voters | members | initial_members | machine, timeout()) -> @@ -193,6 +194,7 @@ state_query(ServerLoc, Spec, Timeout) -> -spec local_state_query(server_loc(), all | overview | + voters | members | initial_members | machine, timeout()) -> @@ -271,7 +273,7 @@ init(Config) -> do_init(#{id := Id, cluster_name := ClusterName} = Config0) -> Key = ra_lib:ra_server_id_to_local_name(Id), - true = ets:insert(ra_state, {Key, init}), + true = ets:insert(ra_state, {Key, init, unknown}), process_flag(trap_exit, true), Config = #{counter := Counter, system_config := SysConf} = maps:merge(config_defaults(Id), @@ -784,11 +786,19 @@ follower(_, tick_timeout, State0) -> set_tick_timer(State, Actions)}; follower({call, From}, {log_fold, Fun, Term}, State) -> fold_log(From, Fun, Term, State); -follower(EventType, Msg, State0) -> +follower(EventType, Msg, #state{conf = #conf{name = Name}, + server_state = SS0} = State0) -> + Membership0 = ra_server:get_membership(SS0), case handle_follower(Msg, State0) of {follower, State1, Effects} -> {State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), - State = follower_leader_change(State0, State2), + State = #state{server_state = SS} = follower_leader_change(State0, State2), + case ra_server:get_membership(SS) of + Membership0 -> + ok; + Membership -> + true = ets:update_element(ra_state, Name, {3, Membership}) + end, {keep_state, State, Actions}; {pre_vote, State1, Effects} -> {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), @@ -1029,7 +1039,8 @@ format_status(Opt, [_PDict, StateName, handle_enter(RaftState, OldRaftState, #state{conf = #conf{name = Name}, server_state = ServerState0} = State) -> - true = ets:insert(ra_state, {Name, RaftState}), + Membership = ra_server:get_membership(ServerState0), + true = ets:insert(ra_state, {Name, RaftState, Membership}), {ServerState, Effects} = ra_server:handle_state_enter(RaftState, ServerState0), case RaftState == leader orelse OldRaftState == leader of @@ -1510,6 +1521,18 @@ do_state_query(overview, State) -> ra_server:overview(State); do_state_query(machine, #{machine_state := MacState}) -> MacState; +do_state_query(voters, #{cluster := Cluster}) -> + Vs = maps:fold(fun(K, V, Acc) -> + case maps:get(voter_status, V, undefined) of + undefined -> [K|Acc]; + S -> case maps:get(membership, S, undefined) of + undefined -> [K|Acc]; + 
voter -> [K|Acc]; + _ -> Acc + end + end + end, [], Cluster), + Vs; do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); do_state_query(initial_members, #{log := Log}) -> @@ -1717,14 +1740,16 @@ handle_tick_metrics(State) -> _ = ets:insert(ra_metrics, Metrics), State. -can_execute_locally(RaftState, TargetNode, State) -> +can_execute_locally(RaftState, TargetNode, + #state{server_state = ServerState} = State) -> + Membership = ra_server:get_membership(ServerState), case RaftState of - follower -> + follower when Membership == voter -> TargetNode == node(); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. - Members = do_state_query(members, State#state.server_state), + Members = do_state_query(voters, State#state.server_state), not lists:any(fun ({_, N}) -> N == TargetNode end, Members); leader -> true; diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index a2c1c630..4e8f0653 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -29,6 +29,9 @@ all() -> all_tests() -> [ + nonvoter_catches_up, + nonvoter_catches_up_after_restart, + nonvoter_catches_up_after_leader_restart, start_stop_restart_delete_on_remote, start_cluster, start_or_restart_cluster, @@ -295,13 +298,20 @@ start_cluster_minority(Config) -> send_local_msg(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + [A, B, NonVoter] = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + NodeIds = [A, B], Machine = {module, ?MODULE, #{}}, {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), % assert all were said to be started [] = Started -- NodeIds, - %% spawn a receiver process on one node + % add permanent non-voter {ok, _, Leader} = ra:members(hd(NodeIds)), + {ok, _, _} = ra:process_command(Leader, banana), + New = #{id => NonVoter, + membership => non_voter, + uid => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds), %% select a non-leader node to spawn on [{_, N} | _] = lists:delete(Leader, NodeIds), test_local_msg(Leader, N, N, send_local_msg, local), @@ -309,36 +319,55 @@ send_local_msg(Config) -> test_local_msg(Leader, N, N, send_local_msg, [local, cast]), test_local_msg(Leader, N, N, send_local_msg, [local, cast, ra_event]), {_, LeaderNode} = Leader, + %% test the same but for a local pid (non-member) test_local_msg(Leader, node(), LeaderNode, send_local_msg, local), test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, ra_event]), test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, cast]), test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, cast, ra_event]), - %% test the same but for a local pid (non-member) + %% same for non-voter + {_, NonVoterNode} = NonVoter, + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, local), + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, ra_event]), + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, cast]), + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, cast, ra_event]), [ok = slave:stop(S) || {_, S} <- NodeIds], ok. 
local_log_effect(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + [A, B, NonVoter] = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + NodeIds = [A, B], Machine = {module, ?MODULE, #{}}, {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), % assert all were said to be started [] = Started -- NodeIds, - %% spawn a receiver process on one node + % add permanent non-voter {ok, _, Leader} = ra:members(hd(NodeIds)), + {ok, _, _} = ra:process_command(Leader, banana), + New = #{id => NonVoter, + membership => non_voter, + uid => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds), %% select a non-leader node to spawn on [{_, N} | _] = lists:delete(Leader, NodeIds), test_local_msg(Leader, N, N, do_local_log, local), test_local_msg(Leader, N, N, do_local_log, [local, ra_event]), test_local_msg(Leader, N, N, do_local_log, [local, cast]), test_local_msg(Leader, N, N, do_local_log, [local, cast, ra_event]), + %% test the same but for a local pid (non-member) {_, LeaderNode} = Leader, test_local_msg(Leader, node(), LeaderNode, do_local_log, local), test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, ra_event]), test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, cast]), test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, cast, ra_event]), - %% test the same but for a local pid (non-member) + %% same for non-voter + {_, NonVoterNode} = NonVoter, + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, local), + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, ra_event]), + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, cast]), + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, cast, ra_event]), [ok = slave:stop(S) || {_, S} <- NodeIds], ok. @@ -393,6 +422,115 @@ disconnected_node_catches_up(Config) -> [ok = slave:stop(S) || {_, S} <- ServerIds], ok. +nonvoter_catches_up(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + [A, B, C = {Group, NodeC}] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]), + {ok, _, Leader} = ra:members(hd(Started)), + + [ok = ra:pipeline_command(Leader, N, no_correlation, normal) + || N <- lists:seq(1, 10000)], + {ok, _, _} = ra:process_command(Leader, banana), + + New = #{id => C, membership => promotable, uid => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]), + ?assertMatch(#{Group := #{membership := promotable}}, + rpc:call(NodeC, ra_directory, overview, [?SYS])), + ?assertMatch(#{membership := promotable}, + ra:key_metrics(C)), + ?assertMatch({ok, #{membership := promotable}, _}, + ra:member_overview(C)), + + await_condition( + fun () -> + {ok, #{membership := M}, _} = ra:member_overview(C), + M == voter + end, 200), + ?assertMatch(#{Group := #{membership := voter}}, + rpc:call(NodeC, ra_directory, overview, [?SYS])), + ?assertMatch(#{membership := voter}, + ra:key_metrics(C)), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. 
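%% Editorial sketch (not part of the patch): the test above polls with
%% await_condition/2; outside a test suite the same check can be expressed
%% against the public API, since ra:member_overview/1 (and ra:key_metrics/1)
%% now expose the member's `membership'. The helper name is hypothetical and
%% error handling (timeouts, unreachable members) is intentionally omitted.
wait_until_voter(ServerId) ->
    case ra:member_overview(ServerId) of
        {ok, #{membership := voter}, _LeaderId} ->
            ok;
        {ok, #{membership := _StillCatchingUp}, _LeaderId} ->
            timer:sleep(100),
            wait_until_voter(ServerId)
    end.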
+ +nonvoter_catches_up_after_restart(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + [A, B, C = {Group, NodeC}] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]), + {ok, _, Leader} = ra:members(hd(Started)), + + [ok = ra:pipeline_command(Leader, N, no_correlation, normal) + || N <- lists:seq(1, 10000)], + {ok, _, _} = ra:process_command(Leader, banana), + + New = #{id => C, membership => promotable, uid => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]), + ?assertMatch(#{Group := #{membership := promotable}}, + rpc:call(NodeC, ra_directory, overview, [?SYS])), + ?assertMatch(#{membership := promotable}, + ra:key_metrics(C)), + ?assertMatch({ok, #{membership := promotable}, _}, + ra:member_overview(C)), + ok = ra:stop_server(?SYS, C), + ok = ra:restart_server(?SYS, C), + + await_condition( + fun () -> + {ok, #{membership := M}, _} = ra:member_overview(C), + M == voter + end, 200), + ?assertMatch(#{Group := #{membership := voter}}, + rpc:call(NodeC, ra_directory, overview, [?SYS])), + ?assertMatch(#{membership := voter}, + ra:key_metrics(C)), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + +nonvoter_catches_up_after_leader_restart(Config) -> + PrivDir = ?config(data_dir, Config), + ClusterName = ?config(cluster_name, Config), + [A, B, C = {Group, NodeC}] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + Machine = {module, ?MODULE, #{}}, + {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]), + {ok, _, Leader} = ra:members(hd(Started)), + + [ok = ra:pipeline_command(Leader, N, no_correlation, normal) + || N <- lists:seq(1, 10000)], + {ok, _, _} = ra:process_command(Leader, banana), + + New = #{id => C, membership => promotable, uid => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]), + ?assertMatch(#{Group := #{membership := promotable}}, + rpc:call(NodeC, ra_directory, overview, [?SYS])), + ?assertMatch(#{membership := promotable}, + ra:key_metrics(C)), + ?assertMatch({ok, #{membership := promotable}, _}, + ra:member_overview(C)), + ok = ra:stop_server(?SYS, Leader), + ok = ra:restart_server(?SYS, Leader), + + await_condition( + fun () -> + {ok, #{membership := M}, _} = ra:member_overview(C), + M == voter + end, 200), + ?assertMatch(#{Group := #{membership := voter}}, + rpc:call(NodeC, ra_directory, overview, [?SYS])), + ?assertMatch(#{membership := voter}, + ra:key_metrics(C)), + + [ok = slave:stop(S) || {_, S} <- ServerIds], + ok. + key_metrics(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl index 07a7eda8..caccde6d 100644 --- a/test/ra_2_SUITE.erl +++ b/test/ra_2_SUITE.erl @@ -45,6 +45,7 @@ all_tests() -> external_reader, add_member_without_quorum, force_start_follower_as_single_member, + force_start_follower_as_single_member_nonvoter, initial_members_query ]. @@ -682,6 +683,48 @@ force_start_follower_as_single_member(Config) -> ok. 
+force_start_follower_as_single_member_nonvoter(Config) -> + ok = logger:set_primary_config(level, all), + %% ra:start_server should fail if the node already exists + ClusterName = ?config(cluster_name, Config), + PrivDir = ?config(priv_dir, Config), + ServerId1 = ?config(server_id, Config), + ServerId2 = ?config(server_id2, Config), + ServerId3 = ?config(server_id3, Config), + InitialCluster = [ServerId1, ServerId2, ServerId3], + ok = start_cluster(ClusterName, InitialCluster), + timer:sleep(100), + %% stop majority to simulate permanent outage + ok = ra:stop_server(?SYS, ServerId1), + ok = ra:stop_server(?SYS, ServerId2), + + timer:sleep(100), + %% force the remaining node to change it's membership + ok = ra_server_proc:force_shrink_members_to_current_member(ServerId3), + {ok, [_], ServerId3} = ra:members(ServerId3), + ok = enqueue(ServerId3, msg1), + + %% test that it works after restart + ok = ra:stop_server(?SYS, ServerId3), + ok = ra:restart_server(?SYS, ServerId3), + {ok, [_], ServerId3} = ra:members(ServerId3), + ok = enqueue(ServerId3, msg2), + + %% add a member + ServerId4 = ?config(server_id4, Config), + UId4 = ?config(uid4, Config), + Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]), + {ok, _, _} = ra:add_member(ServerId3, #{id => ServerId4, membership => promotable, uid => <<"test">>}), + %% the membership has changed but member not running yet + %% it is nonvoter and does not affect quorum size + {ok, _, _} = ra:process_command(ServerId3, {enq, banana}), + %% start new member + ok = ra:start_server(?SYS, Conf4#{membership => promotable, uid => <<"test">>}), + {ok, _, ServerId3} = ra:members(ServerId4), + ok = enqueue(ServerId3, msg3), + + ok. + initial_members_query(Config) -> ok = logger:set_primary_config(level, all), %% ra:start_server should fail if the node already exists diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 0e29fc35..895a27d3 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -64,7 +64,10 @@ all_tests() -> post_partition_liveness, all_metrics_are_integers, transfer_leadership, - transfer_leadership_two_node + transfer_leadership_two_node, + new_nonvoter_knows_its_status, + voter_gets_promoted_consistent_leader, + voter_gets_promoted_new_leader ]. groups() -> @@ -1024,6 +1027,83 @@ transfer_leadership_two_node(Config) -> ?assertEqual({error, unknown_member}, ra:transfer_leadership(NewLeader, {unknown, node()})), terminate_cluster(Members). +new_nonvoter_knows_its_status(Config) -> + Name = ?config(test_name, Config), + [N1, N2] = [{n1, node()}, {n2, node()}], + {ok, _, _} = ra:start_cluster(default, Name, add_machine(), [N1]), + _ = issue_op(N1, 1), + validate_state_on_node(N1, 1), + + % grow + ok = start_and_join_nonvoter(N1, N2), + + % n2 had no time to catch up + % in server state + {ok, #{uid := T, membership := promotable}, _} = ra:member_overview(N2), + {ok, + #{cluster := #{N2 := #{voter_status := #{membership := promotable, + target := 3, + uid := T}}}}, + _} = ra:member_overview(N1), + % in ets + #{servers := #{n1 := #{membership := voter}, + n2 := #{membership := promotable}}} = ra:overview(?SYS), + ok. 
+ +voter_gets_promoted_consistent_leader(Config) -> + N1 = nth_server_name(Config, 1), + N2 = nth_server_name(Config, 2), + N3 = nth_server_name(Config, 3), + + {ok, _, _} = ra:start_cluster(default, ?config(test_name, Config), add_machine(), [N1]), + _ = issue_op(N1, 1), + validate_state_on_node(N1, 1), + + % grow 1 + ok = start_and_join_nonvoter(N1, N2), + _ = issue_op(N2, 1), + validate_state_on_node(N2, 2), + + % grow 2 + ok = start_and_join_nonvoter(N1, N3), + _ = issue_op(N3, 1), + validate_state_on_node(N3, 3), + + % all are voters after catch-up + timer:sleep(100), + All = [N1, N2, N3], + % in server state + lists:map(fun(O) -> ?assertEqual(All, voters(O)) end, overviews(N1)), + % in ets + #{servers := Servers} = ra:overview(?SYS), + lists:map(fun({Name, _}) -> #{Name := #{membership := voter}} = Servers end, All), + ok. + +voter_gets_promoted_new_leader(Config) -> + N1 = nth_server_name(Config, 1), + N2 = nth_server_name(Config, 2), + N3 = nth_server_name(Config, 3), + + {ok, [Leader, _Second], []} = ra:start_cluster(default, ?config(test_name, Config), add_machine(), [N1, N2]), + _ = issue_op(N1, 1), + validate_state_on_node(N1, 1), + + % grow with leadership change + ok = start_and_join_nonvoter(N1, N3), + ra:transfer_leadership(Leader, _Second), + _ = issue_op(N3, 1), + validate_state_on_node(N3, 2), + + % all are voters after catch-up + timer:sleep(100), + All = [N1, N2, N3], + % in server state + lists:map(fun(O) -> ?assertEqual(All, voters(O)) end, overviews(N1)), + % in ets + #{servers := Servers} = ra:overview(?SYS), + lists:map(fun({Name, _}) -> #{Name := #{membership := voter}} = Servers end, All), + ok. + get_gen_statem_status(Ref) -> {_, _, _, Items} = sys:get_status(Ref), proplists:get_value(raft_state, lists:last(Items)). @@ -1096,6 +1176,13 @@ start_and_join({ClusterName, _} = ServerRef, {_, _} = New) -> ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), ok. +start_and_join_nonvoter({ClusterName, _} = ServerRef, {_, _} = New) -> + UId = ra:new_uid(ra_lib:to_binary(ClusterName)), + Server = #{id => New, membership => promotable, uid => UId}, + {ok, _, _} = ra:add_member(ServerRef, Server), + ok = ra:start_server(default, ClusterName, Server, add_machine(), [ServerRef]), + ok. + start_local_cluster(Num, Name, Machine) -> Nodes = [{ra_server:name(Name, integer_to_list(N)), node()} || N <- lists:seq(1, Num)], @@ -1126,6 +1213,13 @@ nth_server_name(Config, N) when is_integer(N) -> add_machine() -> {module, ?MODULE, #{}}. +overviews(Node) -> + {ok, Members, _From} = ra:members(Node), + [ra:member_overview(P) || {_, _} = P <- Members]. + +voters({ok, #{cluster := Peers}, _} = _Overview) -> + [Id || {Id, Status} <- maps:to_list(Peers), maps:get(membership, Status, voter) == voter]. + %% machine impl init(_) -> 0. 
apply(_Meta, Num, State) -> diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 7b252bea..e431903a 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -20,6 +20,7 @@ all() -> follower_aer_term_mismatch_snapshot, follower_handles_append_entries_rpc, candidate_handles_append_entries_rpc, + append_entries_reply_success_promotes_nonvoter, append_entries_reply_success, append_entries_reply_no_success, follower_request_vote, @@ -41,10 +42,12 @@ all() -> follower_machine_version, follower_install_snapshot_machine_version, leader_server_join, + leader_server_join_nonvoter, leader_server_leave, leader_is_removed, follower_cluster_change, leader_applies_new_cluster, + leader_applies_new_cluster_nonvoter, leader_appends_cluster_change_then_steps_before_applying_it, leader_receives_install_snapshot_rpc, follower_installs_snapshot, @@ -54,6 +57,7 @@ all() -> snapshotted_follower_received_append_entries, leader_received_append_entries_reply_with_stale_last_index, leader_receives_install_snapshot_result, + leader_received_append_entries_reply_and_promotes_voter, leader_replies_to_append_entries_rpc_with_lower_term, follower_aer_1, follower_aer_2, @@ -203,7 +207,7 @@ init_test(_Config) -> voted_for := some_server} = ra_server_init(InitConf), % snapshot SnapshotMeta = #{index => 3, term => 5, - cluster => maps:keys(Cluster), + cluster => dehydrate_cluster(Cluster), machine_version => 1}, SnapshotData = "hi1+2+3", {LogS, _, _} = ra_log_memory:install_snapshot(SnapshotMeta, SnapshotData, Log0), @@ -213,7 +217,7 @@ init_test(_Config) -> machine_state := "hi1+2+3", cluster := ClusterOut, voted_for := some_server} = ra_server_init(InitConf), - ?assertEqual(maps:keys(Cluster), maps:keys(ClusterOut)), + ?assertEqual(dehydrate_cluster(Cluster), dehydrate_cluster(ClusterOut)), ok. recover_restores_cluster_changes(_Config) -> @@ -275,6 +279,10 @@ election_timeout(_Config) -> {N3, _}]}]} = ra_server:handle_follower(Msg, State), + % non-voters ignore election_timeout + NVState = State#{membership => promotable}, + {follower, NVState, []} = ra_server:handle_follower(Msg, NVState), + % pre_vote {pre_vote, #{current_term := 5, votes := 0, pre_vote_token := Token1}, @@ -572,7 +580,7 @@ follower_aer_term_mismatch_snapshot(_Config) -> Log0 = maps:get(log, State0), Meta = #{index => 3, term => 5, - cluster => [], + cluster => #{}, machine_version => 1}, Data = <<"hi3">>, {Log, _, _} = ra_log_memory:install_snapshot(Meta, Data, Log0), @@ -723,7 +731,7 @@ follower_catchup_condition(_Config) -> chunk_state = {1, last}, meta = #{index => 99, term => 99, - cluster => [], + cluster => #{}, machine_version => 0}, data = []}, {follower, State, [_NextEvent]} = @@ -796,6 +804,95 @@ candidate_handles_append_entries_rpc(_Config) -> = ra_server:handle_candidate(EmptyAE, State), ok. 
+append_entries_reply_success_promotes_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, + NonVoter = #{membership => promotable, target => 3, uid => <<"uid">>}, + Cluster = #{N1 => new_peer_with(#{next_index => 5, match_index => 4}), + N2 => new_peer_with(#{next_index => 1, match_index => 0, + commit_index_sent => 3, + voter_status => NonVoter}), + N3 => new_peer_with(#{next_index => 2, match_index => 1})}, + State0 = (base_state(3, ?FUNCTION_NAME))#{commit_index => 1, + last_applied => 1, + cluster => Cluster, + machine_state => <<"hi1">>}, + Ack = #append_entries_reply{term = 5, success = true, + next_index = 4, + last_index = 3, last_term = 5}, + + % doesn't progress commit_index, non voter ack doesn't raise majority + {leader, #{cluster := #{N2 := #{next_index := 4, + match_index := 3, + voter_status := NonVoter}}, + commit_index := 1, + last_applied := 1, + machine_state := <<"hi1">>} = State1, + [{next_event, info, pipeline_rpcs}, + {next_event, {command, {'$ra_join', _, + #{id := N2, + voter_status := #{membership := voter, + uid := <<"uid">>}}, + noreply}} = RaJoin} + ]} = ra_server:handle_leader({N2, Ack}, State0), + + % pipeline to N3 + {leader, #{cluster := #{N3 := #{next_index := 4, + match_index := 1}}, + commit_index := 1, + last_applied := 1, + machine_state := <<"hi1">>} = State2, + [{send_rpc, N3, + #append_entries_rpc{term = 5, leader_id = N1, + prev_log_index = 1, + prev_log_term = 1, + leader_commit = 1, + entries = [{2, 3, {'$usr', _, <<"hi2">>, _}}, + {3, 5, {'$usr', _, <<"hi3">>, _}}]} + }]} = ra_server:handle_leader(pipeline_rpcs, State1), + + % ra_join translates into cluster update + {leader, #{cluster := #{N2 := #{next_index := 5, + match_index := 3, + voter_status := #{membership := voter, + uid := <<"uid">>}}}, + cluster_change_permitted := false, + commit_index := 1, + last_applied := 1, + machine_state := <<"hi1">>} = State3, + [{send_rpc, N3, + #append_entries_rpc{term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 1, + entries = [{4, 5, {'$ra_cluster_change', _, + #{N2 := #{voter_status := #{membership := voter, + uid := <<"uid">>}}}, + _}}]}}, + {send_rpc, N2, + #append_entries_rpc{term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 1, + entries = [{4, 5, {'$ra_cluster_change', _, + #{N2 := #{voter_status := #{membership := voter, + uid := <<"uid">>}}}, + _}}]}} + ]} = ra_server:handle_leader(RaJoin, State2), + + Ack2 = #append_entries_reply{term = 5, success = true, + next_index = 5, + last_index = 4, last_term = 5}, + + % voter ack, raises commit_index + {leader, #{cluster := #{N2 := #{next_index := 5, + match_index := 4}}, + commit_index := 3, + last_applied := 3, + machine_state := <<"hi3">>}, + [{next_event, info, pipeline_rpcs}, + {aux, eval}]} = ra_server:handle_leader({N2, Ack2}, State3), + ok. + append_entries_reply_success(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, @@ -918,6 +1015,11 @@ follower_request_vote(_Config) -> [{reply, #request_vote_result{term = 6, vote_granted = true}}]} = ra_server:handle_follower(Msg#request_vote_rpc{last_log_index = 4}, State), + + % non-voters ignore request_vote_rpc + NVState = State#{membership => promotable}, + {follower, NVState, []} = ra_server:handle_follower(Msg, NVState), + ok. 
follower_pre_vote(_Config) -> @@ -1032,6 +1134,11 @@ follower_pre_vote(_Config) -> vote_granted = true}}]} = ra_server:handle_follower(Msg#pre_vote_rpc{last_log_index = 4}, State), + + % non-voters ignore pre_vote_rpc + NVState = State#{membership => promotable}, + {follower, NVState, []} = ra_server:handle_follower(Msg, NVState), + ok. pre_vote_receives_pre_vote(_Config) -> @@ -1269,7 +1376,7 @@ follower_install_snapshot_machine_version(_Config) -> %% by install snapshot rpc SnapMeta = #{index => 4, term => 5, - cluster => [?N1, ?N2, ?N3], + cluster => #{?N1 => #{}, ?N2 => #{}, ?N3 => #{}}, machine_version => MacVer}, SnapData = <<"hi4_v2">>, @@ -1310,12 +1417,12 @@ leader_server_join(_Config) -> #append_entries_rpc{entries = [_, _, _, {4, 5, {'$ra_cluster_change', _, #{N1 := _, N2 := _, - N3 := _, N4 := _}, + N3 := _, N4 := N4Peer}, await_consensus}}]}}, {send_rpc, N3, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := _}, + #{N1 := _, N2 := _, N3 := _, N4 := N4Peer}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1324,7 +1431,50 @@ leader_server_join(_Config) -> {send_rpc, N2, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := _}, + #{N1 := _, N2 := _, N3 := _, N4 := N4Peer}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}} + | _] = Effects, + undefined = maps:get(membership, N4Peer, undefined), + ok. + +leader_server_join_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + #{log := Log} = State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + % raft servers should switch to the new configuration after log append + % and further cluster changes should be disallowed + {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, + cluster_change_permitted := false} = _State1, Effects} = + ra_server:handle_leader({command, {'$ra_join', meta(), + #{id => N4, membership => promotable, uid => <<"uid">>}, await_consensus}}, State0), + % new member should join as non-voter + Status = #{membership => promotable, uid => <<"uid">>, target => ra_log:next_index(Log)}, + [ + {send_rpc, N4, + #append_entries_rpc{entries = + [_, _, _, {4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, + N3 := _, N4 := #{voter_status := Status}}, + await_consensus}}]}}, + {send_rpc, N3, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter_status := Status}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}}, + {send_rpc, N2, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter_status := Status}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1437,7 +1587,6 @@ leader_applies_new_cluster(_Config) -> ra_server:handle_leader(written_evt({4, 4, 5}), State1), ?assert(not maps:get(cluster_change_permitted, State2)), - % replies coming in AEReply = #append_entries_reply{term = 5, success = true, next_index = 5, @@ -1457,6 +1606,38 @@ leader_applies_new_cluster(_Config) -> cluster := #{N3 := #{next_index := 5, match_index := 4}}}, _Effects} = ra_server:handle_leader({N3, AEReply}, State3), + ok. 
+ +leader_applies_new_cluster_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + + State = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + Command = {command, {'$ra_join', meta(), #{id => N4, membership => promotable, uid => <<"uid">>}, await_consensus}}, + % cluster records index and term it was applied to determine whether it has + % been applied + {leader, #{cluster_index_term := {4, 5}, + cluster := #{N1 := _, N2 := _, + N3 := _, N4 := #{voter_status := #{membership := promotable, + uid := <<"uid">>}}}} = State1, _} = + ra_server:handle_leader(Command, State), + {leader, State2, _} = + ra_server:handle_leader(written_evt({4, 4, 5}), State1), + + ?assert(not maps:get(cluster_change_permitted, State2)), + + % replies coming in + AEReply = #append_entries_reply{term = 5, success = true, + next_index = 5, + last_index = 4, last_term = 5}, + % new peer doesn't count until it reaches its matching target, leader needs only 2 votes + {leader, _State3 = #{commit_index := 4, + cluster_change_permitted := true, + cluster := #{N2 := #{next_index := 5, + match_index := 4}}}, + _} = ra_server:handle_leader({N2, AEReply}, State2#{votes => 1}), ok. leader_appends_cluster_change_then_steps_before_applying_it(_Config) -> @@ -1646,7 +1827,7 @@ pre_vote_election_reverts(_Config) -> ISR = #install_snapshot_rpc{term = 5, leader_id = N2, meta = #{index => 3, term => 5, - cluster => [], + cluster => #{}, machine_version => 0}, chunk_state = {1, last}, data = []}, @@ -1683,7 +1864,7 @@ leader_receives_install_snapshot_rpc(_Config) -> ISRpc = #install_snapshot_rpc{term = Term + 1, leader_id = ?N5, meta = #{index => Idx, term => Term, - cluster => [], + cluster => #{}, machine_version => 0}, chunk_state = {1, last}, data = []}, @@ -1703,8 +1884,7 @@ follower_installs_snapshot(_Config) -> Term = 2, % leader term Idx = 3, ISRpc = #install_snapshot_rpc{term = Term, leader_id = N1, - meta = snap_meta(Idx, LastTerm, - maps:keys(Config)), + meta = snap_meta(Idx, LastTerm, Config), chunk_state = {1, last}, data = []}, {receive_snapshot, FState1, @@ -1715,11 +1895,10 @@ follower_installs_snapshot(_Config) -> fun (_) -> {#{index => Idx, term => Term, - cluster => maps:keys(Config), + cluster => dehydrate_cluster(Config), machine_version => 0}, []} end), - {follower, #{current_term := Term, commit_index := Idx, last_applied := Idx, @@ -1743,7 +1922,7 @@ follower_ignores_installs_snapshot_with_higher_machine_version(_Config) -> ISRpc = #install_snapshot_rpc{term = Term, leader_id = N1, meta = #{index => Idx, term => LastTerm, - cluster => maps:keys(Config), + cluster => dehydrate_cluster(Config), machine_version => 1}, chunk_state = {1, last}, data = []}, @@ -1761,8 +1940,7 @@ follower_receives_stale_snapshot(_Config) -> LastTerm = 1, % snapshot term Idx = 2, ISRpc = #install_snapshot_rpc{term = CurTerm, leader_id = N1, - meta = snap_meta(Idx, LastTerm, - maps:keys(Config)), + meta = snap_meta(Idx, LastTerm, Config), chunk_state = {1, last}, data = []}, %% this should be a rare occurrence, rather than implement a special @@ -1780,8 +1958,7 @@ receive_snapshot_timeout(_Config) -> LastTerm = 1, % snapshot term Idx = 6, ISRpc = #install_snapshot_rpc{term = CurTerm, leader_id = N1, - meta = snap_meta(Idx, LastTerm, - maps:keys(Config)), + meta = snap_meta(Idx, LastTerm, Config), chunk_state = {1, last}, 
data = []}, {receive_snapshot, FState1, @@ -1807,13 +1984,12 @@ snapshotted_follower_received_append_entries(_Config) -> fun (_) -> {#{index => Idx, term => Term, - cluster => maps:keys(Config), + cluster => dehydrate_cluster(Config), machine_version => 0}, []} end), ISRpc = #install_snapshot_rpc{term = Term, leader_id = N1, - meta = snap_meta(Idx, LastTerm, - maps:keys(Config)), + meta = snap_meta(Idx, LastTerm, Config), chunk_state = {1, last}, data = []}, {follower, FState1, _} = ra_server:handle_receive_snapshot(ISRpc, FState0), @@ -1949,6 +2125,32 @@ leader_receives_install_snapshot_result(_Config) -> (_) -> false end, Effects)), ok. +leader_received_append_entries_reply_and_promotes_voter(_config) -> + N3 = ?N3, State = base_state(3, ?FUNCTION_NAME), + AER = #append_entries_reply{term = 5, success = true, + next_index = 5, + last_index = 4, last_term = 5}, + + % Permanent voter + State1 = set_peer_voter_status(State, N3, #{membership => voter}), + {leader, _, + [{next_event,info,pipeline_rpcs}] + } = ra_server:handle_leader({N3, AER}, State1), + + % Permanent non-voter + State2 = set_peer_voter_status(State, N3, #{membership => non_voter}), + {leader, _, + [{next_event,info,pipeline_rpcs}] + } = ra_server:handle_leader({N3, AER}, State2), + + % Promotion + State3 = set_peer_voter_status(State, N3, + #{membership => promotable, target => 4}), + {leader, _, + [{next_event,info,pipeline_rpcs}, + {next_event, {command, {'$ra_join', _, #{id := N3, voter_status := #{membership := voter}}, _}}}] + } = ra_server:handle_leader({N3, AER}, State3). + follower_heartbeat(_Config) -> State = base_state(3, ?FUNCTION_NAME), #{current_term := Term, @@ -2456,6 +2658,11 @@ set_peer_query_index(State, PeerId, QueryIndex) -> #{PeerId := Peer} = Cluster, State#{cluster := Cluster#{PeerId => Peer#{query_index => QueryIndex}}}. +set_peer_voter_status(State, PeerId, VoterStatus) -> + #{cluster := Cluster} = State, + #{PeerId := Peer} = Cluster, + State#{cluster := Cluster#{PeerId => Peer#{voter_status => VoterStatus}}}. + leader_heartbeat_reply_lower_term(_Config) -> State = base_state(3, ?FUNCTION_NAME), #{current_term := Term, @@ -2599,11 +2806,16 @@ new_peer_with(Map) -> maps:merge(new_peer(), Map). snap_meta(Idx, Term) -> - snap_meta(Idx, Term, []). + snap_meta(Idx, Term, #{}). snap_meta(Idx, Term, Cluster) -> #{index => Idx, term => Term, - cluster => Cluster, + cluster => dehydrate_cluster(Cluster), machine_version => 0}. +dehydrate_cluster(Cluster) -> + maps:map(fun(_, V) -> + maps:with([voter_status], V) + end, Cluster). + diff --git a/test/ra_snapshot_SUITE.erl b/test/ra_snapshot_SUITE.erl index 36ff7847..32c74cd9 100644 --- a/test/ra_snapshot_SUITE.erl +++ b/test/ra_snapshot_SUITE.erl @@ -32,6 +32,7 @@ all_tests() -> take_snapshot, take_snapshot_crash, init_recover, + init_recover_voter_status, init_recover_multi_corrupt, init_recover_corrupt, read_snapshot, @@ -162,6 +163,34 @@ init_recover(Config) -> {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), ok. 
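%% Editorial sketch (not part of the patch): with this change the `cluster'
%% field of the snapshot metadata is no longer a list of server ids but a map
%% (ra_cluster_snapshot() in ra.hrl) that keeps only each peer's voter_status,
%% mirroring the ra_log update_release_cursor change and the dehydrate_cluster/1
%% test helper. The function and values below are illustrative only.
dehydrated_meta_sketch(Idx, Term, Cluster) ->
    #{index => Idx,
      term => Term,
      %% drop everything except voter_status before the cluster
      %% is written into the snapshot metadata
      cluster => maps:map(fun(_Id, Peer) ->
                                  maps:with([voter_status], Peer)
                          end, Cluster),
      machine_version => 0}.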
+init_recover_voter_status(Config) -> + UId = ?config(uid, Config), + State0 = ra_snapshot:init(UId, ra_log_snapshot, + ?config(snap_dir, Config), undefined), + Meta = meta(55, 2, #{node() => #{voter_status => test}}), + {State1, [{monitor, process, snapshot_writer, _}]} = + ra_snapshot:begin_snapshot(Meta, ?FUNCTION_NAME, State0), + receive + {ra_log_event, {snapshot_written, IdxTerm}} -> + _ = ra_snapshot:complete_snapshot(IdxTerm, State1), + ok + after 1000 -> + error(snapshot_event_timeout) + end, + + %% open a new snapshot state to simulate a restart + Recover = ra_snapshot:init(UId, ra_log_snapshot, + ?config(snap_dir, Config), undefined), + %% ensure last snapshot is recovered + %% it also needs to be validated as could have crashed mid write + undefined = ra_snapshot:pending(Recover), + {55, 2} = ra_snapshot:current(Recover), + 55 = ra_snapshot:last_index_for(UId), + + %% recover the meta data and machine state + {ok, Meta, ?FUNCTION_NAME} = ra_snapshot:recover(Recover), + ok. + init_multi(Config) -> UId = ?config(uid, Config), State0 = ra_snapshot:init(UId, ra_log_snapshot,
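Taken together, the changes above let a cluster grow with a member that starts as promotable and is promoted to voter automatically once its match index reaches the target recorded at join time. A minimal usage sketch, assuming an already-running cluster and the convention documented in ra.hrl that ra:add_member and ra:start_server receive the same id/membership/uid map; the function and variable names are illustrative only:

%% Existing :: [ra_server_id()] members of a running cluster named ClusterName,
%% Machine :: ra_server:machine_conf(), NewId :: ra_server_id().
add_promotable_member(ClusterName, Existing, NewId, Machine) ->
    UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
    New = #{id => NewId, membership => promotable, uid => UId},
    %% the same map must be given to both calls
    {ok, _, _Leader} = ra:add_member(hd(Existing), New),
    ok = ra:start_server(default, ClusterName, New, Machine, Existing),
    ok.

Until promotion completes, the new member does not count towards quorum and ignores election timeouts and vote requests; its progress can be observed through the membership key now reported by ra:member_overview/1, ra:key_metrics/1 and ra:overview/1.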