Skip to content

Commit

Permalink
fix: restarting nonvoter fails to pick up its initial nvid
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Valiushko committed Sep 13, 2023
1 parent ac707d3 commit afbb4d2
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 23 deletions.
23 changes: 12 additions & 11 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,17 @@ init(#{id := Id,
case ra_log:recover_snapshot(Log0) of
undefined ->
InitialMachineState = ra_machine:init(Machine, Name),
VoterSt = new_voter_status(Config),
Clu0 = #{Id := Self} = make_cluster(Id, InitialNodes),
Clu = Clu0#{Id => Self#{voter_status => VoterSt}},
Clu = make_cluster(Id, InitialNodes,
#{voter_status => new_voter_status(Config)}),
{0, Clu,
0, InitialMachineState, {0, 0}};
{#{index := Idx,
term := Term,
cluster := ClusterNodes0,
machine_version := MacVersion} = Snapshot, MacSt} ->
ClusterNodes = maps:get(cluster_state, Snapshot, ClusterNodes0),
Clu = make_cluster(Id, ClusterNodes),
Clu = make_cluster(Id, ClusterNodes,
#{voter_status => new_voter_status(Config)}),
%% the snapshot is the last index before the first index
%% TODO: should this be Idx + 1?
{Idx + 1, Clu, MacVersion, MacSt, {Idx, Term}}
Expand Down Expand Up @@ -1298,7 +1298,8 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term,
log => Log,
commit_index => SnapIndex,
last_applied => SnapIndex,
cluster => make_cluster(Id, ClusterIds),
cluster => make_cluster(Id, ClusterIds,
#{voter_status => new_voter_status(State0)}),
machine_state => MacState}),
%% it was the last snapshot chunk so we can revert back to
%% follower status
Expand Down Expand Up @@ -2240,26 +2241,26 @@ fetch_term(Idx, #{log := Log0} = State) ->
{Term, State#{log => Log}}
end.

%% Builds the cluster map for this server from Nodes0 and then delegates to
%% make_cluster0 so that Self ends up with an entry in the result.
%%
%% Two head/tail pairs appear below: a two-argument form and a three-argument
%% form that additionally takes DefaultSelf and forwards it to make_cluster0.
%% NOTE(review): this span looks like a rendered diff whose +/- markers were
%% stripped, so the /2 lines are presumably the removed version and the /3
%% lines the added one - confirm against the real file before editing.
%%
%% Self        - key of the current server in the cluster map.
%% Nodes0      - either a list of server ids or a map of id => peer map.
%% DefaultSelf - map passed through unchanged to make_cluster0.
make_cluster(Self, Nodes0) when is_list(Nodes0) ->
make_cluster(Self, Nodes0, DefaultSelf) when is_list(Nodes0) ->
%% List input: every listed id gets a fresh new_peer() entry.
Nodes = lists:foldl(fun(N, Acc) ->
Acc#{N => new_peer()}
end, #{}, Nodes0),
make_cluster0(Self, Nodes);
make_cluster0(Self, Nodes, DefaultSelf);
make_cluster(Self, Nodes0) when is_map(Nodes0) ->
make_cluster(Self, Nodes0, DefaultSelf) when is_map(Nodes0) ->
%% Map input: only the voter_status key of each existing entry is kept and
%% layered over a fresh new_peer(); every other key comes from new_peer().
Nodes = maps:map(fun(_, V0) ->
V1 = maps:with([voter_status], V0),
maps:merge(new_peer(), V1)
end, Nodes0),
make_cluster0(Self, Nodes).
make_cluster0(Self, Nodes, DefaultSelf).

%% Ensures the current server (Self) has an entry in the cluster map Nodes.
%% When Self is already a key, the map is returned untouched, so DefaultSelf
%% is only consulted when Self is missing.
%%
%% NOTE(review): two heads and two final expressions appear below; this looks
%% like a rendered diff without +/- markers, with the /2 lines presumably
%% removed and the /3 lines added - confirm against the real file.
make_cluster0(Self, Nodes) ->
make_cluster0(Self, Nodes, DefaultSelf) ->
case Nodes of
#{Self := _} = Cluster ->
% current server is already in cluster - do nothing
Cluster;
Cluster ->
% add current server to cluster
Cluster#{Self => new_peer()}
%% In the three-argument form the new entry is new_peer() overlaid with
%% DefaultSelf: on a key clash maps:merge/2 keeps the second map's value.
Cluster#{Self => maps:merge(new_peer(), DefaultSelf)}
end.

initialise_peers(State = #{cfg := #cfg{id = Id}, log := Log, cluster := Cluster0}) ->
Expand Down
51 changes: 39 additions & 12 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ disconnected_node_catches_up(Config) ->
nonvoter_catches_up(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
[A, B, C = {Group, NodeC}] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
Machine = {module, ?MODULE, #{}},
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
{ok, _, Leader} = ra:members(hd(Started)),
Expand All @@ -411,22 +411,31 @@ nonvoter_catches_up(Config) ->
New = #{id => C, init_non_voter => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
?assertMatch({ok, #{voter_status := {nonvoter, _}}, _},
NonVoter = {nonvoter, #{nvid => <<"test">>}},
?assertMatch(#{Group := #{voter_status := NonVoter}},
rpc:call(NodeC, ra_directory, overview, [?SYS])),
?assertMatch(#{voter_status := NonVoter},
ra:key_metrics(C)),
?assertMatch({ok, #{voter_status := NonVoter}, _},
ra:member_overview(C)),

await_condition(
fun () ->
{ok, #{voter_status := {Voter, _}}, _} = ra:member_overview(C),
voter == Voter
{ok, #{voter_status := {voter, #{nvid := <<"test">>}}}, _} = ra:member_overview(C),
true
end, 200),
?assertMatch(#{Group := #{voter_status := {voter, #{nvid := <<"test">>}}}},
rpc:call(NodeC, ra_directory, overview, [?SYS])),
?assertMatch(#{voter_status := {voter, #{nvid := <<"test">>}}},
ra:key_metrics(C)),

[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.

nonvoter_catches_up_after_restart(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
[A, B, C = {Group, NodeC}] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
Machine = {module, ?MODULE, #{}},
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
{ok, _, Leader} = ra:members(hd(Started)),
Expand All @@ -438,24 +447,33 @@ nonvoter_catches_up_after_restart(Config) ->
New = #{id => C, init_non_voter => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
?assertMatch({ok, #{voter_status := {nonvoter, _}}, _},
NonVoter = {nonvoter, #{nvid => <<"test">>}},
?assertMatch(#{Group := #{voter_status := NonVoter}},
rpc:call(NodeC, ra_directory, overview, [?SYS])),
?assertMatch(#{voter_status := NonVoter},
ra:key_metrics(C)),
?assertMatch({ok, #{voter_status := NonVoter}, _},
ra:member_overview(C)),
ok = ra:stop_server(?SYS, C),
ok = ra:restart_server(?SYS, C),

await_condition(
fun () ->
{ok, #{voter_status := {Voter, _}}, _} = ra:member_overview(C),
voter == Voter
{ok, #{voter_status := {voter, #{nvid := <<"test">>}}}, _} = ra:member_overview(C),
true
end, 200),
?assertMatch(#{Group := #{voter_status := {voter, #{nvid := <<"test">>}}}},
rpc:call(NodeC, ra_directory, overview, [?SYS])),
?assertMatch(#{voter_status := {voter, #{nvid := <<"test">>}}},
ra:key_metrics(C)),

[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.

nonvoter_catches_up_after_leader_restart(Config) ->
PrivDir = ?config(data_dir, Config),
ClusterName = ?config(cluster_name, Config),
[A, B, C] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
[A, B, C = {Group, NodeC}] = ServerIds = [{ClusterName, start_follower(N, PrivDir)} || N <- [s1,s2,s3]],
Machine = {module, ?MODULE, #{}},
{ok, Started, []} = ra:start_cluster(?SYS, ClusterName, Machine, [A, B]),
{ok, _, Leader} = ra:members(hd(Started)),
Expand All @@ -467,16 +485,25 @@ nonvoter_catches_up_after_leader_restart(Config) ->
New = #{id => C, init_non_voter => <<"test">>},
{ok, _, _} = ra:add_member(A, New),
ok = ra:start_server(?SYS, ClusterName, New, Machine, [A, B]),
?assertMatch({ok, #{voter_status := {nonvoter, _}}, _},
NonVoter = {nonvoter, #{nvid => <<"test">>}},
?assertMatch(#{Group := #{voter_status := NonVoter}},
rpc:call(NodeC, ra_directory, overview, [?SYS])),
?assertMatch(#{voter_status := NonVoter},
ra:key_metrics(C)),
?assertMatch({ok, #{voter_status := NonVoter}, _},
ra:member_overview(C)),
ok = ra:stop_server(?SYS, Leader),
ok = ra:restart_server(?SYS, Leader),

await_condition(
fun () ->
{ok, #{voter_status := {Voter, _}}, _} = ra:member_overview(C),
voter == Voter
{ok, #{voter_status := {voter, #{nvid := <<"test">>}}}, _} = ra:member_overview(C),
true
end, 200),
?assertMatch(#{Group := #{voter_status := {voter, #{nvid := <<"test">>}}}},
rpc:call(NodeC, ra_directory, overview, [?SYS])),
?assertMatch(#{voter_status := {voter, #{nvid := <<"test">>}}},
ra:key_metrics(C)),

[ok = slave:stop(S) || {_, S} <- ServerIds],
ok.
Expand Down

0 comments on commit afbb4d2

Please sign in to comment.