Skip to content

Commit

Permalink
Revert "back to mvp"
Browse files Browse the repository at this point in the history
This reverts commit d7e3030.
  • Loading branch information
Alex Valiushko committed May 25, 2023
1 parent ac84b6f commit 9936265
Show file tree
Hide file tree
Showing 6 changed files with 342 additions and 117 deletions.
9 changes: 9 additions & 0 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
%% membership changes
add_member/2,
add_member/3,
maybe_add_member/2,
maybe_add_member/3,
remove_member/2,
remove_member/3,
leave_and_terminate/3,
Expand Down Expand Up @@ -579,6 +581,13 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).

maybe_add_member(ServerLoc, ServerId) ->
maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT).
maybe_add_member(ServerLoc, ServerId, Timeout) ->
ra_server_proc:command(ServerLoc,
{'$ra_maybe_join', ServerId, after_log_append},
Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
Expand Down
11 changes: 7 additions & 4 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,20 @@
suspended |
disconnected.

% Leader doesn't count for majority purposes peers that
% have matching_index lower than the predicate.
-type ra_voter() :: {matching, ra_index()}.
-type ra_voter() :: yes | {no, ra_nonvoter_status()}.

-type ra_nonvoter_status() :: permanent |
#{round := non_neg_integer(),
target := ra_index(),
ts := integer()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
query_index := non_neg_integer(),
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether peer is part of the consensus
%% whether the peer is part of the consensus
voter := ra_voter(),
%% indicates that a snapshot is being sent
%% to the peer
Expand Down
117 changes: 52 additions & 65 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
| recovered | stop | receive_snapshot.

-type command_type() :: '$usr' | '$ra_join' | '$ra_leave' |
'$ra_maybe_join' |
'$ra_cluster_change' | '$ra_cluster'.

-type command_meta() :: #{from => from(),
Expand Down Expand Up @@ -395,11 +396,15 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
{State2, Effects0} = evaluate_quorum(State1, []),
Effects00 = ra_voter:maybe_promote(PeerId, State1, []),

{State2, Effects0} = evaluate_quorum(State1, Effects00),

{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),

Effects = [{next_event, info, pipeline_rpcs} | Effects1],

case State of
#{cluster := #{Id := _}} ->
% leader is in the cluster
Expand Down Expand Up @@ -777,7 +782,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
NewVotes = Votes + 1,
?DEBUG("~s: vote granted for term ~b votes ~b",
[LogId, Term, NewVotes]),
case need_acks(Nodes) of
case trunc(maps:size(Nodes) / 2) + 1 of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
Noop = {noop, #{ts => erlang:system_time(millisecond)},
Expand Down Expand Up @@ -923,7 +928,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
[LogId, Token, Term, Votes + 1]),
NewVotes = Votes + 1,
State = update_term(Term, State0),
case need_acks(Nodes) of
case trunc(maps:size(Nodes) / 2) + 1 of
NewVotes ->
call_for_election(candidate, State);
_ ->
Expand Down Expand Up @@ -1104,6 +1109,10 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?WARN("~w: follower ignored request_vote_rpc, non voter: ~p", [LogId, Voter]),
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
Expand Down Expand Up @@ -1203,6 +1212,11 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?WARN("~w: follower ignored election_timeout, non voter: ~p",
[LogId, Voter]),
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
handle_follower(try_become_leader, State) ->
Expand Down Expand Up @@ -1370,6 +1384,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
last_applied,
cluster,
leader_id,
voter,
voted_for,
cluster_change_permitted,
cluster_index_term,
Expand Down Expand Up @@ -2029,13 +2044,11 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand,
last_log_term = LLTerm},
#{cfg := #cfg{machine_version = OurMacVer,
effective_machine_version = EffMacVer},
current_term := CurTerm,
cluster := Cluster} = State0)
current_term := CurTerm} = State0)
when Term >= CurTerm ->
State = update_term(Term, State0),
LastIdxTerm = last_idx_term(State),
case is_voter(Cand, Cluster) andalso
is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of
case is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of
true when Version > ?RA_PROTO_VERSION->
?DEBUG("~s: declining pre-vote for ~w for protocol version ~b",
[log_id(State0), Cand, Version]),
Expand All @@ -2060,10 +2073,8 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand,
false ->
?DEBUG("~s: declining pre-vote for ~w for term ~b,"
" candidate last log index term was: ~w~n"
"Last log entry idxterm seen was: ~w~n"
"Voter status was: ~w~n",
[log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm,
get_voter_status(Cand, Cluster)]),
"Last log entry idxterm seen was: ~w",
[log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm]),
case FsmState of
follower ->
{FsmState, State, [start_election_timeout]};
Expand Down Expand Up @@ -2092,15 +2103,16 @@ new_peer() ->
match_index => 0,
commit_index_sent => 0,
query_index => 0,
voter => yes,
status => normal}.

