From dc2cd398b93670880adb03d2be8cc9af03d237a7 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Thu, 8 Feb 2024 12:28:48 +0000 Subject: [PATCH] fixes, optimisations --- src/ra_server_proc.erl | 69 +++++++++++++++++-------------------- test/coordination_SUITE.erl | 6 ++-- 2 files changed, 34 insertions(+), 41 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 11246433..972624c1 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -370,7 +370,7 @@ recover(_, _, State) -> %% effects post recovery recovered(enter, OldState, State0) -> {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0), - record_cluster_change(State0, State), + ok = record_cluster_change(State), {keep_state, State, Actions}; recovered(internal, next, #state{server_state = ServerState} = State) -> true = erlang:garbage_collect(), @@ -379,6 +379,7 @@ recovered(internal, next, #state{server_state = ServerState} = State) -> leader(enter, OldState, State0) -> {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0), + ok = record_cluster_change(State), %% TODO: reset refs? {keep_state, State#state{leader_last_seen = undefined, pending_notifys = #{}, @@ -392,17 +393,13 @@ leader(EventType, {local_call, Msg}, State) -> leader(EventType, {leader_cast, Msg}, State) -> leader(EventType, Msg, State); leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, - #state{conf = Conf, - server_state = ServerState0} = State0) -> + #state{conf = Conf} = State0) -> case validate_reply_mode(ReplyMode) of ok -> %% normal priority commands are written immediately Cmd = make_command(CmdType, EventType, Data, ReplyMode), - {leader, ServerState, Effects} = - ra_server:handle_leader({command, Cmd}, ServerState0), - {State, Actions} = - ?HANDLE_EFFECTS(Effects, EventType, - State0#state{server_state = ServerState}), + {leader, State1, Effects} = handle_leader({command, Cmd}, State0), + {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), {keep_state, State, Actions}; Error -> ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1), @@ -453,17 +450,13 @@ leader(EventType, {aux_command, Cmd}, State0) -> {keep_state, State#state{server_state = ServerState}, Actions}; leader(EventType, flush_commands, #state{conf = #conf{flush_commands_size = Size}, - server_state = ServerState0, low_priority_commands = Delayed0} = State0) -> {Commands, Delayed} = ra_ets_queue:take(Size, Delayed0), %% write a batch of delayed commands - {leader, ServerState, Effects} = - ra_server:handle_leader({commands, Commands}, ServerState0), + {leader, State1, Effects} = handle_leader({commands, Commands}, State0), - {State, Actions} = - ?HANDLE_EFFECTS(Effects, EventType, - State0#state{server_state = ServerState}), + {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), case ra_ets_queue:len(Delayed) of 0 -> ok; @@ -514,16 +507,11 @@ leader(_, tick_timeout, State0) -> State = send_applied_notifications(State2, #{}), {keep_state, handle_tick_metrics(State), set_tick_timer(State, Actions)}; -leader({timeout, Name}, machine_timeout, - #state{server_state = ServerState0, - conf = #conf{}} = State0) -> +leader({timeout, Name}, machine_timeout, State0) -> % the machine timer timed out, add a timeout message Cmd = make_command('$usr', cast, {timeout, Name}, noreply), - {leader, ServerState, Effects} = ra_server:handle_leader({command, Cmd}, - ServerState0), - {State, Actions} = ?HANDLE_EFFECTS(Effects, cast, - State0#state{server_state = - ServerState}), + {leader, State1, Effects} = handle_leader({command, Cmd}, State0), + {State, Actions} = ?HANDLE_EFFECTS(Effects, cast, State1), {keep_state, State, Actions}; leader({call, From}, trigger_election, State) -> {keep_state, State, [{reply, From, ok}]}; @@ -790,11 +778,11 @@ follower({call, From}, {log_fold, Fun, Term}, State) -> fold_log(From, Fun, Term, State); follower(EventType, Msg, #state{conf = #conf{name = Name}, server_state = SS0} = State0) -> - Membership0 = ra_server:get_membership(SS0), case handle_follower(Msg, State0) of {follower, State1, Effects} -> {State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), State = #state{server_state = SS} = follower_leader_change(State0, State2), + Membership0 = ra_server:get_membership(SS0), case ra_server:get_membership(SS) of Membership0 -> ok; @@ -1077,7 +1065,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) -> {NextState, ServerState, Effects} -> State = State0#state{server_state = ra_server:persist_last_applied(ServerState)}, - record_cluster_change(State0, State), + maybe_record_cluster_change(State0, State), {NextState, State, Effects}; OtherErr -> ?ERR("handle_leader err ~p", [OtherErr]), @@ -1107,7 +1095,7 @@ handle_pre_vote(Msg, State) -> handle_follower(Msg, State0) -> Ret = handle_raft_state(?FUNCTION_NAME, Msg, State0), {_NextState, State, _Effects} = Ret, - record_cluster_change(State0, State), + maybe_record_cluster_change(State0, State), Ret. handle_receive_snapshot(Msg, State) -> @@ -1874,21 +1862,26 @@ handle_process_down(Pid, Info, RaftState, monitors = Monitors}), {keep_state, State, Actions}. -record_cluster_change( - #state{conf = #conf{cluster_name = ClusterName}, - server_state = ServerStateA}, - #state{server_state = ServerStateB}) -> - LeaderA = do_state_query(leader, ServerStateA), - MembersA = do_state_query(members, ServerStateA), - LeaderB = do_state_query(leader, ServerStateB), - MembersB = do_state_query(members, ServerStateB), - case {LeaderA, MembersA} of - {LeaderB, MembersB} -> - ok; - _ -> - ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB) +maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, + server_state = ServerStateA}, + #state{server_state = ServerStateB}) -> + LeaderA = ra_server:leader_id(ServerStateA), + LeaderB = ra_server:leader_id(ServerStateB), + if (map_get(cluster_index_term, ServerStateA) =/= + map_get(cluster_index_term, ServerStateB) orelse + LeaderA =/= LeaderB) -> + MembersB = do_state_query(members, ServerStateB), + ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB); + true -> + ok end. +record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, + server_state = ServerState}) -> + Leader = do_state_query(leader, ServerState), + Members = do_state_query(members, ServerState), + ok = ra_leaderboard:record(ClusterName, Leader, Members). + incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined -> counters:add(Cnt, Ix, N); incr_counter(#conf{counter = undefined}, _Ix, _N) -> diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 7756460b..269aef8b 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -197,7 +197,6 @@ delete_two_server_cluster(Config) -> Machine = {module, ?MODULE, #{}}, {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), {ok, _} = ra:delete_cluster(ServerIds), - % timer:sleep(1000), await_condition( fun () -> lists:all( @@ -309,8 +308,9 @@ grow_cluster(Config) -> ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]), {ok, _, _} = ra:add_member(A, B), {ok, _, _} = ra:process_command(A, banana), - [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + [A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]), {ok, _, _} = ra:add_member(A, C), @@ -339,8 +339,8 @@ grow_cluster(Config) -> [B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]), undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), %% check leader - L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]), + L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]),