diff --git a/src/ra.hrl b/src/ra.hrl
index a1a7dff0..68beffba 100644
--- a/src/ra.hrl
+++ b/src/ra.hrl
@@ -52,12 +52,16 @@
 %% - Nonvoter, node does not participate in elections or consensus voting.
 %% - Staging, node is a temporary nonvoter, and will be automatically promoted
-%%   if it proves to be fast enough to stay up to dat with teh leader.
--type ra_voter() :: yes | no | {staging, staging_status()}.
+%%   if it proves to be fast enough to stay up to date with the leader.
+%% NB: 'maybe' is quoted because it is a keyword when the maybe_expr feature
+%% is enabled (the default from OTP 27); quoted, it is still the same atom.
+-type ra_voter() :: yes | no | {'maybe', staging_status()}.
 
 %% For staging nodes we measure current round, target index and the timestamp of its start.
 %% If the node reaches target index and the ∂T is less than the election timeout, the node is
 %% considered eligible to become a voter.
--type staging_status() :: {ra_replication_round(), ra_index(), integer()}.
+-type staging_status() :: #{round := ra_replication_round(),
+                            target := ra_index(),
+                            ts := integer()}.
 
 -type ra_peer_state() :: #{next_index := non_neg_integer(),
                            match_index := non_neg_integer(),
diff --git a/src/ra_server.erl b/src/ra_server.erl
index ab964a9e..5b233b24 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -94,6 +94,7 @@
                     | recovered | stop | receive_snapshot.
 
 -type command_type() :: '$usr' | '$ra_join' | '$ra_leave' |
+                        '$ra_maybe_join' |
                         '$ra_cluster_change' | '$ra_cluster'.
 
 -type command_meta() :: #{from => from(),
@@ -2094,6 +2095,15 @@ new_peer() ->
 new_peer_with(Map) ->
     maps:merge(new_peer(), Map).
 
