Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Sep 19, 2023
1 parent 824fb46 commit f841440
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 35 deletions.
2 changes: 1 addition & 1 deletion src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@

%% If set, server will start as non-voter until later promoted by the
%% leader.
init_non_voter => ra_nvid()}.
non_voter_id => ra_nvid()}.

-type ra_peer_status() :: normal |
{sending_snapshot, pid()} |
Expand Down
32 changes: 14 additions & 18 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@
max_pipeline_count => non_neg_integer(),
ra_event_formatter => {module(), atom(), [term()]},
counter => counters:counters_ref(),
init_non_voter => ra_nvid(),
non_voter_id => ra_nvid(),
system_config => ra_system:config()}.

-type mutable_config() :: #{cluster_name => ra_cluster_name(),
Expand Down Expand Up @@ -2899,19 +2899,21 @@ already_member(State) ->
%%% Voter status helpers
%%% ====================

-spec ensure_promotion_target(ra_voter_status(), ra_server_state()) ->
-spec ensure_promotion_target(ra_voter_status(), ra_index()) ->
{ok, ra_voter_status()} | {error, term()}.
ensure_promotion_target({voter, Reason}, _) ->
{ok, {voter, Reason}};
ensure_promotion_target({nonvoter, #{target := _, nvid := _} = Reason}, _) ->
{ok, {nonvoter, Reason}};
ensure_promotion_target({nonvoter, #{nvid := _} = Reason}, #{commit_index := CI}) ->
{ok, {nonvoter, Reason#{target => CI}}};
ensure_promotion_target({nonvoter, #{nvid := _} = Reason},
#{log := Log}) ->
Target = ra_log:next_index(Log),
{ok, {nonvoter, Reason#{target => Target}}};
ensure_promotion_target(_, _) ->
{error, missing_nvid}.

-spec init_voter_status(ra_server_config() | ra_new_server()) -> ra_voter_status().
init_voter_status(#{init_non_voter := NVId}) ->
init_voter_status(#{non_voter_id := NVId}) ->
{nonvoter, #{nvid => NVId}};
init_voter_status(_) ->
{voter, #{}}.
Expand Down Expand Up @@ -2949,25 +2951,19 @@ maybe_promote_self(NewCluster, State) ->
maybe_promote_peer(PeerID, #{cluster := Cluster}, Effects) ->
% Unknown peer handled in the caller.
#{PeerID := #{match_index := MI,
voter_status := OldStatus}} = Cluster,
case update_voter_status(OldStatus, MI) of
OldStatus ->
Effects;
NewStatus ->
voter_status := Status}} = Cluster,
case Status of
{nonvoter, #{target := Target} = Reason} when MI >= Target ->
[{next_event,
{command, {'$ra_join',
#{ts => os:system_time(millisecond)},
#{id => PeerID, voter_status => NewStatus},
#{id => PeerID, voter_status => {voter, Reason}},
noreply}}} |
Effects]
Effects];
_ ->
Effects
end.

%% Computes a peer's next voter status from its current status and its
%% match index (MI). A non-voter whose status map carries a `target' is
%% returned as `{voter, Reason}' once MI >= Target; the status map itself
%% is carried over unchanged.
update_voter_status({nonvoter, #{target := Target} = Reason}, MI)
when MI >= Target ->
{voter, Reason};
%% Every other status is returned as-is. Note that this clause also matches
%% non-voters that have not reached their target yet (or have no target),
%% not only statuses that are permanent.
update_voter_status(Permanent, _) ->
Permanent.

-spec required_quorum(ra_cluster()) -> pos_integer().
required_quorum(Cluster) ->
Voters = count_voters(Cluster),
Expand Down
12 changes: 10 additions & 2 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ log_fold(ServerId, Fun, InitialState, Timeout) ->
-spec state_query(server_loc(),
all |
overview |
voters |
members |
initial_members |
machine, timeout()) ->
Expand All @@ -193,6 +194,7 @@ state_query(ServerLoc, Spec, Timeout) ->
-spec local_state_query(server_loc(),
all |
overview |
voters |
members |
initial_members |
machine, timeout()) ->
Expand Down Expand Up @@ -1519,6 +1521,12 @@ do_state_query(overview, State) ->
ra_server:overview(State);
do_state_query(machine, #{machine_state := MacState}) ->
MacState;
%% `voters' query: returns the keys of the cluster map for those members
%% whose voter_status tag is `voter'.
do_state_query(voters, #{cluster := Cluster}) ->
Voters = maps:filter(fun(_, Peer) ->
%% Peers that carry no voter_status key default to {voter, legacy} and are
%% therefore counted as voters.
{Voter, _} = maps:get(voter_status, Peer, {voter, legacy}),
Voter == voter
end, Cluster),
maps:keys(Voters);
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(initial_members, #{log := Log}) ->
Expand Down Expand Up @@ -1735,8 +1743,8 @@ can_execute_locally(RaftState, TargetNode,
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Members = do_state_query(members, State#state.server_state),
not lists:any(fun ({_, N}) -> N == TargetNode end, Members);
Voters = do_state_query(voters, State#state.server_state),
not lists:any(fun ({_, N}) -> N == TargetNode end, Voters);
leader ->
true;
_ ->
Expand Down
44 changes: 35 additions & 9 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -298,50 +298,76 @@ start_cluster_minority(Config) ->
%% Starts three follower nodes, forms a cluster from two of them, adds the
%% third as a non-voter (promotion target 999) and then runs
%% test_local_msg/5 with the send_local_msg command for three placements:
%% a non-leader member node, the node running this test, and the non-voter's
%% node (presumably the node the receiving pid lives on - confirm against
%% test_local_msg/5).
send_local_msg(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
[A, B, NonVoter] = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
NodeIds = [A, B],
Machine = {module, ?MODULE, #{}},
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds),
% assert all were said to be started
[] = Started -- NodeIds,
%% spawn a receiver process on one node
% add permanent non-voter
{ok, _, Leader} = ra:members(hd(NodeIds)),
{ok, _, _} = ra:process_command(Leader, banana),
New = #{id => NonVoter,
voter_status => {nonvoter, #{nvid => <<"test">>, target => 999}},
non_voter_id => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds),
%% select a non-leader node to spawn on
[{_, N} | _] = lists:delete(Leader, NodeIds),
test_local_msg(Leader, N, N, send_local_msg, local),
test_local_msg(Leader, N, N, send_local_msg, [local, ra_event]),
test_local_msg(Leader, N, N, send_local_msg, [local, cast]),
test_local_msg(Leader, N, N, send_local_msg, [local, cast, ra_event]),
{_, LeaderNode} = Leader,
%% test the same but for a local pid (non-member)
test_local_msg(Leader, node(), LeaderNode, send_local_msg, local),
test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, ra_event]),
test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, cast]),
test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, cast, ra_event]),
%% test the same but for a local pid (non-member)
%% same for non-voter
{_, NonVoterNode} = NonVoter,
test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, local),
test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, ra_event]),
test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, cast]),
test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, cast, ra_event]),
%% NOTE(review): with NodeIds bound to [A, B], this comprehension stops only
%% the first two started nodes; the node started for NonVoter is not stopped
%% here. Confirm whether it should be included (e.g. [NonVoter | NodeIds])
%% or is cleaned up elsewhere.
[ok = slave:stop(S) || {_, S} <- NodeIds],
ok.

%% Starts three follower nodes, forms a cluster from two of them, adds the
%% third as a non-voter (promotion target 999) and then runs
%% test_local_msg/5 with the do_local_log command for three placements:
%% a non-leader member node, the node running this test, and the non-voter's
%% node (presumably the node the receiving pid lives on - confirm against
%% test_local_msg/5).
local_log_effect(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
[A, B, NonVoter] = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
NodeIds = [A, B],
Machine = {module, ?MODULE, #{}},
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds),
% assert all were said to be started
[] = Started -- NodeIds,
%% spawn a receiver process on one node
% add permanent non-voter
{ok, _, Leader} = ra:members(hd(NodeIds)),
{ok, _, _} = ra:process_command(Leader, banana),
New = #{id => NonVoter,
voter_status => {nonvoter, #{nvid => <<"test">>, target => 999}},
non_voter_id => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds),
%% select a non-leader node to spawn on
[{_, N} | _] = lists:delete(Leader, NodeIds),
test_local_msg(Leader, N, N, do_local_log, local),
test_local_msg(Leader, N, N, do_local_log, [local, ra_event]),
test_local_msg(Leader, N, N, do_local_log, [local, cast]),
test_local_msg(Leader, N, N, do_local_log, [local, cast, ra_event]),
%% test the same but for a local pid (non-member)
{_, LeaderNode} = Leader,
test_local_msg(Leader, node(), LeaderNode, do_local_log, local),
test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, ra_event]),
test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, cast]),
test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, cast, ra_event]),
%% test the same but for a local pid (non-member)
%% same for non-voter
{_, NonVoterNode} = NonVoter,
test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, local),
test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, ra_event]),
test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, cast]),
test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, cast, ra_event]),
%% NOTE(review): with NodeIds bound to [A, B], this comprehension stops only
%% the first two started nodes; the node started for NonVoter is not stopped
%% here. Confirm whether it should be included (e.g. [NonVoter | NodeIds])
%% or is cleaned up elsewhere.
[ok = slave:stop(S) || {_, S} <- NodeIds],
ok.

