Skip to content

Commit

Permalink
back to mvp
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed May 18, 2023
1 parent 9a14f89 commit d7e3030
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 342 deletions.
9 changes: 0 additions & 9 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@
%% membership changes
add_member/2,
add_member/3,
maybe_add_member/2,
maybe_add_member/3,
remove_member/2,
remove_member/3,
leave_and_terminate/3,
Expand Down Expand Up @@ -581,13 +579,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).

maybe_add_member(ServerLoc, ServerId) ->
maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT).
maybe_add_member(ServerLoc, ServerId, Timeout) ->
ra_server_proc:command(ServerLoc,
{'$ra_maybe_join', ServerId, after_log_append},
Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
Expand Down
11 changes: 4 additions & 7 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,17 @@
suspended |
disconnected.

-type ra_voter() :: yes | {no, ra_nonvoter_status()}.

-type ra_nonvoter_status() :: permanent |
#{round := non_neg_integer(),
target := ra_index(),
ts := integer()}.
% Leader doesn't count for majority purposes peers that
% have matching_index lower than the predicate.
-type ra_voter() :: {matching, ra_index()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
query_index := non_neg_integer(),
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
%% whether peer is part of the consensus
voter := ra_voter(),
%% indicates that a snapshot is being sent
%% to the peer
Expand Down
117 changes: 65 additions & 52 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
| recovered | stop | receive_snapshot.

-type command_type() :: '$usr' | '$ra_join' | '$ra_leave' |
'$ra_maybe_join' |
'$ra_cluster_change' | '$ra_cluster'.

-type command_meta() :: #{from => from(),
Expand Down Expand Up @@ -396,15 +395,11 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
Effects00 = ra_voter:maybe_promote(PeerId, State1, []),

{State2, Effects0} = evaluate_quorum(State1, Effects00),
{State2, Effects0} = evaluate_quorum(State1, []),

{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),

Effects = [{next_event, info, pipeline_rpcs} | Effects1],

case State of
#{cluster := #{Id := _}} ->
% leader is in the cluster
Expand Down Expand Up @@ -782,7 +777,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true},
NewVotes = Votes + 1,
?DEBUG("~s: vote granted for term ~b votes ~b",
[LogId, Term, NewVotes]),
case trunc(maps:size(Nodes) / 2) + 1 of
case need_acks(Nodes) of
NewVotes ->
{State1, Effects} = make_all_rpcs(initialise_peers(State0)),
Noop = {noop, #{ts => erlang:system_time(millisecond)},
Expand Down Expand Up @@ -928,7 +923,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true,
[LogId, Token, Term, Votes + 1]),
NewVotes = Votes + 1,
State = update_term(Term, State0),
case trunc(maps:size(Nodes) / 2) + 1 of
case need_acks(Nodes) of
NewVotes ->
call_for_election(candidate, State);
_ ->
Expand Down Expand Up @@ -1109,10 +1104,6 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
% simply forward all other events to ra_log
{Log, Effects} = ra_log:handle_event(Evt, Log0),
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?WARN("~w: follower ignored request_vote_rpc, non voter: ~p", [LogId, Voter]),
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
Expand Down Expand Up @@ -1212,11 +1203,6 @@ handle_follower(#append_entries_reply{}, State) ->
%% handle to avoid logging as unhandled
%% could receive a lot of these shortly after standing down as leader
{follower, State, []};
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) ->
?WARN("~w: follower ignored election_timeout, non voter: ~p",
[LogId, Voter]),
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
handle_follower(try_become_leader, State) ->
Expand Down Expand Up @@ -1384,7 +1370,6 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
last_applied,
cluster,
leader_id,
voter,
voted_for,
cluster_change_permitted,
cluster_index_term,
Expand Down Expand Up @@ -2044,11 +2029,13 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand,
last_log_term = LLTerm},
#{cfg := #cfg{machine_version = OurMacVer,
effective_machine_version = EffMacVer},
current_term := CurTerm} = State0)
current_term := CurTerm,
cluster := Cluster} = State0)
when Term >= CurTerm ->
State = update_term(Term, State0),
LastIdxTerm = last_idx_term(State),
case is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of
case is_voter(Cand, Cluster) andalso
is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of
true when Version > ?RA_PROTO_VERSION->
?DEBUG("~s: declining pre-vote for ~w for protocol version ~b",
[log_id(State0), Cand, Version]),
Expand All @@ -2073,8 +2060,10 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand,
false ->
?DEBUG("~s: declining pre-vote for ~w for term ~b,"
" candidate last log index term was: ~w~n"
"Last log entry idxterm seen was: ~w",
[log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm]),
"Last log entry idxterm seen was: ~w~n"
"Voter status was: ~w~n",
[log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm,
get_voter_status(Cand, Cluster)]),
case FsmState of
follower ->
{FsmState, State, [start_election_timeout]};
Expand Down Expand Up @@ -2103,16 +2092,15 @@ new_peer() ->
match_index => 0,
commit_index_sent => 0,
query_index => 0,
voter => yes,
status => normal}.

