Skip to content

Commit

Permalink
Merge pull request #417 from rabbitmq/update-leaderboard-on-recovery
Browse files Browse the repository at this point in the history
ra_server: Update leaderboard after each command if cluster changed
  • Loading branch information
kjnilsson authored Feb 8, 2024
2 parents 8a24cdd + 2c9a7fe commit 7910c4f
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 60 deletions.
2 changes: 1 addition & 1 deletion src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg,
%% to open one
T = open_mem_table(Cfg, UId),
true = ets:insert_new(OpnMemTbl,
{UId, Idx, Idx, T}),
{UId, Idx, Idx - 1, T}),
{T, Idx}
end,
Writer = #batch_writer{tbl_start = TblStart,
Expand Down
42 changes: 25 additions & 17 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ recover(#{cfg := #cfg{log_id = LogId,
[LogId, EffMacVer, MacVer, LastApplied, CommitIndex]),
Before = erlang:system_time(millisecond),
{#{log := Log0,
cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State, _} =
cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State1, _} =
apply_to(CommitIndex,
fun({Idx, _, _} = E, S0) ->
%% Clear out the effects and notifies map
Expand All @@ -384,8 +384,17 @@ recover(#{cfg := #cfg{log_id = LogId,
?DEBUG("~ts: recovery of state machine version ~b:~b "
"from index ~b to ~b took ~bms",
[LogId, EffMacVerAfter, MacVer, LastApplied, CommitIndex, After - Before]),
%% scan from CommitIndex + 1 until NextIndex - 1 to see if there are
%% any further cluster changes
FromScan = CommitIndex + 1,
{ToScan, _} = ra_log:last_index_term(Log0),
?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]),
{State, Log1} = ra_log:fold(FromScan, ToScan,
fun cluster_scan_fun/2,
State1, Log0),

%% disable segment read cache by setting random access pattern
Log = ra_log:release_resources(1, random, Log0),
Log = ra_log:release_resources(1, random, Log1),
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0),
State#{log => Log,
%% reset commit latency as recovery may calculate a very old value
Expand Down Expand Up @@ -1023,27 +1032,18 @@ handle_follower(#append_entries_rpc{term = Term,
_ ->
State1 = lists:foldl(fun pre_append_log_follower/2,
State0, Entries),
%% if the cluster has changed we need to update
%% the leaderboard
Effects1 = case maps:get(cluster, State0) =/=
maps:get(cluster, State1) of
true ->
[update_leaderboard | Effects0];
false ->
Effects0
end,
case ra_log:write(Entries, Log1) of
{ok, Log2} ->
{NextState, State, Effects} =
evaluate_commit_index_follower(State1#{log => Log2},
Effects1),
Effects0),
{NextState, State,
[{next_event, {ra_log_event, flush_cache}} | Effects]};
{error, wal_down} ->
{await_condition,
State1#{log => Log1,
condition => fun wal_down_condition/2},
Effects1};
Effects0};
{error, _} = Err ->
exit(Err)
end
Expand Down Expand Up @@ -1649,8 +1649,6 @@ filter_follower_effects(Effects) ->
[C | Acc];
(garbage_collection = C, Acc) ->
[C | Acc];
(update_leaderboard = C, Acc) ->
[C | Acc];
({delete_snapshot, _} = C, Acc) ->
[C | Acc];
({send_msg, _, _, _Opts} = C, Acc) ->
Expand Down Expand Up @@ -2338,6 +2336,17 @@ append_app_effects([AppEff], Effs) ->
append_app_effects(AppEffs, Effs) ->
[AppEffs | Effs].

