Skip to content

Commit

Permalink
Newly added followers do not participate in quorum until they catch u…
Browse files Browse the repository at this point in the history
…p with the log

An opt-in ability for a cluster to ignore a newly joined member until it catches
up with the log:

    New = #{id => Id, init_non_voter => ra:new_nvid()},
    {ok, _, _} = ra:add_member(ServerRef, New),
    ok = ra:start_server(default, ClusterName, New, add_machine(), [ServerRef]),

Voter status is stored in the cluster map of the server state and is part of
every $ra_cluster_update. Additionally, nodes store their own status at the top
level for ease of matching. Nodes also store their own status in the ra_state ETS
table (breaking change), and it is presented in the overview.

On every #append_entries_reply the leader may choose to promote a non-voter by
issuing a new `$ra_join` with the desired voter status. Currently, only one promotion
condition is implemented: `{nonvoter, #{target := ra_index()}}`. A non-voter will
be promoted when it reaches the leader's log index at the time of joining.
  • Loading branch information
Alex Valiushko committed Sep 14, 2023
1 parent a67f2e6 commit 824fb46
Show file tree
Hide file tree
Showing 11 changed files with 735 additions and 66 deletions.
39 changes: 26 additions & 13 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
overview/1,
%% helpers
new_uid/1,
new_nvid/0,
%% rebalancing
transfer_leadership/2,
%% auxiliary commands
Expand Down Expand Up @@ -455,7 +456,7 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,

%% @doc Starts a new distributed ra cluster.
%% @param ClusterName the name of the cluster.
%% @param ServerId the ra_server_id() of the server
%% @param ServerId the ra_server_id() of the server, or a map with server id and settings.
%% @param Machine The {@link ra_machine:machine/0} configuration.
%% @param ServerIds a list of initial (seed) server configurations
%% @returns
Expand All @@ -470,19 +471,20 @@ start_cluster(System, [#{cluster_name := ClusterName} | _] = ServerConfigs,
%% forcefully deleted.
%% @see start_server/1
%% @end
-spec start_server(atom(), ra_cluster_name(), ra_server_id(),
-spec start_server(atom(), ra_cluster_name(), ra_server_id() | ra_new_server(),
ra_server:machine_conf(), [ra_server_id()]) ->
ok | {error, term()}.
start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds)
start_server(System, ClusterName, {_, _} = ServerId, Machine, ServerIds) ->
start_server(System, ClusterName, #{id => ServerId}, Machine, ServerIds);
start_server(System, ClusterName, #{id := {_, _}} = Conf0, Machine, ServerIds)
when is_atom(System) ->
UId = new_uid(ra_lib:to_binary(ClusterName)),
Conf = #{cluster_name => ClusterName,
id => ServerId,
uid => UId,
initial_members => ServerIds,
log_init_args => #{uid => UId},
machine => Machine},
start_server(System, Conf).
start_server(System, maps:merge(Conf0, Conf)).

%% @doc Starts a ra server in the default system
%% @param Conf a ra_server_config() configuration map.
Expand Down Expand Up @@ -558,9 +560,10 @@ delete_cluster(ServerIds, Timeout) ->
%% affect said cluster's availability characteristics (by increasing quorum node count).
%%
%% @param ServerLoc the ra server or servers to try to send the command to
%% @param ServerId the ra server id of the new server.
%% @param ServerId the ra server id of the new server, or a map with server id and settings.
%% @end
-spec add_member(ra_server_id() | [ra_server_id()], ra_server_id()) ->
-spec add_member(ra_server_id() | [ra_server_id()],
ra_server_id() | ra_new_server()) ->
ra_cmd_ret() |
{error, already_member} |
{error, cluster_change_not_permitted}.
Expand All @@ -571,7 +574,8 @@ add_member(ServerLoc, ServerId) ->
%% @see add_member/2
%% @end
-spec add_member(ra_server_id() | [ra_server_id()],
ra_server_id(), timeout()) ->
ra_server_id() | ra_new_server(),
timeout()) ->
ra_cmd_ret() |
{error, already_member} |
{error, cluster_change_not_permitted}.
Expand All @@ -580,7 +584,6 @@ add_member(ServerLoc, ServerId, Timeout) ->
{'$ra_join', ServerId, after_log_append},
Timeout).


%% @doc Removes a server from the cluster's membership configuration.
%% This function returns after appending a cluster membership change
%% command to the log.
Expand Down Expand Up @@ -716,6 +719,13 @@ new_uid(Source) when is_binary(Source) ->
Prefix = ra_lib:derive_safe_string(Source, 6),
ra_lib:make_uid(string:uppercase(Prefix)).

%% @doc Generates a new non-voter id. The low 32 bits of the current system
%% time in milliseconds are packed little-endian, base64-encoded and handed
%% to `new_uid/1' as the source from which the uid prefix is derived.
%% @end
new_nvid() ->
    Timestamp = erlang:system_time(millisecond),
    new_uid(base64:encode(<<Timestamp:32/little>>)).

%% @doc Returns a map of overview data of the default Ra system on the current Erlang
%% node.
Expand Down Expand Up @@ -1132,13 +1142,16 @@ key_metrics({Name, N} = ServerId) when N == node() ->
end,
case whereis(Name) of
undefined ->
Counters#{state => noproc};
Counters#{state => noproc,
voter_status => noproc};
_ ->
case ets:lookup(ra_state, Name) of
[] ->
Counters#{state => unknown};
[{_, State}] ->
Counters#{state => State}
Counters#{state => unknown,
voter_status => unknown};
[{_, State, Voter}] ->
Counters#{state => State,
voter_status => Voter}
end
end;
key_metrics({_, N} = ServerId) ->
Expand Down
20 changes: 20 additions & 0 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,42 @@
%% used for on disk resources and local name to pid mapping
-type ra_uid() :: binary().

%% Transient ID that uniquely identifies any new non-voter.
%% Produced by `ra:new_nvid/0' and passed as the `init_non_voter' value of
%% ra_new_server().
-type ra_nvid() :: binary().

%% Identifies a Ra server (node) in a Ra cluster.
%%
%% Ra servers need to be registered stable names (names that are reachable
%% after node restart). Pids are not stable in this sense.
-type ra_server_id() :: {Name :: atom(), Node :: node()}.

%% Specifies server configuration for a new cluster member.
%% Both `ra:add_member` and `ra:start_server` must be called with the same value.
%% `id' is a required key; `init_non_voter' is optional.
-type ra_new_server() :: #{id := ra_server_id(),

%% If set, the server will start as a non-voter until it is later
%% promoted by the leader.
init_non_voter => ra_nvid()}.