new_peer_with(Map) ->
maps:merge(new_peer(), Map).

already_member(State) ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State}.
new_matching_peer(#{commit_index := CI} = _State) ->
new_peer_with(#{
voter => {matching, CI}
}).

peers(#{cfg := #cfg{id = Id}, cluster := Peers}) ->
maps:remove(Id, Peers).
Expand Down Expand Up @@ -2340,7 +2328,6 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter => ra_voter:status(NewCluster, id(State0)),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2471,28 +2458,17 @@ add_reply(_, _, _, % From, Reply, Mode
append_log_leader({CmdTag, _, _, _},
State = #{cluster_change_permitted := false})
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_maybe_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := #{voter := yes}} ->
already_member(State);
#{JoiningNode := #{voter := {no, _}} = Peer} ->
Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
Cluster = OldCluster#{JoiningNode => new_peer()},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := _} ->
already_member(State);
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
_ ->
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})},
Cluster = OldCluster#{JoiningNode => new_matching_peer(State)},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
Expand Down Expand Up @@ -2535,7 +2511,6 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter => ra_voter:status(Cluster, id(State)),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2612,10 +2587,11 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
(_K, #{query_index := Idx} = Peer, Acc) ->
case is_voter(Peer) of
true -> [Idx | Acc];
false -> Acc
end
end, [QueryIndex], Cluster).

match_indexes(#{cfg := #cfg{id = Id},
Expand All @@ -2624,12 +2600,49 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter := {no, _}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
(_K, #{match_index := Idx} = Peer, Acc) ->
case is_voter(Peer) of
true -> [Idx | Acc];
false -> Acc
end
end, [LWIdx], Cluster).

get_voter_status(Id, Cluster) ->
case maps:get(Id, Cluster) of
undefined -> undefined;
Peer -> get_voter_status(Peer)
end.

get_voter_status(#{voter := Status}) ->
Status;
get_voter_status(_) ->
% Implicit 'yes' for initial cluster members, to differentiate from 'undefined' above.
yes.


is_voter(Id, Cluster) ->
case maps:get(Id, Cluster) of
undefined -> false;
Peer -> is_voter(Peer)
end.

is_voter(#{voter := {matching, Target}, match_index := MI})
when MI >= Target ->
true;
is_voter(#{voter := {matching, _}}) ->
false;
is_voter(_Peer) ->
true.

need_acks(Cluster) ->
NumVoters = maps:fold(fun(_, Peer, Count) ->
case is_voter(Peer) of
true -> Count + 1;
false -> Count
end
end, 0, Cluster),
trunc(NumVoters / 2) + 1.

-spec agreed_commit(list()) -> ra_index().
agreed_commit(Indexes) ->
SortedIdxs = lists:sort(fun erlang:'>'/2, Indexes),
Expand Down
61 changes: 0 additions & 61 deletions src/ra_voter.erl

This file was deleted.

Loading

0 comments on commit d7e3030

Please sign in to comment.