%% Fold callback passed to ra_log:fold/5 during recovery to scan log
%% entries for cluster changes.
%% A '$ra_cluster_change' entry replaces the cluster held in the server
%% state, recomputes the membership, marks cluster changes as permitted
%% and records the {Index, Term} of the entry as the cluster_index_term.
%% Every other entry leaves the state untouched.
cluster_scan_fun({Index, EntryTerm, {'$ra_cluster_change', _, Members, _}},
                 State) ->
    ?DEBUG("~ts: ~ts: applying ra cluster change to ~w",
           [log_id(State), ?FUNCTION_NAME, maps:keys(Members)]),
    %% we are recovering, so the cluster change is applied straight away
    Membership = get_membership(Members, State),
    State#{cluster => Members,
           membership => Membership,
           cluster_change_permitted => true,
           cluster_index_term => {Index, EntryTerm}};
cluster_scan_fun(_Entry, Unchanged) ->
    Unchanged.

apply_with(_Cmd,
{Mod, LastAppliedIdx,
Expand Down Expand Up @@ -2600,7 +2609,7 @@ append_cluster_change(Cluster, From, ReplyMode,
cluster := PrevCluster,
cluster_index_term := {PrevCITIdx, PrevCITTerm},
current_term := Term} = State,
Effects0) ->
Effects) ->
% turn join command into a generic cluster change command
% that includes the new cluster configuration
Command = {'$ra_cluster_change', From, Cluster, ReplyMode},
Expand All @@ -2609,7 +2618,6 @@ append_cluster_change(Cluster, From, ReplyMode,
% TODO: is it safe to do change the cluster config with an async write?
% what happens if the write fails?
Log = ra_log:append({NextIdx, Term, Command}, Log0),
Effects = [update_leaderboard | Effects0],
{ok, NextIdx, Term,
State#{log => Log,
cluster => Cluster,
Expand Down
70 changes: 37 additions & 33 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ recover(_, _, State) ->
%% effects post recovery
recovered(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_cluster_change(State),
{keep_state, State, Actions};
recovered(internal, next, #state{server_state = ServerState} = State) ->
true = erlang:garbage_collect(),
Expand All @@ -378,7 +379,7 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->

leader(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_leader_change(id(State0), State0),
ok = record_cluster_change(State),
%% TODO: reset refs?
{keep_state, State#state{leader_last_seen = undefined,
pending_notifys = #{},
Expand All @@ -392,17 +393,13 @@ leader(EventType, {local_call, Msg}, State) ->
leader(EventType, {leader_cast, Msg}, State) ->
leader(EventType, Msg, State);
leader(EventType, {command, normal, {CmdType, Data, ReplyMode}},
#state{conf = Conf,
server_state = ServerState0} = State0) ->
#state{conf = Conf} = State0) ->
case validate_reply_mode(ReplyMode) of
ok ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{leader, State1, Effects} = handle_leader({command, Cmd}, State0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
{keep_state, State, Actions};
Error ->
ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1),
Expand Down Expand Up @@ -453,17 +450,13 @@ leader(EventType, {aux_command, Cmd}, State0) ->
{keep_state, State#state{server_state = ServerState}, Actions};
leader(EventType, flush_commands,
#state{conf = #conf{flush_commands_size = Size},
server_state = ServerState0,
low_priority_commands = Delayed0} = State0) ->

{Commands, Delayed} = ra_ets_queue:take(Size, Delayed0),
%% write a batch of delayed commands
{leader, ServerState, Effects} =
ra_server:handle_leader({commands, Commands}, ServerState0),
{leader, State1, Effects} = handle_leader({commands, Commands}, State0),

{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
case ra_ets_queue:len(Delayed) of
0 ->
ok;
Expand Down Expand Up @@ -514,16 +507,11 @@ leader(_, tick_timeout, State0) ->
State = send_applied_notifications(State2, #{}),
{keep_state, handle_tick_metrics(State),
set_tick_timer(State, Actions)};
leader({timeout, Name}, machine_timeout,
#state{server_state = ServerState0,
conf = #conf{}} = State0) ->
leader({timeout, Name}, machine_timeout, State0) ->
% the machine timer timed out, add a timeout message
Cmd = make_command('$usr', cast, {timeout, Name}, noreply),
{leader, ServerState, Effects} = ra_server:handle_leader({command, Cmd},
ServerState0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, cast,
State0#state{server_state =
ServerState}),
{leader, State1, Effects} = handle_leader({command, Cmd}, State0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, cast, State1),
{keep_state, State, Actions};
leader({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
Expand Down Expand Up @@ -790,11 +778,11 @@ follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, #state{conf = #conf{name = Name},
server_state = SS0} = State0) ->
Membership0 = ra_server:get_membership(SS0),
case handle_follower(Msg, State0) of
{follower, State1, Effects} ->
{State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
State = #state{server_state = SS} = follower_leader_change(State0, State2),
Membership0 = ra_server:get_membership(SS0),
case ra_server:get_membership(SS) of
Membership0 ->
ok;
Expand Down Expand Up @@ -1077,6 +1065,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) ->
{NextState, ServerState, Effects} ->
State = State0#state{server_state =
ra_server:persist_last_applied(ServerState)},
maybe_record_cluster_change(State0, State),
{NextState, State, Effects};
OtherErr ->
?ERR("handle_leader err ~p", [OtherErr]),
Expand All @@ -1103,8 +1092,11 @@ handle_candidate(Msg, State) ->
handle_pre_vote(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).

handle_follower(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
%% Runs the follower raft state handler for Msg and then lets
%% maybe_record_cluster_change/2 compare the process state before and
%% after handling. The handler's result is returned unchanged.
handle_follower(Msg, State0) ->
    {_NextState, State, _Effects} = Result =
        handle_raft_state(?FUNCTION_NAME, Msg, State0),
    maybe_record_cluster_change(State0, State),
    Result.

handle_receive_snapshot(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
Expand Down Expand Up @@ -1341,9 +1333,6 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
true = erlang:garbage_collect(),
incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
{State, Actions};
handle_effect(_, update_leaderboard, _EvtType, State, Actions) ->
ok = record_leader_change(leader_id(State), State),
{State, Actions};
handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
{State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},
Expand Down Expand Up @@ -1491,7 +1480,6 @@ follower_leader_change(Old, #state{pending_commands = Pending,
ok = aten_register(LeaderNode),
OldLeaderNode = ra_lib:ra_server_id_node(OldLeader),
_ = aten:unregister(OldLeaderNode),
ok = record_leader_change(NewLeader, New),
% leader has either changed or just been set
?INFO("~ts: detected a new leader ~w in term ~b",
[log_id(New), NewLeader, current_term(New)]),
Expand Down Expand Up @@ -1555,6 +1543,8 @@ do_state_query(voters, #{cluster := Cluster}) ->
end
end, [], Cluster),
Vs;
do_state_query(leader, #{leader_id := Leader}) ->
Leader;
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
Expand Down Expand Up @@ -1872,11 +1862,25 @@ handle_process_down(Pid, Info, RaftState,
monitors = Monitors}),
{keep_state, State, Actions}.

record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
%% Compares the server state held in two process states (A = before,
%% B = after) and records the leader and members of B in ra_leaderboard
%% when either the cluster_index_term or the leader id differs between
%% them. Returns ok in both cases.
%% The cluster name is taken from the conf of the first (before) state.
maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerStateA},
#state{server_state = ServerStateB}) ->
LeaderA = ra_server:leader_id(ServerStateA),
LeaderB = ra_server:leader_id(ServerStateB),
%% NOTE: the comparison is evaluated as a guard, so an exception raised
%% by map_get/2 (e.g. a missing cluster_index_term key) makes the guard
%% fail and falls through to the `true' branch instead of crashing.
if (map_get(cluster_index_term, ServerStateA) =/=
map_get(cluster_index_term, ServerStateB) orelse
LeaderA =/= LeaderB) ->
%% members are read from the new (B) server state only
MembersB = do_state_query(members, ServerStateB),
ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB);
true ->
%% neither the cluster configuration nor the leader changed
ok
end.

record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
Leader = do_state_query(leader, ServerState),
Members = do_state_query(members, ServerState),
ok = ra_leaderboard:record(ClusterName, Leader, Members),
ok.
ok = ra_leaderboard:record(ClusterName, Leader, Members).

incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
counters:add(Cnt, Ix, N);
Expand Down
6 changes: 3 additions & 3 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ delete_two_server_cluster(Config) ->
Machine = {module, ?MODULE, #{}},
{ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds),
{ok, _} = ra:delete_cluster(ServerIds),
% timer:sleep(1000),
await_condition(
fun () ->
lists:all(
Expand Down Expand Up @@ -309,8 +308,9 @@ grow_cluster(Config) ->
ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]),
{ok, _, _} = ra:add_member(A, B),
{ok, _, _} = ra:process_command(A, banana),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),

[A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),

ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
{ok, _, _} = ra:add_member(A, C),
Expand Down Expand Up @@ -339,8 +339,8 @@ grow_cluster(Config) ->
[B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
%% check leader
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),


Expand Down
23 changes: 18 additions & 5 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ take_after_overwrite_and_init(Config) ->
Log2 = deliver_written_log_events(Log1, 200),
{[_, _, _, _], Log3} = ra_log_take(1, 4, Log2),
Log4 = write_and_roll_no_deliver(1, 2, 2, Log3),
% fake lost segments event
Log5 = deliver_written_log_events(Log4, 200),
Log5 = deliver_log_events_cond(Log4,
fun (L) ->
{1, 2} =:= ra_log:last_written(L)
end, 100),

% ensure we cannot take stale entries
{[{1, 2, _}], Log6} = ra_log_take(1, 4, Log5),
_ = ra_log:close(Log6),
Expand Down Expand Up @@ -310,7 +313,8 @@ sparse_read_out_of_range_2(Config) ->
{Log3, _} = receive
{ra_log_event, {snapshot_written, {10, 2}} = Evt} ->
ra_log:handle_event(Evt, Log2)
after 500 ->
after 5000 ->
flush(),
exit(snapshot_written_timeout)
end,
Log4 = deliver_all_log_events(Log3, 100),
Expand All @@ -332,8 +336,17 @@ sparse_read(Config) ->
%% read small batch of the latest entries
{_, Log3} = ra_log_take(Num - 5, Num, Log2),
ct:pal("log overview ~p", [ra_log:overview(Log3)]),
%% warm up run
{_, Log4} = ra_log_take(1, Num, Log3),
%% ensure cache is empty as this indicates all entries have at least
%% been written to the WAL and thus will be available in mem tables.
Log4 = deliver_log_events_cond(Log3,
fun (L) ->
case ra_log:overview(L) of
#{cache_size := 0} ->
true;
_ ->
false
end
end, 100),
ra_log:close(Log4),
NumDiv2 = Num div 2,
%% create a list of indexes with some consecutive and some gaps
Expand Down
4 changes: 3 additions & 1 deletion test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,15 @@ recover_restores_cluster_changes(_Config) ->
log := Log0}, _} =
ra_server:handle_leader({command, {'$ra_join', meta(),
N2, await_consensus}}, State),
{LIdx, _} = ra_log:last_index_term(Log0),
?assertEqual(2, maps:size(Cluster)),
% intercept ra_log:init call to simulate persisted log data
% ok = meck:new(ra_log, [passthrough]),
meck:expect(ra_log, init, fun (_) -> Log0 end),
meck:expect(ra_log_meta, fetch,
fun (_, _, last_applied, 0) ->
element(1, ra_log:last_index_term(Log0));
LIdx - 1;
% element(1, ra_log:last_index_term(Log0));
(_, _, _, Def) ->
Def
end),
Expand Down

0 comments on commit 7910c4f

Please sign in to comment.