Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Consolidate Voting, Leading and InternalServerState type parameters into C #1073

Merged
merged 1 commit into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where C: RaftTypeConfig
pub(crate) seen_greater_log: bool,

/// The internal server state used by Engine.
pub(crate) internal_server_state: InternalServerState<C::NodeId, InstantOf<C>>,
pub(crate) internal_server_state: InternalServerState<C>,

/// Output entry for the runtime.
pub(crate) output: EngineOutput<C>,
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::internal_server_state::LeaderQuorumSet;
use crate::leader::Leading;
use crate::type_config::alias::InstantOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;
Expand All @@ -24,7 +23,7 @@ pub(crate) struct LeaderHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) leader: &'x mut Leading<C::NodeId, LeaderQuorumSet<C::NodeId>, InstantOf<C>>,
pub(crate) leader: &'x mut Leading<C, LeaderQuorumSet<C::NodeId>>,
pub(crate) state: &'x mut RaftState<C>,
pub(crate) output: &'x mut EngineOutput<C>,
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub(crate) struct ReplicationHandler<'x, C>
where C: RaftTypeConfig
{
pub(crate) config: &'x mut EngineConfig<C::NodeId>,
pub(crate) leader: &'x mut Leading<C::NodeId, LeaderQuorumSet<C::NodeId>, InstantOf<C>>,
pub(crate) leader: &'x mut Leading<C, LeaderQuorumSet<C::NodeId>>,
pub(crate) state: &'x mut RaftState<C>,
pub(crate) output: &'x mut EngineOutput<C>,
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/engine/handler/vote_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where C: RaftTypeConfig
pub(crate) config: &'st EngineConfig<C::NodeId>,
pub(crate) state: &'st mut RaftState<C>,
pub(crate) output: &'st mut EngineOutput<C>,
pub(crate) internal_server_state: &'st mut InternalServerState<C::NodeId, InstantOf<C>>,
pub(crate) internal_server_state: &'st mut InternalServerState<C>,
}

impl<'st, C> VoteHandler<'st, C>
Expand Down
29 changes: 11 additions & 18 deletions openraft/src/internal_server_state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::leader::voting::Voting;
use crate::leader::Leading;
use crate::quorum::Joint;
use crate::Instant;
use crate::NodeId;
use crate::RaftTypeConfig;

/// The quorum set type used by `Leader`.
pub(crate) type LeaderQuorumSet<NID> = Joint<NID, Vec<NID>, Vec<Vec<NID>>>;
Expand All @@ -23,52 +22,46 @@ pub(crate) type LeaderQuorumSet<NID> = Joint<NID, Vec<NID>, Vec<Vec<NID>>>;
#[derive(PartialEq, Eq)]
// TODO(9): Make InternalServerState an Option, separate Leading(Proposer) role and
// Following(Acceptor) role
pub(crate) enum InternalServerState<NID, I>
where
NID: NodeId,
I: Instant,
pub(crate) enum InternalServerState<C>
where C: RaftTypeConfig
{
/// Leader or candidate.
///
/// `vote.committed==true` means it is a leader.
Leading(Box<Leading<NID, LeaderQuorumSet<NID>, I>>),
Leading(Box<Leading<C, LeaderQuorumSet<C::NodeId>>>),

/// Follower or learner.
///
/// Being a voter means it is a follower.
Following,
}

impl<NID, I> Default for InternalServerState<NID, I>
where
NID: NodeId,
I: Instant,
impl<C> Default for InternalServerState<C>
where C: RaftTypeConfig
{
fn default() -> Self {
Self::Following
}
}

