Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Abstract Vote #1292

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -369,10 +369,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4


- uses: actions-rs/[email protected]
with:
components: rustfmt, clippy


- name: Format
uses: actions-rs/cargo@v1
with:
Expand All @@ -396,6 +399,10 @@ jobs:
RUSTDOCFLAGS: "-D warnings"


- shell: bash
run: cargo install cargo-audit


- name: Audit dependencies
shell: bash
# if: "!contains(github.event.head_commit.message, 'skip audit')"
Expand Down
1 change: 1 addition & 0 deletions examples/raft-kv-memstore-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"openraftpb.LeaderId",
"#[derive(Eq, serde::Serialize, serde::Deserialize)]",
)
.type_attribute("openraftpb.Vote", "#[derive(Eq, serde::Serialize, serde::Deserialize)]")
.compile_protos_with_config(config, &proto_files, &["proto"])?;
Ok(())
}
31 changes: 5 additions & 26 deletions examples/raft-kv-memstore-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ openraft::declare_raft_types!(
D = pb::SetRequest,
R = pb::Response,
LeaderId = pb::LeaderId,
Vote = pb::Vote,
Node = pb::Node,
SnapshotData = StateMachineData,
);
Expand All @@ -34,17 +35,6 @@ pub mod protobuf {
#[path = "../../utils/declare_types.rs"]
pub mod typ;

impl From<pb::Vote> for Vote {
fn from(proto_vote: pb::Vote) -> Self {
let leader_id: LeaderId = proto_vote.leader_id.unwrap();
if proto_vote.committed {
Vote::new_committed(leader_id.term, leader_id.node_id)
} else {
Vote::new(leader_id.term, leader_id.node_id)
}
}
}

impl From<pb::LogId> for LogId {
fn from(proto_log_id: pb::LogId) -> Self {
LogId::new(proto_log_id.term, proto_log_id.index)
Expand All @@ -53,31 +43,20 @@ impl From<pb::LogId> for LogId {

impl From<pb::VoteRequest> for VoteRequest {
fn from(proto_vote_req: pb::VoteRequest) -> Self {
let vote: Vote = proto_vote_req.vote.unwrap().into();
let vote = proto_vote_req.vote.unwrap();
let last_log_id = proto_vote_req.last_log_id.map(|log_id| log_id.into());
VoteRequest::new(vote, last_log_id)
}
}

impl From<pb::VoteResponse> for VoteResponse {
fn from(proto_vote_resp: pb::VoteResponse) -> Self {
let vote: Vote = proto_vote_resp.vote.unwrap().into();
let vote = proto_vote_resp.vote.unwrap();
let last_log_id = proto_vote_resp.last_log_id.map(|log_id| log_id.into());
VoteResponse::new(vote, last_log_id, proto_vote_resp.vote_granted)
}
}

impl From<Vote> for pb::Vote {
fn from(vote: Vote) -> Self {
pb::Vote {
leader_id: Some(pb::LeaderId {
term: vote.leader_id().term,
node_id: vote.leader_id().node_id,
}),
committed: vote.is_committed(),
}
}
}
impl From<LogId> for pb::LogId {
fn from(log_id: LogId) -> Self {
pb::LogId {
Expand All @@ -90,7 +69,7 @@ impl From<LogId> for pb::LogId {
impl From<VoteRequest> for pb::VoteRequest {
fn from(vote_req: VoteRequest) -> Self {
pb::VoteRequest {
vote: Some(vote_req.vote.into()),
vote: Some(vote_req.vote),
last_log_id: vote_req.last_log_id.map(|log_id| log_id.into()),
}
}
Expand All @@ -99,7 +78,7 @@ impl From<VoteRequest> for pb::VoteRequest {
impl From<VoteResponse> for pb::VoteResponse {
fn from(vote_resp: VoteResponse) -> Self {
pb::VoteResponse {
vote: Some(vote_resp.vote.into()),
vote: Some(vote_resp.vote),
vote_granted: vote_resp.vote_granted,
last_log_id: vote_resp.last_log_id.map(|log_id| log_id.into()),
}
Expand Down
34 changes: 34 additions & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/impl_vote.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::fmt;

use openraft::vote::RaftVote;

use crate::typ::*;
use crate::TypeConfig;

impl RaftVote<TypeConfig> for Vote {
fn from_leader_id(leader_id: LeaderId, committed: bool) -> Self {
Vote {
leader_id: Some(leader_id),
committed,
}
}

fn leader_id(&self) -> Option<&LeaderId> {
self.leader_id.as_ref()
}

fn is_committed(&self) -> bool {
self.committed
}
}

impl fmt::Display for Vote {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"<{}:{}>",
self.leader_id.as_ref().unwrap(),
if self.is_committed() { "Q" } else { "-" }
)
}
}
1 change: 1 addition & 0 deletions examples/raft-kv-memstore-grpc/src/pb_impl/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Implements traits for protobuf types

mod impl_leader_id;
mod impl_vote;
2 changes: 1 addition & 1 deletion examples/utils/declare_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::TypeConfig;

pub type Raft = openraft::Raft<TypeConfig>;

pub type Vote = openraft::Vote<TypeConfig>;
pub type Vote = <TypeConfig as openraft::RaftTypeConfig>::Vote;
pub type LeaderId = <TypeConfig as openraft::RaftTypeConfig>::LeaderId;
pub type LogId = openraft::LogId<TypeConfig>;
pub type Entry = openraft::Entry<TypeConfig>;
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/base/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Basic types used in the Raft implementation.

pub(crate) mod ord_by;

pub use serde_able::OptionalSerde;
pub use threaded::BoxAny;
pub use threaded::BoxAsyncOnceMut;
Expand Down
36 changes: 36 additions & 0 deletions openraft/src/base/ord_by.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/// A trait for types whose order can be determined by a key.
///
/// Types implementing this trait define how they should be compared by providing a key
/// that implements [`PartialOrd`].
///
/// OpenRaft uses this trait to compare types that may not be [`PartialOrd`] themselves.
///
/// # Type Parameters
/// - `Key<'k>`: The type of the comparison key, which must be partially ordered and must not out
/// live the value.
///
/// # Examples
/// ```rust,ignore
/// # use openraft::base::ord_by::OrdBy;
///
/// struct Person {
/// name: String,
/// age: u32,
/// }
///
/// impl OrdBy<()> for Person {
/// type By<'k> = &'k str;
///
/// fn ord_by(&self) -> Self::By<'_> {
/// &self.name
/// }
/// }
/// ```
pub(crate) trait OrdBy<C> {
/// The key type used for comparison.
type By<'k>: PartialOrd + 'k
where Self: 'k;

/// Returns the key used for comparing this value.
fn ord_by(&self) -> Self::By<'_>;
}
15 changes: 7 additions & 8 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ use crate::type_config::async_runtime::MpscUnboundedReceiver;
use crate::type_config::TypeConfigExt;
use crate::vote::committed::CommittedVote;
use crate::vote::non_committed::NonCommittedVote;
use crate::vote::raft_vote::RaftVoteExt;
use crate::vote::vote_status::VoteStatus;
use crate::vote::RaftLeaderId;
use crate::vote::RaftVote;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -392,7 +394,7 @@ where
// request.
if let AppendEntriesResponse::HigherVote(vote) = append_res {
debug_assert!(
vote > my_vote,
vote.as_ref_vote() > my_vote.as_ref_vote(),
"committed vote({}) has total order relation with other votes({})",
my_vote,
vote
Expand Down Expand Up @@ -584,7 +586,7 @@ where
id: self.id.clone(),

// --- data ---
current_term: st.vote_ref().leader_id().term(),
current_term: st.vote_ref().to_leader_id().term(),
vote: st.io_state().io_progress.flushed().map(|io_id| io_id.to_vote()).unwrap_or_default(),
last_log_index: st.last_log_id().index(),
last_applied: st.io_applied().cloned(),
Expand Down Expand Up @@ -727,7 +729,7 @@ where
}

// Safe unwrap(): vote that is committed has to already have voted for some node.
let id = vote.leader_id().node_id().cloned().unwrap();
let id = vote.to_leader_id().node_id().cloned().unwrap();

// TODO: `is_voter()` is slow, maybe cache `current_leader`,
// e.g., only update it when membership or vote changes
Expand All @@ -750,10 +752,7 @@ where
}

pub(crate) fn get_leader_node(&self, leader_id: Option<C::NodeId>) -> Option<C::Node> {
let leader_id = match leader_id {
None => return None,
Some(x) => x,
};
let leader_id = leader_id?;

self.engine.state.membership_state.effective().get_node(&leader_id).cloned()
}
Expand Down Expand Up @@ -1677,7 +1676,7 @@ where
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
{
async fn run_command<'e>(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C>> {
async fn run_command(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C>> {
// tracing::debug!("RAFT_event id={:<2} trycmd: {}", self.id, cmd);

let condition = cmd.condition();
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ where C: RaftTypeConfig
///
/// Returns a snapshot data handle for receiving data.
///
/// It does not check [`Vote`] because it is a read operation
/// It does not check `Vote` because it is a read operation
/// and does not break raft protocol.
BeginReceivingSnapshot {
tx: ResultSender<C, Box<SnapshotDataOf<C>>, Infallible>,
Expand Down
3 changes: 2 additions & 1 deletion openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ mod tests {
type Node = ();
type Term = u64;
type LeaderId = crate::impls::leader_id_adv::LeaderId<Self>;
type Entry = crate::Entry<TickUTConfig>;
type Vote = crate::impls::Vote<Self>;
type Entry = crate::Entry<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext/display_btreemap_opt_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::DisplayOption;
/// For how to format the `opt_value`, see [`DisplayOption`].
pub(crate) struct DisplayBTreeMapOptValue<'a, K: fmt::Display, V: fmt::Display>(pub &'a BTreeMap<K, Option<V>>);

impl<'a, K: fmt::Display, V: fmt::Display> fmt::Display for DisplayBTreeMapOptValue<'a, K, V> {
impl<K: fmt::Display, V: fmt::Display> fmt::Display for DisplayBTreeMapOptValue<'_, K, V> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let len = self.0.len();
for (idx, (key, value)) in self.0.iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext/display_instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<'a, T, const SIMPLE: bool, const LOCAL: bool> DisplayInstant<'a, T, SIMPLE,
}
}

impl<'a, T, const SIMPLE: bool, const LOCAL: bool> fmt::Display for DisplayInstant<'a, T, SIMPLE, LOCAL>
impl<T, const SIMPLE: bool, const LOCAL: bool> fmt::Display for DisplayInstant<'_, T, SIMPLE, LOCAL>
where T: Instant
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext/display_option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fmt;
/// implementation for T.
pub(crate) struct DisplayOption<'a, T: fmt::Display>(pub &'a Option<T>);

impl<'a, T: fmt::Display> fmt::Display for DisplayOption<'a, T> {
impl<T: fmt::Display> fmt::Display for DisplayOption<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
None => {
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext/display_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::fmt;
/// It outputs `"Ok(...)"` or `"Err(...)"`.
pub(crate) struct DisplayResult<'a, T: fmt::Display, E: fmt::Display>(pub &'a Result<T, E>);

impl<'a, T, E> fmt::Display for DisplayResult<'a, T, E>
impl<T, E> fmt::Display for DisplayResult<'_, T, E>
where
T: fmt::Display,
E: fmt::Display,
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/display_ext/display_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::fmt;
/// - `DisplaySlice(&[1,2,3,4,5,6])` outputs: `"[1,2,3,4,...,6]"`.
pub(crate) struct DisplaySlice<'a, T: fmt::Display, const MAX: usize = 5>(pub &'a [T]);

impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, MAX> {
impl<T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'_, T, MAX> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let slice = self.0;
let len = slice.len();
Expand Down
1 change: 0 additions & 1 deletion openraft/src/docs/faq/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
//! # FAQ
#![doc = include_str!("faq-toc.md")]

#![doc = include_str!("faq.md")]
3 changes: 0 additions & 3 deletions openraft/src/docs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
#![allow(rustdoc::redundant_explicit_links)]
#![doc = include_str!("docs.md")]

#[rustfmt::skip]


pub mod faq;

pub mod getting_started;
Expand Down
Loading
Loading