Skip to content

Commit

Permalink
Merge pull request #413 from illotum/members-info
Browse files Browse the repository at this point in the history
add members_info state query and API wrapper
  • Loading branch information
michaelklishin authored Jan 25, 2024
2 parents 4f45efe + 7944515 commit ed80c46
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 0 deletions.
41 changes: 41 additions & 0 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
%% queries
members/1,
members/2,
members_info/1,
members_info/2,
initial_members/1,
initial_members/2,
local_query/2,
Expand Down Expand Up @@ -1037,6 +1039,45 @@ members({local, ServerId}, Timeout) ->
members(ServerId, Timeout) ->
ra_server_proc:state_query(ServerId, members, Timeout).

%% @doc Returns a list of cluster members and their Raft metrics
%%
%% Except if `{local, ServerId}' is passed, the query is sent to the specified
%% server which may redirect it to the leader if it is a follower. It may
%% timeout if there is currently no leader (i.e. an election is in progress).
%%
%% With `{local, ServerId}', the query is always handled by the specified
%% server. It means the returned list might be out-of-date compared to what the
%% leader would have returned.
%%
%% @param ServerId the Ra server(s) to send the query to
%% @end
-spec members_info(ra_server_id() | [ra_server_id()] | {local, ra_server_id()}) ->
ra_server_proc:ra_leader_call_ret(ra_cluster()).
members_info(ServerId) ->
members_info(ServerId, ?DEFAULT_TIMEOUT).

%% @doc Returns a list of cluster members and their Raft metrics
%%
%% Except if `{local, ServerId}' is passed, the query is sent to the specified
%% server which may redirect it to the leader if it is a follower. It may
%% timeout if there is currently no leader (i.e. an election is in progress).
%%
%% With `{local, ServerId}', the query is always handled by the specified
%% server. It means the returned list might be out-of-date compared to what the
%% leader would have returned.
%%
%% @param ServerId the Ra server(s) to send the query to
%% @param Timeout the timeout to use
%% @end
-spec members_info(ra_server_id() | [ra_server_id()] | {local, ra_server_id()},
timeout()) ->
ra_server_proc:ra_leader_call_ret(ra_cluster()).
members_info({local, ServerId}, Timeout) ->
ra_server_proc:local_state_query(ServerId, members_info, Timeout);
members_info(ServerId, Timeout) ->
ra_server_proc:state_query(ServerId, members_info, Timeout).


%% @doc Returns a list of initial (seed) cluster members.
%%
%% This allows Ra-based systems with dynamic cluster membership
Expand Down
47 changes: 47 additions & 0 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ log_fold(ServerId, Fun, InitialState, Timeout) ->
overview |
voters |
members |
members_info |
initial_members |
machine, timeout()) ->
ra_leader_call_ret(term()).
Expand All @@ -196,6 +197,7 @@ state_query(ServerLoc, Spec, Timeout) ->
overview |
voters |
members |
members_info |
initial_members |
machine, timeout()) ->
ra_local_call_ret(term()).
Expand Down Expand Up @@ -1555,6 +1557,51 @@ do_state_query(voters, #{cluster := Cluster}) ->
Vs;
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
leader_id := Self, query_index := QI, commit_index := CI,
membership := Membership}) ->
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, Peer = #{voter_status := VoterStatus}} ->
%% For completeness sake, preserve `target`
%% of once promoted leader.
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => VoterStatus#{membership => Membership}};
{Self, _} ->
#{next_index => CI+1,
match_index => CI,
query_index => QI,
status => normal,
voter_status => #{membership => Membership}};
{_, Peer = #{voter_status := _}} ->
Peer;
{_, Peer} ->
%% Initial cluster members have no voter_status.
Peer#{voter_status => #{membership => voter}}
end
end, Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
query_index := QI, commit_index := CI,
membership := Membership}) ->
%% Followers do not have sufficient information,
%% bail out and send whatever we have.
maps:map(fun(Id, Peer) ->
case {Id, Peer} of
{Self, #{voter_status := VS}} ->
#{match_index => CI,
query_index => QI,
voter_status => VS#{membership => Membership}};
{Self, _} ->
#{match_index => CI,
query_index => QI,
voter_status => #{membership => Membership}};
_ ->
#{}
end
end, Cluster);
do_state_query(initial_members, #{log := Log}) ->
case ra_log:read_config(Log) of
{ok, #{initial_members := InitialMembers}} ->
Expand Down
34 changes: 34 additions & 0 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ all_tests() ->
local_query_boom,
local_query_stale,
members,
members_info,
consistent_query,
consistent_query_after_restart,
consistent_query_minority,
Expand Down Expand Up @@ -537,6 +538,39 @@ members(Config) ->
{ok, Cluster, Leader} = ra:members(Leader),
terminate_cluster(Cluster).

members_info(Config) ->
Name = ?config(test_name, Config),
[A = {_, Host}, B] = InitNodes = start_local_cluster(2, Name, add_machine()),
{ok, _, Leader} = ra:process_command(A, 9),
[Follower] = InitNodes -- [Leader],
CSpec = #{id => {CName = ra_server:name(Name, "3"), node()},
uid => <<"3">>,
membership => promotable},
C = {CName, Host},
ok = ra:start_server(default, Name, CSpec, add_machine(), InitNodes),
{ok, _, _} = ra:add_member(Leader, CSpec),
{ok, 9, Leader} = ra:consistent_query(C, fun(S) -> S end),
?assertMatch({ok,
#{Follower := #{status := normal, query_index := QI,
next_index := NI, match_index := MI,
commit_index_sent := MI,
voter_status := #{membership := voter}},
Leader := #{status := normal, query_index := QI,
next_index := NI, match_index := MI,
voter_status := #{membership := voter}},
C := #{status := normal, query_index := QI,
next_index := NI, match_index := 0,
commit_index_sent := MI,
voter_status := #{membership := promotable, target := MI}}},
Leader}, ra:members_info(Follower)),
?assertMatch({ok,
#{A := #{},
B := #{},
C := #{query_index := 1, match_index := 3,
voter_status := #{membership := promotable}}},
C}, ra:members_info({local, C})),
terminate_cluster([A, B, C]).

consistent_query(Config) ->
[A, _, _] = Cluster = start_local_cluster(3, ?config(test_name, Config),
add_machine()),
Expand Down

0 comments on commit ed80c46

Please sign in to comment.