diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index a98492dd5..add17cec3 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -189,10 +189,6 @@ where C: RaftTypeConfig // 1 2 3 // Value: 1 1 2 2 2 // 1 is granted by a quorum // ``` - if granted > self.leader.vote.utime() { - // Safe unwrap(): Only Some() can be greater than another Option - self.leader.vote.touch(granted.unwrap()); - } } /// Update progress when replicated data(logs or snapshot) matches on follower/learner and is diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 2d801224f..f62ca4bb9 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -12,7 +12,6 @@ use crate::internal_server_state::InternalServerState; use crate::leader::Leading; use crate::raft_state::LogStateReader; use crate::type_config::alias::InstantOf; -use crate::utime::UTime; use crate::AsyncRuntime; use crate::Instant; use crate::OptionalSend; @@ -139,7 +138,7 @@ where C: RaftTypeConfig if let Some(l) = self.internal_server_state.leading_mut() { if l.vote.leader_id() == self.state.vote_ref().leader_id() { // Vote still belongs to the same leader. Just updating vote is enough. - l.vote = UTime::without_utime(*self.state.vote_ref()); + l.vote = *self.state.vote_ref(); self.server_state_handler().update_server_state_if_changed(); return; } diff --git a/openraft/src/leader/leader.rs b/openraft/src/leader/leader.rs index cc05c280a..d97e8aa20 100644 --- a/openraft/src/leader/leader.rs +++ b/openraft/src/leader/leader.rs @@ -1,11 +1,10 @@ use std::fmt; -use std::ops::Deref; use crate::leader::voting::Voting; use crate::progress::entry::ProgressEntry; +use crate::progress::Progress; use crate::progress::VecProgress; use crate::quorum::QuorumSet; -use crate::utime::UTime; use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; @@ -29,10 +28,8 @@ use crate::Vote; #[derive(Clone, Debug)] #[derive(PartialEq, Eq)] pub(crate) struct Leading, I: Instant> { - // TODO(1): set the utime, - // TODO(1): update it when heartbeat is granted by a quorum /// The vote this leader works in. - pub(crate) vote: UTime, I>, + pub(crate) vote: Vote, quorum_set: QS, @@ -65,7 +62,7 @@ where let learner_ids = learner_ids.collect::>(); Self { - vote: UTime::without_utime(vote), + vote, quorum_set: quorum_set.clone(), voting: None, progress: VecProgress::new( @@ -88,12 +85,7 @@ where } pub(crate) fn initialize_voting(&mut self, last_log_id: Option>, now: I) -> &mut Voting { - self.voting = Some(Voting::new( - now, - *self.vote.deref(), - last_log_id, - self.quorum_set.clone(), - )); + self.voting = Some(Voting::new(now, self.vote, last_log_id, self.quorum_set.clone())); self.voting.as_mut().unwrap() } @@ -102,4 +94,106 @@ where // it has to be in voting progress self.voting.take().unwrap() } + + /// Get the last timestamp acknowledged by a quorum. + /// + /// The acknowledgement by remote nodes are updated when AppendEntries reply is received. + /// But if the time of the leader itself is not updated. + /// + /// Therefore everytime to retrieve the quorum acked timestamp, it should update with the + /// leader's time first. + /// It does not matter if the leader is not a voter, the QuorumSet will just ignore it. + /// + /// Note that the leader may not be in the QuorumSet at all. + /// In such a case, the update operation will be just ignored, + /// and the quorum-acked-time is totally determined by remove voters. + #[allow(dead_code)] + pub(crate) fn quorum_acked_time(&mut self) -> Option { + // For `Leading`, the vote is always the leader's vote. + // Thus vote.voted_for() is this node. + + // Safe unwrap: voted_for() is always non-None in Openraft + let node_id = self.vote.leader_id().voted_for().unwrap(); + let now = Instant::now(); + + tracing::debug!( + leader_id = display(node_id), + now = debug(now), + "{}: update with leader's local time, before retrieving quorum acked clock", + func_name!() + ); + + let granted = self.clock_progress.increase_to(&node_id, Some(now)); + + match granted { + Ok(x) => *x, + // The leader node id may not be in the quorum set. + Err(x) => *x, + } + } +} + +#[cfg(test)] +mod tests { + use crate::engine::testing::UTConfig; + use crate::leader::Leading; + use crate::progress::Progress; + use crate::type_config::alias::InstantOf; + use crate::Vote; + + #[test] + fn test_leading_quorum_acked_time_leader_is_voter() { + let mut leading = Leading::, InstantOf>::new( + Vote::new_committed(2, 1), + vec![1, 2, 3], + vec![4].into_iter(), + None, + ); + + let now1 = InstantOf::::now(); + + let _t2 = leading.clock_progress.increase_to(&2, Some(now1)); + let t1 = leading.quorum_acked_time(); + assert_eq!(Some(now1), t1, "n1(leader) and n2 acked, t1 > t2"); + } + + #[test] + fn test_leading_quorum_acked_time_leader_is_learner() { + let mut leading = Leading::, InstantOf>::new( + Vote::new_committed(2, 4), + vec![1, 2, 3], + vec![4].into_iter(), + None, + ); + + let t2 = InstantOf::::now(); + let _ = leading.clock_progress.increase_to(&2, Some(t2)); + let t = leading.quorum_acked_time(); + assert!(t.is_none(), "n1(leader+learner) does not count in quorum"); + + let t3 = InstantOf::::now(); + let _ = leading.clock_progress.increase_to(&3, Some(t3)); + let t = leading.quorum_acked_time(); + assert_eq!(Some(t2), t, "n2 and n3 acked"); + } + + #[test] + fn test_leading_quorum_acked_time_leader_is_not_member() { + let mut leading = Leading::, InstantOf>::new( + Vote::new_committed(2, 5), + vec![1, 2, 3], + vec![4].into_iter(), + None, + ); + + let t2 = InstantOf::::now(); + let _ = leading.clock_progress.increase_to(&2, Some(t2)); + let t = leading.quorum_acked_time(); + assert!(t.is_none(), "n1(leader+learner) does not count in quorum"); + + let t3 = InstantOf::::now(); + let _ = leading.clock_progress.increase_to(&3, Some(t3)); + let t = leading.quorum_acked_time(); + assert_eq!(Some(t2), t, "n2 and n3 acked"); + } } diff --git a/openraft/src/utime.rs b/openraft/src/utime.rs index 49268c7e6..a4ec551c3 100644 --- a/openraft/src/utime.rs +++ b/openraft/src/utime.rs @@ -67,6 +67,7 @@ impl UTime { } /// Creates a new object that has no last-updated time. + #[allow(dead_code)] pub(crate) fn without_utime(data: T) -> Self { Self { data, utime: None } }