Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: merge send-progress methods #1082

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
Expand Down Expand Up @@ -1465,12 +1464,12 @@ where
fn handle_replication_progress(
&mut self,
target: C::NodeId,
id: RequestId,
result: Result<UTime<ReplicationResult<C::NodeId>, InstantOf<C>>, String>,
request_id: RequestId,
result: Result<ReplicationResult<C>, String>,
) {
tracing::debug!(
target = display(target),
request_id = display(id),
request_id = display(request_id),
result = debug(&result),
"handle_replication_progress"
);
Expand All @@ -1485,9 +1484,9 @@ where
}
}

// TODO: A leader may have stepped down.
// A leader may have stepped down.
if self.engine.internal_server_state.is_leading() {
self.engine.replication_handler().update_progress(target, id, result);
self.engine.replication_handler().update_progress(target, request_id, result);
}
}

Expand Down
30 changes: 14 additions & 16 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::raft_state::LogStateReader;
use crate::replication::request_id::RequestId;
use crate::replication::response::ReplicationResult;
use crate::type_config::alias::InstantOf;
use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::LogId;
use crate::LogIdOptionExt;
Expand Down Expand Up @@ -140,24 +139,22 @@ where C: RaftTypeConfig
&mut self,
target: C::NodeId,
request_id: RequestId,
result: UTime<ReplicationResult<C::NodeId>, InstantOf<C>>,
result: ReplicationResult<C>,
) {
let sending_time = result.utime().unwrap();

// No matter what the result is, the validity of the leader is granted by a follower.
self.update_leader_vote_clock(target, sending_time);
self.update_leader_clock(target, result.sending_time);

let id = request_id.request_id();
let Some(id) = id else {
tracing::debug!(request_id = display(request_id), "no data for this request, return");
return;
};

match result.into_inner() {
ReplicationResult::Matching(matching) => {
match result.result {
Ok(matching) => {
self.update_matching(target, id, matching);
}
ReplicationResult::Conflict(conflict) => {
Err(conflict) => {
self.update_conflicting(target, id, conflict);
}
}
Expand All @@ -166,7 +163,7 @@ where C: RaftTypeConfig
/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: InstantOf<C>) {
pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf<C>) {
tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!());

let granted = *self
Expand Down Expand Up @@ -209,7 +206,7 @@ where C: RaftTypeConfig

// The value granted by a quorum may not yet be a committed.
// A committed is **granted** and also is in current term.
let granted = *self
let quorum_accepted = *self
.leader
.progress
.update_with(&node_id, |prog_entry| {
Expand All @@ -221,16 +218,19 @@ where C: RaftTypeConfig
})
.expect("it should always update existing progress");

tracing::debug!(granted = display(granted.display()), "granted after updating progress");
tracing::debug!(
quorum_accepted = display(quorum_accepted.display()),
"after updating progress"
);

self.try_commit_granted(granted);
self.try_commit_quorum_accepted(quorum_accepted);
}

/// Commit the log id that is granted(accepted) by a quorum of voters.
///
/// In raft a log that is granted and in the leader term is committed.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn try_commit_granted(&mut self, granted: Option<LogId<C::NodeId>>) {
pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option<LogId<C::NodeId>>) {
// Only when the log id is proposed by current leader, it is committed.
if let Some(c) = granted {
if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) {
Expand Down Expand Up @@ -281,10 +281,8 @@ where C: RaftTypeConfig
&mut self,
target: C::NodeId,
request_id: RequestId,
repl_res: Result<UTime<ReplicationResult<C::NodeId>, InstantOf<C>>, String>,
repl_res: Result<ReplicationResult<C>, String>,
) {
// TODO(2): test

tracing::debug!(
target = display(target),
request_id = display(request_id),
Expand Down
7 changes: 3 additions & 4 deletions openraft/src/membership/membership_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::engine::testing::UTConfig;
use crate::membership::IntoNodes;
use crate::ChangeMembers;
use crate::Membership;
use crate::MessageSummary;

#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))]
Expand Down Expand Up @@ -38,10 +37,10 @@ fn test_membership_summary() -> anyhow::Result<()> {
};

let m = Membership::<UTConfig>::new(vec![btreeset! {1,2}, btreeset! {3}], None);
assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[]}", m.summary());
assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[]}", m.to_string());

let m = Membership::<UTConfig>::new(vec![btreeset! {1,2}, btreeset! {3}], Some(btreeset! {4}));
assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[4:()]}", m.summary());
assert_eq!("{voters:[{1:(),2:()},{3:()}], learners:[4:()]}", m.to_string());

let m = Membership::<UTConfig<TestNode>>::new_unchecked(vec![btreeset! {1,2}, btreeset! {3}], btreemap! {
1=>node("127.0.0.1", "k1"),
Expand All @@ -52,7 +51,7 @@ fn test_membership_summary() -> anyhow::Result<()> {
});
assert_eq!(
r#"{voters:[{1:TestNode { addr: "127.0.0.1", data: {"k1": "k1"} },2:TestNode { addr: "127.0.0.2", data: {"k2": "k2"} }},{3:TestNode { addr: "127.0.0.3", data: {"k3": "k3"} }}], learners:[4:TestNode { addr: "127.0.0.4", data: {"k4": "k4"} }]}"#,
m.summary()
m.to_string()
);

Ok(())
Expand Down
97 changes: 40 additions & 57 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ use tracing_futures::Instrument;
use crate::config::Config;
use crate::core::notify::Notify;
use crate::core::raft_msg::ResultReceiver;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::HigherVote;
use crate::error::Infallible;
use crate::error::PayloadTooLarge;
use crate::error::RPCError;
use crate::error::RaftError;
Expand All @@ -55,7 +53,6 @@ use crate::storage::Snapshot;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::utime::UTime;
use crate::AsyncRuntime;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -482,7 +479,7 @@ where
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");

let conflict = conflict.unwrap();
self.send_progress_conflicting(request_id, leader_time, conflict);
self.send_progress(request_id, ReplicationResult::new(leader_time, Err(conflict)));

Ok(None)
}
Expand All @@ -491,7 +488,7 @@ where

/// Send the error result to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C, RaftError<C, Infallible>>) {
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C, RaftError<C>>) {
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::Progress {
target: self.target,
Expand All @@ -502,80 +499,63 @@ where
});
}

