From 33bdbe0f7520068b16ccfb0c8f467a796b5fd3b4 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Sat, 6 May 2023 15:15:28 -0700 Subject: [PATCH] Newly added followers do not participate in quorum until they catch up with the log --- src/ra.erl | 19 ++-- src/ra.hrl | 10 ++ src/ra_directory.erl | 10 +- src/ra_server.erl | 162 ++++++++++++++++++++++++--- src/ra_server.hrl | 2 + src/ra_server_proc.erl | 21 +++- test/ra_2_SUITE.erl | 3 +- test/ra_SUITE.erl | 98 ++++++++++++++++- test/ra_server_SUITE.erl | 229 +++++++++++++++++++++++++++++++++++++-- 9 files changed, 511 insertions(+), 43 deletions(-) diff --git a/src/ra.erl b/src/ra.erl index 14c9a84c..82e5afa3 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -454,7 +454,7 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs, %% @doc Starts a new distributed ra cluster. %% @param ClusterName the name of the cluster. -%% @param ServerId the ra_server_id() of the server +%% @param ServerId the ra_server_id() of the server, or a map with server id and settings. %% @param Machine The {@link ra_machine:machine/0} configuration. %% @param ServerIds a list of initial (seed) server configurations %% @returns @@ -469,10 +469,13 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs, %% forcefully deleted. %% @see start_server/1 %% @end --spec start_server(atom(), ra_cluster_name(), ra_server_id(), +-spec start_server(atom(), ra_cluster_name(), ra_server_id() | ra_new_server(), ra_server:machine_conf(), [ra_server_id()]) -> ok | {error, term()}. 
-start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) +start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) -> + % Legacy start server, default to full voter + start_server(System, ClusterName, #{id => ServerId, voter => true}, Machine, ServerIds); +start_server(System, ClusterName, #{id := {_, _} = ServerId, voter := Voter}, Machine, ServerIds) when is_atom(System) -> UId = new_uid(ra_lib:to_binary(ClusterName)), Conf = #{cluster_name => ClusterName, @@ -480,6 +483,7 @@ start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) uid => UId, initial_members => ServerIds, log_init_args => #{uid => UId}, + voter => Voter, machine => Machine}, start_server(System, Conf). @@ -557,9 +561,10 @@ delete_cluster(ServerIds, Timeout) -> %% affect said cluster's availability characteristics (by increasing quorum node count). %% %% @param ServerLoc the ra server or servers to try to send the command to -%% @param ServerId the ra server id of the new server. +%% @param ServerId the ra server id of the new server, or a map with server id and settings. %% @end --spec add_member(ra_server_id() | [ra_server_id()], ra_server_id()) -> +-spec add_member(ra_server_id() | [ra_server_id()], + ra_server_id() | ra_new_server()) -> ra_cmd_ret() | {error, already_member} | {error, cluster_change_not_permitted}. @@ -570,7 +575,8 @@ add_member(ServerLoc, ServerId) -> %% @see add_member/2 %% @end -spec add_member(ra_server_id() | [ra_server_id()], - ra_server_id(), timeout()) -> + ra_server_id() | ra_new_server(), + timeout()) -> ra_cmd_ret() | {error, already_member} | {error, cluster_change_not_permitted}. @@ -579,7 +585,6 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). - %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change %% command to the log. 
diff --git a/src/ra.hrl b/src/ra.hrl index e706fc5a..def4e759 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -39,17 +39,27 @@ %% after node restart). Pids are not stable in this sense. -type ra_server_id() :: {Name :: atom(), Node :: node()}. +%% Specifies server configuration for a new cluster member. +-type ra_new_server() :: #{id := ra_server_id(), + voter := boolean()}. + -type ra_peer_status() :: normal | {sending_snapshot, pid()} | suspended | disconnected. +-type ra_voter_status() :: voter | {nonvoter, ra_nonvoter_reason()}. + +-type ra_nonvoter_reason() :: init | #{target := ra_index()}. + -type ra_peer_state() :: #{next_index := non_neg_integer(), match_index := non_neg_integer(), query_index := non_neg_integer(), % the commit index last sent % used for evaluating pipeline status commit_index_sent := non_neg_integer(), + %% whether the peer is part of the consensus + voter_status := ra_voter_status(), %% indicates that a snapshot is being sent %% to the peer status := ra_peer_status()}. 
diff --git a/src/ra_directory.erl b/src/ra_directory.erl index d50d6182..42990a0a 100644 --- a/src/ra_directory.erl +++ b/src/ra_directory.erl @@ -175,14 +175,20 @@ overview(System) when is_atom(System) -> #{directory := Tbl, directory_rev := _TblRev} = get_names(System), Dir = ets:tab2list(Tbl), - States = maps:from_list(ets:tab2list(ra_state)), + Rows = lists:map(fun({K, S, V}) -> + {K, {S, V}} + end, + ets:tab2list(ra_state)), + States = maps:from_list(Rows), Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)), lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) -> + {S, V} = maps:get(ServerName, States, {undefined, undefined}), Acc#{ServerName => #{uid => UId, pid => Pid, parent => Parent, - state => maps:get(ServerName, States, undefined), + state => S, + voter_status => V, cluster_name => ClusterName, snapshot_state => maps:get(UId, Snaps, undefined)}} diff --git a/src/ra_server.erl b/src/ra_server.erl index 1ac231d3..83bb23a6 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -55,6 +55,7 @@ terminate/2, log_fold/3, log_read/2, + voter_status/1, recover/1 ]). @@ -72,6 +73,7 @@ log := term(), voted_for => 'maybe'(ra_server_id()), % persistent votes => non_neg_integer(), + voter_status => ra_voter_status(), commit_index := ra_index(), last_applied := ra_index(), persisted_last_applied => ra_index(), @@ -195,6 +197,7 @@ max_pipeline_count => non_neg_integer(), ra_event_formatter => {module(), atom(), [term()]}, counter => counters:counters_ref(), + voter => boolean(), system_config => ra_system:config()}. 
-type mutable_config() :: #{cluster_name => ra_cluster_name(), @@ -325,9 +328,18 @@ init(#{id := Id, counter = maps:get(counter, Config, undefined), system_config = SystemConfig}, + VoterStatus = case maps:get(voter, Config, true) of + false -> + {nonvoter, init}; + true -> + voter + end, + Peer = maps:get(Id, Cluster0), + Cluster1 = Cluster0#{Id => Peer#{voter_status => VoterStatus}}, + #{cfg => Cfg, current_term => CurrentTerm, - cluster => Cluster0, + cluster => Cluster1, % There may be scenarios when a single server % starts up but hasn't % yet re-applied its noop command that we may receive other join @@ -335,6 +347,7 @@ init(#{id := Id, cluster_change_permitted => false, cluster_index_term => SnapshotIndexTerm, voted_for => VotedFor, + voter_status => VoterStatus, commit_index => CommitIndex, %% set this to the first index so that we can apply all entries %% up to the commit index during recovery @@ -394,8 +407,8 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true, Peer = Peer0#{match_index => max(MI, LastIdx), next_index => max(NI, NextIdx)}, State1 = put_peer(PeerId, Peer, State0), - {State2, Effects0} = evaluate_quorum(State1, []), - + Effects00 = maybe_promote_voter(PeerId, State1, []), + {State2, Effects0} = evaluate_quorum(State1, Effects00), {State, Effects1} = process_pending_consistent_queries(State2, Effects0), Effects = [{next_event, info, pipeline_rpcs} | Effects1], @@ -776,7 +789,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true}, NewVotes = Votes + 1, ?DEBUG("~ts: vote granted for term ~b votes ~b", [LogId, Term, NewVotes]), - case trunc(maps:size(Nodes) / 2) + 1 of + case required_quorum(Nodes) of NewVotes -> {State1, Effects} = make_all_rpcs(initialise_peers(State0)), Noop = {noop, #{ts => erlang:system_time(millisecond)}, @@ -922,7 +935,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true, [LogId, Token, Term, Votes + 1]), NewVotes = Votes + 1, State = update_term(Term, 
State0), - case trunc(maps:size(Nodes) / 2) + 1 of + case required_quorum(Nodes) of NewVotes -> call_for_election(candidate, State); _ -> @@ -1103,8 +1116,18 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) -> % simply forward all other events to ra_log {Log, Effects} = ra_log:handle_event(Evt, Log0), {follower, State#{log => Log}, Effects}; +handle_follower(#pre_vote_rpc{}, + #{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) -> + ?DEBUG("~ts: follower ignored pre_vote_rpc, non-voter: ~0p", + [LogId, Voter]), + {follower, State, []}; handle_follower(#pre_vote_rpc{} = PreVote, State) -> process_pre_vote(follower, PreVote, State); +handle_follower(#request_vote_rpc{}, + #{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) -> + ?DEBUG("~ts: follower ignored request_vote_rpc, non-voter: ~0p", + [LogId, Voter]), + {follower, State, []}; handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term}, #{current_term := Term, voted_for := VotedFor, cfg := #cfg{log_id = LogId}} = State) @@ -1202,6 +1225,11 @@ handle_follower(#append_entries_reply{}, State) -> %% handle to avoid logging as unhandled %% could receive a lot of these shortly after standing down as leader {follower, State, []}; +handle_follower(election_timeout, + #{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) -> + ?DEBUG("~ts: follower ignored election_timeout, non-voter: ~0p", + [LogId, Voter]), + {follower, State, []}; handle_follower(election_timeout, State) -> call_for_election(pre_vote, State); handle_follower(try_become_leader, State) -> @@ -1374,7 +1402,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg, cluster_index_term, query_index ], State), - O = maps:merge(O0, cfg_to_map(Cfg)), + O1 = O0#{voter_status => voter_status(State)}, + O = maps:merge(O1, cfg_to_map(Cfg)), LogOverview = ra_log:overview(Log), MacOverview = ra_machine:overview(MacMod, MacState), O#{log => LogOverview, @@ 
-2087,6 +2116,7 @@ new_peer() -> match_index => 0, commit_index_sent => 0, query_index => 0, + voter_status => voter, status => normal}. new_peer_with(Map) -> @@ -2218,13 +2248,13 @@ make_cluster(Self, Nodes) -> Cluster#{Self => new_peer()} end. -initialise_peers(State = #{log := Log, cluster := Cluster0}) -> - PeerIds = peer_ids(State), +initialise_peers(State = #{cfg := #cfg{id = Id}, log := Log, cluster := Cluster0}) -> NextIdx = ra_log:next_index(Log), - Cluster = lists:foldl(fun(PeerId, Acc) -> - Acc#{PeerId => - new_peer_with(#{next_index => NextIdx})} - end, Cluster0, PeerIds), + Cluster = maps:map(fun (PeerId, Self) when PeerId =:= Id -> + Self; + (_, #{voter_status := Voter} = _Other) -> + new_peer_with(#{next_index => NextIdx, voter_status => Voter}) + end, Cluster0), State#{cluster => Cluster}. apply_to(ApplyTo, State, Effs) -> @@ -2318,6 +2348,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, + voter_status => voter_status(id(State0), NewCluster), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2450,16 +2481,40 @@ append_log_leader({CmdTag, _, _, _}, when CmdTag == '$ra_join' orelse CmdTag == '$ra_leave' -> {not_appended, cluster_change_not_permitted, State}; +append_log_leader({'$ra_join', From, + #{id := JoiningNode, voter_status := Voter}, + ReplyMode}, + State = #{cluster := OldCluster}) -> + case OldCluster of + #{JoiningNode := #{voter_status := Voter}} -> + already_member(State); + #{JoiningNode := Peer} -> + % Update member status. + Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}}, + append_cluster_change(Cluster, From, ReplyMode, State); + _ -> + % Insert new member. 
+ Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})}, + append_cluster_change(Cluster, From, ReplyMode, State) + end; +append_log_leader({'$ra_join', From, #{id := JoiningNode, voter := WantVoter}, ReplyMode}, + State) -> + % Shortcut to compute non-voter status + VoterStatus = case WantVoter of + true -> voter; + false -> new_nonvoter(State) + end, + append_log_leader({'$ra_join', From, #{id => JoiningNode, voter_status => VoterStatus}, ReplyMode}, + State); append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, State = #{cluster := OldCluster}) -> + % Legacy $ra_join, join as full voter iff no such member in the cluster. case OldCluster of #{JoiningNode := _} -> - % already a member do nothing - % TODO: reply? If we don't reply the caller may block until timeout - {not_appended, already_member, State}; + already_member(State); _ -> - Cluster = OldCluster#{JoiningNode => new_peer()}, - append_cluster_change(Cluster, From, ReplyMode, State) + append_log_leader({'$ra_join', From, #{id => JoiningNode, voter => true}, ReplyMode}, + State) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, State = #{cfg := #cfg{log_id = LogId}, @@ -2501,6 +2556,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, + voter_status => voter_status(id(State), Cluster), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. @@ -2577,6 +2633,8 @@ query_indexes(#{cfg := #cfg{id = Id}, query_index := QueryIndex}) -> maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; + (_K, #{voter_status := {nonvoter, _}}, Acc) -> + Acc; (_K, #{query_index := Idx}, Acc) -> [Idx | Acc] end, [QueryIndex], Cluster). 
@@ -2587,6 +2645,8 @@ match_indexes(#{cfg := #cfg{id = Id}, {LWIdx, _} = ra_log:last_written(Log), maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; + (_K, #{voter_status := {nonvoter, _}}, Acc) -> + Acc; (_K, #{match_index := Idx}, Acc) -> [Idx | Acc] end, [LWIdx], Cluster). @@ -2803,6 +2863,74 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) -> Name; meta_name(#{names := #{log_meta := Name}}) -> Name. + +already_member(State) -> + % already a member do nothing + % TODO: reply? If we don't reply the caller may block until timeout + {not_appended, already_member, State}. + +%%% ==================== +%%% Voter status helpers +%%% ==================== + +-spec new_nonvoter(ra_server_state()) -> ra_voter_status(). +new_nonvoter(#{commit_index := Target} = _State) -> + {nonvoter, #{target => Target}}. + +-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects(). +maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) -> + % Unknown peer handled in the caller. + #{PeerID := #{match_index := MI, voter_status := OldStatus}} = Cluster, + case update_voter_status(OldStatus, MI) of + OldStatus -> + Effects; + voter -> + [{next_event, + {command, {'$ra_join', + #{ts => erlang:system_time(millisecond)}, + #{id => PeerID, voter_status => voter}, + noreply}}} | + Effects] + end. + +update_voter_status({nonvoter, #{target := Target}}, MI) + when MI >= Target -> + voter; +update_voter_status(Permanent, _) -> + Permanent. + +-spec voter_status(ra_server_state()) -> ra_voter_status() | undefined. +voter_status(#{cluster := Cluster} = State) -> + case maps:get(voter_status, State, undefined) of + undefined -> + voter_status(id(State), Cluster); + Voter -> + Voter + end. + +-spec voter_status(ra_server_id(), ra_cluster()) -> ra_voter_status() | undefined. +voter_status(PeerId, Cluster) -> + case maps:get(PeerId, Cluster, undefined) of + undefined -> + undefined; + Peer -> + maps:get(voter_status, Peer, voter) + end. 
+ +-spec required_quorum(ra_cluster()) -> pos_integer(). +required_quorum(Cluster) -> + Voters = count_voters(Cluster), + trunc(Voters / 2) + 1. + +count_voters(Cluster) -> + maps:fold( + fun (_, #{voter_status := {nonvoter, _}}, Count) -> + Count; + (_, _, Count) -> + Count + 1 + end, + 0, Cluster). + %%% =================== %%% Internal unit tests %%% =================== diff --git a/src/ra_server.hrl b/src/ra_server.hrl index 13294f70..0a387394 100644 --- a/src/ra_server.hrl +++ b/src/ra_server.hrl @@ -9,12 +9,14 @@ -define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB -define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000). -define(FLUSH_COMMANDS_SIZE, 16). +-define(MAX_NONVOTER_ROUNDS, 4). -record(cfg, {id :: ra_server_id(), uid :: ra_uid(), log_id :: unicode:chardata(), metrics_key :: term(), + tick_timeout :: non_neg_integer(), machine :: ra_machine:machine(), machine_version :: ra_machine:version(), machine_versions :: [{ra_index(), ra_machine:version()}, ...], diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index e13ec1b8..93e38f8c 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -783,11 +783,19 @@ follower(_, tick_timeout, State0) -> set_tick_timer(State, Actions)}; follower({call, From}, {log_fold, Fun, Term}, State) -> fold_log(From, Fun, Term, State); -follower(EventType, Msg, State0) -> +follower(EventType, Msg, #state{conf = #conf{name = Name}, + server_state = SS0} = State0) -> + Voter0 = ra_server:voter_status(SS0), case handle_follower(Msg, State0) of {follower, State1, Effects} -> {State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), - State = follower_leader_change(State0, State2), + State = #state{server_state = SS} = follower_leader_change(State0, State2), + case ra_server:voter_status(SS) of + Voter0 -> + ok; + Voter -> + true = ets:update_element(ra_state, Name, {3, Voter}) + end, {keep_state, State, Actions}; {pre_vote, State1, Effects} -> {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), @@ 
-1028,7 +1036,8 @@ format_status(Opt, [_PDict, StateName, handle_enter(RaftState, OldRaftState, #state{conf = #conf{name = Name}, server_state = ServerState0} = State) -> - true = ets:insert(ra_state, {Name, RaftState}), + Voter = ra_server:voter_status(ServerState0), + true = ets:insert(ra_state, {Name, RaftState, Voter}), {ServerState, Effects} = ra_server:handle_state_enter(RaftState, ServerState0), case RaftState == leader orelse OldRaftState == leader of @@ -1716,9 +1725,11 @@ handle_tick_metrics(State) -> _ = ets:insert(ra_metrics, Metrics), State. -can_execute_locally(RaftState, TargetNode, State) -> +can_execute_locally(RaftState, TargetNode, + #state{server_state = ServerState} = State) -> + Voter = ra_server:voter_status(ServerState), case RaftState of - follower -> + follower when Voter =:= voter -> TargetNode == node(); leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl index 07a7eda8..299870e7 100644 --- a/test/ra_2_SUITE.erl +++ b/test/ra_2_SUITE.erl @@ -674,7 +674,8 @@ force_start_follower_as_single_member(Config) -> Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]), {ok, _, _} = ra:add_member(ServerId3, ServerId4), %% the membership has changed but member not running yet - {timeout,_} = ra:process_command(ServerId3, {enq, banana}), + %% it is nonvoter and does not affect quorum size yet + {ok, _, _} = ra:process_command(ServerId3, {enq, banana}), %% start new member ok = ra:start_server(?SYS, Conf4), {ok, _, ServerId3} = ra:members(ServerId4), diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 0e29fc35..89777eea 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -64,7 +64,10 @@ all_tests() -> post_partition_liveness, all_metrics_are_integers, transfer_leadership, - transfer_leadership_two_node + transfer_leadership_two_node, + new_nonvoter_knows_its_status, + voter_gets_promoted_consistent_leader, + voter_gets_promoted_new_leader 
]. groups() -> @@ -1024,6 +1027,81 @@ transfer_leadership_two_node(Config) -> ?assertEqual({error, unknown_member}, ra:transfer_leadership(NewLeader, {unknown, node()})), terminate_cluster(Members). +new_nonvoter_knows_its_status(Config) -> + Name = ?config(test_name, Config), + [N1, N2] = [{n1, node()}, {n2, node()}], + {ok, _, _} = ra:start_cluster(default, Name, add_machine(), [N1]), + _ = issue_op(N1, 1), + validate_state_on_node(N1, 1), + + % grow + ok = start_and_join_nonvoter(N1, N2), + + % n2 had no time to catch up + % in server state + {ok, #{cluster := #{N1 := #{voter_status := voter}, + N2 := #{voter_status := {nonvoter, #{target := 2}}}}}, _} = ra:member_overview(N1), + {ok, #{cluster := #{N1 := #{voter_status := voter}, + N2 := #{voter_status := {nonvoter, init}}}}, _} = ra:member_overview(N2), + % in ets + #{servers := #{n1 := #{voter_status := voter}, + n2 := #{voter_status := {nonvoter, init}}}} = ra:overview(?SYS), + ok. + +voter_gets_promoted_consistent_leader(Config) -> + N1 = nth_server_name(Config, 1), + N2 = nth_server_name(Config, 2), + N3 = nth_server_name(Config, 3), + + {ok, _, _} = ra:start_cluster(default, ?config(test_name, Config), add_machine(), [N1]), + _ = issue_op(N1, 1), + validate_state_on_node(N1, 1), + + % grow 1 + ok = start_and_join_nonvoter(N1, N2), + _ = issue_op(N2, 1), + validate_state_on_node(N2, 2), + + % grow 2 + ok = start_and_join_nonvoter(N1, N3), + _ = issue_op(N3, 1), + validate_state_on_node(N3, 3), + + % all are voters after catch-up + timer:sleep(100), + All = [N1, N2, N3], + % in server state + lists:map(fun(O) -> ?assertEqual(All, voters(O)) end, overviews(N1)), + % in ets + #{servers := Servers} = ra:overview(?SYS), + lists:map(fun({Name, _}) -> #{Name := #{voter_status := voter}} = Servers end, All), + ok. 
+ +voter_gets_promoted_new_leader(Config) -> + N1 = nth_server_name(Config, 1), + N2 = nth_server_name(Config, 2), + N3 = nth_server_name(Config, 3), + + {ok, [Leader, _Second], []} = ra:start_cluster(default, ?config(test_name, Config), add_machine(), [N1, N2]), + _ = issue_op(N1, 1), + validate_state_on_node(N1, 1), + + % grow with leadership change + ok = start_and_join_nonvoter(N1, N3), + ra:transfer_leadership(Leader, _Second), + _ = issue_op(N3, 1), + validate_state_on_node(N3, 2), + + % all are voters after catch-up + timer:sleep(100), + All = [N1, N2, N3], + % in server state + lists:map(fun(O) -> ?assertEqual(All, voters(O)) end, overviews(N1)), + % in ets + #{servers := Servers} = ra:overview(?SYS), + lists:map(fun({Name, _}) -> #{Name := #{voter_status := voter}} = Servers end, All), + ok. + get_gen_statem_status(Ref) -> {_, _, _, Items} = sys:get_status(Ref), proplists:get_value(raft_state, lists:last(Items)). @@ -1096,6 +1174,12 @@ start_and_join({ClusterName, _} = ServerRef, {_, _} = New) -> ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), ok. +start_and_join_nonvoter({ClusterName, _} = ServerRef, {_, _} = New) -> + Server = #{id => New, voter => false}, + {ok, _, _} = ra:add_member(ServerRef, Server), + ok = ra:start_server(default, ClusterName, Server, add_machine(), [ServerRef]), + ok. + start_local_cluster(Num, Name, Machine) -> Nodes = [{ra_server:name(Name, integer_to_list(N)), node()} || N <- lists:seq(1, Num)], @@ -1126,6 +1210,18 @@ nth_server_name(Config, N) when is_integer(N) -> add_machine() -> {module, ?MODULE, #{}}. +overviews(Node) -> + {ok, Members, _From} = ra:members(Node), + [ra:member_overview(P) || {_, _} = P <- Members]. + +voters({ok, #{cluster := Peers}, _} = _Overview) -> + [Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter_status, Status) =:= voter]. + +is_voter({ok, #{voter_status := voter}, _} = _Overview) -> + true; +is_voter({ok, _, _} = _Overview) -> + false. 
+ %% machine impl init(_) -> 0. apply(_Meta, Num, State) -> diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 45b09508..11886af3 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -20,6 +20,7 @@ all() -> follower_aer_term_mismatch_snapshot, follower_handles_append_entries_rpc, candidate_handles_append_entries_rpc, + append_entries_reply_success_promotes_nonvoter, append_entries_reply_success, append_entries_reply_no_success, follower_request_vote, @@ -40,11 +41,13 @@ all() -> leader_noop_increments_machine_version, follower_machine_version, follower_install_snapshot_machine_version, - leader_server_join, + leader_server_join_voter, + leader_server_join_nonvoter, leader_server_leave, leader_is_removed, follower_cluster_change, - leader_applies_new_cluster, + leader_applies_new_cluster_voter, + leader_applies_new_cluster_nonvoter, leader_appends_cluster_change_then_steps_before_applying_it, leader_receives_install_snapshot_rpc, follower_installs_snapshot, @@ -54,6 +57,7 @@ all() -> snapshotted_follower_received_append_entries, leader_received_append_entries_reply_with_stale_last_index, leader_receives_install_snapshot_result, + leader_received_append_entries_reply_and_promotes_voter, leader_replies_to_append_entries_rpc_with_lower_term, follower_aer_1, follower_aer_2, @@ -114,7 +118,8 @@ end_per_testcase(_TestCase, Config) -> id(X) -> X. -ra_server_init(Conf) -> +ra_server_init(Conf0) -> + Conf = maps:merge(Conf0, #{tick_timeout => 1000}), ra_server:recover(ra_server:init(Conf)). 
setup_log() -> @@ -275,6 +280,10 @@ election_timeout(_Config) -> {N3, _}]}]} = ra_server:handle_follower(Msg, State), + % non-voters ignore election_timeout + NVState = State#{voter_status => {nonvoter, test}}, + {follower, NVState, []} = ra_server:handle_follower(Msg, NVState), + % pre_vote {pre_vote, #{current_term := 5, votes := 0, pre_vote_token := Token1}, @@ -796,6 +805,89 @@ candidate_handles_append_entries_rpc(_Config) -> = ra_server:handle_candidate(EmptyAE, State), ok. +append_entries_reply_success_promotes_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, + NonVoter = {nonvoter, #{target => 3}}, + Cluster = #{N1 => new_peer_with(#{next_index => 5, match_index => 4}), + N2 => new_peer_with(#{next_index => 1, match_index => 0, + commit_index_sent => 3, + voter_status => NonVoter}), + N3 => new_peer_with(#{next_index => 2, match_index => 1})}, + State0 = (base_state(3, ?FUNCTION_NAME))#{commit_index => 1, + last_applied => 1, + cluster => Cluster, + machine_state => <<"hi1">>}, + Ack = #append_entries_reply{term = 5, success = true, + next_index = 4, + last_index = 3, last_term = 5}, + + % doesn't progress commit_index, non voter ack doesn't raise majority + {leader, #{cluster := #{N2 := #{next_index := 4, + match_index := 3, + voter_status := NonVoter}}, + commit_index := 1, + last_applied := 1, + machine_state := <<"hi1">>} = State1, + [{next_event, info, pipeline_rpcs}, + {next_event, {command, {'$ra_join', _, + #{id := N2, voter_status := voter}, noreply}} = RaJoin} + ]} = ra_server:handle_leader({N2, Ack}, State0), + + % pipeline to N3 + {leader, #{cluster := #{N3 := #{next_index := 4, + match_index := 1}}, + commit_index := 1, + last_applied := 1, + machine_state := <<"hi1">>} = State2, + [{send_rpc, N3, + #append_entries_rpc{term = 5, leader_id = N1, + prev_log_index = 1, + prev_log_term = 1, + leader_commit = 1, + entries = [{2, 3, {'$usr', _, <<"hi2">>, _}}, + {3, 5, {'$usr', _, <<"hi3">>, _}}]} + }]} = ra_server:handle_leader(pipeline_rpcs, 
State1), + + % ra_join translates into cluster update + {leader, #{cluster := #{N2 := #{next_index := 5, + match_index := 3, + voter_status := voter}}, + cluster_change_permitted := false, + commit_index := 1, + last_applied := 1, + machine_state := <<"hi1">>} = State3, + [{send_rpc, N3, + #append_entries_rpc{term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 1, + entries = [{4, 5, {'$ra_cluster_change', _, + #{N2 := #{voter_status := voter}}, + _}}]}}, + {send_rpc, N2, + #append_entries_rpc{term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 1, + entries = [{4, 5, {'$ra_cluster_change', _, + #{N2 := #{voter_status := voter}}, + _}}]}} + ]} = ra_server:handle_leader(RaJoin, State2), + + Ack2 = #append_entries_reply{term = 5, success = true, + next_index = 5, + last_index = 4, last_term = 5}, + + % voter ack, raises commit_index + {leader, #{cluster := #{N2 := #{next_index := 5, + match_index := 4}}, + commit_index := 3, + last_applied := 3, + machine_state := <<"hi3">>}, + [{next_event, info, pipeline_rpcs}, + {aux, eval}]} = ra_server:handle_leader({N2, Ack2}, State3), + ok. + append_entries_reply_success(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, @@ -918,6 +1010,11 @@ follower_request_vote(_Config) -> [{reply, #request_vote_result{term = 6, vote_granted = true}}]} = ra_server:handle_follower(Msg#request_vote_rpc{last_log_index = 4}, State), + + % non-voters ignore request_vote_rpc + NVState = State#{voter_status => {nonvoter, test}}, + {follower, NVState, []} = ra_server:handle_follower(Msg, NVState), + ok. follower_pre_vote(_Config) -> @@ -1032,6 +1129,11 @@ follower_pre_vote(_Config) -> vote_granted = true}}]} = ra_server:handle_follower(Msg#pre_vote_rpc{last_log_index = 4}, State), + + % non-voters ignore pre_vote_rpc + NVState = State#{voter_status => {nonvoter, test}}, + {follower, NVState, []} = ra_server:handle_follower(Msg, NVState), + ok. 
pre_vote_receives_pre_vote(_Config) -> @@ -1293,7 +1395,7 @@ follower_install_snapshot_machine_version(_Config) -> _} = ra_server:handle_receive_snapshot(ISR, State0), ok. -leader_server_join(_Config) -> +leader_server_join_voter(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), N2 => new_peer_with(#{next_index => 4, match_index => 3}), @@ -1302,20 +1404,22 @@ leader_server_join(_Config) -> % raft servers should switch to the new configuration after log append % and further cluster changes should be disallowed {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, + commit_index := Target, cluster_change_permitted := false} = _State1, Effects} = ra_server:handle_leader({command, {'$ra_join', meta(), N4, await_consensus}}, State0), + % new member should join as voter [ {send_rpc, N4, #append_entries_rpc{entries = [_, _, _, {4, 5, {'$ra_cluster_change', _, #{N1 := _, N2 := _, - N3 := _, N4 := _}, + N3 := _, N4 := #{voter_status := voter}}, await_consensus}}]}}, {send_rpc, N3, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := _}, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter_status := voter}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1324,7 +1428,49 @@ leader_server_join(_Config) -> {send_rpc, N2, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := _}, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter_status := voter}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}} + | _] = Effects, + ok. 
+ +leader_server_join_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + % raft servers should switch to the new configuration after log append + % and further cluster changes should be disallowed + {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, + commit_index := Target, + cluster_change_permitted := false} = _State1, Effects} = + ra_server:handle_leader({command, {'$ra_join', meta(), + #{id => N4, voter => false}, await_consensus}}, State0), + % new member should join as non-voter + [ + {send_rpc, N4, + #append_entries_rpc{entries = + [_, _, _, {4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, + N3 := _, N4 := #{voter_status := {nonvoter, #{target := Target}}}}, + await_consensus}}]}}, + {send_rpc, N3, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter_status := {nonvoter, #{target := Target}}}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}}, + {send_rpc, N2, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter_status := {nonvoter, #{target := Target}}}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1419,7 +1565,7 @@ follower_cluster_change(_Config) -> written_evt(E) -> {ra_log_event, {written, E}}. 
-leader_applies_new_cluster(_Config) -> +leader_applies_new_cluster_voter(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), N2 => new_peer_with(#{next_index => 4, match_index => 3}), @@ -1438,13 +1584,11 @@ leader_applies_new_cluster(_Config) -> ?assert(not maps:get(cluster_change_permitted, State2)), - % replies coming in AEReply = #append_entries_reply{term = 5, success = true, next_index = 5, last_index = 4, last_term = 5}, % leader does not yet have consensus as will need at least 3 votes {leader, State3 = #{commit_index := 3, - cluster_change_permitted := false, cluster_index_term := {4, 5}, cluster := #{N2 := #{next_index := 5, @@ -1457,6 +1601,37 @@ leader_applies_new_cluster(_Config) -> cluster := #{N3 := #{next_index := 5, match_index := 4}}}, _Effects} = ra_server:handle_leader({N3, AEReply}, State3), + ok. + +leader_applies_new_cluster_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + + State = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + Command = {command, {'$ra_join', meta(), #{id => N4, voter => false}, await_consensus}}, + % cluster records index and term it was applied to determine whether it has + % been applied + {leader, #{cluster_index_term := {4, 5}, + cluster := #{N1 := _, N2 := _, + N3 := _, N4 := _} } = State1, _} = + ra_server:handle_leader(Command, State), + {leader, State2, _} = + ra_server:handle_leader(written_evt({4, 4, 5}), State1), + + ?assert(not maps:get(cluster_change_permitted, State2)), + + % replies coming in + AEReply = #append_entries_reply{term = 5, success = true, + next_index = 5, + last_index = 4, last_term = 5}, + % new peer doesn't count until it reaches its matching target, leader needs only 2 votes + {leader, _State3 = 
#{commit_index := 4, + cluster_change_permitted := true, + cluster := #{N2 := #{next_index := 5, + match_index := 4}}}, + _} = ra_server:handle_leader({N2, AEReply}, State2#{votes => 1}), ok. leader_appends_cluster_change_then_steps_before_applying_it(_Config) -> @@ -1497,6 +1672,7 @@ is_new(_Config) -> uid => <<"ra">>, initial_members => [], log_init_args => #{uid => <<>>}, + tick_timeout => 1000, machine => {simple, fun erlang:'+'/2, 0}}, NewState = ra_server:init(Args), {leader, State, _} = ra_server:handle_leader(usr_cmd(1), NewState), @@ -1949,6 +2125,32 @@ leader_receives_install_snapshot_result(_Config) -> (_) -> false end, Effects)), ok. +leader_received_append_entries_reply_and_promotes_voter(_Config) -> + N3 = ?N3, State = base_state(3, ?FUNCTION_NAME), + AER = #append_entries_reply{term = 5, success = true, + next_index = 5, + last_index = 4, last_term = 5}, + + % Permanent voter: only the pipeline_rpcs effect is expected + State1 = set_peer_voter_status(State, N3, voter), + {leader, _, + [{next_event,info,pipeline_rpcs}] + } = ra_server:handle_leader({N3, AER}, State1), + + % Permanent non-voter ({nonvoter, test} carries no target): still only pipeline_rpcs + State2 = set_peer_voter_status(State, N3, {nonvoter, test}), + {leader, _, + [{next_event,info,pipeline_rpcs}] + } = ra_server:handle_leader({N3, AER}, State2), + + % Promotion: last_index 4 meets target 4, so a '$ra_join' with voter_status := voter is expected too + State3 = set_peer_voter_status(State, N3, + {nonvoter, #{target => 4}}), + {leader, _, + [{next_event,info,pipeline_rpcs}, + {next_event, {command, {'$ra_join', _, #{id := N3, voter_status := voter}, _}}}] + } = ra_server:handle_leader({N3, AER}, State3). + follower_heartbeat(_Config) -> State = base_state(3, ?FUNCTION_NAME), #{current_term := Term, @@ -2456,6 +2658,11 @@ set_peer_query_index(State, PeerId, QueryIndex) -> #{PeerId := Peer} = Cluster, State#{cluster := Cluster#{PeerId => Peer#{query_index => QueryIndex}}}. +set_peer_voter_status(State, PeerId, VoterStatus) -> + #{cluster := Cluster} = State, + #{PeerId := Peer} = Cluster, + State#{cluster := Cluster#{PeerId => Peer#{voter_status => VoterStatus}}}.
+ leader_heartbeat_reply_lower_term(_Config) -> State = base_state(3, ?FUNCTION_NAME), #{current_term := Term, @@ -2546,6 +2753,7 @@ base_state(NumServers, MacMod) -> uid = <<"n1">>, log_id = <<"n1">>, metrics_key = n1, + tick_timeout = 1000, machine = {machine, MacMod, #{}}, % just keep last applied value machine_version = 0, machine_versions = [{0, 0}], @@ -2593,6 +2801,7 @@ new_peer() -> match_index => 0, query_index => 0, commit_index_sent => 0, + voter_status => voter, status => normal}. new_peer_with(Map) ->