diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index be8cd8c4..29448ddb 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -524,7 +524,7 @@ incr_batch(#conf{open_mem_tbls_tid = OpnMemTbl} = Cfg, %% to open one T = open_mem_table(Cfg, UId), true = ets:insert_new(OpnMemTbl, - {UId, Idx, Idx, T}), + {UId, Idx, Idx - 1, T}), {T, Idx} end, Writer = #batch_writer{tbl_start = TblStart, diff --git a/src/ra_server.erl b/src/ra_server.erl index 62e16d0a..055a9730 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -370,7 +370,7 @@ recover(#{cfg := #cfg{log_id = LogId, [LogId, EffMacVer, MacVer, LastApplied, CommitIndex]), Before = erlang:system_time(millisecond), {#{log := Log0, - cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State, _} = + cfg := #cfg{effective_machine_version = EffMacVerAfter}} = State1, _} = apply_to(CommitIndex, fun({Idx, _, _} = E, S0) -> %% Clear out the effects and notifies map @@ -384,8 +384,17 @@ recover(#{cfg := #cfg{log_id = LogId, ?DEBUG("~ts: recovery of state machine version ~b:~b " "from index ~b to ~b took ~bms", [LogId, EffMacVerAfter, MacVer, LastApplied, CommitIndex, After - Before]), + %% scan from CommitIndex + 1 until NextIndex - 1 to see if there are + %% any further cluster changes + FromScan = CommitIndex + 1, + {ToScan, _} = ra_log:last_index_term(Log0), + ?DEBUG("~ts: scanning for cluster changes ~b:~b ", [LogId, FromScan, ToScan]), + {State, Log1} = ra_log:fold(FromScan, ToScan, + fun cluster_scan_fun/2, + State1, Log0), + %% disable segment read cache by setting random access pattern - Log = ra_log:release_resources(1, random, Log0), + Log = ra_log:release_resources(1, random, Log1), put_counter(Cfg, ?C_RA_SVR_METRIC_COMMIT_LATENCY, 0), State#{log => Log, %% reset commit latency as recovery may calculate a very old value @@ -1023,27 +1032,18 @@ handle_follower(#append_entries_rpc{term = Term, _ -> State1 = lists:foldl(fun pre_append_log_follower/2, State0, Entries), - %% if the cluster has changed 
we need to update - %% the leaderboard - Effects1 = case maps:get(cluster, State0) =/= - maps:get(cluster, State1) of - true -> - [update_leaderboard | Effects0]; - false -> - Effects0 - end, case ra_log:write(Entries, Log1) of {ok, Log2} -> {NextState, State, Effects} = evaluate_commit_index_follower(State1#{log => Log2}, - Effects1), + Effects0), {NextState, State, [{next_event, {ra_log_event, flush_cache}} | Effects]}; {error, wal_down} -> {await_condition, State1#{log => Log1, condition => fun wal_down_condition/2}, - Effects1}; + Effects0}; {error, _} = Err -> exit(Err) end @@ -1649,8 +1649,6 @@ filter_follower_effects(Effects) -> [C | Acc]; (garbage_collection = C, Acc) -> [C | Acc]; - (update_leaderboard = C, Acc) -> - [C | Acc]; ({delete_snapshot, _} = C, Acc) -> [C | Acc]; ({send_msg, _, _, _Opts} = C, Acc) -> @@ -2338,6 +2336,17 @@ append_app_effects([AppEff], Effs) -> append_app_effects(AppEffs, Effs) -> [AppEffs | Effs]. +cluster_scan_fun({Idx, Term, {'$ra_cluster_change', _Meta, NewCluster, _}}, + State0) -> + ?DEBUG("~ts: ~ts: applying ra cluster change to ~w", + [log_id(State0), ?FUNCTION_NAME, maps:keys(NewCluster)]), + %% we are recovering and should apply the cluster change + State0#{cluster => NewCluster, + membership => get_membership(NewCluster, State0), + cluster_change_permitted => true, + cluster_index_term => {Idx, Term}}; +cluster_scan_fun(_Cmd, State) -> + State. apply_with(_Cmd, {Mod, LastAppliedIdx, @@ -2600,7 +2609,7 @@ append_cluster_change(Cluster, From, ReplyMode, cluster := PrevCluster, cluster_index_term := {PrevCITIdx, PrevCITTerm}, current_term := Term} = State, - Effects0) -> + Effects) -> % turn join command into a generic cluster change command % that include the new cluster configuration Command = {'$ra_cluster_change', From, Cluster, ReplyMode}, @@ -2609,7 +2618,6 @@ append_cluster_change(Cluster, From, ReplyMode, % TODO: is it safe to do change the cluster config with an async write? % what happens if the write fails? 
Log = ra_log:append({NextIdx, Term, Command}, Log0), - Effects = [update_leaderboard | Effects0], {ok, NextIdx, Term, State#{log => Log, cluster => Cluster, diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 2f5189b8..972624c1 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -370,6 +370,7 @@ recover(_, _, State) -> %% effects post recovery recovered(enter, OldState, State0) -> {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0), + ok = record_cluster_change(State), {keep_state, State, Actions}; recovered(internal, next, #state{server_state = ServerState} = State) -> true = erlang:garbage_collect(), @@ -378,7 +379,7 @@ recovered(internal, next, #state{server_state = ServerState} = State) -> leader(enter, OldState, State0) -> {State, Actions} = handle_enter(?FUNCTION_NAME, OldState, State0), - ok = record_leader_change(id(State0), State0), + ok = record_cluster_change(State), %% TODO: reset refs? {keep_state, State#state{leader_last_seen = undefined, pending_notifys = #{}, @@ -392,17 +393,13 @@ leader(EventType, {local_call, Msg}, State) -> leader(EventType, {leader_cast, Msg}, State) -> leader(EventType, Msg, State); leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, - #state{conf = Conf, - server_state = ServerState0} = State0) -> + #state{conf = Conf} = State0) -> case validate_reply_mode(ReplyMode) of ok -> %% normal priority commands are written immediately Cmd = make_command(CmdType, EventType, Data, ReplyMode), - {leader, ServerState, Effects} = - ra_server:handle_leader({command, Cmd}, ServerState0), - {State, Actions} = - ?HANDLE_EFFECTS(Effects, EventType, - State0#state{server_state = ServerState}), + {leader, State1, Effects} = handle_leader({command, Cmd}, State0), + {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), {keep_state, State, Actions}; Error -> ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1), @@ -453,17 +450,13 @@ leader(EventType, {aux_command, Cmd}, 
State0) -> {keep_state, State#state{server_state = ServerState}, Actions}; leader(EventType, flush_commands, #state{conf = #conf{flush_commands_size = Size}, - server_state = ServerState0, low_priority_commands = Delayed0} = State0) -> {Commands, Delayed} = ra_ets_queue:take(Size, Delayed0), %% write a batch of delayed commands - {leader, ServerState, Effects} = - ra_server:handle_leader({commands, Commands}, ServerState0), + {leader, State1, Effects} = handle_leader({commands, Commands}, State0), - {State, Actions} = - ?HANDLE_EFFECTS(Effects, EventType, - State0#state{server_state = ServerState}), + {State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), case ra_ets_queue:len(Delayed) of 0 -> ok; @@ -514,16 +507,11 @@ leader(_, tick_timeout, State0) -> State = send_applied_notifications(State2, #{}), {keep_state, handle_tick_metrics(State), set_tick_timer(State, Actions)}; -leader({timeout, Name}, machine_timeout, - #state{server_state = ServerState0, - conf = #conf{}} = State0) -> +leader({timeout, Name}, machine_timeout, State0) -> % the machine timer timed out, add a timeout message Cmd = make_command('$usr', cast, {timeout, Name}, noreply), - {leader, ServerState, Effects} = ra_server:handle_leader({command, Cmd}, - ServerState0), - {State, Actions} = ?HANDLE_EFFECTS(Effects, cast, - State0#state{server_state = - ServerState}), + {leader, State1, Effects} = handle_leader({command, Cmd}, State0), + {State, Actions} = ?HANDLE_EFFECTS(Effects, cast, State1), {keep_state, State, Actions}; leader({call, From}, trigger_election, State) -> {keep_state, State, [{reply, From, ok}]}; @@ -790,11 +778,11 @@ follower({call, From}, {log_fold, Fun, Term}, State) -> fold_log(From, Fun, Term, State); follower(EventType, Msg, #state{conf = #conf{name = Name}, server_state = SS0} = State0) -> - Membership0 = ra_server:get_membership(SS0), case handle_follower(Msg, State0) of {follower, State1, Effects} -> {State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), 
State = #state{server_state = SS} = follower_leader_change(State0, State2), + Membership0 = ra_server:get_membership(SS0), case ra_server:get_membership(SS) of Membership0 -> ok; @@ -1077,6 +1065,7 @@ handle_leader(Msg, #state{server_state = ServerState0} = State0) -> {NextState, ServerState, Effects} -> State = State0#state{server_state = ra_server:persist_last_applied(ServerState)}, + maybe_record_cluster_change(State0, State), {NextState, State, Effects}; OtherErr -> ?ERR("handle_leader err ~p", [OtherErr]), @@ -1103,8 +1092,11 @@ handle_candidate(Msg, State) -> handle_pre_vote(Msg, State) -> handle_raft_state(?FUNCTION_NAME, Msg, State). -handle_follower(Msg, State) -> - handle_raft_state(?FUNCTION_NAME, Msg, State). +handle_follower(Msg, State0) -> + Ret = handle_raft_state(?FUNCTION_NAME, Msg, State0), + {_NextState, State, _Effects} = Ret, + maybe_record_cluster_change(State0, State), + Ret. handle_receive_snapshot(Msg, State) -> handle_raft_state(?FUNCTION_NAME, Msg, State). @@ -1341,9 +1333,6 @@ handle_effect(_, garbage_collection, _EvtType, State, Actions) -> true = erlang:garbage_collect(), incr_counter(State#state.conf, ?C_RA_SRV_GCS, 1), {State, Actions}; -handle_effect(_, update_leaderboard, _EvtType, State, Actions) -> - ok = record_leader_change(leader_id(State), State), - {State, Actions}; handle_effect(_, {monitor, _ProcOrNode, PidOrNode}, _, #state{monitors = Monitors} = State, Actions0) -> {State#state{monitors = ra_monitors:add(PidOrNode, machine, Monitors)}, @@ -1491,7 +1480,6 @@ follower_leader_change(Old, #state{pending_commands = Pending, ok = aten_register(LeaderNode), OldLeaderNode = ra_lib:ra_server_id_node(OldLeader), _ = aten:unregister(OldLeaderNode), - ok = record_leader_change(NewLeader, New), % leader has either changed or just been set ?INFO("~ts: detected a new leader ~w in term ~b", [log_id(New), NewLeader, current_term(New)]), @@ -1555,6 +1543,8 @@ do_state_query(voters, #{cluster := Cluster}) -> end end, [], Cluster), Vs; 
+do_state_query(leader, #{leader_id := Leader}) -> + Leader; do_state_query(members, #{cluster := Cluster}) -> maps:keys(Cluster); do_state_query(members_info, #{cfg := #cfg{id = Self}, cluster := Cluster, @@ -1872,11 +1862,25 @@ handle_process_down(Pid, Info, RaftState, monitors = Monitors}), {keep_state, State, Actions}. -record_leader_change(Leader, #state{conf = #conf{cluster_name = ClusterName}, - server_state = ServerState}) -> +maybe_record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, + server_state = ServerStateA}, + #state{server_state = ServerStateB}) -> + LeaderA = ra_server:leader_id(ServerStateA), + LeaderB = ra_server:leader_id(ServerStateB), + if (map_get(cluster_index_term, ServerStateA) =/= + map_get(cluster_index_term, ServerStateB) orelse + LeaderA =/= LeaderB) -> + MembersB = do_state_query(members, ServerStateB), + ok = ra_leaderboard:record(ClusterName, LeaderB, MembersB); + true -> + ok + end. + +record_cluster_change(#state{conf = #conf{cluster_name = ClusterName}, + server_state = ServerState}) -> + Leader = do_state_query(leader, ServerState), Members = do_state_query(members, ServerState), - ok = ra_leaderboard:record(ClusterName, Leader, Members), - ok. + ok = ra_leaderboard:record(ClusterName, Leader, Members). 
incr_counter(#conf{counter = Cnt}, Ix, N) when Cnt =/= undefined -> counters:add(Cnt, Ix, N); diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index 7756460b..269aef8b 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -197,7 +197,6 @@ delete_two_server_cluster(Config) -> Machine = {module, ?MODULE, #{}}, {ok, _, []} = ra:start_cluster(?SYS, ClusterName, Machine, ServerIds), {ok, _} = ra:delete_cluster(ServerIds), - % timer:sleep(1000), await_condition( fun () -> lists:all( @@ -309,8 +308,9 @@ grow_cluster(Config) -> ok = ra:start_server(?SYS, ClusterName, B, Machine, [A]), {ok, _, _} = ra:add_member(A, B), {ok, _, _} = ra:process_command(A, banana), - [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), + [A, B] = rpc:call(BNode, ra_leaderboard, lookup_members, [ClusterName]), + [A, B] = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), ok = ra:start_server(?SYS, ClusterName, C, Machine, [A, B]), {ok, _, _} = ra:add_member(A, C), @@ -339,8 +339,8 @@ grow_cluster(Config) -> [B, C] = rpc:call(CNode, ra_leaderboard, lookup_members, [ClusterName]), undefined = rpc:call(ANode, ra_leaderboard, lookup_members, [ClusterName]), %% check leader - L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), L2 = rpc:call(CNode, ra_leaderboard, lookup_leader, [ClusterName]), + L2 = rpc:call(BNode, ra_leaderboard, lookup_leader, [ClusterName]), undefined = rpc:call(ANode, ra_leaderboard, lookup_leader, [ClusterName]), diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index ff81a67d..d25eb4b1 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -180,8 +180,11 @@ take_after_overwrite_and_init(Config) -> Log2 = deliver_written_log_events(Log1, 200), {[_, _, _, _], Log3} = ra_log_take(1, 4, Log2), Log4 = write_and_roll_no_deliver(1, 2, 2, Log3), - % fake lost segments event - Log5 = deliver_written_log_events(Log4, 200), + Log5 = deliver_log_events_cond(Log4, + 
fun (L) -> {1, 2} =:= ra_log:last_written(L) end, 100), + % ensure we cannot take stale entries {[{1, 2, _}], Log6} = ra_log_take(1, 4, Log5), _ = ra_log:close(Log6), @@ -310,7 +313,8 @@ sparse_read_out_of_range_2(Config) -> {Log3, _} = receive {ra_log_event, {snapshot_written, {10, 2}} = Evt} -> ra_log:handle_event(Evt, Log2) - after 500 -> + after 5000 -> + flush(), exit(snapshot_written_timeout) end, Log4 = deliver_all_log_events(Log3, 100), @@ -332,8 +336,17 @@ sparse_read(Config) -> %% read small batch of the latest entries {_, Log3} = ra_log_take(Num - 5, Num, Log2), ct:pal("log overview ~p", [ra_log:overview(Log3)]), - %% warm up run - {_, Log4} = ra_log_take(1, Num, Log3), + %% ensure cache is empty as this indicates all entries have at least + %% been written to the WAL and thus will be available in mem tables. + Log4 = deliver_log_events_cond(Log3, + fun (L) -> + case ra_log:overview(L) of + #{cache_size := 0} -> + true; + _ -> + false + end + end, 100), ra_log:close(Log4), NumDiv2 = Num div 2, %% create a list of indexes with some consecutive and some gaps diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index ccb2e7c3..64685dbf 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -248,13 +248,15 @@ recover_restores_cluster_changes(_Config) -> log := Log0}, _} = ra_server:handle_leader({command, {'$ra_join', meta(), N2, await_consensus}}, State), + {LIdx, _} = ra_log:last_index_term(Log0), ?assertEqual(2, maps:size(Cluster)), % intercept ra_log:init call to simulate persisted log data % ok = meck:new(ra_log, [passthrough]), meck:expect(ra_log, init, fun (_) -> Log0 end), meck:expect(ra_log_meta, fetch, fun (_, _, last_applied, 0) -> - element(1, ra_log:last_index_term(Log0)); + LIdx - 1; + % element(1, ra_log:last_index_term(Log0)); (_, _, _, Def) -> Def end),