Skip to content

Commit

Permalink
fix types
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Sep 21, 2023
1 parent d70f253 commit 4ac1d50
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 40 deletions.
2 changes: 1 addition & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
suspended |
disconnected.

-type ra_membership() :: voter | promotable | unknown.
-type ra_membership() :: voter | promotable | non_voter | unknown.

-type ra_voter_status() :: #{membership => ra_membership(),
uid => ra_uid(),
Expand Down
28 changes: 15 additions & 13 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@
ra_event_formatter => {module(), atom(), [term()]},
counter => counters:counters_ref(),
membership => ra_membership(),
system_config => ra_system:config()}.
system_config => ra_system:config(),
has_changed => boolean()
}.

-type mutable_config() :: #{cluster_name => ra_cluster_name(),
metrics_key => term(),
Expand Down Expand Up @@ -1121,17 +1123,17 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) ->
{follower, State#{log => Log}, Effects};
handle_follower(#pre_vote_rpc{},
#{cfg := #cfg{log_id = LogId},
voter_status := #{membership := promotable} = VoterStatus} = State) ->
membership := Membership} = State) when Membership =/= voter ->
?DEBUG("~s: follower ignored pre_vote_rpc, non-voter: ~p0",
[LogId, VoterStatus]),
[LogId, Membership]),
{follower, State, []};
handle_follower(#pre_vote_rpc{} = PreVote, State) ->
process_pre_vote(follower, PreVote, State);
handle_follower(#request_vote_rpc{},
#{cfg := #cfg{log_id = LogId},
voter_status := #{membership := promotable} = VoterStatus} = State) ->
membership := Membership} = State) when Membership =/= voter ->
?DEBUG("~s: follower ignored request_vote_rpc, non-voter: ~p0",
[LogId, VoterStatus]),
[LogId, Membership]),
{follower, State, []};
handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term},
#{current_term := Term, voted_for := VotedFor,
Expand Down Expand Up @@ -1231,9 +1233,9 @@ handle_follower(#append_entries_reply{}, State) ->
{follower, State, []};
handle_follower(election_timeout,
#{cfg := #cfg{log_id = LogId},
voter_status := #{membership := promotable} = VoterStatus} = State) ->
membership := Membership} = State) when Membership =/= voter ->
?DEBUG("~s: follower ignored election_timeout, non-voter: ~p0",
[LogId, VoterStatus]),
[LogId, Membership]),
{follower, State, []};
handle_follower(election_timeout, State) ->
call_for_election(pre_vote, State);
Expand Down Expand Up @@ -2900,7 +2902,7 @@ already_member(State) ->
%%% Voter status helpers
%%% ====================

-spec ensure_promotion_target(ra_voter_status(), ra_index()) ->
-spec ensure_promotion_target(ra_voter_status(), ra_server_state()) ->
{ok, ra_voter_status()} | {error, term()}.
ensure_promotion_target(#{membership := promotable, target := _, uid := _} = Status,
_) ->
Expand All @@ -2916,8 +2918,8 @@ ensure_promotion_target(Voter, _) ->

