From 27a533b33e0ac3b411ecf86b1950ff154ff58036 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Mon, 18 Sep 2023 15:10:23 -0700 Subject: [PATCH] address feedback --- src/ra.hrl | 2 +- src/ra_server.erl | 32 ++++++++++++--------------- src/ra_server_proc.erl | 12 ++++++++-- test/coordination_SUITE.erl | 44 +++++++++++++++++++++++++++++-------- test/ra_2_SUITE.erl | 4 ++-- test/ra_SUITE.erl | 2 +- test/ra_server_SUITE.erl | 4 ++-- 7 files changed, 65 insertions(+), 35 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index e6a560b6..a48ed14c 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -48,7 +48,7 @@ %% If set, server will start as non-voter until later promoted by the %% leader. - init_non_voter => ra_nvid()}. + non_voter_id => ra_nvid()}. -type ra_peer_status() :: normal | {sending_snapshot, pid()} | diff --git a/src/ra_server.erl b/src/ra_server.erl index 760581a5..e40a039b 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -197,7 +197,7 @@ max_pipeline_count => non_neg_integer(), ra_event_formatter => {module(), atom(), [term()]}, counter => counters:counters_ref(), - init_non_voter => ra_nvid(), + non_voter_id => ra_nvid(), system_config => ra_system:config()}. -type mutable_config() :: #{cluster_name => ra_cluster_name(), @@ -2899,19 +2899,21 @@ already_member(State) -> %%% Voter status helpers %%% ==================== --spec ensure_promotion_target(ra_voter_status(), ra_server_state()) -> +-spec ensure_promotion_target(ra_voter_status(), ra_index()) -> {ok, ra_voter_status()} | {error, term()}. ensure_promotion_target({voter, Reason}, _) -> {ok, {voter, Reason}}; ensure_promotion_target({nonvoter, #{target := _, nvid := _} = Reason}, _) -> {ok, {nonvoter, Reason}}; -ensure_promotion_target({nonvoter, #{nvid := _} = Reason}, #{commit_index := CI}) -> - {ok, {nonvoter, Reason#{target => CI}}}; +ensure_promotion_target({nonvoter, #{nvid := _} = Reason}, + #{log := Log}) -> + Target = ra_log:next_index(Log), + {ok, {nonvoter, Reason#{target => Target}}}; ensure_promotion_target(_, _) -> {error, missing_nvid}. -spec init_voter_status(ra_server_config() | ra_new_server()) -> ra_voter_status(). -init_voter_status(#{init_non_voter := NVId}) -> +init_voter_status(#{non_voter_id := NVId}) -> {nonvoter, #{nvid => NVId}}; init_voter_status(_) -> {voter, #{}}. @@ -2949,25 +2951,19 @@ maybe_promote_self(NewCluster, State) -> maybe_promote_peer(PeerID, #{cluster := Cluster}, Effects) -> % Unknown peer handled in the caller. #{PeerID := #{match_index := MI, - voter_status := OldStatus}} = Cluster, - case update_voter_status(OldStatus, MI) of - OldStatus -> - Effects; - NewStatus -> + voter_status := Status}} = Cluster, + case Status of + {nonvoter, #{target := Target} = Reason} when MI >= Target -> [{next_event, {command, {'$ra_join', #{ts => os:system_time(millisecond)}, - #{id => PeerID, voter_status => NewStatus}, + #{id => PeerID, voter_status => {voter, Reason}}, noreply}}} | - Effects] + Effects]; + _ -> + Effects end. -update_voter_status({nonvoter, #{target := Target} = Reason}, MI) - when MI >= Target -> - {voter, Reason}; -update_voter_status(Permanent, _) -> - Permanent. - -spec required_quorum(ra_cluster()) -> pos_integer(). required_quorum(Cluster) -> Voters = count_voters(Cluster), diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 7862bfad..6e4966b5 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -183,6 +183,7 @@ log_fold(ServerId, Fun, InitialState, Timeout) -> -spec state_query(server_loc(), all | overview | + voters | members | initial_members | machine, timeout()) -> @@ -193,6 +194,7 @@ state_query(ServerLoc, Spec, Timeout) -> -spec local_state_query(server_loc(), all | overview | + voters | members | initial_members | machine, timeout()) -> @@ -1519,6 +1521,12 @@ do_state_query(overview, State) -> ra_server:overview(State); do_state_query(machine, #{machine_state := MacState}) -> MacState; +do_state_query(voters, #{cluster := Cluster}) -> + Voters = maps:filter(fun(_, Peer) -> + {Voter, _} = maps:get(voter_status, Peer, {voter, legacy}), + Voter == voter + end, Cluster), + maps:keys(Voters); do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); do_state_query(initial_members, #{log := Log}) -> @@ -1735,8 +1743,8 @@ can_execute_locally(RaftState, TargetNode, leader when TargetNode =/= node() -> %% We need to evaluate whether to send the message. %% Only send if there isn't a local node for the target pid. - Members = do_state_query(members, State#state.server_state), - not lists:any(fun ({_, N}) -> N == TargetNode end, Members); + Voters = do_state_query(voters, State#state.server_state), + not lists:any(fun ({_, N}) -> N == TargetNode end, Voters); leader -> true; _ -> diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index d163295c..e0ac392c 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -298,13 +298,20 @@ start_cluster_minority(Config) -> send_local_msg(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + [A, B, NonVoter] = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + NodeIds = [A, B], Machine = {module, ?MODULE, #{}}, {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), % assert all were said to be started [] = Started -- NodeIds, - %% spawn a receiver process on one node + % add permanent non-voter {ok, _, Leader} = ra:members(hd(NodeIds)), + {ok, _, _} = ra:process_command(Leader, banana), + New = #{id => NonVoter, + voter_status => {nonvoter, #{nvid => <<"test">>, target => 999}}, + non_voter_id => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds), %% select a non-leader node to spawn on [{_, N} | _] = lists:delete(Leader, NodeIds), test_local_msg(Leader, N, N, send_local_msg, local), @@ -312,36 +319,55 @@ send_local_msg(Config) -> test_local_msg(Leader, N, N, send_local_msg, [local, cast]), test_local_msg(Leader, N, N, send_local_msg, [local, cast, ra_event]), {_, LeaderNode} = Leader, + %% test the same but for a local pid (non-member) test_local_msg(Leader, node(), LeaderNode, send_local_msg, local), test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, ra_event]), test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, cast]), test_local_msg(Leader, node(), LeaderNode, send_local_msg, [local, cast, ra_event]), - %% test the same but for a local pid (non-member) + %% same for non-voter + {_, NonVoterNode} = NonVoter, + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, local), + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, ra_event]), + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, cast]), + test_local_msg(Leader, NonVoterNode, LeaderNode, send_local_msg, [local, cast, ra_event]), [ok = slave:stop(S) || {_, S} <- NodeIds], ok. local_log_effect(Config) -> PrivDir = ?config(data_dir, Config), ClusterName = ?config(cluster_name, Config), - NodeIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + [A, B, NonVoter] = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]], + NodeIds = [A, B], Machine = {module, ?MODULE, #{}}, {ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, NodeIds), % assert all were said to be started [] = Started -- NodeIds, - %% spawn a receiver process on one node + % add permanent non-voter {ok, _, Leader} = ra:members(hd(NodeIds)), + {ok, _, _} = ra:process_command(Leader, banana), + New = #{id => NonVoter, + voter_status => {nonvoter, #{nvid => <<"test">>, target => 999}}, + non_voter_id => <<"test">>}, + {ok, _, _} = ra:add_member(A, New), + ok = ra:start_server(?SYS, ClusterName, New, Machine, NodeIds), %% select a non-leader node to spawn on [{_, N} | _] = lists:delete(Leader, NodeIds), test_local_msg(Leader, N, N, do_local_log, local), test_local_msg(Leader, N, N, do_local_log, [local, ra_event]), test_local_msg(Leader, N, N, do_local_log, [local, cast]), test_local_msg(Leader, N, N, do_local_log, [local, cast, ra_event]), + %% test the same but for a local pid (non-member) {_, LeaderNode} = Leader, test_local_msg(Leader, node(), LeaderNode, do_local_log, local), test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, ra_event]), test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, cast]), test_local_msg(Leader, node(), LeaderNode, do_local_log, [local, cast, ra_event]), - %% test the same but for a local pid (non-member) + %% same for non-voter + {_, NonVoterNode} = NonVoter, + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, local), + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, ra_event]), + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, cast]), + test_local_msg(Leader, NonVoterNode, LeaderNode, do_local_log, [local, cast, ra_event]), [ok = slave:stop(S) || {_, S} <- NodeIds], ok. @@ -408,7 +434,7 @@ nonvoter_catches_up(Config) -> || N <- lists:seq(1, 10000)], {ok, _, _} = ra:process_command(Leader, banana), - New = #{id => C, init_non_voter => <<"test">>}, + New = #{id => C, non_voter_id => <<"test">>}, {ok, _, _} = ra:add_member(A, New), ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]), NonVoter = {nonvoter, #{nvid => <<"test">>}}, @@ -444,7 +470,7 @@ nonvoter_catches_up_after_restart(Config) -> || N <- lists:seq(1, 10000)], {ok, _, _} = ra:process_command(Leader, banana), - New = #{id => C, init_non_voter => <<"test">>}, + New = #{id => C, non_voter_id => <<"test">>}, {ok, _, _} = ra:add_member(A, New), ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]), NonVoter = {nonvoter, #{nvid => <<"test">>}}, @@ -482,7 +508,7 @@ nonvoter_catches_up_after_leader_restart(Config) -> || N <- lists:seq(1, 10000)], {ok, _, _} = ra:process_command(Leader, banana), - New = #{id => C, init_non_voter => <<"test">>}, + New = #{id => C, non_voter_id => <<"test">>}, {ok, _, _} = ra:add_member(A, New), ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]), NonVoter = {nonvoter, #{nvid => <<"test">>}}, diff --git a/test/ra_2_SUITE.erl b/test/ra_2_SUITE.erl index afe3c01e..4fb8bdbc 100644 --- a/test/ra_2_SUITE.erl +++ b/test/ra_2_SUITE.erl @@ -713,12 +713,12 @@ force_start_follower_as_single_member_nonvoter(Config) -> ServerId4 = ?config(server_id4, Config), UId4 = ?config(uid4, Config), Conf4 = conf(ClusterName, UId4, ServerId4, PrivDir, [ServerId3]), - {ok, _, _} = ra:add_member(ServerId3, #{id => ServerId4, init_non_voter => <<"test">>}), + {ok, _, _} = ra:add_member(ServerId3, #{id => ServerId4, non_voter_id => <<"test">>}), %% the membership has changed but member not running yet %% it is nonvoter and does not affect quorum size {ok, _, _} = ra:process_command(ServerId3, {enq, banana}), %% start new member - ok = ra:start_server(?SYS, Conf4#{init_non_voter => <<"test">>}), + ok = ra:start_server(?SYS, Conf4#{non_voter_id => <<"test">>}), {ok, _, ServerId3} = ra:members(ServerId4), ok = enqueue(ServerId3, msg3), diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 8a6f52de..d3481339 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -1175,7 +1175,7 @@ start_and_join({ClusterName, _} = ServerRef, {_, _} = New) -> ok. start_and_join_nonvoter({ClusterName, _} = ServerRef, {_, _} = New) -> - Server = #{id => New, init_non_voter => <<"test">>}, + Server = #{id => New, non_voter_id => <<"test">>}, {ok, _, _} = ra:add_member(ServerRef, Server), ok = ra:start_server(default, ClusterName, Server, add_machine(), [ServerRef]), ok. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 1dbf65c2..dcf7604d 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -1447,7 +1447,7 @@ leader_server_join_nonvoter(_Config) -> commit_index := Target, cluster_change_permitted := false} = _State1, Effects} = ra_server:handle_leader({command, {'$ra_join', meta(), - #{id => N4, init_non_voter => <<"test">>}, await_consensus}}, State0), + #{id => N4, non_voter_id => <<"test">>}, await_consensus}}, State0), % new member should join as non-voter [ {send_rpc, N4, @@ -1608,7 +1608,7 @@ leader_applies_new_cluster_nonvoter(_Config) -> N3 => new_peer_with(#{next_index => 4, match_index => 3})}, State = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, - Command = {command, {'$ra_join', meta(), #{id => N4, init_non_voter => <<"test">>}, await_consensus}}, + Command = {command, {'$ra_join', meta(), #{id => N4, non_voter_id => <<"test">>}, await_consensus}}, % cluster records index and term it was applied to determine whether it has % been applied {leader, #{cluster_index_term := {4, 5},