Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ra_server: Update leaderboard after each command if cluster changed #417

Merged
merged 4 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/ra_log_wal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg,
%% to open one
T = open_mem_table(Cfg, UId),
true = ets:insert_new(OpnMemTbl,
{UId, Idx, Idx, T}),
{UId, Idx, Idx - 1, T}),
{T, Idx}
end,
Writer = #batch_writer{tbl_start = TblStart,
Expand Down
42 changes: 25 additions & 17 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ recover(#{cfg := #cfg{log_id = LogId,
[LogId, EffMacVer, MacVer, LastApplied, CommitIndex]),
Before = erlang:system_time(millisecond),
{#{log := Log0,
cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State, _} =
cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State1, _} =
apply_to(CommitIndex,
fun({Idx, _, _} = E, S0) ->
%% Clear out the effects and notifies map
Expand All @@ -384,8 +384,17 @@ recover(#{cfg := #cfg{log_id = LogId,
?DEBUG("~ts: recovery of state machine version ~b:~b "
"from index ~b to ~b took ~bms",
[LogId, EffMacVerAfter, MacVer, LastApplied, CommitIndex, After - Before]),
%% scan from CommitIndex + 1 until NextIndex - 1 to see if there are
%% any further cluster changes
FromScan = CommitIndex + 1,
{ToScan, _} = ra_log:last_index_term(Log0),
?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]),
{State, Log1} = ra_log:fold(FromScan, ToScan,
fun cluster_scan_fun/2,
State1, Log0),

%% disable segment read cache by setting random access pattern
Log = ra_log:release_resources(1, random, Log0),
Log = ra_log:release_resources(1, random, Log1),
put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0),
State#{log => Log,
%% reset commit latency as recovery may calculate a very old value
Expand Down Expand Up @@ -1023,27 +1032,18 @@ handle_follower(#append_entries_rpc{term = Term,
_ ->
State1 = lists:foldl(fun pre_append_log_follower/2,
State0, Entries),
%% if the cluster has changed we need to update
%% the leaderboard
Effects1 = case maps:get(cluster, State0) =/=
maps:get(cluster, State1) of
true ->
[update_leaderboard | Effects0];
false ->
Effects0
end,
case ra_log:write(Entries, Log1) of
{ok, Log2} ->
{NextState, State, Effects} =
evaluate_commit_index_follower(State1#{log => Log2},
Effects1),
Effects0),
{NextState, State,
[{next_event, {ra_log_event, flush_cache}} | Effects]};
{error, wal_down} ->
{await_condition,
State1#{log => Log1,
condition => fun wal_down_condition/2},
Effects1};
Effects0};
{error, _} = Err ->
exit(Err)
end
Expand Down Expand Up @@ -1649,8 +1649,6 @@ filter_follower_effects(Effects) ->
[C | Acc];
(garbage_collection = C, Acc) ->
[C | Acc];
(update_leaderboard = C, Acc) ->
[C | Acc];
({delete_snapshot, _} = C, Acc) ->
[C | Acc];
({send_msg, _, _, _Opts} = C, Acc) ->
Expand Down Expand Up @@ -2338,6 +2336,17 @@ append_app_effects([AppEff], Effs) ->
append_app_effects(AppEffs, Effs) ->
[AppEffs | Effs].

