diff --git a/.github/workflows/run_test_case.yaml b/.github/workflows/run_test_case.yaml
index d7f6a413..f13c7f2b 100644
--- a/.github/workflows/run_test_case.yaml
+++ b/.github/workflows/run_test_case.yaml
@@ -9,10 +9,10 @@ jobs:
     runs-on: ubuntu-latest
 
     container:
-      image: erlang:22.1
+      image: erlang:24
 
     steps:
-    - uses: actions/checkout@v1
+    - uses: actions/checkout@v4
     - name: Run tests
       run: |
         make eunit
@@ -23,12 +23,12 @@ jobs:
         GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
       run: |
        make coveralls
-    - uses: actions/upload-artifact@v1
+    - uses: actions/upload-artifact@v4
      if: always()
      with:
        name: logs
        path: _build/test/logs
-    - uses: actions/upload-artifact@v1
+    - uses: actions/upload-artifact@v4
      with:
        name: cover
        path: _build/test/cover
diff --git a/src/ekka.appup.src b/src/ekka.appup.src
index 402c098a..e253257c 100644
--- a/src/ekka.appup.src
+++ b/src/ekka.appup.src
@@ -1,7 +1,10 @@
 %% -*- mode: erlang -*-
 %% Unless you know what you are doing, DO NOT edit manually!!
 {VSN,
-  [{"0.8.1.11",
+  [{"0.8.1.12",
+    [{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
+     {load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
+   {"0.8.1.11",
     [{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
      {load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
    {"0.8.1.10",
@@ -67,7 +70,10 @@
      {load_module,ekka_httpc,brutal_purge,soft_purge,[]},
      {load_module,ekka_mnesia,brutal_purge,soft_purge,[]},
      {load_module,ekka_dist,brutal_purge,soft_purge,[]}]}],
-  [{"0.8.1.11",
+  [{"0.8.1.12",
+    [{load_module,ekka_node_monitor,brutal_purge,soft_purge,[]},
+     {load_module,ekka_autoheal,brutal_purge,soft_purge,[]}]},
+   {"0.8.1.11",
     [{load_module,ekka_cluster_strategy,brutal_purge,soft_purge,[]},
      {load_module,ekka_autocluster,brutal_purge,soft_purge,[]}]},
    {"0.8.1.10",
diff --git a/src/ekka_autoheal.erl b/src/ekka_autoheal.erl
index f30c29dc..c621a64e 100644
--- a/src/ekka_autoheal.erl
+++ b/src/ekka_autoheal.erl
@@ -16,6 +16,8 @@
 
 -module(ekka_autoheal).
 
+-include_lib("snabbkaffe/include/trace.hrl").
+
 -export([ init/0
         , enabled/0
         , proc/1
@@ -24,7 +26,7 @@
 
 -record(autoheal, {delay, role, proc, timer}).
 
--type(autoheal() :: #autoheal{}).
+-type autoheal() :: #autoheal{}.
 
 -export_type([autoheal/0]).
 
@@ -47,8 +49,7 @@ enabled() ->
     end.
 proc(undefined) -> undefined;
-proc(#autoheal{proc = Proc}) ->
-    Proc.
+proc(#autoheal{proc = Proc}) -> Proc.
 
 handle_msg(Msg, undefined) ->
     ?LOG(error, "Autoheal not enabled! Unexpected msg: ~p", [Msg]),
     undefined;
@@ -76,12 +77,24 @@ handle_msg(Msg = {create_splitview, Node}, Autoheal = #autoheal{delay = Delay, t
             Nodes = ekka_mnesia:cluster_nodes(all),
             case rpc:multicall(Nodes, ekka_mnesia, cluster_view, [], 30000) of
                 {Views, []} ->
-                    SplitView = lists:sort(fun compare_view/2, lists:usort(Views)),
-                    ekka_node_monitor:cast(coordinator(SplitView), {heal_partition, SplitView});
+                    SplitView = find_split_view(Nodes, Views),
+                    HealPlan = find_heal_plan(SplitView),
+                    case HealPlan of
+                        {Candidates = [_ | _], Minority} ->
+                            %% Non-empty list of candidates, choose a coordinator.
+                            CoordNode = pick_coordinator(Candidates),
+                            ekka_node_monitor:cast(CoordNode, {heal_cluster, Minority, SplitView});
+                        {[], Cluster} ->
+                            %% It's very unlikely but possible to have an empty list of candidates.
+                            ekka_node_monitor:cast(node(), {heal_cluster, Cluster, SplitView});
+                        {} ->
+                            ignore
+                    end,
+                    Autoheal#autoheal{timer = undefined};
                 {_Views, BadNodes} ->
-                    ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes])
-            end,
-            Autoheal#autoheal{timer = undefined};
+                    ?LOG(critical, "Bad nodes found when autoheal: ~p", [BadNodes]),
+                    Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
+            end;
         false ->
             Autoheal#autoheal{timer = ekka_node_monitor:run_after(Delay, {autoheal, Msg})}
     end;
@@ -91,51 +104,98 @@ handle_msg(Msg = {create_splitview, _Node}, Autoheal) ->
     Autoheal;
 
 handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
+    %% NOTE: Backward compatibility.
+    case SplitView of
+        %% No partitions.
+        [] -> Autoheal;
+        [{_, []}] -> Autoheal;
+        %% Partitions.
+        SplitView ->
+            Proc = spawn_link(fun() -> heal_partition(SplitView) end),
+            Autoheal#autoheal{role = coordinator, proc = Proc}
+    end;
+
+handle_msg({heal_cluster, Minority, SplitView}, Autoheal = #autoheal{proc = undefined}) ->
     Proc = spawn_link(fun() ->
-                  ?LOG(info, "Healing partition: ~p", [SplitView]),
-                  _ = heal_partition(SplitView)
-              end),
+        ?tp(notice, "Healing cluster partition", #{
+            need_reboot => Minority,
+            split_view => SplitView
+        }),
+        reboot_minority(Minority -- [node()])
+    end),
     Autoheal#autoheal{role = coordinator, proc = Proc};
-handle_msg({heal_partition, SplitView}, Autoheal= #autoheal{proc = _Proc}) ->
+handle_msg({heal_partition, SplitView}, Autoheal = #autoheal{proc = _Proc}) ->
     ?LOG(critical, "Unexpected heal_partition msg: ~p", [SplitView]),
     Autoheal;
 
 handle_msg({'EXIT', Pid, normal}, Autoheal = #autoheal{proc = Pid}) ->
     Autoheal#autoheal{proc = undefined};
 handle_msg({'EXIT', Pid, Reason}, Autoheal = #autoheal{proc = Pid}) ->
-    ?LOG(critical, "Autoheal process crashed: ~s", [Reason]),
+    ?LOG(critical, "Autoheal process crashed: ~p", [Reason]),
+    _Retry = ekka_node_monitor:run_after(1000, confirm_partition),
     Autoheal#autoheal{proc = undefined};
 
 handle_msg(Msg, Autoheal) ->
     ?LOG(critical, "Unexpected msg: ~p", [Msg, Autoheal]),
     Autoheal.
 
-compare_view({Running1, _} , {Running2, _}) ->
-    Len1 = length(Running1), Len2 = length(Running2),
-    if
-        Len1 > Len2 -> true;
-        Len1 == Len2 -> lists:member(node(), Running1);
-        true -> false
+find_split_view(Nodes, Views) ->
+    ClusterView = lists:zipwith(
+        fun(N, {Running, Stopped}) -> {N, Running, Stopped} end,
+        Nodes,
+        Views
+    ),
+    MajorityView = lists:sort(fun compare_node_views/2, ClusterView),
+    find_split_view(MajorityView).
+
+compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
+    Len1 = length(Running1),
+    Len2 = length(Running2),
+    case Len1 of
+        %% Prefer partitions with higher number of surviving nodes.
+        L when L > Len2 -> true;
+        %% If number of nodes is the same, sort by list of running nodes.
+        Len2 -> Running1 < Running2;
+        L when L < Len2 -> false
     end.
 
-coordinator([{Nodes, _} | _]) ->
-    ekka_membership:coordinator(Nodes).
-
--spec heal_partition(list()) -> list(node()).
-heal_partition([]) ->
-    [];
-%% All nodes connected.
-heal_partition([{_, []}]) ->
-    [];
-%% Partial partitions happened.
-heal_partition([{Nodes, []}|_]) ->
+find_split_view([{_Node, _Running, []} | Views]) ->
+    %% Node observes no partitions, ignore.
+    find_split_view(Views);
+find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
+    %% Node observes some nodes as partitioned from it.
+    %% These nodes need to be rebooted, and as such they should not be part of the split view.
+    Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
+    [View | find_split_view(Rest)];
+find_split_view([]) ->
+    [].
+
+find_heal_plan([{_Node, R0, P0} | Rest]) ->
+    %% If we have more than one partition in the split view, we need to reboot _all_ of the nodes
+    %% in each view's partition (i.e. ⋃(Partitions)) for better safety. But then we need to
+    %% find candidates to do it, as ⋃(Running) ∖ ⋃(Partitions).
+    {_Nodes, Rs, Ps} = lists:unzip3(Rest),
+    URunning = ordsets:union(lists:map(fun ordsets:from_list/1, [R0 | Rs])),
+    UPartitions = ordsets:union(lists:map(fun ordsets:from_list/1, [P0 | Ps])),
+    {ordsets:subtract(URunning, UPartitions), UPartitions};
+find_heal_plan([]) ->
+    {}.
+
+pick_coordinator(Candidates) ->
+    case lists:member(node(), Candidates) of
+        true -> node();
+        false -> ekka_membership:coordinator(Candidates)
+    end.
+
+heal_partition([{Nodes, []} | _] = SplitView) ->
+    %% Symmetric partition.
+    ?LOG(info, "Healing partition: ~p", [SplitView]),
     reboot_minority(Nodes -- [node()]);
-heal_partition([{Majority, Minority}, {Minority, Majority}]) ->
-    reboot_minority(Minority);
-heal_partition(SplitView) ->
-    ?LOG(critical, "Cannot heal the partitions: ~p", [SplitView]),
-    error({unknown_splitview, SplitView}).
+heal_partition([{Majority, Minority}, {Minority, Majority}] = SplitView) ->
+    %% Symmetric partition.
+    ?LOG(info, "Healing partition: ~p", [SplitView]),
+    reboot_minority(Minority).
 
 reboot_minority(Minority) ->
     lists:foreach(fun shutdown/1, Minority),
@@ -155,4 +215,3 @@ ensure_cancel_timer(undefined) ->
     ok;
 ensure_cancel_timer(TRef) ->
     catch erlang:cancel_timer(TRef).
-
diff --git a/src/ekka_node_monitor.erl b/src/ekka_node_monitor.erl
index 824e7655..5835b898 100644
--- a/src/ekka_node_monitor.erl
+++ b/src/ekka_node_monitor.erl
@@ -110,9 +110,10 @@ handle_cast({confirm, TargetNode, Status}, State) ->
 
 handle_cast(Msg = {report_partition, _Node}, State) ->
     {noreply, autoheal_handle_msg(Msg, State)};
-
 handle_cast(Msg = {heal_partition, _SplitView}, State) ->
     {noreply, autoheal_handle_msg(Msg, State)};
+handle_cast(Msg = {heal_cluster, _, _}, State) ->
+    {noreply, autoheal_handle_msg(Msg, State)};
 
 handle_cast(Msg, State) ->
     ?LOG(error, "Unexpected cast: ~p", [Msg]),
@@ -142,6 +143,12 @@ handle_info({mnesia_system_event, {mnesia_up, Node}},
         false -> ok;
         true -> ekka_membership:partition_healed(Node)
     end,
+    %% If there was an asymmetric cluster partition, we might need more
+    %% autoheal iterations to completely bring the cluster back to normal.
+    case ekka_autoheal:enabled() of
+        {true, _} -> run_after(3000, confirm_partition);
+        false -> ignore
+    end,
     {noreply, State#state{partitions = lists:delete(Node, Partitions)}};
 
 handle_info({mnesia_system_event, {mnesia_down, Node}}, State) ->
diff --git a/test/ekka_autoheal_SUITE.erl b/test/ekka_autoheal_SUITE.erl
index c85b0194..58cd25f8 100644
--- a/test/ekka_autoheal_SUITE.erl
+++ b/test/ekka_autoheal_SUITE.erl
@@ -16,6 +16,8 @@
 
 -module(ekka_autoheal_SUITE).
 
+-include_lib("snabbkaffe/include/test_macros.hrl").
+
 -compile(export_all).
 -compile(nowarn_export_all).
@@ -41,27 +43,134 @@ t_autoheal(_Config) ->
         %% Create cluster
         ok = rpc:call(N2, ekka, join, [N1]),
         ok = rpc:call(N3, ekka, join, [N1]),
-        %% Simulate netsplit
-        true = rpc:cast(N3, net_kernel, disconnect, [N1]),
-        true = rpc:cast(N3, net_kernel, disconnect, [N2]),
-        ok = timer:sleep(1000),
-        %% SplitView: {[N1,N2], [N3]}
-        [N1,N2] = rpc:call(N1, ekka, info, [running_nodes]),
-        [N3] = rpc:call(N1, ekka, info, [stopped_nodes]),
-        [N1,N2] = rpc:call(N2, ekka, info, [running_nodes]),
-        [N3] = rpc:call(N2, ekka, info, [stopped_nodes]),
-        [N3] = rpc:call(N3, ekka, info, [running_nodes]),
-        [N1,N2] = rpc:call(N3, ekka, info, [stopped_nodes]),
-        %% Wait for autoheal
-        ok = timer:sleep(12000),
-        [N1,N2,N3] = rpc:call(N1, ekka, info, [running_nodes]),
-        [N1,N2,N3] = rpc:call(N2, ekka, info, [running_nodes]),
-        [N1,N2,N3] = rpc:call(N3, ekka, info, [running_nodes]),
-        rpc:call(N1, ekka, leave, []),
-        rpc:call(N2, ekka, leave, []),
-        rpc:call(N3, ekka, leave, [])
+        ?check_trace(
+            begin
+                %% Simulate netsplit
+                true = rpc:cast(N3, net_kernel, disconnect, [N1]),
+                true = rpc:cast(N3, net_kernel, disconnect, [N2]),
+                ok = timer:sleep(1000),
+                %% SplitView: {[N1,N2], [N3]}
+                [N1,N2] = rpc:call(N1, ekka, info, [running_nodes]),
+                [N3] = rpc:call(N1, ekka, info, [stopped_nodes]),
+                [N1,N2] = rpc:call(N2, ekka, info, [running_nodes]),
+                [N3] = rpc:call(N2, ekka, info, [stopped_nodes]),
+                [N3] = rpc:call(N3, ekka, info, [running_nodes]),
+                [N1,N2] = rpc:call(N3, ekka, info, [stopped_nodes]),
+                %% Simulate autoheal crash, to verify autoheal tolerates it.
+                snabbkaffe_nemesis:inject_crash(
+                    ?match_event(#{?snk_kind := "Healing cluster partition"}),
+                    snabbkaffe_nemesis:recover_after(1),
+                    ?MODULE
+                ),
+                %% Wait for autoheal
+                ok = timer:sleep(12000),
+                [N1,N2,N3] = rpc:call(N1, ekka, info, [running_nodes]),
+                [N1,N2,N3] = rpc:call(N2, ekka, info, [running_nodes]),
+                [N1,N2,N3] = rpc:call(N3, ekka, info, [running_nodes]),
+                rpc:call(N1, ekka, leave, []),
+                rpc:call(N2, ekka, leave, []),
+                rpc:call(N3, ekka, leave, [])
+            end,
+            fun(Trace) ->
+                ?assertMatch(
+                    [#{need_reboot := [N3]}],
+                    ?of_kind("Healing cluster partition", Trace)
+                ),
+                ?assertMatch([_ | _], ?of_kind(snabbkaffe_crash, Trace))
+            end
+        )
+    after
+        lists:foreach(fun ekka_ct:stop_slave/1, Nodes),
+        snabbkaffe:stop()
+    end.
+
+t_autoheal_asymm(_Config) ->
+    [N1,N2,N3,N4] = Nodes = lists:map(fun start_slave_node/1, [ah1,ah2,ah3,ah4]),
+    try
+        %% Create cluster
+        ok = rpc:call(N2, ekka, join, [N1]),
+        ok = rpc:call(N3, ekka, join, [N1]),
+        ok = rpc:call(N4, ekka, join, [N1]),
+        ?check_trace(
+            begin
+                %% Simulate asymmetric netsplit
+                true = rpc:cast(N2, net_kernel, disconnect, [N1]),
+                true = rpc:cast(N3, net_kernel, disconnect, [N1]),
+                true = rpc:cast(N4, net_kernel, disconnect, [N2]),
+                ok = timer:sleep(1000),
+                %% Asymmetric split, but it's enough to reboot N1 and N2
+                NodesInfo = [running_nodes, stopped_nodes],
+                [[N1,N4], [N2,N3]] = [rpc:call(N1, ekka, info, [I]) || I <- NodesInfo],
+                [[N2,N3], [N1,N4]] = [rpc:call(N2, ekka, info, [I]) || I <- NodesInfo],
+                [[N2,N3,N4], [N1]] = [rpc:call(N3, ekka, info, [I]) || I <- NodesInfo],
+                [[N1,N3,N4], [N2]] = [rpc:call(N4, ekka, info, [I]) || I <- NodesInfo],
+                %% Wait for autoheal
+                ok = timer:sleep(12000),
+                Nodes = rpc:call(N1, ekka, info, [running_nodes]),
+                Nodes = rpc:call(N2, ekka, info, [running_nodes]),
+                Nodes = rpc:call(N3, ekka, info, [running_nodes]),
+                Nodes = rpc:call(N4, ekka, info, [running_nodes]),
+                rpc:call(N1, ekka, leave, []),
+                rpc:call(N2, ekka, leave, []),
+                rpc:call(N3, ekka, leave, []),
+                rpc:call(N4, ekka, leave, [])
+            end,
+            fun(Trace) ->
+                ?assertMatch(
+                    [#{need_reboot := [N1, N2]}],
+                    ?of_kind("Healing cluster partition", Trace)
+                )
+            end
+        )
+    after
+        lists:foreach(fun ekka_ct:stop_slave/1, Nodes),
+        snabbkaffe:stop()
+    end.
+
+t_autoheal_fullsplit(_Config) ->
+    [N1,N2,N3,N4] = Nodes = lists:map(fun start_slave_node/1, [fs1,fs2,fs3,fs4]),
+    try
+        %% Create cluster
+        ok = rpc:call(N2, ekka, join, [N1]),
+        ok = rpc:call(N3, ekka, join, [N1]),
+        ok = rpc:call(N4, ekka, join, [N1]),
+        ?check_trace(
+            begin
+                %% Simulate full netsplit
+                true = rpc:cast(N1, net_kernel, disconnect, [N2]),
+                true = rpc:cast(N1, net_kernel, disconnect, [N3]),
+                true = rpc:cast(N1, net_kernel, disconnect, [N4]),
+                true = rpc:cast(N2, net_kernel, disconnect, [N3]),
+                true = rpc:cast(N2, net_kernel, disconnect, [N4]),
+                true = rpc:cast(N3, net_kernel, disconnect, [N4]),
+                ok = timer:sleep(1000),
+                %% Full split, all nodes except one need to be rebooted
+                NodesInfo = [running_nodes, stopped_nodes],
+                [[N1], [N2,N3,N4]] = [rpc:call(N1, ekka, info, [I]) || I <- NodesInfo],
+                [[N2], [N1,N3,N4]] = [rpc:call(N2, ekka, info, [I]) || I <- NodesInfo],
+                [[N3], [N1,N2,N4]] = [rpc:call(N3, ekka, info, [I]) || I <- NodesInfo],
+                [[N4], [N1,N2,N3]] = [rpc:call(N4, ekka, info, [I]) || I <- NodesInfo],
+                %% Wait for autoheal
+                ok = timer:sleep(12000),
+                Nodes = rpc:call(N1, ekka, info, [running_nodes]),
+                Nodes = rpc:call(N2, ekka, info, [running_nodes]),
+                Nodes = rpc:call(N3, ekka, info, [running_nodes]),
+                Nodes = rpc:call(N4, ekka, info, [running_nodes]),
+                rpc:call(N1, ekka, leave, []),
+                rpc:call(N2, ekka, leave, []),
+                rpc:call(N3, ekka, leave, []),
+                rpc:call(N4, ekka, leave, [])
+            end,
+            fun(Trace) ->
+                ?assertMatch(
+                    [#{need_reboot := [N2, N3, N4]}],
+                    ?of_kind("Healing cluster partition", Trace)
+                )
+            end
+        )
     after
-        lists:foreach(fun ekka_ct:stop_slave/1, Nodes)
+        lists:foreach(fun ekka_ct:stop_slave/1, Nodes),
+        snabbkaffe:stop()
     end.
 
 start_slave_node(Name) ->
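
Reviewer note, not part of the patch: the new heal-plan logic is easiest to follow on the asymmetric split that t_autoheal_asymm exercises. The standalone sketch below (hypothetical module name autoheal_plan_sketch) copies the pure find_split_view/find_heal_plan parts introduced in ekka_autoheal above and runs them on that scenario; it depends only on stdlib lists and ordsets, so coordinator election and the actual node reboots are out of scope.

%% A minimal, self-contained sketch of the heal-plan computation; the module
%% name and demo/0 are illustrative only, not part of ekka.
-module(autoheal_plan_sketch).
-export([demo/0]).

demo() ->
    %% Per-node cluster views for the asymmetric split simulated in
    %% t_autoheal_asymm: the n1/n2, n1/n3 and n2/n4 links are cut.
    Nodes = [n1, n2, n3, n4],
    Views = [{[n1,n4], [n2,n3]},    %% view reported by n1
             {[n2,n3], [n1,n4]},    %% view reported by n2
             {[n2,n3,n4], [n1]},    %% view reported by n3
             {[n1,n3,n4], [n2]}],   %% view reported by n4
    find_heal_plan(find_split_view(Nodes, Views)).

%% Tag each view with its node, order views by partition size, then drop the
%% views of nodes already marked as partitioned by a larger partition.
find_split_view(Nodes, Views) ->
    ClusterView = lists:zipwith(
        fun(N, {Running, Stopped}) -> {N, Running, Stopped} end, Nodes, Views),
    MajorityView = lists:sort(fun compare_node_views/2, ClusterView),
    find_split_view(MajorityView).

compare_node_views({_N1, Running1, _}, {_N2, Running2, _}) ->
    %% Prefer views with more surviving nodes; break ties on the node lists.
    Len1 = length(Running1),
    Len2 = length(Running2),
    case Len1 of
        L when L > Len2 -> true;
        Len2 -> Running1 < Running2;
        L when L < Len2 -> false
    end.

find_split_view([{_Node, _Running, []} | Views]) ->
    find_split_view(Views);
find_split_view([View = {_Node, _Running, Partitioned} | Views]) ->
    Rest = lists:foldl(fun(N, Acc) -> lists:keydelete(N, 1, Acc) end, Views, Partitioned),
    [View | find_split_view(Rest)];
find_split_view([]) ->
    [].

%% Candidates = ⋃(Running) ∖ ⋃(Partitions); Minority = ⋃(Partitions).
find_heal_plan([{_Node, R0, P0} | Rest]) ->
    {_Nodes, Rs, Ps} = lists:unzip3(Rest),
    URunning = ordsets:union([ordsets:from_list(R) || R <- [R0 | Rs]]),
    UPartitions = ordsets:union([ordsets:from_list(P) || P <- [P0 | Ps]]),
    {ordsets:subtract(URunning, UPartitions), UPartitions};
find_heal_plan([]) ->
    {}.

Under those assumptions, autoheal_plan_sketch:demo() evaluates to {[n3,n4],[n1,n2]}: n3 and n4 are the coordinator candidates, and n1 and n2 form the minority to reboot, which is consistent with the [#{need_reboot := [N1, N2]}] assertion in t_autoheal_asymm.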