Skip to content

Commit

Permalink
Newly added followers do not participate in quorum until they catch u…
Browse files Browse the repository at this point in the history
…p with the log
  • Loading branch information
Alex Valiushko committed Aug 21, 2023
1 parent 54860c4 commit 3bede59
Show file tree
Hide file tree
Showing 9 changed files with 399 additions and 46 deletions.
9 changes: 5 additions & 4 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -557,9 +557,10 @@ delete_cluster(ServerIds, Timeout) ->
%% affect said cluster's availability characteristics (by increasing quorum node count).
%%
%% @param ServerLoc the ra server or servers to try to send the command to
%% @param ServerId the ra server id of the new server.
%% @param ServerId the ra server id of the new server, or a map with server id and settings.
%% @end
-spec add_member(ra_server_id() | [ra_server_id()], ra_server_id()) ->
-spec add_member(ra_server_id() | [ra_server_id()],
ra_server_id() | ra_new_server()) ->
ra_cmd_ret() |
{error, already_member} |
{error, cluster_change_not_permitted}.
Expand All @@ -570,7 +571,8 @@ add_member(ServerLoc, ServerId) ->
%% @see add_member/2
%% @end
-spec add_member(ra_server_id() | [ra_server_id()],
ra_server_id(), timeout()) ->
ra_server_id() | ra_new_server(),
timeout()) ->
ra_cmd_ret() |
{error, already_member} |
{error, cluster_change_not_permitted}.
Expand All @@ -579,7 +581,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
%% command to the log.
Expand Down
10 changes: 10 additions & 0 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,27 @@
%% after node restart). Pids are not stable in this sense.
-type ra_server_id() :: {Name :: atom(), Node :: node()}.

%% Specifies server configuration for a new cluster member.
-type ra_new_server() :: #{id := ra_server_id(),
voter_status := ra_voter_status()}.

-type ra_peer_status() :: normal |
{sending_snapshot, pid()} |
suspended |
disconnected.

%% Voting status of a cluster member: either a full `voter' or
%% `{nonvoter, Reason}'. Non-voting peers are left out of the quorum
%% calculations (match/query indexes and election vote counts).
-type ra_voter_status() :: voter | {nonvoter, ra_nonvoter_reason()}.

%% Why a member is a non-voter:
%% `init' is assigned when a server is started with `voter_status' set to
%% `false' in its config; `#{target := Index}' carries the log index the
%% peer's match index has to reach before the leader promotes it to `voter'.
-type ra_nonvoter_reason() :: init | #{target := ra_index()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
query_index := non_neg_integer(),
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
voter_status := ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
status := ra_peer_status()}.
Expand Down
10 changes: 8 additions & 2 deletions src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,20 @@ overview(System) when is_atom(System) ->
#{directory := Tbl,
directory_rev := _TblRev} = get_names(System),
Dir = ets:tab2list(Tbl),
States = maps:from_list(ets:tab2list(ra_state)),
Rows = lists:map(fun({K, S, V}) ->
{K, {S, V}}
end,
ets:tab2list(ra_state)),
States = maps:from_list(Rows),
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
Acc#{ServerName =>
#{uid => UId,
pid => Pid,
parent => Parent,
state => maps:get(ServerName, States, undefined),
state => S,
voter_status => V,
cluster_name => ClusterName,
snapshot_state => maps:get(UId, Snaps,
undefined)}}
Expand Down
155 changes: 138 additions & 17 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
terminate/2,
log_fold/3,
log_read/2,
voter_status/1,
recover/1
]).

Expand All @@ -72,6 +73,7 @@
log := term(),
voted_for => 'maybe'(ra_server_id()), % persistent
votes => non_neg_integer(),
voter_status => ra_voter_status(),
commit_index := ra_index(),
last_applied := ra_index(),
persisted_last_applied => ra_index(),
Expand Down Expand Up @@ -325,16 +327,27 @@ init(#{id := Id,
counter = maps:get(counter, Config, undefined),
system_config = SystemConfig},

VoterStatus = case maps:get(voter_status, Config, true) of
false ->
{nonvoter, init};
true ->
voter
end,

Peer = maps:get(Id, Cluster0),
Cluster1 = Cluster0#{Id => Peer#{voter_status => VoterStatus}},

#{cfg => Cfg,
current_term => CurrentTerm,
cluster => Cluster0,
cluster => Cluster1,
% There may be scenarios when a single server
% starts up but hasn't
% yet re-applied its noop command that we may receive other join
% commands that can't be applied.
cluster_change_permitted => false,
cluster_index_term => SnapshotIndexTerm,
voted_for => VotedFor,
voter_status => VoterStatus,
commit_index => CommitIndex,
%% set this to the first index so that we can apply all entries
%% up to the commit index during recovery
Expand Down Expand Up @@ -394,8 +407,8 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
{State2, Effects0} = evaluate_quorum(State1, []),

