Skip to content

Commit

Permalink
Refactor: merge send-progress methods
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Mar 24, 2024
1 parent 0becaaf commit 8306248
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 68 deletions.
8 changes: 4 additions & 4 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1464,12 +1464,12 @@ where
fn handle_replication_progress(
&mut self,
target: C::NodeId,
id: RequestId,
request_id: RequestId,
result: Result<ReplicationResult<C>, String>,
) {
tracing::debug!(
target = display(target),
request_id = display(id),
request_id = display(request_id),
result = debug(&result),
"handle_replication_progress"
);
Expand All @@ -1484,9 +1484,9 @@ where
}
}

// TODO: A leader may have stepped down.
// A leader may have stepped down.
if self.engine.internal_server_state.is_leading() {
self.engine.replication_handler().update_progress(target, id, result);
self.engine.replication_handler().update_progress(target, request_id, result);
}
}

Expand Down
17 changes: 9 additions & 8 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ where C: RaftTypeConfig
result: ReplicationResult<C>,
) {
// No matter what the result is, the validity of the leader is granted by a follower.
self.update_leader_vote_clock(target, result.sending_time);
self.update_leader_clock(target, result.sending_time);

let id = request_id.request_id();
let Some(id) = id else {
Expand All @@ -163,7 +163,7 @@ where C: RaftTypeConfig
/// Update progress when replicated data(logs or snapshot) matches on follower/learner and is
/// accepted.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn update_leader_vote_clock(&mut self, node_id: C::NodeId, t: InstantOf<C>) {
pub(crate) fn update_leader_clock(&mut self, node_id: C::NodeId, t: InstantOf<C>) {
tracing::debug!(target = display(node_id), t = debug(t), "{}", func_name!());

let granted = *self
Expand Down Expand Up @@ -206,7 +206,7 @@ where C: RaftTypeConfig

// The value granted by a quorum may not yet be a committed.
// A committed is **granted** and also is in current term.
let granted = *self
let quorum_accepted = *self
.leader
.progress
.update_with(&node_id, |prog_entry| {
Expand All @@ -218,16 +218,19 @@ where C: RaftTypeConfig
})
.expect("it should always update existing progress");

tracing::debug!(granted = display(granted.display()), "granted after updating progress");
tracing::debug!(
quorum_accepted = display(quorum_accepted.display()),
"after updating progress"
);

self.try_commit_granted(granted);
self.try_commit_quorum_accepted(quorum_accepted);
}

/// Commit the log id that is granted(accepted) by a quorum of voters.
///
/// In raft a log that is granted and in the leader term is committed.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn try_commit_granted(&mut self, granted: Option<LogId<C::NodeId>>) {
pub(crate) fn try_commit_quorum_accepted(&mut self, granted: Option<LogId<C::NodeId>>) {
// Only when the log id is proposed by current leader, it is committed.
if let Some(c) = granted {
if !self.state.vote_ref().is_same_leader(c.committed_leader_id()) {
Expand Down Expand Up @@ -280,8 +283,6 @@ where C: RaftTypeConfig
request_id: RequestId,
repl_res: Result<ReplicationResult<C>, String>,
) {
// TODO(2): test

tracing::debug!(
target = display(target),
request_id = display(request_id),
Expand Down
96 changes: 40 additions & 56 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ use tracing_futures::Instrument;
use crate::config::Config;
use crate::core::notify::Notify;
use crate::core::raft_msg::ResultReceiver;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::error::HigherVote;
use crate::error::Infallible;
use crate::error::PayloadTooLarge;
use crate::error::RPCError;
use crate::error::RaftError;
Expand Down Expand Up @@ -481,7 +479,7 @@ where
debug_assert!(conflict.is_some(), "prev_log_id=None never conflict");

let conflict = conflict.unwrap();
self.send_progress_conflicting(request_id, leader_time, conflict);
self.send_progress(request_id, ReplicationResult::new(leader_time, Err(conflict)));

Ok(None)
}
Expand All @@ -490,7 +488,7 @@ where

/// Send the error result to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C, RaftError<C, Infallible>>) {
fn send_progress_error(&mut self, request_id: RequestId, err: RPCError<C, RaftError<C>>) {
let _ = self.tx_raft_core.send(Notify::Network {
response: Response::Progress {
target: self.target,
Expand All @@ -501,80 +499,63 @@ where
});
}

/// Send a `conflict` message to RaftCore.
/// RaftCore will then submit another replication command.
fn send_progress_conflicting(
&mut self,
request_id: RequestId,
leader_time: InstantOf<C>,
conflict: LogId<C::NodeId>,
) {
/// Send the success replication result(log matching or conflict) to RaftCore.
fn send_progress(&mut self, request_id: RequestId, replication_result: ReplicationResult<C>) {
tracing::debug!(
target = display(self.target),
request_id = display(request_id),
conflict = display(&conflict),
"update_conflicting"
target = display(self.target),
curr_matching = display(self.matching.display()),
result = display(&replication_result),
"{}",
func_name!()
);

match replication_result.result {
Ok(matching) => {
self.validate_matching(matching);
self.matching = matching;
}
Err(_conflict) => {
// Conflict is not allowed to be less than the current matching.
}
}

let _ = self.tx_raft_core.send({
Notify::Network {
response: Response::Progress {
session_id: self.session_id,
request_id,
target: self.target,
result: Ok(ReplicationResult::new(leader_time, Err(conflict))),
result: Ok(replication_result),
},
}
});
}

/// Update the `matching` log id, which is for tracking follower replication, and report the
/// matched log id to `RaftCore` to commit an entry.
#[tracing::instrument(level = "trace", skip(self))]
fn send_progress_matching(
&mut self,
request_id: RequestId,
leader_time: InstantOf<C>,
new_matching: Option<LogId<C::NodeId>>,
) {
tracing::debug!(
request_id = display(request_id),
target = display(self.target),
curr_matching = display(DisplayOption(&self.matching)),
new_matching = display(DisplayOption(&new_matching)),
"{}",
func_name!()
);

/// Validate the value for updating matching log id.
///
/// If the matching log id is reverted to a smaller value:
/// - log a warning message if [`loosen-follower-log-revert`] feature flag is enabled;
/// - otherwise panic, consider it as a bug.
///
/// [`loosen-follower-log-revert`]: crate::docs::feature_flags#feature_flag_loosen_follower_log_revert
fn validate_matching(&self, matching: Option<LogId<C::NodeId>>) {
if cfg!(feature = "loosen-follower-log-revert") {
if self.matching > new_matching {
if self.matching > matching {
tracing::warn!(
"follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed",
self.matching.display(),
new_matching.display(),
);
"follower log is reverted from {} to {}; with 'loosen-follower-log-revert' enabled, this is allowed",
self.matching.display(),
matching.display(),
);
}
} else {
debug_assert!(
self.matching <= new_matching,
self.matching <= matching,
"follower log is reverted from {} to {}",
self.matching.display(),
new_matching.display(),
matching.display(),
);
}

self.matching = new_matching;

let _ = self.tx_raft_core.send({
Notify::Network {
response: Response::Progress {
session_id: self.session_id,
request_id,
target: self.target,
result: Ok(ReplicationResult::new(leader_time, Ok(new_matching))),
},
}
});
}

/// Drain all events in the channel in backoff mode, i.e., there was an un-retry-able error and
Expand Down Expand Up @@ -835,7 +816,10 @@ where
}));
}

self.send_progress_matching(request_id, start_time, snapshot_meta.last_log_id);
self.send_progress(
request_id,
ReplicationResult::new(start_time, Ok(snapshot_meta.last_log_id)),
);

Ok(None)
}
Expand All @@ -849,7 +833,7 @@ where
leader_time: InstantOf<C>,
log_ids: DataWithId<LogIdRange<C::NodeId>>,
) -> Option<Data<C>> {
self.send_progress_matching(log_ids.request_id(), leader_time, matching);
self.send_progress(log_ids.request_id(), ReplicationResult::new(leader_time, Ok(matching)));

if matching < log_ids.data().last_log_id {
Some(Data::new_logs(
Expand Down
39 changes: 39 additions & 0 deletions openraft/src/replication/response.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use std::fmt;

use crate::display_ext::DisplayOptionExt;
use crate::replication::request_id::RequestId;
use crate::replication::ReplicationSessionId;
use crate::type_config::alias::InstantOf;
Expand Down Expand Up @@ -110,10 +113,46 @@ pub(crate) struct ReplicationResult<C: RaftTypeConfig> {
pub(crate) result: Result<Option<LogIdOf<C>>, LogIdOf<C>>,
}

impl<C> fmt::Display for ReplicationResult<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{time:{:?}, result:", self.sending_time)?;

match &self.result {
Ok(matching) => write!(f, "Match:{}", matching.display())?,
Err(conflict) => write!(f, "Conflict:{}", conflict)?,
}

write!(f, "}}")
}
}

impl<C> ReplicationResult<C>
where C: RaftTypeConfig
{
pub(crate) fn new(sending_time: InstantOf<C>, result: Result<Option<LogIdOf<C>>, LogIdOf<C>>) -> Self {
Self { sending_time, result }
}
}

#[cfg(test)]
mod tests {
use crate::engine::testing::UTConfig;
use crate::replication::response::ReplicationResult;
use crate::testing::log_id;
use crate::type_config::alias::InstantOf;

#[test]
fn test_replication_result_display() {
// NOTE that with single-term-leader, log id is `1-3`

let result = ReplicationResult::<UTConfig>::new(InstantOf::<UTConfig>::now(), Ok(Some(log_id(1, 2, 3))));
let want = format!(", result:Match:{}}}", log_id(1, 2, 3));
assert!(result.to_string().ends_with(&want), "{}", result.to_string());

let result = ReplicationResult::<UTConfig>::new(InstantOf::<UTConfig>::now(), Err(log_id(1, 2, 3)));
let want = format!(", result:Conflict:{}}}", log_id(1, 2, 3));
assert!(result.to_string().ends_with(&want), "{}", result.to_string());
}
}
1 change: 1 addition & 0 deletions openraft/src/utime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl<T, I: Instant> UTime<T, I> {
}

/// Consumes this object and returns the inner data.
#[allow(dead_code)]
pub(crate) fn into_inner(self) -> T {
self.data
}
Expand Down

0 comments on commit 8306248

Please sign in to comment.