Skip to content

Commit

Permalink
replace bool membership with enum
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Sep 21, 2023
1 parent 831be88 commit d70f253
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 124 deletions.
8 changes: 4 additions & 4 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1135,15 +1135,15 @@ key_metrics({Name, N} = ServerId) when N == node() ->
case whereis(Name) of
undefined ->
Counters#{state => noproc,
non_voter => unknown};
membership => unknown};
_ ->
case ets:lookup(ra_state, Name) of
[] ->
Counters#{state => unknown,
non_voter => unknown};
[{_, State, NonVoter}] ->
membership => unknown};
[{_, State, Membership}] ->
Counters#{state => State,
non_voter => NonVoter}
membership => Membership}
end
end;
key_metrics({_, N} = ServerId) ->
Expand Down
14 changes: 8 additions & 6 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,19 @@
%% Subset of ra_server:ra_server_config().
%% Both `ra:add_member` and `ra:start_server` must be called with the same values.
-type ra_new_server() :: #{id := ra_server_id(),

%% If set, server will start as non-voter until later promoted by the
%% leader.
non_voter => boolean(),
% Defaults to `voter` is absent.
membership => ra_membership(),
% Required for `promotable` in the above.
uid => ra_uid()}.

-type ra_peer_status() :: normal |
{sending_snapshot, pid()} |
suspended |
disconnected.

-type ra_voter_status() :: #{non_voter => boolean(),
-type ra_membership() :: voter | promotable | unknown.

-type ra_voter_status() :: #{membership => ra_membership(),
uid => ra_uid(),
target => ra_index()}.

Expand All @@ -64,7 +65,8 @@
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
%% Whether the peer is part of the consensus.
%% Defaults to "yes" if absent.
voter_status => ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
Expand Down
2 changes: 1 addition & 1 deletion src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ overview(System) when is_atom(System) ->
pid => Pid,
parent => Parent,
state => S,
non_voter => V,
membership => V,
cluster_name => ClusterName,
snapshot_state => maps:get(UId, Snaps,
undefined)}}
Expand Down
81 changes: 40 additions & 41 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
terminate/2,
log_fold/3,
log_read/2,
get_non_voter/1,
get_membership/1,
recover/1
]).

Expand All @@ -75,7 +75,7 @@
log := term(),
voted_for => 'maybe'(ra_server_id()), % persistent
votes => non_neg_integer(),
non_voter => boolean(),
membership => ra_membership(),
commit_index := ra_index(),
last_applied := ra_index(),
persisted_last_applied => ra_index(),
Expand Down Expand Up @@ -199,7 +199,7 @@
max_pipeline_count => non_neg_integer(),
ra_event_formatter => {module(), atom(), [term()]},
counter => counters:counters_ref(),
non_voter => boolean(),
membership => ra_membership(),
system_config => ra_system:config()}.

-type mutable_config() :: #{cluster_name => ra_cluster_name(),
Expand Down Expand Up @@ -333,8 +333,8 @@ init(#{id := Id,
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_APPLIED, SnapshotIdx),
put_counter(Cfg, ?C_RA_SVR_METRIC_TERM, CurrentTerm),

NonVoter = get_non_voter(Cluster0, Id, UId,
maps:get(non_voter, Config, false)),
NonVoter = get_membership(Cluster0, Id, UId,
maps:get(membership, Config, voter)),

