From 293fd97edf26b9f4f99e8f7faff2f6df5558b7a7 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?=
Date: Tue, 6 Feb 2024 13:29:17 +0100
Subject: [PATCH 1/4] ra_server: Update leaderboard after each command if cluster changed

[Why]

In a RabbitMQ test with a 5-node cluster where Erlang nodes are killed and
restarted in a rolling fashion, I observed that the leaderboards might be
inconsistent across the cluster.

Logs from node 1:

2024-02-06 12:29:15.792058+01:00 [debug] <0.251.0> Trying to restart local Ra server for store "rabbitmq_metadata" in Ra system "coordination"
2024-02-06 12:29:15.810508+01:00 [debug] <0.296.0> RabbitMQ metadata store: ra_log:init recovered last_index_term {60,1} first index 0
2024-02-06 12:29:15.813225+01:00 [debug] <0.296.0> RabbitMQ metadata store: post_init -> recover in term: 1 machine version: 0
2024-02-06 12:29:15.813291+01:00 [debug] <0.296.0> RabbitMQ metadata store: recovering state machine version 0:0 from index 0 to 0
2024-02-06 12:29:15.813347+01:00 [debug] <0.296.0> RabbitMQ metadata store: recovery of state machine version 0:0 from index 0 to 0 took 0ms
2024-02-06 12:29:15.813413+01:00 [info] <0.251.0> Waiting for Khepri leader for 30000 ms, 9 retries left
2024-02-06 12:29:15.813428+01:00 [debug] <0.296.0> RabbitMQ metadata store: recover -> recovered in term: 1 machine version: 0
2024-02-06 12:29:15.813484+01:00 [debug] <0.296.0> RabbitMQ metadata store: recovered -> follower in term: 1 machine version: 0
2024-02-06 12:29:15.813520+01:00 [debug] <0.296.0> RabbitMQ metadata store: is not new, setting election timeout.
2024-02-06 12:29:15.818609+01:00 [info] <0.251.0> Khepri leader elected
2024-02-06 12:29:15.820842+01:00 [debug] <0.296.0> RabbitMQ metadata store: leader call - leader not known. Command will be forwarded once leader is known.
2024-02-06 12:29:16.176876+01:00 [debug] <0.296.0> RabbitMQ metadata store: pre_vote election called for in term 1
2024-02-06 12:29:16.182866+01:00 [debug] <0.296.0> RabbitMQ metadata store: follower -> pre_vote in term: 1 machine version: 0
2024-02-06 12:29:16.183051+01:00 [debug] <0.296.0> RabbitMQ metadata store: pre_vote granted #Ref<0.2919399944.1500250113.30962> for term 1 votes 1
2024-02-06 12:29:16.183247+01:00 [debug] <0.296.0> RabbitMQ metadata store: election called for in term 2
2024-02-06 12:29:16.186666+01:00 [debug] <0.296.0> RabbitMQ metadata store: pre_vote -> candidate in term: 2 machine version: 0
2024-02-06 12:29:16.186815+01:00 [debug] <0.296.0> RabbitMQ metadata store: vote granted for term 2 votes 1
2024-02-06 12:29:16.186996+01:00 [notice] <0.296.0> RabbitMQ metadata store: candidate -> leader in term: 2 machine version: 0
2024-02-06 12:29:16.187220+01:00 [alert] <0.296.0> RA LEADER RECORD: ClusterName=rabbitmq_metadata Leader={rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'} Members=[{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'}]
2024-02-06 12:29:16.190313+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:16.190648+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:16.190878+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:16.191393+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-4-22584@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:16.200058+01:00 [debug] <0.296.0> RabbitMQ metadata store: enabling ra cluster changes in 2, index 61
2024-02-06 12:29:16.302892+01:00 [debug] <0.251.0> Khepri-based RabbitMQ metadata store ready

Logs from node 2:

2024-02-06 12:29:18.714368+01:00 [debug] <0.260.0> Trying to restart local Ra server for store "rabbitmq_metadata" in Ra system "coordination"
2024-02-06 12:29:18.732537+01:00 [debug] <0.310.0> RabbitMQ metadata store: ra_log:init recovered last_index_term {74,2} first index 0
2024-02-06 12:29:18.735899+01:00 [debug] <0.310.0> RabbitMQ metadata store: post_init -> recover in term: 2 machine version: 0
2024-02-06 12:29:18.735944+01:00 [debug] <0.310.0> RabbitMQ metadata store: recovering state machine version 0:0 from index 0 to 60
2024-02-06 12:29:18.736488+01:00 [info] <0.260.0> Waiting for Khepri leader for 30000 ms, 9 retries left
2024-02-06 12:29:18.741696+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:18.741784+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:18.741852+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:18.741969+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-4-22584@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:18.743421+01:00 [debug] <0.310.0> RabbitMQ metadata store: recovery of state machine version 0:0 from index 0 to 60 took 8ms
2024-02-06 12:29:18.743487+01:00 [debug] <0.310.0> RabbitMQ metadata store: recover -> recovered in term: 2 machine version: 0
2024-02-06 12:29:18.748414+01:00 [debug] <0.310.0> RabbitMQ metadata store: recovered -> follower in term: 2 machine version: 0
2024-02-06 12:29:18.748445+01:00 [debug] <0.310.0> RabbitMQ metadata store: is not new, setting election timeout.
2024-02-06 12:29:18.748591+01:00 [info] <0.260.0> Khepri leader elected
2024-02-06 12:29:18.748642+01:00 [debug] <0.310.0> RabbitMQ metadata store: leader call - leader not known. Command will be forwarded once leader is known.
2024-02-06 12:29:19.036638+01:00 [debug] <0.310.0> RabbitMQ metadata store: pre_vote election called for in term 2
2024-02-06 12:29:19.040483+01:00 [debug] <0.310.0> RabbitMQ metadata store: follower -> pre_vote in term: 2 machine version: 0
2024-02-06 12:29:19.040652+01:00 [debug] <0.310.0> RabbitMQ metadata store: pre_vote granted #Ref<0.772528231.2305294338.197063> for term 2 votes 1
2024-02-06 12:29:19.043116+01:00 [debug] <0.310.0> RabbitMQ metadata store: pre_vote -> follower in term: 2 machine version: 0
2024-02-06 12:29:19.043212+01:00 [debug] <0.310.0> RabbitMQ metadata store: is not new, setting election timeout.
2024-02-06 12:29:19.044068+01:00 [debug] <0.310.0> RabbitMQ metadata store: enabling ra cluster changes in 2, index 61
2024-02-06 12:29:19.061918+01:00 [alert] <0.310.0> RA LEADER RECORD: ClusterName=rabbitmq_metadata Leader={rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'} Members=[{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-4-22584@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
2024-02-06 12:29:19.062191+01:00 [info] <0.310.0> RabbitMQ metadata store: detected a new leader {rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'} in term 2
2024-02-06 12:29:19.062378+01:00 [debug] <0.310.0> RabbitMQ metadata store: Leader node 'rmq-ct-rolling_kill_restart-1-22440@localhost' may be down, setting pre-vote timeout
2024-02-06 12:29:19.151724+01:00 [debug] <0.260.0> Khepri-based RabbitMQ metadata store ready

Logs on nodes 3, 4 and 5 are similar to node 2.

Note that the messages starting with "RA LEADER RECORD" were added to
`ra_leaderboard:record/3` during debugging and are not committed to Ra.

We can see that node 1 is elected the leader on recovery, before the
`ra_cluster_change` commands are applied. It stays the leader afterwards but
never updates its leaderboard.

[How]

Instead of emitting an `update_leaderboard` effect after a cluster change
command is appended or after some specific events, we check whether the
cluster changed after each command is applied and update the leaderboard if
it did.

This check is performed on the leader and all followers. It is also performed
once a Ra server has finished recovery.

V2: This revision changes the approach. The first iteration updated the
leaderboard after a `ra_cluster_change` command was applied, in addition to
the existing updates.

V3: Refactor the approach and ensure the leaderboard is updated after
recovery as well as whenever the cluster changes on any member.
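For illustration, the per-command check boils down to something like the
following sketch (simplified; the actual implementation is the new
maybe_record_cluster_change/2 in ra_server_proc.erl shown in the diff below,
and maybe_update_leaderboard/3 here is only an illustrative name):

    %% Sketch only: compare the server state before and after a command and
    %% refresh the local leaderboard entry when the cluster or leader changed.
    maybe_update_leaderboard(ClusterName, OldServerState, NewServerState) ->
        OldLeader = ra_server:leader_id(OldServerState),
        NewLeader = ra_server:leader_id(NewServerState),
        case maps:get(cluster_index_term, OldServerState) =/=
                 maps:get(cluster_index_term, NewServerState)
             orelse OldLeader =/= NewLeader
        of
            true ->
                Members = maps:keys(maps:get(cluster, NewServerState)),
                ok = ra_leaderboard:record(ClusterName, NewLeader, Members);
            false ->
                ok
        end.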
---
 src/ra_server.erl           | 18 ++--------
 src/ra_server_proc.erl      | 70 ++++++++++++++++++++-----------------
 test/coordination_SUITE.erl |  6 ++--
 3 files changed, 43 insertions(+), 51 deletions(-)

diff --git a/src/ra_server.erl b/src/ra_server.erl
index 62e16d0a..57416bd3 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -1023,27 +1023,18 @@ handle_follower(#append_entries_rpc{term = Term,
         _ ->
             State1 = lists:foldl(fun pre_append_log_follower/2,
                                  State0, Entries),
-            %% if the cluster has changed we need to update
-            %% the leaderboard
-            Effects1 = case maps:get(cluster, State0) =/=
-                            maps:get(cluster, State1) of
-                           true ->
-                               [update_leaderboard | Effects0];
-                           false ->
-                               Effects0
-                       end,
             case ra_log:write(Entries, Log1) of
                 {ok, Log2} ->
                     {NextState, State, Effects} =
                         evaluate_commit_index_follower(State1#{log => Log2},
-                                                       Effects1),
+                                                       Effects0),
                     {NextState, State,
                      [{next_event, {ra_log_event, flush_cache}} | Effects]};
                 {error, wal_down} ->
                     {await_condition,
                      State1#{log => Log1,
                              condition => fun wal_down_condition/2},
-                     Effects1};
+                     Effects0};
                 {error, _} = Err ->
                     exit(Err)
             end
@@ -1649,8 +1640,6 @@ filter_follower_effects(Effects) ->
                         [C | Acc];
                     (garbage_collection = C, Acc) ->
                         [C | Acc];
-                    (update_leaderboard = C, Acc) ->
-                        [C | Acc];
                     ({delete_snapshot, _} = C, Acc) ->
                         [C | Acc];
                     ({send_msg, _, _, _Opts} = C, Acc) ->
@@ -2600,7 +2589,7 @@ append_cluster_change(Cluster, From, ReplyMode,
                       cluster := PrevCluster,
                       cluster_index_term := {PrevCITIdx, PrevCITTerm},
                       current_term := Term} = State,
-                     Effects0) ->
+                     Effects) ->
     % turn join command into a generic cluster change command
     % that include the new cluster configuration
     Command = {'$ra_cluster_change', From, Cluster, ReplyMode},
@@ -2609,7 +2598,6 @@ append_cluster_change(Cluster, From, ReplyMode,
     % TODO: is it safe to do change the cluster config with an async write?
     % what happens if the write fails?
     Log = ra_log:append({NextIdx, Term, Command}, Log0),
-    Effects = [update_leaderboard | Effects0],
     {ok, NextIdx, Term,
      State#{log => Log,
             cluster => Cluster,
diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl
index 2f5189b8..972624c1 100644
--- a/src/ra_server_proc.erl
+++ b/src/ra_server_proc.erl
@@ -370,6 +370,7 @@ recover(_, _, State) ->
 %% effects post recovery
 recovered(enter, OldState, State0) ->
     {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
+    ok = record_cluster_change(State),
     {keep_state, State, Actions};
 recovered(internal, next, #state{server_state = ServerState} = State) ->
     true = erlang:garbage_collect(),
@@ -378,7 +379,7 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->
 
 leader(enter, OldState, State0) ->
     {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
-    ok = record_leader_change(id(State0), State0),
+    ok = record_cluster_change(State),
     %% TODO: reset refs?
     {keep_state, State#state{leader_last_seen = undefined,
                              pending_notifys = #{},
@@ -392,17 +393,13 @@ leader(EventType, {local_call, Msg}, State) ->
 leader(EventType, {leader_cast, Msg}, State) ->
     leader(EventType, Msg, State);
 leader(EventType, {command, normal, {CmdType, Data, ReplyMode}},
-       #state{conf = Conf,
-              server_state = ServerState0} = State0) ->
+       #state{conf = Conf} = State0) ->
     case validate_reply_mode(ReplyMode) of
         ok ->
             %% normal priority commands are written immediately
             Cmd = make_command(CmdType, EventType, Data, ReplyMode),
-            {leader, ServerState, Effects} =
-                ra_server:handle_leader({command, Cmd}, ServerState0),
-            {State, Actions} =
-                ?HANDLE_EFFECTS(Effects, EventType,
-                                State0#state{server_state = ServerState}),
+            {leader, State1, Effects} = handle_leader({command, Cmd}, State0),
+            {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
             {keep_state, State, Actions};
         Error ->
             ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1),
@@ -453,17 +450,13 @@ leader(EventType, {aux_command, Cmd}, State0) ->
     {keep_state, State#state{server_state = ServerState}, Actions};
 leader(EventType, flush_commands,
        #state{conf = #conf{flush_commands_size = Size},
-              server_state = ServerState0,
               low_priority_commands = Delayed0} = State0) ->
     {Commands, Delayed} = ra_ets_queue:take(Size, Delayed0),
     %% write a batch of delayed commands
-    {leader, ServerState, Effects} =
-        ra_server:handle_leader({commands, Commands}, ServerState0),
+    {leader, State1, Effects} = handle_leader({commands, Commands}, State0),
 
-    {State, Actions} =
-        ?HANDLE_EFFECTS(Effects, EventType,
-                        State0#state{server_state = ServerState}),
+    {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
     case ra_ets_queue:len(Delayed) of
         0 ->
             ok;
@@ -514,16 +507,11 @@ leader(_, tick_timeout, State0) ->
     State = send_applied_notifications(State2, #{}),
     {keep_state, handle_tick_metrics(State), set_tick_timer(State, Actions)};
-leader({timeout, Name}, machine_timeout,
-       #state{server_state = ServerState0,
-              conf = #conf{}} = State0) ->
+leader({timeout, Name}, machine_timeout, State0) ->
     % the machine timer timed out, add a timeout message
     Cmd = make_command('$usr', cast, {timeout, Name}, noreply),
-    {leader, ServerState, Effects} = ra_server:handle_leader({command, Cmd},
-                                                             ServerState0),
-    {State, Actions} = ?HANDLE_EFFECTS(Effects, cast,
-                                       State0#state{server_state =
-                                                    ServerState}),
+    {leader, State1, Effects} = handle_leader({command, Cmd}, State0),
+    {State, Actions} = ?HANDLE_EFFECTS(Effects, cast, State1),
     {keep_state, State, Actions};
 leader({call, From}, trigger_election, State) ->
     {keep_state, State, [{reply, From, ok}]};
@@ -790,11 +778,11 @@ follower({call, From}, {log_fold, Fun, Term}, State) ->
     fold_log(From, Fun, Term, State);
 follower(EventType, Msg, #state{conf = #conf{name = Name},
                                 server_state = SS0} = State0) ->
-    Membership0 = ra_server:get_membership(SS0),
     case handle_follower(Msg, State0) of
         {follower, State1, Effects} ->
             {State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
             State = #state{server_state = SS} = follower_leader_change(State0, State2),
+            Membership0 = ra_server:get_membership(SS0),
             case ra_server:get_membership(SS) of
                 Membership0 ->
                     ok;
@@ -1077,6 +1065,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) ->
         {NextState, ServerState, Effects} ->
             State = State0#state{server_state =
                                  ra_server:persist_last_applied(ServerState)},
+            maybe_record_cluster_change(State0, State),
             {NextState, State, Effects};
         OtherErr ->
             ?ERR("handle_leader err ~p",
                 [OtherErr]),
@@ -1103,8 +1092,11 @@ handle_candidate(Msg, State) ->
 handle_pre_vote(Msg, State) ->
     handle_raft_state(?FUNCTION_NAME, Msg, State).
 
-handle_follower(Msg, State) ->
-    handle_raft_state(?FUNCTION_NAME, Msg, State).
+handle_follower(Msg, State0) ->
+    Ret = handle_raft_state(?FUNCTION_NAME, Msg, State0),
+    {_NextState, State, _Effects} = Ret,
+    maybe_record_cluster_change(State0, State),
+    Ret.
 
 handle_receive_snapshot(Msg, State) ->
     handle_raft_state(?FUNCTION_NAME, Msg, State).
@@ -1341,9 +1333,6 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
     true = erlang:garbage_collect(),
     incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
     {State, Actions};
-handle_effect(_, update_leaderboard, _EvtType, State, Actions) ->
-    ok = record_leader_change(leader_id(State), State),
-    {State, Actions};
 handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
               #state{monitors = Monitors} = State, Actions0) ->
     {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},
@@ -1491,7 +1480,6 @@ follower_leader_change(Old, #state{pending_commands = Pending,
             ok = aten_register(LeaderNode),
             OldLeaderNode = ra_lib:ra_server_id_node(OldLeader),
             _ = aten:unregister(OldLeaderNode),
-            ok = record_leader_change(NewLeader, New),
             % leader has either changed or just been set
             ?INFO("~ts: detected a new leader ~w in term ~b",
                   [log_id(New), NewLeader, current_term(New)]),
@@ -1555,6 +1543,8 @@ do_state_query(voters, #{cluster := Cluster}) ->
                      end
              end, [], Cluster),
     Vs;
+do_state_query(leader, #{leader_id := Leader}) ->
+    Leader;
 do_state_query(members, #{cluster := Cluster}) ->
     maps:keys(Cluster);
 do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
@@ -1872,11 +1862,25 @@ handle_process_down(Pid, Info, RaftState,
                                            monitors = Monitors}),
     {keep_state, State, Actions}.
 
-record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName},
-                                    server_state = ServerState}) ->
+maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
+                                   server_state = ServerStateA},
+                            #state{server_state = ServerStateB}) ->
+    LeaderA = ra_server:leader_id(ServerStateA),
+    LeaderB = ra_server:leader_id(ServerStateB),
+    if (map_get(cluster_index_term, ServerStateA) =/=
+        map_get(cluster_index_term, ServerStateB) orelse
+        LeaderA =/= LeaderB) ->
+           MembersB = do_state_query(members, ServerStateB),
+           ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB);
+       true ->
+           ok
+    end.
+
+record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
+                             server_state = ServerState}) ->
+    Leader = do_state_query(leader, ServerState),
     Members = do_state_query(members, ServerState),
-    ok = ra_leaderboard:record(ClusterName, Leader, Members),
-    ok.
+    ok = ra_leaderboard:record(ClusterName, Leader, Members).
 
 incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
     counters:add(Cnt, Ix, N);
diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl
index 7756460b..269aef8b 100644
--- a/test/coordination_SUITE.erl
+++ b/test/coordination_SUITE.erl
@@ -197,7 +197,6 @@ delete_two_server_cluster(Config) ->
     Machine = {module, ?MODULE, #{}},
     {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds),
     {ok, _} = ra:delete_cluster(ServerIds),
-    % timer:sleep(1000),
     await_condition(
       fun () ->
               lists:all(
@@ -309,8 +308,9 @@ grow_cluster(Config) ->
     ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]),
     {ok, _, _} = ra:add_member(A, B),
     {ok, _, _} = ra:process_command(A, banana),
-    [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
+    [A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),
+    [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
 
     ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
     {ok, _, _} = ra:add_member(A, C),
@@ -339,8 +339,8 @@ grow_cluster(Config) ->
     [B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]),
     undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
     %% check leader
-    L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
     L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]),
+    L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
     undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),

From 110b8d632105d06af36e668c6936f66790c814d5 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Thu, 8 Feb 2024 13:00:06 +0000
Subject: [PATCH 2/4] Fix cluster recovery bug.

In Raft the cluster is changed when a cluster change command is written to
the log. Hence, to correctly recover the current cluster, a state machine
needs to scan to the end of the log on recovery to ensure it picks up any
observed changes. Ra didn't do this and was thus vulnerable to picking up the
wrong cluster configuration (which would of course affect subsequent leader
elections).
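Roughly, the recovery path now finishes with a scan of the log tail past the
commit index, applying any cluster change command it finds. A simplified
sketch of what the new cluster_scan_fun/2 does (logging and the membership /
cluster_change_permitted updates are elided; scan_cluster_changes/3 is only
an illustrative name):

    %% Sketch only: fold over entries after the commit index and fold any
    %% '$ra_cluster_change' command into the recovered server state.
    %% ra_log:fold/5 returns {State, Log}, as in the diff below.
    scan_cluster_changes(CommitIndex, State0, Log0) ->
        From = CommitIndex + 1,
        {To, _Term} = ra_log:last_index_term(Log0),
        ra_log:fold(From, To,
                    fun ({Idx, Term, {'$ra_cluster_change', _Meta, NewCluster, _}}, S) ->
                            S#{cluster => NewCluster,
                               cluster_index_term => {Idx, Term}};
                        (_Entry, S) ->
                            S
                    end, State0, Log0).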
---
 src/ra_server.erl        | 24 ++++++++++++++++++++++--
 test/ra_server_SUITE.erl |  4 +++-
 2 files changed, 25 insertions(+), 3 deletions(-)

diff --git a/src/ra_server.erl b/src/ra_server.erl
index 57416bd3..055a9730 100644
--- a/src/ra_server.erl
+++ b/src/ra_server.erl
@@ -370,7 +370,7 @@ recover(#{cfg := #cfg{log_id = LogId,
            [LogId, EffMacVer, MacVer, LastApplied, CommitIndex]),
     Before = erlang:system_time(millisecond),
     {#{log := Log0,
-       cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State, _} =
+       cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State1, _} =
         apply_to(CommitIndex,
                  fun({Idx, _, _} = E, S0) ->
                          %% Clear out the effects and notifies map
@@ -384,8 +384,17 @@
     ?DEBUG("~ts: recovery of state machine version ~b:~b "
            "from index ~b to ~b took ~bms",
            [LogId, EffMacVerAfter, MacVer, LastApplied, CommitIndex, After - Before]),
+    %% scan from CommitIndex + 1 until NextIndex - 1 to see if there are
+    %% any further cluster changes
+    FromScan = CommitIndex + 1,
+    {ToScan, _} = ra_log:last_index_term(Log0),
+    ?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]),
+    {State, Log1} = ra_log:fold(FromScan, ToScan,
+                                fun cluster_scan_fun/2,
+                                State1, Log0),
+
     %% disable segment read cache by setting random access pattern
-    Log = ra_log:release_resources(1, random, Log0),
+    Log = ra_log:release_resources(1, random, Log1),
     put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0),
     State#{log => Log,
            %% reset commit latency as recovery may calculate a very old value
@@ -2327,6 +2336,17 @@ append_app_effects([AppEff], Effs) ->
 append_app_effects(AppEffs, Effs) ->
     [AppEffs | Effs].
 
+cluster_scan_fun({Idx, Term, {'$ra_cluster_change', _Meta, NewCluster, _}},
+                 State0) ->
+    ?DEBUG("~ts: ~ts: applying ra cluster change to ~w",
+           [log_id(State0), ?FUNCTION_NAME, maps:keys(NewCluster)]),
+    %% we are recovering and should apply the cluster change
+    State0#{cluster => NewCluster,
+            membership => get_membership(NewCluster, State0),
+            cluster_change_permitted => true,
+            cluster_index_term => {Idx, Term}};
+cluster_scan_fun(_Cmd, State) ->
+    State.
 
 apply_with(_Cmd,
            {Mod, LastAppliedIdx,
diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl
index ccb2e7c3..64685dbf 100644
--- a/test/ra_server_SUITE.erl
+++ b/test/ra_server_SUITE.erl
@@ -248,13 +248,15 @@ recover_restores_cluster_changes(_Config) ->
                   log := Log0}, _} =
         ra_server:handle_leader({command, {'$ra_join', meta(), N2,
                                            await_consensus}}, State),
+    {LIdx, _} = ra_log:last_index_term(Log0),
     ?assertEqual(2, maps:size(Cluster)),
     % intercept ra_log:init call to simulate persisted log data
     % ok = meck:new(ra_log, [passthrough]),
     meck:expect(ra_log, init, fun (_) -> Log0 end),
     meck:expect(ra_log_meta, fetch, fun (_, _, last_applied, 0) ->
-                                            element(1, ra_log:last_index_term(Log0));
+                                            LIdx - 1;
+                                            % element(1, ra_log:last_index_term(Log0));
                                        (_, _, _, Def) ->
                                             Def
                                    end),

From 3628f4856fd3b8d4f43be69fd420c6d015c7e2fe Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Thu, 8 Feb 2024 14:32:49 +0000
Subject: [PATCH 3/4] Possibly avoid a rare race condition in the WAL

When the WAL detects a new writer, it creates a new ETS mem table and inserts
an initial record into the open_mem_tables table. This is so that the
open_mem_tables table can later be updated using ets:update_element/3.
However, the initial record indicated that the mem table should already
contain a single entry, which isn't yet true: the entries are only written a
little later, during batch completion.
This is an attempt to avoid this condition by initially setting the "To"
field to one less than the "From" field.
---
 src/ra_log_wal.erl | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl
index be8cd8c4..29448ddb 100644
--- a/src/ra_log_wal.erl
+++ b/src/ra_log_wal.erl
@@ -524,7 +524,7 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg,
                       %% to open one
                       T = open_mem_table(Cfg, UId),
                       true = ets:insert_new(OpnMemTbl,
-                                            {UId, Idx, Idx, T}),
+                                            {UId, Idx, Idx - 1, T}),
                       {T, Idx}
              end,
     Writer = #batch_writer{tbl_start = TblStart,

From 2c9a7fe15b496fcaca4e2e1ae06ae5b03247cd64 Mon Sep 17 00:00:00 2001
From: Karl Nilsson
Date: Thu, 8 Feb 2024 16:35:50 +0000
Subject: [PATCH 4/4] ra_log_2_SUITE: test stability.

---
 test/ra_log_2_SUITE.erl | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl
index ff81a67d..d25eb4b1 100644
--- a/test/ra_log_2_SUITE.erl
+++ b/test/ra_log_2_SUITE.erl
@@ -180,8 +180,11 @@ take_after_overwrite_and_init(Config) ->
     Log2 = deliver_written_log_events(Log1, 200),
     {[_, _, _, _], Log3} = ra_log_take(1, 4, Log2),
     Log4 = write_and_roll_no_deliver(1, 2, 2, Log3),
-    % fake lost segments event
-    Log5 = deliver_written_log_events(Log4, 200),
+    Log5 = deliver_log_events_cond(Log4,
+                                   fun (L) ->
+                                           {1, 2} =:= ra_log:last_written(L)
+                                   end, 100),
+    % ensure we cannot take stale entries
     {[{1, 2, _}], Log6} = ra_log_take(1, 4, Log5),
     _ = ra_log:close(Log6),
@@ -310,7 +313,8 @@ sparse_read_out_of_range_2(Config) ->
     {Log3, _} = receive
                     {ra_log_event, {snapshot_written, {10, 2}} = Evt} ->
                         ra_log:handle_event(Evt, Log2)
-                after 500 ->
+                after 5000 ->
+                          flush(),
                           exit(snapshot_written_timeout)
                 end,
     Log4 = deliver_all_log_events(Log3, 100),
@@ -332,8 +336,17 @@ sparse_read(Config) ->
     %% read small batch of the latest entries
     {_, Log3} = ra_log_take(Num - 5, Num, Log2),
     ct:pal("log overview ~p", [ra_log:overview(Log3)]),
-    %% warm up run
-    {_, Log4} = ra_log_take(1, Num, Log3),
+    %% ensure cache is empty as this indicates all enties have at least
+    %% been written to the WAL and thus will be available in mem tables.
+    Log4 = deliver_log_events_cond(Log3,
+                                   fun (L) ->
+                                           case ra_log:overview(L) of
+                                               #{cache_size := 0} ->
+                                                   true;
+                                               _ ->
+                                                   false
+                                           end
+                                   end, 100),
     ra_log:close(Log4),
     NumDiv2 = Num div 2,
     %% create a list of indexes with some consecutive and some gaps