+new_staging_status(#{commit_index := TargetIdx}) ->
+    % staging starts at round 0, targeting the current commit index
+    #{round => 0, target => TargetIdx, ts => os:system_time(millisecond)}.
+
+already_member(State) ->
+    % already a member do nothing
+    % TODO: reply? If we don't reply the caller may block until timeout
+    {not_appended, already_member, State}.
+
 peers(#{cfg := #cfg{id = Id}, cluster := Peers}) ->
     maps:remove(Id, Peers).
 
@@ -2450,19 +2460,36 @@ add_reply(_, _, _, % From, Reply, Mode
 append_log_leader({CmdTag, _, _, _},
                   State = #{cluster_change_permitted := false})
   when CmdTag == '$ra_join' orelse
+       CmdTag == '$ra_maybe_join' orelse
        CmdTag == '$ra_leave' ->
     {not_appended, cluster_change_not_permitted, State};
 append_log_leader({'$ra_join', From, JoiningNode, ReplyMode},
                   State = #{cluster := OldCluster}) ->
     case OldCluster of
-        #{JoiningNode := _} ->
-            % already a member do nothing
-            % TODO: reply? If we don't reply the caller may block until timeout
-            {not_appended, already_member, State};
+        #{JoiningNode := #{voter := yes}} ->
+            already_member(State);
+        #{JoiningNode := #{voter := _} = Peer} ->
+            % existing non-voter ({'maybe', _} or no): promote it in place so the
+            % peer map it already has is kept instead of being replaced by new_peer()
+            Cluster = OldCluster#{JoiningNode => Peer#{voter => yes}},
+            append_cluster_change(Cluster, From, ReplyMode, State);
         _ ->
             Cluster = OldCluster#{JoiningNode => new_peer()},
             append_cluster_change(Cluster, From, ReplyMode, State)
     end;
+append_log_leader({'$ra_maybe_join', From, JoiningNode, ReplyMode},
+                  State = #{cluster := OldCluster}) ->
+    % adds JoiningNode as a staging member unless it is already in the cluster;
+    % 'maybe' is quoted as it is a keyword when the maybe_expr feature is enabled
+    case OldCluster of
+        #{JoiningNode := _} ->
+            already_member(State);
+        _ ->
+            Staging = new_staging_status(State),
+            Peer = new_peer_with(#{voter => {'maybe', Staging}}),
+            Cluster = OldCluster#{JoiningNode => Peer},
+            append_cluster_change(Cluster, From, ReplyMode, State)
+    end;
 append_log_leader({'$ra_leave', From, LeavingServer, ReplyMode},
                   State = #{cfg := #cfg{log_id = LogId},
                             cluster := OldCluster}) ->
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index fe0c6386..c55d6c84 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -41,6 +41,7 @@ all() ->
      follower_machine_version,
      follower_install_snapshot_machine_version,
      leader_server_join,
+     leader_server_maybe_join,
      leader_server_leave,
      leader_is_removed,
      follower_cluster_change,
@@ -1310,12 +1311,12 @@ leader_server_join(_Config) ->
       #append_entries_rpc{entries =
                           [_, _, _, {4, 5, {'$ra_cluster_change', _,
                                             #{N1 := _, N2 := _,
-                                              N3 := _, N4 := _},
+                                              N3 := _, N4 := #{voter := yes}},
                                             await_consensus}}]}},
      {send_rpc, N3,
       #append_entries_rpc{entries =
                           [{4, 5, {'$ra_cluster_change', _,
-                                   #{N1 := _, N2 := _, N3 := _, N4 := _},
+                                   #{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}},
                                    await_consensus}}],
                           term = 5, leader_id = N1,
                           prev_log_index = 3,
@@ -1324,7 +1325,55 @@ leader_server_join(_Config) ->
      {send_rpc, N2,
       #append_entries_rpc{entries =
                           [{4, 5, {'$ra_cluster_change', _,
-                                   #{N1 := _, N2 := _, N3 := _, N4 := _},
+                                   #{N1 := _, N2 := _, N3 := _, N4 := #{voter := yes}},
+                                   await_consensus}}],
+                          term = 5, leader_id = N1,
+                          prev_log_index = 3,
+                          prev_log_term = 5,
+                          leader_commit = 3}}
+     | _] = Effects,
+    ok.
+
+leader_server_maybe_join(_Config) ->
+    N1 = ?N1, N2 = ?N2, N3 = ?N3, N4 = ?N4,
+    OldCluster = #{N1 => new_peer_with(#{next_index => 4, match_index => 3}),
+                   N2 => new_peer_with(#{next_index => 4, match_index => 3}),
+                   N3 => new_peer_with(#{next_index => 4, match_index => 3})},
+    State0 = (base_state(3, ?FUNCTION_NAME))#{cluster => OldCluster},
+    % only round and target are asserted: ts is read from the system clock by
+    % the server at a later instant, so matching it exactly would be flaky
+    #{round := Round, target := Target} = new_staging_status(State0),
+    % raft servers should switch to the new configuration after log append
+    % and further cluster changes should be disallowed
+    {leader, #{cluster := #{N1 := _, N2 := _, N3 := _, N4 := _},
+               cluster_change_permitted := false} = _State1, Effects} =
+        ra_server:handle_leader({command, {'$ra_maybe_join', meta(),
+                                           N4, await_consensus}}, State0),
+    [
+     {send_rpc, N4,
+      #append_entries_rpc{entries =
+                          [_, _, _, {4, 5, {'$ra_cluster_change', _,
+                                            #{N1 := _, N2 := _, N3 := _,
+                                              N4 := #{voter := {'maybe', #{round := Round,
+                                                                           target := Target}}}},
+                                            await_consensus}}]}},
+     {send_rpc, N3,
+      #append_entries_rpc{entries =
+                          [{4, 5, {'$ra_cluster_change', _,
+                                   #{N1 := _, N2 := _, N3 := _,
+                                     N4 := #{voter := {'maybe', #{round := Round,
+                                                                  target := Target}}}},
+                                   await_consensus}}],
+                          term = 5, leader_id = N1,
+                          prev_log_index = 3,
+                          prev_log_term = 5,
+                          leader_commit = 3}},
+     {send_rpc, N2,
+      #append_entries_rpc{entries =
+                          [{4, 5, {'$ra_cluster_change', _,
+                                   #{N1 := _, N2 := _, N3 := _,
+                                     N4 := #{voter := {'maybe', #{round := Round,
+                                                                  target := Target}}}},
                                    await_consensus}}],
                           term = 5, leader_id = N1,
                           prev_log_index = 3,
@@ -2599,6 +2648,10 @@ new_peer() ->
 new_peer_with(Map) ->
     maps:merge(new_peer(), Map).
 
+new_staging_status(State) ->
+    TargetIdx = maps:get(commit_index, State),
+    #{round => 0, target => TargetIdx, ts => os:system_time(millisecond)}.
+
 snap_meta(Idx, Term) ->
     snap_meta(Idx, Term, []).
 