From 8306248392c224694f5a47520a54847e54154a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 24 Mar 2024 13:45:18 +0800 Subject: [PATCH] Refactor: merge send-progress methods --- openraft/src/core/raft_core.rs | 8 +- .../engine/handler/replication_handler/mod.rs | 17 ++-- openraft/src/replication/mod.rs | 96 ++++++++----------- openraft/src/replication/response.rs | 39 ++++++++ openraft/src/utime.rs | 1 + 5 files changed, 93 insertions(+), 68 deletions(-) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 68918275b..187746cc4 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1464,12 +1464,12 @@ where fn handle_replication_progress( &mut self, target: C::NodeId, - id: RequestId, + request_id: RequestId, result: Result, String>, ) { tracing::debug!( target = display(target), - request_id = display(id), + request_id = display(request_id), result = debug(&result), "handle_replication_progress" ); @@ -1484,9 +1484,9 @@ where } } - // TODO: A leader may have stepped down. + // A leader may have stepped down. if self.engine.internal_server_state.is_leading() { - self.engine.replication_handler().update_progress(target, id, result); + self.engine.replication_handler().update_progress(target, request_id, result); } } diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 656c98dc5..426407f9e 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -142,7 +142,7 @@ where C: RaftTypeConfig result: ReplicationResult, ) { // No matter what the result is, the validity of the leader is granted by a follower. - self.update_leader_vote_clock(target, result.sending_time); + self.update_leader_clock(target, result.sending_time); let id = request_id.request_id(); let Some(id) = id else { @@ -163,7 +163,7 @@ where C: RaftTypeConfig /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is /// accepted. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: InstantOf) { + pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf) { tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!()); let granted = *self @@ -206,7 +206,7 @@ where C: RaftTypeConfig // The value granted by a quorum may not yet be a committed. // A committed is **granted** and also is in current term. - let granted = *self + let quorum_accepted = *self .leader .progress .update_with(&node_id, |prog_entry| { @@ -218,16 +218,19 @@ where C: RaftTypeConfig }) .expect("it should always update existing progress"); - tracing::debug!(granted = display(granted.display()), "granted after updating progress"); + tracing::debug!( + quorum_accepted = display(quorum_accepted.display()), + "after updating progress" + ); - self.try_commit_granted(granted); + self.try_commit_quorum_accepted(quorum_accepted); } /// Commit the log id that is granted(accepted) by a quorum of voters. /// /// In raft a log that is granted and in the leader term is committed. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn try_commit_granted(&mut self, granted: Option>) { + pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option>) { // Only when the log id is proposed by current leader, it is committed. if let Some(c) = granted { if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) { @@ -280,8 +283,6 @@ where C: RaftTypeConfig request_id: RequestId, repl_res: Result, String>, ) { - // TODO(2): test - tracing::debug!( target = display(target), request_id = display(request_id), diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 5c777514d..80d31975c 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -27,10 +27,8 @@ use tracing_futures::Instrument; use crate::config::Config; use crate::core::notify::Notify; use crate::core::raft_msg::ResultReceiver; -use crate::display_ext::DisplayOption; use crate::display_ext::DisplayOptionExt; use crate::error::HigherVote; -use crate::error::Infallible; use crate::error::PayloadTooLarge; use crate::error::RPCError; use crate::error::RaftError; @@ -481,7 +479,7 @@ where debug_assert!(conflict.is_some(), "prev_log_id=None never conflict"); let conflict = conflict.unwrap(); - self.send_progress_conflicting(request_id, leader_time, conflict); + self.send_progress(request_id, ReplicationResult::new(leader_time, Err(conflict))); Ok(None) } @@ -490,7 +488,7 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. - fn send_progress_error(&mut self, request_id: RequestId, err: RPCError>) { + fn send_progress_error(&mut self, request_id: RequestId, err: RPCError>) { let _ = self.tx_raft_core.send(Notify::Network { response: Response::Progress { target: self.target, @@ -501,80 +499,63 @@ where }); } - /// Send a `conflict` message to RaftCore. - /// RaftCore will then submit another replication command. - fn send_progress_conflicting( - &mut self, - request_id: RequestId, - leader_time: InstantOf, - conflict: LogId, - ) { + /// Send the success replication result(log matching or conflict) to RaftCore. + fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult) { tracing::debug!( - target = display(self.target), request_id = display(request_id), - conflict = display(&conflict), - "update_conflicting" + target = display(self.target), + curr_matching = display(self.matching.display()), + result = display(&replication_result), + "{}", + func_name!() ); + match replication_result.result { + Ok(matching) => { + self.validate_matching(matching); + self.matching = matching; + } + Err(_conflict) => { + // Conflict is not allowed to be less than the current matching. + } + } + let _ = self.tx_raft_core.send({ Notify::Network { response: Response::Progress { session_id: self.session_id, request_id, target: self.target, - result: Ok(ReplicationResult::new(leader_time, Err(conflict))), + result: Ok(replication_result), }, } }); } - /// Update the `matching` log id, which is for tracking follower replication, and report the - /// matched log id to `RaftCore` to commit an entry. - #[tracing::instrument(level = "trace", skip(self))] - fn send_progress_matching( - &mut self, - request_id: RequestId, - leader_time: InstantOf, - new_matching: Option>, - ) { - tracing::debug!( - request_id = display(request_id), - target = display(self.target), - curr_matching = display(DisplayOption(&self.matching)), - new_matching = display(DisplayOption(&new_matching)), - "{}", - func_name!() - ); - + /// Validate the value for updating matching log id. + /// + /// If the matching log id is reverted to a smaller value: + /// - log a warning message if [`loosen-follower-log-revert`] feature flag is enabled; + /// - otherwise panic, consider it as a bug. + /// + /// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert + fn validate_matching(&self, matching: Option>) { if cfg!(feature = "loosen-follower-log-revert") { - if self.matching > new_matching { + if self.matching > matching { tracing::warn!( - "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed", - self.matching.display(), - new_matching.display(), - ); + "follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed", + self.matching.display(), + matching.display(), + ); } } else { debug_assert!( - self.matching <= new_matching, + self.matching <= matching, "follower log is reverted from {} to {}", self.matching.display(), - new_matching.display(), + matching.display(), ); } - - self.matching = new_matching; - - let _ = self.tx_raft_core.send({ - Notify::Network { - response: Response::Progress { - session_id: self.session_id, - request_id, - target: self.target, - result: Ok(ReplicationResult::new(leader_time, Ok(new_matching))), - }, - } - }); } /// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and @@ -835,7 +816,10 @@ where })); } - self.send_progress_matching(request_id, start_time, snapshot_meta.last_log_id); + self.send_progress( + request_id, + ReplicationResult::new(start_time, Ok(snapshot_meta.last_log_id)), + ); Ok(None) } @@ -849,7 +833,7 @@ where leader_time: InstantOf, log_ids: DataWithId>, ) -> Option> { - self.send_progress_matching(log_ids.request_id(), leader_time, matching); + self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching))); if matching < log_ids.data().last_log_id { Some(Data::new_logs( diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 965f626fe..4ea8335f3 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,3 +1,6 @@ +use std::fmt; + +use crate::display_ext::DisplayOptionExt; use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; use crate::type_config::alias::InstantOf; @@ -110,6 +113,21 @@ pub(crate) struct ReplicationResult { pub(crate) result: Result>, LogIdOf>, } +impl fmt::Display for ReplicationResult +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{{time:{:?}, result:", self.sending_time)?; + + match &self.result { + Ok(matching) => write!(f, "Match:{}", matching.display())?, + Err(conflict) => write!(f, "Conflict:{}", conflict)?, + } + + write!(f, "}}") + } +} + impl ReplicationResult where C: RaftTypeConfig { @@ -117,3 +135,24 @@ where C: RaftTypeConfig Self { sending_time, result } } } + +#[cfg(test)] +mod tests { + use crate::engine::testing::UTConfig; + use crate::replication::response::ReplicationResult; + use crate::testing::log_id; + use crate::type_config::alias::InstantOf; + + #[test] + fn test_replication_result_display() { + // NOTE that with single-term-leader, log id is `1-3` + + let result = ReplicationResult::::new(InstantOf::::now(), Ok(Some(log_id(1, 2, 3)))); + let want = format!(", result:Match:{}}}", log_id(1, 2, 3)); + assert!(result.to_string().ends_with(&want), "{}", result.to_string()); + + let result = ReplicationResult::::new(InstantOf::::now(), Err(log_id(1, 2, 3))); + let want = format!(", result:Conflict:{}}}", log_id(1, 2, 3)); + assert!(result.to_string().ends_with(&want), "{}", result.to_string()); + } +} diff --git a/openraft/src/utime.rs b/openraft/src/utime.rs index a4ec551c3..adb56d009 100644 --- a/openraft/src/utime.rs +++ b/openraft/src/utime.rs @@ -78,6 +78,7 @@ impl UTime { } /// Consumes this object and returns the inner data. + #[allow(dead_code)] pub(crate) fn into_inner(self) -> T { self.data }