impl<NID, I> InternalServerState<NID, I>
where
NID: NodeId,
I: Instant,
impl<C> InternalServerState<C>
where C: RaftTypeConfig
{
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<NID, LeaderQuorumSet<NID>, I>> {
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<C, LeaderQuorumSet<C::NodeId>>> {
match self {
InternalServerState::Leading(l) => l.voting_mut(),
InternalServerState::Following => None,
}
}

pub(crate) fn leading(&self) -> Option<&Leading<NID, LeaderQuorumSet<NID>, I>> {
pub(crate) fn leading(&self) -> Option<&Leading<C, LeaderQuorumSet<C::NodeId>>> {
match self {
InternalServerState::Leading(l) => Some(l),
InternalServerState::Following => None,
}
}

pub(crate) fn leading_mut(&mut self) -> Option<&mut Leading<NID, LeaderQuorumSet<NID>, I>> {
pub(crate) fn leading_mut(&mut self) -> Option<&mut Leading<C, LeaderQuorumSet<C::NodeId>>> {
match self {
InternalServerState::Leading(l) => Some(l),
InternalServerState::Following => None,
Expand Down
68 changes: 31 additions & 37 deletions openraft/src/leader/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::NodeId;
use crate::RaftTypeConfig;
use crate::Vote;

/// Leading state data.
Expand All @@ -27,37 +28,38 @@ use crate::Vote;
/// But instead it will be able to upgrade its `leader_id` without losing leadership.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Leading<NID: NodeId, QS: QuorumSet<NID>, I: Instant> {
pub(crate) struct Leading<C, QS: QuorumSet<C::NodeId>>
where C: RaftTypeConfig
{
/// The vote this leader works in.
pub(crate) vote: Vote<NID>,
pub(crate) vote: Vote<C::NodeId>,

quorum_set: QS,

/// Voting state, i.e., there is a Candidate running.
voting: Option<Voting<NID, QS, I>>,
voting: Option<Voting<C, QS>>,

/// Tracks the replication progress and committed index
pub(crate) progress: VecProgress<NID, ProgressEntry<NID>, Option<LogId<NID>>, QS>,
pub(crate) progress: VecProgress<C::NodeId, ProgressEntry<C::NodeId>, Option<LogIdOf<C>>, QS>,

/// Tracks the clock time acknowledged by other nodes.
///
/// See [`docs::leader_lease`] for more details.
///
/// [`docs::leader_lease`]: `crate::docs::protocol::replication::leader_lease`
pub(crate) clock_progress: VecProgress<NID, Option<I>, Option<I>, QS>,
pub(crate) clock_progress: VecProgress<C::NodeId, Option<InstantOf<C>>, Option<InstantOf<C>>, QS>,
}

impl<NID, QS, I> Leading<NID, QS, I>
impl<C, QS> Leading<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID> + Clone + fmt::Debug + 'static,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + Clone + fmt::Debug + 'static,
{
pub(crate) fn new(
vote: Vote<NID>,
vote: Vote<C::NodeId>,
quorum_set: QS,
learner_ids: impl Iterator<Item = NID>,
last_log_id: Option<LogId<NID>>,
learner_ids: impl Iterator<Item = C::NodeId>,
last_log_id: Option<LogIdOf<C>>,
) -> Self {
let learner_ids = learner_ids.collect::<Vec<_>>();

Expand All @@ -75,22 +77,26 @@ where
}

#[allow(dead_code)]
pub(crate) fn voting(&self) -> Option<&Voting<NID, QS, I>> {
pub(crate) fn voting(&self) -> Option<&Voting<C, QS>> {
self.voting.as_ref()
}

#[allow(dead_code)]
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<NID, QS, I>> {
pub(crate) fn voting_mut(&mut self) -> Option<&mut Voting<C, QS>> {
self.voting.as_mut()
}

pub(crate) fn initialize_voting(&mut self, last_log_id: Option<LogId<NID>>, now: I) -> &mut Voting<NID, QS, I> {
pub(crate) fn initialize_voting(
&mut self,
last_log_id: Option<LogIdOf<C>>,
now: InstantOf<C>,
) -> &mut Voting<C, QS> {
self.voting = Some(Voting::new(now, self.vote, last_log_id, self.quorum_set.clone()));
self.voting.as_mut().unwrap()
}

/// Finish the voting process and return the state.
pub(crate) fn finish_voting(&mut self) -> Voting<NID, QS, I> {
pub(crate) fn finish_voting(&mut self) -> Voting<C, QS> {
// it has to be in voting progress
self.voting.take().unwrap()
}
Expand All @@ -107,7 +113,7 @@ where
/// Note that the leader may not be in the QuorumSet at all.
/// In such a case, the update operation will be just ignored,
/// and the quorum-acked-time is totally determined by remove voters.
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<I> {
pub(crate) fn last_quorum_acked_time(&mut self) -> Option<InstantOf<C>> {
// For `Leading`, the vote is always the leader's vote.
// Thus vote.voted_for() is this node.

Expand Down Expand Up @@ -142,12 +148,8 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_voter() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 1),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);
let mut leading =
Leading::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 1), vec![1, 2, 3], vec![4].into_iter(), None);

let now1 = InstantOf::<UTConfig>::now();

Expand All @@ -158,12 +160,8 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_learner() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 4),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);
let mut leading =
Leading::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 4), vec![1, 2, 3], vec![4].into_iter(), None);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand All @@ -178,12 +176,8 @@ mod tests {

#[test]
fn test_leading_last_quorum_acked_time_leader_is_not_member() {
let mut leading = Leading::<u64, Vec<u64>, InstantOf<UTConfig>>::new(
Vote::new_committed(2, 5),
vec![1, 2, 3],
vec![4].into_iter(),
None,
);
let mut leading =
Leading::<UTConfig, Vec<u64>>::new(Vote::new_committed(2, 5), vec![1, 2, 3], vec![4].into_iter(), None);

let t2 = InstantOf::<UTConfig>::now();
let _ = leading.clock_progress.increase_to(&2, Some(t2));
Expand Down
50 changes: 26 additions & 24 deletions openraft/src/leader/voting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,35 @@ use crate::display_ext::DisplayOptionExt;
use crate::progress::Progress;
use crate::progress::VecProgress;
use crate::quorum::QuorumSet;
use crate::Instant;
use crate::LogId;
use crate::NodeId;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
use crate::RaftTypeConfig;
use crate::Vote;

/// Voting state.
#[derive(Clone, Debug)]
#[derive(PartialEq, Eq)]
pub(crate) struct Voting<NID, QS, I>
pub(crate) struct Voting<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID>,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId>,
{
/// When the voting is started.
starting_time: I,
starting_time: InstantOf<C>,

/// The vote.
vote: Vote<NID>,
vote: Vote<C::NodeId>,

last_log_id: Option<LogId<NID>>,
last_log_id: Option<LogIdOf<C>>,

/// Which nodes have granted the the vote at certain time point.
progress: VecProgress<NID, bool, bool, QS>,
progress: VecProgress<C::NodeId, bool, bool, QS>,
}

impl<NID, QS, I> fmt::Display for Voting<NID, QS, I>
impl<C, QS> fmt::Display for Voting<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID> + fmt::Debug + 'static,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + fmt::Debug + 'static,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
Expand All @@ -48,13 +46,17 @@ where
}
}

impl<NID, QS, I> Voting<NID, QS, I>
impl<C, QS> Voting<C, QS>
where
NID: NodeId,
QS: QuorumSet<NID> + fmt::Debug + 'static,
I: Instant,
C: RaftTypeConfig,
QS: QuorumSet<C::NodeId> + fmt::Debug + 'static,
{
pub(crate) fn new(starting_time: I, vote: Vote<NID>, last_log_id: Option<LogId<NID>>, quorum_set: QS) -> Self {
pub(crate) fn new(
starting_time: InstantOf<C>,
vote: Vote<C::NodeId>,
last_log_id: Option<LogIdOf<C>>,
quorum_set: QS,
) -> Self {
Self {
starting_time,
vote,
Expand All @@ -63,16 +65,16 @@ where
}
}

pub(crate) fn vote_ref(&self) -> &Vote<NID> {
pub(crate) fn vote_ref(&self) -> &Vote<C::NodeId> {
&self.vote
}

pub(crate) fn progress(&self) -> &VecProgress<NID, bool, bool, QS> {
pub(crate) fn progress(&self) -> &VecProgress<C::NodeId, bool, bool, QS> {
&self.progress
}

/// Grant the vote by a node.
pub(crate) fn grant_by(&mut self, target: &NID) -> bool {
pub(crate) fn grant_by(&mut self, target: &C::NodeId) -> bool {
let granted = *self.progress.update(target, true).expect("target not in quorum set");

tracing::info!(voting = debug(&self), "{}", func_name!());
Expand All @@ -82,7 +84,7 @@ where

/// Return the node ids that has granted this vote.
#[allow(dead_code)]
pub(crate) fn granters(&self) -> impl Iterator<Item = NID> + '_ {
pub(crate) fn granters(&self) -> impl Iterator<Item = C::NodeId> + '_ {
self.progress().iter().filter(|(_, granted)| *granted).map(|(target, _)| *target)
}
}
Loading