-type ra_peer_status() :: normal |
{sending_snapshot, pid()} |
suspended |
disconnected.

%% Voter status of a cluster member: tagged either `voter' or `nonvoter',
%% each carrying a ra_voter_state() map.
-type ra_voter_status() :: {voter, ra_voter_state()} |
{nonvoter, ra_voter_state()}.

%% Details attached to a voter status; both keys are optional.
%% `nvid' is the non-voter id of the member. `target' is a log index;
%% NOTE(review): presumably the index a non-voter must reach before the
%% leader promotes it -- confirm against ra_server.
-type ra_voter_state() :: #{nvid => ra_nvid(),
target => ra_index()}.

%% State map kept for a peer; every key is required (`:=').
-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
query_index := non_neg_integer(),
% the commit index last sent
% used for evaluating pipeline status
commit_index_sent := non_neg_integer(),
%% whether the peer is part of the consensus
voter_status := ra_voter_status(),
%% indicates that a snapshot is being sent
%% to the peer
status := ra_peer_status()}.
Expand Down Expand Up @@ -139,6 +158,7 @@
%% Snapshot metadata map. `cluster_state' is the only optional key (`=>');
%% all other keys are required (`:=').
-type snapshot_meta() :: #{index := ra_index(),
term := ra_term(),
cluster := ra_cluster_servers(),
cluster_state => ra_cluster(), %% TODO replace `cluster`
machine_version := ra_machine:version()}.

-record(install_snapshot_rpc,
Expand Down
10 changes: 8 additions & 2 deletions src/ra_directory.erl
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,20 @@ overview(System) when is_atom(System) ->
#{directory := Tbl,
directory_rev := _TblRev} = get_names(System),
Dir = ets:tab2list(Tbl),
States = maps:from_list(ets:tab2list(ra_state)),
Rows = lists:map(fun({K, S, V}) ->
{K, {S, V}}
end,
ets:tab2list(ra_state)),
States = maps:from_list(Rows),
Snaps = maps:from_list(ets:tab2list(ra_log_snapshot_state)),
lists:foldl(fun ({UId, Pid, Parent, ServerName, ClusterName}, Acc) ->
{S, V} = maps:get(ServerName, States, {undefined, undefined}),
Acc#{ServerName =>
#{uid => UId,
pid => Pid,
parent => Parent,
state => maps:get(ServerName, States, undefined),
state => S,
voter_status => V,
cluster_name => ClusterName,
snapshot_state => maps:get(UId, Snaps,
undefined)}}
Expand Down
1 change: 1 addition & 0 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ update_release_cursor0(Idx, Cluster, MacVersion, MacState,
end,
Meta = #{index => Idx,
cluster => ClusterServerIds,
cluster_state => Cluster,
machine_version => MacVersion},
% The release cursor index is the last entry _not_ contributing
% to the current state. I.e. the last entry that can be discarded.
Expand Down
Loading

0 comments on commit 824fb46

Please sign in to comment.