diff --git a/openraft/src/core/notification.rs b/openraft/src/core/notification.rs index 96939359b..4daaa648a 100644 --- a/openraft/src/core/notification.rs +++ b/openraft/src/core/notification.rs @@ -75,9 +75,13 @@ where C: RaftTypeConfig Self::VoteResponse { target, resp, - sender_vote: vote, + sender_vote, } => { - write!(f, "VoteResponse: from: {}: {}, res-vote: {}", target, resp, vote) + write!( + f, + "VoteResponse: from target={}, to sender_vote: {}, {}", + target, sender_vote, resp + ) } Self::HigherVote { ref target, @@ -91,12 +95,12 @@ where C: RaftTypeConfig ) } Self::StorageError { error } => write!(f, "StorageError: {}", error), - Self::LocalIO { io_id } => write!(f, "LocalIO({}) done", io_id), + Self::LocalIO { io_id } => write!(f, "{}", io_id), Self::Network { response } => { - write!(f, "Replication command done: {}", response) + write!(f, "{}", response) } Self::StateMachine { command_result } => { - write!(f, "StateMachine command done: {:?}", command_result) + write!(f, "{}", command_result) } Self::Tick { i } => { write!(f, "Tick {}", i) diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 0294a4a7b..1e4357283 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1,8 +1,7 @@ use std::borrow::Borrow; use std::collections::BTreeMap; +use std::fmt; use std::fmt::Debug; -use std::fmt::Display; -use std::fmt::Formatter; use std::ops::Deref; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -36,6 +35,7 @@ use crate::core::raft_msg::VoteTx; use crate::core::sm; use crate::core::sm::CommandSeq; use crate::core::ServerState; +use crate::display_ext::display_slice::DisplaySliceExt; use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOption; use crate::display_ext::DisplayOptionExt; @@ -44,6 +44,7 @@ use crate::engine::handler::replication_handler::SendNone; use crate::engine::Command; use crate::engine::Condition; use crate::engine::Engine; +use crate::engine::ReplicationProgress; use crate::engine::Respond; use crate::entry::FromAppData; use crate::entry::RaftEntry; @@ -112,6 +113,18 @@ pub(crate) struct ApplyingEntry { membership: Option>, } +impl fmt::Display for ApplyingEntry +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.log_id)?; + if let Some(m) = &self.membership { + write!(f, "(membership:{})", m)?; + } + Ok(()) + } +} + impl ApplyingEntry { pub(crate) fn new(log_id: LogId, membership: Option>) -> Self { Self { log_id, membership } @@ -128,7 +141,7 @@ pub(crate) struct ApplyResult { } impl Debug for ApplyResult { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("ApplyResult") .field("since", &self.since) .field("end", &self.end) @@ -137,6 +150,19 @@ impl Debug for ApplyResult { } } +impl fmt::Display for ApplyResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "ApplyResult([{}, {}), last_applied={}, entries={})", + self.since, + self.end, + self.last_applied, + self.applying_entries.display(), + ) + } +} + /// The core type implementing the Raft protocol. pub struct RaftCore where @@ -474,7 +500,7 @@ where /// /// Currently heartbeat is a blank log #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] - pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool { + pub fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool { tracing::debug!(now = debug(C::now()), "send_heartbeat"); let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) { @@ -809,8 +835,6 @@ where } while let Some(cmd) = self.engine.output.pop_command() { - tracing::debug!("run command: {:?}", cmd); - let res = self.run_command(cmd).await?; if let Some(cmd) = res { @@ -1066,7 +1090,7 @@ where // TODO: Make this method non-async. It does not need to run any async command in it. #[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))] pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg) { - tracing::debug!("recv from rx_api: {}", msg); + tracing::debug!("RAFT_event id={:<2} input: {}", self.id, msg); match msg { RaftMsg::AppendEntries { rpc, tx } => { @@ -1152,7 +1176,7 @@ where // TODO: Make this method non-async. It does not need to run any async command in it. #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(self.id)))] pub(crate) fn handle_notification(&mut self, notify: Notification) -> Result<(), Fatal> { - tracing::debug!("recv from rx_notify: {}", notify); + tracing::debug!("RAFT_event id={:<2} notify: {}", self.id, notify); match notify { Notification::VoteResponse { @@ -1463,7 +1487,7 @@ where /// If a message is sent by a previous server state but is received by current server state, /// it is a stale message and should be just ignored. - fn does_vote_match(&self, sender_vote: &Vote, msg: impl Display) -> bool { + fn does_vote_match(&self, sender_vote: &Vote, msg: impl fmt::Display) -> bool { // Get the current leading vote: // - If input `sender_vote` is committed, it is sent by a Leader. Therefore we check against current // Leader's vote. @@ -1491,7 +1515,11 @@ where } /// If a message is sent by a previous replication session but is received by current server /// state, it is a stale message and should be just ignored. - fn does_replication_session_match(&self, session_id: &ReplicationSessionId, msg: impl Display + Copy) -> bool { + fn does_replication_session_match( + &self, + session_id: &ReplicationSessionId, + msg: impl fmt::Display + Copy, + ) -> bool { if !self.does_vote_match(session_id.vote_ref(), msg) { return false; } @@ -1516,6 +1544,8 @@ where LS: RaftLogStorage, { async fn run_command<'e>(&mut self, cmd: Command) -> Result>, StorageError> { + tracing::debug!("RAFT_event id={:<2} cmd: {}", self.id, cmd); + let condition = cmd.condition(); tracing::debug!("condition: {:?}", condition); @@ -1637,7 +1667,7 @@ where Command::RebuildReplicationStreams { targets } => { self.remove_all_replication().await; - for (target, matching) in targets.iter() { + for ReplicationProgress(target, matching) in targets.iter() { let handle = self.spawn_replication_stream(*target, *matching).await; self.replications.insert(*target, handle); } diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index 6f8044b16..64cd0b2ea 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -125,7 +125,7 @@ where C: RaftTypeConfig } RaftMsg::ChangeMembership { changes, retain, .. } => { // TODO: avoid using Debug - write!(f, "ChangeMembership: members: {:?}, retain: {}", changes, retain,) + write!(f, "ChangeMembership: {:?}, retain: {}", changes, retain,) } RaftMsg::ExternalCoreRequest { .. } => write!(f, "External Request"), RaftMsg::ExternalCommand { cmd } => { diff --git a/openraft/src/core/sm/command.rs b/openraft/src/core/sm/command.rs index 211c09632..e7b66b888 100644 --- a/openraft/src/core/sm/command.rs +++ b/openraft/src/core/sm/command.rs @@ -1,3 +1,4 @@ +use std::fmt; use std::fmt::Debug; use std::fmt::Formatter; @@ -19,7 +20,7 @@ where C: RaftTypeConfig impl Debug for Command where C: RaftTypeConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("StateMachineCommand") .field("seq", &self.seq) .field("payload", &self.payload) @@ -27,6 +28,14 @@ where C: RaftTypeConfig } } +impl fmt::Display for Command +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "sm::Command: seq: {}, payload: {}", self.seq, self.payload) + } +} + impl Command where C: RaftTypeConfig { @@ -115,7 +124,7 @@ where C: RaftTypeConfig impl Debug for CommandPayload where C: RaftTypeConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"), CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"), @@ -130,6 +139,24 @@ where C: RaftTypeConfig } } +impl fmt::Display for CommandPayload +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"), + CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"), + CommandPayload::InstallFullSnapshot { snapshot } => { + write!(f, "InstallFullSnapshot: meta: {}", snapshot.meta) + } + CommandPayload::BeginReceivingSnapshot { .. } => { + write!(f, "BeginReceivingSnapshot") + } + CommandPayload::Apply { first, last } => write!(f, "Apply: [{},{}]", first, last), + } + } +} + // `PartialEq` is only used for testing impl PartialEq for CommandPayload where C: RaftTypeConfig diff --git a/openraft/src/core/sm/response.rs b/openraft/src/core/sm/response.rs index 1aae9e810..75f9dd73d 100644 --- a/openraft/src/core/sm/response.rs +++ b/openraft/src/core/sm/response.rs @@ -1,5 +1,10 @@ +use std::fmt; +use std::fmt::Formatter; + use crate::core::sm::command::CommandSeq; use crate::core::ApplyResult; +use crate::display_ext::display_result::DisplayResultExt; +use crate::display_ext::DisplayOptionExt; use crate::RaftTypeConfig; use crate::SnapshotMeta; use crate::StorageError; @@ -21,6 +26,24 @@ where C: RaftTypeConfig Apply(ApplyResult), } +impl fmt::Display for Response +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + Self::BuildSnapshot(meta) => { + write!(f, "BuildSnapshot({})", meta) + } + Self::InstallSnapshot(meta) => { + write!(f, "InstallSnapshot({})", meta.display()) + } + Self::Apply(result) => { + write!(f, "{}", result) + } + } + } +} + /// Container of result of a command. #[derive(Debug)] pub(crate) struct CommandResult @@ -31,6 +54,19 @@ where C: RaftTypeConfig pub(crate) result: Result, StorageError>, } +impl fmt::Display for CommandResult +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "sm::Result(command_seq:{}, {})", + self.command_seq, + self.result.display() + ) + } +} + impl CommandResult where C: RaftTypeConfig { diff --git a/openraft/src/display_ext.rs b/openraft/src/display_ext.rs index 65afd0953..04471ffa1 100644 --- a/openraft/src/display_ext.rs +++ b/openraft/src/display_ext.rs @@ -2,6 +2,7 @@ pub(crate) mod display_instant; pub(crate) mod display_option; +pub(crate) mod display_result; pub(crate) mod display_slice; #[allow(unused_imports)] @@ -9,5 +10,8 @@ pub(crate) use display_instant::DisplayInstant; pub(crate) use display_instant::DisplayInstantExt; pub(crate) use display_option::DisplayOption; pub(crate) use display_option::DisplayOptionExt; +#[allow(unused_imports)] +pub(crate) use display_result::DisplayResult; +pub(crate) use display_result::DisplayResultExt; pub(crate) use display_slice::DisplaySlice; pub(crate) use display_slice::DisplaySliceExt; diff --git a/openraft/src/display_ext/display_result.rs b/openraft/src/display_ext/display_result.rs new file mode 100644 index 000000000..ed769c383 --- /dev/null +++ b/openraft/src/display_ext/display_result.rs @@ -0,0 +1,53 @@ +use std::fmt; + +/// Implement `Display` for `Result` if T and E are `Display`. +/// +/// It outputs `"Ok(...)"` or `"Err(...)"`. +pub(crate) struct DisplayResult<'a, T: fmt::Display, E: fmt::Display>(pub &'a Result); + +impl<'a, T, E> fmt::Display for DisplayResult<'a, T, E> +where + T: fmt::Display, + E: fmt::Display, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0 { + Ok(ok) => { + write!(f, "Ok({})", ok) + } + Err(err) => { + write!(f, "Err({})", err) + } + } + } +} + +pub(crate) trait DisplayResultExt<'a, T: fmt::Display, E: fmt::Display> { + fn display(&'a self) -> DisplayResult<'a, T, E>; +} + +impl DisplayResultExt<'_, T, E> for Result +where + T: fmt::Display, + E: fmt::Display, +{ + fn display(&self) -> DisplayResult { + DisplayResult(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_display_result() { + let result: Result = Ok(42); + let display_result = DisplayResult(&result); + assert_eq!(format!("{}", display_result), "Ok(42)"); + + let result: Result = Err("error"); + let display_result = DisplayResult(&result); + assert_eq!(format!("{}", display_result), "Err(error)"); + } +} diff --git a/openraft/src/display_ext/display_slice.rs b/openraft/src/display_ext/display_slice.rs index d4c58e0f9..cfb8538e4 100644 --- a/openraft/src/display_ext/display_slice.rs +++ b/openraft/src/display_ext/display_slice.rs @@ -40,6 +40,9 @@ impl<'a, T: fmt::Display, const MAX: usize> fmt::Display for DisplaySlice<'a, T, pub(crate) trait DisplaySliceExt<'a, T: fmt::Display> { fn display(&'a self) -> DisplaySlice<'a, T>; + + /// Display at most `MAX` elements. + fn display_n(&'a self) -> DisplaySlice<'a, T, MAX>; } impl DisplaySliceExt<'_, T> for [T] @@ -48,6 +51,10 @@ where T: fmt::Display fn display(&self) -> DisplaySlice { DisplaySlice(self) } + + fn display_n(&'_ self) -> DisplaySlice<'_, T, MAX> { + DisplaySlice(self) + } } #[cfg(test)] diff --git a/openraft/src/engine/command.rs b/openraft/src/engine/command.rs index c052b198d..48a5c2ac8 100644 --- a/openraft/src/engine/command.rs +++ b/openraft/src/engine/command.rs @@ -3,11 +3,14 @@ use std::fmt::Debug; use crate::async_runtime::OneshotSender; use crate::core::sm; +use crate::display_ext::DisplayOptionExt; +use crate::display_ext::DisplayResultExt; +use crate::display_ext::DisplaySliceExt; +use crate::engine::replication_progress::ReplicationProgress; use crate::engine::CommandKind; use crate::error::Infallible; use crate::error::InitializeError; use crate::error::InstallSnapshotError; -use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotResponse; @@ -73,7 +76,7 @@ where C: RaftTypeConfig /// updated. RebuildReplicationStreams { /// Targets to replicate to. - targets: Vec<(C::NodeId, ProgressEntry)>, + targets: Vec>, }, /// Save vote to storage @@ -103,6 +106,38 @@ where C: RaftTypeConfig }, } +impl fmt::Display for Command +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Command::AppendInputEntries { vote, entries } => { + write!(f, "AppendInputEntries: vote: {}, entries: {}", vote, entries.display()) + } + Command::ReplicateCommitted { committed } => { + write!(f, "ReplicateCommitted: {}", committed.display()) + } + Command::Commit { + seq, + already_committed, + upto, + } => write!(f, "Commit: seq: {}, ({}, {}]", seq, already_committed.display(), upto), + Command::Replicate { target, req } => { + write!(f, "Replicate: target={}, req: {}", target, req) + } + Command::RebuildReplicationStreams { targets } => { + write!(f, "RebuildReplicationStreams: {}", targets.display_n::<10>()) + } + Command::SaveVote { vote } => write!(f, "SaveVote: {}", vote), + Command::SendVote { vote_req } => write!(f, "SendVote: {}", vote_req), + Command::PurgeLog { upto } => write!(f, "PurgeLog: upto: {}", upto), + Command::TruncateLog { since } => write!(f, "TruncateLog: since: {}", since), + Command::StateMachine { command } => write!(f, "StateMachine: command: {}", command), + Command::Respond { when, resp } => write!(f, "Respond: when: {}, resp: {}", when.display(), resp), + } + } +} + impl From> for Command where C: RaftTypeConfig { @@ -209,6 +244,22 @@ where C: RaftTypeConfig StateMachineCommand { command_seq: sm::CommandSeq }, } +impl fmt::Display for Condition +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Condition::LogFlushed { leader, log_id } => { + write!(f, "LogFlushed: leader: {}, log_id: {}", leader, log_id.display()) + } + Condition::Applied { log_id } => write!(f, "Applied: log_id: {}", log_id.display()), + Condition::StateMachineCommand { command_seq } => { + write!(f, "StateMachineCommand: command_seq: {}", command_seq) + } + } + } +} + /// A command to send return value to the caller via a `oneshot::Sender`. #[derive(Debug, PartialEq, Eq)] #[derive(derive_more::From)] @@ -223,6 +274,27 @@ where C: RaftTypeConfig Initialize(ValueSender>>), } +impl fmt::Display for Respond +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Respond::Vote(vs) => write!(f, "Vote {}", vs.value().display()), + Respond::AppendEntries(vs) => write!(f, "AppendEntries {}", vs.value().display()), + Respond::ReceiveSnapshotChunk(vs) => { + write!( + f, + "ReceiveSnapshotChunk {}", + vs.value().as_ref().map(|_x| "()").display() + ) + } + Respond::InstallSnapshot(vs) => write!(f, "InstallSnapshot {}", vs.value().display()), + Respond::InstallFullSnapshot(vs) => write!(f, "InstallFullSnapshot {}", vs.value().display()), + Respond::Initialize(vs) => write!(f, "Initialize {}", vs.value().as_ref().map(|_x| "()").display()), + } + } +} + impl Respond where C: RaftTypeConfig { @@ -291,6 +363,10 @@ where Self { value: res, tx } } + pub(crate) fn value(&self) -> &T { + &self.value + } + pub(crate) fn send(self) { let _ = self.tx.send(self.value); } diff --git a/openraft/src/engine/handler/leader_handler/append_entries_test.rs b/openraft/src/engine/handler/leader_handler/append_entries_test.rs index bc18615d8..31721906c 100644 --- a/openraft/src/engine/handler/leader_handler/append_entries_test.rs +++ b/openraft/src/engine/handler/leader_handler/append_entries_test.rs @@ -11,6 +11,7 @@ use pretty_assertions::assert_str_eq; use crate::engine::testing::UTConfig; use crate::engine::Command; use crate::engine::Engine; +use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; @@ -248,7 +249,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> { ] }, Command::RebuildReplicationStreams { - targets: vec![(2, ProgressEntry::empty(7))] + targets: vec![ReplicationProgress(2, ProgressEntry::empty(7))] }, Command::Replicate { target: 2, diff --git a/openraft/src/engine/handler/replication_handler/append_membership_test.rs b/openraft/src/engine/handler/replication_handler/append_membership_test.rs index 7758679dd..c1397a654 100644 --- a/openraft/src/engine/handler/replication_handler/append_membership_test.rs +++ b/openraft/src/engine/handler/replication_handler/append_membership_test.rs @@ -8,6 +8,7 @@ use crate::engine::testing::UTConfig; use crate::engine::Command; use crate::engine::Engine; use crate::engine::LogIdList; +use crate::engine::ReplicationProgress; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::progress::Progress; @@ -79,8 +80,11 @@ fn test_leader_append_membership_for_leader() -> anyhow::Result<()> { vec![ // Command::RebuildReplicationStreams { - targets: vec![(3, ProgressEntry::empty(0)), (4, ProgressEntry::empty(0))], /* node-2 is leader, - * won't be removed */ + targets: vec![ + ReplicationProgress(3, ProgressEntry::empty(0)), + ReplicationProgress(4, ProgressEntry::empty(0)) + ], /* node-2 is leader, + * won't be removed */ } ], eng.output.take_commands() diff --git a/openraft/src/engine/handler/replication_handler/mod.rs b/openraft/src/engine/handler/replication_handler/mod.rs index 7969647bf..f4bb2c0e4 100644 --- a/openraft/src/engine/handler/replication_handler/mod.rs +++ b/openraft/src/engine/handler/replication_handler/mod.rs @@ -4,6 +4,7 @@ use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::Command; use crate::engine::EngineConfig; use crate::engine::EngineOutput; +use crate::engine::ReplicationProgress; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::progress::Progress; @@ -319,7 +320,7 @@ where C: RaftTypeConfig // Reset and resend(by self.send_to_all()) replication requests. prog_entry.inflight = Inflight::None; - targets.push((*target, *prog_entry)); + targets.push(ReplicationProgress(*target, *prog_entry)); } } self.output.push_command(Command::RebuildReplicationStreams { targets }); diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index abc197056..474c71d7e 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -33,6 +33,7 @@ mod engine_config; mod engine_impl; mod engine_output; mod log_id_list; +mod replication_progress; pub(crate) mod handler; pub(crate) mod time_state; @@ -61,3 +62,4 @@ pub(crate) use engine_config::EngineConfig; pub(crate) use engine_impl::Engine; pub(crate) use engine_output::EngineOutput; pub use log_id_list::LogIdList; +pub(crate) use replication_progress::ReplicationProgress; diff --git a/openraft/src/engine/replication_progress.rs b/openraft/src/engine/replication_progress.rs new file mode 100644 index 000000000..cc4671e6f --- /dev/null +++ b/openraft/src/engine/replication_progress.rs @@ -0,0 +1,16 @@ +use std::fmt; + +use crate::progress::entry::ProgressEntry; +use crate::RaftTypeConfig; + +#[derive(Debug)] +#[derive(PartialEq, Eq)] +pub(crate) struct ReplicationProgress(pub C::NodeId, pub ProgressEntry); + +impl fmt::Display for ReplicationProgress +where C: RaftTypeConfig +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "ReplicationProgress({}={})", self.0, self.1) + } +} diff --git a/openraft/src/engine/tests/handle_vote_resp_test.rs b/openraft/src/engine/tests/handle_vote_resp_test.rs index 224224155..d380d2460 100644 --- a/openraft/src/engine/tests/handle_vote_resp_test.rs +++ b/openraft/src/engine/tests/handle_vote_resp_test.rs @@ -9,6 +9,7 @@ use crate::engine::testing::UTConfig; use crate::engine::Command; use crate::engine::Engine; use crate::engine::LogIdList; +use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; @@ -201,7 +202,7 @@ fn test_handle_vote_resp_equal_vote() -> anyhow::Result<()> { assert_eq!( vec![ Command::RebuildReplicationStreams { - targets: vec![(2, ProgressEntry::empty(1))] + targets: vec![ReplicationProgress(2, ProgressEntry::empty(1))] }, Command::SaveVote { vote: Vote::new_committed(2, 1) diff --git a/openraft/src/engine/tests/startup_test.rs b/openraft/src/engine/tests/startup_test.rs index 1ed526b6b..f21afd93b 100644 --- a/openraft/src/engine/tests/startup_test.rs +++ b/openraft/src/engine/tests/startup_test.rs @@ -7,6 +7,7 @@ use crate::engine::testing::UTConfig; use crate::engine::Command; use crate::engine::Engine; use crate::engine::LogIdList; +use crate::engine::ReplicationProgress; use crate::entry::RaftEntry; use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; @@ -61,7 +62,7 @@ fn test_startup_as_leader_without_logs() -> anyhow::Result<()> { vec![ // Command::RebuildReplicationStreams { - targets: vec![(3, ProgressEntry { + targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, curr_inflight_id: 0, inflight: Inflight::None, @@ -106,7 +107,7 @@ fn test_startup_as_leader_with_proposed_logs() -> anyhow::Result<()> { vec![ // Command::RebuildReplicationStreams { - targets: vec![(3, ProgressEntry { + targets: vec![ReplicationProgress(3, ProgressEntry { matching: None, curr_inflight_id: 0, inflight: Inflight::None, diff --git a/openraft/src/log_id/mod.rs b/openraft/src/log_id/mod.rs index d87169851..a878e4cab 100644 --- a/openraft/src/log_id/mod.rs +++ b/openraft/src/log_id/mod.rs @@ -42,7 +42,7 @@ impl RaftLogId for LogId { impl Display for LogId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}-{}", self.leader_id, self.index) + write!(f, "{}.{}", self.leader_id, self.index) } } diff --git a/openraft/src/raft_state/io_state/append_log_io_id.rs b/openraft/src/raft_state/io_state/append_log_io_id.rs index 4062f3cab..0c506527c 100644 --- a/openraft/src/raft_state/io_state/append_log_io_id.rs +++ b/openraft/src/raft_state/io_state/append_log_io_id.rs @@ -33,7 +33,7 @@ impl fmt::Display for AppendLogIOId where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "by({}):{}", self.committed_vote, self.log_id) + write!(f, "by:{}, {}", self.committed_vote, self.log_id) } } diff --git a/openraft/src/replication/response.rs b/openraft/src/replication/response.rs index 40f1e15c9..9f6ba13a4 100644 --- a/openraft/src/replication/response.rs +++ b/openraft/src/replication/response.rs @@ -1,6 +1,8 @@ use std::fmt; +use crate::display_ext::DisplayInstantExt; use crate::display_ext::DisplayOptionExt; +use crate::display_ext::DisplayResultExt; use crate::replication::request_id::RequestId; use crate::replication::ReplicationSessionId; use crate::type_config::alias::InstantOf; @@ -80,12 +82,15 @@ where C: RaftTypeConfig } => { write!( f, - "UpdateReplicationProgress: target: {}, id: {}, result: {:?}, session_id: {}", - target, request_id, result, session_id + "replication::Progress: target={}, request_id: {}, result: {}, session_id: {}", + target, + request_id, + result.display(), + session_id ) } - Self::StorageError { error } => write!(f, "ReplicationStorageError: {}", error), + Self::StorageError { error } => write!(f, "replication::StorageError: {}", error), Self::HigherVote { target, @@ -94,7 +99,7 @@ where C: RaftTypeConfig } => { write!( f, - "Seen a higher vote: target: {}, vote: {}, server_state_vote: {}", + "replication::Seen a higher vote: target={}, higher: {}, sender_vote: {}", target, higher, sender_vote ) } @@ -118,7 +123,7 @@ impl fmt::Display for ReplicationResult where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{{time:{:?}, result:", self.sending_time)?; + write!(f, "{{sending_time:{}, result:", self.sending_time.display())?; match &self.result { Ok(matching) => write!(f, "Match:{}", matching.display())?, diff --git a/openraft/src/summary.rs b/openraft/src/summary.rs index 46c1b6736..c4139b770 100644 --- a/openraft/src/summary.rs +++ b/openraft/src/summary.rs @@ -86,15 +86,15 @@ mod tests { use crate::MessageSummary; let lid = crate::testing::log_id(1, 2, 3); - assert_eq!("T1-N2-3", lid.to_string()); - assert_eq!("T1-N2-3", lid.summary()); - assert_eq!("Some(T1-N2-3)", Some(&lid).summary()); - assert_eq!("Some(T1-N2-3)", Some(lid).summary()); + assert_eq!("T1-N2.3", lid.to_string()); + assert_eq!("T1-N2.3", lid.summary()); + assert_eq!("Some(T1-N2.3)", Some(&lid).summary()); + assert_eq!("Some(T1-N2.3)", Some(lid).summary()); let slc = vec![lid, lid]; - assert_eq!("T1-N2-3,T1-N2-3", slc.as_slice().summary()); + assert_eq!("T1-N2.3,T1-N2.3", slc.as_slice().summary()); let slc = vec![&lid, &lid]; - assert_eq!("T1-N2-3,T1-N2-3", slc.as_slice().summary()); + assert_eq!("T1-N2.3,T1-N2.3", slc.as_slice().summary()); } } diff --git a/openraft/src/vote/vote.rs b/openraft/src/vote/vote.rs index 139ccacef..fd5c28109 100644 --- a/openraft/src/vote/vote.rs +++ b/openraft/src/vote/vote.rs @@ -44,13 +44,9 @@ impl std::fmt::Display for Vote { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "{}:{}", + "<{}:{}>", self.leader_id, - if self.is_committed() { - "committed" - } else { - "uncommitted" - } + if self.is_committed() { "Q" } else { "-" } ) } }