%% Fold callback handed to ra_log:fold/5 by recover/1 when it scans the
%% entries after the commit index for further cluster changes.
%%
%% A `$ra_cluster_change' entry replaces the cluster held in the server
%% state, recomputes this member's membership from the new cluster, marks
%% cluster changes as permitted and records the index/term of the entry
%% that carried the change. Every other entry leaves the state untouched.
cluster_scan_fun({Index, EntryTerm, {'$ra_cluster_change', _Meta, Members, _}},
                 ServerState) ->
    ?DEBUG("~ts: ~ts: applying ra cluster change to ~w",
           [log_id(ServerState), ?FUNCTION_NAME, maps:keys(Members)]),
    %% we are recovering, so the change is applied directly to the state
    Membership = get_membership(Members, ServerState),
    ServerState#{cluster => Members,
                 membership => Membership,
                 cluster_change_permitted => true,
                 cluster_index_term => {Index, EntryTerm}};
cluster_scan_fun(_Entry, ServerState) ->
    ServerState.

apply_with(_Cmd,
{Mod, LastAppliedIdx,
Expand Down Expand Up @@ -2600,7 +2609,7 @@ append_cluster_change(Cluster, From, ReplyMode,
cluster := PrevCluster,
cluster_index_term := {PrevCITIdx, PrevCITTerm},
current_term := Term} = State,
Effects0) ->
Effects) ->
% turn join command into a generic cluster change command
% that include the new cluster configuration
Command = {'$ra_cluster_change', From, Cluster, ReplyMode},
Expand All @@ -2609,7 +2618,6 @@ append_cluster_change(Cluster, From, ReplyMode,
% TODO: is it safe to change the cluster config with an async write?
% what happens if the write fails?
Log = ra_log:append({NextIdx, Term, Command}, Log0),
Effects = [update_leaderboard | Effects0],
{ok, NextIdx, Term,
State#{log => Log,
cluster => Cluster,
Expand Down
70 changes: 37 additions & 33 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ recover(_, _, State) ->
%% effects post recovery
recovered(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_cluster_change(State),
{keep_state, State, Actions};
recovered(internal, next, #state{server_state = ServerState} = State) ->
true = erlang:garbage_collect(),
Expand All @@ -378,7 +379,7 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->

leader(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_leader_change(id(State0), State0),
ok = record_cluster_change(State),
%% TODO: reset refs?
{keep_state, State#state{leader_last_seen = undefined,
pending_notifys = #{},
Expand All @@ -392,17 +393,13 @@ leader(EventType, {local_call, Msg}, State) ->
leader(EventType, {leader_cast, Msg}, State) ->
leader(EventType, Msg, State);
leader(EventType, {command, normal, {CmdType, Data, ReplyMode}},
#state{conf = Conf,
server_state = ServerState0} = State0) ->
#state{conf = Conf} = State0) ->
case validate_reply_mode(ReplyMode) of
ok ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{leader, State1, Effects} = handle_leader({command, Cmd}, State0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
{keep_state, State, Actions};
Error ->
ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1),
Expand Down Expand Up @@ -453,17 +450,13 @@ leader(EventType, {aux_command, Cmd}, State0) ->
{keep_state, State#state{server_state = ServerState}, Actions};
leader(EventType, flush_commands,
#state{conf = #conf{flush_commands_size = Size},
server_state = ServerState0,
low_priority_commands = Delayed0} = State0) ->

{Commands, Delayed} = ra_ets_queue:take(Size, Delayed0),
%% write a batch of delayed commands
{leader, ServerState, Effects} =
ra_server:handle_leader({commands, Commands}, ServerState0),
{leader, State1, Effects} = handle_leader({commands, Commands}, State0),

{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
case ra_ets_queue:len(Delayed) of
0 ->
ok;
Expand Down Expand Up @@ -514,16 +507,11 @@ leader(_, tick_timeout, State0) ->
State = send_applied_notifications(State2, #{}),
{keep_state, handle_tick_metrics(State),
set_tick_timer(State, Actions)};
leader({timeout, Name}, machine_timeout,
#state{server_state = ServerState0,
conf = #conf{}} = State0) ->
leader({timeout, Name}, machine_timeout, State0) ->
% the machine timer timed out, add a timeout message
Cmd = make_command('$usr', cast, {timeout, Name}, noreply),
{leader, ServerState, Effects} = ra_server:handle_leader({command, Cmd},
ServerState0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, cast,
State0#state{server_state =
ServerState}),
{leader, State1, Effects} = handle_leader({command, Cmd}, State0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, cast, State1),
{keep_state, State, Actions};
leader({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
Expand Down Expand Up @@ -790,11 +778,11 @@ follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, #state{conf = #conf{name = Name},
server_state = SS0} = State0) ->
Membership0 = ra_server:get_membership(SS0),
case handle_follower(Msg, State0) of
{follower, State1, Effects} ->
{State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
State = #state{server_state = SS} = follower_leader_change(State0, State2),
Membership0 = ra_server:get_membership(SS0),
case ra_server:get_membership(SS) of
Membership0 ->
ok;
Expand Down Expand Up @@ -1077,6 +1065,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) ->
{NextState, ServerState, Effects} ->
State = State0#state{server_state =
ra_server:persist_last_applied(ServerState)},
maybe_record_cluster_change(State0, State),
{NextState, State, Effects};
OtherErr ->
?ERR("handle_leader err ~p", [OtherErr]),
Expand All @@ -1103,8 +1092,11 @@ handle_candidate(Msg, State) ->
handle_pre_vote(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).

handle_follower(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
%% Handles a message in the follower raft state by delegating to
%% handle_raft_state/3, then hands the pre- and post-handling process
%% states to maybe_record_cluster_change/2. The result of
%% handle_raft_state/3 is returned unchanged.
handle_follower(Msg, State0) ->
    {_NextState, State, _Effects} = Result =
        handle_raft_state(?FUNCTION_NAME, Msg, State0),
    maybe_record_cluster_change(State0, State),
    Result.

handle_receive_snapshot(Msg, State) ->
handle_raft_state(?FUNCTION_NAME, Msg, State).
Expand Down Expand Up @@ -1341,9 +1333,6 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) ->
true = erlang:garbage_collect(),
incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1),
{State, Actions};
handle_effect(_, update_leaderboard, _EvtType, State, Actions) ->
ok = record_leader_change(leader_id(State), State),
{State, Actions};
handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _,
#state{monitors = Monitors} = State, Actions0) ->
{State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)},
Expand Down Expand Up @@ -1491,7 +1480,6 @@ follower_leader_change(Old, #state{pending_commands = Pending,
ok = aten_register(LeaderNode),
OldLeaderNode = ra_lib:ra_server_id_node(OldLeader),
_ = aten:unregister(OldLeaderNode),
ok = record_leader_change(NewLeader, New),
% leader has either changed or just been set
?INFO("~ts: detected a new leader ~w in term ~b",
[log_id(New), NewLeader, current_term(New)]),
Expand Down Expand Up @@ -1555,6 +1543,8 @@ do_state_query(voters, #{cluster := Cluster}) ->
end
end, [], Cluster),
Vs;
do_state_query(leader, #{leader_id := Leader}) ->
Leader;
do_state_query(members, #{cluster := Cluster}) ->
maps:keys(Cluster);
do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster,
Expand Down Expand Up @@ -1872,11 +1862,25 @@ handle_process_down(Pid, Info, RaftState,
monitors = Monitors}),
{keep_state, State, Actions}.

record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
%% Compares two process states (A = before, B = after) and records the
%% leader and members of state B with ra_leaderboard:record/3 when either
%% the `cluster_index_term' of the server state or the leader id differs
%% between them. Returns `ok' in both cases.
%% The cluster name is taken from the conf of the first (before) state.
maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerStateA},
#state{server_state = ServerStateB}) ->
LeaderA = ra_server:leader_id(ServerStateA),
LeaderB = ra_server:leader_id(ServerStateB),
%% NOTE(review): this comparison is evaluated as a guard. In Erlang a guard
%% expression that raises (e.g. map_get/2 on a missing key) makes the guard
%% fail rather than crash, which would fall through to `true -> ok'. Keep
%% that in mind before moving the test into a case expression body.
if (map_get(cluster_index_term, ServerStateA) =/=
map_get(cluster_index_term, ServerStateB) orelse
LeaderA =/= LeaderB) ->
%% something changed: publish the members and leader of the new state
MembersB = do_state_query(members, ServerStateB),
ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB);
true ->
%% neither the cluster configuration nor the leader changed
ok
end.