Effects00 = maybe_promote_voter(PeerId, State1, []),
{State2, Effects0} = evaluate_quorum(State1, Effects00),
{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),
Effects = [{next_event, info, pipeline_rpcs} | Effects1],
Expand Down Expand Up @@ -776,7 +789,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
NewVotes = Votes + 1,
?DEBUG("~ts: vote granted for term ~b votes ~b",
[LogId, Term, NewVotes]),
case trunc(maps:size(Nodes) / 2) + 1 of
case required_quorum(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
Noop = {noop, #{ts => erlang:system_time(millisecond)},
Expand Down Expand Up @@ -922,7 +935,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
[LogId, Token, Term, Votes + 1]),
NewVotes = Votes + 1,
State = update_term(Term, State0),
case trunc(maps:size(Nodes) / 2) + 1 of
case required_quorum(Nodes) of
NewVotes ->
call_for_election(candidate, State);
_ ->
Expand Down Expand Up @@ -1103,8 +1116,18 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
%% A non-voter takes no part in elections: the pre-vote request is dropped,
%% no effects are returned and the state is left unchanged.
handle_follower(#pre_vote_rpc{},
                #{cfg := #cfg{log_id = LogId},
                  voter_status := {nonvoter, _} = Voter} = State) ->
    %% ~ts because the log id is unicode chardata; the stray trailing "0"
    %% after ~p has been removed from the message.
    ?DEBUG("~ts: follower ignored pre_vote_rpc, non-voter: ~p",
           [LogId, Voter]),
    {follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
%% A non-voter never grants votes: the vote request is dropped, no effects
%% are returned and the state is left unchanged.
handle_follower(#request_vote_rpc{},
                #{cfg := #cfg{log_id = LogId},
                  voter_status := {nonvoter, _} = Voter} = State) ->
    %% ~ts because the log id is unicode chardata; the stray trailing "0"
    %% after ~p has been removed from the message.
    ?DEBUG("~ts: follower ignored request_vote_rpc, non-voter: ~p",
           [LogId, Voter]),
    {follower, State, []};
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
#{current_term := Term, voted_for := VotedFor,
cfg := #cfg{log_id = LogId}} = State)
Expand Down Expand Up @@ -1202,6 +1225,11 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
%% A non-voter must not start an election: the election timeout is ignored
%% and the server simply stays a follower.
handle_follower(election_timeout,
                #{cfg := #cfg{log_id = LogId},
                  voter_status := {nonvoter, _} = Voter} = State) ->
    %% ~ts because the log id is unicode chardata; the stray trailing "0"
    %% after ~p has been removed from the message.
    ?DEBUG("~ts: follower ignored election_timeout, non-voter: ~p",
           [LogId, Voter]),
    {follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
handle_follower(try_become_leader, State) ->
Expand Down Expand Up @@ -1374,7 +1402,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
cluster_index_term,
query_index
], State),
O = maps:merge(O0, cfg_to_map(Cfg)),
O1 = O0#{voter_status => voter_status(State)},
O = maps:merge(O1, cfg_to_map(Cfg)),
LogOverview = ra_log:overview(Log),
MacOverview = ra_machine:overview(MacMod, MacState),
O#{log => LogOverview,
Expand Down Expand Up @@ -2087,6 +2116,7 @@ new_peer() ->
match_index => 0,
commit_index_sent => 0,
query_index => 0,
voter_status => voter,
status => normal}.

new_peer_with(Map) ->
Expand Down Expand Up @@ -2218,13 +2248,13 @@ make_cluster(Self, Nodes) ->
Cluster#{Self => new_peer()}
end.

