diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 59f2f0bea..32db67c38 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -527,10 +527,15 @@ where /// Report a metrics payload on the current state of the Raft node. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn report_metrics(&self, replication: Option>) { + pub(crate) fn report_metrics(&mut self, replication: Option>) { + let last_quorum_acked = self.last_quorum_acked_time(); + let millis_since_quorum_ack = last_quorum_acked.map(|t| t.elapsed().as_millis() as u64); + let st = &self.engine.state; let membership_config = st.membership_state.effective().stored_membership().clone(); + let current_leader = self.current_leader(); + let m = RaftMetrics { running_state: Ok(()), id: self.id, @@ -545,7 +550,8 @@ where // --- cluster --- state: st.server_state, - current_leader: self.current_leader(), + current_leader, + millis_since_quorum_ack, membership_config: membership_config.clone(), // --- replication --- @@ -564,6 +570,7 @@ where last_applied: st.io_applied().copied(), snapshot: st.io_snapshot_last_log_id().copied(), purged: st.io_purged().copied(), + millis_since_quorum_ack, replication, }; self.tx_data_metrics.send_if_modified(|metrix| { @@ -578,7 +585,7 @@ where id: self.id, vote: *st.io_state().vote(), state: st.server_state, - current_leader: self.current_leader(), + current_leader, membership_config, }; self.tx_server_metrics.send_if_modified(|metrix| { @@ -665,6 +672,16 @@ where } } + /// Retrieves the most recent timestamp that is acknowledged by a quorum. + /// + /// This function returns the latest known time at which the leader received acknowledgment + /// from a quorum of followers, indicating its leadership is current and recognized. + /// If the node is not a leader or no acknowledgment has been received, `None` is returned. 
+ fn last_quorum_acked_time(&mut self) -> Option> { + let leading = self.engine.internal_server_state.leading_mut(); + leading.and_then(|l| l.last_quorum_acked_time()) + } + pub(crate) fn get_leader_node(&self, leader_id: Option) -> Option { let leader_id = match leader_id { None => return None, diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index f62ca4bb9..aa6946345 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -84,6 +84,7 @@ where C: RaftTypeConfig /// /// Note: This method does not check last-log-id. handle-vote-request has to deal with /// last-log-id itself. + #[tracing::instrument(level = "debug", skip_all)] pub(crate) fn update_vote(&mut self, vote: &Vote) -> Result<(), RejectVoteRequest> { // Partial ord compare: // Vote does not has to be total ord. @@ -137,7 +138,11 @@ where C: RaftTypeConfig pub(crate) fn become_leading(&mut self) { if let Some(l) = self.internal_server_state.leading_mut() { if l.vote.leader_id() == self.state.vote_ref().leader_id() { - // Vote still belongs to the same leader. Just updating vote is enough. + tracing::debug!( + "vote still belongs to the same leader. Just updating vote is enough: node-{}, {}", + self.config.id, + self.state.vote_ref() + ); l.vote = *self.state.vote_ref(); self.server_state_handler().update_server_state_if_changed(); return; diff --git a/openraft/src/instant.rs b/openraft/src/instant.rs index 27c4481d2..e09e31ddc 100644 --- a/openraft/src/instant.rs +++ b/openraft/src/instant.rs @@ -35,6 +35,18 @@ pub trait Instant: { /// Return the current instant. fn now() -> Self; + + /// Return the amount of time since the instant. + /// + /// The returned duration is never negative: zero if `now()` is not later than `self`. 
+ fn elapsed(&self) -> Duration { + let now = Self::now(); + if now > *self { + now - *self + } else { + Duration::from_secs(0) + } + } } pub type TokioInstant = tokio::time::Instant; diff --git a/openraft/src/leader/leader.rs b/openraft/src/leader/leader.rs index d97e8aa20..9bf4d971b 100644 --- a/openraft/src/leader/leader.rs +++ b/openraft/src/leader/leader.rs @@ -107,8 +107,7 @@ where /// Note that the leader may not be in the QuorumSet at all. /// In such a case, the update operation will be just ignored, /// and the quorum-acked-time is totally determined by remove voters. - #[allow(dead_code)] - pub(crate) fn quorum_acked_time(&mut self) -> Option { + pub(crate) fn last_quorum_acked_time(&mut self) -> Option { // For `Leading`, the vote is always the leader's vote. // Thus vote.voted_for() is this node. @@ -142,7 +141,7 @@ mod tests { use crate::Vote; #[test] - fn test_leading_quorum_acked_time_leader_is_voter() { + fn test_leading_last_quorum_acked_time_leader_is_voter() { let mut leading = Leading::, InstantOf>::new( Vote::new_committed(2, 1), vec![1, 2, 3], @@ -153,12 +152,12 @@ mod tests { let now1 = InstantOf::::now(); let _t2 = leading.clock_progress.increase_to(&2, Some(now1)); - let t1 = leading.quorum_acked_time(); + let t1 = leading.last_quorum_acked_time(); assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2"); } #[test] - fn test_leading_quorum_acked_time_leader_is_learner() { + fn test_leading_last_quorum_acked_time_leader_is_learner() { let mut leading = Leading::, InstantOf>::new( Vote::new_committed(2, 4), vec![1, 2, 3], @@ -168,17 +167,17 @@ mod tests { let t2 = InstantOf::::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); - let t = leading.quorum_acked_time(); + let t = leading.last_quorum_acked_time(); assert!(t.is_none(), "n1(leader+learner) does not count in quorum"); let t3 = InstantOf::::now(); let _ = leading.clock_progress.increase_to(&3, Some(t3)); - let t = leading.quorum_acked_time(); + let t = 
leading.last_quorum_acked_time(); assert_eq!(Some(t2), t, "n2 and n3 acked"); } #[test] - fn test_leading_quorum_acked_time_leader_is_not_member() { + fn test_leading_last_quorum_acked_time_leader_is_not_member() { let mut leading = Leading::, InstantOf>::new( Vote::new_committed(2, 5), vec![1, 2, 3], @@ -188,12 +187,12 @@ mod tests { let t2 = InstantOf::::now(); let _ = leading.clock_progress.increase_to(&2, Some(t2)); - let t = leading.quorum_acked_time(); + let t = leading.last_quorum_acked_time(); assert!(t.is_none(), "n1(leader+learner) does not count in quorum"); let t3 = InstantOf::::now(); let _ = leading.clock_progress.increase_to(&3, Some(t3)); - let t = leading.quorum_acked_time(); + let t = leading.last_quorum_acked_time(); assert_eq!(Some(t2), t, "n2 and n3 acked"); } } diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index d4d200fb0..c31f7f473 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use crate::core::ServerState; use crate::display_ext::DisplayOption; +use crate::display_ext::DisplayOptionExt; use crate::error::Fatal; use crate::metrics::ReplicationMetrics; use crate::summary::MessageSummary; @@ -54,6 +55,21 @@ pub struct RaftMetrics { /// The current cluster leader. pub current_leader: Option, + /// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged + /// timestamp by a quorum. + /// + /// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum. + /// Being acknowledged means receiving a reply of + /// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`). + /// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count, + /// because a node will not maintain a lease for a vote with `committed == false`. 
+ /// + /// This duration is used by the application to assess the likelihood that the leader has lost + /// synchronization with the cluster. + /// A longer duration without acknowledgment may suggest a higher probability of the leader + /// being partitioned from the cluster. + pub millis_since_quorum_ack: Option, + /// The current membership config of the cluster. pub membership_config: Arc>, @@ -72,7 +88,7 @@ where C: RaftTypeConfig write!( f, - "id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}", + "id:{}, {:?}, term:{}, vote:{}, last_log:{}, last_applied:{}, leader:{}(since_last_ack:{} ms)", self.id, self.state, self.current_term, @@ -80,6 +96,7 @@ where C: RaftTypeConfig DisplayOption(&self.last_log_index), DisplayOption(&self.last_applied), DisplayOption(&self.current_leader), + DisplayOption(&self.millis_since_quorum_ack), )?; write!(f, ", ")?; @@ -124,6 +141,7 @@ where C: RaftTypeConfig state: ServerState::Follower, current_leader: None, + millis_since_quorum_ack: None, membership_config: Arc::new(StoredMembership::default()), replication: None, } @@ -138,6 +156,22 @@ pub struct RaftDataMetrics { pub last_applied: Option>, pub snapshot: Option>, pub purged: Option>, + + /// For a leader, it is the elapsed time in milliseconds since the most recently acknowledged + /// timestamp by a quorum. + /// + /// It is `None` if this node is not leader, or the leader is not yet acknowledged by a quorum. + /// Being acknowledged means receiving a reply of + /// `AppendEntries`(`AppendEntriesRequest.vote.committed == true`). + /// Receiving a reply of `RequestVote`(`RequestVote.vote.committed == false`) does not count, + /// because a node will not maintain a lease for a vote with `committed == false`. + /// + /// This duration is used by the application to assess the likelihood that the leader has lost + /// synchronization with the cluster. 
+ /// A longer duration without acknowledgment may suggest a higher probability of the leader + /// being partitioned from the cluster. + pub millis_since_quorum_ack: Option, + pub replication: Option>, } @@ -149,11 +183,12 @@ where C: RaftTypeConfig write!( f, - "last_log:{}, last_applied:{}, snapshot:{}, purged:{}, replication:{{{}}}", + "last_log:{}, last_applied:{}, snapshot:{}, purged:{}, quorum_acked(leader):{} ms before, replication:{{{}}}", DisplayOption(&self.last_log), DisplayOption(&self.last_applied), DisplayOption(&self.snapshot), DisplayOption(&self.purged), + self.millis_since_quorum_ack.display(), self.replication .as_ref() .map(|x| { x.iter().map(|(k, v)| format!("{}:{}", k, DisplayOption(v))).collect::>().join(",") }) @@ -181,6 +216,7 @@ pub struct RaftServerMetrics { pub vote: Vote, pub state: ServerState, pub current_leader: Option, + pub membership_config: Arc>, } diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 2a6a73ddc..6713e0082 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -259,6 +259,7 @@ where C: RaftTypeConfig { purged: None, current_leader: None, + millis_since_quorum_ack: None, membership_config: Arc::new(StoredMembership::new(None, Membership::new(vec![btreeset! {}], None))), snapshot: None, diff --git a/tests/tests/metrics/main.rs b/tests/tests/metrics/main.rs index f212f60d9..8cf0f4b1c 100644 --- a/tests/tests/metrics/main.rs +++ b/tests/tests/metrics/main.rs @@ -8,6 +8,7 @@ mod fixtures; // The later tests may depend on the earlier ones. 
+ mod t10_current_leader; +mod t10_leader_last_ack; mod t10_purged; mod t10_server_metrics_and_data_metrics; mod t20_metrics_state_machine_consistency; diff --git a/tests/tests/metrics/t10_leader_last_ack.rs b/tests/tests/metrics/t10_leader_last_ack.rs new file mode 100644 index 000000000..4d98b2bf5 --- /dev/null +++ b/tests/tests/metrics/t10_leader_last_ack.rs @@ -0,0 +1,146 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use maplit::btreeset; +use openraft::AsyncRuntime; +use openraft::Config; +use openraft::RaftTypeConfig; +use openraft_memstore::TypeConfig; + +use crate::fixtures::init_default_ut_tracing; +use crate::fixtures::RaftRouter; + +/// Check the time elapsed since a leader was last acknowledged by a quorum, +/// as reported by both RaftMetrics and RaftDataMetrics. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn leader_last_ack_3_nodes() -> Result<()> { + let heartbeat_interval = 50; // ms + let config = Arc::new( + Config { + enable_heartbeat: false, + heartbeat_interval, + enable_elect: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + let log_index = router.new_cluster(btreeset! {0,1,2}, btreeset! 
{}).await?; + + let n0 = router.get_raft_handle(&0)?; + let millis = n0.metrics().borrow().millis_since_quorum_ack; + assert!(millis >= Some(0)); + + { + let millis = n0.data_metrics().borrow().millis_since_quorum_ack; + assert!(millis >= Some(0)); + } + + tracing::info!(log_index, "--- sleep 500 ms, the `millis` should extend"); + { + <::AsyncRuntime as AsyncRuntime>::sleep(Duration::from_millis(500)).await; + + let greater = n0.metrics().borrow().millis_since_quorum_ack; + println!("greater: {:?}", greater); + assert!(greater > millis); + assert!( + greater > Some(500 - heartbeat_interval * 2), + "it extends, but may be smaller because the tick interval is 50 ms" + ); + } + + let n0 = router.get_raft_handle(&0)?; + + tracing::info!(log_index, "--- heartbeat; millis_since_quorum_ack refreshes"); + { + n0.trigger().heartbeat().await?; + n0.wait(timeout()) + .metrics( + |x| x.millis_since_quorum_ack < Some(100), + "millis_since_quorum_ack refreshed", + ) + .await?; + } + + tracing::info!( + log_index, + "--- sleep and heartbeat again; millis_since_quorum_ack refreshes" + ); + { + <::AsyncRuntime as AsyncRuntime>::sleep(Duration::from_millis(500)).await; + + n0.trigger().heartbeat().await?; + + n0.wait(timeout()) + .metrics( + |x| x.millis_since_quorum_ack < Some(100), + "millis_since_quorum_ack refreshed again", + ) + .await?; + } + + tracing::info!(log_index, "--- remove node 1 and node 2"); + { + router.remove_node(1); + router.remove_node(2); + } + + tracing::info!( + log_index, + "--- sleep and heartbeat again; millis_since_quorum_ack does not refresh" + ); + { + <::AsyncRuntime as AsyncRuntime>::sleep(Duration::from_millis(500)).await; + + n0.trigger().heartbeat().await?; + + let got = n0 + .wait(timeout()) + .metrics( + |x| x.millis_since_quorum_ack < Some(100), + "millis_since_quorum_ack refreshed again", + ) + .await; + assert!(got.is_err(), "millis_since_quorum_ack does not refresh"); + } + + Ok(()) +} + +/// Get the last timestamp when a leader is 
acknowledged by a quorum, +/// from RaftMetrics and RaftDataMetrics. +#[async_entry::test(worker_threads = 8, init = "init_default_ut_tracing()", tracing_span = "debug")] +async fn leader_last_ack_1_node() -> Result<()> { + let config = Arc::new( + Config { + enable_heartbeat: false, + enable_elect: false, + ..Default::default() + } + .validate()?, + ); + + let mut router = RaftRouter::new(config.clone()); + + let log_index = router.new_cluster(btreeset! {0}, btreeset! {}).await?; + let _ = log_index; + + let n0 = router.get_raft_handle(&0)?; + + let millis = n0.metrics().borrow().millis_since_quorum_ack; + assert_eq!(millis, Some(0), "it is always acked for single leader"); + + { + let millis = n0.data_metrics().borrow().millis_since_quorum_ack; + assert_eq!(millis, Some(0), "it is always acked for single leader"); + } + + Ok(()) +} + +fn timeout() -> Option { + Some(Duration::from_millis(500)) +}