Skip to content

Commit

Permalink
Newly added followers do not participate in quorum until they catch u…
Browse files Browse the repository at this point in the history
…p with the log
  • Loading branch information
Alex Valiushko committed Jul 12, 2023
1 parent c1d2036 commit 1073dd4
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 36 deletions.
1 change: 0 additions & 1 deletion src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
%% command to the log.
Expand Down
6 changes: 6 additions & 0 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@
suspended |
disconnected.

-type ra_voter_status() :: voter | {nonvoter, ra_nonvoter_reason()}.

-type ra_nonvoter_reason() :: init | #{target := ra_index()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
query_index := non_neg_integer(),
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
voter_status := ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
status := ra_peer_status()}.
Expand Down
10 changes: 8 additions & 2 deletions src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,20 @@ overview(System) when is_atom(System) ->
#{directory := Tbl,
directory_rev := _TblRev} = get_names(System),
Dir = ets:tab2list(Tbl),
States = maps:from_list(ets:tab2list(ra_state)),
Rows = lists:map(fun({K, S, V}) ->
{K, {S, V}}
end,
ets:tab2list(ra_state)),
States = maps:from_list(Rows),
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
Acc#{ServerName =>
#{uid => UId,
pid => Pid,
parent => Parent,
state => maps:get(ServerName, States, undefined),
state => S,
voter_status => V,
cluster_name => ClusterName,
snapshot_state => maps:get(UId, Snaps,
undefined)}}
Expand Down
146 changes: 136 additions & 10 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
terminate/2,
log_fold/3,
log_read/2,
voter_status/1,
recover/1
]).

Expand All @@ -72,6 +73,7 @@
log := term(),
voted_for => 'maybe'(ra_server_id()), % persistent
votes => non_neg_integer(),
voter_status => ra_voter_status(),
commit_index := ra_index(),
last_applied := ra_index(),
persisted_last_applied => ra_index(),
Expand Down Expand Up @@ -182,6 +184,7 @@
log_init_args := ra_log:ra_log_init_args(),
initial_members := [ra_server_id()],
machine := machine_conf(),
voter => boolean(),
friendly_name => unicode:chardata(),
metrics_key => term(),
% TODO: review - only really used for
Expand Down Expand Up @@ -236,6 +239,7 @@ init(#{id := Id,
cluster_name := _ClusterName,
initial_members := InitialNodes,
log_init_args := LogInitArgs,
tick_timeout := Timeout,
machine := MachineConf} = Config) ->
SystemConfig = maps:get(system_config, Config,
ra_system:default_config()),
Expand Down Expand Up @@ -315,6 +319,7 @@ init(#{id := Id,
uid = UId,
log_id = LogId,
metrics_key = MetricKey,
tick_timeout = Timeout,
machine = Machine,
machine_version = LatestMacVer,
machine_versions = [{SnapshotIdx, MacVer}],
Expand All @@ -325,6 +330,13 @@ init(#{id := Id,
counter = maps:get(counter, Config, undefined),
system_config = SystemConfig},

VoterStatus = case maps:get(voter, Config, false) of
false ->
{nonvoter, init};
true ->
voter
end,

#{cfg => Cfg,
current_term => CurrentTerm,
cluster => Cluster0,
Expand All @@ -335,6 +347,7 @@ init(#{id := Id,
cluster_change_permitted => false,
cluster_index_term => SnapshotIndexTerm,
voted_for => VotedFor,
voter_status => VoterStatus,
commit_index => CommitIndex,
%% set this to the first index so that we can apply all entries
%% up to the commit index during recovery
Expand Down Expand Up @@ -394,11 +407,16 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
{State2, Effects0} = evaluate_quorum(State1, []),

Effects00 = maybe_promote_voter(PeerId, State1, []),

{State2, Effects0} = evaluate_quorum(State1, Effects00),

{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),

Effects = [{next_event, info, pipeline_rpcs} | Effects1],

case State of
#{cluster := #{Id := _}} ->
% leader is in the cluster
Expand Down Expand Up @@ -776,7 +794,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
NewVotes = Votes + 1,
?DEBUG("~ts: vote granted for term ~b votes ~b",
[LogId, Term, NewVotes]),
case trunc(maps:size(Nodes) / 2) + 1 of
case required_quorum(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
Noop = {noop, #{ts => erlang:system_time(millisecond)},
Expand Down Expand Up @@ -922,7 +940,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
[LogId, Token, Term, Votes + 1]),
NewVotes = Votes + 1,
State = update_term(Term, State0),
case trunc(maps:size(Nodes) / 2) + 1 of
case required_quorum(Nodes) of
NewVotes ->
call_for_election(candidate, State);
_ ->
Expand Down Expand Up @@ -1103,8 +1121,18 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) ->
?DEBUG("~s: follower ignored pre_vote_rpc, non-voter: ~p0",
[LogId, Voter]),
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{},
#{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) ->
?DEBUG("~s: follower ignored request_vote_rpc, non-voter: ~p0",
[LogId, Voter]),
{follower, State, []};
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
#{current_term := Term, voted_for := VotedFor,
cfg := #cfg{log_id = LogId}} = State)
Expand Down Expand Up @@ -1161,7 +1189,7 @@ handle_follower(#install_snapshot_rpc{term = Term,
State = #{cfg := #cfg{log_id = LogId}, current_term := CurTerm})
when Term < CurTerm ->
?DEBUG("~ts: install_snapshot old term ~b in ~b",
[LogId, LastIndex, LastTerm]),
[LogId, LastIndex, LastTerm]),
% follower receives a snapshot from an old term
Reply = #install_snapshot_result{term = CurTerm,
last_term = LastTerm,
Expand Down Expand Up @@ -1202,6 +1230,11 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId}, voter_status := {nonvoter, _} = Voter} = State) ->
?DEBUG("~s: follower ignored election_timeout, non-voter: ~p0",
[LogId, Voter]),
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
handle_follower(try_become_leader, State) ->
Expand Down Expand Up @@ -1374,7 +1407,8 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
cluster_index_term,
query_index
], State),
O = maps:merge(O0, cfg_to_map(Cfg)),
O1 = O0#{voter_status => voter_status(State)},
O = maps:merge(O1, cfg_to_map(Cfg)),
LogOverview = ra_log:overview(Log),
MacOverview = ra_machine:overview(MacMod, MacState),
O#{log => LogOverview,
Expand Down Expand Up @@ -2087,6 +2121,7 @@ new_peer() ->
match_index => 0,
commit_index_sent => 0,
query_index => 0,
voter_status => voter,
status => normal}.

