fix(autoheal): compute minority to reboot more safely

Specifically, do not perform autoheal in small steps, because doing so may
lead to cluster inconsistencies: Mnesia sometimes decides that the cluster
partitions are healed too early, before the coordinator has had a chance to
reboot all of the nodes that have been partitioned off the cluster.
keynslug committed Dec 17, 2024
1 parent f0caedb commit 88ee7ed
Showing 3 changed files with 157 additions and 49 deletions.
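
In short, instead of healing one observed partition at a time, the coordinator now
computes a single reboot set up front. Below is a minimal sketch of that computation
(illustrative only: the module name is made up, and the real logic lives in
find_split_view/2 and find_heal_plan/1 in the diff that follows).

-module(heal_plan_sketch).
-export([plan/1]).

%% Each element of the split view is {Node, Running, Partitioned}: what Node
%% reports as running and as partitioned off from it.
-spec plan([{node(), [node()], [node()]}]) -> {[node()], [node()]}.
plan(SplitView) ->
    %% Reboot the union of every observed partition in one step.
    Minority = lists:usort(lists:append([P || {_N, _R, P} <- SplitView])),
    %% Coordinator candidates: survivors that no view reports as partitioned.
    Survivors = lists:usort(lists:append([R || {_N, R, _P} <- SplitView])),
    {Survivors -- Minority, Minority}.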
96 changes: 71 additions & 25 deletions src/ekka_autoheal.erl
@@ -49,8 +49,7 @@ enabled() ->
end.

proc(undefined) -> undefined;
proc(#autoheal{proc = Proc}) ->
Proc.
proc(#autoheal{proc = Proc}) -> Proc.

handle_msg(Msg, undefined) ->
?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]), undefined;
@@ -78,9 +77,19 @@ handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, t
Nodes = ekka_mnesia:cluster_nodes(all),
case rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000) of
{Views, []} ->
SplitView = lists:sort(fun compare_view/2, lists:usort(Views)),
Coordinator = coordinator(SplitView),
ekka_node_monitor:cast(Coordinator, {heal_partition, SplitView}),
SplitView = find_split_view(Nodes, Views),
HealPlan = find_heal_plan(SplitView),
case HealPlan of
{Candidates = [_ | _], Minority} ->
%% Non-empty list of candidates, choose a coordinator.
CoordNode = ekka_membership:coordinator(Candidates),
ekka_node_monitor:cast(CoordNode, {heal_cluster, Minority, SplitView});
{[], Cluster} ->
%% It's very unlikely, but possible, to end up with an empty list of candidates.
ekka_node_monitor:cast(node(), {heal_cluster, Cluster, SplitView});
{} ->
ignore
end,
Autoheal#autoheal{timer = undefined};
{_Views, BadNodes} ->
?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
@@ -95,19 +104,27 @@ handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
Autoheal;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
%% NOTE: Backward compatibility.
case SplitView of
%% No partitions.
[] -> Autoheal;
[{_, []}] -> Autoheal;
%% Partitions.
SplitView ->
Proc = spawn_link(fun() ->
?tp(start_heal_partition, #{split_view => SplitView}),
heal_partition(SplitView)
end),
Proc = spawn_link(fun() -> heal_partition(SplitView) end),
Autoheal#autoheal{role = coordinator, proc = Proc}
end;

handle_msg({heal_cluster, Minority, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
Proc = spawn_link(fun() ->
?tp(notice, "Healing cluster partition", #{
need_reboot => Minority,
split_view => SplitView
}),
reboot_minority(Minority -- [node()])
end),
Autoheal#autoheal{role = coordinator, proc = Proc};

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
Autoheal;
@@ -123,16 +140,52 @@ handle_msg(Msg, Autoheal) ->
?LOG(critical, "Unexpected msg: ~p", [Msg, Autoheal]),
Autoheal.

compare_view({Running1, _}, {Running2, _}) ->
Len1 = length(Running1), Len2 = length(Running2),
if
Len1 > Len2 -> true;
Len1 == Len2 -> lists:member(node(), Running1);
true -> false
find_split_view(Nodes, Views) ->
ClusterView = lists:zipwith(
fun(N, {Running, Stopped}) -> {N, Running, Stopped} end,
Nodes,
Views
),
MajorityView = lists:usort(fun compare_node_views/2, ClusterView),
find_split_view(MajorityView).

compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
Len1 = length(Running1),
Len2 = length(Running2),
case Len1 of
%% Prefer partitions with a higher number of surviving nodes.
L when L > Len2 -> true;
%% If the number of nodes is the same, prefer views where the current node is a
%% survivor. Otherwise, sort by the list of running nodes; if the lists happen to
%% be the same, the duplicate view will be excluded by usort.
Len2 -> lists:member(node(), Running1) orelse Running1 < Running2;
L when L < Len2 -> false
end.

coordinator([{Nodes, _} | _]) ->
ekka_membership:coordinator(Nodes).
find_split_view([{_Node, _Running, []} | Views]) ->
%% The node observes no partitions; ignore it.
find_split_view(Views);
find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
%% The node observes some nodes as partitioned from it.
%% These nodes need to be rebooted, so they should not be part of the split view.
Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
[View | find_split_view(Rest)];
find_split_view([]) ->
[].

find_heal_plan([{_Node, R0, P0} | Rest]) ->
%% If we have more than one partition in the split view, we need to reboot _all_ of the
%% nodes in each view's partition (i.e. ⋃(Partitions)) for better safety. But then we
%% need to find candidates to do it, i.e. ⋃(Survivors) ∖ ⋃(Partitions).
lists:foldl(
fun({_, R, P}, {RAcc, PAcc}) ->
{lists:usort((R -- PAcc) ++ (RAcc -- P)), lists:usort(P ++ PAcc)}
end,
{R0, P0},
Rest
);
find_heal_plan([]) ->
{}.

heal_partition([{Nodes, []} | _] = SplitView) ->
%% Symmetric partition.
@@ -141,14 +194,7 @@ heal_partition([{Nodes, []} | _] = SplitView) ->
heal_partition([{Majority, Minority}, {Minority, Majority}] = SplitView) ->
%% Symmetric partition.
?LOG(info, "Healing partition: ~p", [SplitView]),
reboot_minority(Minority);
heal_partition([{_Nodes, Stopped} | _Asymmetric] = SplitView) ->
%% Asymmetric partitions.
%% Start with rebooting known stopped nodes. If this is not enough, the retry mechanism
%% in `ekka_node_monitor:handle_info({mnesia_system_event, ...}` should then launch a
%% new iteration.
?LOG(info, "Trying to heal asymmetric partition: ~p", [SplitView]),
reboot_minority(Stopped).
reboot_minority(Minority).

reboot_minority(Minority) ->
lists:foreach(fun shutdown/1, Minority),
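
To make the new computation concrete, here is roughly the case exercised by
t_autoheal_asymm further below, fed through the sketch given above (hypothetical
short node names; the real code calls the unexported find_heal_plan/1 instead).

%% n2 and n3 have lost sight of n1, and n4 has lost sight of n2. After
%% find_split_view/2 drops the views of nodes that are themselves scheduled
%% for reboot, two majority views remain:
SplitView = [{n3, [n2, n3, n4], [n1]},
             {n4, [n1, n3, n4], [n2]}],
%% n3 and n4 are coordinator candidates; n1 and n2 are rebooted in one step,
%% rather than partition by partition as before.
{[n3, n4], [n1, n2]} = heal_plan_sketch:plan(SplitView).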
3 changes: 2 additions & 1 deletion src/ekka_node_monitor.erl
@@ -110,9 +110,10 @@ handle_cast({confirm, TargetNode, Status}, State) ->

handle_cast(Msg = {report_partition, _Node}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg = {heal_partition, _SplitView}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};
handle_cast(Msg = {heal_cluster, _, _}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
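
For context, the new {heal_cluster, Minority, SplitView} cast is routed the same way
as the existing autoheal messages. A sketch of that delegation, assuming the monitor
state simply carries the #autoheal{} record (the actual autoheal_handle_msg/2 helper
is internal to ekka_node_monitor and may differ):

%% Assumed shape of the internal helper: hand the message to ekka_autoheal and
%% store whatever updated autoheal state it returns.
autoheal_handle_msg(Msg, State = #state{autoheal = Autoheal}) ->
    State#state{autoheal = ekka_autoheal:handle_msg(Msg, Autoheal)}.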
107 changes: 84 additions & 23 deletions test/ekka_autoheal_SUITE.erl
@@ -58,7 +58,7 @@ t_autoheal(_Config) ->
[N1,N2] = rpc:call(N3, ekka, info, [stopped_nodes]),
%% Simulate an autoheal crash to verify that autoheal tolerates it.
snabbkaffe_nemesis:inject_crash(
?match_event(#{?snk_kind := start_heal_partition}),
?match_event(#{?snk_kind := "Healing cluster partition"}),
snabbkaffe_nemesis:recover_after(1),
?MODULE
),
@@ -72,6 +72,10 @@
rpc:call(N3, ekka, leave, [])
end,
fun(Trace) ->
?assertMatch(
[#{need_reboot := [N3]}],
?of_kind("Healing cluster partition", Trace)
),
?assertMatch([_ | _], ?of_kind(snabbkaffe_crash, Trace))
end
)
@@ -87,29 +91,86 @@ t_autoheal_asymm(_Config) ->
ok = rpc:call(N2, ekka, join, [N1]),
ok = rpc:call(N3, ekka, join, [N1]),
ok = rpc:call(N4, ekka, join, [N1]),
%% Simulate asymmetric netsplit
true = rpc:cast(N3, net_kernel, disconnect, [N1]),
true = rpc:cast(N4, net_kernel, disconnect, [N2]),
ok = timer:sleep(1000),
%% SplitView: [{[N1,N2,N4], [N3]}, {[N1,N2,N3], [N4]},
%% {[N2,N3,N4], [N1]}, {[N1,N3,N4], [N2]}]
NodesInfo = [running_nodes, stopped_nodes],
[[N1,N2,N4], [N3]] = [rpc:call(N1, ekka, info, [I]) || I <- NodesInfo],
[[N1,N2,N3], [N4]] = [rpc:call(N2, ekka, info, [I]) || I <- NodesInfo],
[[N2,N3,N4], [N1]] = [rpc:call(N3, ekka, info, [I]) || I <- NodesInfo],
[[N1,N3,N4], [N2]] = [rpc:call(N4, ekka, info, [I]) || I <- NodesInfo],
%% Wait for autoheal
ok = timer:sleep(12000),
Nodes = rpc:call(N1, ekka, info, [running_nodes]),
Nodes = rpc:call(N2, ekka, info, [running_nodes]),
Nodes = rpc:call(N3, ekka, info, [running_nodes]),
Nodes = rpc:call(N4, ekka, info, [running_nodes]),
rpc:call(N1, ekka, leave, []),
rpc:call(N2, ekka, leave, []),
rpc:call(N3, ekka, leave, []),
rpc:call(N4, ekka, leave, [])
?check_trace(
begin
%% Simulate asymmetric netsplit
true = rpc:cast(N2, net_kernel, disconnect, [N1]),
true = rpc:cast(N3, net_kernel, disconnect, [N1]),
true = rpc:cast(N4, net_kernel, disconnect, [N2]),
ok = timer:sleep(1000),
%% Asymmetric split, but it's enough to reboot N1 and N2
NodesInfo = [running_nodes, stopped_nodes],
[[N1,N4], [N2,N3]] = [rpc:call(N1, ekka, info, [I]) || I <- NodesInfo],
[[N2,N3], [N1,N4]] = [rpc:call(N2, ekka, info, [I]) || I <- NodesInfo],
[[N2,N3,N4], [N1]] = [rpc:call(N3, ekka, info, [I]) || I <- NodesInfo],
[[N1,N3,N4], [N2]] = [rpc:call(N4, ekka, info, [I]) || I <- NodesInfo],
%% Wait for autoheal
ok = timer:sleep(12000),
Nodes = rpc:call(N1, ekka, info, [running_nodes]),
Nodes = rpc:call(N2, ekka, info, [running_nodes]),
Nodes = rpc:call(N3, ekka, info, [running_nodes]),
Nodes = rpc:call(N4, ekka, info, [running_nodes]),
rpc:call(N1, ekka, leave, []),
rpc:call(N2, ekka, leave, []),
rpc:call(N3, ekka, leave, []),
rpc:call(N4, ekka, leave, [])
end,
fun(Trace) ->
?assertMatch(
[#{need_reboot := [N1, N2]}],
?of_kind("Healing cluster partition", Trace)
)
end
)
after
lists:foreach(fun ekka_ct:stop_slave/1, Nodes),
snabbkaffe:stop()
end.

t_autoheal_fullsplit(_Config) ->
[N1,N2,N3,N4] = Nodes = lists:map(fun start_slave_node/1, [fs1,fs2,fs3,fs4]),
try
%% Create cluster
ok = rpc:call(N2, ekka, join, [N1]),
ok = rpc:call(N3, ekka, join, [N1]),
ok = rpc:call(N4, ekka, join, [N1]),
?check_trace(
begin
%% Simulate a full netsplit
true = rpc:cast(N1, net_kernel, disconnect, [N2]),
true = rpc:cast(N1, net_kernel, disconnect, [N3]),
true = rpc:cast(N1, net_kernel, disconnect, [N4]),
true = rpc:cast(N2, net_kernel, disconnect, [N3]),
true = rpc:cast(N2, net_kernel, disconnect, [N4]),
true = rpc:cast(N3, net_kernel, disconnect, [N4]),
ok = timer:sleep(1000),
%% Full split, all nodes except one need to be rebooted
NodesInfo = [running_nodes, stopped_nodes],
[[N1], [N2,N3,N4]] = [rpc:call(N1, ekka, info, [I]) || I <- NodesInfo],
[[N2], [N1,N3,N4]] = [rpc:call(N2, ekka, info, [I]) || I <- NodesInfo],
[[N3], [N1,N2,N4]] = [rpc:call(N3, ekka, info, [I]) || I <- NodesInfo],
[[N4], [N1,N2,N3]] = [rpc:call(N4, ekka, info, [I]) || I <- NodesInfo],
%% Wait for autoheal
ok = timer:sleep(12000),
Nodes = rpc:call(N1, ekka, info, [running_nodes]),
Nodes = rpc:call(N2, ekka, info, [running_nodes]),
Nodes = rpc:call(N3, ekka, info, [running_nodes]),
Nodes = rpc:call(N4, ekka, info, [running_nodes]),
rpc:call(N1, ekka, leave, []),
rpc:call(N2, ekka, leave, []),
rpc:call(N3, ekka, leave, []),
rpc:call(N4, ekka, leave, [])
end,
fun(Trace) ->
?assertMatch(
[#{need_reboot := [N2, N3, N4]}],
?of_kind("Healing cluster partition", Trace)
)
end
)
after
lists:foreach(fun ekka_ct:stop_slave/1, Nodes)
lists:foreach(fun ekka_ct:stop_slave/1, Nodes),
snabbkaffe:stop()
end.

start_slave_node(Name) ->
