Skip to content

Commit

Permalink
Refactor: log significant events with Display
Browse files Browse the repository at this point in the history
This commit refines our logging strategy by utilizing the `Display`
trait instead of `Debug` for significant events. This change is aimed at
producing logs that are easier to read and understand.

Three kinds of significant events are logged at DEBUG level:

- `input`: the `RaftMsg`s received by `RaftCore`, such as client-write
  or AppendEntries request from the Leader.

- `cmd`: the `Command` outputted by `Engine` to execute by storage or
  network layer, such as `AppendInputEntries` or `ReplicateCommitted`.

- `notify`: the `Notification`s received by `RaftCore` from storage or
  network.

Example significant event logs:
```
RAFT_event id=0     cmd: Commit: seq: 5, (T1-N0.4, T1-N0.5]
RAFT_event id=1   input: AppendEntries: vote=<T1-N0:Q>, prev_log_id=T1-N0.5, leader_commit=T1-N0.5, entries=[]
RAFT_event id=0  notify: sm::Result(command_seq:5, Ok(ApplyResult([5, 6), last_applied=T1-N0.5, entries=[T1-N0.5])))
RAFT_event id=0   input: ClientWriteRequest
```
  • Loading branch information
drmingdrmer committed Jul 13, 2024
1 parent 3ae6b4b commit 53abbfc
Show file tree
Hide file tree
Showing 21 changed files with 311 additions and 47 deletions.
14 changes: 9 additions & 5 deletions openraft/src/core/notification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,13 @@ where C: RaftTypeConfig
Self::VoteResponse {
target,
resp,
sender_vote: vote,
sender_vote,
} => {
write!(f, "VoteResponse: from: {}: {}, res-vote: {}", target, resp, vote)
write!(
f,
"VoteResponse: from target={}, to sender_vote: {}, {}",
target, sender_vote, resp
)
}
Self::HigherVote {
ref target,
Expand All @@ -91,12 +95,12 @@ where C: RaftTypeConfig
)
}
Self::StorageError { error } => write!(f, "StorageError: {}", error),
Self::LocalIO { io_id } => write!(f, "LocalIO({}) done", io_id),
Self::LocalIO { io_id } => write!(f, "{}", io_id),
Self::Network { response } => {
write!(f, "Replication command done: {}", response)
write!(f, "{}", response)
}
Self::StateMachine { command_result } => {
write!(f, "StateMachine command done: {:?}", command_result)
write!(f, "{}", command_result)
}
Self::Tick { i } => {
write!(f, "Tick {}", i)
Expand Down
52 changes: 41 additions & 11 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::borrow::Borrow;
use std::collections::BTreeMap;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -36,6 +35,7 @@ use crate::core::raft_msg::VoteTx;
use crate::core::sm;
use crate::core::sm::CommandSeq;
use crate::core::ServerState;
use crate::display_ext::display_slice::DisplaySliceExt;
use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
Expand All @@ -44,6 +44,7 @@ use crate::engine::handler::replication_handler::SendNone;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::Engine;
use crate::engine::ReplicationProgress;
use crate::engine::Respond;
use crate::entry::FromAppData;
use crate::entry::RaftEntry;
Expand Down Expand Up @@ -112,6 +113,18 @@ pub(crate) struct ApplyingEntry<C: RaftTypeConfig> {
membership: Option<Membership<C>>,
}

impl<C> fmt::Display for ApplyingEntry<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.log_id)?;
if let Some(m) = &self.membership {
write!(f, "(membership:{})", m)?;
}
Ok(())
}
}

impl<C: RaftTypeConfig> ApplyingEntry<C> {
pub(crate) fn new(log_id: LogId<C::NodeId>, membership: Option<Membership<C>>) -> Self {
Self { log_id, membership }
Expand All @@ -128,7 +141,7 @@ pub(crate) struct ApplyResult<C: RaftTypeConfig> {
}

impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ApplyResult")
.field("since", &self.since)
.field("end", &self.end)
Expand All @@ -137,6 +150,19 @@ impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
}
}

impl<C: RaftTypeConfig> fmt::Display for ApplyResult<C> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"ApplyResult([{}, {}), last_applied={}, entries={})",
self.since,
self.end,
self.last_applied,
self.applying_entries.display(),
)
}
}