Expand Down Expand Up @@ -408,7 +434,7 @@ nonvoter_catches_up(Config) ->
|| N <- lists:seq(1, 10000)],
{ok, _, _} = ra:process_command(Leader, banana),

New = #{id => C, init_non_voter => <<"test">>},
New = #{id => C, non_voter_id => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
NonVoter = {nonvoter, #{nvid => <<"test">>}},
Expand Down Expand Up @@ -444,7 +470,7 @@ nonvoter_catches_up_after_restart(Config) ->
|| N <- lists:seq(1, 10000)],
{ok, _, _} = ra:process_command(Leader, banana),

New = #{id => C, init_non_voter => <<"test">>},
New = #{id => C, non_voter_id => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
NonVoter = {nonvoter, #{nvid => <<"test">>}},
Expand Down Expand Up @@ -482,7 +508,7 @@ nonvoter_catches_up_after_leader_restart(Config) ->
|| N <- lists:seq(1, 10000)],
{ok, _, _} = ra:process_command(Leader, banana),

New = #{id => C, init_non_voter => <<"test">>},
New = #{id => C, non_voter_id => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
NonVoter = {nonvoter, #{nvid => <<"test">>}},
Expand Down
4 changes: 2 additions & 2 deletions test/ra_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -713,12 +713,12 @@ force_start_follower_as_single_member_nonvoter(Config) ->
ServerId4 = ?config(server_id4, Config),
UId4 = ?config(uid4, Config),
Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]),
{ok, _, _} = ra:add_member(ServerId3, #{id => ServerId4, init_non_voter => <<"test">>}),
{ok, _, _} = ra:add_member(ServerId3, #{id => ServerId4, non_voter_id => <<"test">>}),
%% the membership has changed but member not running yet
%% it is nonvoter and does not affect quorum size
{ok, _, _} = ra:process_command(ServerId3, {enq, banana}),
%% start new member
ok = ra:start_server(?SYS, Conf4#{init_non_voter => <<"test">>}),
ok = ra:start_server(?SYS, Conf4#{non_voter_id => <<"test">>}),
{ok, _, ServerId3} = ra:members(ServerId4),
ok = enqueue(ServerId3, msg3),

Expand Down
2 changes: 1 addition & 1 deletion test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1175,7 +1175,7 @@ start_and_join({ClusterName, _} = ServerRef, {_, _} = New) ->
ok.

start_and_join_nonvoter({ClusterName, _} = ServerRef, {_, _} = New) ->
Server = #{id => New, init_non_voter => <<"test">>},
Server = #{id => New, non_voter_id => <<"test">>},
{ok, _, _} = ra:add_member(ServerRef, Server),
ok = ra:start_server(default, ClusterName, Server, add_machine(), [ServerRef]),
ok.
Expand Down
4 changes: 2 additions & 2 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,7 @@ leader_server_join_nonvoter(_Config) ->
commit_index := Target,
cluster_change_permitted := false} = _State1, Effects} =
ra_server:handle_leader({command, {'$ra_join', meta(),
#{id => N4, init_non_voter => <<"test">>}, await_consensus}}, State0),
#{id => N4, non_voter_id => <<"test">>}, await_consensus}}, State0),
% new member should join as non-voter
[
{send_rpc, N4,
Expand Down Expand Up @@ -1608,7 +1608,7 @@ leader_applies_new_cluster_nonvoter(_Config) ->
N3 => new_peer_with(#{next_index => 4, match_index => 3})},

State = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster},
Command = {command, {'$ra_join', meta(), #{id => N4, init_non_voter => <<"test">>}, await_consensus}},
Command = {command, {'$ra_join', meta(), #{id => N4, non_voter_id => <<"test">>}, await_consensus}},
% the cluster records the index and term at which it was applied, so that
% it can be determined whether it has been applied
{leader, #{cluster_index_term := {4, 5},
Expand Down

0 comments on commit f841440

Please sign in to comment.