Skip to content

Commit

Permalink
promote voters after sync
Browse files Browse the repository at this point in the history
tick through replication rounds

inject cluster_change effect
  • Loading branch information
Alex Valiushko committed May 14, 2023
1 parent 453965a commit 9a14f89
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 258 deletions.
9 changes: 9 additions & 0 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
%% membership changes
add_member/2,
add_member/3,
maybe_add_member/2,
maybe_add_member/3,
remove_member/2,
remove_member/3,
leave_and_terminate/3,
Expand Down Expand Up @@ -579,6 +581,13 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).

%% @doc Same as maybe_add_member/3, using ?DEFAULT_TIMEOUT as the timeout.
maybe_add_member(ServerLoc, ServerId) ->
    maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT).

%% @doc Sends a '$ra_maybe_join' command for ServerId to ServerLoc through
%% ra_server_proc:command/3, using the after_log_append reply mode.
%% The leader handles this command by adding ServerId to the cluster with a
%% non-voter status (built by ra_voter:new_nonvoter/1) unless it is already
%% a member.
maybe_add_member(ServerLoc, ServerId, Timeout) ->
    JoinCommand = {'$ra_maybe_join', ServerId, after_log_append},
    ra_server_proc:command(ServerLoc, JoinCommand, Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
Expand Down
6 changes: 3 additions & 3 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
voter := ra_voter(),
%% indicates that a snapshot is being sent
%% to the peer
status := ra_peer_status(),
%% whether the peer is part of the consensus
voter := ra_voter()}.
status := ra_peer_status()}.

-type ra_cluster() :: #{ra_server_id() => ra_peer_state()}.

Expand Down
37 changes: 22 additions & 15 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@
query_index := non_neg_integer(),
queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}),
pending_consistent_queries := [consistent_query_ref()],
voter => 'maybe'(ra_voter()),
commit_latency => 'maybe'(non_neg_integer())
}.

Expand Down Expand Up @@ -397,11 +396,15 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
{State2, Effects0} = evaluate_quorum(State1, []),
Effects00 = ra_voter:maybe_promote(PeerId, State1, []),

{State2, Effects0} = evaluate_quorum(State1, Effects00),

{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),

Effects = [{next_event, info, pipeline_rpcs} | Effects1],

case State of
#{cluster := #{Id := _}} ->
% leader is in the cluster
Expand Down Expand Up @@ -1106,14 +1109,12 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{}, #{voter := {no, _}} = State) ->
%% ignore elections, non-voter
%% A follower whose own voter status is {no, _} does not take part in
%% elections: the pre-vote request is logged and dropped, the server stays
%% a follower and no effects are returned.
handle_follower(#pre_vote_rpc{},
                #{cfg := #cfg{log_id = LogId},
                  voter := {no, _} = Voter} = State) ->
    %% the message names the RPC this clause actually matches (pre_vote_rpc)
    ?WARN("~w: follower ignored pre_vote_rpc, non voter: ~p", [LogId, Voter]),
    {follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{}, #{voter := {no, _}} = State) ->
%% ignore elections, non-voter
{follower, State, []};
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
#{current_term := Term, voted_for := VotedFor,
cfg := #cfg{log_id = LogId}} = State)
Expand Down Expand Up @@ -1211,8 +1212,10 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
handle_follower(election_timeout, #{voter := {no, _}} = State) ->
%% ignore elections, non-voter
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?WARN("~w: follower ignored election_timeout, non voter: ~p",
[LogId, Voter]),
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
Expand Down Expand Up @@ -1381,6 +1384,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
last_applied,
cluster,
leader_id,
voter,
voted_for,
cluster_change_permitted,
cluster_index_term,
Expand Down Expand Up @@ -2099,8 +2103,8 @@ new_peer() ->
match_index => 0,
commit_index_sent => 0,
query_index => 0,
status => normal,
voter => yes}.
voter => yes,
status => normal}.

%% Returns the default peer state from new_peer/0 with the entries of Map
%% laid on top; keys present in Map replace the defaults.
new_peer_with(Map) ->
    Defaults = new_peer(),
    maps:merge(Defaults, Map).
