From 9a14f89777e78a482f69efb83aa592755c8d099c Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Tue, 9 May 2023 11:25:28 -0700 Subject: [PATCH] promote voters after sync tick through replication rounds inject cluster_change effect --- src/ra.erl | 9 ++ src/ra.hrl | 6 +- src/ra_server.erl | 37 +++-- src/ra_voter.erl | 59 +++++-- test/ra_server_SUITE.erl | 53 ++++++- test/ra_voter_SUITE.erl | 325 ++++++++++++--------------------------- 6 files changed, 231 insertions(+), 258 deletions(-) diff --git a/src/ra.erl b/src/ra.erl index 35c4285a..a6817635 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -59,6 +59,8 @@ %% membership changes add_member/2, add_member/3, + maybe_add_member/2, + maybe_add_member/3, remove_member/2, remove_member/3, leave_and_terminate/3, @@ -579,6 +581,13 @@ add_member(ServerLoc, ServerId, Timeout) -> {'$ra_join', ServerId, after_log_append}, Timeout). +maybe_add_member(ServerLoc, ServerId) -> + maybe_add_member(ServerLoc, ServerId, ?DEFAULT_TIMEOUT). +maybe_add_member(ServerLoc, ServerId, Timeout) -> + ra_server_proc:command(ServerLoc, + {'$ra_maybe_join', ServerId, after_log_append}, + Timeout). + %% @doc Removes a server from the cluster's membership configuration. %% This function returns after appending a cluster membership change diff --git a/src/ra.hrl b/src/ra.hrl index bda1b171..abe2ee03 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -57,11 +57,11 @@ % the commit index last sent % used for evaluating pipeline status commit_index_sent := non_neg_integer(), + %% whether the peer is part of the consensus + voter := ra_voter(), %% indicates that a snapshot is being sent %% to the peer - status := ra_peer_status(), - %% whether the peer is part of the consensus - voter := ra_voter()}. + status := ra_peer_status()}. -type ra_cluster() :: #{ra_server_id() => ra_peer_state()}. diff --git a/src/ra_server.erl b/src/ra_server.erl index 9077f2f6..78218071 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -85,7 +85,6 @@ query_index := non_neg_integer(), queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}), pending_consistent_queries := [consistent_query_ref()], - voter => 'maybe'(ra_voter()), commit_latency => 'maybe'(non_neg_integer()) }. @@ -397,11 +396,15 @@ handle_leader({PeerId, #append_entries_reply{term = Term, success = true, Peer = Peer0#{match_index => max(MI, LastIdx), next_index => max(NI, NextIdx)}, State1 = put_peer(PeerId, Peer, State0), - {State2, Effects0} = evaluate_quorum(State1, []), + Effects00 = ra_voter:maybe_promote(PeerId, State1, []), + + {State2, Effects0} = evaluate_quorum(State1, Effects00), {State, Effects1} = process_pending_consistent_queries(State2, Effects0), + Effects = [{next_event, info, pipeline_rpcs} | Effects1], + case State of #{cluster := #{Id := _}} -> % leader is in the cluster @@ -1106,14 +1109,12 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) -> % simply forward all other events to ra_log {Log, Effects} = ra_log:handle_event(Evt, Log0), {follower, State#{log => Log}, Effects}; -handle_follower(#pre_vote_rpc{}, #{voter := {no, _}} = State) -> - %% ignore elections, non-voter +handle_follower(#pre_vote_rpc{}, + #{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) -> + ?WARN("~w: follower ignored request_vote_rpc, non voter: ~p", [LogId, Voter]), {follower, State, []}; handle_follower(#pre_vote_rpc{} = PreVote, State) -> process_pre_vote(follower, PreVote, State); -handle_follower(#request_vote_rpc{}, #{voter := {no, _}} = State) -> - %% ignore elections, non-voter - {follower, State, []}; handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term}, #{current_term := Term, voted_for := VotedFor, cfg := #cfg{log_id = LogId}} = State) @@ -1211,8 +1212,10 @@ handle_follower(#append_entries_reply{}, State) -> %% handle to avoid logging as unhandled %% could receive a lot of these shortly after standing down as leader {follower, State, []}; -handle_follower(election_timeout, #{voter := {no, _}} = State) -> - %% ignore elections, non-voter +handle_follower(election_timeout, + #{cfg := #cfg{log_id = LogId}, voter := {no, _} = Voter} = State) -> + ?WARN("~w: follower ignored election_timeout, non voter: ~p", + [LogId, Voter]), {follower, State, []}; handle_follower(election_timeout, State) -> call_for_election(pre_vote, State); @@ -1381,6 +1384,7 @@ overview(#{cfg := #cfg{effective_machine_module = MacMod} = Cfg, last_applied, cluster, leader_id, + voter, voted_for, cluster_change_permitted, cluster_index_term, @@ -2099,8 +2103,8 @@ new_peer() -> match_index => 0, commit_index_sent => 0, query_index => 0, - status => normal, - voter => yes}. + voter => yes, + status => normal}. new_peer_with(Map) -> maps:merge(new_peer(), Map). @@ -2336,7 +2340,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, - voter => ra_voter:peer_status(id(State0), NewCluster), + voter => ra_voter:status(NewCluster, id(State0)), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2488,7 +2492,7 @@ append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode}, #{JoiningNode := _} -> already_member(State); _ -> - Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})}, + Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})}, append_cluster_change(Cluster, From, ReplyMode, State) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, @@ -2520,7 +2524,6 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, case Cmd of {'$ra_cluster_change', _, Cluster, _} -> State#{cluster => Cluster, - voter => ra_voter:peer_status(id(State), Cluster), cluster_index_term => {Idx, Term}}; _ -> % revert back to previous cluster @@ -2532,7 +2535,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, - voter => ra_voter:peer_status(id(State), Cluster), + voter => ra_voter:status(Cluster, id(State)), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. @@ -2609,6 +2612,8 @@ query_indexes(#{cfg := #cfg{id = Id}, query_index := QueryIndex}) -> maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; + (_K, #{voter := {no, _}}, Acc) -> + Acc; (_K, #{query_index := Idx}, Acc) -> [Idx | Acc] end, [QueryIndex], Cluster). @@ -2619,6 +2624,8 @@ match_indexes(#{cfg := #cfg{id = Id}, {LWIdx, _} = ra_log:last_written(Log), maps:fold(fun (PeerId, _, Acc) when PeerId == Id -> Acc; + (_K, #{voter := {no, _}}, Acc) -> + Acc; (_K, #{match_index := Idx}, Acc) -> [Idx | Acc] end, [LWIdx], Cluster). diff --git a/src/ra_voter.erl b/src/ra_voter.erl index 5c9fefd3..1c074f7c 100644 --- a/src/ra_voter.erl +++ b/src/ra_voter.erl @@ -7,22 +7,55 @@ -export([ new_nonvoter/1, status/1, - peer_status/2 + status/2, + maybe_promote/3 ]). +-define(DEFAULT_MAX_ROUNDS, 4). + new_nonvoter(State) -> - TargetIdx = maps:get(commit_index, State), - {no, #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}}. + Target = maps:get(commit_index, State), + {no, #{round => 0, target => Target , ts => os:system_time(millisecond)}}. -status(State) -> - case maps:get(voter, State) of - undefined -> - MyId = ra_server:id(State), - #{cluster := Cluster} = State, - peer_status(MyId, Cluster); - Voter -> Voter +maybe_promote(PeerID, + #{commit_index := CI, cluster := Cluster} = _State, + Effects) -> + #{PeerID := #{match_index := MI, voter := OldStatus} = _Peer} = Cluster, + case evaluate_voter(OldStatus, MI, CI) of + OldStatus -> + Effects; + Change -> + [{next_event, + {command, {'$ra_join', + #{ts => os:system_time(millisecond)}, + PeerID, + noreply}}} | + Effects] end. -peer_status(PeerId, Cluster) -> - Peer = maps:get(PeerId, Cluster, undefined), - maps:get(voter, Peer, yes). +evaluate_voter({no, #{round := Round, target := Target , ts := RoundStart}}, MI, CI) + when MI >= Target -> + AtenPollInt = application:get_env(aten, poll_interval, 1000), + Now = os:system_time(millisecond), + case (Now - RoundStart) =< AtenPollInt of + true -> + yes; + false when Round > ?DEFAULT_MAX_ROUNDS -> + {no, permanent}; + false -> + {no, #{round => Round+1, target => CI, ts => Now}} + end; +evaluate_voter(Permanent, _, _) -> + Permanent. + +status(#{cluster := Cluster} = State) -> + Id = ra_server:id(State), + status(Cluster, Id). + +status(Cluster, PeerId) -> + case maps:get(PeerId, Cluster, undefined) of + undefined -> + throw(not_a_cluster_member); + Peer -> + maps:get(voter, Peer, yes) + end. diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index f03b3990..0f53ebb6 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -41,6 +41,7 @@ all() -> follower_machine_version, follower_install_snapshot_machine_version, leader_server_join, + leader_server_maybe_join, leader_server_leave, leader_is_removed, follower_cluster_change, @@ -1333,6 +1334,54 @@ leader_server_join(_Config) -> | _] = Effects, ok. +leader_server_maybe_join(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + % raft servers should switch to the new configuration after log append + % and further cluster changes should be disallowed + {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, + cluster_change_permitted := false} = _State1, Effects} = + ra_server:handle_leader({command, {'$ra_maybe_join', meta(), + N4, await_consensus}}, State0), + % new member should join as non-voter + {no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0), + [ + {send_rpc, N4, + #append_entries_rpc{entries = + [_, _, _, {4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, + N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, + await_consensus}}]}}, + {send_rpc, N3, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}}, + {send_rpc, N2, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}} + | _] = Effects, + ok. + leader_server_leave(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), @@ -2593,8 +2642,8 @@ new_peer() -> match_index => 0, query_index => 0, commit_index_sent => 0, - status => normal, - voter => yes}. + voter => yes, + status => normal}. new_peer_with(Map) -> maps:merge(new_peer(), Map). diff --git a/test/ra_voter_SUITE.erl b/test/ra_voter_SUITE.erl index 7ba10e11..1c5ae2d2 100644 --- a/test/ra_voter_SUITE.erl +++ b/test/ra_voter_SUITE.erl @@ -8,264 +8,139 @@ -compile(export_all). -include("src/ra.hrl"). --include("src/ra_server.hrl"). +-include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-define(PROCESS_COMMAND_TIMEOUT, 6000). +-define(SYS, default). + all() -> [ - leader_server_maybe_join + {group, tests} ]. --define(MACFUN, fun (E, _) -> E end). --define(N1, {n1, node()}). --define(N2, {n2, node()}). --define(N3, {n3, node()}). --define(N4, {n4, node()}). --define(N5, {n5, node()}). - groups() -> - [ {tests, [], all()} ]. + [ + {tests, [], all_tests()} + ]. + +all_tests() -> + [ + maybe_join + ]. + +suite() -> [{timetrap, {seconds, 120}}]. init_per_suite(Config) -> - ok = logger:set_primary_config(level, all), + ok = logger:set_primary_config(level, warning), Config. end_per_suite(Config) -> + application:stop(ra), + Config. + +restart_ra(DataDir) -> + ok = application:set_env(ra, segment_max_entries, 128), + {ok, _} = ra:start_in(DataDir), + ok. + +init_per_group(_G, Config) -> + PrivDir = ?config(priv_dir, Config), + DataDir = filename:join([PrivDir, "data"]), + ok = restart_ra(DataDir), + Config. + +end_per_group(_, Config) -> Config. init_per_testcase(TestCase, Config) -> - ok = logger:set_primary_config(level, all), - ok = setup_log(), - [{test_case, TestCase} | Config]. + [{test_name, ra_lib:to_list(TestCase)} | Config]. end_per_testcase(_TestCase, Config) -> - meck:unload(), + ra_server_sup_sup:remove_all(default), Config. -setup_log() -> - ok = meck:new(ra_log, []), - ok = meck:new(ra_snapshot, [passthrough]), - ok = meck:new(ra_machine, [passthrough]), - meck:expect(ra_log, init, fun(C) -> ra_log_memory:init(C) end), - meck:expect(ra_log_meta, store, fun (_, U, K, V) -> - put({U, K}, V), ok - end), - meck:expect(ra_log_meta, store_sync, fun (_, U, K, V) -> - put({U, K}, V), ok - end), - meck:expect(ra_log_meta, fetch, fun(_, U, K) -> - get({U, K}) - end), - meck:expect(ra_log_meta, fetch, fun (_, U, K, D) -> - ra_lib:default(get({U, K}), D) - end), - meck:expect(ra_snapshot, begin_accept, - fun(_Meta, SS) -> - {ok, SS} - end), - meck:expect(ra_snapshot, accept_chunk, - fun(_Data, _OutOf, _Flag, SS) -> - {ok, SS} - end), - meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end), - meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end), - meck:expect(ra_log, snapshot_state, fun (_) -> snap_state end), - meck:expect(ra_log, set_snapshot_state, fun (_, Log) -> Log end), - meck:expect(ra_log, install_snapshot, fun (_, _, Log) -> {Log, []} end), - meck:expect(ra_log, recover_snapshot, fun ra_log_memory:recover_snapshot/1), - meck:expect(ra_log, snapshot_index_term, fun ra_log_memory:snapshot_index_term/1), - meck:expect(ra_log, fold, fun ra_log_memory:fold/5), - meck:expect(ra_log, release_resources, fun ra_log_memory:release_resources/3), - meck:expect(ra_log, append_sync, - fun({Idx, Term, _} = E, L) -> - L1 = ra_log_memory:append(E, L), - {LX, _} = ra_log_memory:handle_event({written, {Idx, Idx, Term}}, L1), - LX - end), - meck:expect(ra_log, write_config, fun ra_log_memory:write_config/2), - meck:expect(ra_log, next_index, fun ra_log_memory:next_index/1), - meck:expect(ra_log, append, fun ra_log_memory:append/2), - meck:expect(ra_log, write, fun ra_log_memory:write/2), - meck:expect(ra_log, handle_event, fun ra_log_memory:handle_event/2), - meck:expect(ra_log, last_written, fun ra_log_memory:last_written/1), - meck:expect(ra_log, last_index_term, fun ra_log_memory:last_index_term/1), - meck:expect(ra_log, set_last_index, fun ra_log_memory:set_last_index/2), - meck:expect(ra_log, fetch_term, fun ra_log_memory:fetch_term/2), - meck:expect(ra_log, needs_cache_flush, fun (_) -> false end), - meck:expect(ra_log, exists, - fun ({Idx, Term}, L) -> - case ra_log_memory:fetch_term(Idx, L) of - {Term, Log} -> {true, Log}; - {_, Log} -> {false, Log} - end - end), - meck:expect(ra_log, update_release_cursor, - fun ra_log_memory:update_release_cursor/5), +%%% Tests + +maybe_join(Config) -> + % form a cluster + [N1, N2] = start_local_cluster(2, ?config(test_name, Config), add_machine()), + _ = issue_op(N1, 5), + validate_state_on_node(N1, 5), + % add maybe member + N3 = nth_server_name(Config, 3), + ok = start_and_maybe_join(N1, N3), + _ = issue_op(N3, 5), + validate_state_on_node(N3, 10), + % validate all are voters after catch-up + All = [N1, N2, N3], + lists:map(fun(O) -> + ?assertEqual(All, voters(O)), + ?assertEqual([], nonvoters(O)) + end, overviews(N1)), + terminate_cluster(All). + +%%% Helpers + +nth_server_name(Config, N) when is_integer(N) -> + {ra_server:name(?config(test_name, Config), erlang:integer_to_list(N)), node()}. + +add_machine() -> + {module, ?MODULE, #{}}. + +terminate_cluster(Nodes) -> + [ra:stop_server(?SYS, P) || P <- Nodes]. + +new_server(Name, Config) -> + ClusterName = ?config(test_name, Config), + ok = ra:start_server(default, ClusterName, Name, add_machine(), []), ok. -leader_server_maybe_join(_Config) -> - N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, - OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), - N2 => new_peer_with(#{next_index => 4, match_index => 3}), - N3 => new_peer_with(#{next_index => 4, match_index => 3})}, - State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, - % raft servers should switch to the new configuration after log append - % and further cluster changes should be disallowed - {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, - cluster_change_permitted := false} = _State1, Effects} = - ra_server:handle_leader({command, {'$ra_maybe_join', meta(), - N4, await_consensus}}, State0), - % new member should join as non-voter - {no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0), - [ - {send_rpc, N4, - #append_entries_rpc{entries = - [_, _, _, {4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, - N3 := _, N4 := #{voter := {no, #{round := Round, - target := Target, - ts := _}}}}, - await_consensus}}]}}, - {send_rpc, N3, - #append_entries_rpc{entries = - [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, - target := Target, - ts := _}}}}, - await_consensus}}], - term = 5, leader_id = N1, - prev_log_index = 3, - prev_log_term = 5, - leader_commit = 3}}, - {send_rpc, N2, - #append_entries_rpc{entries = - [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, - target := Target, - ts := _}}}}, - await_consensus}}], - term = 5, leader_id = N1, - prev_log_index = 3, - prev_log_term = 5, - leader_commit = 3}} - | _] = Effects, +start_and_join({ClusterName, _} = ServerRef, {_, _} = New) -> + {ok, _, _} = ra:add_member(ServerRef, New), + ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), ok. - -% %%% helpers - -ra_server_init(Conf) -> - ra_server:recover(ra_server:init(Conf)). - -init_servers(ServerIds, Machine) -> - lists:foldl(fun (ServerId, Acc) -> - Args = #{cluster_name => some_id, - id => ServerId, - uid => atom_to_binary(element(1, ServerId), utf8), - initial_members => ServerIds, - log_init_args => #{uid => <<>>}, - machine => Machine}, - Acc#{ServerId => {follower, ra_server_init(Args), []}} - end, #{}, ServerIds). - -list(L) when is_list(L) -> L; -list(L) -> [L]. - -entry(Idx, Term, Data) -> - {Idx, Term, {'$usr', meta(), Data, after_log_append}}. - -empty_state(NumServers, Id) -> - Servers = lists:foldl(fun(N, Acc) -> - [{list_to_atom("n" ++ integer_to_list(N)), node()} - | Acc] - end, [], lists:seq(1, NumServers)), - ra_server_init(#{cluster_name => someid, - id => {Id, node()}, - uid => atom_to_binary(Id, utf8), - initial_members => Servers, - log_init_args => #{uid => <<>>}, - machine => {simple, fun (E, _) -> E end, <<>>}}). % just keep last applied value - -base_state(NumServers, MacMod) -> - Log0 = lists:foldl(fun(E, L) -> - ra_log:append(E, L) - end, ra_log:init(#{system_config => ra_system:default_config(), - uid => <<>>}), - [{1, 1, usr(<<"hi1">>)}, - {2, 3, usr(<<"hi2">>)}, - {3, 5, usr(<<"hi3">>)}]), - {Log, _} = ra_log:handle_event({written, {1, 3, 5}}, Log0), - - Servers = lists:foldl(fun(N, Acc) -> - Name = {list_to_atom("n" ++ integer_to_list(N)), node()}, - Acc#{Name => - new_peer_with(#{next_index => 4, - match_index => 3})} - end, #{}, lists:seq(1, NumServers)), - mock_machine(MacMod), - Cfg = #cfg{id = ?N1, - uid = <<"n1">>, - log_id = <<"n1">>, - metrics_key = n1, - machine = {machine, MacMod, #{}}, % just keep last applied value - machine_version = 0, - machine_versions = [{0, 0}], - effective_machine_version = 0, - effective_machine_module = MacMod, - system_config = ra_system:default_config() - }, - #{cfg => Cfg, - leader_id => ?N1, - cluster => Servers, - cluster_index_term => {0, 0}, - cluster_change_permitted => true, - machine_state => <<"hi3">>, % last entry has been applied - current_term => 5, - commit_index => 3, - last_applied => 3, - log => Log, - query_index => 0, - queries_waiting_heartbeats => queue:new(), - pending_consistent_queries => []}. - -mock_machine(Mod) -> - meck:new(Mod, [non_strict]), - meck:expect(Mod, init, fun (_) -> init_state end), - %% just keep the latest command as the state - meck:expect(Mod, apply, fun (_, Cmd, _) -> {Cmd, ok} end), +start_and_maybe_join({ClusterName, _} = ServerRef, {_, _} = New) -> + {ok, _, _} = ra:maybe_add_member(ServerRef, New), + ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]), ok. -usr_cmd(Data) -> - {command, usr(Data)}. +start_local_cluster(Num, ClusterName, Machine) -> + Nodes = [{ra_server:name(ClusterName, integer_to_list(N)), node()} + || N <- lists:seq(1, Num)], + + {ok, _, Failed} = ra:start_cluster(default, ClusterName, Machine, Nodes), + ?assert(length(Failed) == 0), + Nodes. -usr(Data) -> - {'$usr', meta(), Data, after_log_append}. +remove_member(Name) -> + {ok, _IdxTerm, _Leader} = ra:remove_member(Name, Name), + ok. -meta() -> - #{from => {self(), make_ref()}, - ts => os:system_time(millisecond)}. +validate_state_on_node(Name, Expected) -> + {ok, Expected, _} = ra:consistent_query(Name, fun(X) -> X end). dump(T) -> ct:pal("DUMP: ~p", [T]), T. -new_peer() -> - #{next_index => 1, - match_index => 0, - query_index => 0, - commit_index_sent => 0, - status => normal, - voter => yes}. +issue_op(Name, Op) -> + {ok, _, Leader} = ra:process_command(Name, Op, ?PROCESS_COMMAND_TIMEOUT), + Leader. + +overviews(Node) -> + {ok, Members, _From} = ra:members(Node), + [ ra:member_overview(P) || {_, _} = P <- Members ]. -new_peer_with(Map) -> - maps:merge(new_peer(), Map). +voters({ok, #{cluster := Peers}, _} = _Overview) -> + [ Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter, Status) =:= yes ]. -snap_meta(Idx, Term) -> - snap_meta(Idx, Term, []). +nonvoters({ok, #{cluster := Peers}, _} = _Overview) -> + [ Id || {Id, Status} <- maps:to_list(Peers), maps:get(voter, Status) /= yes ]. -snap_meta(Idx, Term, Cluster) -> - #{index => Idx, - term => Term, - cluster => Cluster, - machine_version => 0}. +%% machine impl +init(_) -> 0. +apply(_Meta, Num, State) -> + {Num + State, Num + State}.