ra_server: Update leaderboard after each command if cluster changed
[Why]
In a RabbitMQ test with a 5-node cluster where Erlang nodes are killed
and restarted in a rolling fashion, I observed that the leaderboards
might be inconsistent across the cluster.

Logs from node 1:

    2024-02-06 12:29:15.792058+01:00 [debug] <0.251.0> Trying to restart local Ra server for store "rabbitmq_metadata" in Ra system "coordination"
    2024-02-06 12:29:15.810508+01:00 [debug] <0.296.0> RabbitMQ metadata store: ra_log:init recovered last_index_term {60,1} first index 0
    2024-02-06 12:29:15.813225+01:00 [debug] <0.296.0> RabbitMQ metadata store: post_init -> recover in term: 1 machine version: 0
    2024-02-06 12:29:15.813291+01:00 [debug] <0.296.0> RabbitMQ metadata store: recovering state machine version 0:0 from index 0 to 0
    2024-02-06 12:29:15.813347+01:00 [debug] <0.296.0> RabbitMQ metadata store: recovery of state machine version 0:0 from index 0 to 0 took 0ms
    2024-02-06 12:29:15.813413+01:00 [info] <0.251.0> Waiting for Khepri leader for 30000 ms, 9 retries left
    2024-02-06 12:29:15.813428+01:00 [debug] <0.296.0> RabbitMQ metadata store: recover -> recovered in term: 1 machine version: 0
    2024-02-06 12:29:15.813484+01:00 [debug] <0.296.0> RabbitMQ metadata store: recovered -> follower in term: 1 machine version: 0
    2024-02-06 12:29:15.813520+01:00 [debug] <0.296.0> RabbitMQ metadata store: is not new, setting election timeout.
    2024-02-06 12:29:15.818609+01:00 [info] <0.251.0> Khepri leader elected
    2024-02-06 12:29:15.820842+01:00 [debug] <0.296.0> RabbitMQ metadata store: leader call - leader not known. Command will be forwarded once leader is known.
    2024-02-06 12:29:16.176876+01:00 [debug] <0.296.0> RabbitMQ metadata store: pre_vote election called for in term 1
    2024-02-06 12:29:16.182866+01:00 [debug] <0.296.0> RabbitMQ metadata store: follower -> pre_vote in term: 1 machine version: 0
    2024-02-06 12:29:16.183051+01:00 [debug] <0.296.0> RabbitMQ metadata store: pre_vote granted #Ref<0.2919399944.1500250113.30962> for term 1 votes 1
    2024-02-06 12:29:16.183247+01:00 [debug] <0.296.0> RabbitMQ metadata store: election called for in term 2
    2024-02-06 12:29:16.186666+01:00 [debug] <0.296.0> RabbitMQ metadata store: pre_vote -> candidate in term: 2 machine version: 0
    2024-02-06 12:29:16.186815+01:00 [debug] <0.296.0> RabbitMQ metadata store: vote granted for term 2 votes 1
    2024-02-06 12:29:16.186996+01:00 [notice] <0.296.0> RabbitMQ metadata store: candidate -> leader in term: 2 machine version: 0
    2024-02-06 12:29:16.187220+01:00 [alert] <0.296.0> RA LEADER RECORD: ClusterName=rabbitmq_metadata Leader={rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'} Members=[{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'}]
    2024-02-06 12:29:16.190313+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:16.190648+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:16.190878+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:16.191393+01:00 [debug] <0.296.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-4-22584@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:16.200058+01:00 [debug] <0.296.0> RabbitMQ metadata store: enabling ra cluster changes in 2, index 61
    2024-02-06 12:29:16.302892+01:00 [debug] <0.251.0> Khepri-based RabbitMQ metadata store ready

Logs from node 2:

    2024-02-06 12:29:18.714368+01:00 [debug] <0.260.0> Trying to restart local Ra server for store "rabbitmq_metadata" in Ra system "coordination"
    2024-02-06 12:29:18.732537+01:00 [debug] <0.310.0> RabbitMQ metadata store: ra_log:init recovered last_index_term {74,2} first index 0
    2024-02-06 12:29:18.735899+01:00 [debug] <0.310.0> RabbitMQ metadata store: post_init -> recover in term: 2 machine version: 0
    2024-02-06 12:29:18.735944+01:00 [debug] <0.310.0> RabbitMQ metadata store: recovering state machine version 0:0 from index 0 to 60
    2024-02-06 12:29:18.736488+01:00 [info] <0.260.0> Waiting for Khepri leader for 30000 ms, 9 retries left
    2024-02-06 12:29:18.741696+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:18.741784+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:18.741852+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:18.741969+01:00 [debug] <0.310.0> RabbitMQ metadata store: applying ra cluster change to [{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-4-22584@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:18.743421+01:00 [debug] <0.310.0> RabbitMQ metadata store: recovery of state machine version 0:0 from index 0 to 60 took 8ms
    2024-02-06 12:29:18.743487+01:00 [debug] <0.310.0> RabbitMQ metadata store: recover -> recovered in term: 2 machine version: 0
    2024-02-06 12:29:18.748414+01:00 [debug] <0.310.0> RabbitMQ metadata store: recovered -> follower in term: 2 machine version: 0
    2024-02-06 12:29:18.748445+01:00 [debug] <0.310.0> RabbitMQ metadata store: is not new, setting election timeout.
    2024-02-06 12:29:18.748591+01:00 [info] <0.260.0> Khepri leader elected
    2024-02-06 12:29:18.748642+01:00 [debug] <0.310.0> RabbitMQ metadata store: leader call - leader not known. Command will be forwarded once leader is known.
    2024-02-06 12:29:19.036638+01:00 [debug] <0.310.0> RabbitMQ metadata store: pre_vote election called for in term 2
    2024-02-06 12:29:19.040483+01:00 [debug] <0.310.0> RabbitMQ metadata store: follower -> pre_vote in term: 2 machine version: 0
    2024-02-06 12:29:19.040652+01:00 [debug] <0.310.0> RabbitMQ metadata store: pre_vote granted #Ref<0.772528231.2305294338.197063> for term 2 votes 1
    2024-02-06 12:29:19.043116+01:00 [debug] <0.310.0> RabbitMQ metadata store: pre_vote -> follower in term: 2 machine version: 0
    2024-02-06 12:29:19.043212+01:00 [debug] <0.310.0> RabbitMQ metadata store: is not new, setting election timeout.
    2024-02-06 12:29:19.044068+01:00 [debug] <0.310.0> RabbitMQ metadata store: enabling ra cluster changes in 2, index 61
    2024-02-06 12:29:19.061918+01:00 [alert] <0.310.0> RA LEADER RECORD: ClusterName=rabbitmq_metadata Leader={rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'} Members=[{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-2-22488@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-3-22536@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-4-22584@localhost'},{rabbitmq_metadata,'rmq-ct-rolling_kill_restart-5-22632@localhost'}]
    2024-02-06 12:29:19.062191+01:00 [info] <0.310.0> RabbitMQ metadata store: detected a new leader {rabbitmq_metadata,'rmq-ct-rolling_kill_restart-1-22440@localhost'} in term 2
    2024-02-06 12:29:19.062378+01:00 [debug] <0.310.0> RabbitMQ metadata store: Leader node 'rmq-ct-rolling_kill_restart-1-22440@localhost' may be down, setting pre-vote timeout
    2024-02-06 12:29:19.151724+01:00 [debug] <0.260.0> Khepri-based RabbitMQ metadata store ready

Logs on nodes 3, 4 and 5 are similar to those on node 2.

Note that the messages starting with "RA LEADER RECORD" come from temporary
instrumentation added to `ra_leaderboard:record/3` during debugging; they are
not committed to Ra.
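
That instrumentation was shaped roughly like the following hypothetical
sketch; the wrapper module and the `logger:alert/2` call are assumptions, only
the message format is taken from the logs above.

    %% Hypothetical stand-in for the temporary debug code added to
    %% ra_leaderboard:record/3; only the message format is taken from
    %% the logs above.
    -module(ra_leaderboard_debug).
    -export([record/3]).

    record(ClusterName, Leader, Members) ->
        logger:alert("RA LEADER RECORD: ClusterName=~ts Leader=~w Members=~w",
                     [ClusterName, Leader, Members]),
        ra_leaderboard:record(ClusterName, Leader, Members).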

We can see that node 1 is elected leader during recovery, before the
`$ra_cluster_change` commands are applied. It stays leader afterwards but
never updates its leaderboard: its "RA LEADER RECORD" line still lists itself
as the only member, while nodes 2 to 5 record all five members.
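
This matters because the leaderboard is a node-local cache that client code
consults to route commands to the current leader. Below is a minimal sketch of
that lookup pattern, using Ra's public `ra_leaderboard:lookup_leader/1` and
`ra:process_command/2`; the module name and the fallback handling are
illustrative.

    -module(leader_lookup_example).
    -export([send_command/3]).

    %% Minimal sketch: resolve the leader from the node-local
    %% leaderboard before sending a command. With a stale record, the
    %% command is routed to the wrong server id.
    send_command(ClusterName, FallbackServerId, Command) ->
        case ra_leaderboard:lookup_leader(ClusterName) of
            undefined ->
                %% no cached record yet: ask a known member instead
                ra:process_command(FallbackServerId, Command);
            LeaderId ->
                ra:process_command(LeaderId, Command)
        end.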

[How]
Instead of emitting an `update_leaderboard` effect after a cluster change
command is appended (or after a few other specific events), we now check
whether the cluster changed after each command is applied, and update the
leaderboard if it did.

This check is performed on the leader and on all followers. It is also
performed once a Ra server finishes recovery.
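
As a hypothetical sanity check (not part of this commit), every node's
leaderboard should now agree after a rolling restart. The sketch assumes
`ra_leaderboard:lookup_members/1` alongside `lookup_leader/1`; the module name
is illustrative and `Nodes` is the list of Erlang nodes hosting cluster
members.

    -module(leaderboard_check_example).
    -export([check_leaderboards/2]).

    %% Hypothetical post-fix check: gather each node's cached view of
    %% the cluster; all tuples should agree on leader and members.
    check_leaderboards(ClusterName, Nodes) ->
        [{N,
          rpc:call(N, ra_leaderboard, lookup_leader, [ClusterName]),
          rpc:call(N, ra_leaderboard, lookup_members, [ClusterName])}
         || N <- Nodes].
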
dumbbell committed Feb 6, 2024
1 parent 8a24cdd commit 7eb516b
Showing 2 changed files with 26 additions and 27 deletions.

src/ra_server.erl (3 additions, 15 deletions)

@@ -1023,27 +1023,18 @@ handle_follower(#append_entries_rpc{term = Term,
         _ ->
             State1 = lists:foldl(fun pre_append_log_follower/2,
                                  State0, Entries),
-            %% if the cluster has changed we need to update
-            %% the leaderboard
-            Effects1 = case maps:get(cluster, State0) =/=
-                            maps:get(cluster, State1) of
-                           true ->
-                               [update_leaderboard | Effects0];
-                           false ->
-                               Effects0
-                       end,
             case ra_log:write(Entries, Log1) of
                 {ok, Log2} ->
                     {NextState, State, Effects} =
                         evaluate_commit_index_follower(State1#{log => Log2},
-                                                       Effects1),
+                                                       Effects0),
                     {NextState, State,
                      [{next_event, {ra_log_event, flush_cache}} | Effects]};
                 {error, wal_down} ->
                     {await_condition,
                      State1#{log => Log1,
                              condition => fun wal_down_condition/2},
-                     Effects1};
+                     Effects0};
                 {error, _} = Err ->
                     exit(Err)
             end

@@ -1649,8 +1640,6 @@ filter_follower_effects(Effects) ->
                         [C | Acc];
                     (garbage_collection = C, Acc) ->
                         [C | Acc];
-                    (update_leaderboard = C, Acc) ->
-                        [C | Acc];
                     ({delete_snapshot, _} = C, Acc) ->
                         [C | Acc];
                     ({send_msg, _, _, _Opts} = C, Acc) ->

@@ -2600,7 +2589,7 @@ append_cluster_change(Cluster, From, ReplyMode,
                        cluster := PrevCluster,
                        cluster_index_term := {PrevCITIdx, PrevCITTerm},
                        current_term := Term} = State,
-                      Effects0) ->
+                      Effects) ->
     % turn join command into a generic cluster change command
     % that include the new cluster configuration
     Command = {'$ra_cluster_change', From, Cluster, ReplyMode},

@@ -2609,7 +2598,6 @@ append_cluster_change(Cluster, From, ReplyMode,
     % TODO: is it safe to do change the cluster config with an async write?
     % what happens if the write fails?
     Log = ra_log:append({NextIdx, Term, Command}, Log0),
-    Effects = [update_leaderboard | Effects0],
     {ok, NextIdx, Term,
      State#{log => Log,
             cluster => Cluster,

src/ra_server_proc.erl (23 additions, 12 deletions)

@@ -370,6 +370,7 @@ recover(_, _, State) ->
 %% effects post recovery
 recovered(enter, OldState, State0) ->
     {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
+    record_cluster_change(State0, State),
     {keep_state, State, Actions};
 recovered(internal, next, #state{server_state = ServerState} = State) ->
     true = erlang:garbage_collect(),

@@ -378,7 +379,6 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->

 leader(enter, OldState, State0) ->
     {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
-    ok = record_leader_change(id(State0), State0),
     %% TODO: reset refs?
     {keep_state, State#state{leader_last_seen = undefined,
                              pending_notifys = #{},

@@ -1077,6 +1077,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) ->
         {NextState, ServerState, Effects} ->
             State = State0#state{server_state =
                                  ra_server:persist_last_applied(ServerState)},
+            record_cluster_change(State0, State),
             {NextState, State, Effects};
         OtherErr ->
             ?ERR("handle_leader err ~p", [OtherErr]),

@@ -1103,8 +1104,11 @@ handle_candidate(Msg, State) ->
 handle_pre_vote(Msg, State) ->
     handle_raft_state(?FUNCTION_NAME, Msg, State).

-handle_follower(Msg, State) ->
-    handle_raft_state(?FUNCTION_NAME, Msg, State).
+handle_follower(Msg, State0) ->
+    Ret = handle_raft_state(?FUNCTION_NAME, Msg, State0),
+    {_NextState, State, _Effects} = Ret,
+    record_cluster_change(State0, State),
+    Ret.

 handle_receive_snapshot(Msg, State) ->
     handle_raft_state(?FUNCTION_NAME, Msg, State).

@@ -1341,9 +1345,6 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
     true = erlang:garbage_collect(),
     incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
     {State, Actions};
-handle_effect(_, update_leaderboard, _EvtType, State, Actions) ->
-    ok = record_leader_change(leader_id(State), State),
-    {State, Actions};
 handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
               #state{monitors = Monitors} = State, Actions0) ->
     {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},

@@ -1491,7 +1492,6 @@ follower_leader_change(Old, #state{pending_commands = Pending,
             ok = aten_register(LeaderNode),
             OldLeaderNode = ra_lib:ra_server_id_node(OldLeader),
             _ = aten:unregister(OldLeaderNode),
-            ok = record_leader_change(NewLeader, New),
             % leader has either changed or just been set
             ?INFO("~ts: detected a new leader ~w in term ~b",
                   [log_id(New), NewLeader, current_term(New)]),

@@ -1555,6 +1555,8 @@ do_state_query(voters, #{cluster := Cluster}) ->
                    end
            end, [], Cluster),
     Vs;
+do_state_query(leader, #{leader_id := Leader}) ->
+    Leader;
 do_state_query(members, #{cluster := Cluster}) ->
     maps:keys(Cluster);
 do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,

@@ -1872,11 +1874,20 @@ handle_process_down(Pid, Info, RaftState,
                                      monitors = Monitors}),
     {keep_state, State, Actions}.

-record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName},
-                                    server_state = ServerState}) ->
-    Members = do_state_query(members, ServerState),
-    ok = ra_leaderboard:record(ClusterName, Leader, Members),
-    ok.
+record_cluster_change(
+  #state{conf = #conf{cluster_name = ClusterName},
+         server_state = ServerStateA},
+  #state{server_state = ServerStateB}) ->
+    LeaderA = do_state_query(leader, ServerStateA),
+    MembersA = do_state_query(members, ServerStateA),
+    LeaderB = do_state_query(leader, ServerStateB),
+    MembersB = do_state_query(members, ServerStateB),
+    case {LeaderA, MembersA} of
+        {LeaderB, MembersB} ->
+            ok;
+        _ ->
+            ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB)
+    end.

 incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
     counters:add(Cnt, Ix, N);
