diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index b5527ec23..187746cc4 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -85,7 +85,6 @@ use crate::storage::RaftStateMachine; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::type_config::alias::OneshotReceiverOf; -use crate::utime::UTime; use crate::AsyncRuntime; use crate::ChangeMembers; use crate::Instant; @@ -1465,12 +1464,12 @@ where fn handle_replication_progress( &mut self, target: C::NodeId, - id: RequestId, - result: Result, InstantOf>, String>, + request_id: RequestId, + result: Result, String>, ) { tracing::debug!( target = display(target), - request_id = display(id), + request_id = display(request_id), result = debug(&result), "handle_replication_progress" ); @@ -1485,9 +1484,9 @@ where } } - // TODO: A leader may have stepped down. + // A leader may have stepped down. if self.engine.internal_server_state.is_leading() { - self.engine.replication_handler().update_progress(target, id, result); + self.engine.replication_handler().update_progress(target, request_id, result); } } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 7ca046504..426407f9e 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -16,7 +16,6 @@ use crate::raft_state::LogStateReader; use crate::replication::request_id::RequestId; use crate::replication::response::ReplicationResult; use crate::type_config::alias::InstantOf; -use crate::utime::UTime; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; @@ -140,12 +139,10 @@ where C: RaftTypeConfig &mut self, target: C::NodeId, request_id: RequestId, - result: UTime, InstantOf>, + result: ReplicationResult, ) { - let sending_time = result.utime().unwrap(); - // No matter what the result is, the validity of the leader is granted by a follower. - self.update_leader_vote_clock(target, sending_time); + self.update_leader_clock(target, result.sending_time); let id = request_id.request_id(); let Some(id) = id else { @@ -153,11 +150,11 @@ where C: RaftTypeConfig return; }; - match result.into_inner() { - ReplicationResult::Matching(matching) => { + match result.result { + Ok(matching) => { self.update_matching(target, id, matching); } - ReplicationResult::Conflict(conflict) => { + Err(conflict) => { self.update_conflicting(target, id, conflict); } } @@ -166,7 +163,7 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: InstantOf) { + pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf) { tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!()); let granted = *self @@ -209,7 +206,7 @@ where C: RaftTypeConfig // The value granted by a quorum may not yet be a committed. // A committed is **granted** and also is in current term. - let granted = *self + let quorum_accepted = *self .leader .progress .update_with(&node_id, |prog_entry| { @@ -221,16 +218,19 @@ where C: RaftTypeConfig }) .expect("it should always update existing progress"); - tracing::debug!(granted = display(granted.display()), "granted after updating progress"); + tracing::debug!( + quorum_accepted = display(quorum_accepted.display()), + "after updating progress" + ); - self.try_commit_granted(granted); + self.try_commit_quorum_accepted(quorum_accepted); } /// Commit the log id that is granted(accepted) by a quorum of voters. /// /// In raft a log that is granted and in the leader term is committed. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn try_commit_granted(&mut self, granted: Option>) { + pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option>) { // Only when the log id is proposed by current leader, it is committed. if let Some(c) = granted { if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) { @@ -281,10 +281,8 @@ where C: RaftTypeConfig &mut self, target: C::NodeId, request_id: RequestId, - repl_res: Result, InstantOf>, String>, + repl_res: Result, String>, ) { - // TODO(2): test - tracing::debug!( target = display(target), request_id = display(request_id), diff --git a/openraft/src/membership/membership_test.rs b/openraft/src/membership/membership_test.rs index 2a5ab818b..47590bc8d 100644 --- a/openraft/src/membership/membership_test.rs +++ b/openraft/src/membership/membership_test.rs @@ -9,7 +9,6 @@ use crate::engine::testing::UTConfig; use crate::membership::IntoNodes; use crate::ChangeMembers; use crate::Membership; -use crate::MessageSummary; #[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] @@ -38,10 +37,10 @@ fn test_membership_summary() -> anyhow::Result<()> { }; let m = Membership::::new(vec![btreeset! {1,2}, btreeset! {3}], None); - assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[]}", m.summary()); + assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[]}", m.to_string()); let m = Membership::::new(vec![btreeset! {1,2}, btreeset! {3}], Some(btreeset! {4})); - assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[4:()]}", m.summary()); + assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[4:()]}", m.to_string()); let m = Membership::>::new_unchecked(vec![btreeset! {1,2}, btreeset! {3}], btreemap! { 1=>node("127.0.0.1", "k1"), @@ -52,7 +51,7 @@ fn test_membership_summary() -> anyhow::Result<()> { }); assert_eq!( r#"{voters:[{1:TestNode { addr: "127.0.0.1", data: {"k1": "k1"} },2:TestNode { addr: "127.0.0.2", data: {"k2": "k2"} }},{3:TestNode { addr: "127.0.0.3", data: {"k3": "k3"} }}], learners:[4:TestNode { addr: "127.0.0.4", data: {"k4": "k4"} }]}"#, - m.summary() + m.to_string() ); Ok(()) diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index f9172a1a7..80d31975c 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -27,10 +27,8 @@ use tracing_futures::Instrument; use crate::config::Config; use crate::core::notify::Notify; use crate::core::raft_msg::ResultReceiver; -use crate::display_ext::DisplayOption; use crate::display_ext::DisplayOptionExt; use crate::error::HigherVote; -use crate::error::Infallible; use crate::error::PayloadTooLarge; use crate::error::RPCError; use crate::error::RaftError; @@ -55,7 +53,6 @@ use crate::storage::Snapshot; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::type_config::alias::JoinHandleOf; -use crate::utime::UTime; use crate::AsyncRuntime; use crate::Instant; use crate::LogId; @@ -482,7 +479,7 @@ where debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); - self.send_progress_conflicting(request_id, leader_time, conflict); + self.send_progress(request_id, ReplicationResult::new(leader_time, Err(conflict))); Ok(None) } @@ -491,7 +488,7 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. - fn send_progress_error(&mut self, request_id: RequestId, err: RPCError>) { + fn send_progress_error(&mut self, request_id: RequestId, err: RPCError>) { let _ = self.tx_raft_core.send(Notify::Network { response: Response::Progress { target: self.target, @@ -502,80 +499,63 @@ where }); } - /// Send a `conflict` message to RaftCore. - /// RaftCore will then submit another replication command. - fn send_progress_conflicting( - &mut self, - request_id: RequestId, - leader_time: InstantOf, - conflict: LogId, - ) { + /// Send the success replication result(log matching or conflict) to RaftCore. + fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { tracing::debug!( - target = display(self.target), request_id = display(request_id), - conflict = display(&conflict), - "update_conflicting" + target = display(self.target), + curr_matching = display(self.matching.display()), + result = display(&replication_result), + "{}", + func_name!() ); + match replication_result.result { + Ok(matching) => { + self.validate_matching(matching); + self.matching = matching; + } + Err(_conflict) => { + // Conflict is not allowed to be less than the current matching. + } + } + let _ = self.tx_raft_core.send({ Notify::Network { response: Response::Progress { session_id: self.session_id, request_id, target: self.target, - result: Ok(UTime::new(leader_time, ReplicationResult::Conflict(conflict))), + result: Ok(replication_result), }, } }); } - /// Update the `matching` log id, which is for tracking follower replication, and report the - /// matched log id to `RaftCore` to commit an entry. - #[tracing::instrument(level = "trace", skip(self))] - fn send_progress_matching( - &mut self, - request_id: RequestId, - leader_time: InstantOf, - new_matching: Option>, - ) { - tracing::debug!( - request_id = display(request_id), - target = display(self.target), - curr_matching = display(DisplayOption(&self.matching)), - new_matching = display(DisplayOption(&new_matching)), - "{}", - func_name!() - ); - + /// Validate the value for updating matching log id. + /// + /// If the matching log id is reverted to a smaller value: + /// - log a warning message if [`loosen-follower-log-revert`] feature flag is enabled; + /// - otherwise panic, consider it as a bug. + /// + /// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert + fn validate_matching(&self, matching: Option>) { if cfg!(feature = "loosen-follower-log-revert") { - if self.matching > new_matching { + if self.matching > matching { tracing::warn!( - "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed", - self.matching.display(), - new_matching.display(), - ); + "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed", + self.matching.display(), + matching.display(), + ); } } else { debug_assert!( - self.matching <= new_matching, + self.matching <= matching, "follower log is reverted from {} to {}", self.matching.display(), - new_matching.display(), + matching.display(), ); } - - self.matching = new_matching; - - let _ = self.tx_raft_core.send({ - Notify::Network { - response: Response::Progress { - session_id: self.session_id, - request_id, - target: self.target, - result: Ok(UTime::new(leader_time, ReplicationResult::Matching(new_matching))), - }, - } - }); } /// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and @@ -836,7 +816,10 @@ where })); } - self.send_progress_matching(request_id, start_time, snapshot_meta.last_log_id); + self.send_progress( + request_id, + ReplicationResult::new(start_time, Ok(snapshot_meta.last_log_id)), + ); Ok(None) } @@ -850,7 +833,7 @@ where leader_time: InstantOf, log_ids: DataWithId>, ) -> Option> { - self.send_progress_matching(log_ids.request_id(), leader_time, matching); + self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching))); if matching < log_ids.data().last_log_id { Some(Data::new_logs( diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 2788aafde..4ea8335f3 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,10 +1,11 @@ +use std::fmt; + +use crate::display_ext::DisplayOptionExt; use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; use crate::type_config::alias::InstantOf; -use crate::utime::UTime; -use crate::LogId; +use crate::type_config::alias::LogIdOf; use crate::MessageSummary; -use crate::NodeId; use crate::RaftTypeConfig; use crate::StorageError; use crate::Vote; @@ -33,7 +34,7 @@ where C: RaftTypeConfig /// the target node. /// /// The result also track the time when this request is sent. - result: Result, InstantOf>, String>, + result: Result, String>, /// In which session this message is sent. /// @@ -100,9 +101,58 @@ where C: RaftTypeConfig } } -/// Result of an replication action. +/// Result of an append-entries replication #[derive(Clone, Debug)] -pub(crate) enum ReplicationResult { - Matching(Option>), - Conflict(LogId), +pub(crate) struct ReplicationResult { + /// The timestamp when this request is sent. + /// + /// It is used to update the lease for leader. + pub(crate) sending_time: InstantOf, + + /// Ok for matching, Err for conflict. + pub(crate) result: Result>, LogIdOf>, +} + +impl fmt::Display for ReplicationResult +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{{time:{:?}, result:", self.sending_time)?; + + match &self.result { + Ok(matching) => write!(f, "Match:{}", matching.display())?, + Err(conflict) => write!(f, "Conflict:{}", conflict)?, + } + + write!(f, "}}") + } +} + +impl ReplicationResult +where C: RaftTypeConfig +{ + pub(crate) fn new(sending_time: InstantOf, result: Result>, LogIdOf>) -> Self { + Self { sending_time, result } + } +} + +#[cfg(test)] +mod tests { + use crate::engine::testing::UTConfig; + use crate::replication::response::ReplicationResult; + use crate::testing::log_id; + use crate::type_config::alias::InstantOf; + + #[test] + fn test_replication_result_display() { + // NOTE that with single-term-leader, log id is `1-3` + + let result = ReplicationResult::::new(InstantOf::::now(), Ok(Some(log_id(1, 2, 3)))); + let want = format!(", result:Match:{}}}", log_id(1, 2, 3)); + assert!(result.to_string().ends_with(&want), "{}", result.to_string()); + + let result = ReplicationResult::::new(InstantOf::::now(), Err(log_id(1, 2, 3))); + let want = format!(", result:Conflict:{}}}", log_id(1, 2, 3)); + assert!(result.to_string().ends_with(&want), "{}", result.to_string()); + } } diff --git a/openraft/src/utime.rs b/openraft/src/utime.rs index a4ec551c3..adb56d009 100644 --- a/openraft/src/utime.rs +++ b/openraft/src/utime.rs @@ -78,6 +78,7 @@ impl UTime { } /// Consumes this object and returns the inner data. + #[allow(dead_code)] pub(crate) fn into_inner(self) -> T { self.data }