%% Get membership of a given Id+UId from a (possibly new) cluster.
-spec get_membership(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
ra_server_id(), ra_uid(), boolean()) ->
boolean().
ra_server_id(), ra_uid(), ra_membership()) ->
ra_membership().
get_membership(_Cluster, _PeerId, _UId, Default) when is_list(_Cluster) ->
%% Legacy cluster snapshot does not retain voter_status.
Default;
Expand All @@ -2932,15 +2934,15 @@ get_membership(Cluster, PeerId, UId, Default) ->
%% Get this node's membership from a (possibly new) cluster.
%% Defaults to last known-locally value.
-spec get_membership(ra_cluster() | ra_cluster_snapshot() | ra_cluster_servers(),
ra_state()) ->
boolean().
ra_server_state()) ->
ra_membership().
get_membership(Cluster, #{cfg := #cfg{id = Id, uid = UId}} = State) ->
Default = maps:get(membership, State, voter),
get_membership(Cluster, Id, UId, Default).

%% Get this node's membership.
%% Defaults to last known-locally value.
-spec get_membership(ra_state()) -> boolean().
-spec get_membership(ra_server_state()) -> ra_membership().
get_membership(#{cfg := #cfg{id = Id, uid = UId}, cluster := Cluster} = State) ->
Default = maps:get(membership, State, voter),
get_membership(Cluster, Id, UId, Default).
Expand Down
6 changes: 2 additions & 4 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ send_local_msg(Config) ->
{ok, _, Leader} = ra:members(hd(NodeIds)),
{ok, _, _} = ra:process_command(Leader, banana),
New = #{id => NonVoter,
voter_status => #{membership => promotable, uid => <<"test">>, target => 999},
membership => promotable,
membership => non_voter,
uid => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds),
Expand Down Expand Up @@ -347,8 +346,7 @@ local_log_effect(Config) ->
{ok, _, Leader} = ra:members(hd(NodeIds)),
{ok, _, _} = ra:process_command(Leader, banana),
New = #{id => NonVoter,
voter_status => #{membership => promotable, uid => <<"test">>, target => 999},
membership => promotable,
membership => non_voter,
uid => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds),
Expand Down
51 changes: 29 additions & 22 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ init_test(_Config) ->
voted_for := some_server} = ra_server_init(InitConf),
% snapshot
SnapshotMeta = #{index => 3, term => 5,
cluster => maps:keys(Cluster),
cluster => dehydrate_cluster(Cluster),
machine_version => 1},
SnapshotData = "hi1+2+3",
{LogS, _, _} = ra_log_memory:install_snapshot(SnapshotMeta, SnapshotData, Log0),
Expand All @@ -217,7 +217,7 @@ init_test(_Config) ->
machine_state := "hi1+2+3",
cluster := ClusterOut,
voted_for := some_server} = ra_server_init(InitConf),
?assertEqual(maps:keys(Cluster), maps:keys(ClusterOut)),
?assertEqual(dehydrate_cluster(Cluster), dehydrate_cluster(ClusterOut)),
ok.

recover_restores_cluster_changes(_Config) ->
Expand Down Expand Up @@ -280,7 +280,7 @@ election_timeout(_Config) ->
ra_server:handle_follower(Msg, State),

% non-voters ignore election_timeout
NVState = State#{voter_status => #{membership => promotable}},
NVState = State#{membership => promotable},
{follower, NVState, []} = ra_server:handle_follower(Msg, NVState),

% pre_vote
Expand Down Expand Up @@ -580,7 +580,7 @@ follower_aer_term_mismatch_snapshot(_Config) ->
Log0 = maps:get(log, State0),
Meta = #{index => 3,
term => 5,
cluster => [],
cluster => #{},
machine_version => 1},
Data = <<"hi3">>,
{Log, _, _} = ra_log_memory:install_snapshot(Meta, Data, Log0),
Expand Down Expand Up @@ -731,7 +731,7 @@ follower_catchup_condition(_Config) ->
chunk_state = {1, last},
meta = #{index => 99,
term => 99,
cluster => [],
cluster => #{},
machine_version => 0},
data = []},
{follower, State, [_NextEvent]} =
Expand Down Expand Up @@ -830,7 +830,8 @@ append_entries_reply_success_promotes_nonvoter(_Config) ->
[{next_event, info, pipeline_rpcs},
{next_event, {command, {'$ra_join', _,
#{id := N2,
voter_status := #{membership := voter, uid := <<"uid">>}},
voter_status := #{membership := voter,
uid := <<"uid">>}},
noreply}} = RaJoin}
]} = ra_server:handle_leader({N2, Ack}, State0),

Expand All @@ -852,7 +853,8 @@ append_entries_reply_success_promotes_nonvoter(_Config) ->
% ra_join translates into cluster update
{leader, #{cluster := #{N2 := #{next_index := 5,
match_index := 3,
voter_status := #{membership := voter}}},
voter_status := #{membership := voter,
uid := <<"uid">>}}},
cluster_change_permitted := false,
commit_index := 1,
last_applied := 1,
Expand All @@ -863,15 +865,17 @@ append_entries_reply_success_promotes_nonvoter(_Config) ->
prev_log_term = 5,
leader_commit = 1,
entries = [{4, 5, {'$ra_cluster_change', _,
#{N2 := #{voter_status := #{membership := voter}}},
#{N2 := #{voter_status := #{membership := voter,
uid := <<"uid">>}}},
_}}]}},
{send_rpc, N2,
#append_entries_rpc{term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 1,
entries = [{4, 5, {'$ra_cluster_change', _,
#{N2 := #{voter_status := #{membership := voter}}},
#{N2 := #{voter_status := #{membership := voter,
uid := <<"uid">>}}},
_}}]}}
]} = ra_server:handle_leader(RaJoin, State2),

