Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor!: metrics #2464

Merged
merged 10 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 133 additions & 124 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ hashlink = { version = "0.9.0", optional = true }
hex = "0.4.3"
iroh-base = { version = "0.20.0", features = ["redb"], path = "../iroh-base" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.20.0", path = "../iroh-metrics", optional = true }
iroh-metrics = { version = "0.20.0", path = "../iroh-metrics", default-features = false }
iroh-net = { version = "0.20.0", path = "../iroh-net" }
num_cpus = "1.15.0"
parking_lot = { version = "0.12.1", optional = true }
Expand Down Expand Up @@ -77,7 +77,7 @@ version = "0.10"
default = ["fs-store"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["dep:iroh-metrics"]
metrics = ["iroh-metrics/metrics"]
redb = ["dep:redb"]

[package.metadata.docs.rs]
Expand Down
9 changes: 9 additions & 0 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use std::{
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_metrics::inc;
use iroh_net::{endpoint, Endpoint, NodeAddr, NodeId};
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -50,6 +51,7 @@ use tracing::{debug, error_span, trace, warn, Instrument};

use crate::{
get::{db::DownloadProgress, Stats},
metrics::Metrics,
store::Store,
util::{local_pool::LocalPoolHandle, progress::ProgressSender},
};
Expand Down Expand Up @@ -566,13 +568,16 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
async fn run(mut self) {
loop {
trace!("wait for tick");
inc!(Metrics, downloader_tick_main);
tokio::select! {
Some((node, conn_result)) = self.dialer.next() => {
trace!(node=%node.fmt_short(), "tick: connection ready");
inc!(Metrics, downloader_tick_connection_ready);
self.on_connection_ready(node, conn_result);
}
maybe_msg = self.msg_rx.recv() => {
trace!(msg=?maybe_msg, "tick: message received");
inc!(Metrics, downloader_tick_message_received);
match maybe_msg {
Some(msg) => self.handle_message(msg).await,
None => return self.shutdown().await,
Expand All @@ -582,21 +587,25 @@ impl<G: Getter<Connection = D::Connection>, D: Dialer> Service<G, D> {
match res {
Ok((kind, result)) => {
trace!(%kind, "tick: transfer completed");
inc!(Metrics, downloader_tick_transfer_completed);
self.on_download_completed(kind, result);
}
Err(err) => {
warn!(?err, "transfer task panicked");
inc!(Metrics, downloader_tick_transfer_failed);
}
}
}
Some(expired) = self.retry_nodes_queue.next() => {
let node = expired.into_inner();
trace!(node=%node.fmt_short(), "tick: retry node");
inc!(Metrics, downloader_tick_retry_node);
self.on_retry_wait_elapsed(node);
}
Some(expired) = self.goodbye_nodes_queue.next() => {
let node = expired.into_inner();
trace!(node=%node.fmt_short(), "tick: goodbye node");
inc!(Metrics, downloader_tick_goodbye_node);
self.disconnect_idle_node(node, "idle expired");
}
}
Expand Down
1 change: 0 additions & 1 deletion iroh-blobs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ pub mod export;
pub mod format;
pub mod get;
pub mod hashseq;
#[cfg(feature = "metrics")]
pub mod metrics;
pub mod protocol;
pub mod provider;
Expand Down
30 changes: 30 additions & 0 deletions iroh-blobs/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ pub struct Metrics {
pub downloads_success: Counter,
pub downloads_error: Counter,
pub downloads_notfound: Counter,

pub downloader_tick_main: Counter,
pub downloader_tick_connection_ready: Counter,
pub downloader_tick_message_received: Counter,
pub downloader_tick_transfer_completed: Counter,
pub downloader_tick_transfer_failed: Counter,
pub downloader_tick_retry_node: Counter,
pub downloader_tick_goodbye_node: Counter,
}

impl Default for Metrics {
Expand All @@ -24,6 +32,28 @@ impl Default for Metrics {
downloads_success: Counter::new("Total number of successful downloads"),
downloads_error: Counter::new("Total number of downloads failed with error"),
downloads_notfound: Counter::new("Total number of downloads failed with not found"),

downloader_tick_main: Counter::new(
"Number of times the main downloader actor loop ticked",
),
downloader_tick_connection_ready: Counter::new(
"Number of times the downloader actor ticked for a connection ready",
),
downloader_tick_message_received: Counter::new(
"Number of times the downloader actor ticked for a message received",
),
downloader_tick_transfer_completed: Counter::new(
"Number of times the downloader actor ticked for a transfer completed",
),
downloader_tick_transfer_failed: Counter::new(
"Number of times the downloader actor ticked for a transfer failed",
),
downloader_tick_retry_node: Counter::new(
"Number of times the downloader actor ticked for a retry node",
),
downloader_tick_goodbye_node: Counter::new(
"Number of times the downloader actor ticked for a goodbye node",
),
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions iroh-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ pub(crate) struct Cli {
/// Address to serve RPC on.
#[clap(long)]
pub(crate) rpc_addr: Option<SocketAddr>,

/// If set, metrics will be dumped in CSV format to the specified path at regular intervals (100ms).
#[clap(long)]
pub(crate) metrics_dump_path: Option<PathBuf>,
}

#[derive(Debug, Clone)]
Expand Down
141 changes: 107 additions & 34 deletions iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crossterm::{
};
use rand::Rng;
use ratatui::{prelude::*, widgets::*};
use tracing::warn;

#[derive(Debug, Clone, derive_more::Display)]
pub enum SecretKeyOption {
Expand Down Expand Up @@ -232,6 +233,9 @@ pub enum Commands {
/// Endpoint to scrape for prometheus metrics
#[clap(long, default_value = "http://localhost:9090")]
scrape_url: String,
/// File to read the metrics from. Takes precedence over scrape_url.
#[clap(long)]
file: Option<PathBuf>,
},
}

Expand Down Expand Up @@ -1172,6 +1176,7 @@ pub async fn run(command: Commands, config: &NodeConfig) -> anyhow::Result<()> {
metrics,
timeframe,
scrape_url,
file,
} => {
let metrics: Vec<String> = metrics.split(',').map(|s| s.to_string()).collect();
let interval = Duration::from_millis(interval);
Expand All @@ -1182,7 +1187,7 @@ pub async fn run(command: Commands, config: &NodeConfig) -> anyhow::Result<()> {
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;

let app = PlotterApp::new(metrics, timeframe, scrape_url);
let app = PlotterApp::new(metrics, timeframe, scrape_url, file);
let res = run_plotter(&mut terminal, app, interval).await;
disable_raw_mode()?;
execute!(
Expand Down Expand Up @@ -1214,7 +1219,7 @@ async fn run_plotter<B: Backend>(
loop {
terminal.draw(|f| plotter_draw(f, &mut app))?;

if crossterm::event::poll(Duration::from_millis(100))? {
if crossterm::event::poll(Duration::from_millis(10))? {
if let Event::Key(key) = event::read()? {
if key.kind == KeyEventKind::Press {
if let KeyCode::Char(c) = key.code {
Expand Down Expand Up @@ -1283,8 +1288,16 @@ fn plot_chart(frame: &mut Frame, area: Rect, app: &PlotterApp, metric: &str) {
let y_start = data_y_range.0;
let y_end = data_y_range.1;

let last_val = data.last();
let name = match last_val {
Some(val) => {
let val_y = val.1;
format!("{metric}: {val_y:.0}")
}
None => metric.to_string(),
};
let datasets = vec![Dataset::default()
.name(metric)
.name(name)
.marker(symbols::Marker::Dot)
.graph_type(GraphType::Line)
.style(Style::default().fg(Color::Cyan))
Expand All @@ -1304,19 +1317,19 @@ fn plot_chart(frame: &mut Frame, area: Rect, app: &PlotterApp, metric: &str) {
];

let mut y_labels = vec![Span::styled(
format!("{:.1}", y_start),
format!("{:.0}", y_start),
Style::default().add_modifier(Modifier::BOLD),
)];

for i in 1..=10 {
y_labels.push(Span::raw(format!(
"{:.1}",
"{:.0}",
y_start + (y_end - y_start) / 10.0 * i as f64
)));
}

y_labels.push(Span::styled(
format!("{:.1}", y_end),
format!("{:.0}", y_end),
Style::default().add_modifier(Modifier::BOLD),
));

Expand Down Expand Up @@ -1355,23 +1368,64 @@ struct PlotterApp {
freeze: bool,
internal_ts: Duration,
scrape_url: String,
file_data: Vec<String>,
file_header: Vec<String>,
}

impl PlotterApp {
fn new(metrics: Vec<String>, timeframe: usize, scrape_url: String) -> Self {
fn new(
metrics: Vec<String>,
timeframe: usize,
scrape_url: String,
file: Option<PathBuf>,
) -> Self {
let data = metrics.iter().map(|m| (m.clone(), vec![])).collect();
let data_y_range = metrics.iter().map(|m| (m.clone(), (0.0, 0.0))).collect();
let mut file_data: Vec<String> = file
.map(|f| std::fs::read_to_string(f).unwrap())
.unwrap_or_default()
.split('\n')
.map(|s| s.to_string())
.collect();
let mut file_header = vec![];
let mut timeframe = timeframe;
if !file_data.is_empty() {
file_header = file_data[0].split(',').map(|s| s.to_string()).collect();
file_data.remove(0);

while file_data.last().unwrap().is_empty() {
file_data.pop();
}

let first_line: Vec<String> = file_data[0].split(',').map(|s| s.to_string()).collect();
let last_line: Vec<String> = file_data
.last()
.unwrap()
.split(',')
.map(|s| s.to_string())
.collect();

let start_time: usize = first_line.first().unwrap().parse().unwrap();
let end_time: usize = last_line.first().unwrap().parse().unwrap();

timeframe = (end_time - start_time) / 1000;
}
timeframe = timeframe.clamp(30, 90);

file_data.reverse();
Self {
should_quit: false,
metrics,
start_ts: Instant::now(),
data,
data_y_range,
timeframe: timeframe - 25,
timeframe,
rng: rand::thread_rng(),
freeze: false,
internal_ts: Duration::default(),
scrape_url,
file_data,
file_header,
}
}

Expand All @@ -1392,16 +1446,34 @@ impl PlotterApp {
return;
}

let req = reqwest::Client::new().get(&self.scrape_url).send().await;
if req.is_err() {
return;
}
let data = req.unwrap().text().await.unwrap();
let metrics_response = parse_prometheus_metrics(&data);
if metrics_response.is_err() {
return;
}
let metrics_response = metrics_response.unwrap();
let metrics_response = match self.file_data.is_empty() {
true => {
let req = reqwest::Client::new().get(&self.scrape_url).send().await;
if req.is_err() {
return;
}
let data = req.unwrap().text().await.unwrap();
let metrics_response = iroh_metrics::parse_prometheus_metrics(&data);
if metrics_response.is_err() {
return;
}
metrics_response.unwrap()
}
false => {
if self.file_data.len() == 1 {
self.freeze = true;
return;
}
let data = self.file_data.pop().unwrap();
let r = parse_csv_metrics(&self.file_header, &data);
if let Ok(mr) = r {
mr
} else {
warn!("Failed to parse csv metrics: {:?}", r.err());
HashMap::new()
}
}
};
self.internal_ts = self.start_ts.elapsed();
for metric in &self.metrics {
let val = if metric.eq("random") {
Expand All @@ -1412,7 +1484,12 @@ impl PlotterApp {
0.0
};
let e = self.data.entry(metric.clone()).or_default();
e.push((self.internal_ts.as_secs_f64(), val));
let mut ts = self.internal_ts.as_secs_f64();
if metrics_response.contains_key("time") {
ts = *metrics_response.get("time").unwrap() / 1000.0;
}
self.internal_ts = Duration::from_secs_f64(ts);
e.push((ts, val));
let yr = self.data_y_range.get_mut(metric).unwrap();
if val * 1.1 < yr.0 {
yr.0 = val * 1.2;
Expand All @@ -1424,22 +1501,18 @@ impl PlotterApp {
}
}

fn parse_prometheus_metrics(data: &str) -> anyhow::Result<HashMap<String, f64>> {
fn parse_csv_metrics(header: &[String], data: &str) -> anyhow::Result<HashMap<String, f64>> {
let mut metrics = HashMap::new();
for line in data.lines() {
if line.starts_with('#') {
continue;
}
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 2 {
continue;
}
let metric = parts[0];
let value = parts[1].parse::<f64>();
if value.is_err() {
continue;
}
metrics.insert(metric.to_string(), value.unwrap());
let data = data.split(',').collect::<Vec<&str>>();
for (i, h) in header.iter().enumerate() {
let val = match h.as_str() {
"time" => {
let ts = data[i].parse::<u64>()?;
ts as f64
}
_ => data[i].parse::<f64>()?,
};
metrics.insert(h.clone(), val);
}
Ok(metrics)
}
Loading
Loading