record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
Leader = do_state_query(leader, ServerState),
Members = do_state_query(members, ServerState),
ok = ra_leaderboard:record(ClusterName, Leader, Members),
ok.
ok = ra_leaderboard:record(ClusterName, Leader, Members).

incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
counters:add(Cnt, Ix, N);
Expand Down
6 changes: 3 additions & 3 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ delete_two_server_cluster(Config) ->
Machine = {module, ?MODULE, #{}},
{ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds),
{ok, _} = ra:delete_cluster(ServerIds),
% timer:sleep(1000),
await_condition(
fun () ->
lists:all(
Expand Down Expand Up @@ -309,8 +308,9 @@ grow_cluster(Config) ->
ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]),
{ok, _, _} = ra:add_member(A, B),
{ok, _, _} = ra:process_command(A, banana),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),

[A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),

ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
{ok, _, _} = ra:add_member(A, C),
Expand Down Expand Up @@ -339,8 +339,8 @@ grow_cluster(Config) ->
[B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
%% check leader
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),


Expand Down
23 changes: 18 additions & 5 deletions test/ra_log_2_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ take_after_overwrite_and_init(Config) ->
Log2 = deliver_written_log_events(Log1, 200),
{[_, _, _, _], Log3} = ra_log_take(1, 4, Log2),
Log4 = write_and_roll_no_deliver(1, 2, 2, Log3),
% fake lost segments event
Log5 = deliver_written_log_events(Log4, 200),
Log5 = deliver_log_events_cond(Log4,
fun (L) ->
{1, 2} =:= ra_log:last_written(L)
end, 100),