Expand Down Expand Up @@ -1013,7 +1017,7 @@ follower_request_vote(_Config) ->
State),

% non-voters ignore request_vote_rpc
NVState = State#{voter_status => #{membership => promotable}},
NVState = State#{membership => promotable},
{follower, NVState, []} = ra_server:handle_follower(Msg, NVState),

ok.
Expand Down Expand Up @@ -1132,7 +1136,7 @@ follower_pre_vote(_Config) ->
State),

% non-voters ignore pre_vote_rpc
NVState = State#{voter_status => #{membership => promotable}},
NVState = State#{membership => promotable},
{follower, NVState, []} = ra_server:handle_follower(Msg, NVState),

ok.
Expand Down Expand Up @@ -1372,7 +1376,7 @@ follower_install_snapshot_machine_version(_Config) ->
%% by install snapshot rpc
SnapMeta = #{index => 4,
term => 5,
cluster => [?N1, ?N2, ?N3],
cluster => #{?N1 => #{}, ?N2 => #{}, ?N3 => #{}},
machine_version => MacVer},
SnapData = <<"hi4_v2">>,

Expand Down Expand Up @@ -1823,7 +1827,7 @@ pre_vote_election_reverts(_Config) ->
ISR = #install_snapshot_rpc{term = 5, leader_id = N2,
meta = #{index => 3,
term => 5,
cluster => [],
cluster => #{},
machine_version => 0},
chunk_state = {1, last},
data = []},
Expand Down Expand Up @@ -1860,7 +1864,7 @@ leader_receives_install_snapshot_rpc(_Config) ->
ISRpc = #install_snapshot_rpc{term = Term + 1, leader_id = ?N5,
meta = #{index => Idx,
term => Term,
cluster => [],
cluster => #{},
machine_version => 0},
chunk_state = {1, last},
data = []},
Expand Down Expand Up @@ -1891,8 +1895,7 @@ follower_installs_snapshot(_Config) ->
fun (_) ->
{#{index => Idx,
term => Term,
cluster => maps:keys(Config),
cluster_state => Config,
cluster => dehydrate_cluster(Config),
machine_version => 0},
[]}
end),
Expand All @@ -1919,7 +1922,7 @@ follower_ignores_installs_snapshot_with_higher_machine_version(_Config) ->
ISRpc = #install_snapshot_rpc{term = Term, leader_id = N1,
meta = #{index => Idx,
term => LastTerm,
cluster => maps:keys(Config),
cluster => dehydrate_cluster(Config),
machine_version => 1},
chunk_state = {1, last},
data = []},
Expand Down Expand Up @@ -1981,7 +1984,7 @@ snapshotted_follower_received_append_entries(_Config) ->
fun (_) ->
{#{index => Idx,
term => Term,
cluster => maps:keys(Config),
cluster => dehydrate_cluster(Config),
machine_version => 0},
[]}
end),
Expand Down Expand Up @@ -2129,13 +2132,13 @@ leader_received_append_entries_reply_and_promotes_voter(_config) ->
last_index = 4, last_term = 5},

% Permanent voter
State1 = set_peer_voter_status(State, N3, #{membership => false}),
State1 = set_peer_voter_status(State, N3, #{membership => voter}),
{leader, _,
[{next_event,info,pipeline_rpcs}]
} = ra_server:handle_leader({N3, AER}, State1),

% Permanent non-voter
State2 = set_peer_voter_status(State, N3, #{membership => promotable}),
State2 = set_peer_voter_status(State, N3, #{membership => non_voter}),
{leader, _,
[{next_event,info,pipeline_rpcs}]
} = ra_server:handle_leader({N3, AER}, State2),
Expand Down Expand Up @@ -2808,7 +2811,11 @@ snap_meta(Idx, Term) ->
snap_meta(Idx, Term, Cluster) ->
#{index => Idx,
term => Term,
cluster => maps:keys(Cluster),
cluster_state => Cluster,
cluster => dehydrate_cluster(Cluster),
machine_version => 0}.

dehydrate_cluster(Cluster) ->
maps:map(fun(_, V) ->
maps:with([voter_status], V)
end, Cluster).

0 comments on commit 4ac1d50

Please sign in to comment.