Skip to content

Commit

Permalink
Feature: Abstract Term
Browse files Browse the repository at this point in the history
Add associated type `Term: RaftTerm` to `RaftTypeConfig` so that
application can customize the data type for Raft `term`.

By default `Term` is `u64` and user application does not need to modify
to upgrade.

- Part of databendlabs#1278
  • Loading branch information
drmingdrmer committed Dec 26, 2024
1 parent 93a5585 commit 004e3f8
Show file tree
Hide file tree
Showing 17 changed files with 158 additions and 39 deletions.
1 change: 1 addition & 0 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ mod tests {
type R = ();
type NodeId = u64;
type Node = ();
type Term = u64;
type Entry = crate::Entry<TickUTConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::storage::SnapshotMeta;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::TypeConfigExt;
use crate::vote::raft_term::RaftTerm;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
Expand Down Expand Up @@ -208,7 +209,7 @@ where C: RaftTypeConfig
/// Start to elect this node as leader
#[tracing::instrument(level = "debug", skip(self))]
pub(crate) fn elect(&mut self) {
let new_term = self.state.vote.leader_id().term + 1;
let new_term = self.state.vote.leader_id().term.next();
let new_vote = Vote::new(new_term, self.config.id.clone());

let candidate = self.new_candidate(new_vote.clone());
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ where N: Node + Ord
type NodeId = u64;
type Node = N;
type Entry = crate::Entry<Self>;
type Term = u64;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ mod runtime;
mod storage_error;
mod summary;
mod try_as_ref;
mod vote;

pub(crate) mod engine;
pub(crate) mod log_id_range;
Expand All @@ -70,6 +69,7 @@ pub mod raft;
pub mod storage;
pub mod testing;
pub mod type_config;
pub mod vote;

#[cfg(test)]
mod feature_serde_test;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::Vote;
pub enum Metric<C>
where C: RaftTypeConfig
{
Term(u64),
Term(C::Term),
Vote(Vote<C>),
LastLogIndex(Option<u64>),
Applied(Option<LogId<C>>),
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
// --- data ---
// ---
/// The current term of the Raft node.
pub current_term: u64,
pub current_term: C::Term,

/// The last flushed vote.
pub vote: Vote<C>,
Expand Down Expand Up @@ -167,7 +167,7 @@ where C: RaftTypeConfig
running_state: Ok(()),
id,

current_term: 0,
current_term: Default::default(),
vote: Vote::default(),
last_log_index: None,
last_applied: None,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/metrics/wait_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ where C: RaftTypeConfig {
running_state: Ok(()),
id: NodeIdOf::<C>::default(),
state: ServerState::Learner,
current_term: 0,
current_term: Default::default(),
vote: Vote::default(),
last_log_index: None,
last_applied: None,
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ use crate::Vote;
/// R = ClientResponse,
/// NodeId = u64,
/// Node = openraft::BasicNode,
/// Term = u64,
/// Entry = openraft::Entry<TypeConfig>,
/// SnapshotData = Cursor<Vec<u8>>,
/// Responder = openraft::impls::OneshotResponder<TypeConfig>,
Expand Down Expand Up @@ -172,8 +173,9 @@ macro_rules! declare_raft_types {
(R , , String ),
(NodeId , , u64 ),
(Node , , $crate::impls::BasicNode ),
(Term , , u64 ),
(Entry , , $crate::impls::Entry<Self> ),
(SnapshotData , , std::io::Cursor<Vec<u8>> ),
(SnapshotData , , std::io::Cursor<Vec<u8>> ),
(Responder , , $crate::impls::OneshotResponder<Self> ),
(AsyncRuntime , , $crate::impls::TokioRuntime ),
);
Expand Down
18 changes: 13 additions & 5 deletions openraft/src/testing/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,23 @@ use crate::RaftTypeConfig;

/// Builds a log id, for testing purposes.
pub fn log_id<C>(term: u64, node_id: C::NodeId, index: u64) -> LogId<C>
where C: RaftTypeConfig {
where
C: RaftTypeConfig,
C::Term: From<u64>,
{
LogId::<C> {
leader_id: CommittedLeaderId::new(term, node_id),
leader_id: CommittedLeaderId::new(term.into(), node_id),
index,
}
}

/// Create a blank log entry for test.
pub fn blank_ent<C: RaftTypeConfig>(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry<C> {
crate::Entry::<C>::new_blank(LogId::new(CommittedLeaderId::new(term, node_id), index))
pub fn blank_ent<C>(term: u64, node_id: C::NodeId, index: u64) -> crate::Entry<C>
where
C: RaftTypeConfig,
C::Term: From<u64>,
{
crate::Entry::<C>::new_blank(log_id(term, node_id, index))
}

/// Create a membership log entry without learner config for test.
Expand All @@ -29,10 +36,11 @@ pub fn membership_ent<C: RaftTypeConfig>(
config: Vec<BTreeSet<C::NodeId>>,
) -> crate::Entry<C>
where
C::Term: From<u64>,
C::Node: Default,
{
crate::Entry::new_membership(
LogId::new(CommittedLeaderId::new(term, node_id), index),
log_id(term, node_id, index),
crate::Membership::new_with_defaults(config, []),
)
}
46 changes: 30 additions & 16 deletions openraft/src/testing/log/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ where
C: RaftTypeConfig,
C::D: Debug,
C::R: Debug,
C::Term: From<u64>,
C::NodeId: From<u64>,
C::Node: Default,
LS: RaftLogStorage<C>,
Expand Down Expand Up @@ -499,7 +500,7 @@ where
"unexpected value for last applied log"
);
assert_eq!(
Vote::new(1, NODE_ID.into()),
Vote::new(1u64.into(), NODE_ID.into()),
*initial.vote_ref(),
"unexpected value for default hard state"
);
Expand Down Expand Up @@ -632,8 +633,8 @@ where
}

pub async fn get_initial_state_log_ids(mut store: LS, mut sm: SM) -> Result<(), StorageError<C>> {
let log_id = |t, n: u64, i| LogId::<C> {
leader_id: CommittedLeaderId::new(t, n.into()),
let log_id = |t: u64, n: u64, i| LogId::<C> {
leader_id: CommittedLeaderId::<C>::new(t.into(), n.into()),
index: i,
};

Expand Down Expand Up @@ -820,11 +821,11 @@ where
}

pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError<C>> {
store.save_vote(&Vote::new(100, NODE_ID.into())).await?;
store.save_vote(&Vote::new(100.into(), NODE_ID.into())).await?;

let got = store.read_vote().await?;

assert_eq!(Some(Vote::new(100, NODE_ID.into())), got,);
assert_eq!(Some(Vote::new(100.into(), NODE_ID.into())), got,);
Ok(())
}

Expand Down Expand Up @@ -873,7 +874,7 @@ where
pub async fn try_get_log_entry(mut store: LS, mut sm: SM) -> Result<(), StorageError<C>> {
Self::feed_10_logs_vote_self(&mut store).await?;

store.purge(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)).await?;
store.purge(log_id(0, 0, 0)).await?;

// `purge()` does not have to do the purge at once.
// The implementation may choose to do it in the background.
Expand Down Expand Up @@ -923,10 +924,7 @@ where
store.purge(log_id_0(0, 0)).await?;

let st = store.get_log_state().await?;
assert_eq!(
Some(LogId::new(CommittedLeaderId::new(0, C::NodeId::default()), 0)),
st.last_purged_log_id
);
assert_eq!(Some(log_id(0, 0, 0)), st.last_purged_log_id);
assert_eq!(Some(log_id_0(1, 2)), st.last_log_id);
}

Expand Down Expand Up @@ -1368,33 +1366,37 @@ where
}

pub async fn default_vote(sto: &mut LS) -> Result<(), StorageError<C>> {
sto.save_vote(&Vote::new(1, NODE_ID.into())).await?;
sto.save_vote(&Vote::new(1u64.into(), NODE_ID.into())).await?;

Ok(())
}
}

/// Create a log id with node id 0 for testing.
fn log_id_0<C>(term: u64, index: u64) -> LogId<C>
fn log_id_0<C>(term: impl Into<C::Term>, index: u64) -> LogId<C>
where
C: RaftTypeConfig,
C::NodeId: From<u64>,
{
LogId {
leader_id: CommittedLeaderId::new(term, NODE_ID.into()),
leader_id: CommittedLeaderId::new(term.into(), NODE_ID.into()),
index,
}
}

/// Create a blank log entry with node_id 0 for test.
fn blank_ent_0<C: RaftTypeConfig>(term: u64, index: u64) -> C::Entry
where C::NodeId: From<u64> {
C::Entry::new_blank(log_id_0(term, index))
where
C::Term: From<u64>,
C::NodeId: From<u64>,
{
C::Entry::new_blank(log_id(term, 0, index))
}

/// Create a membership entry with node_id 0 for test.
fn membership_ent_0<C: RaftTypeConfig>(term: u64, index: u64, bs: BTreeSet<C::NodeId>) -> C::Entry
fn membership_ent_0<C>(term: impl Into<C::Term>, index: u64, bs: BTreeSet<C::NodeId>) -> C::Entry
where
C: RaftTypeConfig,
C::NodeId: From<u64>,
C::Node: Default,
{
Expand Down Expand Up @@ -1453,3 +1455,15 @@ where
}
Ok(())
}

fn log_id<C>(term: u64, node_id: u64, index: u64) -> LogId<C>
where
C: RaftTypeConfig,
C::Term: From<u64>,
C::NodeId: From<u64>,
{
LogId {
leader_id: CommittedLeaderId::new(term.into(), node_id.into()),
index,
}
}
13 changes: 13 additions & 0 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub use util::TypeConfigExt;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
use crate::raft::responder::Responder;
use crate::vote::raft_term::RaftTerm;
use crate::AppData;
use crate::AppDataResponse;
use crate::Node;
Expand Down Expand Up @@ -43,6 +44,7 @@ use crate::OptionalSync;
/// R = ClientResponse,
/// NodeId = u64,
/// Node = openraft::BasicNode,
/// Term = u64,
/// Entry = openraft::Entry<TypeConfig>,
/// SnapshotData = Cursor<Vec<u8>>,
/// AsyncRuntime = openraft::TokioRuntime,
Expand All @@ -67,6 +69,16 @@ pub trait RaftTypeConfig:
/// Raft log entry, which can be built from an AppData.
type Entry: RaftEntry<Self> + FromAppData<Self::D>;

/// Type representing a Raft term number.
///
/// A term is a logical clock in Raft that is used to detect obsolete information,
/// such as old leaders. It must be totally ordered and monotonically increasing.
///
/// Common implementations are provided for standard integer types like `u64`, `i64` etc.
///
/// See: [`RaftTerm`] for the required methods.
type Term: RaftTerm;

/// Snapshot data for exposing a snapshot for reading & writing.
///
/// See the [storage chapter of the guide][sto] for details on log compaction / snapshotting.
Expand Down Expand Up @@ -106,6 +118,7 @@ pub mod alias {
pub type ROf<C> = <C as RaftTypeConfig>::R;
pub type NodeIdOf<C> = <C as RaftTypeConfig>::NodeId;
pub type NodeOf<C> = <C as RaftTypeConfig>::Node;
pub type TermOf<C> = <C as RaftTypeConfig>::Term;
pub type EntryOf<C> = <C as RaftTypeConfig>::Entry;
pub type SnapshotDataOf<C> = <C as RaftTypeConfig>::SnapshotData;
pub type AsyncRuntimeOf<C> = <C as RaftTypeConfig>::AsyncRuntime;
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/vote/leader_id/leader_id_adv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ use crate::RaftTypeConfig;
pub struct LeaderId<C>
where C: RaftTypeConfig
{
pub term: u64,
pub term: C::Term,
pub node_id: C::NodeId,
}

impl<C> LeaderId<C>
where C: RaftTypeConfig
{
pub fn new(term: u64, node_id: C::NodeId) -> Self {
pub fn new(term: C::Term, node_id: C::NodeId) -> Self {
Self { term, node_id }
}

pub fn get_term(&self) -> u64 {
pub fn get_term(&self) -> C::Term {
self.term
}

Expand Down
14 changes: 8 additions & 6 deletions openraft/src/vote/leader_id/leader_id_std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::RaftTypeConfig;
pub struct LeaderId<C>
where C: RaftTypeConfig
{
pub term: u64,
pub term: C::Term,

pub voted_for: Option<C::NodeId>,
}
Expand Down Expand Up @@ -44,14 +44,14 @@ where C: RaftTypeConfig
impl<C> LeaderId<C>
where C: RaftTypeConfig
{
pub fn new(term: u64, node_id: C::NodeId) -> Self {
pub fn new(term: C::Term, node_id: C::NodeId) -> Self {
Self {
term,
voted_for: Some(node_id),
}
}

pub fn get_term(&self) -> u64 {
pub fn get_term(&self) -> C::Term {
self.term
}

Expand Down Expand Up @@ -84,8 +84,10 @@ where C: RaftTypeConfig
#[derive(PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))]
#[cfg_attr(feature = "serde", serde(transparent))]
pub struct CommittedLeaderId<C> {
pub term: u64,
pub struct CommittedLeaderId<C>
where C: RaftTypeConfig
{
pub term: C::Term,
p: PhantomData<C>,
}

Expand All @@ -100,7 +102,7 @@ where C: RaftTypeConfig
impl<C> CommittedLeaderId<C>
where C: RaftTypeConfig
{
pub fn new(term: u64, node_id: C::NodeId) -> Self {
pub fn new(term: C::Term, node_id: C::NodeId) -> Self {
let _ = node_id;
Self { term, p: PhantomData }
}
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/vote/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ pub use leader_id::CommittedLeaderId;
pub use leader_id::LeaderId;
pub(crate) use non_committed::NonCommittedVote;
pub use vote::Vote;

pub mod raft_term;
Loading

0 comments on commit 004e3f8

Please sign in to comment.