Skip to content

Commit

Permalink
$ra_maybe_join adds peer in catch-up mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed May 14, 2023
1 parent d1339ff commit ee12cc5
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 9 deletions.
6 changes: 4 additions & 2 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,14 @@
%% - Nonvoter, node does not participate in elections or consensus voting.
%% - Staging, node is a temporary nonvoter, and will be automatically promoted
%% if it proves to be fast enough to stay up to dat with teh leader.
-type ra_voter() :: yes | no | {staging, staging_status()}.
-type ra_voter() :: yes | no | {maybe, staging_status()}.

%% For staging nodes we measure current round, target index and the timestamp of its start.
%% If the node reaches target index and the ∂T is less than the election timeout, the node is
%% considered eligible to become a voter.
-type staging_status() :: {ra_replication_round(), ra_index(), integer()}.
-type staging_status() :: #{round := ra_replication_round(),
target := ra_index(),
ts := integer()}.

-type ra_peer_state() :: #{next_index := non_neg_integer(),
match_index := non_neg_integer(),
Expand Down
30 changes: 26 additions & 4 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
| recovered | stop | receive_snapshot.

-type command_type() :: '$usr' | '$ra_join' | '$ra_leave' |
'$ra_maybe_join' |
'$ra_cluster_change' | '$ra_cluster'.

-type command_meta() :: #{from => from(),
Expand Down Expand Up @@ -2094,6 +2095,15 @@ new_peer() ->
new_peer_with(Map) ->
maps:merge(new_peer(), Map).

new_staging_status(State) ->
TargetIdx = maps:get(commit_index, State),
#{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}.

already_member(State) ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State}.

peers(#{cfg := #cfg{id = Id}, cluster := Peers}) ->
maps:remove(Id, Peers).

Expand Down Expand Up @@ -2450,19 +2460,31 @@ add_reply(_, _, _, % From, Reply, Mode
append_log_leader({CmdTag, _, _, _},
State = #{cluster_change_permitted := false})
when CmdTag == '$ra_join' orelse
CmdTag == '$ra_maybe_join' orelse
CmdTag == '$ra_leave' ->
{not_appended, cluster_change_not_permitted, State};
append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := _} ->
% already a member do nothing
% TODO: reply? If we don't reply the caller may block until timeout
{not_appended, already_member, State};
#{JoiningNode := #{voter := yes}} ->
already_member(State);
#{JoiningNode := #{voter := {maybe, _}} = Peer} ->
Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}},
append_cluster_change(Cluster, From, ReplyMode, State);
_ ->
Cluster = OldCluster#{JoiningNode => new_peer()},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
State = #{cluster := OldCluster}) ->
case OldCluster of
#{JoiningNode := _} ->
already_member(State);
_ ->
Round0 = new_staging_status(State),
Cluster = OldCluster#{JoiningNode => new_peer_with(#{voter => {maybe, Round0}})},
append_cluster_change(Cluster, From, ReplyMode, State)
end;
append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
State = #{cfg := #cfg{log_id = LogId},
cluster := OldCluster}) ->
Expand Down
52 changes: 49 additions & 3 deletions test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ all() ->
follower_machine_version,
follower_install_snapshot_machine_version,
leader_server_join,
leader_server_maybe_join,
leader_server_leave,
leader_is_removed,
follower_cluster_change,
Expand Down Expand Up @@ -1310,12 +1311,12 @@ leader_server_join(_Config) ->
#append_entries_rpc{entries =
[_, _, _, {4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _,
N3 := _, N4 := _},
N3 := _, N4 := #{voter := yes}},
await_consensus}}]}},
{send_rpc, N3,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := _},
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
Expand All @@ -1324,7 +1325,48 @@ leader_server_join(_Config) ->
{send_rpc, N2,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := _},
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 3}}
| _] = Effects,
ok.

leader_server_maybe_join(_Config) ->
N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
N2 => new_peer_with(#{next_index => 4, match_index => 3}),
N3 => new_peer_with(#{next_index => 4, match_index => 3})},
State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster},
Round0 = new_staging_status(State0),
% raft servers should switch to the new configuration after log append
% and further cluster changes should be disallowed
{leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _},
cluster_change_permitted := false} = _State1, Effects} =
ra_server:handle_leader({command, {'$ra_maybe_join', meta(),
N4, await_consensus}}, State0),
[
{send_rpc, N4,
#append_entries_rpc{entries =
[_, _, _, {4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _,
N3 := _, N4 := #{voter := {maybe, Round0}}},
await_consensus}}]}},
{send_rpc, N3,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {maybe, Round0}}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
prev_log_term = 5,
leader_commit = 3}},
{send_rpc, N2,
#append_entries_rpc{entries =
[{4, 5, {'$ra_cluster_change', _,
#{N1 := _, N2 := _, N3 := _, N4 := #{voter := {maybe, Round0}}},
await_consensus}}],
term = 5, leader_id = N1,
prev_log_index = 3,
Expand Down Expand Up @@ -2599,6 +2641,10 @@ new_peer() ->
new_peer_with(Map) ->
maps:merge(new_peer(), Map).

new_staging_status(State) ->
TargetIdx = maps:get(commit_index, State),
#{round => 0, target => TargetIdx , ts => os:system_time(millisecond)}.

snap_meta(Idx, Term) ->
snap_meta(Idx, Term, []).

Expand Down

0 comments on commit ee12cc5

Please sign in to comment.