Skip to content

Commit

Permalink
fixes, optimisations
Browse files Browse the repository at this point in the history
  • Loading branch information
kjnilsson committed Feb 8, 2024
1 parent 301ca62 commit dc2cd39
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 41 deletions.
69 changes: 31 additions & 38 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ recover(_, _, State) ->
%% effects post recovery
recovered(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
record_cluster_change(State0, State),
ok = record_cluster_change(State),
{keep_state, State, Actions};
recovered(internal, next, #state{server_state = ServerState} = State) ->
true = erlang:garbage_collect(),
Expand All @@ -379,6 +379,7 @@ recovered(internal, next, #state{server_state = ServerState} = State) ->

leader(enter, OldState, State0) ->
{State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0),
ok = record_cluster_change(State),
%% TODO: reset refs?
{keep_state, State#state{leader_last_seen = undefined,
pending_notifys = #{},
Expand All @@ -392,17 +393,13 @@ leader(EventType, {local_call, Msg}, State) ->
leader(EventType, {leader_cast, Msg}, State) ->
leader(EventType, Msg, State);
leader(EventType, {command, normal, {CmdType, Data, ReplyMode}},
#state{conf = Conf,
server_state = ServerState0} = State0) ->
#state{conf = Conf} = State0) ->
case validate_reply_mode(ReplyMode) of
ok ->
%% normal priority commands are written immediately
Cmd = make_command(CmdType, EventType, Data, ReplyMode),
{leader, ServerState, Effects} =
ra_server:handle_leader({command, Cmd}, ServerState0),
{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{leader, State1, Effects} = handle_leader({command, Cmd}, State0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
{keep_state, State, Actions};
Error ->
ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1),
Expand Down Expand Up @@ -453,17 +450,13 @@ leader(EventType, {aux_command, Cmd}, State0) ->
{keep_state, State#state{server_state = ServerState}, Actions};
leader(EventType, flush_commands,
#state{conf = #conf{flush_commands_size = Size},
server_state = ServerState0,
low_priority_commands = Delayed0} = State0) ->

{Commands, Delayed} = ra_ets_queue:take(Size, Delayed0),
%% write a batch of delayed commands
{leader, ServerState, Effects} =
ra_server:handle_leader({commands, Commands}, ServerState0),
{leader, State1, Effects} = handle_leader({commands, Commands}, State0),

{State, Actions} =
?HANDLE_EFFECTS(Effects, EventType,
State0#state{server_state = ServerState}),
{State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
case ra_ets_queue:len(Delayed) of
0 ->
ok;
Expand Down Expand Up @@ -514,16 +507,11 @@ leader(_, tick_timeout, State0) ->
State = send_applied_notifications(State2, #{}),
{keep_state, handle_tick_metrics(State),
set_tick_timer(State, Actions)};
leader({timeout, Name}, machine_timeout,
#state{server_state = ServerState0,
conf = #conf{}} = State0) ->
leader({timeout, Name}, machine_timeout, State0) ->
% the machine timer timed out, add a timeout message
Cmd = make_command('$usr', cast, {timeout, Name}, noreply),
{leader, ServerState, Effects} = ra_server:handle_leader({command, Cmd},
ServerState0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, cast,
State0#state{server_state =
ServerState}),
{leader, State1, Effects} = handle_leader({command, Cmd}, State0),
{State, Actions} = ?HANDLE_EFFECTS(Effects, cast, State1),
{keep_state, State, Actions};
leader({call, From}, trigger_election, State) ->
{keep_state, State, [{reply, From, ok}]};
Expand Down Expand Up @@ -790,11 +778,11 @@ follower({call, From}, {log_fold, Fun, Term}, State) ->
fold_log(From, Fun, Term, State);
follower(EventType, Msg, #state{conf = #conf{name = Name},
server_state = SS0} = State0) ->
Membership0 = ra_server:get_membership(SS0),
case handle_follower(Msg, State0) of
{follower, State1, Effects} ->
{State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1),
State = #state{server_state = SS} = follower_leader_change(State0, State2),
Membership0 = ra_server:get_membership(SS0),
case ra_server:get_membership(SS) of
Membership0 ->
ok;
Expand Down Expand Up @@ -1077,7 +1065,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) ->
{NextState, ServerState, Effects} ->
State = State0#state{server_state =
ra_server:persist_last_applied(ServerState)},
record_cluster_change(State0, State),
maybe_record_cluster_change(State0, State),
{NextState, State, Effects};
OtherErr ->
?ERR("handle_leader err ~p", [OtherErr]),
Expand Down Expand Up @@ -1107,7 +1095,7 @@ handle_pre_vote(Msg, State) ->
handle_follower(Msg, State0) ->
Ret = handle_raft_state(?FUNCTION_NAME, Msg, State0),
{_NextState, State, _Effects} = Ret,
record_cluster_change(State0, State),
maybe_record_cluster_change(State0, State),
Ret.

handle_receive_snapshot(Msg, State) ->
Expand Down Expand Up @@ -1874,21 +1862,26 @@ handle_process_down(Pid, Info, RaftState,
monitors = Monitors}),
{keep_state, State, Actions}.

record_cluster_change(
#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerStateA},
#state{server_state = ServerStateB}) ->
LeaderA = do_state_query(leader, ServerStateA),
MembersA = do_state_query(members, ServerStateA),
LeaderB = do_state_query(leader, ServerStateB),
MembersB = do_state_query(members, ServerStateB),
case {LeaderA, MembersA} of
{LeaderB, MembersB} ->
ok;
_ ->
ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB)
maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerStateA},
#state{server_state = ServerStateB}) ->
LeaderA = ra_server:leader_id(ServerStateA),
LeaderB = ra_server:leader_id(ServerStateB),
if (map_get(cluster_index_term, ServerStateA) =/=
map_get(cluster_index_term, ServerStateB) orelse
LeaderA =/= LeaderB) ->
MembersB = do_state_query(members, ServerStateB),
ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB);
true ->
ok
end.

record_cluster_change(#state{conf = #conf{cluster_name = ClusterName},
server_state = ServerState}) ->
Leader = do_state_query(leader, ServerState),
Members = do_state_query(members, ServerState),
ok = ra_leaderboard:record(ClusterName, Leader, Members).

incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined ->
counters:add(Cnt, Ix, N);
incr_counter(#conf{counter = undefined}, _Ix, _N) ->
Expand Down
6 changes: 3 additions & 3 deletions test/coordination_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ delete_two_server_cluster(Config) ->
Machine = {module, ?MODULE, #{}},
{ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds),
{ok, _} = ra:delete_cluster(ServerIds),
% timer:sleep(1000),
await_condition(
fun () ->
lists:all(
Expand Down Expand Up @@ -309,8 +308,9 @@ grow_cluster(Config) ->
ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]),
{ok, _, _} = ra:add_member(A, B),
{ok, _, _} = ra:process_command(A, banana),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),

[A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]),
[A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),

ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]),
{ok, _, _} = ra:add_member(A, C),
Expand Down Expand Up @@ -339,8 +339,8 @@ grow_cluster(Config) ->
[B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]),
%% check leader
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]),
L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]),
undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),


Expand Down

0 comments on commit dc2cd39

Please sign in to comment.