From a13b95a923f0e54da518eb69daa8bd9c31ab0222 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Thu, 2 Jan 2025 16:56:34 +0800 Subject: [PATCH] Feature: Abstract `Vote` impl CompareByKey for RaftVote implementations RaftTypeConfig: add associated type Vote impl RaftVote for Vote; add C to CompareByKey - Part of #1278 --- openraft/src/base/cmp.rs | 36 ++++++ openraft/src/base/mod.rs | 2 + openraft/src/core/raft_core.rs | 4 +- openraft/src/core/tick.rs | 3 +- openraft/src/engine/engine_impl.rs | 17 +-- .../engine/handler/establish_handler/mod.rs | 6 +- .../following_handler/append_entries_test.rs | 12 +- .../following_handler/commit_entries_test.rs | 1 + .../do_append_entries_test.rs | 1 + .../install_snapshot_test.rs | 12 +- .../leader_handler/append_entries_test.rs | 1 + .../leader_handler/send_heartbeat_test.rs | 1 + .../engine/handler/replication_handler/mod.rs | 1 + .../vote_handler/become_leader_test.rs | 1 + .../src/engine/handler/vote_handler/mod.rs | 11 +- openraft/src/engine/testing.rs | 3 +- .../src/engine/tests/append_entries_test.rs | 1 + .../src/engine/tests/handle_vote_resp_test.rs | 1 + openraft/src/engine/tests/initialize_test.rs | 1 + .../tests/install_full_snapshot_test.rs | 3 +- openraft/src/engine/tests/startup_test.rs | 1 + openraft/src/impls/mod.rs | 8 ++ openraft/src/metrics/metric.rs | 3 +- openraft/src/metrics/raft_metrics.rs | 3 +- openraft/src/metrics/wait_test.rs | 2 +- openraft/src/network/snapshot_transport.rs | 3 +- openraft/src/proposer/candidate.rs | 2 + openraft/src/proposer/leader.rs | 5 +- openraft/src/raft/declare_raft_types_test.rs | 1 + openraft/src/raft/mod.rs | 8 +- openraft/src/raft_state/io_state/io_id.rs | 6 +- openraft/src/raft_state/mod.rs | 12 +- openraft/src/replication/mod.rs | 8 +- .../src/storage/v2/raft_log_storage_ext.rs | 3 +- openraft/src/testing/log/suite.rs | 17 +-- openraft/src/type_config.rs | 9 +- openraft/src/vote/committed.rs | 42 ++++--- openraft/src/vote/mod.rs | 5 +- openraft/src/vote/non_committed.rs | 35 ++++-- openraft/src/vote/raft_vote.rs | 118 ++++++++++++++++++ openraft/src/vote/ref_vote.rs | 3 +- openraft/src/vote/vote.rs | 65 +++------- tests/tests/fixtures/mod.rs | 3 +- 43 files changed, 345 insertions(+), 135 deletions(-) create mode 100644 openraft/src/base/cmp.rs create mode 100644 openraft/src/vote/raft_vote.rs diff --git a/openraft/src/base/cmp.rs b/openraft/src/base/cmp.rs new file mode 100644 index 000000000..f775f58ad --- /dev/null +++ b/openraft/src/base/cmp.rs @@ -0,0 +1,36 @@ +/// A trait for types that can be compared via a key. +/// +/// Types implementing this trait define how they should be compared by providing a key +/// that implements [`PartialOrd`]. +/// +/// OpenRaft uses this trait to compare types that may not be [`PartialOrd`] themselves. +/// +/// # Type Parameters +/// - `Key<'k>`: The type of the comparison key, which must be partially ordered and must not out +/// live the value. +/// +/// # Examples +/// ``` +/// # use openraft::base::cmp::CompareByKey; +/// +/// struct Person { +/// name: String, +/// age: u32, +/// } +/// +/// impl CompareByKey<()> for Person { +/// type Key<'k> = &'k str; +/// +/// fn cmp_key(&self) -> Self::Key<'_> { +/// &self.name +/// } +/// } +/// ``` +pub(crate) trait CompareByKey { + /// The key type used for comparison. + type Key<'k>: PartialOrd + 'k + where Self: 'k; + + /// Returns the key used for comparing this value. + fn cmp_key(&self) -> Self::Key<'_>; +} diff --git a/openraft/src/base/mod.rs b/openraft/src/base/mod.rs index 43d621ab8..f4784a492 100644 --- a/openraft/src/base/mod.rs +++ b/openraft/src/base/mod.rs @@ -1,5 +1,7 @@ //! Basic types used in the Raft implementation. +pub(crate) mod cmp; + pub use serde_able::OptionalSerde; pub use threaded::BoxAny; pub use threaded::BoxAsyncOnceMut; diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 05019efa1..001a65f18 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -95,8 +95,10 @@ use crate::type_config::async_runtime::MpscUnboundedReceiver; use crate::type_config::TypeConfigExt; use crate::vote::committed::CommittedVote; use crate::vote::non_committed::NonCommittedVote; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::vote_status::VoteStatus; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; use crate::ChangeMembers; use crate::Instant; use crate::LogId; @@ -392,7 +394,7 @@ where // request. if let AppendEntriesResponse::HigherVote(vote) = append_res { debug_assert!( - vote > my_vote, + vote.as_ref_vote() > my_vote.as_ref_vote(), "committed vote({}) has total order relation with other votes({})", my_vote, vote diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 425d0b7d2..1b098dd6e 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -181,7 +181,8 @@ mod tests { type Node = (); type Term = u64; type LeaderId = crate::impls::leader_id_adv::LeaderId; - type Entry = crate::Entry; + type Vote = crate::impls::Vote; + type Entry = crate::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; type Responder = crate::impls::OneshotResponder; diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 071e5de5a..1cbacacc1 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -2,6 +2,8 @@ use std::time::Duration; use validit::Valid; +use crate::alias::LeaderIdOf; +use crate::base::cmp::CompareByKey; use crate::core::raft_msg::AppendEntriesTx; use crate::core::raft_msg::ResultSender; use crate::core::sm; @@ -48,14 +50,15 @@ use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderId; use crate::vote::RaftTerm; +use crate::vote::RaftVote; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; use crate::RaftLogId; use crate::RaftTypeConfig; -use crate::Vote; /// Raft protocol algorithm. /// @@ -195,10 +198,7 @@ where C: RaftTypeConfig self.check_members_contain_me(m)?; // FollowingHandler requires vote to be committed. - let vote = Vote { - committed: true, - ..Default::default() - }; + let vote = as RaftVote>::from_leader_id(Default::default(), true); self.state.vote.update(C::now(), Duration::default(), vote); self.following_handler().do_append_entries(vec![entry]); @@ -211,8 +211,9 @@ where C: RaftTypeConfig /// Start to elect this node as leader #[tracing::instrument(level = "debug", skip(self))] pub(crate) fn elect(&mut self) { - let new_term = self.state.vote.leader_id().term().next(); - let new_vote = Vote::new(new_term, self.config.id.clone()); + let new_term = self.state.vote.leader_id_ref().term().next(); + let leader_id = LeaderIdOf::::new(new_term, self.config.id.clone()); + let new_vote = VoteOf::::from_leader_id(leader_id, false); let candidate = self.new_candidate(new_vote.clone()); @@ -754,7 +755,7 @@ where C: RaftTypeConfig }; debug_assert!( - leader.committed_vote_ref().clone().into_vote() >= *self.state.vote_ref(), + leader.committed_vote_ref().cmp_key() >= self.state.vote_ref().cmp_key(), "leader.vote({}) >= state.vote({})", leader.committed_vote_ref(), self.state.vote_ref() diff --git a/openraft/src/engine/handler/establish_handler/mod.rs b/openraft/src/engine/handler/establish_handler/mod.rs index 3cf3879fb..fac7e91c3 100644 --- a/openraft/src/engine/handler/establish_handler/mod.rs +++ b/openraft/src/engine/handler/establish_handler/mod.rs @@ -1,9 +1,11 @@ +use crate::base::cmp::CompareByKey; use crate::engine::EngineConfig; use crate::proposer::Candidate; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::proposer::LeaderState; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; use crate::RaftTypeConfig; /// Establish a leader for the Engine, when Candidate finishes voting stage. @@ -25,14 +27,14 @@ where C: RaftTypeConfig let vote = candidate.vote_ref().clone(); debug_assert_eq!( - vote.leader_id().node_id_ref(), + vote.leader_id_ref().node_id_ref(), Some(&self.config.id), "it can only commit its own vote" ); if let Some(l) = self.leader.as_ref() { #[allow(clippy::neg_cmp_op_on_partial_ord)] - if !(vote > l.committed_vote_ref().clone().into_vote()) { + if !(vote.cmp_key() > l.committed_vote_ref().cmp_key()) { tracing::warn!( "vote is not greater than current existing leader vote. Do not establish new leader and quit" ); diff --git a/openraft/src/engine/handler/following_handler/append_entries_test.rs b/openraft/src/engine/handler/following_handler/append_entries_test.rs index a602d930d..363301d21 100644 --- a/openraft/src/engine/handler/following_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/append_entries_test.rs @@ -12,8 +12,10 @@ use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::testing::blank_ent; use crate::testing::log_id; +use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; @@ -28,15 +30,13 @@ fn m23() -> Membership { } fn eng() -> Engine { - let mut eng = Engine::testing_default(0); + let mut eng: Engine = Engine::testing_default(0); eng.state.enable_validation(false); // Disable validation for incomplete state eng.config.id = 2; - eng.state.vote.update( - UTConfig::<()>::now(), - Duration::from_millis(500), - Vote::new_committed(2, 1), - ); + let vote = VoteOf::::new_committed(2, 1); + let now = UTConfig::<()>::now(); + eng.state.vote.update(now, Duration::from_millis(500), vote); eng.state.log_ids.append(log_id(1, 1, 1)); eng.state.log_ids.append(log_id(2, 1, 3)); eng.state.membership_state = MembershipState::new( diff --git a/openraft/src/engine/handler/following_handler/commit_entries_test.rs b/openraft/src/engine/handler/following_handler/commit_entries_test.rs index fd2e8d8dd..f51d0d4b5 100644 --- a/openraft/src/engine/handler/following_handler/commit_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/commit_entries_test.rs @@ -11,6 +11,7 @@ use crate::raft_state::LogStateReader; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; diff --git a/openraft/src/engine/handler/following_handler/do_append_entries_test.rs b/openraft/src/engine/handler/following_handler/do_append_entries_test.rs index 80fb19ec9..ce9f0b439 100644 --- a/openraft/src/engine/handler/following_handler/do_append_entries_test.rs +++ b/openraft/src/engine/handler/following_handler/do_append_entries_test.rs @@ -13,6 +13,7 @@ use crate::testing::blank_ent; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Entry; use crate::EntryPayload; diff --git a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs index 7edb5758a..acdbc937b 100644 --- a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs +++ b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs @@ -16,7 +16,9 @@ use crate::raft_state::LogStateReader; use crate::storage::Snapshot; use crate::storage::SnapshotMeta; use crate::testing::log_id; +use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Membership; use crate::StoredMembership; @@ -31,14 +33,12 @@ fn m1234() -> Membership { } fn eng() -> Engine { - let mut eng = Engine::testing_default(0); + let mut eng: Engine = Engine::testing_default(0); eng.state.enable_validation(false); // Disable validation for incomplete state - eng.state.vote.update( - UTConfig::<()>::now(), - Duration::from_millis(500), - Vote::new_committed(2, 1), - ); + let now = UTConfig::<()>::now(); + let vote = VoteOf::::new_committed(2, 1); + eng.state.vote.update(now, Duration::from_millis(500), vote); eng.state.committed = Some(log_id(4, 1, 5)); eng.state.log_ids = LogIdList::new(vec![ // diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index 18f6a8408..6a66751ac 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -23,6 +23,7 @@ use crate::testing::blank_ent; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Entry; use crate::Membership; diff --git a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs index ee184ca10..c12814220 100644 --- a/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs +++ b/openraft/src/engine/handler/leader_handler/send_heartbeat_test.rs @@ -11,6 +11,7 @@ use crate::replication::ReplicationSessionId; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Membership; use crate::MembershipState; diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index c417d46f7..75b9012d5 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -19,6 +19,7 @@ use crate::raft_state::LogStateReader; use crate::replication::request::Replicate; use crate::replication::response::ReplicationResult; use crate::type_config::alias::InstantOf; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::LogId; use crate::LogIdOptionExt; diff --git a/openraft/src/engine/handler/vote_handler/become_leader_test.rs b/openraft/src/engine/handler/vote_handler/become_leader_test.rs index 1fd4ad61b..3d4491fe8 100644 --- a/openraft/src/engine/handler/vote_handler/become_leader_test.rs +++ b/openraft/src/engine/handler/vote_handler/become_leader_test.rs @@ -18,6 +18,7 @@ use crate::testing::log_id; use crate::type_config::alias::EntryOf; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Membership; use crate::Vote; diff --git a/openraft/src/engine/handler/vote_handler/mod.rs b/openraft/src/engine/handler/vote_handler/mod.rs index 580bbecf8..76b01213a 100644 --- a/openraft/src/engine/handler/vote_handler/mod.rs +++ b/openraft/src/engine/handler/vote_handler/mod.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use std::time::Duration; +use crate::base::cmp::CompareByKey; use crate::core::raft_msg::ResultSender; use crate::engine::handler::leader_handler::LeaderHandler; use crate::engine::handler::replication_handler::ReplicationHandler; @@ -19,7 +20,9 @@ use crate::raft_state::IOId; use crate::raft_state::LogStateReader; use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; use crate::LogId; use crate::OptionalSend; use crate::RaftState; @@ -103,7 +106,7 @@ where C: RaftTypeConfig // Partial ord compare: // Vote does not have to be total ord. // `!(a >= b)` does not imply `a < b`. - if vote >= self.state.vote_ref() { + if vote.cmp_key() >= self.state.vote_ref().cmp_key() { // Ok } else { tracing::info!("vote {} is rejected by local vote: {}", vote, self.state.vote_ref()); @@ -123,7 +126,7 @@ where C: RaftTypeConfig Duration::default() }; - if vote > self.state.vote_ref() { + if vote.cmp_key() > self.state.vote_ref().cmp_key() { tracing::info!("vote is changing from {} to {}", self.state.vote_ref(), vote); self.state.vote.update(C::now(), leader_lease, vote.clone()); @@ -168,7 +171,7 @@ where C: RaftTypeConfig if let Some(l) = self.leader.as_mut() { tracing::debug!("leading vote: {}", l.committed_vote,); - if l.committed_vote.clone().into_vote().leader_id() == self.state.vote_ref().leader_id() { + if l.committed_vote.clone().into_vote().leader_id_ref() == self.state.vote_ref().leader_id_ref() { tracing::debug!( "vote still belongs to the same leader. Just updating vote is enough: node-{}, {}", self.config.id, @@ -177,7 +180,7 @@ where C: RaftTypeConfig // TODO: this is not gonna happen, // because `self.leader`(previous `internal_server_state`) // does not include Candidate any more. - l.committed_vote = self.state.vote_ref().clone().into_committed(); + l.committed_vote = self.state.vote_ref().to_committed(); self.server_state_handler().update_server_state_if_changed(); return; } diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index fab5b3d75..3232ecc45 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -35,9 +35,10 @@ where N: Node + Ord type R = (); type NodeId = u64; type Node = N; - type Entry = crate::Entry; type Term = u64; type LeaderId = crate::impls::leader_id_adv::LeaderId; + type Vote = crate::impls::Vote; + type Entry = crate::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; type Responder = crate::impls::OneshotResponder; diff --git a/openraft/src/engine/tests/append_entries_test.rs b/openraft/src/engine/tests/append_entries_test.rs index a140b9a9e..f3c440592 100644 --- a/openraft/src/engine/tests/append_entries_test.rs +++ b/openraft/src/engine/tests/append_entries_test.rs @@ -17,6 +17,7 @@ use crate::testing::blank_ent; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Entry; use crate::Membership; diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index ce22fee8d..1ec1d628d 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -20,6 +20,7 @@ use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Entry; use crate::Membership; diff --git a/openraft/src/engine/tests/initialize_test.rs b/openraft/src/engine/tests/initialize_test.rs index 3911d51ed..bce6ca47f 100644 --- a/openraft/src/engine/tests/initialize_test.rs +++ b/openraft/src/engine/tests/initialize_test.rs @@ -17,6 +17,7 @@ use crate::raft_state::LogStateReader; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::Entry; use crate::LogId; use crate::Membership; diff --git a/openraft/src/engine/tests/install_full_snapshot_test.rs b/openraft/src/engine/tests/install_full_snapshot_test.rs index 0ed2b7d7d..e86be7b87 100644 --- a/openraft/src/engine/tests/install_full_snapshot_test.rs +++ b/openraft/src/engine/tests/install_full_snapshot_test.rs @@ -17,6 +17,7 @@ use crate::storage::Snapshot; use crate::storage::SnapshotMeta; use crate::testing::log_id; use crate::type_config::TypeConfigExt; +use crate::vote::raft_vote::RaftVoteExt; use crate::Membership; use crate::StoredMembership; use crate::Vote; @@ -30,7 +31,7 @@ fn m1234() -> Membership { } fn eng() -> Engine { - let mut eng = Engine::testing_default(0); + let mut eng: Engine = Engine::testing_default(0); eng.state.enable_validation(false); // Disable validation for incomplete state eng.state.vote.update( diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 4c52a0fd4..7c9a3621a 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -18,6 +18,7 @@ use crate::replication::request::Replicate; use crate::testing::log_id; use crate::type_config::TypeConfigExt; use crate::utime::Leased; +use crate::vote::raft_vote::RaftVoteExt; use crate::EffectiveMembership; use crate::Entry; use crate::Membership; diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index 950a756de..514640898 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -16,3 +16,11 @@ pub mod leader_id_adv { pub mod leader_id_std { pub use crate::vote::leader_id::leader_id_std::LeaderId; } + +/// Default [`RaftVote`] implementation for both standard Raft mode and multi-leader-per-term mode. +/// +/// The difference between the two modes is the implementation of [`RaftLeaderId`]. +/// +/// [`RaftVote`]: crate::vote::raft_vote::RaftVote +/// [`RaftLeaderId`]: crate::vote::RaftLeaderId +pub use crate::vote::Vote; diff --git a/openraft/src/metrics/metric.rs b/openraft/src/metrics/metric.rs index d1cb79fd8..0b0d0c4f2 100644 --- a/openraft/src/metrics/metric.rs +++ b/openraft/src/metrics/metric.rs @@ -1,5 +1,6 @@ use std::cmp::Ordering; +use crate::base::cmp::CompareByKey; use crate::metrics::metric_display::MetricDisplay; use crate::type_config::alias::VoteOf; use crate::LogId; @@ -67,7 +68,7 @@ where C: RaftTypeConfig fn partial_cmp(&self, other: &Metric) -> Option { match other { Metric::Term(v) => Some(self.current_term.cmp(v)), - Metric::Vote(v) => self.vote.partial_cmp(v), + Metric::Vote(v) => self.vote.cmp_key().partial_cmp(&v.cmp_key()), Metric::LastLogIndex(v) => Some(self.last_log_index.cmp(v)), Metric::Applied(v) => Some(self.last_applied.cmp(v)), Metric::AppliedIndex(v) => Some(self.last_applied.index().cmp(v)), diff --git a/openraft/src/metrics/raft_metrics.rs b/openraft/src/metrics/raft_metrics.rs index 929bb545c..cbe18aa71 100644 --- a/openraft/src/metrics/raft_metrics.rs +++ b/openraft/src/metrics/raft_metrics.rs @@ -15,7 +15,6 @@ use crate::Instant; use crate::LogId; use crate::RaftTypeConfig; use crate::StoredMembership; -use crate::Vote; /// A set of metrics describing the current state of a Raft node. #[derive(Clone, Debug, PartialEq, Eq)] @@ -169,7 +168,7 @@ where C: RaftTypeConfig id, current_term: Default::default(), - vote: Vote::default(), + vote: Default::default(), last_log_index: None, last_applied: None, snapshot: None, diff --git a/openraft/src/metrics/wait_test.rs b/openraft/src/metrics/wait_test.rs index 83d291954..e8ad778fe 100644 --- a/openraft/src/metrics/wait_test.rs +++ b/openraft/src/metrics/wait_test.rs @@ -252,7 +252,7 @@ where C: RaftTypeConfig { id: NodeIdOf::::default(), state: ServerState::Learner, current_term: Default::default(), - vote: Vote::default(), + vote: Default::default(), last_log_index: None, last_applied: None, purged: None, diff --git a/openraft/src/network/snapshot_transport.rs b/openraft/src/network/snapshot_transport.rs index 720eb8385..04f930e15 100644 --- a/openraft/src/network/snapshot_transport.rs +++ b/openraft/src/network/snapshot_transport.rs @@ -18,6 +18,7 @@ mod tokio_rt { use super::Chunked; use super::SnapshotTransport; use super::Streaming; + use crate::base::cmp::CompareByKey; use crate::error::Fatal; use crate::error::InstallSnapshotError; use crate::error::RPCError; @@ -148,7 +149,7 @@ mod tokio_rt { } }; - if resp.vote > vote { + if resp.vote.cmp_key() > vote.cmp_key() { // Unfinished, return a response with a higher vote. // The caller checks the vote and return a HigherVote error. return Ok(SnapshotResponse::new(resp.vote)); diff --git a/openraft/src/proposer/candidate.rs b/openraft/src/proposer/candidate.rs index 4b1582276..493ad6b81 100644 --- a/openraft/src/proposer/candidate.rs +++ b/openraft/src/proposer/candidate.rs @@ -10,6 +10,8 @@ use crate::quorum::QuorumSet; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::VoteOf; +use crate::vote::raft_vote::RaftVoteExt; +use crate::vote::RaftVote; use crate::LogId; use crate::RaftTypeConfig; diff --git a/openraft/src/proposer/leader.rs b/openraft/src/proposer/leader.rs index f9113b674..fc83b97c8 100644 --- a/openraft/src/proposer/leader.rs +++ b/openraft/src/proposer/leader.rs @@ -10,7 +10,7 @@ use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::type_config::TypeConfigExt; use crate::vote::committed::CommittedVote; -use crate::vote::RaftLeaderId; +use crate::vote::raft_vote::RaftVoteExt; use crate::LogId; use crate::LogIdOptionExt; use crate::RaftLogId; @@ -203,7 +203,7 @@ where // Thus vote.voted_for() is this node. // Safe unwrap: voted_for() is always non-None in Openraft - let node_id = self.committed_vote.clone().into_vote().leader_id().node_id_ref().cloned().unwrap(); + let node_id = self.committed_vote.leader_node_id().unwrap(); let now = C::now(); tracing::debug!( @@ -233,6 +233,7 @@ mod tests { use crate::testing::blank_ent; use crate::testing::log_id; use crate::type_config::TypeConfigExt; + use crate::vote::raft_vote::RaftVoteExt; use crate::Entry; use crate::RaftLogId; use crate::Vote; diff --git a/openraft/src/raft/declare_raft_types_test.rs b/openraft/src/raft/declare_raft_types_test.rs index aeece573e..6c95fdd94 100644 --- a/openraft/src/raft/declare_raft_types_test.rs +++ b/openraft/src/raft/declare_raft_types_test.rs @@ -20,6 +20,7 @@ declare_raft_types!( Term = u64, LeaderId = crate::impls::leader_id_std::LeaderId, Entry = crate::Entry, + Vote = crate::impls::Vote, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime, Responder = crate::impls::OneshotResponder, diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 77824c59f..f4812a6d6 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -46,6 +46,7 @@ use tracing::Level; use crate::async_runtime::watch::WatchReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; +use crate::base::cmp::CompareByKey; use crate::base::BoxAsyncOnceMut; use crate::base::BoxFuture; use crate::base::BoxOnce; @@ -116,6 +117,7 @@ use crate::StorageHelper; /// Node = openraft::BasicNode, /// Term = u64, /// LeaderId = openraft::impls::leader_id_adv::LeaderId, +/// Vote = openraft::impls::Vote, /// Entry = openraft::Entry, /// SnapshotData = Cursor>, /// Responder = openraft::impls::OneshotResponder, @@ -130,6 +132,7 @@ use crate::StorageHelper; /// - `Node`: `::openraft::impls::BasicNode` /// - `Term`: `u64` /// - `LeaderId`: `::openraft::impls::leader_id_adv::LeaderId` +/// - `Vote`: `::openraft::impls::Vote` /// - `Entry`: `::openraft::impls::Entry` /// - `SnapshotData`: `Cursor>` /// - `Responder`: `::openraft::impls::OneshotResponder` @@ -178,6 +181,7 @@ macro_rules! declare_raft_types { (Node , , $crate::impls::BasicNode ), (Term , , u64 ), (LeaderId , , $crate::impls::leader_id_adv::LeaderId ), + (Vote , , $crate::impls::Vote ), (Entry , , $crate::impls::Entry ), (SnapshotData , , std::io::Cursor> ), (Responder , , $crate::impls::OneshotResponder ), @@ -468,7 +472,7 @@ where C: RaftTypeConfig // It is not mandatory because it is just a read operation // but prevent unnecessary snapshot transfer early. { - if req_vote >= my_vote { + if req_vote.cmp_key() >= my_vote.cmp_key() { // Ok } else { tracing::info!("vote {} is rejected by local vote: {}", req_vote, my_vote); @@ -687,7 +691,7 @@ where C: RaftTypeConfig // Condition failed to become Leader #[allow(clippy::neg_cmp_op_on_partial_ord)] - let fail = |m: &RaftMetrics| !(req.from_leader >= m.vote); + let fail = |m: &RaftMetrics| !(req.from_leader.cmp_key() >= m.vote.cmp_key()); let timeout = Some(Duration::from_millis(self.inner.config.election_timeout_min)); let metrics_res = diff --git a/openraft/src/raft_state/io_state/io_id.rs b/openraft/src/raft_state/io_state/io_id.rs index 93d4e9bd0..d2fa43c96 100644 --- a/openraft/src/raft_state/io_state/io_id.rs +++ b/openraft/src/raft_state/io_state/io_id.rs @@ -5,7 +5,9 @@ use crate::raft_state::io_state::log_io_id::LogIOId; use crate::type_config::alias::VoteOf; use crate::vote::committed::CommittedVote; use crate::vote::non_committed::NonCommittedVote; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::ref_vote::RefVote; +use crate::vote::RaftVote; use crate::ErrorSubject; use crate::ErrorVerb; use crate::LogId; @@ -70,9 +72,9 @@ where C: RaftTypeConfig { pub(crate) fn new(vote: &VoteOf) -> Self { if vote.is_committed() { - Self::new_log_io(vote.clone().into_committed(), None) + Self::new_log_io(vote.to_committed(), None) } else { - Self::new_vote_io(vote.clone().into_non_committed()) + Self::new_vote_io(vote.to_non_committed()) } } diff --git a/openraft/src/raft_state/mod.rs b/openraft/src/raft_state/mod.rs index a228cbaa7..3747355bd 100644 --- a/openraft/src/raft_state/mod.rs +++ b/openraft/src/raft_state/mod.rs @@ -13,7 +13,6 @@ use crate::LogId; use crate::LogIdOptionExt; use crate::RaftTypeConfig; use crate::ServerState; -use crate::Vote; pub(crate) mod io_state; mod log_state_reader; @@ -36,13 +35,16 @@ pub(crate) use log_state_reader::LogStateReader; pub use membership_state::MembershipState; pub(crate) use vote_state_reader::VoteStateReader; +use crate::base::cmp::CompareByKey; use crate::display_ext::DisplayOptionExt; use crate::proposer::Leader; use crate::proposer::LeaderQuorumSet; use crate::type_config::alias::InstantOf; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::VoteOf; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; /// A struct used to represent the raft state which a Raft node needs. #[derive(Clone, Debug)] @@ -206,7 +208,7 @@ where C: RaftTypeConfig } // If it received a request-vote from other node, it is already initialized. - if self.vote_ref() != &Vote::default() { + if self.vote_ref() != &VoteOf::::default() { return true; } @@ -237,7 +239,7 @@ where C: RaftTypeConfig let new_vote = accepted.to_vote(); let current_vote = curr_accepted.clone().map(|io_id| io_id.to_vote()); assert!( - Some(&new_vote) >= current_vote.as_ref(), + Some(new_vote.cmp_key()) >= current_vote.as_ref().map(|x| x.cmp_key()), "new accepted.committed_vote {} must be >= current accepted.committed_vote: {}", new_vote, current_vote.display(), @@ -369,7 +371,7 @@ where C: RaftTypeConfig /// /// [Determine Server State]: crate::docs::data::vote#vote-and-membership-define-the-server-state pub(crate) fn is_leading(&self, id: &C::NodeId) -> bool { - self.membership_state.contains(id) && self.vote.leader_id().node_id_ref() == Some(id) + self.membership_state.contains(id) && self.vote.leader_node_id_ref() == Some(id) } /// The node is leader @@ -396,7 +398,7 @@ where C: RaftTypeConfig let last_leader_log_ids = self.log_ids.by_last_leader(); Leader::new( - self.vote_ref().clone().into_committed(), + self.vote_ref().to_committed(), em.to_quorum_set(), em.learner_ids(), last_leader_log_ids, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index aa27e7c40..fff07a4c5 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -21,6 +21,7 @@ use tracing_futures::Instrument; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::MpscUnboundedWeakSender; +use crate::base::cmp::CompareByKey; use crate::config::Config; use crate::core::notification::Notification; use crate::core::sm::handle::SnapshotReader; @@ -58,6 +59,7 @@ use crate::type_config::alias::VoteOf; use crate::type_config::async_runtime::mutex::Mutex; use crate::type_config::TypeConfigExt; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; use crate::LogId; use crate::RaftLogId; use crate::RaftNetworkFactory; @@ -443,7 +445,7 @@ where let append_res = res.map_err(|_e| { let to = Timeout { action: RPCTypes::AppendEntries, - id: self.session_id.vote().leader_id().node_id_ref().cloned().unwrap(), + id: self.session_id.vote().leader_id_ref().node_id_ref().cloned().unwrap(), target: self.target.clone(), timeout: the_timeout, }; @@ -484,7 +486,7 @@ where } AppendEntriesResponse::HigherVote(vote) => { debug_assert!( - vote > self.session_id.vote(), + vote.cmp_key() > self.session_id.vote().cmp_key(), "higher vote({}) should be greater than leader's vote({})", vote, self.session_id.vote(), @@ -803,7 +805,7 @@ where // Handle response conditions. let sender_vote = self.session_id.vote(); - if resp.vote > sender_vote { + if resp.vote.cmp_key() > sender_vote.cmp_key() { return Err(ReplicationError::HigherVote(HigherVote { higher: resp.vote, sender_vote, diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index 5ce52bdd0..4165dca1c 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -1,5 +1,6 @@ use openraft_macros::add_async_trait; +use crate::alias::VoteOf; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::core::notification::Notification; @@ -7,8 +8,8 @@ use crate::log_id::RaftLogId; use crate::raft_state::io_state::io_id::IOId; use crate::storage::IOFlushed; use crate::storage::RaftLogStorage; -use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; +use crate::vote::raft_vote::RaftVoteExt; use crate::OptionalSend; use crate::RaftTypeConfig; use crate::StorageError; diff --git a/openraft/src/testing/log/suite.rs b/openraft/src/testing/log/suite.rs index 21fd00f2f..b38542ce3 100644 --- a/openraft/src/testing/log/suite.rs +++ b/openraft/src/testing/log/suite.rs @@ -25,6 +25,7 @@ use crate::storage::StorageHelper; use crate::testing::log::StoreBuilder; use crate::type_config::alias::VoteOf; use crate::type_config::TypeConfigExt; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderIdExt; use crate::LogId; use crate::Membership; @@ -466,11 +467,11 @@ where want.vote.update( initial.vote.last_update().unwrap(), Duration::default(), - Vote::default(), + VoteOf::::default(), ); - want.io_state.io_progress.accept(IOId::new(&Vote::default())); - want.io_state.io_progress.submit(IOId::new(&Vote::default())); - want.io_state.io_progress.flush(IOId::new(&Vote::default())); + want.io_state.io_progress.accept(IOId::new(&VoteOf::::default())); + want.io_state.io_progress.submit(IOId::new(&VoteOf::::default())); + want.io_state.io_progress.flush(IOId::new(&VoteOf::::default())); assert_eq!(want, initial, "uninitialized state"); Ok(()) @@ -501,7 +502,7 @@ where "unexpected value for last applied log" ); assert_eq!( - Vote::new(1u64.into(), NODE_ID.into()), + VoteOf::::from_term_node_id(1u64.into(), NODE_ID.into()), *initial.vote_ref(), "unexpected value for default hard state" ); @@ -822,11 +823,11 @@ where } pub async fn save_vote(mut store: LS, mut sm: SM) -> Result<(), StorageError> { - store.save_vote(&Vote::new(100.into(), NODE_ID.into())).await?; + store.save_vote(&VoteOf::::from_term_node_id(100.into(), NODE_ID.into())).await?; let got = store.read_vote().await?; - assert_eq!(Some(Vote::new(100.into(), NODE_ID.into())), got,); + assert_eq!(Some(VoteOf::::from_term_node_id(100.into(), NODE_ID.into())), got,); Ok(()) } @@ -1367,7 +1368,7 @@ where } pub async fn default_vote(sto: &mut LS) -> Result<(), StorageError> { - sto.save_vote(&Vote::new(1u64.into(), NODE_ID.into())).await?; + sto.save_vote(&VoteOf::::from_term_node_id(1u64.into(), NODE_ID.into())).await?; Ok(()) } diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 00e5ce3f9..c95f4df53 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -16,6 +16,7 @@ pub use util::TypeConfigExt; use crate::entry::FromAppData; use crate::entry::RaftEntry; use crate::raft::responder::Responder; +use crate::vote::raft_vote::RaftVote; use crate::vote::RaftLeaderId; use crate::vote::RaftTerm; use crate::AppData; @@ -47,6 +48,7 @@ use crate::OptionalSync; /// Node = openraft::BasicNode, /// Term = u64, /// LeaderId = openraft::impls::leader_id_adv::LeaderId, +/// Vote = openraft::impls::Vote, /// Entry = openraft::impls::Entry, /// SnapshotData = Cursor>, /// AsyncRuntime = openraft::TokioRuntime, @@ -81,6 +83,11 @@ pub trait RaftTypeConfig: /// A Leader identifier in a cluster. type LeaderId: RaftLeaderId; + /// Raft vote type. + /// + /// It represents a candidate's vote or a leader's vote that has been granted by a quorum. + type Vote: RaftVote; + /// Raft log entry, which can be built from an AppData. type Entry: RaftEntry + FromAppData; @@ -126,6 +133,7 @@ pub mod alias { pub type NodeOf = ::Node; pub type TermOf = ::Term; pub type LeaderIdOf = ::LeaderId; + pub type VoteOf = ::Vote; pub type EntryOf = ::Entry; pub type SnapshotDataOf = ::SnapshotData; pub type AsyncRuntimeOf = ::AsyncRuntime; @@ -172,7 +180,6 @@ pub mod alias { // Usually used types pub type LogIdOf = crate::LogId; - pub type VoteOf = crate::Vote; pub type CommittedLeaderIdOf = as RaftLeaderId>::Committed; pub type SerdeInstantOf = crate::metrics::SerdeInstant>; } diff --git a/openraft/src/vote/committed.rs b/openraft/src/vote/committed.rs index 07f4745f2..b64ccb2b5 100644 --- a/openraft/src/vote/committed.rs +++ b/openraft/src/vote/committed.rs @@ -1,22 +1,25 @@ use std::cmp::Ordering; use std::fmt; +use crate::alias::LeaderIdOf; use crate::type_config::alias::CommittedLeaderIdOf; use crate::type_config::alias::VoteOf; -use crate::vote::ref_vote::RefVote; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; use crate::RaftTypeConfig; /// Represents a committed Vote that has been accepted by a quorum. /// /// The inner `Vote`'s attribute `committed` is always set to `true` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] #[derive(PartialEq, Eq)] #[derive(PartialOrd)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub(crate) struct CommittedVote where C: RaftTypeConfig { - vote: VoteOf, + leader_id: LeaderIdOf, } /// The `CommittedVote` is totally ordered. @@ -30,28 +33,23 @@ impl Ord for CommittedVote where C: RaftTypeConfig { fn cmp(&self, other: &Self) -> Ordering { - self.vote.partial_cmp(&other.vote).unwrap() + self.as_ref_vote().partial_cmp(&other.as_ref_vote()).unwrap() } } impl CommittedVote where C: RaftTypeConfig { - pub(crate) fn new(mut vote: VoteOf) -> Self { - vote.committed = true; - Self { vote } + pub(crate) fn new(leader_id: LeaderIdOf) -> Self { + Self { leader_id } } pub(crate) fn committed_leader_id(&self) -> CommittedLeaderIdOf { - self.vote.leader_id().to_committed() + self.leader_id_ref().to_committed() } pub(crate) fn into_vote(self) -> VoteOf { - self.vote - } - - pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C> { - RefVote::new(&self.vote.leader_id, true) + VoteOf::::from_leader_id(self.leader_id, true) } } @@ -59,6 +57,22 @@ impl fmt::Display for CommittedVote where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.vote.fmt(f) + self.as_ref_vote().fmt(f) + } +} + +impl RaftVote for CommittedVote +where C: RaftTypeConfig +{ + fn from_leader_id(_leader_id: C::LeaderId, _committed: bool) -> Self { + unimplemented!() + } + + fn leader_id_ref(&self) -> &LeaderIdOf { + &self.leader_id + } + + fn is_committed(&self) -> bool { + true } } diff --git a/openraft/src/vote/mod.rs b/openraft/src/vote/mod.rs index 0d094f90d..e11fbda34 100644 --- a/openraft/src/vote/mod.rs +++ b/openraft/src/vote/mod.rs @@ -3,17 +3,18 @@ pub(crate) mod committed; pub(crate) mod leader_id; pub(crate) mod non_committed; +pub(crate) mod raft_term; +pub(crate) mod raft_vote; pub(crate) mod ref_vote; #[allow(clippy::module_inception)] mod vote; pub(crate) mod vote_status; -pub(crate) mod raft_term; - pub use leader_id::raft_committed_leader_id::RaftCommittedLeaderId; pub use leader_id::raft_leader_id::RaftLeaderId; pub use leader_id::raft_leader_id::RaftLeaderIdExt; pub use raft_term::RaftTerm; +pub use raft_vote::RaftVote; pub use self::leader_id::leader_id_adv; pub use self::leader_id::leader_id_cmp::LeaderIdCompare; diff --git a/openraft/src/vote/non_committed.rs b/openraft/src/vote/non_committed.rs index 06195fd6e..c2ec70476 100644 --- a/openraft/src/vote/non_committed.rs +++ b/openraft/src/vote/non_committed.rs @@ -2,39 +2,48 @@ use std::fmt; use crate::type_config::alias::LeaderIdOf; use crate::type_config::alias::VoteOf; -use crate::vote::ref_vote::RefVote; +use crate::vote::raft_vote::RaftVoteExt; +use crate::vote::RaftVote; use crate::RaftTypeConfig; /// Represents a non-committed Vote that has **NOT** been granted by a quorum. /// /// The inner `Vote`'s attribute `committed` is always set to `false` -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Default)] #[derive(PartialEq, Eq)] #[derive(PartialOrd)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub(crate) struct NonCommittedVote where C: RaftTypeConfig { - vote: VoteOf, + leader_id: LeaderIdOf, } impl NonCommittedVote where C: RaftTypeConfig { - pub(crate) fn new(vote: VoteOf) -> Self { - debug_assert!(!vote.committed); - Self { vote } + pub(crate) fn new(leader_id: LeaderIdOf) -> Self { + Self { leader_id } } - pub(crate) fn leader_id(&self) -> &LeaderIdOf { - &self.vote.leader_id + pub(crate) fn into_vote(self) -> VoteOf { + VoteOf::::from_leader_id(self.leader_id, false) } +} - pub(crate) fn into_vote(self) -> VoteOf { - self.vote +impl RaftVote for NonCommittedVote +where C: RaftTypeConfig +{ + fn from_leader_id(_leader_id: C::LeaderId, _committed: bool) -> Self { + unimplemented!() + } + + fn leader_id_ref(&self) -> &LeaderIdOf { + &self.leader_id } - pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C> { - RefVote::new(&self.vote.leader_id, false) + fn is_committed(&self) -> bool { + false } } @@ -42,6 +51,6 @@ impl fmt::Display for NonCommittedVote where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.vote.fmt(f) + self.as_ref_vote().fmt(f) } } diff --git a/openraft/src/vote/raft_vote.rs b/openraft/src/vote/raft_vote.rs new file mode 100644 index 000000000..c0caeb45f --- /dev/null +++ b/openraft/src/vote/raft_vote.rs @@ -0,0 +1,118 @@ +use std::fmt::Debug; +use std::fmt::Display; + +use crate::alias::CommittedLeaderIdOf; +use crate::base::cmp::CompareByKey; +use crate::base::OptionalFeatures; +use crate::vote::committed::CommittedVote; +use crate::vote::non_committed::NonCommittedVote; +use crate::vote::ref_vote::RefVote; +use crate::vote::vote_status::VoteStatus; +use crate::vote::RaftLeaderId; +use crate::RaftTypeConfig; +// TODO: OptionSerde can be removed after all types are made trait based. + +/// Represents a vote in Raft consensus, including both votes for leader candidates +/// and committed leader(a leader granted by a quorum). +pub trait RaftVote +where + C: RaftTypeConfig, + Self: OptionalFeatures + Eq + Clone + Debug + Display + Default + 'static, +{ + /// Create a new vote for the specified leader with optional quorum commitment + fn from_leader_id(leader_id: C::LeaderId, committed: bool) -> Self; + + /// Get a reference to this vote's LeaderId([`RaftLeaderId`] implementation) + fn leader_id_ref(&self) -> &C::LeaderId; + + /// Whether this vote has been committed by a quorum + fn is_committed(&self) -> bool; +} + +pub(crate) trait RaftVoteExt +where + C: RaftTypeConfig, + Self: RaftVote, +{ + /// Creates a new vote for a node in a specific term, with uncommitted status. + fn from_term_node_id(term: C::Term, node_id: C::NodeId) -> Self { + let leader_id = C::LeaderId::new(term, node_id); + Self::from_leader_id(leader_id, false) + } + + /// Gets the node ID of the leader this vote is for, if present. + fn leader_node_id(&self) -> Option { + self.leader_id_ref().node_id_ref().cloned() + } + + /// Gets a reference to the node ID of the leader this vote is for, if present. + + fn leader_node_id_ref(&self) -> Option<&C::NodeId> { + self.leader_id_ref().node_id_ref() + } + /// Gets the leader ID this vote is associated with. + + fn leader_id(&self) -> C::LeaderId { + self.leader_id_ref().clone() + } + + /// Creates a reference view of this vote. + /// + /// Returns a lightweight `RefVote` that borrows the data from this vote. + fn as_ref_vote(&self) -> RefVote<'_, C> { + RefVote::new(self.leader_id_ref(), self.is_committed()) + } + + /// Create a [`CommittedVote`] with the same leader id. + fn to_committed(&self) -> CommittedVote { + CommittedVote::new(self.leader_id()) + } + + /// Create a [`NonCommittedVote`] with the same leader id. + fn to_non_committed(&self) -> NonCommittedVote { + NonCommittedVote::new(self.leader_id()) + } + + /// Convert this vote into a [`CommittedVote`] + fn into_committed(self) -> CommittedVote { + CommittedVote::new(self.leader_id()) + } + + /// Convert this vote into a [`NonCommittedVote`] + fn into_non_committed(self) -> NonCommittedVote { + NonCommittedVote::new(self.leader_id()) + } + + /// Converts this vote into a [`VoteStatus`] enum based on its commitment state. + fn into_vote_status(self) -> VoteStatus { + if self.is_committed() { + VoteStatus::Committed(self.into_committed()) + } else { + VoteStatus::Pending(self.into_non_committed()) + } + } + + /// Checks if this vote is for the same leader as specified by the given committed leader ID. + fn is_same_leader(&self, leader_id: &CommittedLeaderIdOf) -> bool { + self.leader_id_ref().to_committed() == *leader_id + } +} + +impl RaftVoteExt for T +where + C: RaftTypeConfig, + T: RaftVote, +{ +} + +impl CompareByKey for T +where + C: RaftTypeConfig, + T: RaftVote, +{ + type Key<'k> = RefVote<'k, C> where Self: 'k; + + fn cmp_key(&self) -> Self::Key<'_> { + RefVote::new(self.leader_id_ref(), self.is_committed()) + } +} diff --git a/openraft/src/vote/ref_vote.rs b/openraft/src/vote/ref_vote.rs index 7d7c2a9af..5d0a9ad79 100644 --- a/openraft/src/vote/ref_vote.rs +++ b/openraft/src/vote/ref_vote.rs @@ -3,7 +3,8 @@ use std::fmt::Formatter; use crate::RaftTypeConfig; -/// Same as [`Vote`] but with a reference to the `LeaderId`. +/// Similar to [`Vote`] but with a reference to the `LeaderId`, and provide ordering and display +/// implementation. /// /// [`Vote`]: crate::vote::Vote #[derive(Debug, Clone, Copy, PartialEq, Eq)] diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index 37a667e1f..79ad9a89f 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -1,16 +1,13 @@ use std::cmp::Ordering; use std::fmt::Formatter; -use crate::type_config::alias::CommittedLeaderIdOf; -use crate::vote::committed::CommittedVote; -use crate::vote::non_committed::NonCommittedVote; -use crate::vote::ref_vote::RefVote; -use crate::vote::vote_status::VoteStatus; +use crate::vote::raft_vote::RaftVoteExt; use crate::vote::RaftLeaderId; +use crate::vote::RaftVote; use crate::RaftTypeConfig; /// `Vote` represent the privilege of a node. -#[derive(Debug, Clone, Default, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct Vote { /// The id of the node that tries to become the leader. @@ -19,13 +16,6 @@ pub struct Vote { pub committed: bool, } -impl Copy for Vote -where - C: RaftTypeConfig, - C::LeaderId: Copy, -{ -} - impl PartialOrd for Vote where C: RaftTypeConfig { @@ -39,12 +29,23 @@ impl std::fmt::Display for Vote where C: RaftTypeConfig { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "<{}:{}>", - self.leader_id, - if self.is_committed() { "Q" } else { "-" } - ) + self.as_ref_vote().fmt(f) + } +} + +impl RaftVote for Vote +where C: RaftTypeConfig +{ + fn from_leader_id(leader_id: C::LeaderId, committed: bool) -> Self { + Self { leader_id, committed } + } + + fn leader_id_ref(&self) -> &C::LeaderId { + &self.leader_id + } + + fn is_committed(&self) -> bool { + self.committed } } @@ -70,27 +71,6 @@ where C: RaftTypeConfig self.committed = true } - pub(crate) fn as_ref_vote(&self) -> RefVote<'_, C> { - RefVote::new(&self.leader_id, self.committed) - } - - /// Convert this vote into a `CommittedVote` - pub(crate) fn into_committed(self) -> CommittedVote { - CommittedVote::new(self) - } - - pub(crate) fn into_non_committed(self) -> NonCommittedVote { - NonCommittedVote::new(self) - } - - pub(crate) fn into_vote_status(self) -> VoteStatus { - if self.committed { - VoteStatus::Committed(self.into_committed()) - } else { - VoteStatus::Pending(self.into_non_committed()) - } - } - pub fn is_committed(&self) -> bool { self.committed } @@ -101,11 +81,6 @@ where C: RaftTypeConfig pub fn leader_id(&self) -> &C::LeaderId { &self.leader_id } - - // TODO: remove this method - pub(crate) fn is_same_leader(&self, leader_id: &CommittedLeaderIdOf) -> bool { - self.leader_id().to_committed() == *leader_id - } } #[cfg(test)] diff --git a/tests/tests/fixtures/mod.rs b/tests/tests/fixtures/mod.rs index a823206d2..146516bd6 100644 --- a/tests/tests/fixtures/mod.rs +++ b/tests/tests/fixtures/mod.rs @@ -185,6 +185,7 @@ impl fmt::Display for Direction { } } +use openraft::alias::VoteOf; use openraft::network::v2::RaftNetworkV2; use openraft::vote::RaftLeaderId; use openraft::vote::RaftLeaderIdExt; @@ -416,7 +417,7 @@ impl TypedRaftRouter { tracing::info!(log_index, "--- wait for init node to become leader"); self.wait_for_log(&btreeset![leader_id], Some(log_index), timeout(), "init").await?; - self.wait(&leader_id, timeout()).vote(Vote::new_committed(1, 0), "init vote").await?; + self.wait(&leader_id, timeout()).vote(VoteOf::::new_committed(1, 0), "init vote").await?; for id in voter_ids.iter() { if *id == leader_id {