new_peer_with(Map) ->
maps:merge(new_peer(), Map).

new_matching_peer(#{commit_index := CI} = _State) ->
new_peer_with(#{
voter => {matching, CI}
}).
already_member(State) ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State}.

peers(#{cfg := #cfg{id = Id}, cluster := Peers}) ->
maps:remove(Id, Peers).
Expand Down Expand Up @@ -2328,6 +2340,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter => ra_voter:status(NewCluster, id(State0)),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2458,17 +2471,28 @@ add_reply(_, _, _, % From, Reply, Mode
append_log_leader({CmdTag, _, _, _},
State = #{cluster_change_permitted := false})
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_maybe_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := #{voter := yes}} ->
already_member(State);
#{JoiningNode := #{voter := {no, _}} = Peer} ->
Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
Cluster = OldCluster#{JoiningNode => new_peer()},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := _} ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
already_member(State);
_ ->
Cluster = OldCluster#{JoiningNode => new_matching_peer(State)},
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
Expand Down Expand Up @@ -2511,6 +2535,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter => ra_voter:status(Cluster, id(State)),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2587,11 +2612,10 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{query_index := Idx} = Peer, Acc) ->
case is_voter(Peer) of
true -> [Idx | Acc];
false -> Acc
end
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
end, [QueryIndex], Cluster).

match_indexes(#{cfg := #cfg{id = Id},
Expand All @@ -2600,49 +2624,12 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{match_index := Idx} = Peer, Acc) ->
case is_voter(Peer) of
true -> [Idx | Acc];
false -> Acc
end
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
end, [LWIdx], Cluster).

get_voter_status(Id, Cluster) ->
case maps:get(Id, Cluster) of
undefined -> undefined;
Peer -> get_voter_status(Peer)
end.

get_voter_status(#{voter := Status}) ->
Status;
get_voter_status(_) ->
% Implicit 'yes' for initial cluster members, to differentiate from 'undefined' above.
yes.


is_voter(Id, Cluster) ->
case maps:get(Id, Cluster) of
undefined -> false;
Peer -> is_voter(Peer)
end.

is_voter(#{voter := {matching, Target}, match_index := MI})
when MI >= Target ->
true;
is_voter(#{voter := {matching, _}}) ->
false;
is_voter(_Peer) ->
true.

need_acks(Cluster) ->
NumVoters = maps:fold(fun(_, Peer, Count) ->
case is_voter(Peer) of
true -> Count + 1;
false -> Count
end
end, 0, Cluster),
trunc(NumVoters / 2) + 1.

-spec agreed_commit(list()) -> ra_index().
agreed_commit(Indexes) ->
SortedIdxs = lists:sort(fun erlang:'>'/2, Indexes),
Expand Down
61 changes: 61 additions & 0 deletions src/ra_voter.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%%
%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved.
%%
-module(ra_voter).

-export([
new_nonvoter/1,
status/1,
status/2,
maybe_promote/3
]).

-define(DEFAULT_MAX_ROUNDS, 4).

new_nonvoter(State) ->
Target = maps:get(commit_index, State),
{no, #{round => 0, target => Target , ts => os:system_time(millisecond)}}.

maybe_promote(PeerID,
#{commit_index := CI, cluster := Cluster} = _State,
Effects) ->
#{PeerID := #{match_index := MI, voter := OldStatus} = _Peer} = Cluster,
case evaluate_voter(OldStatus, MI, CI) of
OldStatus ->
Effects;
Change ->
[{next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
PeerID,
noreply}}} |
Effects]
end.

evaluate_voter({no, #{round := Round, target := Target , ts := RoundStart}}, MI, CI)
when MI >= Target ->
AtenPollInt = application:get_env(aten, poll_interval, 1000),
Now = os:system_time(millisecond),
case (Now - RoundStart) =< AtenPollInt of
true ->
yes;
false when Round > ?DEFAULT_MAX_ROUNDS ->
{no, permanent};
false ->
{no, #{round => Round+1, target => CI, ts => Now}}
end;
evaluate_voter(Permanent, _, _) ->
Permanent.

status(#{cluster := Cluster} = State) ->
Id = ra_server:id(State),
status(Cluster, Id).

status(Cluster, PeerId) ->
case maps:get(PeerId, Cluster, undefined) of
undefined ->
throw(not_a_cluster_member);
Peer ->
maps:get(voter, Peer, yes)
end.
Loading

0 comments on commit 9936265

Please sign in to comment.