Expand Down Expand Up @@ -2336,7 +2340,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter => ra_voter:peer_status(id(State0), NewCluster),
voter => ra_voter:status(NewCluster, id(State0)),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2488,7 +2492,7 @@ append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
#{JoiningNode := _} ->
already_member(State);
_ ->
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
Expand Down Expand Up @@ -2520,7 +2524,6 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
case Cmd of
{'$ra_cluster_change', _, Cluster, _} ->
State#{cluster => Cluster,
voter => ra_voter:peer_status(id(State), Cluster),
cluster_index_term => {Idx, Term}};
_ ->
% revert back to previous cluster
Expand All @@ -2532,7 +2535,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter => ra_voter:peer_status(id(State), Cluster),
voter => ra_voter:status(Cluster, id(State)),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2609,6 +2612,8 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
end, [QueryIndex], Cluster).
Expand All @@ -2619,6 +2624,8 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
end, [LWIdx], Cluster).
Expand Down
59 changes: 46 additions & 13 deletions src/ra_voter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,55 @@
-export([
new_nonvoter/1,
status/1,
peer_status/2
status/2,
maybe_promote/3
]).

-define(DEFAULT_MAX_ROUNDS, 4).

%% Builds the initial status for a member that joins without voting rights:
%% a {no, Progress} tuple whose Progress map starts at round 0, uses the
%% commit_index currently stored in State as the target index, and records
%% the OS system time (in milliseconds) as the start of the round.
%% Fails with a badkey error when State has no commit_index key.
new_nonvoter(State) ->
TargetIdx = maps:get(commit_index, State),
{no, #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}}.
Target = maps:get(commit_index, State),
{no, #{round => 0, target => Target , ts => os:system_time(millisecond)}}.

status(State) ->
case maps:get(voter, State) of
undefined ->
MyId = ra_server:id(State),
#{cluster := Cluster} = State,
peer_status(MyId, Cluster);
Voter -> Voter
%% Re-evaluates the voter status of PeerID against its replication progress
%% and, when the status would change, prepends a '$ra_join' command for that
%% peer to Effects.
%%
%% PeerID  - id of a peer that must be present in the cluster map of State
%% State   - server state; only commit_index and cluster are read
%% Effects - effects accumulated so far
%%
%% Returns Effects unchanged when evaluate_voter/3 yields the status the
%% peer already has, otherwise Effects with one next_event effect in front.
maybe_promote(PeerID,
              #{commit_index := CI, cluster := Cluster} = _State,
              Effects) ->
    #{PeerID := #{match_index := MI} = Peer} = Cluster,
    %% A peer map without a voter key is treated as a full voter, the same
    %% default status/2 applies, instead of crashing with a badmatch.
    OldStatus = maps:get(voter, Peer, yes),
    case evaluate_voter(OldStatus, MI, CI) of
        OldStatus ->
            Effects;
        _Change ->
            %% NOTE(review): the newly computed status is not carried in the
            %% command, so promotion, a new round and a permanent non-voter
            %% all emit the same '$ra_join' - confirm this is intended.
            [{next_event,
              {command, {'$ra_join',
                         #{ts => os:system_time(millisecond)},
                         PeerID,
                         noreply}}} |
             Effects]
    end.

%% Returns the voter status stored for PeerId in Cluster.
%% A peer map without a voter key counts as a full voter (yes).
%% Throws not_a_cluster_member when PeerId is not a key of Cluster, rather
%% than failing with {badmap, undefined} on the second lookup.
peer_status(PeerId, Cluster) ->
    case maps:get(PeerId, Cluster, undefined) of
        undefined ->
            throw(not_a_cluster_member);
        Peer ->
            maps:get(voter, Peer, yes)
    end.
%% Computes the next voter status of a peer from its current status, its
%% match index and the leader's commit index.
%%
%% For a {no, #{round, target, ts}} status whose target has been reached:
%%   - yes, when the round took no longer than the aten poll_interval
%%     application setting (1000 when unset);
%%   - {no, permanent}, when the round was too slow and the round counter
%%     already exceeds ?DEFAULT_MAX_ROUNDS;
%%   - otherwise a fresh round aimed at the current commit index.
%% Every other status (target not reached yet, yes, {no, permanent}, ...)
%% is returned as is.
evaluate_voter({no, #{round := Round, target := Target, ts := StartedAt}},
               MatchIdx, CommitIdx)
  when MatchIdx >= Target ->
    PollInterval = application:get_env(aten, poll_interval, 1000),
    Now = os:system_time(millisecond),
    Elapsed = Now - StartedAt,
    if
        Elapsed =< PollInterval ->
            yes;
        Round > ?DEFAULT_MAX_ROUNDS ->
            {no, permanent};
        true ->
            {no, #{round => Round + 1, target => CommitIdx, ts => Now}}
    end;
evaluate_voter(Unchanged, _MatchIdx, _CommitIdx) ->
    Unchanged.

%% Returns the voter status of the server owning State, looked up under its
%% own id (ra_server:id/1) in the cluster map held in State.
status(#{cluster := Cluster} = State) ->
    status(Cluster, ra_server:id(State)).

%% Returns the voter status stored for PeerId in Cluster; a peer map
%% without a voter key counts as a full voter (yes).
%% Throws not_a_cluster_member when PeerId has no entry in Cluster.
status(Cluster, PeerId) ->
    voter_of(maps:get(PeerId, Cluster, undefined)).

%% Extracts the voter status from a looked-up peer entry.
voter_of(undefined) ->
    throw(not_a_cluster_member);
voter_of(Peer) ->
    maps:get(voter, Peer, yes).
53 changes: 51 additions & 2 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ all() ->
follower_machine_version,
follower_install_snapshot_machine_version,
leader_server_join,
leader_server_maybe_join,
leader_server_leave,
leader_is_removed,
follower_cluster_change,
Expand Down Expand Up @@ -1333,6 +1334,54 @@ leader_server_join(_Config) ->
| _] = Effects,
ok.

%% Checks that a leader handling '$ra_maybe_join' for N4 adds it to the
%% cluster as a non-voter and replicates the resulting cluster change to
%% every peer.
leader_server_maybe_join(_Config) ->
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
% three existing members, all caught up to index 3
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
N2 => new_peer_with(#{next_index => 4, match_index => 3}),
N3 => new_peer_with(#{next_index => 4, match_index => 3})},
State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster},
% raft servers should switch to the new configuration after log append
% and further cluster changes should be disallowed
{leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _},
cluster_change_permitted := false} = _State1, Effects} =
ra_server:handle_leader({command, {'$ra_maybe_join', meta(),
N4, await_consensus}}, State0),
% new member should join as non-voter
% Round and Target are bound here and reused in the patterns below, so the
% replicated status must carry the same values; ts is a timestamp taken at
% call time and is therefore only matched with a wildcard
{no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0),
[
% the joining server is sent four entries, the last one being the
% cluster change at index 4, term 5
{send_rpc, N4,
#append_entries_rpc{entries =
[_, _, _, {4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _,
N3 := _, N4 := #{voter := {no, #{round := Round,
target := Target,
ts := _}}}},
await_consensus}}]}},
% existing followers are only sent the new cluster change entry,
% following on from index 3
{send_rpc, N3,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round,
target := Target,
ts := _}}}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 3}},
{send_rpc, N2,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round,
target := Target,
ts := _}}}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 3}}
| _] = Effects,
ok.

leader_server_leave(_Config) ->
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
Expand Down Expand Up @@ -2593,8 +2642,8 @@ new_peer() ->
match_index => 0,
query_index => 0,
commit_index_sent => 0,
status => normal,
voter => yes}.
voter => yes,
status => normal}.

%% Test helper: the default peer state from new_peer/0 with the entries of
%% Map laid on top; keys present in Map replace the defaults.
new_peer_with(Map) ->
    Defaults = new_peer(),
    maps:merge(Defaults, Map).
Expand Down
Loading

0 comments on commit 9a14f89

Please sign in to comment.