From 453965a232c4a4cfb4973aef99ba1d8d1ef1b95f Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Mon, 8 May 2023 11:59:33 -0700 Subject: [PATCH] extract code into new module follower ignores pre_vote and request_vote --- src/ra.hrl | 22 +--- src/ra_server.erl | 22 +++- src/ra_voter.erl | 28 ++++ test/ra_server_SUITE.erl | 46 ------- test/ra_voter_SUITE.erl | 271 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 320 insertions(+), 69 deletions(-) create mode 100644 src/ra_voter.erl create mode 100644 test/ra_voter_SUITE.erl diff --git a/src/ra.hrl b/src/ra.hrl index 68beffba..bda1b171 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -20,8 +20,6 @@ -type ra_index() :: non_neg_integer(). %% Section 5.3. -type ra_term() :: non_neg_integer(). -%% Section 4.2.1 --type ra_replication_round() :: non_neg_integer(). %% tuple form of index and term -type ra_idxterm() :: {ra_index(), ra_term()}. @@ -46,20 +44,12 @@ suspended | disconnected. -%% A peer can be one of: -%% -%% - Voter, standard quorum member. -%% - Nonvoter, node does not participate in elections or consensus voting. -%% - Staging, node is a temporary nonvoter, and will be automatically promoted -%% if it proves to be fast enough to stay up to dat with teh leader. --type ra_voter() :: yes | no | {maybe, staging_status()}. - -%% For staging nodes we measure current round, target index and the timestamp of its start. -%% If the node reaches target index and the ∂T is less than the election timeout, the node is -%% considered eligible to become a voter. --type staging_status() :: #{round := ra_replication_round(), - target := ra_index(), - ts := integer()}. +-type ra_voter() :: yes | {no, ra_nonvoter_status()}. + +-type ra_nonvoter_status() :: permanent | + #{round := non_neg_integer(), + target := ra_index(), + ts := integer()}. -type ra_peer_state() :: #{next_index := non_neg_integer(), match_index := non_neg_integer(), diff --git a/src/ra_server.erl b/src/ra_server.erl index 5b233b24..9077f2f6 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -85,6 +85,7 @@ query_index := non_neg_integer(), queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}), pending_consistent_queries := [consistent_query_ref()], + voter => 'maybe'(ra_voter()), commit_latency => 'maybe'(non_neg_integer()) }. @@ -1105,8 +1106,14 @@ handle_follower({ra_log_event, Evt}, State = #{log := Log0}) -> % simply forward all other events to ra_log {Log, Effects} = ra_log:handle_event(Evt, Log0), {follower, State#{log => Log}, Effects}; +handle_follower(#pre_vote_rpc{}, #{voter := {no, _}} = State) -> + %% ignore elections, non-voter + {follower, State, []}; handle_follower(#pre_vote_rpc{} = PreVote, State) -> process_pre_vote(follower, PreVote, State); +handle_follower(#request_vote_rpc{}, #{voter := {no, _}} = State) -> + %% ignore elections, non-voter + {follower, State, []}; handle_follower(#request_vote_rpc{candidate_id = Cand, term = Term}, #{current_term := Term, voted_for := VotedFor, cfg := #cfg{log_id = LogId}} = State) @@ -1204,6 +1211,9 @@ handle_follower(#append_entries_reply{}, State) -> %% handle to avoid logging as unhandled %% could receive a lot of these shortly after standing down as leader {follower, State, []}; +handle_follower(election_timeout, #{voter := {no, _}} = State) -> + %% ignore elections, non-voter + {follower, State, []}; handle_follower(election_timeout, State) -> call_for_election(pre_vote, State); handle_follower(try_become_leader, State) -> @@ -2095,10 +2105,6 @@ new_peer() -> new_peer_with(Map) -> maps:merge(new_peer(), Map). -new_staging_status(State) -> - TargetIdx = maps:get(commit_index, State), - #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}. - already_member(State) -> % already a member do nothing % TODO: reply? If we don't reply the caller may block until timeout @@ -2330,6 +2336,7 @@ apply_with({Idx, Term, {'$ra_cluster_change', CmdMeta, NewCluster, ReplyType}}, [log_id(State0), maps:keys(NewCluster)]), %% we are recovering and should apply the cluster change State0#{cluster => NewCluster, + voter => ra_voter:peer_status(id(State0), NewCluster), cluster_change_permitted => true, cluster_index_term => {Idx, Term}}; _ -> @@ -2468,7 +2475,7 @@ append_log_leader({'$ra_join', From, JoiningNode, ReplyMode}, case OldCluster of #{JoiningNode := #{voter := yes}} -> already_member(State); - #{JoiningNode := #{voter := {maybe, _}} = Peer} -> + #{JoiningNode := #{voter := {no, _}} = Peer} -> Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}}, append_cluster_change(Cluster, From, ReplyMode, State); _ -> @@ -2481,8 +2488,7 @@ append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode}, #{JoiningNode := _} -> already_member(State); _ -> - Round0 = new_staging_status(State), - Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => {maybe, Round0}})}, + Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => ra_voter:new_nonvoter(State)})}, append_cluster_change(Cluster, From, ReplyMode, State) end; append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode}, @@ -2514,6 +2520,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, case Cmd of {'$ra_cluster_change', _, Cluster, _} -> State#{cluster => Cluster, + voter => ra_voter:peer_status(id(State), Cluster), cluster_index_term => {Idx, Term}}; _ -> % revert back to previous cluster @@ -2525,6 +2532,7 @@ pre_append_log_follower({Idx, Term, Cmd} = Entry, pre_append_log_follower({Idx, Term, {'$ra_cluster_change', _, Cluster, _}}, State) -> State#{cluster => Cluster, + voter => ra_voter:peer_status(id(State), Cluster), cluster_index_term => {Idx, Term}}; pre_append_log_follower(_, State) -> State. diff --git a/src/ra_voter.erl b/src/ra_voter.erl new file mode 100644 index 00000000..5c9fefd3 --- /dev/null +++ b/src/ra_voter.erl @@ -0,0 +1,28 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% +%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(ra_voter). + +-export([ + new_nonvoter/1, + status/1, + peer_status/2 + ]). + +new_nonvoter(State) -> + TargetIdx = maps:get(commit_index, State), + {no, #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}}. + +status(State) -> + case maps:get(voter, State) of + undefined -> + MyId = ra_server:id(State), + #{cluster := Cluster} = State, + peer_status(MyId, Cluster); + Voter -> Voter + end. + +peer_status(PeerId, Cluster) -> + Peer = maps:get(PeerId, Cluster, undefined), + maps:get(voter, Peer, yes). diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index c55d6c84..f03b3990 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -41,7 +41,6 @@ all() -> follower_machine_version, follower_install_snapshot_machine_version, leader_server_join, - leader_server_maybe_join, leader_server_leave, leader_is_removed, follower_cluster_change, @@ -1334,47 +1333,6 @@ leader_server_join(_Config) -> | _] = Effects, ok. -leader_server_maybe_join(_Config) -> - N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, - OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), - N2 => new_peer_with(#{next_index => 4, match_index => 3}), - N3 => new_peer_with(#{next_index => 4, match_index => 3})}, - State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, - Round0 = new_staging_status(State0), - % raft servers should switch to the new configuration after log append - % and further cluster changes should be disallowed - {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, - cluster_change_permitted := false} = _State1, Effects} = - ra_server:handle_leader({command, {'$ra_maybe_join', meta(), - N4, await_consensus}}, State0), - [ - {send_rpc, N4, - #append_entries_rpc{entries = - [_, _, _, {4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, - N3 := _, N4 := #{voter := {maybe, Round0}}}, - await_consensus}}]}}, - {send_rpc, N3, - #append_entries_rpc{entries = - [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {maybe, Round0}}}, - await_consensus}}], - term = 5, leader_id = N1, - prev_log_index = 3, - prev_log_term = 5, - leader_commit = 3}}, - {send_rpc, N2, - #append_entries_rpc{entries = - [{4, 5, {'$ra_cluster_change', _, - #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {maybe, Round0}}}, - await_consensus}}], - term = 5, leader_id = N1, - prev_log_index = 3, - prev_log_term = 5, - leader_commit = 3}} - | _] = Effects, - ok. - leader_server_leave(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), @@ -2641,10 +2599,6 @@ new_peer() -> new_peer_with(Map) -> maps:merge(new_peer(), Map). -new_staging_status(State) -> - TargetIdx = maps:get(commit_index, State), - #{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}. - snap_meta(Idx, Term) -> snap_meta(Idx, Term, []). diff --git a/test/ra_voter_SUITE.erl b/test/ra_voter_SUITE.erl new file mode 100644 index 00000000..7ba10e11 --- /dev/null +++ b/test/ra_voter_SUITE.erl @@ -0,0 +1,271 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% +%% Copyright (c) 2017-2022 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(ra_voter_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("src/ra.hrl"). +-include("src/ra_server.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + leader_server_maybe_join + ]. + +-define(MACFUN, fun (E, _) -> E end). +-define(N1, {n1, node()}). +-define(N2, {n2, node()}). +-define(N3, {n3, node()}). +-define(N4, {n4, node()}). +-define(N5, {n5, node()}). + +groups() -> + [ {tests, [], all()} ]. + +init_per_suite(Config) -> + ok = logger:set_primary_config(level, all), + Config. + +end_per_suite(Config) -> + Config. + +init_per_testcase(TestCase, Config) -> + ok = logger:set_primary_config(level, all), + ok = setup_log(), + [{test_case, TestCase} | Config]. + +end_per_testcase(_TestCase, Config) -> + meck:unload(), + Config. + +setup_log() -> + ok = meck:new(ra_log, []), + ok = meck:new(ra_snapshot, [passthrough]), + ok = meck:new(ra_machine, [passthrough]), + meck:expect(ra_log, init, fun(C) -> ra_log_memory:init(C) end), + meck:expect(ra_log_meta, store, fun (_, U, K, V) -> + put({U, K}, V), ok + end), + meck:expect(ra_log_meta, store_sync, fun (_, U, K, V) -> + put({U, K}, V), ok + end), + meck:expect(ra_log_meta, fetch, fun(_, U, K) -> + get({U, K}) + end), + meck:expect(ra_log_meta, fetch, fun (_, U, K, D) -> + ra_lib:default(get({U, K}), D) + end), + meck:expect(ra_snapshot, begin_accept, + fun(_Meta, SS) -> + {ok, SS} + end), + meck:expect(ra_snapshot, accept_chunk, + fun(_Data, _OutOf, _Flag, SS) -> + {ok, SS} + end), + meck:expect(ra_snapshot, abort_accept, fun(SS) -> SS end), + meck:expect(ra_snapshot, accepting, fun(_SS) -> undefined end), + meck:expect(ra_log, snapshot_state, fun (_) -> snap_state end), + meck:expect(ra_log, set_snapshot_state, fun (_, Log) -> Log end), + meck:expect(ra_log, install_snapshot, fun (_, _, Log) -> {Log, []} end), + meck:expect(ra_log, recover_snapshot, fun ra_log_memory:recover_snapshot/1), + meck:expect(ra_log, snapshot_index_term, fun ra_log_memory:snapshot_index_term/1), + meck:expect(ra_log, fold, fun ra_log_memory:fold/5), + meck:expect(ra_log, release_resources, fun ra_log_memory:release_resources/3), + meck:expect(ra_log, append_sync, + fun({Idx, Term, _} = E, L) -> + L1 = ra_log_memory:append(E, L), + {LX, _} = ra_log_memory:handle_event({written, {Idx, Idx, Term}}, L1), + LX + end), + meck:expect(ra_log, write_config, fun ra_log_memory:write_config/2), + meck:expect(ra_log, next_index, fun ra_log_memory:next_index/1), + meck:expect(ra_log, append, fun ra_log_memory:append/2), + meck:expect(ra_log, write, fun ra_log_memory:write/2), + meck:expect(ra_log, handle_event, fun ra_log_memory:handle_event/2), + meck:expect(ra_log, last_written, fun ra_log_memory:last_written/1), + meck:expect(ra_log, last_index_term, fun ra_log_memory:last_index_term/1), + meck:expect(ra_log, set_last_index, fun ra_log_memory:set_last_index/2), + meck:expect(ra_log, fetch_term, fun ra_log_memory:fetch_term/2), + meck:expect(ra_log, needs_cache_flush, fun (_) -> false end), + meck:expect(ra_log, exists, + fun ({Idx, Term}, L) -> + case ra_log_memory:fetch_term(Idx, L) of + {Term, Log} -> {true, Log}; + {_, Log} -> {false, Log} + end + end), + meck:expect(ra_log, update_release_cursor, + fun ra_log_memory:update_release_cursor/5), + ok. + +leader_server_maybe_join(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4, + OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}), + N2 => new_peer_with(#{next_index => 4, match_index => 3}), + N3 => new_peer_with(#{next_index => 4, match_index => 3})}, + State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster}, + % raft servers should switch to the new configuration after log append + % and further cluster changes should be disallowed + {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _}, + cluster_change_permitted := false} = _State1, Effects} = + ra_server:handle_leader({command, {'$ra_maybe_join', meta(), + N4, await_consensus}}, State0), + % new member should join as non-voter + {no, #{round := Round, target := Target}} = ra_voter:new_nonvoter(State0), + [ + {send_rpc, N4, + #append_entries_rpc{entries = + [_, _, _, {4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, + N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, + await_consensus}}]}}, + {send_rpc, N3, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}}, + {send_rpc, N2, + #append_entries_rpc{entries = + [{4, 5, {'$ra_cluster_change', _, + #{N1 := _, N2 := _, N3 := _, N4 := #{voter := {no, #{round := Round, + target := Target, + ts := _}}}}, + await_consensus}}], + term = 5, leader_id = N1, + prev_log_index = 3, + prev_log_term = 5, + leader_commit = 3}} + | _] = Effects, + ok. + + +% %%% helpers + +ra_server_init(Conf) -> + ra_server:recover(ra_server:init(Conf)). + +init_servers(ServerIds, Machine) -> + lists:foldl(fun (ServerId, Acc) -> + Args = #{cluster_name => some_id, + id => ServerId, + uid => atom_to_binary(element(1, ServerId), utf8), + initial_members => ServerIds, + log_init_args => #{uid => <<>>}, + machine => Machine}, + Acc#{ServerId => {follower, ra_server_init(Args), []}} + end, #{}, ServerIds). + +list(L) when is_list(L) -> L; +list(L) -> [L]. + +entry(Idx, Term, Data) -> + {Idx, Term, {'$usr', meta(), Data, after_log_append}}. + +empty_state(NumServers, Id) -> + Servers = lists:foldl(fun(N, Acc) -> + [{list_to_atom("n" ++ integer_to_list(N)), node()} + | Acc] + end, [], lists:seq(1, NumServers)), + ra_server_init(#{cluster_name => someid, + id => {Id, node()}, + uid => atom_to_binary(Id, utf8), + initial_members => Servers, + log_init_args => #{uid => <<>>}, + machine => {simple, fun (E, _) -> E end, <<>>}}). % just keep last applied value + +base_state(NumServers, MacMod) -> + Log0 = lists:foldl(fun(E, L) -> + ra_log:append(E, L) + end, ra_log:init(#{system_config => ra_system:default_config(), + uid => <<>>}), + [{1, 1, usr(<<"hi1">>)}, + {2, 3, usr(<<"hi2">>)}, + {3, 5, usr(<<"hi3">>)}]), + {Log, _} = ra_log:handle_event({written, {1, 3, 5}}, Log0), + + Servers = lists:foldl(fun(N, Acc) -> + Name = {list_to_atom("n" ++ integer_to_list(N)), node()}, + Acc#{Name => + new_peer_with(#{next_index => 4, + match_index => 3})} + end, #{}, lists:seq(1, NumServers)), + mock_machine(MacMod), + Cfg = #cfg{id = ?N1, + uid = <<"n1">>, + log_id = <<"n1">>, + metrics_key = n1, + machine = {machine, MacMod, #{}}, % just keep last applied value + machine_version = 0, + machine_versions = [{0, 0}], + effective_machine_version = 0, + effective_machine_module = MacMod, + system_config = ra_system:default_config() + }, + #{cfg => Cfg, + leader_id => ?N1, + cluster => Servers, + cluster_index_term => {0, 0}, + cluster_change_permitted => true, + machine_state => <<"hi3">>, % last entry has been applied + current_term => 5, + commit_index => 3, + last_applied => 3, + log => Log, + query_index => 0, + queries_waiting_heartbeats => queue:new(), + pending_consistent_queries => []}. + +mock_machine(Mod) -> + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (_) -> init_state end), + %% just keep the latest command as the state + meck:expect(Mod, apply, fun (_, Cmd, _) -> {Cmd, ok} end), + ok. + +usr_cmd(Data) -> + {command, usr(Data)}. + +usr(Data) -> + {'$usr', meta(), Data, after_log_append}. + +meta() -> + #{from => {self(), make_ref()}, + ts => os:system_time(millisecond)}. + +dump(T) -> + ct:pal("DUMP: ~p", [T]), + T. + +new_peer() -> + #{next_index => 1, + match_index => 0, + query_index => 0, + commit_index_sent => 0, + status => normal, + voter => yes}. + +new_peer_with(Map) -> + maps:merge(new_peer(), Map). + +snap_meta(Idx, Term) -> + snap_meta(Idx, Term, []). + +snap_meta(Idx, Term, Cluster) -> + #{index => Idx, + term => Term, + cluster => Cluster, + machine_version => 0}. +