initialise_peers(State = #{log := Log, cluster := Cluster0}) ->
PeerIds = peer_ids(State),
initialise_peers(State = #{cfg := #cfg{id = Id}, log := Log, cluster := Cluster0}) ->
NextIdx = ra_log:next_index(Log),
Cluster = lists:foldl(fun(PeerId, Acc) ->
Acc#{PeerId =>
new_peer_with(#{next_index => NextIdx})}
end, Cluster0, PeerIds),
Cluster = maps:map(fun (PeerId, Self) when PeerId =:= Id ->
Self;
(_, #{voter_status := Voter} = _Other) ->
new_peer_with(#{next_index => NextIdx, voter_status => Voter})
end, Cluster0),
State#{cluster => Cluster}.

apply_to(ApplyTo, State, Effs) ->
Expand Down Expand Up @@ -2318,6 +2348,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter_status => voter_status(id(State0), NewCluster),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2450,16 +2481,33 @@ append_log_leader({CmdTag, _, _, _},
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From,
#{id := JoiningNode, voter_status := Voter},
ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := #{voter_status := Voter}} ->
already_member(State);
#{JoiningNode := Peer} ->
% Update member status.
Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
% Insert new member.
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
% Legacy $ra_join, join as non-voter iff no such member in the cluster.
case OldCluster of
#{JoiningNode := _} ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
already_member(State);
_ ->
Cluster = OldCluster#{JoiningNode => new_peer()},
append_cluster_change(Cluster, From, ReplyMode, State)
append_log_leader({'$ra_join', From,
#{id => JoiningNode, voter_status => new_nonvoter(State)},
ReplyMode},
State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
State = #{cfg := #cfg{log_id = LogId},
Expand Down Expand Up @@ -2501,6 +2549,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter_status => voter_status(id(State), Cluster),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2577,6 +2626,8 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
end, [QueryIndex], Cluster).
Expand All @@ -2587,6 +2638,8 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
end, [LWIdx], Cluster).
Expand Down Expand Up @@ -2803,6 +2856,74 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
Name;
meta_name(#{names := #{log_meta := Name}}) ->
Name.

%% Result returned by the leader when a join request names a server that is
%% already part of the cluster: nothing is appended to the log and the
%% state is handed back untouched.
%% TODO: reply? If we don't reply the caller may block until timeout
already_member(ServerState) ->
    Reason = already_member,
    {not_appended, Reason, ServerState}.

%%% ====================
%%% Voter status helpers
%%% ====================

%% Builds the status given to a freshly joined member: a non-voter whose
%% promotion target is the server's current commit index.
-spec new_nonvoter(ra_server_state()) -> ra_voter_status().
new_nonvoter(#{commit_index := CommitIdx}) ->
    Reason = #{target => CommitIdx},
    {nonvoter, Reason}.

%% Checks whether the given peer's match index has reached its promotion
%% target. If so, a `$ra_join' command carrying `voter_status => voter' is
%% queued as a next_event effect in front of `Effects'; otherwise `Effects'
%% is returned unchanged.
-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().
maybe_promote_voter(PeerID, #{cluster := Cluster}, Effects) ->
    %% Unknown peers are dealt with by the caller, so the lookup is asserted.
    #{PeerID := #{match_index := MatchIdx,
                  voter_status := Current}} = Cluster,
    case update_voter_status(Current, MatchIdx) of
        voter when Current =/= voter ->
            %% status changed from non-voter to voter: ask for the change
            %% to be recorded through a regular cluster change command
            Meta = #{ts => os:system_time(millisecond)},
            Member = #{id => PeerID, voter_status => voter},
            Promote = {'$ra_join', Meta, Member, noreply},
            [{next_event, {command, Promote}} | Effects];
        _Unchanged ->
            Effects
    end.

%% Computes the next voter status for a peer given its match index.
%% A non-voter with a `target' becomes `voter' once the match index has
%% reached that target; every other status is returned as is.
update_voter_status(Status, MatchIdx) ->
    case Status of
        {nonvoter, #{target := Target}} when MatchIdx >= Target ->
            voter;
        _ ->
            Status
    end.

%% Returns this server's own voter status.
%% The value cached under the `voter_status' key of the server state is
%% used when present; otherwise the status is looked up in the cluster map
%% under the server's own id. That lookup yields `undefined' when the
%% server is not (or no longer) a member of the cluster, so the spec
%% includes `undefined'.
-spec voter_status(ra_server_state()) -> ra_voter_status() | undefined.
voter_status(#{cluster := Cluster} = State) ->
    case maps:get(voter_status, State, undefined) of
        undefined ->
            %% nothing cached: fall back to the cluster configuration
            voter_status(id(State), Cluster);
        Voter ->
            Voter
    end.

%% Looks up the voter status recorded for `PeerId' in `Cluster'.
%% Returns `undefined' when the peer is not present in the cluster map and
%% defaults to `voter' when the peer entry has no `voter_status' key.
%% The spec includes `undefined' to match what the body returns.
-spec voter_status(ra_server_id(), ra_cluster()) ->
    ra_voter_status() | undefined.
voter_status(PeerId, Cluster) ->
    case maps:get(PeerId, Cluster, undefined) of
        undefined ->
            undefined;
        Peer ->
            %% peer entries without the key are treated as voters
            maps:get(voter_status, Peer, voter)
    end.

%% Number of votes needed for a majority among the voting members of the
%% cluster; non-voters are not counted.
-spec required_quorum(ra_cluster()) -> pos_integer().
required_quorum(Cluster) ->
    VoterCount = count_voters(Cluster),
    (VoterCount div 2) + 1.

%% Counts the cluster members that are not marked `{nonvoter, _}'.
%% Entries without a `voter_status' key count as voters.
count_voters(Cluster) ->
    IsVoter = fun (#{voter_status := {nonvoter, _}}) -> false;
                  (_) -> true
              end,
    length(lists:filter(IsVoter, maps:values(Cluster))).

%%% ===================
%%% Internal unit tests
%%% ===================
Expand Down
2 changes: 2 additions & 0 deletions src/ra_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
-define(FLUSH_COMMANDS_SIZE, 16).
-define(MAX_NONVOTER_ROUNDS, 4).

-record(cfg,
{id :: ra_server_id(),
uid :: ra_uid(),
log_id :: unicode:chardata(),
metrics_key :: term(),
tick_timeout :: non_neg_integer(),
machine :: ra_machine:machine(),
machine_version :: ra_machine:version(),
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
Expand Down
Loading

0 comments on commit 3bede59

Please sign in to comment.