Skip to content

Commit

Permalink
Fix: Ensure RaftMetrics are sent after RaftDataMetrics and `RaftS…
Browse files Browse the repository at this point in the history
…erverMetrics`

This commit addresses an issue where `RaftMetrics` could be sent before
`RaftDataMetrics` and `RaftServerMetrics`. Since `Wait` relies solely on
`RaftMetrics` to reflect the latest state, it is crucial that
`RaftMetrics` are dispatched only after the other two metrics have been
updated. This change guarantees that once a change in `RaftMetrics` is
detected, it accurately represents the most recent changes from both
`RaftDataMetrics` and `RaftServerMetrics`.
  • Loading branch information
drmingdrmer committed Mar 19, 2024
1 parent b30e4b5 commit a2a2b91
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,6 @@ where
replication: replication.clone(),
};

tracing::debug!("report_metrics: {}", m.summary());
let res = self.tx_metrics.send(m);

if let Err(err) = res {
tracing::error!(error=%err, id=display(self.id), "error reporting metrics");
}

let data_metrics = RaftDataMetrics {
last_log: st.last_log_id().copied(),
last_applied: st.io_applied().copied(),
Expand All @@ -565,13 +558,6 @@ where
millis_since_quorum_ack,
replication,
};
self.tx_data_metrics.send_if_modified(|metrix| {
if data_metrics.ne(metrix) {
*metrix = data_metrics.clone();
return true;
}
false
});

let server_metrics = RaftServerMetrics {
id: self.id,
Expand All @@ -580,13 +566,34 @@ where
current_leader,
membership_config,
};

// Start to send metrics
// `RaftMetrics` is sent last, because `Wait` only examines `RaftMetrics`
// but not `RaftDataMetrics` and `RaftServerMetrics`.
// Thus if `RaftMetrics` change is perceived, the other two should have been updated.

self.tx_data_metrics.send_if_modified(|metrix| {
if data_metrics.ne(metrix) {
*metrix = data_metrics.clone();
return true;
}
false
});

self.tx_server_metrics.send_if_modified(|metrix| {
if server_metrics.ne(metrix) {
*metrix = server_metrics.clone();
return true;
}
false
});

tracing::debug!("report_metrics: {}", m.summary());
let res = self.tx_metrics.send(m);

if let Err(err) = res {
tracing::error!(error=%err, id=display(self.id), "error reporting metrics");
}
}

/// Handle the admin command `initialize`.
Expand Down

0 comments on commit a2a2b91

Please sign in to comment.