% ensure we cannot take stale entries
{[{1, 2, _}], Log6} = ra_log_take(1, 4, Log5),
_ = ra_log:close(Log6),
Expand Down Expand Up @@ -310,7 +313,8 @@ sparse_read_out_of_range_2(Config) ->
{Log3, _} = receive
{ra_log_event, {snapshot_written, {10, 2}} = Evt} ->
ra_log:handle_event(Evt, Log2)
after 500 ->
after 5000 ->
flush(),
exit(snapshot_written_timeout)
end,
Log4 = deliver_all_log_events(Log3, 100),
Expand All @@ -332,8 +336,17 @@ sparse_read(Config) ->
%% read small batch of the latest entries
{_, Log3} = ra_log_take(Num - 5, Num, Log2),
ct:pal("log overview ~p", [ra_log:overview(Log3)]),
%% warm up run
{_, Log4} = ra_log_take(1, Num, Log3),
%% ensure cache is empty as this indicates all entries have at least
%% been written to the WAL and thus will be available in mem tables.
Log4 = deliver_log_events_cond(Log3,
fun (L) ->
case ra_log:overview(L) of
#{cache_size := 0} ->
true;
_ ->
false
end
end, 100),
ra_log:close(Log4),
NumDiv2 = Num div 2,
%% create a list of indexes with some consecutive and some gaps
Expand Down
4 changes: 3 additions & 1 deletion test/ra_server_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -248,13 +248,15 @@ recover_restores_cluster_changes(_Config) ->
log := Log0}, _} =
ra_server:handle_leader({command, {'$ra_join', meta(),
N2, await_consensus}}, State),
{LIdx, _} = ra_log:last_index_term(Log0),
?assertEqual(2, maps:size(Cluster)),
% intercept ra_log:init call to simulate persisted log data
% ok = meck:new(ra_log, [passthrough]),
meck:expect(ra_log, init, fun (_) -> Log0 end),
meck:expect(ra_log_meta, fetch,
fun (_, _, last_applied, 0) ->
element(1, ra_log:last_index_term(Log0));
LIdx - 1;
% element(1, ra_log:last_index_term(Log0));
(_, _, _, Def) ->
Def
end),
Expand Down
Loading