From a876d1d1d9b6046d890a4653f663368c7f91cc90 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 17 May 2023 14:35:28 -0700 Subject: [PATCH] back to mvp --- src/ra.erl | 9 --- src/ra.hrl | 11 ++- src/ra_server.erl | 104 ++++++++++++++-------------- src/ra_voter.erl | 61 ---------------- test/ra_server_SUITE.erl | 105 ++++++++++------------------ test/ra_voter_SUITE.erl | 146 --------------------------------------- 6 files changed, 96 insertions(+), 340 deletions(-) delete mode 100644 src/ra_voter.erl delete mode 100644 test/ra_voter_SUITE.erl diff --git a/src/ra.erl b/src/ra.erl index a6817635..35c4285a 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -59,8 +59,6 @@ %% membership changes add_member/2, add_member/3, - maybe_add_member/2, - maybe_add_member/3, remove_member/2, remove_member/3, leave_and_terminate/3, @@ -581,13 +579,6 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). -maybe_add_member(ServerLoc, ServerId) -> - maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT). -maybe_add_member(ServerLoc, ServerId, Timeout) -> - ra_server_proc:command(ServerLoc, - {'$ra_maybe_join', ServerId, after_log_append}, - Timeout). - %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change diff --git a/src/ra.hrl b/src/ra.hrl index abe2ee03..55f11a01 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -44,12 +44,9 @@ suspended | disconnected. --type ra_voter() :: yes | {no, ra_nonvoter_status()}. - --type ra_nonvoter_status() :: permanent | - #{round := non_neg_integer(), - target := ra_index(), - ts := integer()}. +% Leader doesn't count for majority purposes peers that +% have matching_index lower than the predicate. +-type ra_voter() :: {matching, ra_index()}. -type ra_peer_state() :: #{next_index := non_neg_integer(), match_index := non_neg_integer(), @@ -57,7 +54,7 @@ % the commit index last sent % used for evaluating pipeline status commit_index_sent := non_neg_integer(), - %% whether the peer is part of the consensus + %% whether peer is part of the consensus voter := ra_voter(), %% indicates that a snapshot is being sent %% to the peer diff --git a/src/ra_server.erl b/src/ra_server.erl index 78218071..43cbb822 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -94,7 +94,6 @@ | recovered | stop | receive_snapshot. -type command_type() :: '$usr' | '$ra_join' | '$ra_leave' | - '$ra_maybe_join' | '$ra_cluster_change' | '$ra_cluster'. -type command_meta() :: #{from => from(), @@ -396,15 +395,11 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true, Peer = Peer0#{match_index => max(MI, LastIdx), next_index => max(NI, NextIdx)}, State1 = put_peer(PeerId, Peer, State0), - Effects00 = ra_voter:maybe_promote(PeerId, State1, []), - - {State2, Effects0} = evaluate_quorum(State1, Effects00), + {State2, Effects0} = evaluate_quorum(State1, []), {State, Effects1} = process_pending_consistent_queries(State2, Effects0), - Effects = [{next_event, info, pipeline_rpcs} | Effects1], - case State of #{cluster := #{Id := _}} -> % leader is in the cluster @@ -1109,10 +1104,6 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) -> % simply forward all other events to ra_log {Log, Effects} = ra_log:handle_event(Evt, Log0), {follower, State#{log => Log}, Effects}; -handle_follower(#pre_vote_rpc{}, - #{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) -> - ?WARN("~w: follower ignored request_vote_rpc, non voter: ~p", [LogId, Voter]), - {follower, State, []}; handle_follower(#pre_vote_rpc{} = PreVote, State) -> process_pre_vote(follower, PreVote, State); handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term}, @@ -1212,11 +1203,6 @@ handle_follower(#append_entries_reply{}, State) -> %% handle to avoid logging as unhandled %% could receive a lot of these shortly after standing down as leader {follower, State, []}; -handle_follower(election_timeout, - #{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) -> - ?WARN("~w: follower ignored election_timeout, non voter: ~p", - [LogId, Voter]), - {follower, State, []}; handle_follower(election_timeout, State) -> call_for_election(pre_vote, State); handle_follower(try_become_leader, State) -> @@ -1384,7 +1370,6 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg, last_applied, cluster, leader_id, - voter, voted_for, cluster_change_permitted, cluster_index_term, @@ -2044,11 +2029,13 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand, last_log_term = LLTerm}, #{cfg := #cfg{machine_version = OurMacVer, effective_machine_version = EffMacVer}, - current_term := CurTerm} = State0) + current_term := CurTerm, + cluster := Cluster} = State0) when Term >= CurTerm -> State = update_term(Term, State0), LastIdxTerm = last_idx_term(State), - case is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of + case is_voter(Cand, Cluster) andalso + is_candidate_log_up_to_date(LLIdx, LLTerm, LastIdxTerm) of true when Version > ?RA_PROTO_VERSION-> ?DEBUG("~s: declining pre-vote for ~w for protocol version ~b", [log_id(State0), Cand, Version]), @@ -2073,8 +2060,10 @@ process_pre_vote(FsmState, #pre_vote_rpc{term = Term, candidate_id = Cand, false -> ?DEBUG("~s: declining pre-vote for ~w for term ~b," " candidate last log index term was: ~w~n" - "Last log entry idxterm seen was: ~w", - [log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm]), + "Last log entry idxterm seen was: ~w~n" + "Voter status was: ~w~n", + [log_id(State0), Cand, Term, {LLIdx, LLTerm}, LastIdxTerm, + get_voter_status(Cand, Cluster)]), case FsmState of follower -> {FsmState, State, [start_election_timeout]}; @@ -2103,16 +2092,15 @@ new_peer() -> match_index => 0, commit_index_sent => 0, query_index => 0, - voter => yes, status => normal}. new_peer_with(Map) -> maps:merge(new_peer(), Map). -already_member(State) -> - % already a member do nothing - % TODO: reply? If we don't reply the caller may block until timeout - {not_appended, already_member, State}. +new_matching_peer(#{commit_index := CI} = _State) -> + new_peer_with(#{ + voter => {matching, CI} + }). peers(#{cfg := #cfg{id = Id}, cluster := Peers}) -> maps:remove(Id, Peers). @@ -2340,7 +2328,6 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, - voter => ra_voter:status(NewCluster, id(State0)), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2471,28 +2458,17 @@ add_reply(_, _, _, % From, Reply, Mode append_log_leader({CmdTag, _, _, _}, State = #{cluster_change_permitted := false}) when CmdTag == '$ra_join' orelse - CmdTag == '$ra_maybe_join' orelse CmdTag == '$ra_leave' -> {not_appended, cluster_change_not_permitted, State}; append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, State = #{cluster := OldCluster}) -> - case OldCluster of - #{JoiningNode := #{voter := yes}} -> - already_member(State); - #{JoiningNode := #{voter := {no, _}} = Peer} -> - Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}}, - append_cluster_change(Cluster, From, ReplyMode, State); - _ -> - Cluster = OldCluster#{JoiningNode => new_peer()}, - append_cluster_change(Cluster, From, ReplyMode, State) - end; -append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode}, - State = #{cluster := OldCluster}) -> case OldCluster of #{JoiningNode := _} -> - already_member(State); + % already a member do nothing + % TODO: reply? If we don't reply the caller may block until timeout + {not_appended, already_member, State}; _ -> - Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})}, + Cluster = OldCluster#{JoiningNode => new_matching_peer(State)}, append_cluster_change(Cluster, From, ReplyMode, State) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, @@ -2535,7 +2511,6 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, - voter => ra_voter:status(Cluster, id(State)), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. @@ -2612,10 +2587,11 @@ query_indexes(#{cfg := #cfg{id = Id}, query_index := QueryIndex}) -> maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; - (_K, #{voter := {no, _}}, Acc) -> - Acc; - (_K, #{query_index := Idx}, Acc) -> - [Idx | Acc] + (_K, #{query_index := Idx} = Peer, Acc) -> + case is_voter(Peer) of + true -> [Idx | Acc]; + false -> Acc + end end, [QueryIndex], Cluster). match_indexes(#{cfg := #cfg{id = Id}, @@ -2624,12 +2600,40 @@ match_indexes(#{cfg := #cfg{id = Id}, {LWIdx, _} = ra_log:last_written(Log), maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; - (_K, #{voter := {no, _}}, Acc) -> - Acc; - (_K, #{match_index := Idx}, Acc) -> - [Idx | Acc] + (_K, #{match_index := Idx} = Peer, Acc) -> + case is_voter(Peer) of + true -> [Idx | Acc]; + false -> Acc + end end, [LWIdx], Cluster). +get_voter_status(Id, Cluster) -> + case maps:get(Id, Cluster) of + Peer -> get_voter_status(Peer); + undefined -> undefined + end. + +get_voter_status(#{voter := Status}) -> + Status; +get_voter_status(_) -> + % Implicit 'yes' for initial cluster members, to differentiate from 'undefined' above. + yes. + + +is_voter(Id, Cluster) -> + case maps:get(Id, Cluster) of + Peer -> is_voter(Peer); + undefined -> false + end. + +is_voter(#{voter := {matching, Target}, match_index := MI}) + when MI >= Target -> + true; +is_voter(#{voter := {matching, _}}) -> + false; +is_voter(_Peer) -> + true. + -spec agreed_commit(list()) -> ra_index(). agreed_commit(Indexes) -> SortedIdxs = lists:sort(fun erlang:'>'/2, Indexes), diff --git a/src/ra_voter.erl b/src/ra_voter.erl deleted file mode 100644 index 1c074f7c..00000000 --- a/src/ra_voter.erl +++ /dev/null @@ -1,61 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% -%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. -%% --module(ra_voter). - --export([ - new_nonvoter/1, - status/1, - status/2, - maybe_promote/3 - ]). - --define(DEFAULT_MAX_ROUNDS, 4). - -new_nonvoter(State) -> - Target = maps:get(commit_index, State), - {no, #{round => 0, target => Target , ts => os:system_time(millisecond)}}. - -maybe_promote(PeerID, - #{commit_index := CI, cluster := Cluster} = _State, - Effects) -> - #{PeerID := #{match_index := MI, voter := OldStatus} = _Peer} = Cluster, - case evaluate_voter(OldStatus, MI, CI) of - OldStatus -> - Effects; - Change -> - [{next_event, - {command, {'$ra_join', - #{ts => os:system_time(millisecond)}, - PeerID, - noreply}}} | - Effects] - end. - -evaluate_voter({no, #{round := Round, target := Target , ts := RoundStart}}, MI, CI) - when MI >= Target -> - AtenPollInt = application:get_env(aten, poll_interval, 1000), - Now = os:system_time(millisecond), - case (Now - RoundStart) =< AtenPollInt of - true -> - yes; - false when Round > ?DEFAULT_MAX_ROUNDS -> - {no, permanent}; - false -> - {no, #{round => Round+1, target => CI, ts => Now}} - end; -evaluate_voter(Permanent, _, _) -> - Permanent. - -status(#{cluster := Cluster} = State) -> - Id = ra_server:id(State), - status(Cluster, Id). - -status(Cluster, PeerId) -> - case maps:get(PeerId, Cluster, undefined) of - undefined -> - throw(not_a_cluster_member); - Peer -> - maps:get(voter, Peer, yes) - end. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 0f53ebb6..3e7c9bd2 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -41,11 +41,11 @@ all() -> follower_machine_version, follower_install_snapshot_machine_version, leader_server_join, - leader_server_maybe_join, leader_server_leave, leader_is_removed, follower_cluster_change, leader_applies_new_cluster, + leader_ignores_nonvoter, leader_appends_cluster_change_then_steps_before_applying_it, leader_receives_install_snapshot_rpc, follower_installs_snapshot, @@ -1299,7 +1299,7 @@ leader_server_join(_Config) -> OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), N2 => new_peer_with(#{next_index => 4, match_index => 3}), N3 => new_peer_with(#{next_index => 4, match_index => 3})}, - State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + #{commit_index := CI} = State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, % raft servers should switch to the new configuration after log append % and further cluster changes should be disallowed {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, @@ -1311,12 +1311,12 @@ leader_server_join(_Config) -> #append_entries_rpc{entries = [_, _, _, {4, 5, {'$ra_cluster_change', _, #{N1 := _, N2 := _, - N3 := _, N4 := #{voter := yes}}, + N3 := _, N4 := #{voter := {matching, CI}}}, await_consensus}}]}}, {send_rpc, N3, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}}, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {matching, CI}}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1325,55 +1325,7 @@ leader_server_join(_Config) -> {send_rpc, N2, #append_entries_rpc{entries = [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}}, - await_consensus}}], - term = 5, leader_id = N1, - prev_log_index = 3, - prev_log_term = 5, - leader_commit = 3}} - | _] = Effects, - ok. - -leader_server_maybe_join(_Config) -> - N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, - OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), - N2 => new_peer_with(#{next_index => 4, match_index => 3}), - N3 => new_peer_with(#{next_index => 4, match_index => 3})}, - State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, - % raft servers should switch to the new configuration after log append - % and further cluster changes should be disallowed - {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, - cluster_change_permitted := false} = _State1, Effects} = - ra_server:handle_leader({command, {'$ra_maybe_join', meta(), - N4, await_consensus}}, State0), - % new member should join as non-voter - {no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0), - [ - {send_rpc, N4, - #append_entries_rpc{entries = - [_, _, _, {4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, - N3 := _, N4 := #{voter := {no, #{round := Round, - target := Target, - ts := _}}}}, - await_consensus}}]}}, - {send_rpc, N3, - #append_entries_rpc{entries = - [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, - target := Target, - ts := _}}}}, - await_consensus}}], - term = 5, leader_id = N1, - prev_log_index = 3, - prev_log_term = 5, - leader_commit = 3}}, - {send_rpc, N2, - #append_entries_rpc{entries = - [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, - target := Target, - ts := _}}}}, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {matching, CI}}}, await_consensus}}], term = 5, leader_id = N1, prev_log_index = 3, @@ -1491,21 +1443,36 @@ leader_applies_new_cluster(_Config) -> AEReply = #append_entries_reply{term = 5, success = true, next_index = 5, last_index = 4, last_term = 5}, - % leader does not yet have consensus as will need at least 3 votes - {leader, State3 = #{commit_index := 3, - - cluster_change_permitted := false, - cluster_index_term := {4, 5}, - cluster := #{N2 := #{next_index := 5, - match_index := 4}}}, - _} = ra_server:handle_leader({N2, AEReply}, State2#{votes => 1}), - - % leader has consensus - {leader, _State4 = #{commit_index := 4, + % new peer doesn't count until it reaches its matching target, leader needs only 2 votes + {leader, _State3 = #{commit_index := 4, cluster_change_permitted := true, - cluster := #{N3 := #{next_index := 5, + cluster := #{N2 := #{next_index := 5, match_index := 4}}}, - _Effects} = ra_server:handle_leader({N3, AEReply}, State3), + _} = ra_server:handle_leader({N2, AEReply}, State2#{votes => 1}), + ok. + +leader_ignores_nonvoter(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 2}), + N3 => new_peer_with(#{next_index => 4, match_index => 2}), + N4 => new_peer_with(#{next_index => 4, match_index => 2, voter => {matching, 3}})}, + + State = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + + AEReply = #append_entries_reply{term = 5, success = true, + next_index = 4, + last_index = 3, last_term = 5}, + + {leader, _ = #{commit_index := 3, + cluster := #{N2 := #{next_index := 4, + match_index := 3}}}, + _} = ra_server:handle_leader({N3, AEReply}, State#{votes => 1}), + + {leader, _ = #{commit_index := 2, + cluster := #{N3 := #{next_index := 4, + match_index := 3}}}, + _} = ra_server:handle_leader({N4, AEReply}, State#{votes => 1}), ok. leader_appends_cluster_change_then_steps_before_applying_it(_Config) -> @@ -2642,12 +2609,16 @@ new_peer() -> match_index => 0, query_index => 0, commit_index_sent => 0, - voter => yes, status => normal}. new_peer_with(Map) -> maps:merge(new_peer(), Map). +new_matching_peer(#{commit_index := CI} = _State) -> + new_peer_with(#{ + voter => {matching, CI} + }). + snap_meta(Idx, Term) -> snap_meta(Idx, Term, []). diff --git a/test/ra_voter_SUITE.erl b/test/ra_voter_SUITE.erl deleted file mode 100644 index 1c5ae2d2..00000000 --- a/test/ra_voter_SUITE.erl +++ /dev/null @@ -1,146 +0,0 @@ -%% This Source Code Form is subject to the terms of the Mozilla Public -%% -%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. -%% --module(ra_voter_SUITE). - --compile(nowarn_export_all). --compile(export_all). - --include("src/ra.hrl"). --include_lib("common_test/include/ct.hrl"). --include_lib("eunit/include/eunit.hrl"). - --define(PROCESS_COMMAND_TIMEOUT, 6000). --define(SYS, default). - -all() -> - [ - {group, tests} - ]. - -groups() -> - [ - {tests, [], all_tests()} - ]. - -all_tests() -> - [ - maybe_join - ]. - -suite() -> [{timetrap, {seconds, 120}}]. - -init_per_suite(Config) -> - ok = logger:set_primary_config(level, warning), - Config. - -end_per_suite(Config) -> - application:stop(ra), - Config. - -restart_ra(DataDir) -> - ok = application:set_env(ra, segment_max_entries, 128), - {ok, _} = ra:start_in(DataDir), - ok. - -init_per_group(_G, Config) -> - PrivDir = ?config(priv_dir, Config), - DataDir = filename:join([PrivDir, "data"]), - ok = restart_ra(DataDir), - Config. - -end_per_group(_, Config) -> - Config. - -init_per_testcase(TestCase, Config) -> - [{test_name, ra_lib:to_list(TestCase)} | Config]. - -end_per_testcase(_TestCase, Config) -> - ra_server_sup_sup:remove_all(default), - Config. - -%%% Tests - -maybe_join(Config) -> - % form a cluster - [N1, N2] = start_local_cluster(2, ?config(test_name, Config), add_machine()), - _ = issue_op(N1, 5), - validate_state_on_node(N1, 5), - % add maybe member - N3 = nth_server_name(Config, 3), - ok = start_and_maybe_join(N1, N3), - _ = issue_op(N3, 5), - validate_state_on_node(N3, 10), - % validate all are voters after catch-up - All = [N1, N2, N3], - lists:map(fun(O) -> - ?assertEqual(All, voters(O)), - ?assertEqual([], nonvoters(O)) - end, overviews(N1)), - terminate_cluster(All). - -%%% Helpers - -nth_server_name(Config, N) when is_integer(N) -> - {ra_server:name(?config(test_name, Config), erlang:integer_to_list(N)), node()}. - -add_machine() -> - {module, ?MODULE, #{}}. - -terminate_cluster(Nodes) -> - [ra:stop_server(?SYS, P) || P <- Nodes]. - -new_server(Name, Config) -> - ClusterName = ?config(test_name, Config), - ok = ra:start_server(default, ClusterName, Name, add_machine(), []), - ok. - -start_and_join({ClusterName, _} = ServerRef, {_, _} = New) -> - {ok, _, _} = ra:add_member(ServerRef, New), - ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), - ok. - -start_and_maybe_join({ClusterName, _} = ServerRef, {_, _} = New) -> - {ok, _, _} = ra:maybe_add_member(ServerRef, New), - ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), - ok. - -start_local_cluster(Num, ClusterName, Machine) -> - Nodes = [{ra_server:name(ClusterName, integer_to_list(N)), node()} - || N <- lists:seq(1, Num)], - - {ok, _, Failed} = ra:start_cluster(default, ClusterName, Machine, Nodes), - ?assert(length(Failed) == 0), - Nodes. - -remove_member(Name) -> - {ok, _IdxTerm, _Leader} = ra:remove_member(Name, Name), - ok. - -validate_state_on_node(Name, Expected) -> - {ok, Expected, _} = ra:consistent_query(Name, fun(X) -> X end). - -dump(T) -> - ct:pal("DUMP: ~p", [T]), - T. - -issue_op(Name, Op) -> - {ok, _, Leader} = ra:process_command(Name, Op, ?PROCESS_COMMAND_TIMEOUT), - Leader. - -overviews(Node) -> - {ok, Members, _From} = ra:members(Node), - [ ra:member_overview(P) || {_, _} = P <- Members ]. - -voters({ok, #{cluster := Peers}, _} = _Overview) -> - [ Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter, Status) =:= yes ]. - -nonvoters({ok, #{cluster := Peers}, _} = _Overview) -> - [ Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter, Status) /= yes ]. - - -%% machine impl -init(_) -> 0. -apply(_Meta, Num, State) -> - {Num + State, Num + State}.