Skip to content

Commit

Permalink
improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Sep 14, 2023
1 parent afbb4d2 commit f069095
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 39 deletions.
4 changes: 4 additions & 0 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@
-type ra_server_id() :: {Name :: atom(), Node :: node()}.

%% Specifies server configuration for a new cluster member.
%% Both `ra:add_member` and `ra:start_server` must be called with the same value.
-type ra_new_server() :: #{id := ra_server_id(),

%% If set, server will start as non-voter until later promoted by the
%% leader.
init_non_voter => ra_nvid()}.

-type ra_peer_status() :: normal |
Expand Down
83 changes: 44 additions & 39 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ init(#{id := Id,
undefined ->
InitialMachineState = ra_machine:init(Machine, Name),
Clu = make_cluster(Id, InitialNodes,
#{voter_status => new_voter_status(Config)}),
#{voter_status => init_voter_status(Config)}),
{0, Clu,
0, InitialMachineState, {0, 0}};
{#{index := Idx,
Expand All @@ -310,7 +310,7 @@ init(#{id := Id,
machine_version := MacVersion} = Snapshot, MacSt} ->
ClusterNodes = maps:get(cluster_state, Snapshot, ClusterNodes0),
Clu = make_cluster(Id, ClusterNodes,
#{voter_status => new_voter_status(Config)}),
#{voter_status => init_voter_status(Config)}),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
Expand Down Expand Up @@ -408,7 +408,7 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true,
Peer = Peer0#{match_index => max(MI, LastIdx),
next_index => max(NI, NextIdx)},
State1 = put_peer(PeerId, Peer, State0),
Effects00 = maybe_promote_voter(PeerId, State1, []),
Effects00 = maybe_promote_peer(PeerId, State1, []),
{State2, Effects0} = evaluate_quorum(State1, Effects00),
{State, Effects1} = process_pending_consistent_queries(State2,
Effects0),
Expand Down Expand Up @@ -1299,7 +1299,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
commit_index => SnapIndex,
last_applied => SnapIndex,
cluster => make_cluster(Id, ClusterIds,
#{voter_status => new_voter_status(State0)}),
#{voter_status => init_voter_status(State0)}),
machine_state => MacState}),
%% it was the last snapshot chunk so we can revert back to
%% follower status
Expand Down Expand Up @@ -2368,7 +2368,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
voter_status => current_voter_status(NewCluster, State0),
voter_status => maybe_promote_self(NewCluster, State0),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2505,7 +2505,7 @@ append_log_leader({'$ra_join', From,
#{id := JoiningNode, voter_status := Voter0},
ReplyMode},
State = #{cluster := OldCluster}) ->
case normalize_voter_status(Voter0, State) of
case ensure_promotion_target(Voter0, State) of
{error, Reason} ->
{not_appended, Reason, State};
{ok, Voter} ->
Expand All @@ -2525,7 +2525,7 @@ append_log_leader({'$ra_join', From,
append_log_leader({'$ra_join', From, #{id := JoiningNode} = Spec, ReplyMode},
State) ->
append_log_leader({'$ra_join', From,
#{id => JoiningNode, voter_status => new_voter_status(Spec)},
#{id => JoiningNode, voter_status => init_voter_status(Spec)},
ReplyMode}, State);
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
Expand Down Expand Up @@ -2576,7 +2576,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
voter_status => current_voter_status(Cluster, State),
voter_status => maybe_promote_self(Cluster, State),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2900,43 +2900,23 @@ already_member(State) ->
%%% Voter status helpers
%%% ====================

normalize_voter_status({voter, Reason}, _) ->
-spec ensure_promotion_target(ra_voter_status(), ra_server_state()) ->
{ok, ra_voter_status()} | {error, term()}.
ensure_promotion_target({voter, Reason}, _) ->
{ok, {voter, Reason}};
normalize_voter_status({nonvoter, #{target := _, nvid := _} = Reason}, _) ->
ensure_promotion_target({nonvoter, #{target := _, nvid := _} = Reason}, _) ->
{ok, {nonvoter, Reason}};
normalize_voter_status({nonvoter, #{nvid := _} = Reason}, #{commit_index := CI}) ->
ensure_promotion_target({nonvoter, #{nvid := _} = Reason}, #{commit_index := CI}) ->
{ok, {nonvoter, Reason#{target => CI}}};
normalize_voter_status(_, _) ->
ensure_promotion_target(_, _) ->
{error, missing_nvid}.

new_voter_status(#{init_non_voter := NVId}) ->
-spec init_voter_status(ra_server_config() | ra_new_server()) -> ra_voter_status().
init_voter_status(#{init_non_voter := NVId}) ->
{nonvoter, #{nvid => NVId}};
new_voter_status(_) ->
init_voter_status(_) ->
{voter, #{}}.

-spec maybe_promote_voter(ra_server_id(), ra_server_state(), effects()) -> effects().
maybe_promote_voter(PeerID, #{cluster := Cluster} = _State, Effects) ->
% Unknown peer handled in the caller.
#{PeerID := #{match_index := MI,
voter_status := OldStatus}} = Cluster,
case update_voter_status(OldStatus, MI) of
OldStatus ->
Effects;
NewStatus ->
[{next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
#{id => PeerID, voter_status => NewStatus},
noreply}}} |
Effects]
end.

update_voter_status({nonvoter, #{target := Target} = State}, MI)
when MI >= Target ->
{voter, State};
update_voter_status(Permanent, _) ->
Permanent.

-spec voter_status(ra_server_state()) -> ra_voter_status().
voter_status(#{cluster := Cluster} = State) ->
case maps:get(voter_status, State, undefined) of
Expand All @@ -2955,15 +2935,40 @@ voter_status(PeerId, Cluster) ->
maps:get(voter_status, Peer, {voter, #{}})
end.

current_voter_status(Cluster, State) ->
{_, NReason} = New = voter_status(id(State), Cluster),
-spec maybe_promote_self(ra_cluster(), ra_server_state()) -> ra_voter_status().
maybe_promote_self(NewCluster, State) ->
%% A bit complicated procedure since nvid is not guaranteed.
{_, NReason} = New = voter_status(id(State), NewCluster),
{_, CReason} = Current = voter_status(State),
Self = maps:get(nvid, CReason, undefined),
case maps:get(nvid, NReason, undefined) of
Self -> New;
_ -> Current
end.

-spec maybe_promote_peer(ra_server_id(), ra_server_state(), effects()) -> effects().
maybe_promote_peer(PeerID, #{cluster := Cluster}, Effects) ->
% Unknown peer handled in the caller.
#{PeerID := #{match_index := MI,
voter_status := OldStatus}} = Cluster,
case update_voter_status(OldStatus, MI) of
OldStatus ->
Effects;
NewStatus ->
[{next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
#{id => PeerID, voter_status => NewStatus},
noreply}}} |
Effects]
end.

update_voter_status({nonvoter, #{target := Target} = Reason}, MI)
when MI >= Target ->
{voter, Reason};
update_voter_status(Permanent, _) ->
Permanent.

-spec required_quorum(ra_cluster()) -> pos_integer().
required_quorum(Cluster) ->
Voters = count_voters(Cluster),
Expand Down

0 comments on commit f069095

Please sign in to comment.