/// The core type implementing the Raft protocol.
pub struct RaftCore<C, NF, LS>
where
Expand Down Expand Up @@ -474,7 +500,7 @@ where
///
/// Currently heartbeat is a blank log
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
pub fn send_heartbeat(&mut self, emitter: impl fmt::Display) -> bool {
tracing::debug!(now = debug(C::now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
Expand Down Expand Up @@ -809,8 +835,6 @@ where
}

while let Some(cmd) = self.engine.output.pop_command() {
tracing::debug!("run command: {:?}", cmd);

let res = self.run_command(cmd).await?;

if let Some(cmd) = res {
Expand Down Expand Up @@ -1066,7 +1090,7 @@ where
// TODO: Make this method non-async. It does not need to run any async command in it.
#[tracing::instrument(level = "debug", skip(self, msg), fields(state = debug(self.engine.state.server_state), id=display(self.id)))]
pub(crate) async fn handle_api_msg(&mut self, msg: RaftMsg<C>) {
tracing::debug!("recv from rx_api: {}", msg);
tracing::debug!("RAFT_event id={:<2} input: {}", self.id, msg);

match msg {
RaftMsg::AppendEntries { rpc, tx } => {
Expand Down Expand Up @@ -1152,7 +1176,7 @@ where
// TODO: Make this method non-async. It does not need to run any async command in it.
#[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(self.id)))]
pub(crate) fn handle_notification(&mut self, notify: Notification<C>) -> Result<(), Fatal<C>> {
tracing::debug!("recv from rx_notify: {}", notify);
tracing::debug!("RAFT_event id={:<2} notify: {}", self.id, notify);

match notify {
Notification::VoteResponse {
Expand Down Expand Up @@ -1463,7 +1487,7 @@ where

/// If a message is sent by a previous server state but is received by current server state,
/// it is a stale message and should be just ignored.
fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl Display) -> bool {
fn does_vote_match(&self, sender_vote: &Vote<C::NodeId>, msg: impl fmt::Display) -> bool {
// Get the current leading vote:
// - If input `sender_vote` is committed, it is sent by a Leader. Therefore we check against current
// Leader's vote.
Expand Down Expand Up @@ -1491,7 +1515,11 @@ where
}
/// If a message is sent by a previous replication session but is received by current server
/// state, it is a stale message and should be just ignored.
fn does_replication_session_match(&self, session_id: &ReplicationSessionId<C>, msg: impl Display + Copy) -> bool {
fn does_replication_session_match(
&self,
session_id: &ReplicationSessionId<C>,
msg: impl fmt::Display + Copy,
) -> bool {
if !self.does_vote_match(session_id.vote_ref(), msg) {
return false;
}
Expand All @@ -1516,6 +1544,8 @@ where
LS: RaftLogStorage<C>,
{
async fn run_command<'e>(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C>> {
tracing::debug!("RAFT_event id={:<2} cmd: {}", self.id, cmd);

let condition = cmd.condition();
tracing::debug!("condition: {:?}", condition);

Expand Down Expand Up @@ -1637,7 +1667,7 @@ where
Command::RebuildReplicationStreams { targets } => {
self.remove_all_replication().await;

for (target, matching) in targets.iter() {
for ReplicationProgress(target, matching) in targets.iter() {
let handle = self.spawn_replication_stream(*target, *matching).await;
self.replications.insert(*target, handle);
}
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where C: RaftTypeConfig
}
RaftMsg::ChangeMembership { changes, retain, .. } => {
// TODO: avoid using Debug
write!(f, "ChangeMembership: members: {:?}, retain: {}", changes, retain,)
write!(f, "ChangeMembership: {:?}, retain: {}", changes, retain,)
}
RaftMsg::ExternalCoreRequest { .. } => write!(f, "External Request"),
RaftMsg::ExternalCommand { cmd } => {
Expand Down
31 changes: 29 additions & 2 deletions openraft/src/core/sm/command.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;

Expand All @@ -19,14 +20,22 @@ where C: RaftTypeConfig
impl<C> Debug for Command<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("StateMachineCommand")
.field("seq", &self.seq)
.field("payload", &self.payload)
.finish()
}
}

impl<C> fmt::Display for Command<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "sm::Command: seq: {}, payload: {}", self.seq, self.payload)
}
}

impl<C> Command<C>
where C: RaftTypeConfig
{
Expand Down Expand Up @@ -115,7 +124,7 @@ where C: RaftTypeConfig
impl<C> Debug for CommandPayload<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
Expand All @@ -130,6 +139,24 @@ where C: RaftTypeConfig
}
}

impl<C> fmt::Display for CommandPayload<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CommandPayload::BuildSnapshot => write!(f, "BuildSnapshot"),
CommandPayload::GetSnapshot { .. } => write!(f, "GetSnapshot"),
CommandPayload::InstallFullSnapshot { snapshot } => {
write!(f, "InstallFullSnapshot: meta: {}", snapshot.meta)
}
CommandPayload::BeginReceivingSnapshot { .. } => {
write!(f, "BeginReceivingSnapshot")
}
CommandPayload::Apply { first, last } => write!(f, "Apply: [{},{}]", first, last),
}
}
}

// `PartialEq` is only used for testing
impl<C> PartialEq for CommandPayload<C>
where C: RaftTypeConfig
Expand Down
36 changes: 36 additions & 0 deletions openraft/src/core/sm/response.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
use std::fmt;
use std::fmt::Formatter;

use crate::core::sm::command::CommandSeq;
use crate::core::ApplyResult;
use crate::display_ext::display_result::DisplayResultExt;
use crate::display_ext::DisplayOptionExt;
use crate::RaftTypeConfig;
use crate::SnapshotMeta;
use crate::StorageError;
Expand All @@ -21,6 +26,24 @@ where C: RaftTypeConfig
Apply(ApplyResult<C>),
}

impl<C> fmt::Display for Response<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::BuildSnapshot(meta) => {
write!(f, "BuildSnapshot({})", meta)
}
Self::InstallSnapshot(meta) => {
write!(f, "InstallSnapshot({})", meta.display())
}
Self::Apply(result) => {
write!(f, "{}", result)
}
}
}
}

/// Container of result of a command.
#[derive(Debug)]
pub(crate) struct CommandResult<C>
Expand All @@ -31,6 +54,19 @@ where C: RaftTypeConfig
pub(crate) result: Result<Response<C>, StorageError<C>>,
}

impl<C> fmt::Display for CommandResult<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"sm::Result(command_seq:{}, {})",
self.command_seq,
self.result.display()
)
}
}

