Skip to content

Commit

Permalink
extract code into new module
Browse files Browse the repository at this point in the history
follower ignores pre_vote and request_vote
  • Loading branch information
Alex Valiushko committed May 14, 2023
1 parent ee12cc5 commit 453965a
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 69 deletions.
22 changes: 6 additions & 16 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
-type ra_index() :: non_neg_integer().
%% Section 5.3.
-type ra_term() :: non_neg_integer().
%% Section 4.2.1
-type ra_replication_round() :: non_neg_integer().

%% tuple form of index and term
-type ra_idxterm() :: {ra_index(), ra_term()}.
Expand All @@ -46,20 +44,12 @@
suspended |
disconnected.

%% A peer can be one of:
%%
%% - Voter, standard quorum member.
%% - Nonvoter, node does not participate in elections or consensus voting.
%% - Staging, node is a temporary nonvoter, and will be automatically promoted
%% if it proves to be fast enough to stay up to dat with teh leader.
-type ra_voter() :: yes | no | {maybe, staging_status()}.

%% For staging nodes we measure current round, target index and the timestamp of its start.
%% If the node reaches target index and the ∂T is less than the election timeout, the node is
%% considered eligible to become a voter.
-type staging_status() :: #{round := ra_replication_round(),
target := ra_index(),
ts := integer()}.
-type ra_voter() :: yes | {no, ra_nonvoter_status()}.

-type ra_nonvoter_status() :: permanent |
#{round := non_neg_integer(),
target := ra_index(),
ts := integer()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
Expand Down
22 changes: 15 additions & 7 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
query_index := non_neg_integer(),
queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}),
pending_consistent_queries := [consistent_query_ref()],
voter => 'maybe'(ra_voter()),
commit_latency => 'maybe'(non_neg_integer())
}.

Expand Down Expand Up @@ -1105,8 +1106,14 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{}, #{voter := {no, _}} = State) ->
%% ignore elections, non-voter
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{}, #{voter := {no, _}} = State) ->
%% ignore elections, non-voter
{follower, State, []};
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
#{current_term := Term, voted_for := VotedFor,
cfg := #cfg{log_id = LogId}} = State)
Expand Down Expand Up @@ -1204,6 +1211,9 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
handle_follower(election_timeout, #{voter := {no, _}} = State) ->
%% ignore elections, non-voter
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
handle_follower(try_become_leader, State) ->
Expand Down Expand Up @@ -2095,10 +2105,6 @@ new_peer() ->
new_peer_with(Map) ->
maps:merge(new_peer(), Map).

new_staging_status(State) ->
TargetIdx = maps:get(commit_index, State),
#{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}.

already_member(State) ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
Expand Down Expand Up @@ -2330,6 +2336,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter => ra_voter:peer_status(id(State0), NewCluster),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2468,7 +2475,7 @@ append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
case OldCluster of
#{JoiningNode := #{voter := yes}} ->
already_member(State);
#{JoiningNode := #{voter := {maybe, _}} = Peer} ->
#{JoiningNode := #{voter := {no, _}} = Peer} ->
Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
Expand All @@ -2481,8 +2488,7 @@ append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
#{JoiningNode := _} ->
already_member(State);
_ ->
Round0 = new_staging_status(State),
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => {maybe, Round0}})},
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
Expand Down Expand Up @@ -2514,6 +2520,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
case Cmd of
{'$ra_cluster_change', _, Cluster, _} ->
State#{cluster => Cluster,
voter => ra_voter:peer_status(id(State), Cluster),
cluster_index_term => {Idx, Term}};
_ ->
% revert back to previous cluster
Expand All @@ -2525,6 +2532,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter => ra_voter:peer_status(id(State), Cluster),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down
28 changes: 28 additions & 0 deletions src/ra_voter.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%%
%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(ra_voter).

-export([
new_nonvoter/1,
status/1,
peer_status/2
]).

new_nonvoter(State) ->
TargetIdx = maps:get(commit_index, State),
{no, #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}}.

status(State) ->
case maps:get(voter, State) of
undefined ->
MyId = ra_server:id(State),
#{cluster := Cluster} = State,
peer_status(MyId, Cluster);
Voter -> Voter
end.

peer_status(PeerId, Cluster) ->
Peer = maps:get(PeerId, Cluster, undefined),
maps:get(voter, Peer, yes).
46 changes: 0 additions & 46 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ all() ->
follower_machine_version,
follower_install_snapshot_machine_version,
leader_server_join,
leader_server_maybe_join,
leader_server_leave,
leader_is_removed,
follower_cluster_change,
Expand Down Expand Up @@ -1334,47 +1333,6 @@ leader_server_join(_Config) ->
| _] = Effects,
ok.

leader_server_maybe_join(_Config) ->
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
N2 => new_peer_with(#{next_index => 4, match_index => 3}),
N3 => new_peer_with(#{next_index => 4, match_index => 3})},
State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster},
Round0 = new_staging_status(State0),
% raft servers should switch to the new configuration after log append
% and further cluster changes should be disallowed
{leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _},
cluster_change_permitted := false} = _State1, Effects} =
ra_server:handle_leader({command, {'$ra_maybe_join', meta(),
N4, await_consensus}}, State0),
[
{send_rpc, N4,
#append_entries_rpc{entries =
[_, _, _, {4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _,
N3 := _, N4 := #{voter := {maybe, Round0}}},
await_consensus}}]}},
{send_rpc, N3,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {maybe, Round0}}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 3}},
{send_rpc, N2,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {maybe, Round0}}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 3}}
| _] = Effects,
ok.

leader_server_leave(_Config) ->
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
Expand Down Expand Up @@ -2641,10 +2599,6 @@ new_peer() ->
new_peer_with(Map) ->
maps:merge(new_peer(), Map).

new_staging_status(State) ->
TargetIdx = maps:get(commit_index, State),
#{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}.

snap_meta(Idx, Term) ->
snap_meta(Idx, Term, []).

Expand Down
Loading

0 comments on commit 453965a

Please sign in to comment.