From 1f9cffefa30daef7f5418649eafafa0de0eea730 Mon Sep 17 00:00:00 2001 From: Hunter Beast Date: Tue, 12 Dec 2023 16:14:03 -0700 Subject: [PATCH] Live metrics (#435) * Switch to in-memory metrics. * Metrics fixes. * Also write metrics to disk as a backup. --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/bin/bitmaskd.rs | 55 +++---- src/carbonado.rs | 6 +- src/carbonado/error.rs | 2 + src/carbonado/metrics.rs | 326 +++++++++++++++++++++++++++------------ 6 files changed, 255 insertions(+), 138 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index db393a64..4da41365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -659,7 +659,7 @@ dependencies = [ [[package]] name = "bitmask-core" -version = "0.7.0-beta.9" +version = "0.7.0-beta.10" dependencies = [ "amplify", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index a82c1db1..6d0905c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bitmask-core" -version = "0.7.0-beta.9" +version = "0.7.0-beta.10" authors = [ "Jose Diego Robles ", "Hunter Trujillo ", diff --git a/src/bin/bitmaskd.rs b/src/bin/bitmaskd.rs index 75b8af78..658ce315 100644 --- a/src/bin/bitmaskd.rs +++ b/src/bin/bitmaskd.rs @@ -6,6 +6,7 @@ use std::{ fs::OpenOptions, io::ErrorKind, net::SocketAddr, + path, str::FromStr, time::{Duration, Instant}, }; @@ -24,11 +25,7 @@ use axum::{ use bitcoin_30::secp256k1::{ecdh::SharedSecret, PublicKey, SecretKey}; use bitmask_core::{ bitcoin::{save_mnemonic, sign_and_publish_psbt_file}, - carbonado::{ - handle_file, - metrics::{metrics, metrics_csv}, - server_retrieve, server_store, store, - }, + carbonado::{handle_file, metrics, server_retrieve, server_store, store}, constants::{ get_marketplace_nostr_key, get_marketplace_seed, get_network, get_udas_utxo, switch_network, }, @@ -432,7 +429,13 @@ async fn co_store( Path((pk, name)): Path<(String, String)>, body: Bytes, ) -> Result { + let cc = CacheControl::new().with_no_cache(); + let incoming_header = carbonado::file::Header::try_from(&body)?; + if incoming_header.pubkey.to_string() != pk { + return Ok((StatusCode::UNAUTHORIZED, TypedHeader(cc), "Unauthorized")); + } + let body_len = incoming_header.encoded_len - incoming_header.padding_len; info!("POST /carbonado/{pk}/{name}, {body_len} bytes"); @@ -471,9 +474,9 @@ async fn co_store( }, } - let cc = CacheControl::new().with_no_cache(); + metrics::update(&filepath).await?; - Ok((StatusCode::OK, TypedHeader(cc))) + Ok((StatusCode::OK, TypedHeader(cc), "Success")) } async fn co_force_store( @@ -721,40 +724,29 @@ async fn send_coins( } async fn json_metrics() -> Result { - use bitmask_core::carbonado::metrics::metrics; - let path = std::env::var("CARBONADO_DIR").unwrap_or("/tmp/bitmaskd/carbonado".to_owned()); - let contents = fs::read_to_string(&format!("{path}/metrics.json")).await?; + let metrics_json = metrics::json().await?; Ok(( StatusCode::OK, [("content-type", "application/json")], - contents, + metrics_json, )) } async fn csv_metrics() -> Result { - let path = std::env::var("CARBONADO_DIR").unwrap_or("/tmp/bitmaskd/carbonado".to_owned()); - let contents = fs::read_to_string(&format!("{path}/metrics.csv")).await?; + let metrics_csv = metrics::csv().await; - Ok(( - StatusCode::OK, - [("content-type", "application/json")], - contents, - )) + Ok((StatusCode::OK, [("content-type", "text/csv")], metrics_csv)) } -async fn periodic_metrics() -> Result<()> { - let path = std::env::var("CARBONADO_DIR").unwrap_or("/tmp/bitmaskd/carbonado".to_owned()); - let dir = std::path::Path::new(&path); - fs::create_dir_all(dir).await?; +async fn init_metrics() -> Result<()> { + let path = env::var("CARBONADO_DIR").unwrap_or("/tmp/bitmaskd/carbonado".to_owned()); + let dir = path::Path::new(&path); info!("Starting metrics collection..."); - let duration = Instant::now(); - let metrics = metrics(dir).await?; - let metrics_json = serde_json::to_string_pretty(&metrics)?; - let metrics_csv = metrics_csv(metrics); + metrics::init(dir).await?; let duration = Instant::now() - duration; @@ -763,11 +755,6 @@ async fn periodic_metrics() -> Result<()> { duration.as_secs_f32() ); - fs::write(&format!("{path}/metrics.json"), &metrics_json).await?; - fs::write(&format!("{path}/metrics.csv"), &metrics_csv).await?; - - sleep(Duration::from_secs(4 * 60 * 60)).await; - Ok(()) } @@ -834,10 +821,8 @@ async fn main() -> Result<()> { .route("/regtest/send/:address/:amount", get(send_coins)); } else { tokio::spawn(async { - loop { - if let Err(e) = periodic_metrics().await { - error!("Error in periodic metrics: {e}"); - } + if let Err(e) = init_metrics().await { + error!("Error in periodic metrics: {e}"); } }); } diff --git a/src/carbonado.rs b/src/carbonado.rs index d2f02444..3ef3a43e 100644 --- a/src/carbonado.rs +++ b/src/carbonado.rs @@ -47,7 +47,8 @@ mod server { let (body, _encode_info) = carbonado::file::encode(&sk, Some(&pk), input, level, meta)?; let filepath = handle_file(&pk_hex, name, body.len()).await?; - fs::write(filepath, body).await?; + fs::write(&filepath, body).await?; + metrics::update(&filepath).await?; Ok(()) } @@ -74,7 +75,8 @@ mod server { let (body, _encode_info) = carbonado::file::encode(&sk, Some(&pk), input, level, meta)?; let filepath = handle_file(&pk_hex, name, body.len()).await?; - fs::write(filepath.clone(), body.clone()).await?; + fs::write(&filepath, body.clone()).await?; + metrics::update(&filepath).await?; Ok((filepath, body)) } diff --git a/src/carbonado/error.rs b/src/carbonado/error.rs index aa3bb9a6..751170c7 100644 --- a/src/carbonado/error.rs +++ b/src/carbonado/error.rs @@ -29,4 +29,6 @@ pub enum CarbonadoError { WrongNostrPrivateKey, /// Debug: {0} Debug(String), + /// Error: {0} + AnyhowError(#[from] anyhow::Error), } diff --git a/src/carbonado/metrics.rs b/src/carbonado/metrics.rs index 8301015c..aa813db0 100644 --- a/src/carbonado/metrics.rs +++ b/src/carbonado/metrics.rs @@ -1,18 +1,22 @@ #![cfg(not(target_arch = "wasm32"))] use std::{ - collections::BTreeMap, - path::Path, - time::{Duration as StdDuration, SystemTime}, + collections::{BTreeMap, BTreeSet}, + env, + path::{Path, PathBuf}, + sync::Arc, + time::SystemTime, }; -use anyhow::Result; +use anyhow::{anyhow, Result}; use chrono::{DateTime, Duration, NaiveDate, Utc}; +use log::debug; +use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use tokio::time::sleep; +use tokio::{fs, sync::RwLock}; use walkdir::WalkDir; #[derive(Serialize, Deserialize, Default)] -pub struct MetricsResponse { +pub struct MetricsData { bytes: u64, bytes_by_day: BTreeMap, bitcoin_wallets_by_day: BTreeMap, @@ -43,26 +47,30 @@ const NETWORK_TOTAL: &str = "total"; const NETWORK_RGB_STOCKS: &str = "rgb_stocks"; const NETWORK_RGB_TRANSFER_FILES: &str = "rgb_transfer_files"; -pub async fn metrics(dir: &Path) -> Result { - let mut response = MetricsResponse::default(); +static METRICS_DATA: Lazy>> = Lazy::new(Default::default); +static METRICS_SET: Lazy>>> = Lazy::new(Default::default); - response +pub async fn init(dir: &Path) -> Result<()> { + let mut metrics = METRICS_DATA.write().await; + let mut dataset = METRICS_SET.write().await; + + metrics .wallets_by_network .insert(NETWORK_BITCOIN.to_string(), 0); - response + metrics .wallets_by_network .insert(NETWORK_TESTNET.to_string(), 0); - response + metrics .wallets_by_network .insert(NETWORK_SIGNET.to_string(), 0); - response + metrics .wallets_by_network .insert(NETWORK_REGTEST.to_string(), 0); - response.wallets_by_network.insert("total".to_string(), 0); - response + metrics.wallets_by_network.insert("total".to_string(), 0); + metrics .wallets_by_network .insert(NETWORK_RGB_STOCKS.to_string(), 0); - response + metrics .wallets_by_network .insert(NETWORK_RGB_TRANSFER_FILES.to_string(), 0); @@ -77,86 +85,86 @@ pub async fn metrics(dir: &Path) -> Result { let day_created = metadata.created()?; let day = round_datetime_to_day(day_created.into()); - if metadata.is_file() { - response.bytes += metadata.len(); - - *response.bytes_by_day.entry(day.clone()).or_insert(0) += metadata.len(); - - if filename == MAINNET_WALLET { - *response - .wallets_by_network - .get_mut(NETWORK_BITCOIN) - .unwrap_or(&mut 0) += 1; - *response - .bitcoin_wallets_by_day - .entry(day.clone()) - .or_insert(0) += 1; - } - - if filename == TESTNET_WALLET { - *response - .wallets_by_network - .get_mut(NETWORK_TESTNET) - .unwrap_or(&mut 0) += 1; - *response - .testnet_wallets_by_day - .entry(day.clone()) - .or_insert(0) += 1; - } - - if filename == SIGNET_WALLET { - *response - .wallets_by_network - .get_mut(NETWORK_SIGNET) - .unwrap_or(&mut 0) += 1; - *response - .signet_wallets_by_day - .entry(day.clone()) - .or_insert(0) += 1; - } + dataset.insert(entry.path().to_path_buf()); - if filename == REGTEST_WALLET { - *response - .wallets_by_network - .get_mut(NETWORK_REGTEST) - .unwrap_or(&mut 0) += 1; - *response - .regtest_wallets_by_day - .entry(day.clone()) - .or_insert(0) += 1; - } - - if filename == MAINNET_WALLET - || filename == TESTNET_WALLET - || filename == SIGNET_WALLET - || filename == REGTEST_WALLET - { - total_wallets += 1; - } - - if filename == RGB_STOCK { - rgb_stocks += 1; - } - - if filename == RGB_TRANSFER_FILE { - rgb_transfer_files += 1; + if metadata.is_file() { + metrics.bytes += metadata.len(); + + *metrics.bytes_by_day.entry(day.clone()).or_insert(0) += metadata.len(); + + match filename.as_str() { + MAINNET_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_BITCOIN) + .unwrap_or(&mut 0) += 1; + *metrics + .bitcoin_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + total_wallets += 1; + } + + TESTNET_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_TESTNET) + .unwrap_or(&mut 0) += 1; + *metrics + .testnet_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + total_wallets += 1; + } + + SIGNET_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_SIGNET) + .unwrap_or(&mut 0) += 1; + *metrics + .signet_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + total_wallets += 1; + } + + REGTEST_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_REGTEST) + .unwrap_or(&mut 0) += 1; + *metrics + .regtest_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + total_wallets += 1; + } + + RGB_STOCK => { + rgb_stocks += 1; + } + + RGB_TRANSFER_FILE => { + rgb_transfer_files += 1; + } + + _ => {} } } - - sleep(StdDuration::from_millis(10)).await; } - *response + *metrics .wallets_by_network .get_mut(NETWORK_TOTAL) .unwrap_or(&mut 0) = total_wallets; - *response + *metrics .wallets_by_network .get_mut(NETWORK_RGB_STOCKS) .unwrap_or(&mut 0) = rgb_stocks; - *response + *metrics .wallets_by_network .get_mut(NETWORK_RGB_TRANSFER_FILES) .unwrap_or(&mut 0) = rgb_transfer_files; @@ -180,70 +188,70 @@ pub async fn metrics(dir: &Path) -> Result { let day = round_datetime_to_day(start_day); let bytes_day_prior = { - response + metrics .bytes_by_day .get(&day_prior) .unwrap_or(&0) .to_owned() }; - response + metrics .bytes_by_day .entry(day.clone()) .and_modify(|b| *b += bytes_day_prior) .or_insert(bytes_day_prior); let bitcoin_wallets_day_prior = { - response + metrics .bitcoin_wallets_by_day .get(&day_prior) .unwrap_or(&0) .to_owned() }; - response + metrics .bitcoin_wallets_by_day .entry(day.clone()) .and_modify(|w| *w += bitcoin_wallets_day_prior) .or_insert(bitcoin_wallets_day_prior); let testnet_wallets_day_prior = { - response + metrics .testnet_wallets_by_day .get(&day_prior) .unwrap_or(&0) .to_owned() }; - response + metrics .testnet_wallets_by_day .entry(day.clone()) .and_modify(|w| *w += testnet_wallets_day_prior) .or_insert(testnet_wallets_day_prior); let signet_wallets_day_prior = { - response + metrics .signet_wallets_by_day .get(&day_prior) .unwrap_or(&0) .to_owned() }; - response + metrics .signet_wallets_by_day .entry(day.clone()) .and_modify(|w| *w += signet_wallets_day_prior) .or_insert(signet_wallets_day_prior); let regtest_wallets_day_prior = { - response + metrics .regtest_wallets_by_day .get(&day_prior) .unwrap_or(&0) .to_owned() }; - response + metrics .regtest_wallets_by_day .entry(day.clone()) .and_modify(|w| *w += regtest_wallets_day_prior) @@ -256,10 +264,122 @@ pub async fn metrics(dir: &Path) -> Result { } } - Ok(response) + Ok(()) } -pub fn metrics_csv(metrics: MetricsResponse) -> String { +pub async fn update(path: &Path) -> Result<()> { + debug!("Updating metrics with {path:?}"); + + let mut metrics = METRICS_DATA.write().await; + let mut dataset = METRICS_SET.write().await; + + if dataset.get(path).is_some() { + debug!("Path already present"); + return Ok(()); + } else { + dataset.insert(path.to_path_buf()); + } + + let filename = path + .file_name() + .ok_or(anyhow!("no filename for path"))? + .to_string_lossy() + .to_string(); + let metadata = path.metadata()?; + let day_created = metadata.created()?; + let day = round_datetime_to_day(day_created.into()); + + if metadata.is_file() { + metrics.bytes += metadata.len(); + + *metrics.bytes_by_day.entry(day.clone()).or_insert(0) += metadata.len(); + + match filename.as_str() { + MAINNET_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_BITCOIN) + .unwrap_or(&mut 0) += 1; + *metrics + .bitcoin_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + *metrics + .wallets_by_network + .get_mut(NETWORK_TOTAL) + .unwrap_or(&mut 0) += 1; + } + TESTNET_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_TESTNET) + .unwrap_or(&mut 0) += 1; + *metrics + .testnet_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + *metrics + .wallets_by_network + .get_mut(NETWORK_TOTAL) + .unwrap_or(&mut 0) += 1; + } + SIGNET_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_SIGNET) + .unwrap_or(&mut 0) += 1; + *metrics + .signet_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + *metrics + .wallets_by_network + .get_mut(NETWORK_TOTAL) + .unwrap_or(&mut 0) += 1; + } + REGTEST_WALLET => { + *metrics + .wallets_by_network + .get_mut(NETWORK_REGTEST) + .unwrap_or(&mut 0) += 1; + *metrics + .regtest_wallets_by_day + .entry(day.clone()) + .or_insert(0) += 1; + *metrics + .wallets_by_network + .get_mut(NETWORK_TOTAL) + .unwrap_or(&mut 0) += 1; + } + + RGB_STOCK => { + *metrics + .wallets_by_network + .get_mut(NETWORK_RGB_STOCKS) + .unwrap_or(&mut 0) += 1; + } + + RGB_TRANSFER_FILE => { + *metrics + .wallets_by_network + .get_mut(NETWORK_RGB_TRANSFER_FILES) + .unwrap_or(&mut 0) += 1; + } + + _ => {} + } + } + + let dir = env::var("CARBONADO_DIR").unwrap_or("/tmp/bitmaskd/carbonado".to_owned()); + + // Write metrics to disk as a backup + fs::write(&format!("{dir}/metrics.csv"), &csv().await).await?; + fs::write(&format!("{dir}/metrics.json"), &json().await?).await?; + + Ok(()) +} + +pub async fn csv() -> String { let mut lines = vec![vec![ "Wallet".to_owned(), "Wallet Count".to_owned(), @@ -272,7 +392,9 @@ pub fn metrics_csv(metrics: MetricsResponse) -> String { "Bytes by Day".to_owned(), ]]; - for (day, bitcoin_wallets) in metrics.bitcoin_wallets_by_day { + let metrics = METRICS_DATA.read().await; + + for (day, bitcoin_wallets) in metrics.bitcoin_wallets_by_day.iter() { let mut line = vec![]; if lines.len() == 1 { @@ -370,25 +492,25 @@ pub fn metrics_csv(metrics: MetricsResponse) -> String { line.push( metrics .testnet_wallets_by_day - .get(&day) + .get(day) .unwrap_or(&0) .to_string(), ); line.push( metrics .signet_wallets_by_day - .get(&day) + .get(day) .unwrap_or(&0) .to_string(), ); line.push( metrics .regtest_wallets_by_day - .get(&day) + .get(day) .unwrap_or(&0) .to_string(), ); - line.push(metrics.bytes_by_day.get(&day).unwrap_or(&0).to_string()); + line.push(metrics.bytes_by_day.get(day).unwrap_or(&0).to_string()); lines.push(line); } @@ -397,6 +519,12 @@ pub fn metrics_csv(metrics: MetricsResponse) -> String { lines.join("\n") } +pub async fn json() -> Result { + let metrics = METRICS_DATA.read().await; + + Ok(serde_json::to_string_pretty(&*metrics)?) +} + fn round_datetime_to_day(datetime: DateTime) -> String { let rounded = datetime .date_naive()