impl<C> CommandResult<C>
where C: RaftTypeConfig
{
Expand Down
4 changes: 4 additions & 0 deletions openraft/src/display_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
pub(crate) mod display_instant;
pub(crate) mod display_option;
pub(crate) mod display_result;
pub(crate) mod display_slice;

#[allow(unused_imports)]
pub(crate) use display_instant::DisplayInstant;
pub(crate) use display_instant::DisplayInstantExt;
pub(crate) use display_option::DisplayOption;
pub(crate) use display_option::DisplayOptionExt;
#[allow(unused_imports)]
pub(crate) use display_result::DisplayResult;
pub(crate) use display_result::DisplayResultExt;
pub(crate) use display_slice::DisplaySlice;
pub(crate) use display_slice::DisplaySliceExt;
53 changes: 53 additions & 0 deletions openraft/src/display_ext/display_result.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::fmt;

/// Implement `Display` for `Result<T,E>` if T and E are `Display`.
///
/// It outputs `"Ok(...)"` or `"Err(...)"`.
pub(crate) struct DisplayResult<'a, T: fmt::Display, E: fmt::Display>(pub &'a Result<T, E>);

impl<'a, T, E> fmt::Display for DisplayResult<'a, T, E>
where
T: fmt::Display,
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
Ok(ok) => {
write!(f, "Ok({})", ok)
}
Err(err) => {
write!(f, "Err({})", err)
}
}
}
}

pub(crate) trait DisplayResultExt<'a, T: fmt::Display, E: fmt::Display> {
fn display(&'a self) -> DisplayResult<'a, T, E>;
}

impl<T, E> DisplayResultExt<'_, T, E> for Result<T, E>
where
T: fmt::Display,
E: fmt::Display,
{
fn display(&self) -> DisplayResult<T, E> {
DisplayResult(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_display_result() {
let result: Result<i32, &str> = Ok(42);
let display_result = DisplayResult(&result);
assert_eq!(format!("{}", display_result), "Ok(42)");

let result: Result<i32, &str> = Err("error");
let display_result = DisplayResult(&result);
assert_eq!(format!("{}", display_result), "Err(error)");
}
}
Loading

0 comments on commit 53abbfc

Please sign in to comment.