diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 383b82860..828163925 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -179,6 +179,7 @@ mod tests { type R = (); type NodeId = u64; type Node = (); + type Term = u64; type Entry = crate::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index c762e5e1e..b881d2880 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -47,6 +47,7 @@ use crate::storage::SnapshotMeta; use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; use crate::type_config::TypeConfigExt; +use crate::vote::raft_term::RaftTerm; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; @@ -208,7 +209,7 @@ where C: RaftTypeConfig /// Start to elect this node as leader #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn elect(&mut self) { - let new_term = self.state.vote.leader_id().term + 1; + let new_term = self.state.vote.leader_id().term.next(); let new_vote = Vote::new(new_term, self.config.id.clone()); let candidate = self.new_candidate(new_vote.clone()); diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index 8e7162832..0f816ef54 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -36,6 +36,7 @@ where N: Node + Ord type NodeId = u64; type Node = N; type Entry = crate::Entry; + type Term = u64; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; type Responder = crate::impls::OneshotResponder; diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index de348d8fd..3729cafb6 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -46,7 +46,6 @@ mod runtime; mod storage_error; mod summary; mod try_as_ref; -mod vote; pub(crate) mod engine; pub(crate) mod log_id_range; @@ -70,6 +69,7 @@ pub mod raft; pub mod storage; pub mod testing; pub mod type_config; +pub mod vote; #[cfg(test)] mod feature_serde_test; diff --git a/openraft/src/metrics/metric.rs b/openraft/src/metrics/metric.rs index afba2ce7e..7959b7646 100644 --- a/openraft/src/metrics/metric.rs +++ b/openraft/src/metrics/metric.rs @@ -14,7 +14,7 @@ use crate::Vote; pub enum Metric where C: RaftTypeConfig { - Term(u64), + Term(C::Term), Vote(Vote), LastLogIndex(Option), Applied(Option>), diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 584cb5ebc..8a4735ab7 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -29,7 +29,7 @@ pub struct RaftMetrics { // --- data --- // --- /// The current term of the Raft node. - pub current_term: u64, + pub current_term: C::Term, /// The last flushed vote. pub vote: Vote, @@ -167,7 +167,7 @@ where C: RaftTypeConfig running_state: Ok(()), id, - current_term: 0, + current_term: Default::default(), vote: Vote::default(), last_log_index: None, last_applied: None, diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index b1f1647fc..f00fb7d04 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -253,7 +253,7 @@ where C: RaftTypeConfig { running_state: Ok(()), id: NodeIdOf::::default(), state: ServerState::Learner, - current_term: 0, + current_term: Default::default(), vote: Vote::default(), last_log_index: None, last_applied: None, diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index d2bad2d7e..7ded9d116 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -114,6 +114,7 @@ use crate::Vote; /// R = ClientResponse, /// NodeId = u64, /// Node = openraft::BasicNode, +/// Term = u64, /// Entry = openraft::Entry, /// SnapshotData = Cursor>, /// Responder = openraft::impls::OneshotResponder, @@ -172,8 +173,9 @@ macro_rules! declare_raft_types { (R , , String ), (NodeId , , u64 ), (Node , , $crate::impls::BasicNode ), + (Term , , u64 ), (Entry , , $crate::impls::Entry ), - (SnapshotData , , std::io::Cursor> ), + (SnapshotData , , std::io::Cursor> ), (Responder , , $crate::impls::OneshotResponder ), (AsyncRuntime , , $crate::impls::TokioRuntime ), ); diff --git a/openraft/src/testing/common.rs b/openraft/src/testing/common.rs index 427c707b1..96a0295de 100644 --- a/openraft/src/testing/common.rs +++ b/openraft/src/testing/common.rs @@ -9,16 +9,23 @@ use crate::RaftTypeConfig; /// Builds a log id, for testing purposes. pub fn log_id(term: u64, node_id: C::NodeId, index: u64) -> LogId -where C: RaftTypeConfig { +where + C: RaftTypeConfig, + C::Term: From, +{ LogId:: { - leader_id: CommittedLeaderId::new(term, node_id), + leader_id: CommittedLeaderId::new(term.into(), node_id), index, } } /// Create a blank log entry for test. -pub fn blank_ent(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry { - crate::Entry::::new_blank(LogId::new(CommittedLeaderId::new(term, node_id), index)) +pub fn blank_ent(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry +where + C: RaftTypeConfig, + C::Term: From, +{ + crate::Entry::::new_blank(log_id(term, node_id, index)) } /// Create a membership log entry without learner config for test. @@ -29,10 +36,11 @@ pub fn membership_ent( config: Vec>, ) -> crate::Entry where + C::Term: From, C::Node: Default, { crate::Entry::new_membership( - LogId::new(CommittedLeaderId::new(term, node_id), index), + log_id(term, node_id, index), crate::Membership::new_with_defaults(config, []), ) } diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index 33a908a7d..ab40d9579 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -111,6 +111,7 @@ where C: RaftTypeConfig, C::D: Debug, C::R: Debug, + C::Term: From, C::NodeId: From, C::Node: Default, LS: RaftLogStorage, @@ -499,7 +500,7 @@ where "unexpected value for last applied log" ); assert_eq!( - Vote::new(1, NODE_ID.into()), + Vote::new(1u64.into(), NODE_ID.into()), *initial.vote_ref(), "unexpected value for default hard state" ); @@ -632,8 +633,8 @@ where } pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - let log_id = |t, n: u64, i| LogId:: { - leader_id: CommittedLeaderId::new(t, n.into()), + let log_id = |t: u64, n: u64, i| LogId:: { + leader_id: CommittedLeaderId::::new(t.into(), n.into()), index: i, }; @@ -820,11 +821,11 @@ where } pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - store.save_vote(&Vote::new(100, NODE_ID.into())).await?; + store.save_vote(&Vote::new(100.into(), NODE_ID.into())).await?; let got = store.read_vote().await?; - assert_eq!(Some(Vote::new(100, NODE_ID.into())), got,); + assert_eq!(Some(Vote::new(100.into(), NODE_ID.into())), got,); Ok(()) } @@ -873,7 +874,7 @@ where pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError> { Self::feed_10_logs_vote_self(&mut store).await?; - store.purge(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)).await?; + store.purge(log_id(0, 0, 0)).await?; // `purge()` does not have to do the purge at once. // The implementation may choose to do it in the background. @@ -923,10 +924,7 @@ where store.purge(log_id_0(0, 0)).await?; let st = store.get_log_state().await?; - assert_eq!( - Some(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)), - st.last_purged_log_id - ); + assert_eq!(Some(log_id(0, 0, 0)), st.last_purged_log_id); assert_eq!(Some(log_id_0(1, 2)), st.last_log_id); } @@ -1368,33 +1366,37 @@ where } pub async fn default_vote(sto: &mut LS) -> Result<(), StorageError> { - sto.save_vote(&Vote::new(1, NODE_ID.into())).await?; + sto.save_vote(&Vote::new(1u64.into(), NODE_ID.into())).await?; Ok(()) } } /// Create a log id with node id 0 for testing. -fn log_id_0(term: u64, index: u64) -> LogId +fn log_id_0(term: impl Into, index: u64) -> LogId where C: RaftTypeConfig, C::NodeId: From, { LogId { - leader_id: CommittedLeaderId::new(term, NODE_ID.into()), + leader_id: CommittedLeaderId::new(term.into(), NODE_ID.into()), index, } } /// Create a blank log entry with node_id 0 for test. fn blank_ent_0(term: u64, index: u64) -> C::Entry -where C::NodeId: From { - C::Entry::new_blank(log_id_0(term, index)) +where + C::Term: From, + C::NodeId: From, +{ + C::Entry::new_blank(log_id(term, 0, index)) } /// Create a membership entry with node_id 0 for test. -fn membership_ent_0(term: u64, index: u64, bs: BTreeSet) -> C::Entry +fn membership_ent_0(term: impl Into, index: u64, bs: BTreeSet) -> C::Entry where + C: RaftTypeConfig, C::NodeId: From, C::Node: Default, { @@ -1453,3 +1455,15 @@ where } Ok(()) } + +fn log_id(term: u64, node_id: u64, index: u64) -> LogId +where + C: RaftTypeConfig, + C::Term: From, + C::NodeId: From, +{ + LogId { + leader_id: CommittedLeaderId::new(term.into(), node_id.into()), + index, + } +} diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 46bf359f2..f51b12f0f 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -16,6 +16,7 @@ pub use util::TypeConfigExt; use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; +use crate::vote::raft_term::RaftTerm; use crate::AppData; use crate::AppDataResponse; use crate::Node; @@ -43,6 +44,7 @@ use crate::OptionalSync; /// R = ClientResponse, /// NodeId = u64, /// Node = openraft::BasicNode, +/// Term = u64, /// Entry = openraft::Entry, /// SnapshotData = Cursor>, /// AsyncRuntime = openraft::TokioRuntime, @@ -67,6 +69,16 @@ pub trait RaftTypeConfig: /// Raft log entry, which can be built from an AppData. type Entry: RaftEntry + FromAppData; + /// Type representing a Raft term number. + /// + /// A term is a logical clock in Raft that is used to detect obsolete information, + /// such as old leaders. It must be totally ordered and monotonically increasing. + /// + /// Common implementations are provided for standard integer types like `u64`, `i64` etc. + /// + /// See: [`RaftTerm`] for the required methods. + type Term: RaftTerm; + /// Snapshot data for exposing a snapshot for reading & writing. /// /// See the [storage chapter of the guide][sto] for details on log compaction / snapshotting. @@ -106,6 +118,7 @@ pub mod alias { pub type ROf = ::R; pub type NodeIdOf = ::NodeId; pub type NodeOf = ::Node; + pub type TermOf = ::Term; pub type EntryOf = ::Entry; pub type SnapshotDataOf = ::SnapshotData; pub type AsyncRuntimeOf = ::AsyncRuntime; diff --git a/openraft/src/vote/leader_id/leader_id_adv.rs b/openraft/src/vote/leader_id/leader_id_adv.rs index 35510c999..6b95365de 100644 --- a/openraft/src/vote/leader_id/leader_id_adv.rs +++ b/openraft/src/vote/leader_id/leader_id_adv.rs @@ -17,18 +17,18 @@ use crate::RaftTypeConfig; pub struct LeaderId where C: RaftTypeConfig { - pub term: u64, + pub term: C::Term, pub node_id: C::NodeId, } impl LeaderId where C: RaftTypeConfig { - pub fn new(term: u64, node_id: C::NodeId) -> Self { + pub fn new(term: C::Term, node_id: C::NodeId) -> Self { Self { term, node_id } } - pub fn get_term(&self) -> u64 { + pub fn get_term(&self) -> C::Term { self.term } diff --git a/openraft/src/vote/leader_id/leader_id_std.rs b/openraft/src/vote/leader_id/leader_id_std.rs index e73f4a4b3..95d7e7ab2 100644 --- a/openraft/src/vote/leader_id/leader_id_std.rs +++ b/openraft/src/vote/leader_id/leader_id_std.rs @@ -10,7 +10,7 @@ use crate::RaftTypeConfig; pub struct LeaderId where C: RaftTypeConfig { - pub term: u64, + pub term: C::Term, pub voted_for: Option, } @@ -44,14 +44,14 @@ where C: RaftTypeConfig impl LeaderId where C: RaftTypeConfig { - pub fn new(term: u64, node_id: C::NodeId) -> Self { + pub fn new(term: C::Term, node_id: C::NodeId) -> Self { Self { term, voted_for: Some(node_id), } } - pub fn get_term(&self) -> u64 { + pub fn get_term(&self) -> C::Term { self.term } @@ -84,8 +84,10 @@ where C: RaftTypeConfig #[derive(PartialOrd, Ord)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] #[cfg_attr(feature = "serde", serde(transparent))] -pub struct CommittedLeaderId { - pub term: u64, +pub struct CommittedLeaderId +where C: RaftTypeConfig +{ + pub term: C::Term, p: PhantomData, } @@ -100,7 +102,7 @@ where C: RaftTypeConfig impl CommittedLeaderId where C: RaftTypeConfig { - pub fn new(term: u64, node_id: C::NodeId) -> Self { + pub fn new(term: C::Term, node_id: C::NodeId) -> Self { let _ = node_id; Self { term, p: PhantomData } } diff --git a/openraft/src/vote/mod.rs b/openraft/src/vote/mod.rs index f72f69a45..93ce1501c 100644 --- a/openraft/src/vote/mod.rs +++ b/openraft/src/vote/mod.rs @@ -11,3 +11,5 @@ pub use leader_id::CommittedLeaderId; pub use leader_id::LeaderId; pub(crate) use non_committed::NonCommittedVote; pub use vote::Vote; + +pub mod raft_term; diff --git a/openraft/src/vote/raft_term/mod.rs b/openraft/src/vote/raft_term/mod.rs new file mode 100644 index 000000000..b623050ce --- /dev/null +++ b/openraft/src/vote/raft_term/mod.rs @@ -0,0 +1,21 @@ +mod raft_term_impls; + +use std::fmt::Debug; +use std::fmt::Display; + +use crate::base::OptionalFeatures; + +/// Type representing a Raft term number. +/// +/// A term is a logical clock in Raft that is used to detect obsolete information, +/// such as old leaders. It must be totally ordered and monotonically increasing. +/// +/// Common implementations are provided for standard integer types like `u64`, `i64` etc. +pub trait RaftTerm +where Self: OptionalFeatures + Ord + Debug + Display + Copy + Default + 'static +{ + /// Returns the next term. + /// + /// Must satisfy: `self < self.next()` + fn next(&self) -> Self; +} diff --git a/openraft/src/vote/raft_term/raft_term_impls.rs b/openraft/src/vote/raft_term/raft_term_impls.rs new file mode 100644 index 000000000..bb2746a92 --- /dev/null +++ b/openraft/src/vote/raft_term/raft_term_impls.rs @@ -0,0 +1,54 @@ +use crate::vote::raft_term::RaftTerm; + +macro_rules! impl_raft_term { + ($($t:ty),*) => { + $( + impl RaftTerm for $t { + fn next(&self) -> Self { + self + 1 + } + } + )* + } +} + +impl_raft_term!( + u8, u16, u32, u64, u128, // + i8, i16, i32, i64, i128, // + usize, isize +); + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_raft_term_impls() { + // Test unsigned integers + assert_eq!(1_u8.next(), 2_u8); + assert_eq!(1_u16.next(), 2_u16); + assert_eq!(1_u32.next(), 2_u32); + assert_eq!(1_u64.next(), 2_u64); + assert_eq!(1_u128.next(), 2_u128); + assert_eq!(1_usize.next(), 2_usize); + + // Test signed integers + assert_eq!(1_i8.next(), 2_i8); + assert_eq!(1_i16.next(), 2_i16); + assert_eq!(1_i32.next(), 2_i32); + assert_eq!(1_i64.next(), 2_i64); + assert_eq!(1_i128.next(), 2_i128); + assert_eq!(1_isize.next(), 2_isize); + + // Test default values + assert_eq!(u8::default().next(), 1_u8); + assert_eq!(i8::default().next(), 1_i8); + assert_eq!(usize::default().next(), 1_usize); + assert_eq!(isize::default().next(), 1_isize); + + // Test boundary cases + assert_eq!(254_u8.next(), 255_u8); + assert_eq!(126_i8.next(), 127_i8); + assert_eq!((-2_i8).next(), -1_i8); + } +} diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index 9af5d371f..6f975953b 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -51,14 +51,14 @@ where C: RaftTypeConfig impl Vote where C: RaftTypeConfig { - pub fn new(term: u64, node_id: C::NodeId) -> Self { + pub fn new(term: C::Term, node_id: C::NodeId) -> Self { Self { leader_id: LeaderId::new(term, node_id), committed: false, } } - pub fn new_committed(term: u64, node_id: C::NodeId) -> Self { + pub fn new_committed(term: C::Term, node_id: C::NodeId) -> Self { Self { leader_id: LeaderId::new(term, node_id), committed: true,