Skip to content

Commit

Permalink
refactor: metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Arqu committed Jul 5, 2024
1 parent 2c40984 commit bdc4961
Show file tree
Hide file tree
Showing 16 changed files with 283 additions and 80 deletions.
9 changes: 9 additions & 0 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use std::{
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_metrics::inc;
use iroh_net::{endpoint, Endpoint, NodeAddr, NodeId};
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -50,6 +51,7 @@ use tracing::{debug, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
metrics::Metrics,
store::Store,
util::progress::ProgressSender,
};
Expand Down Expand Up @@ -566,13 +568,16 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
async fn run(mut self) {
loop {
trace!("wait for tick");
inc!(Metrics, downloader_tick_main);
tokio::select! {
Some((node, conn_result)) = self.dialer.next() => {
trace!(node=%node.fmt_short(), "tick: connection ready");
inc!(Metrics, downloader_tick_connection_ready);
self.on_connection_ready(node, conn_result);
}
maybe_msg = self.msg_rx.recv() => {
trace!(msg=?maybe_msg, "tick: message received");
inc!(Metrics, downloader_tick_message_received);
match maybe_msg {
Some(msg) => self.handle_message(msg).await,
None => return self.shutdown().await,
Expand All @@ -582,21 +587,25 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
match res {
Ok((kind, result)) => {
trace!(%kind, "tick: transfer completed");
inc!(Metrics, downloader_tick_transfer_completed);
self.on_download_completed(kind, result);
}
Err(err) => {
warn!(?err, "transfer task panicked");
inc!(Metrics, downloader_tick_transfer_failed);
}
}
}
Some(expired) = self.retry_nodes_queue.next() => {
let node = expired.into_inner();
trace!(node=%node.fmt_short(), "tick: retry node");
inc!(Metrics, downloader_tick_retry_node);
self.on_retry_wait_elapsed(node);
}
Some(expired) = self.goodbye_nodes_queue.next() => {
let node = expired.into_inner();
trace!(node=%node.fmt_short(), "tick: goodbye node");
inc!(Metrics, downloader_tick_goodbye_node);
self.disconnect_idle_node(node, "idle expired");
}
}
Expand Down
30 changes: 30 additions & 0 deletions iroh-blobs/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ pub struct Metrics {
pub downloads_success: Counter,
pub downloads_error: Counter,
pub downloads_notfound: Counter,

pub downloader_tick_main: Counter,
pub downloader_tick_connection_ready: Counter,
pub downloader_tick_message_received: Counter,
pub downloader_tick_transfer_completed: Counter,
pub downloader_tick_transfer_failed: Counter,
pub downloader_tick_retry_node: Counter,
pub downloader_tick_goodbye_node: Counter,
}

impl Default for Metrics {
Expand All @@ -24,6 +32,28 @@ impl Default for Metrics {
downloads_success: Counter::new("Total number of successful downloads"),
downloads_error: Counter::new("Total number of downloads failed with error"),
downloads_notfound: Counter::new("Total number of downloads failed with not found"),

downloader_tick_main: Counter::new(
"Number of times the main downloader actor loop ticked",
),
downloader_tick_connection_ready: Counter::new(
"Number of times the downloader actor ticked for a connection ready",
),
downloader_tick_message_received: Counter::new(
"Number of times the downloader actor ticked for a message received",
),
downloader_tick_transfer_completed: Counter::new(
"Number of times the downloader actor ticked for a transfer completed",
),
downloader_tick_transfer_failed: Counter::new(
"Number of times the downloader actor ticked for a transfer failed",
),
downloader_tick_retry_node: Counter::new(
"Number of times the downloader actor ticked for a retry node",
),
downloader_tick_goodbye_node: Counter::new(
"Number of times the downloader actor ticked for a goodbye node",
),
}
}
}
Expand Down
22 changes: 1 addition & 21 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,7 @@ impl PlotterApp {
return;
}
let data = req.unwrap().text().await.unwrap();
let metrics_response = parse_prometheus_metrics(&data);
let metrics_response = iroh_metrics::parse_prometheus_metrics(&data);
if metrics_response.is_err() {
return;
}
Expand All @@ -1423,23 +1423,3 @@ impl PlotterApp {
}
}
}

fn parse_prometheus_metrics(data: &str) -> anyhow::Result<HashMap<String, f64>> {
let mut metrics = HashMap::new();
for line in data.lines() {
if line.starts_with('#') {
continue;
}
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 2 {
continue;
}
let metric = parts[0];
let value = parts[1].parse::<f64>();
if value.is_err() {
continue;
}
metrics.insert(metric.to_string(), value.unwrap());
}
Ok(metrics)
}
4 changes: 2 additions & 2 deletions iroh-dns-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use struct_iterable::Iterable;
pub struct Metrics {
pub pkarr_publish_update: Counter,
pub pkarr_publish_noop: Counter,
pub pkarr_publish_error: Counter,
// pub pkarr_publish_error: Counter,
pub dns_requests: Counter,
pub dns_requests_udp: Counter,
pub dns_requests_https: Counter,
Expand All @@ -32,7 +32,7 @@ impl Default for Metrics {
pkarr_publish_noop: Counter::new(
"Number of pkarr relay puts that did not update the state",
),
pkarr_publish_error: Counter::new("Number of pkarr relay puts that failed"),
// pkarr_publish_error: Counter::new("Number of pkarr relay puts that failed"),
dns_requests: Counter::new("DNS requests (total)"),
dns_requests_udp: Counter::new("DNS requests via UDP"),
dns_requests_https: Counter::new("DNS requests via HTTPS (DoH)"),
Expand Down
3 changes: 3 additions & 0 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use futures_util::FutureExt;
use iroh_base::hash::Hash;
use iroh_metrics::inc;
use serde::{Deserialize, Serialize};
use tokio::{sync::oneshot, task::JoinSet};
use tracing::{debug, error, error_span, trace, warn};

use crate::{
metrics::Metrics,
ranger::Message,
store::{
fs::{ContentHashesIterator, StoreInstance},
Expand Down Expand Up @@ -609,6 +611,7 @@ impl Actor {
}
};
trace!(%action, "tick");
inc!(Metrics, actor_tick_main);
match action {
Action::Shutdown { reply } => {
break reply;
Expand Down
6 changes: 6 additions & 0 deletions iroh-docs/src/engine/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use anyhow::{Context, Result};
use futures_lite::StreamExt;
use futures_util::FutureExt;
use iroh_gossip::net::{Event, Gossip};
use iroh_metrics::inc;
use iroh_net::key::PublicKey;
use tokio::{
sync::{broadcast, mpsc},
Expand All @@ -16,6 +17,7 @@ use tokio_stream::{
use tracing::{debug, error, trace, warn};

use crate::{actor::SyncHandle, ContentStatus, NamespaceId};
use crate::metrics::Metrics;

use super::live::{Op, ToLiveActor};

Expand Down Expand Up @@ -67,9 +69,11 @@ impl GossipActor {
loop {
i += 1;
trace!(?i, "tick wait");
inc!(Metrics, doc_gossip_tick_main);
tokio::select! {
next = self.gossip_events.next(), if !self.gossip_events.is_empty() => {
trace!(?i, "tick: gossip_event");
inc!(Metrics, doc_gossip_tick_event);
if let Err(err) = self.on_gossip_event(next).await {
error!("gossip actor died: {err:?}");
return Err(err);
Expand All @@ -78,12 +82,14 @@ impl GossipActor {
msg = self.inbox.recv() => {
let msg = msg.context("to_actor closed")?;
trace!(%msg, ?i, "tick: to_actor");
inc!(Metrics, doc_gossip_tick_actor);
if !self.on_actor_message(msg).await.context("on_actor_message")? {
break;
}
}
Some(res) = self.pending_joins.join_next(), if !self.pending_joins.is_empty() => {
trace!(?i, "tick: pending_joins");
inc!(Metrics, doc_gossip_tick_pending_join);
let (namespace, res) = res.context("pending_joins closed")?;
match res {
Ok(stream) => {
Expand Down
8 changes: 8 additions & 0 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use iroh_blobs::get::Stats;
use iroh_blobs::HashAndFormat;
use iroh_blobs::{store::EntryStatus, Hash};
use iroh_gossip::{net::Gossip, proto::TopicId};
use iroh_metrics::inc;
use iroh_net::NodeId;
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
use serde::{Deserialize, Serialize};
Expand All @@ -19,6 +20,7 @@ use tokio::{
};
use tracing::{debug, error, error_span, info, instrument, trace, warn, Instrument, Span};

use crate::metrics::Metrics;
use crate::{
actor::{OpenOpts, SyncHandle},
net::{
Expand Down Expand Up @@ -244,11 +246,13 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
loop {
i += 1;
trace!(?i, "tick wait");
inc!(Metrics, doc_live_tick_main);
tokio::select! {
biased;
msg = self.inbox.recv() => {
let msg = msg.context("to_actor closed")?;
trace!(?i, %msg, "tick: to_actor");
inc!(Metrics, doc_live_tick_actor);
match msg {
ToLiveActor::Shutdown { reply } => {
break Ok(reply);
Expand All @@ -260,24 +264,28 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
event = self.replica_events_rx.recv_async() => {
trace!(?i, "tick: replica_event");
inc!(Metrics, doc_live_tick_replica_event);
let event = event.context("replica_events closed")?;
if let Err(err) = self.on_replica_event(event).await {
error!(?err, "Failed to process replica event");
}
}
Some(res) = self.running_sync_connect.join_next(), if !self.running_sync_connect.is_empty() => {
trace!(?i, "tick: running_sync_connect");
inc!(Metrics, doc_live_tick_running_sync_connect);
let (namespace, peer, reason, res) = res.context("running_sync_connect closed")?;
self.on_sync_via_connect_finished(namespace, peer, reason, res).await;

}
Some(res) = self.running_sync_accept.join_next(), if !self.running_sync_accept.is_empty() => {
trace!(?i, "tick: running_sync_accept");
inc!(Metrics, doc_live_tick_running_sync_accept);
let res = res.context("running_sync_accept closed")?;
self.on_sync_via_accept_finished(res).await;
}
Some(res) = self.download_tasks.join_next(), if !self.download_tasks.is_empty() => {
trace!(?i, "tick: pending_downloads");
inc!(Metrics, doc_live_tick_pending_downloads);
let (namespace, hash, res) = res.context("pending_downloads closed")?;
self.on_download_ready(namespace, hash, res).await;

Expand Down
44 changes: 44 additions & 0 deletions iroh-docs/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ pub struct Metrics {
pub sync_via_connect_failure: Counter,
pub sync_via_accept_success: Counter,
pub sync_via_accept_failure: Counter,

pub actor_tick_main: Counter,

pub doc_gossip_tick_main: Counter,
pub doc_gossip_tick_event: Counter,
pub doc_gossip_tick_actor: Counter,
pub doc_gossip_tick_pending_join: Counter,

pub doc_live_tick_main: Counter,
pub doc_live_tick_actor: Counter,
pub doc_live_tick_replica_event: Counter,
pub doc_live_tick_running_sync_connect: Counter,
pub doc_live_tick_running_sync_accept: Counter,
pub doc_live_tick_pending_downloads: Counter,
}

impl Default for Metrics {
Expand All @@ -30,6 +44,36 @@ impl Default for Metrics {
sync_via_accept_failure: Counter::new("Number of failed syncs (via accept)"),
sync_via_connect_success: Counter::new("Number of successful syncs (via connect)"),
sync_via_connect_failure: Counter::new("Number of failed syncs (via connect)"),

actor_tick_main: Counter::new("Number of times the main actor loop ticked"),

doc_gossip_tick_main: Counter::new("Number of times the gossip actor loop ticked"),
doc_gossip_tick_event: Counter::new(
"Number of times the gossip actor processed an event",
),
doc_gossip_tick_actor: Counter::new(
"Number of times the gossip actor processed an actor event",
),
doc_gossip_tick_pending_join: Counter::new(
"Number of times the gossip actor processed a pending join",
),

doc_live_tick_main: Counter::new("Number of times the live actor loop ticked"),
doc_live_tick_actor: Counter::new(
"Number of times the live actor processed an actor event",
),
doc_live_tick_replica_event: Counter::new(
"Number of times the live actor processed a replica event",
),
doc_live_tick_running_sync_connect: Counter::new(
"Number of times the live actor processed a running sync connect",
),
doc_live_tick_running_sync_accept: Counter::new(
"Number of times the live actor processed a running sync accept",
),
doc_live_tick_pending_downloads: Counter::new(
"Number of times the live actor processed a pending download",
),
}
}
}
Expand Down
24 changes: 24 additions & 0 deletions iroh-gossip/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pub struct Metrics {
pub neighbor_down: Counter,
// pub topics_joined: Counter,
// pub topics_left: Counter,
pub actor_tick_main: Counter,
pub actor_tick_rx: Counter,
pub actor_tick_endpoint: Counter,
pub actor_tick_dialer: Counter,
pub actor_tick_dialer_success: Counter,
pub actor_tick_dialer_failure: Counter,
pub actor_tick_in_event_rx: Counter,
pub actor_tick_timers: Counter,
}

impl Default for Metrics {
Expand All @@ -38,6 +46,22 @@ impl Default for Metrics {
neighbor_down: Counter::new("Number of times we disconnected from a peer"),
// topics_joined: Counter::new("Number of times we joined a topic"),
// topics_left: Counter::new("Number of times we left a topic"),
actor_tick_main: Counter::new("Number of times the main actor loop ticked"),
actor_tick_rx: Counter::new("Number of times the actor ticked for a message received"),
actor_tick_endpoint: Counter::new(
"Number of times the actor ticked for an endpoint event",
),
actor_tick_dialer: Counter::new("Number of times the actor ticked for a dialer event"),
actor_tick_dialer_success: Counter::new(
"Number of times the actor ticked for a successful dialer event",
),
actor_tick_dialer_failure: Counter::new(
"Number of times the actor ticked for a failed dialer event",
),
actor_tick_in_event_rx: Counter::new(
"Number of times the actor ticked for an incoming event",
),
actor_tick_timers: Counter::new("Number of times the actor ticked for a timer event"),
}
}
}
Expand Down
Loading

0 comments on commit bdc4961

Please sign in to comment.