From b6c39d2b04c13a4327f43dfa1f5e1801bb7791be Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 24 Jan 2024 13:48:30 -0800 Subject: [PATCH 1/5] add members_info state query and API wrapper --- src/ra.erl | 41 +++++++++++++++++++++++++++++++++++++++++ src/ra_server_proc.erl | 13 +++++++++++++ test/ra_SUITE.erl | 30 ++++++++++++++++++++++++++++++ 3 files changed, 84 insertions(+) diff --git a/src/ra.erl b/src/ra.erl index d415978d..77587ef8 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -23,6 +23,8 @@ %% queries members/1, members/2, + members_info/1, + members_info/2, initial_members/1, initial_members/2, local_query/2, @@ -1037,6 +1039,45 @@ members({local, ServerId}, Timeout) -> members(ServerId, Timeout) -> ra_server_proc:state_query(ServerId, members, Timeout). +%% @doc Returns a list of cluster members +%% +%% Except if `{local, ServerId}' is passed, the query is sent to the specified +%% server which may redirect it to the leader if it is a follower. It may +%% timeout if there is currently no leader (i.e. an election is in progress). +%% +%% With `{local, ServerId}', the query is always handled by the specified +%% server. It means the returned list might be out-of-date compared to what the +%% leader would have returned. +%% +%% @param ServerId the Ra server(s) to send the query to +%% @end +-spec members_info(ra_server_id() | [ra_server_id()] | {local, ra_server_id()}) -> + ra_server_proc:ra_leader_call_ret([ra_server_id()]). +members_info(ServerId) -> + members_info(ServerId, ?DEFAULT_TIMEOUT). + +%% @doc Returns a list of cluster members +%% +%% Except if `{local, ServerId}' is passed, the query is sent to the specified +%% server which may redirect it to the leader if it is a follower. It may +%% timeout if there is currently no leader (i.e. an election is in progress). +%% +%% With `{local, ServerId}', the query is always handled by the specified +%% server. It means the returned list might be out-of-date compared to what the +%% leader would have returned. +%% +%% @param ServerId the Ra server(s) to send the query to +%% @param Timeout the timeout to use +%% @end +-spec members_info(ra_server_id() | [ra_server_id()] | {local, ra_server_id()}, + timeout()) -> + ra_server_proc:ra_leader_call_ret([ra_server_id()]). +members_info({local, ServerId}, Timeout) -> + ra_server_proc:local_state_query(ServerId, members_info, Timeout); +members_info(ServerId, Timeout) -> + ra_server_proc:state_query(ServerId, members_info, Timeout). + + %% @doc Returns a list of initial (seed) cluster members. %% %% This allows Ra-based systems with dynamic cluster membership diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index cbaa97b3..686744ad 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -185,6 +185,7 @@ log_fold(ServerId, Fun, InitialState, Timeout) -> overview | voters | members | + members_info | initial_members | machine, timeout()) -> ra_leader_call_ret(term()). @@ -196,6 +197,7 @@ state_query(ServerLoc, Spec, Timeout) -> overview | voters | members | + members_info | initial_members | machine, timeout()) -> ra_local_call_ret(term()). @@ -1555,6 +1557,17 @@ do_state_query(voters, #{cluster := Cluster}) -> Vs; do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); +do_state_query(members_info, + _State = #{cfg := #cfg{id = Id}, cluster := Cluster, leader_id := Id, + query_index := QI, commit_index := CI}) -> + %% We're leader, update indices. + Peer = maps:get(Id, Cluster, #{}), + Cluster#{Id => Peer#{next_index => CI+1, + match_index => CI, + query_index => QI, + commit_index_sent => CI}}; +do_state_query(members_info, _State) -> + {error, not_a_leader}; do_state_query(initial_members, #{log := Log}) -> case ra_log:read_config(Log) of {ok, #{initial_members := InitialMembers}} -> diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 8338b0ad..ac3a98bd 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -44,6 +44,7 @@ all_tests() -> local_query_boom, local_query_stale, members, + members_info, consistent_query, consistent_query_after_restart, consistent_query_minority, @@ -537,6 +538,35 @@ members(Config) -> {ok, Cluster, Leader} = ra:members(Leader), terminate_cluster(Cluster). +members_info(Config) -> + Name = ?config(test_name, Config), + [A = {_, Host}, B] = InitNodes = start_local_cluster(2, Name, add_machine()), + {ok, _, Leader} = ra:process_command(A, 9), + + CSpec = #{id => {CName = ra_server:name(Name, "3"), node()}, + uid => <<"3">>, + membership => promotable}, + C = {CName, Host}, + ok = ra:start_server(default, Name, CSpec, add_machine(), InitNodes), + {ok, _, _} = ra:add_member(Leader, CSpec), + {ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end), + + {ok, #{cluster := Cluster}, _} = ra_server_proc:state_query(Leader, all, 5000), + dump(Cluster), + Want = #{A => #{status => normal,query_index => 1, + next_index => 5,match_index => 4, + commit_index_sent => 4}, + B => #{status => normal,query_index => 1, + next_index => 5,match_index => 4, + commit_index_sent => 4}, + C => #{status => normal, query_index => 1, + next_index => 5, match_index => 4, + commit_index_sent => 4, + voter_status => #{membership => voter,uid => <<"3">>, target => 3}}}, + {ok, Have, Leader} = ra:members_info(Leader), + ?assertEqual(Want, Have), + terminate_cluster([A, B, C]). + consistent_query(Config) -> [A, _, _] = Cluster = start_local_cluster(3, ?config(test_name, Config), add_machine()), From 2cd5ec34e4eb869a96152f9aa2d4c921bc42d8eb Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 24 Jan 2024 16:04:59 -0800 Subject: [PATCH 2/5] return only valid values --- src/ra.erl | 8 ++++---- src/ra_server_proc.erl | 40 ++++++++++++++++++++++------------------ test/ra_SUITE.erl | 34 ++++++++++++++++++---------------- 3 files changed, 44 insertions(+), 38 deletions(-) diff --git a/src/ra.erl b/src/ra.erl index 77587ef8..a34f60a2 100644 --- a/src/ra.erl +++ b/src/ra.erl @@ -1039,7 +1039,7 @@ members({local, ServerId}, Timeout) -> members(ServerId, Timeout) -> ra_server_proc:state_query(ServerId, members, Timeout). -%% @doc Returns a list of cluster members +%% @doc Returns a list of cluster members and their Raft metrics %% %% Except if `{local, ServerId}' is passed, the query is sent to the specified %% server which may redirect it to the leader if it is a follower. It may @@ -1052,11 +1052,11 @@ members(ServerId, Timeout) -> %% @param ServerId the Ra server(s) to send the query to %% @end -spec members_info(ra_server_id() | [ra_server_id()] | {local, ra_server_id()}) -> - ra_server_proc:ra_leader_call_ret([ra_server_id()]). + ra_server_proc:ra_leader_call_ret(ra_cluster()). members_info(ServerId) -> members_info(ServerId, ?DEFAULT_TIMEOUT). -%% @doc Returns a list of cluster members +%% @doc Returns a list of cluster members and their Raft metrics %% %% Except if `{local, ServerId}' is passed, the query is sent to the specified %% server which may redirect it to the leader if it is a follower. It may @@ -1071,7 +1071,7 @@ members_info(ServerId) -> %% @end -spec members_info(ra_server_id() | [ra_server_id()] | {local, ra_server_id()}, timeout()) -> - ra_server_proc:ra_leader_call_ret([ra_server_id()]). + ra_server_proc:ra_leader_call_ret(ra_cluster()). members_info({local, ServerId}, Timeout) -> ra_server_proc:local_state_query(ServerId, members_info, Timeout); members_info(ServerId, Timeout) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 686744ad..e94e686b 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1505,13 +1505,7 @@ follower_leader_change(Old, #state{pending_commands = Pending, aten_register(Node) -> case node() of Node -> ok; - _ -> - case aten:register(Node) of - ignore -> - ok; - Res -> - Res - end + _ -> aten:register(Node) end. swap_monitor(MRef, L) -> @@ -1557,17 +1551,27 @@ do_state_query(voters, #{cluster := Cluster}) -> Vs; do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); -do_state_query(members_info, - _State = #{cfg := #cfg{id = Id}, cluster := Cluster, leader_id := Id, - query_index := QI, commit_index := CI}) -> - %% We're leader, update indices. - Peer = maps:get(Id, Cluster, #{}), - Cluster#{Id => Peer#{next_index => CI+1, - match_index => CI, - query_index => QI, - commit_index_sent => CI}}; -do_state_query(members_info, _State) -> - {error, not_a_leader}; +do_state_query(members_info, #{cfg := #cfg{id = Self, uid = UId}, cluster := Cluster, + leader_id := Self, query_index := QI, commit_index := CI, + membership := Membership}) -> + Peer = maps:get(Self, Cluster, #{}), + Cluster#{Self => Peer#{next_index => CI+1, + match_index => CI, + query_index => QI, + voter_status => #{membership => Membership, uid => UId}}}; +do_state_query(members_info, #{cfg := #cfg{id = Self, uid = UId}, cluster := Cluster, + query_index := QI, commit_index := CI, + membership := Membership}) -> + %% Followers do not have sufficient information, + %% bail out and send whatever we have. + maps:map(fun (Id, _) -> + case Id of + Self -> #{match_index => CI, + query_index => QI, + voter_status => #{membership => Membership, uid => UId}}; + _ -> #{} + end + end, Cluster); do_state_query(initial_members, #{log := Log}) -> case ra_log:read_config(Log) of {ok, #{initial_members := InitialMembers}} -> diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index ac3a98bd..98e09c3d 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -542,7 +542,7 @@ members_info(Config) -> Name = ?config(test_name, Config), [A = {_, Host}, B] = InitNodes = start_local_cluster(2, Name, add_machine()), {ok, _, Leader} = ra:process_command(A, 9), - + [Follower] = InitNodes -- [Leader], CSpec = #{id => {CName = ra_server:name(Name, "3"), node()}, uid => <<"3">>, membership => promotable}, @@ -550,21 +550,23 @@ members_info(Config) -> ok = ra:start_server(default, Name, CSpec, add_machine(), InitNodes), {ok, _, _} = ra:add_member(Leader, CSpec), {ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end), - - {ok, #{cluster := Cluster}, _} = ra_server_proc:state_query(Leader, all, 5000), - dump(Cluster), - Want = #{A => #{status => normal,query_index => 1, - next_index => 5,match_index => 4, - commit_index_sent => 4}, - B => #{status => normal,query_index => 1, - next_index => 5,match_index => 4, - commit_index_sent => 4}, - C => #{status => normal, query_index => 1, - next_index => 5, match_index => 4, - commit_index_sent => 4, - voter_status => #{membership => voter,uid => <<"3">>, target => 3}}}, - {ok, Have, Leader} = ra:members_info(Leader), - ?assertEqual(Want, Have), + ?assertMatch({ok, + #{Follower := #{status := normal, query_index := QI, + next_index := NI, match_index := MI, + commit_index_sent := MI}, + Leader := #{status := normal, query_index := QI, + next_index := NI, match_index := MI}, + C := #{status := normal, query_index := QI, + next_index := NI, match_index := 0, + commit_index_sent := MI, + voter_status := #{membership := promotable, target := MI}}}, + Leader}, ra:members_info(Follower)), + ?assertMatch({ok, + #{A := #{}, + B := #{}, + C := #{query_index := 1, match_index := 3, + voter_status := #{membership := promotable, uid := _}}}, + C}, ra:members_info({local, C})), terminate_cluster([A, B, C]). consistent_query(Config) -> From b134a9d70835433487776236d7ab5b2823982c52 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 24 Jan 2024 16:57:49 -0800 Subject: [PATCH 3/5] extract membership from voter_status --- src/ra_server_proc.erl | 32 ++++++++++++++++++++++---------- test/ra_SUITE.erl | 12 +++++++----- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index e94e686b..8e8c67de 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1551,24 +1551,29 @@ do_state_query(voters, #{cluster := Cluster}) -> Vs; do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); -do_state_query(members_info, #{cfg := #cfg{id = Self, uid = UId}, cluster := Cluster, +do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, leader_id := Self, query_index := QI, commit_index := CI, membership := Membership}) -> - Peer = maps:get(Self, Cluster, #{}), - Cluster#{Self => Peer#{next_index => CI+1, - match_index => CI, - query_index => QI, - voter_status => #{membership => Membership, uid => UId}}}; -do_state_query(members_info, #{cfg := #cfg{id = Self, uid = UId}, cluster := Cluster, + maps:map(fun(Id, Peer) -> + case Id of + Self -> #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + membership => Membership}; + _ -> extract_peer_membership(Peer) + end + end, Cluster); +do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, query_index := QI, commit_index := CI, membership := Membership}) -> %% Followers do not have sufficient information, %% bail out and send whatever we have. - maps:map(fun (Id, _) -> + maps:map(fun(Id, _) -> case Id of Self -> #{match_index => CI, - query_index => QI, - voter_status => #{membership => Membership, uid => UId}}; + query_index => QI, + membership => Membership}; _ -> #{} end end, Cluster); @@ -1582,6 +1587,13 @@ do_state_query(initial_members, #{log := Log}) -> do_state_query(Query, _State) -> {error, {unknown_query, Query}}. +extract_peer_membership(Peer = #{voter_status := #{membership := Membership}}) -> + Peer1 = maps:remove(voter_status, Peer), + Peer1#{membership => Membership}; +extract_peer_membership(Peer) -> + %% Initial members have no voter_status. + Peer#{membership => voter}. + config_defaults(ServerId) -> #{broadcast_time => ?DEFAULT_BROADCAST_TIME, tick_timeout => ?TICK_INTERVAL_MS, diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 98e09c3d..74767a3b 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -552,20 +552,22 @@ members_info(Config) -> {ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end), ?assertMatch({ok, #{Follower := #{status := normal, query_index := QI, - next_index := NI, match_index := MI, - commit_index_sent := MI}, + next_index := NI, match_index := MI, + commit_index_sent := MI, + membership := voter}, Leader := #{status := normal, query_index := QI, - next_index := NI, match_index := MI}, + next_index := NI, match_index := MI, + membership := voter}, C := #{status := normal, query_index := QI, next_index := NI, match_index := 0, commit_index_sent := MI, - voter_status := #{membership := promotable, target := MI}}}, + membership := promotable}}, Leader}, ra:members_info(Follower)), ?assertMatch({ok, #{A := #{}, B := #{}, C := #{query_index := 1, match_index := 3, - voter_status := #{membership := promotable, uid := _}}}, + membership := promotable}}, C}, ra:members_info({local, C})), terminate_cluster([A, B, C]). From 70fa97898b8f0892f6408df264ed41f2c8bb01b9 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Wed, 24 Jan 2024 19:04:39 -0800 Subject: [PATCH 4/5] do not extract membership, but ensure it exists --- src/ra_server_proc.erl | 52 ++++++++++++++++++++++++++---------------- test/ra_SUITE.erl | 8 +++---- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 8e8c67de..8e86fe81 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1555,13 +1555,26 @@ do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, leader_id := Self, query_index := QI, commit_index := CI, membership := Membership}) -> maps:map(fun(Id, Peer) -> - case Id of - Self -> #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - membership => Membership}; - _ -> extract_peer_membership(Peer) + case {Id, Peer} of + {Self, Peer = #{voter_status := _}} -> + %% For completeness sake, preserve `target` + %% of once promoted leader. + #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + voter_status => Peer#{membership => Membership}}; + {Self, _} -> + #{next_index => CI+1, + match_index => CI, + query_index => QI, + status => normal, + voter_status => #{membership => Membership}}; + {_, Peer = #{voter_status := _}} -> + Peer; + {_, Peer} -> + %% Initial cluster members have no voter_status. + Peer#{voter_status => #{membership => voter}} end end, Cluster); do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, @@ -1569,12 +1582,18 @@ do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, membership := Membership}) -> %% Followers do not have sufficient information, %% bail out and send whatever we have. - maps:map(fun(Id, _) -> - case Id of - Self -> #{match_index => CI, - query_index => QI, - membership => Membership}; - _ -> #{} + maps:map(fun(Id, Peer) -> + case {Id, Peer} of + {Self, #{voter_status := VS}} -> + #{match_index => CI, + query_index => QI, + voter_status => VS#{membership => Membership}}; + {Self, _} -> + #{match_index => CI, + query_index => QI, + voter_status => #{membership => Membership}}; + _ -> + #{} end end, Cluster); do_state_query(initial_members, #{log := Log}) -> @@ -1587,13 +1606,6 @@ do_state_query(initial_members, #{log := Log}) -> do_state_query(Query, _State) -> {error, {unknown_query, Query}}. -extract_peer_membership(Peer = #{voter_status := #{membership := Membership}}) -> - Peer1 = maps:remove(voter_status, Peer), - Peer1#{membership => Membership}; -extract_peer_membership(Peer) -> - %% Initial members have no voter_status. - Peer#{membership => voter}. - config_defaults(ServerId) -> #{broadcast_time => ?DEFAULT_BROADCAST_TIME, tick_timeout => ?TICK_INTERVAL_MS, diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 74767a3b..bf6eb4ca 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -554,20 +554,20 @@ members_info(Config) -> #{Follower := #{status := normal, query_index := QI, next_index := NI, match_index := MI, commit_index_sent := MI, - membership := voter}, + voter_status := #{membership := voter}}, Leader := #{status := normal, query_index := QI, next_index := NI, match_index := MI, - membership := voter}, + voter_status := #{membership := voter}}, C := #{status := normal, query_index := QI, next_index := NI, match_index := 0, commit_index_sent := MI, - membership := promotable}}, + voter_status := #{membership := promotable, target := MI}}}, Leader}, ra:members_info(Follower)), ?assertMatch({ok, #{A := #{}, B := #{}, C := #{query_index := 1, match_index := 3, - membership := promotable}}, + voter_status := #{membership := promotable}}}, C}, ra:members_info({local, C})), terminate_cluster([A, B, C]). From 7944515e047a5f5612a605e090879339c7f8ded0 Mon Sep 17 00:00:00 2001 From: Alex Valiushko Date: Thu, 25 Jan 2024 10:58:47 -0800 Subject: [PATCH 5/5] address feedback --- src/ra_server_proc.erl | 34 ++++++++++++++++++++-------------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 8e86fe81..2f5189b8 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1505,7 +1505,13 @@ follower_leader_change(Old, #state{pending_commands = Pending, aten_register(Node) -> case node() of Node -> ok; - _ -> aten:register(Node) + _ -> + case aten:register(Node) of + ignore -> + ok; + Res -> + Res + end end. swap_monitor(MRef, L) -> @@ -1552,24 +1558,24 @@ do_state_query(voters, #{cluster := Cluster}) -> do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - leader_id := Self, query_index := QI, commit_index := CI, - membership := Membership}) -> + leader_id := Self, query_index := QI, commit_index := CI, + membership := Membership}) -> maps:map(fun(Id, Peer) -> case {Id, Peer} of - {Self, Peer = #{voter_status := _}} -> + {Self, Peer = #{voter_status := VoterStatus}} -> %% For completeness sake, preserve `target` %% of once promoted leader. #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - voter_status => Peer#{membership => Membership}}; + match_index => CI, + query_index => QI, + status => normal, + voter_status => VoterStatus#{membership => Membership}}; {Self, _} -> #{next_index => CI+1, - match_index => CI, - query_index => QI, - status => normal, - voter_status => #{membership => Membership}}; + match_index => CI, + query_index => QI, + status => normal, + voter_status => #{membership => Membership}}; {_, Peer = #{voter_status := _}} -> Peer; {_, Peer} -> @@ -1578,8 +1584,8 @@ do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, end end, Cluster); do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, - query_index := QI, commit_index := CI, - membership := Membership}) -> + query_index := QI, commit_index := CI, + membership := Membership}) -> %% Followers do not have sufficient information, %% bail out and send whatever we have. maps:map(fun(Id, Peer) ->