Skip to content

Commit

Permalink
Fix metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 4, 2024
1 parent f1420ef commit 5126def
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 89 deletions.
4 changes: 2 additions & 2 deletions src/account_info.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics::get_metrics;
use crate::metrics;
use crate::{
relay_subscriber::GetEventsOf,
repo::{Repo, RepoTrait},
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn verified_friendly_id(

if let Some(nip05_value) = metadata.nip05 {
if nip05_verifier.verify_nip05(public_key, &nip05_value).await {
get_metrics().verified_nip05.increment(1);
metrics::verified_nip05().increment(1);
return FriendlyId::Nip05(nip05_value);
}
return name_or_npub_or_pubkey;
Expand Down
6 changes: 3 additions & 3 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::account_info::FriendlyId;
use crate::metrics::get_metrics;
use crate::metrics;
use chrono::{DateTime, Utc};
use nostr_sdk::prelude::*;
use std::fmt;
Expand Down Expand Up @@ -33,7 +33,7 @@ impl Eq for FollowChange {}

impl FollowChange {
pub fn new_followed(at: DateTime<Utc>, follower: PublicKey, followee: PublicKey) -> Self {
get_metrics().follows.increment(1);
metrics::follows().increment(1);

Self {
change_type: ChangeType::Followed,
Expand All @@ -46,7 +46,7 @@ impl FollowChange {
}

pub fn new_unfollowed(at: DateTime<Utc>, follower: PublicKey, followee: PublicKey) -> Self {
get_metrics().unfollows.increment(1);
metrics::unfollows().increment(1);

Self {
change_type: ChangeType::Unfollowed,
Expand Down
4 changes: 2 additions & 2 deletions src/domain/follows_differ.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::account_info::{fetch_account_info, AccountInfo};
use crate::metrics::get_metrics;
use crate::metrics;
use crate::relay_subscriber::GetEventsOf;
use crate::repo::RepoTrait;
use crate::{
Expand Down Expand Up @@ -235,7 +235,7 @@ where
info!("{}", log_line);
}

get_metrics().contact_lists_processed.increment(1);
metrics::contact_lists_processed().increment(1);
Ok(())
}
}
Expand Down
23 changes: 6 additions & 17 deletions src/domain/notification_factory.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::followee_notification_factory::CollectedFollowChange;
use super::{NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
use crate::domain::{FollowChange, FolloweeNotificationFactory};
use crate::metrics::get_metrics;
use crate::metrics;
use anyhow::Result;
use governor::clock::Clock;
use governor::clock::DefaultClock;
Expand Down Expand Up @@ -134,7 +134,6 @@ impl<T: Clock> NotificationFactory<T> {
}

fn record_metrics(messages: &[NotificationMessage], retained_follow_changes: usize) {
let metrics = get_metrics();
let mut individual_follow_changes = 0;
let mut aggregated_follow_changes = 0;

Expand All @@ -147,23 +146,13 @@ fn record_metrics(messages: &[NotificationMessage], retained_follow_changes: usi
aggregated_follow_changes += 1;
}

metrics
.followers_per_message
.record(message.follows().len() as f64);
metrics
.unfollowers_per_message
.record(message.unfollows().len() as f64);
metrics::followers_per_message().record(message.follows().len() as f64);
metrics::unfollowers_per_message().record(message.unfollows().len() as f64);
}

metrics
.individual_follow_messages
.increment(individual_follow_changes as u64);
metrics
.aggregated_follow_messages
.increment(aggregated_follow_changes as u64);
metrics
.retained_follow_changes
.set(retained_follow_changes as f64);
metrics::individual_follow_messages().increment(individual_follow_changes as u64);
metrics::aggregated_follow_messages().increment(aggregated_follow_changes as u64);
metrics::retained_follow_changes().set(retained_follow_changes as f64);
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions src/google_pubsub_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::domain::NotificationMessage;
use crate::metrics::get_metrics;
use crate::metrics;
use futures::Future;
use gcloud_sdk::{
google::pubsub::v1::{publisher_client::PublisherClient, PublishRequest, PubsubMessage},
Expand Down Expand Up @@ -116,7 +116,7 @@ impl PublishEvents for GooglePubSubClient {
len, self.google_full_topic
);

get_metrics().pubsub_messages.increment(len as u64);
metrics::pubsub_messages().increment(len as u64);

Ok(())
}
Expand Down
12 changes: 7 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rustls::crypto::ring;
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio_util::sync::CancellationToken;
use tracing::info;
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use worker_pool::WorkerPool;

Expand Down Expand Up @@ -113,13 +113,15 @@ async fn main() -> Result<()> {
let http_server = HttpServer::run(cancellation_token.clone());

tokio::select! {
_ = nostr_sub => info!("Nostr subscription ended"),
_ = http_server => info!("HTTP server ended"),
result = nostr_sub => if let Err(e) = result {
error!("Nostr subscription encountered an error: {:?}", e);
},
result = http_server => if let Err(e) = result {
error!("HTTP server encountered an error: {:?}", e);
},
_ = cancellation_token.cancelled() => info!("Cancellation token cancelled"),
}

info!("Finished Nostr subscription");

follow_change_handle.wait().await;
info!("Finished follow change worker pool");

Expand Down
117 changes: 66 additions & 51 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,74 @@
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::sync::OnceLock;

pub struct Metrics {
pub pubsub_messages: Counter,
pub contact_lists_processed: Counter,
pub follows: Counter,
pub unfollows: Counter,
pub worker_lagged: Counter,
pub worker_closed: Counter,
pub verified_nip05: Counter,
pub individual_follow_messages: Counter,
pub aggregated_follow_messages: Counter,
pub followers_per_message: Histogram,
pub unfollowers_per_message: Histogram,
pub retained_follow_changes: Gauge,

/// Returns a handle to the `pubsub_messages` counter.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
// NOTE(review): presumably incremented by the number of messages published to
// Google PubSub in one batch — confirm against the publisher call site.
pub fn pubsub_messages() -> Counter {
metrics::counter!("pubsub_messages")
}

/// Returns a handle to the `contact_lists_processed` counter.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
// NOTE(review): presumably bumped once per contact list that was fully
// processed — confirm against the caller.
pub fn contact_lists_processed() -> Counter {
metrics::counter!("contact_lists_processed")
}

/// Returns a handle to the `follows` counter (described as "Number of follows").
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
pub fn follows() -> Counter {
metrics::counter!("follows")
}

/// Returns a handle to the `unfollows` counter (described as "Number of unfollows").
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
pub fn unfollows() -> Counter {
metrics::counter!("unfollows")
}

/// Returns a handle to the `worker_lagged` counter, described as the number of
/// times a worker lagged behind.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
// NOTE(review): looks like this counts lag events, not the number of messages
// that were missed — confirm against the worker pool dispatcher.
pub fn worker_lagged() -> Counter {
metrics::counter!("worker_lagged")
}

/// Returns a handle to the `worker_closed` counter, described as the number of
/// times a worker channel closed.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
pub fn worker_closed() -> Counter {
metrics::counter!("worker_closed")
}

/// Returns a handle to the `verified_nip05` counter, described as the number of
/// verified NIP05 ids fetched.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
pub fn verified_nip05() -> Counter {
metrics::counter!("verified_nip05")
}

// Global OnceLock to hold the Metrics instance
static METRICS: OnceLock<Metrics> = OnceLock::new();

impl Metrics {
fn new() -> Self {
Self {
pubsub_messages: metrics::counter!("pubsub_messages"),
contact_lists_processed: metrics::counter!("contact_lists_processed"),
follows: metrics::counter!("follows"),
unfollows: metrics::counter!("unfollows"),
worker_lagged: metrics::counter!("worker_lagged"),
worker_closed: metrics::counter!("worker_closed"),
verified_nip05: metrics::counter!("verified_nip05"),
individual_follow_messages: metrics::counter!("individual_follow_messages"),
aggregated_follow_messages: metrics::counter!("aggregated_follow_messages"),
followers_per_message: metrics::histogram!("followers_per_message"),
unfollowers_per_message: metrics::histogram!("unfollowers_per_message"),
retained_follow_changes: metrics::gauge!("retained_follow_changes"),
}
}

pub fn worker_failures(&self, name: String, id: usize) -> Counter {
metrics::counter!("worker_failures", "name" => name, "id" => id.to_string())
}

pub fn worker_timeouts(&self, name: String, id: usize) -> Counter {
metrics::counter!("worker_timeouts", "name" => name, "id" => id.to_string())
}
/// Returns a handle to the `individual_follow_messages` counter, described as
/// the total number of individual follow messages sent.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
pub fn individual_follow_messages() -> Counter {
metrics::counter!("individual_follow_messages")
}

// Function to access the initialized metrics
pub fn get_metrics() -> &'static Metrics {
METRICS.get_or_init(Metrics::new)
/// Returns a handle to the `aggregated_follow_messages` counter.
///
/// The handle is obtained through `metrics::counter!` on every call; nothing is
/// cached in this module.
// NOTE(review): presumably the counterpart of `individual_follow_messages` for
// messages that bundle several follow changes — confirm against the caller.
pub fn aggregated_follow_messages() -> Counter {
metrics::counter!("aggregated_follow_messages")
}

/// Returns a handle to the labelled `worker_failures` counter, described as the
/// number of times a worker failed to process an item.
///
/// * `name` - value of the `name` label.
/// * `id` - value of the `id` label; it is converted to a string before being
///   attached.
///
/// Each distinct `(name, id)` pair is passed as its own label set.
pub fn worker_failures(name: String, id: usize) -> Counter {
metrics::counter!("worker_failures", "name" => name, "id" => id.to_string())
}

/// Returns a handle to the labelled `worker_timeouts` counter, described as the
/// number of times a worker timed out.
///
/// * `name` - value of the `name` label.
/// * `id` - value of the `id` label; it is converted to a string before being
///   attached.
///
/// Each distinct `(name, id)` pair is passed as its own label set.
pub fn worker_timeouts(name: String, id: usize) -> Counter {
metrics::counter!("worker_timeouts", "name" => name, "id" => id.to_string())
}

/// Returns a handle to the `followers_per_message` histogram.
///
/// The handle is obtained through `metrics::histogram!` on every call; nothing
/// is cached in this module.
// NOTE(review): presumably each sample is the number of follows carried by one
// notification message — confirm against the notification factory.
pub fn followers_per_message() -> Histogram {
metrics::histogram!("followers_per_message")
}

/// Returns a handle to the `unfollowers_per_message` histogram.
///
/// The handle is obtained through `metrics::histogram!` on every call; nothing
/// is cached in this module.
// NOTE(review): presumably each sample is the number of unfollows carried by
// one notification message — confirm against the notification factory.
pub fn unfollowers_per_message() -> Histogram {
metrics::histogram!("unfollowers_per_message")
}

/// Returns a handle to the `retained_follow_changes` gauge.
///
/// The handle is obtained through `metrics::gauge!` on every call; nothing is
/// cached in this module.
// NOTE(review): presumably set to the number of follow changes still held back
// after a flush — confirm against the notification factory.
pub fn retained_follow_changes() -> Gauge {
metrics::gauge!("retained_follow_changes")
}

// Setup metrics with descriptions
Expand All @@ -64,18 +83,14 @@ pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
);
describe_counter!("follows", "Number of follows");
describe_counter!("unfollows", "Number of unfollows");
describe_counter!(
"worker_lagged",
"Number of times a worker lagged behind and missed messages, consider increasing worker pool size or channel buffer size"
);
describe_counter!("worker_lagged", "Number of times a worker lagged behind");
describe_counter!("worker_closed", "Number of times a worker channel closed");
describe_counter!(
"worker_failures",
"Number of times a worker failed to process an item"
);
describe_counter!("worker_timeouts", "Number of times a worker timed out");
describe_counter!("verified_nip05", "Number of verified NIP05 ids fetched");

describe_counter!(
"individual_follow_messages",
"Total number of individual follow messages sent"
Expand Down
12 changes: 5 additions & 7 deletions src/worker_pool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::metrics::get_metrics;
use crate::metrics;
use futures::Future;
use std::error::Error;
use std::fmt::Debug;
Expand Down Expand Up @@ -46,7 +46,6 @@ impl WorkerPool {
let name_clone = name.to_string();
let worker_name = format!("{}-{}", name, i);
tracker.spawn(async move {
let metrics = get_metrics();
loop {
tokio::select! {
_ = token_clone.cancelled() => {
Expand All @@ -63,11 +62,11 @@ impl WorkerPool {
trace!("{}: Worker task finished successfully processing item", worker_name);
},
Ok(Err(e)) => {
metrics.worker_failures(name_clone.to_string(), i).increment(1);
metrics::worker_failures(name_clone.to_string(), i).increment(1);
error!("{}: Worker failed: {}", worker_name, e);
},
Err(_) => {
metrics.worker_timeouts(name_clone.to_string(), i).increment(1);
metrics::worker_timeouts(name_clone.to_string(), i).increment(1);
error!("{}: Worker task timed out after {} seconds", worker_name, worker_timeout_secs);
}
}
Expand All @@ -87,7 +86,6 @@ impl WorkerPool {
tracker.spawn(async move {
// Simple cycle iterator to distribute work to workers in a round-robin fashion.
let mut worker_txs_cycle = worker_txs.iter().cycle();
let metrics = get_metrics();

loop {
tokio::select! {
Expand All @@ -113,11 +111,11 @@ impl WorkerPool {
}
}
Err(RecvError::Lagged(n)) => {
metrics.worker_lagged.increment(1);
metrics::worker_lagged().increment(1);
warn!("{}: Receiver lagged and missed {} messages", name_clone, n);
}
Err(RecvError::Closed) => {
metrics.worker_closed.increment(1);
metrics::worker_closed().increment(1);
error!("{}: Item receiver channel closed", name_clone);
break;
}
Expand Down

0 comments on commit 5126def

Please sign in to comment.