#{cfg => Cfg,
current_term => CurrentTerm,
Expand All @@ -346,7 +346,7 @@ init(#{id := Id,
cluster_change_permitted => false,
cluster_index_term => SnapshotIndexTerm,
voted_for => VotedFor,
non_voter => NonVoter,
membership => NonVoter,
commit_index => CommitIndex,
%% set this to the first index so that we can apply all entries
%% up to the commit index during recovery
Expand Down Expand Up @@ -1121,15 +1121,15 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId},
voter_status := #{non_voter := true} = VoterStatus} = State) ->
voter_status := #{membership := promotable} = VoterStatus} = State) ->
?DEBUG("~s: follower ignored pre_vote_rpc, non-voter: ~p0",
[LogId, VoterStatus]),
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{},
#{cfg := #cfg{log_id = LogId},
voter_status := #{non_voter := true} = VoterStatus} = State) ->
voter_status := #{membership := promotable} = VoterStatus} = State) ->
?DEBUG("~s: follower ignored request_vote_rpc, non-voter: ~p0",
[LogId, VoterStatus]),
{follower, State, []};
Expand Down Expand Up @@ -1231,7 +1231,7 @@ handle_follower(#append_entries_reply{}, State) ->
{follower, State, []};
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId},
voter_status := #{non_voter := true} = VoterStatus} = State) ->
voter_status := #{membership := promotable} = VoterStatus} = State) ->
?DEBUG("~s: follower ignored election_timeout, non-voter: ~p0",
[LogId, VoterStatus]),
{follower, State, []};
Expand Down Expand Up @@ -1301,7 +1301,7 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
commit_index => SnapIndex,
last_applied => SnapIndex,
cluster => make_cluster(Id, ClusterIds),
non_voter => get_non_voter(ClusterIds, State0),
membership => get_membership(ClusterIds, State0),
machine_state => MacState}),
%% it was the last snapshot chunk so we can revert back to
%% follower status
Expand Down Expand Up @@ -1404,7 +1404,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg,
cluster,
leader_id,
voted_for,
non_voter,
membership,
cluster_change_permitted,
cluster_index_term,
query_index
Expand Down Expand Up @@ -2249,7 +2249,6 @@ make_cluster(Self, Nodes0) when is_list(Nodes0) ->
end, #{}, Nodes0),
append_self(Self, Nodes);
make_cluster(Self, Nodes0) when is_map(Nodes0) ->
ct:pal("MAKE CLUSTER ~p", [[Self, Nodes0]]),
Nodes = maps:map(fun(_, Peer0) ->
new_peer_with(Peer0)
end, Nodes0),
Expand Down Expand Up @@ -2367,7 +2366,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}},
[log_id(State0), maps:keys(NewCluster)]),
%% we are recovering and should apply the cluster change
State0#{cluster => NewCluster,
non_voter => get_non_voter(NewCluster, State0),
membership => get_membership(NewCluster, State0),
cluster_change_permitted => true,
cluster_index_term => {Idx, Term}};
_ ->
Expand Down Expand Up @@ -2525,7 +2524,7 @@ append_log_leader({'$ra_join', From, #{id := JoiningNode} = Config, ReplyMode},
State) ->
append_log_leader({'$ra_join', From,
#{id => JoiningNode,
voter_status => maps:with([non_voter, uid, target],
voter_status => maps:with([membership, uid, target],
Config)},
ReplyMode}, State);
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
Expand Down Expand Up @@ -2577,7 +2576,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry,
pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}},
State) ->
State#{cluster => Cluster,
non_voter => get_non_voter(Cluster, State),
membership => get_membership(Cluster, State),
cluster_index_term => {Idx, Term}};
pre_append_log_follower(_, State) ->
State.
Expand Down Expand Up @@ -2656,7 +2655,7 @@ query_indexes(#{cfg := #cfg{id = Id},
query_index := QueryIndex}) ->
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := #{non_voter := true}}, Acc) ->
(_K, #{voter_status := #{membership := promotable}}, Acc) ->
Acc;
(_K, #{query_index := Idx}, Acc) ->
[Idx | Acc]
Expand All @@ -2668,7 +2667,7 @@ match_indexes(#{cfg := #cfg{id = Id},
{LWIdx, _} = ra_log:last_written(Log),
maps:fold(fun (PeerId, _, Acc) when PeerId == Id ->
Acc;
(_K, #{voter_status := #{non_voter := true}}, Acc) ->
(_K, #{voter_status := #{membership := promotable}}, Acc) ->
Acc;
(_K, #{match_index := Idx}, Acc) ->
[Idx | Acc]
Expand Down Expand Up @@ -2903,61 +2902,61 @@ already_member(State) ->

-spec ensure_promotion_target(ra_voter_status(), ra_index()) ->
{ok, ra_voter_status()} | {error, term()}.
ensure_promotion_target(#{non_voter := true, target := _, uid := _} = Status,
ensure_promotion_target(#{membership := promotable, target := _, uid := _} = Status,
_) ->
{ok, Status};
ensure_promotion_target(#{non_voter := true, uid := _} = Status,
ensure_promotion_target(#{membership := promotable, uid := _} = Status,
#{log := Log}) ->
Target = ra_log:next_index(Log),
{ok, Status#{target => Target}};
ensure_promotion_target(#{non_voter := true}, _) ->
ensure_promotion_target(#{membership := promotable}, _) ->
{error, missing_uid};
ensure_promotion_target(Voter, _) ->
{ok, Voter}.

%% Get non_voter of a given Id+UId from a (possibly new) cluster.
-spec get_non_voter(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
%% Get membership of a given Id+UId from a (possibly new) cluster.
-spec get_membership(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
ra_server_id(), ra_uid(), boolean()) ->
boolean().
get_non_voter(_Cluster, _PeerId, _UId, Default) when is_list(_Cluster) ->
get_membership(_Cluster, _PeerId, _UId, Default) when is_list(_Cluster) ->
%% Legacy cluster snapshot does not retain voter_status.
Default;
get_non_voter(Cluster, PeerId, UId, Default) ->
get_membership(Cluster, PeerId, UId, Default) ->
case maps:get(PeerId, Cluster, undefined) of
#{voter_status := #{uid := UId} = VoterStatus} ->
maps:get(non_voter, VoterStatus, Default);
maps:get(membership, VoterStatus, Default);
_ ->
Default
end.

%% Get this node's non_voter from a (possibly new) cluster.
%% Get this node's membership from a (possibly new) cluster.
%% Defaults to last known-locally value.
-spec get_non_voter(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
-spec get_membership(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
ra_state()) ->
boolean().
get_non_voter(Cluster, #{cfg := #cfg{id = Id, uid = UId}} = State) ->
Default = maps:get(non_voter, State, false),
get_non_voter(Cluster, Id, UId, Default).
get_membership(Cluster, #{cfg := #cfg{id = Id, uid = UId}} = State) ->
Default = maps:get(membership, State, voter),
get_membership(Cluster, Id, UId, Default).

%% Get this node's non_voter.
%% Get this node's membership.
%% Defaults to last known-locally value.
-spec get_non_voter(ra_state()) -> boolean().
get_non_voter(#{cfg := #cfg{id = Id, uid = UId}, cluster := Cluster} = State) ->
Default = maps:get(non_voter, State, false),
get_non_voter(Cluster, Id, UId, Default).
-spec get_membership(ra_state()) -> boolean().
get_membership(#{cfg := #cfg{id = Id, uid = UId}, cluster := Cluster} = State) ->
Default = maps:get(membership, State, voter),
get_membership(Cluster, Id, UId, Default).

-spec maybe_promote_peer(ra_server_id(), ra_server_state(), effects()) -> effects().
maybe_promote_peer(PeerID, #{cluster := Cluster}, Effects) ->
maybe_promote_peer(PeerId, #{cluster := Cluster}, Effects) ->
maybe
#{PeerID := #{match_index := MI,
#{PeerId := #{match_index := MI,
voter_status := VoterStatus}} ?= Cluster,
#{non_voter := true, target := Target} ?= VoterStatus,
#{membership := promotable, target := Target} ?= VoterStatus,
true ?= MI >= Target,
E = {next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
#{id => PeerID,
voter_status => VoterStatus#{non_voter => false}},
#{id => PeerId,
voter_status => VoterStatus#{membership => voter}},
noreply}}},
[E | Effects]
else
Expand All @@ -2971,7 +2970,7 @@ required_quorum(Cluster) ->

count_voters(Cluster) ->
maps:fold(
fun (_, #{voter_status := #{non_voter := true}}, Count) ->
fun (_, #{voter_status := #{membership := promotable}}, Count) ->
Count;
(_, _, Count) ->
Count + 1
Expand Down
40 changes: 23 additions & 17 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
%% @hidden
-module(ra_server_proc).

-feature(maybe_expr, enable).

-behaviour(gen_statem).

-compile({inline, [handle_raft_state/3]}).
Expand Down Expand Up @@ -273,7 +275,7 @@ init(Config) ->
do_init(#{id := Id,
cluster_name := ClusterName} = Config0) ->
Key = ra_lib:ra_server_id_to_local_name(Id),
true = ets:insert(ra_state, {Key, init, false}), %% can't vote while initializing
true = ets:insert(ra_state, {Key, init, unknown}),
process_flag(trap_exit, true),
Config = #{counter := Counter,
system_config := SysConf} = maps:merge(config_defaults(Id),
Expand Down Expand Up @@ -788,16 +790,16 @@ follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, #state{conf = #conf{name = Name},
server_state = SS0} = State0) ->
NonVoter0 = ra_server:get_non_voter(SS0),
Membership0 = ra_server:get_membership(SS0),
case handle_follower(Msg, State0) of
{follower, State1, Effects} ->
{State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
State = #state{server_state = SS} = follower_leader_change(State0, State2),
case ra_server:get_non_voter(SS) of
NonVoter0 ->
case ra_server:get_membership(SS) of
Membership0 ->
ok;
NonVoter ->
true = ets:update_element(ra_state, Name, {3, NonVoter})
Membership ->
true = ets:update_element(ra_state, Name, {3, Membership})
end,
{keep_state, State, Actions};
{pre_vote, State1, Effects} ->
Expand Down Expand Up @@ -1039,8 +1041,8 @@ format_status(Opt, [_PDict, StateName,
handle_enter(RaftState, OldRaftState,
#state{conf = #conf{name = Name},
server_state = ServerState0} = State) ->
NonVoter = ra_server:get_non_voter(ServerState0),
true = ets:insert(ra_state, {Name, RaftState, NonVoter}),
Membership = ra_server:get_membership(ServerState0),
true = ets:insert(ra_state, {Name, RaftState, Membership}),
{ServerState, Effects} = ra_server:handle_state_enter(RaftState,
ServerState0),
case RaftState == leader orelse OldRaftState == leader of
Expand Down Expand Up @@ -1522,13 +1524,17 @@ do_state_query(overview, State) ->
do_state_query(machine, #{machine_state := MacState}) ->
MacState;
do_state_query(voters, #{cluster := Cluster}) ->
Voters = maps:filter(fun(_, Peer) ->
case maps:get(voter_status, Peer, undefined) of
#{non_voter := true} -> false;
_ -> true
end
end, Cluster),
maps:keys(Voters);
Vs = maps:fold(fun(K, V, Acc) ->
case maps:get(voter_status, V, undefined) of
undefined -> [K|Acc];
S -> case maps:get(membership, S, undefined) of
undefined -> [K|Acc];
voter -> [K|Acc];
_ -> Acc
end
end
end, [], Cluster),
Vs;
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(initial_members, #{log := Log}) ->
Expand Down Expand Up @@ -1738,9 +1744,9 @@ handle_tick_metrics(State) ->

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
NonVoter = ra_server:get_non_voter(ServerState),
Membership = ra_server:get_membership(ServerState),
case RaftState of
follower when not NonVoter ->
follower when Membership == voter ->
TargetNode == node();
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
Expand Down
Loading

0 comments on commit d70f253

Please sign in to comment.