From 56e2f0e2c999a09d95f35650c4c2dcde80db04a6 Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Tue, 23 Apr 2024 14:35:42 +0900 Subject: [PATCH 1/3] Fix crash caused by receiving #append_entries_reply{success=false} from unknown peer --- src/ra_server.erl | 103 ++++++++++++++++++++++++---------------------- 1 file changed, 54 insertions(+), 49 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index 8f96d52f..02c9343f 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -469,55 +469,60 @@ handle_leader({PeerId, #append_entries_reply{success = false, State0 = #{cfg := #cfg{log_id = LogId} = Cfg, cluster := Nodes, log := Log0}) -> ok = incr_counter(Cfg, ?C_RA_SRV_AER_REPLIES_FAILED, 1), - #{PeerId := Peer0 = #{match_index := MI, - next_index := NI}} = Nodes, - % if the last_index exists and has a matching term we can forward - % match_index and update next_index directly - {Peer, Log} = case ra_log:fetch_term(LastIdx, Log0) of - {undefined, L} -> - % entry was not found - simply set next index to - ?DEBUG("~ts: setting next index for ~w ~b", - [LogId, PeerId, NextIdx]), - {Peer0#{match_index => LastIdx, - next_index => NextIdx}, L}; - % entry exists we can forward - {LastTerm, L} when LastIdx >= MI -> - ?DEBUG("~ts: setting last index to ~b, " - " next_index ~b for ~w", - [LogId, LastIdx, NextIdx, PeerId]), - {Peer0#{match_index => LastIdx, - next_index => NextIdx}, L}; - {_Term, L} when LastIdx < MI -> - % TODO: this can only really happen when peers are - % non-persistent. - % should they turn-into non-voters when this sitution - % is detected - ?WARN("~ts: leader saw peer with last_index [~b in ~b]" - " lower than recorded match index [~b]." - "Resetting peer's state to last_index.", - [LogId, LastIdx, LastTerm, MI]), - {Peer0#{match_index => LastIdx, - next_index => LastIdx + 1}, L}; - {EntryTerm, L} -> - NextIndex = max(min(NI-1, LastIdx), MI), - ?DEBUG("~ts: leader received last_index ~b" - " from ~w with term ~b " - "- expected term ~b. Setting " - "next_index to ~b", - [LogId, LastIdx, PeerId, LastTerm, EntryTerm, - NextIndex]), - % last_index has a different term or entry does not - % exist - % The peer must have received an entry from a previous - % leader and the current leader wrote a different - % entry at the same index in a different term. - % decrement next_index but don't go lower than - % match index. - {Peer0#{next_index => NextIndex}, L} - end, - State1 = State0#{cluster => Nodes#{PeerId => Peer}, log => Log}, - {State, _, Effects} = make_pipelined_rpc_effects(State1, []), - {leader, State, Effects}; + case peer(PeerId, State0) of + undefined -> + ?WARN("~ts: saw append_entries_reply from unknown peer ~w", + [LogId, PeerId]), + {leader, State0, []}; + Peer0 = #{match_index := MI, next_index := NI} -> + % if the last_index exists and has a matching term we can forward + % match_index and update next_index directly + {Peer, Log} = case ra_log:fetch_term(LastIdx, Log0) of + {undefined, L} -> + % entry was not found - simply set next index to + ?DEBUG("~ts: setting next index for ~w ~b", + [LogId, PeerId, NextIdx]), + {Peer0#{match_index => LastIdx, + next_index => NextIdx}, L}; + % entry exists we can forward + {LastTerm, L} when LastIdx >= MI -> + ?DEBUG("~ts: setting last index to ~b, " + " next_index ~b for ~w", + [LogId, LastIdx, NextIdx, PeerId]), + {Peer0#{match_index => LastIdx, + next_index => NextIdx}, L}; + {_Term, L} when LastIdx < MI -> + % TODO: this can only really happen when peers are + % non-persistent. + % should they turn-into non-voters when this sitution + % is detected + ?WARN("~ts: leader saw peer with last_index [~b in ~b]" + " lower than recorded match index [~b]." + "Resetting peer's state to last_index.", + [LogId, LastIdx, LastTerm, MI]), + {Peer0#{match_index => LastIdx, + next_index => LastIdx + 1}, L}; + {EntryTerm, L} -> + NextIndex = max(min(NI-1, LastIdx), MI), + ?DEBUG("~ts: leader received last_index ~b" + " from ~w with term ~b " + "- expected term ~b. Setting " + "next_index to ~b", + [LogId, LastIdx, PeerId, LastTerm, EntryTerm, + NextIndex]), + % last_index has a different term or entry does not + % exist + % The peer must have received an entry from a previous + % leader and the current leader wrote a different + % entry at the same index in a different term. + % decrement next_index but don't go lower than + % match index. + {Peer0#{next_index => NextIndex}, L} + end, + State1 = State0#{cluster => Nodes#{PeerId => Peer}, log => Log}, + {State, _, Effects} = make_pipelined_rpc_effects(State1, []), + {leader, State, Effects} + end; handle_leader({command, Cmd}, #{cfg := #cfg{log_id = LogId} = Cfg} = State00) -> ok = incr_counter(Cfg, ?C_RA_SRV_COMMANDS, 1), case append_log_leader(Cmd, State00, []) of From 7098c31921e24f056558f6191c947495de6f27ad Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Tue, 23 Apr 2024 14:43:41 +0900 Subject: [PATCH 2/3] Add append_entries_reply_no_success_from_unknown_peer() test --- test/ra_server_SUITE.erl | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 64685dbf..24808907 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -23,6 +23,7 @@ all() -> append_entries_reply_success_promotes_nonvoter, append_entries_reply_success, append_entries_reply_no_success, + append_entries_reply_no_success_from_unknown_peer, follower_request_vote, follower_pre_vote, pre_vote_receives_pre_vote, @@ -974,6 +975,21 @@ append_entries_reply_no_success(_Config) -> ]} = ra_server:handle_leader(Msg, State), ok. +append_entries_reply_no_success_from_unknown_peer(_Config) -> + N1 = ?N1, N2 = ?N2, + Cluster = #{N1 => new_peer()}, + State = (base_state(3, ?FUNCTION_NAME))#{commit_index => 1, + last_applied => 1, + cluster => Cluster, + machine_state => <<"hi1">>}, + % n2 is an unknown peer + Msg = {N2, #append_entries_reply{term = 5, success = false, next_index = 2, + last_index = 1, last_term = 1}}, + % The reply from n2 is ignored + {leader, State, []} = ra_server:handle_leader(Msg, State), + + ok. + follower_request_vote(_Config) -> N2 = ?N2, N3 = ?N3, State = base_state(3, ?FUNCTION_NAME), From 1c69466ff392f4f1448f8cc769f90e58d3e5e4a4 Mon Sep 17 00:00:00 2001 From: Takeru Ohta Date: Wed, 24 Apr 2024 05:10:31 +0900 Subject: [PATCH 3/3] Use is_map_key() guard --- src/ra_server.erl | 110 +++++++++++++++++++++++----------------------- 1 file changed, 56 insertions(+), 54 deletions(-) diff --git a/src/ra_server.erl b/src/ra_server.erl index 02c9343f..54ec223f 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -462,6 +462,13 @@ handle_leader({PeerId, #append_entries_reply{term = Term}}, [LogId, PeerId, Term, CurTerm]), {follower, update_term(Term, State0#{leader_id => undefined}), []} end; +handle_leader({PeerId, #append_entries_reply{success = false}}, + State0 = #{cfg := #cfg{log_id = LogId}, + cluster := Nodes}) + when not is_map_key(PeerId, Nodes) -> + ?WARN("~ts: saw append_entries_reply from unknown peer ~w", + [LogId, PeerId]), + {leader, State0, []}; handle_leader({PeerId, #append_entries_reply{success = false, next_index = NextIdx, last_index = LastIdx, @@ -469,60 +476,55 @@ handle_leader({PeerId, #append_entries_reply{success = false, State0 = #{cfg := #cfg{log_id = LogId} = Cfg, cluster := Nodes, log := Log0}) -> ok = incr_counter(Cfg, ?C_RA_SRV_AER_REPLIES_FAILED, 1), - case peer(PeerId, State0) of - undefined -> - ?WARN("~ts: saw append_entries_reply from unknown peer ~w", - [LogId, PeerId]), - {leader, State0, []}; - Peer0 = #{match_index := MI, next_index := NI} -> - % if the last_index exists and has a matching term we can forward - % match_index and update next_index directly - {Peer, Log} = case ra_log:fetch_term(LastIdx, Log0) of - {undefined, L} -> - % entry was not found - simply set next index to - ?DEBUG("~ts: setting next index for ~w ~b", - [LogId, PeerId, NextIdx]), - {Peer0#{match_index => LastIdx, - next_index => NextIdx}, L}; - % entry exists we can forward - {LastTerm, L} when LastIdx >= MI -> - ?DEBUG("~ts: setting last index to ~b, " - " next_index ~b for ~w", - [LogId, LastIdx, NextIdx, PeerId]), - {Peer0#{match_index => LastIdx, - next_index => NextIdx}, L}; - {_Term, L} when LastIdx < MI -> - % TODO: this can only really happen when peers are - % non-persistent. - % should they turn-into non-voters when this sitution - % is detected - ?WARN("~ts: leader saw peer with last_index [~b in ~b]" - " lower than recorded match index [~b]." - "Resetting peer's state to last_index.", - [LogId, LastIdx, LastTerm, MI]), - {Peer0#{match_index => LastIdx, - next_index => LastIdx + 1}, L}; - {EntryTerm, L} -> - NextIndex = max(min(NI-1, LastIdx), MI), - ?DEBUG("~ts: leader received last_index ~b" - " from ~w with term ~b " - "- expected term ~b. Setting " - "next_index to ~b", - [LogId, LastIdx, PeerId, LastTerm, EntryTerm, - NextIndex]), - % last_index has a different term or entry does not - % exist - % The peer must have received an entry from a previous - % leader and the current leader wrote a different - % entry at the same index in a different term. - % decrement next_index but don't go lower than - % match index. - {Peer0#{next_index => NextIndex}, L} - end, - State1 = State0#{cluster => Nodes#{PeerId => Peer}, log => Log}, - {State, _, Effects} = make_pipelined_rpc_effects(State1, []), - {leader, State, Effects} - end; + #{PeerId := Peer0 = #{match_index := MI, + next_index := NI}} = Nodes, + % if the last_index exists and has a matching term we can forward + % match_index and update next_index directly + {Peer, Log} = case ra_log:fetch_term(LastIdx, Log0) of + {undefined, L} -> + % entry was not found - simply set next index to + ?DEBUG("~ts: setting next index for ~w ~b", + [LogId, PeerId, NextIdx]), + {Peer0#{match_index => LastIdx, + next_index => NextIdx}, L}; + % entry exists we can forward + {LastTerm, L} when LastIdx >= MI -> + ?DEBUG("~ts: setting last index to ~b, " + " next_index ~b for ~w", + [LogId, LastIdx, NextIdx, PeerId]), + {Peer0#{match_index => LastIdx, + next_index => NextIdx}, L}; + {_Term, L} when LastIdx < MI -> + % TODO: this can only really happen when peers are + % non-persistent. + % should they turn-into non-voters when this sitution + % is detected + ?WARN("~ts: leader saw peer with last_index [~b in ~b]" + " lower than recorded match index [~b]." + "Resetting peer's state to last_index.", + [LogId, LastIdx, LastTerm, MI]), + {Peer0#{match_index => LastIdx, + next_index => LastIdx + 1}, L}; + {EntryTerm, L} -> + NextIndex = max(min(NI-1, LastIdx), MI), + ?DEBUG("~ts: leader received last_index ~b" + " from ~w with term ~b " + "- expected term ~b. Setting " + "next_index to ~b", + [LogId, LastIdx, PeerId, LastTerm, EntryTerm, + NextIndex]), + % last_index has a different term or entry does not + % exist + % The peer must have received an entry from a previous + % leader and the current leader wrote a different + % entry at the same index in a different term. + % decrement next_index but don't go lower than + % match index. + {Peer0#{next_index => NextIndex}, L} + end, + State1 = State0#{cluster => Nodes#{PeerId => Peer}, log => Log}, + {State, _, Effects} = make_pipelined_rpc_effects(State1, []), + {leader, State, Effects}; handle_leader({command, Cmd}, #{cfg := #cfg{log_id = LogId} = Cfg} = State00) -> ok = incr_counter(Cfg, ?C_RA_SRV_COMMANDS, 1), case append_log_leader(Cmd, State00, []) of