diff --git a/src/ra.hrl b/src/ra.hrl index ceab8819..e6a560b6 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -43,7 +43,11 @@ -type ra_server_id() :: {Name :: atom(), Node :: node()}. %% Specifies server configuration for a new cluster member. +%% Both `ra:add_member` and `ra:start_server` must be called with the same value. -type ra_new_server() :: #{id := ra_server_id(), + + %% If set, server will start as non-voter until later promoted by the + %% leader. init_non_voter => ra_nvid()}. -type ra_peer_status() :: normal | diff --git a/src/ra_server.erl b/src/ra_server.erl index 5289ad62..ae2f755c 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -301,7 +301,7 @@ init(#{id := Id, undefined -> InitialMachineState = ra_machine:init(Machine, Name), Clu = make_cluster(Id, InitialNodes, - #{voter_status => new_voter_status(Config)}), + #{voter_status => init_voter_status(Config)}), {0, Clu, 0, InitialMachineState, {0, 0}}; {#{index := Idx, @@ -310,7 +310,7 @@ init(#{id := Id, machine_version := MacVersion} = Snapshot, MacSt} -> ClusterNodes = maps:get(cluster_state, Snapshot, ClusterNodes0), Clu = make_cluster(Id, ClusterNodes, - #{voter_status => new_voter_status(Config)}), + #{voter_status => init_voter_status(Config)}), %% the snapshot is the last index before the first index %% TODO: should this be Idx + 1? {Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}} @@ -408,7 +408,7 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true, Peer = Peer0#{match_index => max(MI, LastIdx), next_index => max(NI, NextIdx)}, State1 = put_peer(PeerId, Peer, State0), - Effects00 = maybe_promote_voter(PeerId, State1, []), + Effects00 = maybe_promote_peer(PeerId, State1, []), {State2, Effects0} = evaluate_quorum(State1, Effects00), {State, Effects1} = process_pending_consistent_queries(State2, Effects0), @@ -1299,7 +1299,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, commit_index => SnapIndex, last_applied => SnapIndex, cluster => make_cluster(Id, ClusterIds, - #{voter_status => new_voter_status(State0)}), + #{voter_status => init_voter_status(State0)}), machine_state => MacState}), %% it was the last snapshot chunk so we can revert back to %% follower status @@ -2368,7 +2368,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, - voter_status => current_voter_status(NewCluster, State0), + voter_status => maybe_promote_self(NewCluster, State0), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2505,7 +2505,7 @@ append_log_leader({'$ra_join', From, #{id := JoiningNode, voter_status := Voter0}, ReplyMode}, State = #{cluster := OldCluster}) -> - case normalize_voter_status(Voter0, State) of + case ensure_promotion_target(Voter0, State) of {error, Reason} -> {not_appended, Reason, State}; {ok, Voter} -> @@ -2525,7 +2525,7 @@ append_log_leader({'$ra_join', From, append_log_leader({'$ra_join', From, #{id := JoiningNode} = Spec, ReplyMode}, State) -> append_log_leader({'$ra_join', From, - #{id => JoiningNode, voter_status => new_voter_status(Spec)}, + #{id => JoiningNode, voter_status => init_voter_status(Spec)}, ReplyMode}, State); append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, State = #{cluster := OldCluster}) -> @@ -2576,7 +2576,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, - voter_status => current_voter_status(Cluster, State), + voter_status => maybe_promote_self(Cluster, State), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. @@ -2900,43 +2900,23 @@ already_member(State) -> %%% Voter status helpers %%% ==================== -normalize_voter_status({voter, Reason}, _) -> +-spec ensure_promotion_target(ra_voter_status(), ra_server_state()) -> + {ok, ra_voter_status()} | {error, term()}. +ensure_promotion_target({voter, Reason}, _) -> {ok, {voter, Reason}}; -normalize_voter_status({nonvoter, #{target := _, nvid := _} = Reason}, _) -> +ensure_promotion_target({nonvoter, #{target := _, nvid := _} = Reason}, _) -> {ok, {nonvoter, Reason}}; -normalize_voter_status({nonvoter, #{nvid := _} = Reason}, #{commit_index := CI}) -> +ensure_promotion_target({nonvoter, #{nvid := _} = Reason}, #{commit_index := CI}) -> {ok, {nonvoter, Reason#{target => CI}}}; -normalize_voter_status(_, _) -> +ensure_promotion_target(_, _) -> {error, missing_nvid}. -new_voter_status(#{init_non_voter := NVId}) -> +-spec init_voter_status(ra_server_config() | ra_new_server()) -> ra_voter_status(). +init_voter_status(#{init_non_voter := NVId}) -> {nonvoter, #{nvid => NVId}}; -new_voter_status(_) -> +init_voter_status(_) -> {voter, #{}}. --spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects(). -maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) -> - % Unknown peer handled in the caller. - #{PeerID := #{match_index := MI, - voter_status := OldStatus}} = Cluster, - case update_voter_status(OldStatus, MI) of - OldStatus -> - Effects; - NewStatus -> - [{next_event, - {command, {'$ra_join', - #{ts => os:system_time(millisecond)}, - #{id => PeerID, voter_status => NewStatus}, - noreply}}} | - Effects] - end. - -update_voter_status({nonvoter, #{target := Target} = State}, MI) - when MI >= Target -> - {voter, State}; -update_voter_status(Permanent, _) -> - Permanent. - -spec voter_status(ra_server_state()) -> ra_voter_status(). voter_status(#{cluster := Cluster} = State) -> case maps:get(voter_status, State, undefined) of @@ -2955,8 +2935,10 @@ voter_status(PeerId, Cluster) -> maps:get(voter_status, Peer, {voter, #{}}) end. -current_voter_status(Cluster, State) -> - {_, NReason} = New = voter_status(id(State), Cluster), +-spec maybe_promote_self(ra_cluster(), ra_server_state()) -> ra_voter_status(). +maybe_promote_self(NewCluster, State) -> + %% A bit complicated procedure since nvid is not guaranteed. + {_, NReason} = New = voter_status(id(State), NewCluster), {_, CReason} = Current = voter_status(State), Self = maps:get(nvid, CReason, undefined), case maps:get(nvid, NReason, undefined) of @@ -2964,6 +2946,29 @@ current_voter_status(Cluster, State) -> _ -> Current end. +-spec maybe_promote_peer(ra_server_id(), ra_server_state(), effects()) -> effects(). +maybe_promote_peer(PeerID, #{cluster := Cluster}, Effects) -> + % Unknown peer handled in the caller. + #{PeerID := #{match_index := MI, + voter_status := OldStatus}} = Cluster, + case update_voter_status(OldStatus, MI) of + OldStatus -> + Effects; + NewStatus -> + [{next_event, + {command, {'$ra_join', + #{ts => os:system_time(millisecond)}, + #{id => PeerID, voter_status => NewStatus}, + noreply}}} | + Effects] + end. + +update_voter_status({nonvoter, #{target := Target} = Reason}, MI) + when MI >= Target -> + {voter, Reason}; +update_voter_status(Permanent, _) -> + Permanent. + -spec required_quorum(ra_cluster()) -> pos_integer(). required_quorum(Cluster) -> Voters = count_voters(Cluster),