new_peer_with(Map) ->
Expand Down Expand Up @@ -2318,6 +2353,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter_status => voter_status(id(State0), NewCluster),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2450,16 +2486,33 @@ append_log_leader({CmdTag, _, _, _},
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From,
#{node := JoiningNode, voter_status := Voter},
ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := #{voter_status := Voter}} ->
already_member(State);
#{JoiningNode := Peer} ->
% Update member status.
Cluster = OldCluster#{JoiningNode => Peer#{voter_status => Voter}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
% Insert new member.
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter_status => Voter})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
% Legacy $ra_join, join as non-voter iff no such member in the cluster.
case OldCluster of
#{JoiningNode := _} ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
already_member(State);
_ ->
Cluster = OldCluster#{JoiningNode => new_peer()},
append_cluster_change(Cluster, From, ReplyMode, State)
append_log_leader({'$ra_join', From,
#{node => JoiningNode, voter_status => new_nonvoter(State)},
ReplyMode},
State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
State = #{cfg := #cfg{log_id = LogId},
Expand Down Expand Up @@ -2501,6 +2554,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter_status => voter_status(id(State), Cluster),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2577,6 +2631,8 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
end, [QueryIndex], Cluster).
Expand All @@ -2587,6 +2643,8 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := {nonvoter, _}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
end, [LWIdx], Cluster).
Expand Down Expand Up @@ -2803,6 +2861,74 @@ meta_name(#cfg{system_config = #{names := #{log_meta := Name}}}) ->
Name;
meta_name(#{names := #{log_meta := Name}}) ->
Name.

already_member(State) ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State}.

%%% ====================
%%% Voter status helpers
%%% ====================

-spec new_nonvoter(ra_server_state()) -> ra_voter_status().
new_nonvoter(#{commit_index := Target} = _State) ->
{nonvoter, #{target => Target}}.

-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().
maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) ->
% Unknown peer handled in the caller.
#{PeerID := #{match_index := MI, voter_status := OldStatus}} = Cluster,
case update_voter_status(OldStatus, MI) of
OldStatus ->
Effects;
voter ->
[{next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
#{node => PeerID, voter_status => voter},
noreply}}} |
Effects]
end.

update_voter_status({nonvoter, #{target := Target}}, MI)
when MI >= Target ->
voter;
update_voter_status(Permanent, _) ->
Permanent.

-spec voter_status(ra_server_state()) -> ra_voter_status().
voter_status(#{cluster := Cluster} = State) ->
case maps:get(voter_status, State, undefined) of
undefined ->
voter_status(id(State), Cluster);
Voter ->
Voter
end.

-spec voter_status(ra_server_id(), ra_cluster()) -> ra_voter_status().
voter_status(PeerId, Cluster) ->
case maps:get(PeerId, Cluster, undefined) of
undefined ->
undefined;
Peer ->
maps:get(voter_status, Peer, voter)
end.

-spec required_quorum(ra_cluster()) -> pos_integer().
required_quorum(Cluster) ->
Voters = count_voters(Cluster),
trunc(Voters / 2) + 1.

count_voters(Cluster) ->
maps:fold(
fun (_, #{voter_status := {nonvoter, _}}, Count) ->
Count;
(_, _, Count) ->
Count + 1
end,
0, Cluster).

%%% ===================
%%% Internal unit tests
%%% ===================
Expand Down
2 changes: 2 additions & 0 deletions src/ra_server.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
-define(DEFAULT_SNAPSHOT_CHUNK_SIZE, 1000000). % 1MB
-define(DEFAULT_RECEIVE_SNAPSHOT_TIMEOUT, 30000).
-define(FLUSH_COMMANDS_SIZE, 16).
-define(MAX_NONVOTER_ROUNDS, 4).

-record(cfg,
{id :: ra_server_id(),
uid :: ra_uid(),
log_id :: unicode:chardata(),
metrics_key :: term(),
tick_timeout :: non_neg_integer(),
machine :: ra_machine:machine(),
machine_version :: ra_machine:version(),
machine_versions :: [{ra_index(), ra_machine:version()}, ...],
Expand Down
Loading

0 comments on commit 1073dd4

Please sign in to comment.