/// Send a `conflict` message to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_conflicting(
&mut self,
request_id: RequestId,
leader_time: InstantOf<C>,
conflict: LogId<C::NodeId>,
) {
/// Send the success replication result(log matching or conflict) to RaftCore.
fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult<C>) {
tracing::debug!(
target = display(self.target),
request_id = display(request_id),
conflict = display(&conflict),
"update_conflicting"
target = display(self.target),
curr_matching = display(self.matching.display()),
result = display(&replication_result),
"{}",
func_name!()
);

match replication_result.result {
Ok(matching) => {
self.validate_matching(matching);
self.matching = matching;
}
Err(_conflict) => {
// Conflict is not allowed to be less than the current matching.
}
}

let _ = self.tx_raft_core.send({
Notify::Network {
response: Response::Progress {
session_id: self.session_id,
request_id,
target: self.target,
result: Ok(UTime::new(leader_time, ReplicationResult::Conflict(conflict))),
result: Ok(replication_result),
},
}
});
}

/// Update the `matching` log id, which is for tracking follower replication, and report the
/// matched log id to `RaftCore` to commit an entry.
#[tracing::instrument(level = "trace", skip(self))]
fn send_progress_matching(
&mut self,
request_id: RequestId,
leader_time: InstantOf<C>,
new_matching: Option<LogId<C::NodeId>>,
) {
tracing::debug!(
request_id = display(request_id),
target = display(self.target),
curr_matching = display(DisplayOption(&self.matching)),
new_matching = display(DisplayOption(&new_matching)),
"{}",
func_name!()
);

/// Validate the value for updating matching log id.
///
/// If the matching log id is reverted to a smaller value:
/// - log a warning message if [`loosen-follower-log-revert`] feature flag is enabled;
/// - otherwise panic, consider it as a bug.
///
/// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert
fn validate_matching(&self, matching: Option<LogId<C::NodeId>>) {
if cfg!(feature = "loosen-follower-log-revert") {
if self.matching > new_matching {
if self.matching > matching {
tracing::warn!(
"follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed",
self.matching.display(),
new_matching.display(),
);
"follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed",
self.matching.display(),
matching.display(),
);
}
} else {
debug_assert!(
self.matching <= new_matching,
self.matching <= matching,
"follower log is reverted from {} to {}",
self.matching.display(),
new_matching.display(),
matching.display(),
);
}

self.matching = new_matching;

let _ = self.tx_raft_core.send({
Notify::Network {
response: Response::Progress {
session_id: self.session_id,
request_id,
target: self.target,
result: Ok(UTime::new(leader_time, ReplicationResult::Matching(new_matching))),
},
}
});
}

/// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and
Expand Down Expand Up @@ -836,7 +816,10 @@ where
}));
}

self.send_progress_matching(request_id, start_time, snapshot_meta.last_log_id);
self.send_progress(
request_id,
ReplicationResult::new(start_time, Ok(snapshot_meta.last_log_id)),
);

Ok(None)
}
Expand All @@ -850,7 +833,7 @@ where
leader_time: InstantOf<C>,
log_ids: DataWithId<LogIdRange<C::NodeId>>,
) -> Option<Data<C>> {
self.send_progress_matching(log_ids.request_id(), leader_time, matching);
self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching)));

if matching < log_ids.data().last_log_id {
Some(Data::new_logs(
Expand Down
Loading
Loading