diff --git a/src/ra.erl b/src/ra.erl index 35c4285a..a6817635 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -59,6 +59,8 @@ %% membership changes add_member/2, add_member/3, + maybe_add_member/2, + maybe_add_member/3, remove_member/2, remove_member/3, leave_and_terminate/3, @@ -579,6 +581,13 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). +maybe_add_member(ServerLoc, ServerId) -> + maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT). +maybe_add_member(ServerLoc, ServerId, Timeout) -> + ra_server_proc:command(ServerLoc, + {'$ra_maybe_join', ServerId, after_log_append}, + Timeout). + %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change diff --git a/src/ra.hrl b/src/ra.hrl index 55f11a01..abe2ee03 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -44,9 +44,12 @@ suspended | disconnected. -% Leader doesn't count for majority purposes peers that -% have matching_index lower than the predicate. --type ra_voter() :: {matching, ra_index()}. +-type ra_voter() :: yes | {no, ra_nonvoter_status()}. + +-type ra_nonvoter_status() :: permanent | + #{round := non_neg_integer(), + target := ra_index(), + ts := integer()}. -type ra_peer_state() :: #{next_index := non_neg_integer(), match_index := non_neg_integer(), @@ -54,7 +57,7 @@ % the commit index last sent % used for evaluating pipeline status commit_index_sent := non_neg_integer(), - %% whether peer is part of the consensus + %% whether the peer is part of the consensus voter := ra_voter(), %% indicates that a snapshot is being sent %% to the peer diff --git a/src/ra_server.erl b/src/ra_server.erl index 635ad6bf..78218071 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -94,6 +94,7 @@ | recovered | stop | receive_snapshot. -type command_type() :: '$usr' | '$ra_join' | '$ra_leave' | + '$ra_maybe_join' | '$ra_cluster_change' | '$ra_cluster'. -type command_meta() :: #{from => from(), @@ -395,11 +396,15 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true, Peer = Peer0#{match_index => max(MI, LastIdx), next_index => max(NI, NextIdx)}, State1 = put_peer(PeerId, Peer, State0), - {State2, Effects0} = evaluate_quorum(State1, []), + Effects00 = ra_voter:maybe_promote(PeerId, State1, []), + + {State2, Effects0} = evaluate_quorum(State1, Effects00), {State, Effects1} = process_pending_consistent_queries(State2, Effects0), + Effects = [{next_event, info, pipeline_rpcs} | Effects1], + case State of #{cluster := #{Id := _}} -> % leader is in the cluster @@ -777,7 +782,7 @@ handle_candidate(#request_vote_result{term = Term, vote_granted = true}, NewVotes = Votes + 1, ?DEBUG("~s: vote granted for term ~b votes ~b", [LogId, Term, NewVotes]), - case need_acks(Nodes) of + case trunc(maps:size(Nodes) / 2) + 1 of NewVotes -> {State1, Effects} = make_all_rpcs(initialise_peers(State0)), Noop = {noop, #{ts => erlang:system_time(millisecond)}, @@ -923,7 +928,7 @@ handle_pre_vote(#pre_vote_result{term = Term, vote_granted = true, [LogId, Token, Term, Votes + 1]), NewVotes = Votes + 1, State = update_term(Term, State0), - case need_acks(Nodes) of + case trunc(maps:size(Nodes) / 2) + 1 of NewVotes -> call_for_election(candidate, State); _ -> @@ -1104,6 +1109,10 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) -> % simply forward all other events to ra_log {Log, Effects} = ra_log:handle_event(Evt, Log0), {follower, State#{log => Log}, Effects}; +handle_follower(#pre_vote_rpc{}, + #{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) -> + ?WARN("~w: follower ignored request_vote_rpc, non voter: ~p", [LogId, Voter]), + {follower, State, []}; handle_follower(#pre_vote_rpc{} = PreVote, State) -> process_pre_vote(follower, PreVote, State); handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term}, @@ -1203,6 +1212,11 @@ handle_follower(#append_entries_reply{}, State) -> %% handle to avoid logging as unhandled %% could receive a lot of these shortly after standing down as leader {follower, State, []}; +handle_follower(election_timeout, + #{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) -> + ?WARN("~w: follower ignored election_timeout, non voter: ~p", + [LogId, Voter]), + {follower, State, []}; handle_follower(election_timeout, State) -> call_for_election(pre_vote, State); handle_follower(try_become_leader, State) -> @@ -1370,6 +1384,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg, last_applied, cluster, leader_id, + voter, voted_for, cluster_change_permitted, cluster_index_term, @@ -2029,13 +2044,11 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand, last_log_term = LLTerm}, #{cfg := #cfg{machine_version = OurMacVer, effective_machine_version = EffMacVer}, - current_term := CurTerm, - cluster := Cluster} = State0) + current_term := CurTerm} = State0) when Term >= CurTerm -> State = update_term(Term, State0), LastIdxTerm = last_idx_term(State), - case is_voter(Cand, Cluster) andalso - is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of + case is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of true when Version > ?RA_PROTO_VERSION-> ?DEBUG("~s: declining pre-vote for ~w for protocol version ~b", [log_id(State0), Cand, Version]), @@ -2060,10 +2073,8 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand, false -> ?DEBUG("~s: declining pre-vote for ~w for term ~b," " candidate last log index term was: ~w~n" - "Last log entry idxterm seen was: ~w~n" - "Voter status was: ~w~n", - [log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm, - get_voter_status(Cand, Cluster)]), + "Last log entry idxterm seen was: ~w", + [log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm]), case FsmState of follower -> {FsmState, State, [start_election_timeout]}; @@ -2092,15 +2103,16 @@ new_peer() -> match_index => 0, commit_index_sent => 0, query_index => 0, + voter => yes, status => normal}. new_peer_with(Map) -> maps:merge(new_peer(), Map). -new_matching_peer(#{commit_index := CI} = _State) -> - new_peer_with(#{ - voter => {matching, CI} - }). +already_member(State) -> + % already a member do nothing + % TODO: reply? If we don't reply the caller may block until timeout + {not_appended, already_member, State}. peers(#{cfg := #cfg{id = Id}, cluster := Peers}) -> maps:remove(Id, Peers). @@ -2328,6 +2340,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, + voter => ra_voter:status(NewCluster, id(State0)), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2458,17 +2471,28 @@ add_reply(_, _, _, % From, Reply, Mode append_log_leader({CmdTag, _, _, _}, State = #{cluster_change_permitted := false}) when CmdTag == '$ra_join' orelse + CmdTag == '$ra_maybe_join' orelse CmdTag == '$ra_leave' -> {not_appended, cluster_change_not_permitted, State}; append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, State = #{cluster := OldCluster}) -> + case OldCluster of + #{JoiningNode := #{voter := yes}} -> + already_member(State); + #{JoiningNode := #{voter := {no, _}} = Peer} -> + Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}}, + append_cluster_change(Cluster, From, ReplyMode, State); + _ -> + Cluster = OldCluster#{JoiningNode => new_peer()}, + append_cluster_change(Cluster, From, ReplyMode, State) + end; +append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode}, + State = #{cluster := OldCluster}) -> case OldCluster of #{JoiningNode := _} -> - % already a member do nothing - % TODO: reply? If we don't reply the caller may block until timeout - {not_appended, already_member, State}; + already_member(State); _ -> - Cluster = OldCluster#{JoiningNode => new_matching_peer(State)}, + Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})}, append_cluster_change(Cluster, From, ReplyMode, State) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, @@ -2511,6 +2535,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, + voter => ra_voter:status(Cluster, id(State)), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. @@ -2587,11 +2612,10 @@ query_indexes(#{cfg := #cfg{id = Id}, query_index := QueryIndex}) -> maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; - (_K, #{query_index := Idx} = Peer, Acc) -> - case is_voter(Peer) of - true -> [Idx | Acc]; - false -> Acc - end + (_K, #{voter := {no, _}}, Acc) -> + Acc; + (_K, #{query_index := Idx}, Acc) -> + [Idx | Acc] end, [QueryIndex], Cluster). match_indexes(#{cfg := #cfg{id = Id}, @@ -2600,49 +2624,12 @@ match_indexes(#{cfg := #cfg{id = Id}, {LWIdx, _} = ra_log:last_written(Log), maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; - (_K, #{match_index := Idx} = Peer, Acc) -> - case is_voter(Peer) of - true -> [Idx | Acc]; - false -> Acc - end + (_K, #{voter := {no, _}}, Acc) -> + Acc; + (_K, #{match_index := Idx}, Acc) -> + [Idx | Acc] end, [LWIdx], Cluster). -get_voter_status(Id, Cluster) -> - case maps:get(Id, Cluster) of - undefined -> undefined; - Peer -> get_voter_status(Peer) - end. - -get_voter_status(#{voter := Status}) -> - Status; -get_voter_status(_) -> - % Implicit 'yes' for initial cluster members, to differentiate from 'undefined' above. - yes. - - -is_voter(Id, Cluster) -> - case maps:get(Id, Cluster) of - undefined -> false; - Peer -> is_voter(Peer) - end. - -is_voter(#{voter := {matching, Target}, match_index := MI}) - when MI >= Target -> - true; -is_voter(#{voter := {matching, _}}) -> - false; -is_voter(_Peer) -> - true. - -need_acks(Cluster) -> - NumVoters = maps:fold(fun(_, Peer, Count) -> - case is_voter(Peer) of - true -> Count + 1; - false -> Count - end - end, 0, Cluster), - trunc(NumVoters / 2) + 1. - -spec agreed_commit(list()) -> ra_index(). agreed_commit(Indexes) -> SortedIdxs = lists:sort(fun erlang:'>'/2, Indexes), diff --git a/src/ra_voter.erl b/src/ra_voter.erl new file mode 100644 index 00000000..1c074f7c --- /dev/null +++ b/src/ra_voter.erl @@ -0,0 +1,61 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% +%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(ra_voter). + +-export([ + new_nonvoter/1, + status/1, + status/2, + maybe_promote/3 + ]). + +-define(DEFAULT_MAX_ROUNDS, 4). + +new_nonvoter(State) -> + Target = maps:get(commit_index, State), + {no, #{round => 0, target => Target , ts => os:system_time(millisecond)}}. + +maybe_promote(PeerID, + #{commit_index := CI, cluster := Cluster} = _State, + Effects) -> + #{PeerID := #{match_index := MI, voter := OldStatus} = _Peer} = Cluster, + case evaluate_voter(OldStatus, MI, CI) of + OldStatus -> + Effects; + Change -> + [{next_event, + {command, {'$ra_join', + #{ts => os:system_time(millisecond)}, + PeerID, + noreply}}} | + Effects] + end. + +evaluate_voter({no, #{round := Round, target := Target , ts := RoundStart}}, MI, CI) + when MI >= Target -> + AtenPollInt = application:get_env(aten, poll_interval, 1000), + Now = os:system_time(millisecond), + case (Now - RoundStart) =< AtenPollInt of + true -> + yes; + false when Round > ?DEFAULT_MAX_ROUNDS -> + {no, permanent}; + false -> + {no, #{round => Round+1, target => CI, ts => Now}} + end; +evaluate_voter(Permanent, _, _) -> + Permanent. + +status(#{cluster := Cluster} = State) -> + Id = ra_server:id(State), + status(Cluster, Id). + +status(Cluster, PeerId) -> + case maps:get(PeerId, Cluster, undefined) of + undefined -> + throw(not_a_cluster_member); + Peer -> + maps:get(voter, Peer, yes) + end. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 314ee930..0f53ebb6 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -22,7 +22,6 @@ all() -> candidate_handles_append_entries_rpc, append_entries_reply_success, append_entries_reply_no_success, - append_entries_nonvoter, follower_request_vote, follower_pre_vote, pre_vote_receives_pre_vote, @@ -42,6 +41,7 @@ all() -> follower_machine_version, follower_install_snapshot_machine_version, leader_server_join, + leader_server_maybe_join, leader_server_leave, leader_is_removed, follower_cluster_change, @@ -876,40 +876,6 @@ append_entries_reply_no_success(_Config) -> ]} = ra_server:handle_leader(Msg, State), ok. -append_entries_nonvoter(_Config) -> - N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, - Cluster = #{N1 => new_peer_with(#{next_index => 5, match_index => 4}), - N2 => new_peer_with(#{next_index => 1, match_index => 0, - commit_index_sent => 3}), - N3 => new_peer_with(#{next_index => 2, match_index => 1}), - N4 => new_peer_with(#{next_index => 2, match_index => 1, - voter => {matching, 3}})}, - State0 = (base_state(3, ?FUNCTION_NAME))#{ - commit_index => 1, - last_applied => 1, - cluster => Cluster, - machine_state => <<"hi1">>}, - Ack = #append_entries_reply{term = 5, success = true, - next_index = 4, - last_index = 3, last_term = 5}, - - % Response from N2, effective cluster size 3, one ack sufficient. - {leader, #{cluster := #{N2 := #{next_index := 4, - match_index := 3}}, - commit_index := 3, - last_applied := 3, - machine_state := <<"hi3">>}, - _} = ra_server:handle_leader({N2, Ack}, State0), - - % Response from N4, cluster grows to 4, one ack insufficient. - {leader, #{cluster := #{N4 := #{next_index := 4, - match_index := 3}}, - commit_index := 1, - last_applied := 1, - machine_state := <<"hi1">>}, - _} = ra_server:handle_leader({N4, Ack}, State0), - ok. - follower_request_vote(_Config) -> N2 = ?N2, N3 = ?N3, State = base_state(3, ?FUNCTION_NAME), @@ -1333,7 +1299,7 @@ leader_server_join(_Config) -> OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), N2 => new_peer_with(#{next_index => 4, match_index => 3}), N3 => new_peer_with(#{next_index => 4, match_index => 3})}, - State0 = #{commit_index := CI} = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, % raft servers should switch to the new configuration after log append % and further cluster changes should be disallowed {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, @@ -1345,12 +1311,58 @@ leader_server_join(_Config) -> #append_entries_rpc{entries = [_, _, _, {4, 5, {'$ra_cluster_change', _, #{N1 := _, N2 := _, - N3 := _, N4 := #{voter := {matching, CI}}}, + N3 := _, N4 := #{voter := yes}}, + await_consensus}}]}}, + {send_rpc, N3, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}}, + {send_rpc, N2, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}} + | _] = Effects, + ok. + +leader_server_maybe_join(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + % raft servers should switch to the new configuration after log append + % and further cluster changes should be disallowed + {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, + cluster_change_permitted := false} = _State1, Effects} = + ra_server:handle_leader({command, {'$ra_maybe_join', meta(), + N4, await_consensus}}, State0), + % new member should join as non-voter + {no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0), + [ + {send_rpc, N4, + #append_entries_rpc{entries = + [_, _, _, {4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, + N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, await_consensus}}]}}, {send_rpc, N3, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {matching, CI}}}, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1359,7 +1371,9 @@ leader_server_join(_Config) -> {send_rpc, N2, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {matching, CI}}}, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1477,12 +1491,21 @@ leader_applies_new_cluster(_Config) -> AEReply = #append_entries_reply{term = 5, success = true, next_index = 5, last_index = 4, last_term = 5}, - % new peer doesn't count until it reaches its matching target, leader needs only 2 votes - {leader, _State3 = #{commit_index := 4, + % leader does not yet have consensus as will need at least 3 votes + {leader, State3 = #{commit_index := 3, + + cluster_change_permitted := false, + cluster_index_term := {4, 5}, + cluster := #{N2 := #{next_index := 5, + match_index := 4}}}, + _} = ra_server:handle_leader({N2, AEReply}, State2#{votes => 1}), + + % leader has consensus + {leader, _State4 = #{commit_index := 4, cluster_change_permitted := true, - cluster := #{N2 := #{next_index := 5, + cluster := #{N3 := #{next_index := 5, match_index := 4}}}, - _} = ra_server:handle_leader({N2, AEReply}, State2#{votes => 1}), + _Effects} = ra_server:handle_leader({N3, AEReply}, State3), ok. leader_appends_cluster_change_then_steps_before_applying_it(_Config) -> @@ -2619,16 +2642,12 @@ new_peer() -> match_index => 0, query_index => 0, commit_index_sent => 0, + voter => yes, status => normal}. new_peer_with(Map) -> maps:merge(new_peer(), Map). -new_matching_peer(#{commit_index := CI} = _State) -> - new_peer_with(#{ - voter => {matching, CI} - }). - snap_meta(Idx, Term) -> snap_meta(Idx, Term, []). diff --git a/test/ra_voter_SUITE.erl b/test/ra_voter_SUITE.erl new file mode 100644 index 00000000..1c5ae2d2 --- /dev/null +++ b/test/ra_voter_SUITE.erl @@ -0,0 +1,146 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% +%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(ra_voter_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("src/ra.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-define(PROCESS_COMMAND_TIMEOUT, 6000). +-define(SYS, default). + +all() -> + [ + {group, tests} + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +all_tests() -> + [ + maybe_join + ]. + +suite() -> [{timetrap, {seconds, 120}}]. + +init_per_suite(Config) -> + ok = logger:set_primary_config(level, warning), + Config. + +end_per_suite(Config) -> + application:stop(ra), + Config. + +restart_ra(DataDir) -> + ok = application:set_env(ra, segment_max_entries, 128), + {ok, _} = ra:start_in(DataDir), + ok. + +init_per_group(_G, Config) -> + PrivDir = ?config(priv_dir, Config), + DataDir = filename:join([PrivDir, "data"]), + ok = restart_ra(DataDir), + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(TestCase, Config) -> + [{test_name, ra_lib:to_list(TestCase)} | Config]. + +end_per_testcase(_TestCase, Config) -> + ra_server_sup_sup:remove_all(default), + Config. + +%%% Tests + +maybe_join(Config) -> + % form a cluster + [N1, N2] = start_local_cluster(2, ?config(test_name, Config), add_machine()), + _ = issue_op(N1, 5), + validate_state_on_node(N1, 5), + % add maybe member + N3 = nth_server_name(Config, 3), + ok = start_and_maybe_join(N1, N3), + _ = issue_op(N3, 5), + validate_state_on_node(N3, 10), + % validate all are voters after catch-up + All = [N1, N2, N3], + lists:map(fun(O) -> + ?assertEqual(All, voters(O)), + ?assertEqual([], nonvoters(O)) + end, overviews(N1)), + terminate_cluster(All). + +%%% Helpers + +nth_server_name(Config, N) when is_integer(N) -> + {ra_server:name(?config(test_name, Config), erlang:integer_to_list(N)), node()}. + +add_machine() -> + {module, ?MODULE, #{}}. + +terminate_cluster(Nodes) -> + [ra:stop_server(?SYS, P) || P <- Nodes]. + +new_server(Name, Config) -> + ClusterName = ?config(test_name, Config), + ok = ra:start_server(default, ClusterName, Name, add_machine(), []), + ok. + +start_and_join({ClusterName, _} = ServerRef, {_, _} = New) -> + {ok, _, _} = ra:add_member(ServerRef, New), + ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), + ok. + +start_and_maybe_join({ClusterName, _} = ServerRef, {_, _} = New) -> + {ok, _, _} = ra:maybe_add_member(ServerRef, New), + ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), + ok. + +start_local_cluster(Num, ClusterName, Machine) -> + Nodes = [{ra_server:name(ClusterName, integer_to_list(N)), node()} + || N <- lists:seq(1, Num)], + + {ok, _, Failed} = ra:start_cluster(default, ClusterName, Machine, Nodes), + ?assert(length(Failed) == 0), + Nodes. + +remove_member(Name) -> + {ok, _IdxTerm, _Leader} = ra:remove_member(Name, Name), + ok. + +validate_state_on_node(Name, Expected) -> + {ok, Expected, _} = ra:consistent_query(Name, fun(X) -> X end). + +dump(T) -> + ct:pal("DUMP: ~p", [T]), + T. + +issue_op(Name, Op) -> + {ok, _, Leader} = ra:process_command(Name, Op, ?PROCESS_COMMAND_TIMEOUT), + Leader. + +overviews(Node) -> + {ok, Members, _From} = ra:members(Node), + [ ra:member_overview(P) || {_, _} = P <- Members ]. + +voters({ok, #{cluster := Peers}, _} = _Overview) -> + [ Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter, Status) =:= yes ]. + +nonvoters({ok, #{cluster := Peers}, _} = _Overview) -> + [ Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter, Status) /= yes ]. + + +%% machine impl +init(_) -> 0. +apply(_Meta, Num, State) -> + {Num + State, Num + State}.