
Commit

Merge pull request #240 from keynslug/fix/EEC-112/autoheal-asymm
fix(autoheal): attempt healing complex asymmetric partitions
keynslug authored Dec 18, 2024
2 parents 81cff1c + 5b2846d commit 31017f9
Showing 5 changed files with 244 additions and 63 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/run_test_case.yaml
@@ -9,10 +9,10 @@ jobs:
runs-on: ubuntu-latest

container:
image: erlang:22.1
image: erlang:24

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v4
- name: Run tests
run: |
make eunit
@@ -23,12 +23,12 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
make coveralls
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v4
if: always()
with:
name: logs
path: _build/test/logs
- uses: actions/upload-artifact@v1
- uses: actions/upload-artifact@v4
with:
name: cover
path: _build/test/cover
10 changes: 8 additions & 2 deletions src/ekka.appup.src
@@ -1,7 +1,10 @@
%% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"0.8.1.11",
[{"0.8.1.12",
[{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
{load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
{"0.8.1.11",
[{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
{load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
{"0.8.1.10",
@@ -67,7 +70,10 @@
{load_module,ekka_httpc,brutal_purge,soft_purge,[]},
{load_module,ekka_mnesia,brutal_purge,soft_purge,[]},
{load_module,ekka_dist,brutal_purge,soft_purge,[]}]}],
[{"0.8.1.11",
[{"0.8.1.12",
[{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
{load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
{"0.8.1.11",
[{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
{load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
{"0.8.1.10",
131 changes: 95 additions & 36 deletions src/ekka_autoheal.erl
@@ -16,6 +16,8 @@

-module(ekka_autoheal).

-include_lib("snabbkaffe/include/trace.hrl").

-export([ init/0
, enabled/0
, proc/1
@@ -24,7 +26,7 @@

-record(autoheal, {delay, role, proc, timer}).

-type(autoheal() :: #autoheal{}).
-type autoheal() :: #autoheal{}.

-export_type([autoheal/0]).

@@ -47,8 +49,7 @@ enabled() ->
end.

proc(undefined) -> undefined;
proc(#autoheal{proc = Proc}) ->
Proc.
proc(#autoheal{proc = Proc}) -> Proc.

handle_msg(Msg, undefined) ->
?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]), undefined;
@@ -76,12 +77,24 @@ handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, t
Nodes = ekka_mnesia:cluster_nodes(all),
case rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000) of
{Views, []} ->
SplitView = lists:sort(fun compare_view/2, lists:usort(Views)),
ekka_node_monitor:cast(coordinator(SplitView), {heal_partition, SplitView});
SplitView = find_split_view(Nodes, Views),
HealPlan = find_heal_plan(SplitView),
case HealPlan of
{Candidates = [_ | _], Minority} ->
%% Non-empty list of candidates, choose a coordinator.
CoordNode = pick_coordinator(Candidates),
ekka_node_monitor:cast(CoordNode, {heal_cluster, Minority, SplitView});
{[], Cluster} ->
%% It's very unlikely, but possible, to have an empty list of candidates.
ekka_node_monitor:cast(node(), {heal_cluster, Cluster, SplitView});
{} ->
ignore
end,
Autoheal#autoheal{timer = undefined};
{_Views, BadNodes} ->
?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes])
end,
Autoheal#autoheal{timer = undefined};
?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
end;
false ->
Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
end;
@@ -91,51 +104,98 @@ handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
Autoheal;

handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
%% NOTE: Backward compatibility.
case SplitView of
%% No partitions.
[] -> Autoheal;
[{_, []}] -> Autoheal;
%% Partitions.
SplitView ->
Proc = spawn_link(fun() -> heal_partition(SplitView) end),
Autoheal#autoheal{role = coordinator, proc = Proc}
end;

handle_msg({heal_cluster, Minority, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
Proc = spawn_link(fun() ->
?LOG(info, "Healing partition: ~p", [SplitView]),
_ = heal_partition(SplitView)
end),
?tp(notice, "Healing cluster partition", #{
need_reboot => Minority,
split_view => SplitView
}),
reboot_minority(Minority -- [node()])
end),
Autoheal#autoheal{role = coordinator, proc = Proc};

handle_msg({heal_partition, SplitView}, Autoheal= #autoheal{proc = _Proc}) ->
handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
Autoheal;

handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
Autoheal#autoheal{proc = undefined};
handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
?LOG(critical, "Autoheal process crashed: ~s", [Reason]),
?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
_Retry = ekka_node_monitor:run_after(1000, confirm_partition),
Autoheal#autoheal{proc = undefined};

handle_msg(Msg, Autoheal) ->
?LOG(critical, "Unexpected msg: ~p", [Msg, Autoheal]),
Autoheal.

compare_view({Running1, _} , {Running2, _}) ->
Len1 = length(Running1), Len2 = length(Running2),
if
Len1 > Len2 -> true;
Len1 == Len2 -> lists:member(node(), Running1);
true -> false
find_split_view(Nodes, Views) ->
ClusterView = lists:zipwith(
fun(N, {Running, Stopped}) -> {N, Running, Stopped} end,
Nodes,
Views
),
MajorityView = lists:sort(fun compare_node_views/2, ClusterView),
find_split_view(MajorityView).

compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
Len1 = length(Running1),
Len2 = length(Running2),
case Len1 of
%% Prefer partitions with higher number of surviving nodes.
L when L > Len2 -> true;
%% If number of nodes is the same, sort by list of running nodes.
Len2 -> Running1 < Running2;
L when L < Len2 -> false
end.
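%% Illustration only (hypothetical nodes): sorting
%%   [{n4, [n4, n5], [n1]}, {n1, [n1, n2, n3], [n4]}]
%% with compare_node_views/2 puts {n1, [n1, n2, n3], [n4]} first, since it
%% reports the larger set of running nodes; equal-sized partitions fall back
%% to comparing the running-node lists themselves.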

coordinator([{Nodes, _} | _]) ->
ekka_membership:coordinator(Nodes).

-spec heal_partition(list()) -> list(node()).
heal_partition([]) ->
[];
%% All nodes connected.
heal_partition([{_, []}]) ->
[];
%% Partial partitions happened.
heal_partition([{Nodes, []}|_]) ->
find_split_view([{_Node, _Running, []} | Views]) ->
%% Node observes no partitions, ignore.
find_split_view(Views);
find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
%% Node observes some nodes as partitioned from it.
%% These nodes need to be rebooted, and as such they should not be part of the split view.
Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
[View | find_split_view(Rest)];
find_split_view([]) ->
[].

find_heal_plan([{_Node, R0, P0} | Rest]) ->
%% If we have more than one partition in the split view, we need to reboot _all_ of the nodes
%% in each view's partition (i.e. ⋃(Partitions)) for better safety. But then we need to
%% find candidates to do it, as ⋃(Running) ∖ ⋃(Partitions).
{_Nodes, Rs, Ps} = lists:unzip3(Rest),
URunning = ordsets:union(lists:map(fun ordsets:from_list/1, [R0 | Rs])),
UPartitions = ordsets:union(lists:map(fun ordsets:from_list/1, [P0 | Ps])),
{ordsets:subtract(URunning, UPartitions), UPartitions};
find_heal_plan([]) ->
{}.
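%% Illustration only (hypothetical nodes): for a split view such as
%%   [{n2, [n1, n2, n3, n5], [n5]}, {n1, [n1, n2, n3], [n4]}]
%% the plan is {⋃(Running) ∖ ⋃(Partitions), ⋃(Partitions)}
%%   = {[n1, n2, n3], [n4, n5]},
%% i.e. any of n1..n3 may coordinate, and both n4 and n5 get rebooted.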

pick_coordinator(Candidates) ->
case lists:member(node(), Candidates) of
true -> node();
false -> ekka_membership:coordinator(Candidates)
end.

heal_partition([{Nodes, []} | _] = SplitView) ->
%% Symmetric partition.
?LOG(info, "Healing partition: ~p", [SplitView]),
reboot_minority(Nodes -- [node()]);
heal_partition([{Majority, Minority}, {Minority, Majority}]) ->
reboot_minority(Minority);
heal_partition(SplitView) ->
?LOG(critical, "Cannot heal the partitions: ~p", [SplitView]),
error({unknown_splitview, SplitView}).
heal_partition([{Majority, Minority}, {Minority, Majority}] = SplitView) ->
%% Symmetric partition.
?LOG(info, "Healing partition: ~p", [SplitView]),
reboot_minority(Minority).
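%% Illustration only (hypothetical nodes): a legacy symmetric view such as
%%   [{[n1, n2, n3], [n4, n5]}, {[n4, n5], [n1, n2, n3]}]
%% matches the second clause and reboots the minority [n4, n5].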

reboot_minority(Minority) ->
lists:foreach(fun shutdown/1, Minority),
@@ -155,4 +215,3 @@ ensure_cancel_timer(undefined) ->
ok;
ensure_cancel_timer(TRef) ->
catch erlang:cancel_timer(TRef).

9 changes: 8 additions & 1 deletion src/ekka_node_monitor.erl
@@ -110,9 +110,10 @@ handle_cast({confirm, TargetNode, Status}, State) ->

handle_cast(Msg = {report_partition, _Node}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg = {heal_partition, _SplitView}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};
handle_cast(Msg = {heal_cluster, _, _}, State) ->
{noreply, autoheal_handle_msg(Msg, State)};

handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
@@ -142,6 +143,12 @@ handle_info({mnesia_system_event, {mnesia_up, Node}},
false -> ok;
true -> ekka_membership:partition_healed(Node)
end,
%% If there was an asymmetric cluster partition, we might need more
%% autoheal iterations to completely bring the cluster back to normal.
case ekka_autoheal:enabled() of
{true, _} -> run_after(3000, confirm_partition);
false -> ignore
end,
{noreply, State#state{partitions = lists:delete(Node, Partitions)}};

handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
