Skip to content

Commit

Permalink
Metrics file
Browse files Browse the repository at this point in the history
dcadenas committed Sep 4, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
1 parent 3a61519 commit 9ba9b3a
Showing 9 changed files with 141 additions and 73 deletions.
4 changes: 2 additions & 2 deletions src/account_info.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use crate::metrics::get_metrics;
use crate::{
relay_subscriber::GetEventsOf,
repo::{Repo, RepoTrait},
};
use cached::proc_macro::cached;
use cached::TimedSizedCache;
use chrono::{DateTime, Utc};
use metrics::counter;
use nostr_sdk::prelude::*;
use serde::Serialize;
use serde::Serializer;
@@ -199,7 +199,7 @@ async fn verified_friendly_id(

if let Some(nip05_value) = metadata.nip05 {
if nip05_verifier.verify_nip05(public_key, &nip05_value).await {
counter!("verified_nip05").increment(1);
get_metrics().verified_nip05.increment(1);
return FriendlyId::Nip05(nip05_value);
}
return name_or_npub_or_pubkey;
6 changes: 3 additions & 3 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::account_info::FriendlyId;
use crate::metrics::get_metrics;
use chrono::{DateTime, Utc};
use metrics::counter;
use nostr_sdk::prelude::*;
use std::fmt;

@@ -33,7 +33,7 @@ impl Eq for FollowChange {}

impl FollowChange {
pub fn new_followed(at: DateTime<Utc>, follower: PublicKey, followee: PublicKey) -> Self {
counter!("follows").increment(1);
get_metrics().follows.increment(1);

Self {
change_type: ChangeType::Followed,
@@ -46,7 +46,7 @@ impl FollowChange {
}

pub fn new_unfollowed(at: DateTime<Utc>, follower: PublicKey, followee: PublicKey) -> Self {
counter!("unfollows").increment(1);
get_metrics().unfollows.increment(1);

Self {
change_type: ChangeType::Unfollowed,
4 changes: 2 additions & 2 deletions src/domain/follows_differ.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use crate::account_info::{fetch_account_info, AccountInfo};
use crate::metrics::get_metrics;
use crate::relay_subscriber::GetEventsOf;
use crate::repo::RepoTrait;
use crate::{
domain::{contact_list_follow::ContactListFollow, follow_change::FollowChange},
worker_pool::{WorkerTask, WorkerTaskItem},
};
use chrono::{DateTime, Duration, Utc};
use metrics::counter;
use nostr_sdk::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
@@ -235,7 +235,7 @@ where
info!("{}", log_line);
}

counter!("contact_lists_processed").increment(1);
get_metrics().contact_lists_processed.increment(1);
Ok(())
}
}
24 changes: 18 additions & 6 deletions src/domain/notification_factory.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use super::followee_notification_factory::CollectedFollowChange;
use super::{NotificationMessage, MAX_FOLLOWERS_PER_BATCH};
use crate::domain::{FollowChange, FolloweeNotificationFactory};
use crate::metrics::get_metrics;
use anyhow::Result;
use governor::clock::Clock;
use governor::clock::DefaultClock;
use metrics::{counter, gauge, histogram};
use nostr_sdk::PublicKey;
use ordermap::OrderMap;
use std::time::Duration;
@@ -134,6 +134,7 @@ impl<T: Clock> NotificationFactory<T> {
}

fn record_metrics(messages: &[NotificationMessage], retained_follow_changes: usize) {
let metrics = get_metrics();
let mut individual_follow_changes = 0;
let mut aggregated_follow_changes = 0;

@@ -145,13 +146,24 @@ fn record_metrics(messages: &[NotificationMessage], retained_follow_changes: usi
} else {
aggregated_follow_changes += 1;
}
histogram!("followers_per_message").record(message.follows().len() as f64);
histogram!("unfollowers_per_message").record(message.unfollows().len() as f64);

metrics
.followers_per_message
.record(message.follows().len() as f64);
metrics
.unfollowers_per_message
.record(message.unfollows().len() as f64);
}

counter!("individual_follow_messages").increment(individual_follow_changes as u64);
counter!("aggregated_follow_messages").increment(aggregated_follow_changes as u64);
gauge!("retained_follow_changes").set(retained_follow_changes as f64);
metrics
.individual_follow_messages
.increment(individual_follow_changes as u64);
metrics
.aggregated_follow_messages
.increment(aggregated_follow_changes as u64);
metrics
.retained_follow_changes
.set(retained_follow_changes as f64);
}

#[cfg(test)]
4 changes: 2 additions & 2 deletions src/google_pubsub_client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::domain::NotificationMessage;
use crate::metrics::get_metrics;
use futures::Future;
use gcloud_sdk::{
google::pubsub::v1::{publisher_client::PublisherClient, PublishRequest, PubsubMessage},
*,
};
use metrics::counter;
use thiserror::Error;
use tracing::debug;

@@ -116,7 +116,7 @@ impl PublishEvents for GooglePubSubClient {
len, self.google_full_topic
);

counter!("pubsub_messages").increment(len as u64);
get_metrics().pubsub_messages.increment(len as u64);

Ok(())
}
54 changes: 1 addition & 53 deletions src/http_server/router.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::metrics::setup_metrics;
use anyhow::Result;
use axum::{http::HeaderMap, response::Html};
use axum::{response::IntoResponse, routing::get, Router};
use metrics::{describe_counter, describe_gauge, describe_histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::time::Duration;
use tower_http::trace::{DefaultMakeSpan, DefaultOnResponse, TraceLayer};
use tower_http::LatencyUnit;
@@ -27,57 +26,6 @@ pub fn create_router() -> Result<Router> {
.route("/metrics", get(|| async move { metrics_handle.render() })))
}

/// Installs the global Prometheus recorder and registers a description for
/// every metric the service emits.
///
/// # Errors
///
/// Returns an error when the Prometheus recorder cannot be installed.
fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
    // The recorder has to be installed before anything is described: the
    // `describe_*!` macros forward to whichever recorder is current, and
    // until one is installed that is the no-op recorder, which drops the
    // descriptions instead of exposing them on the Prometheus endpoint.
    let prometheus_handle = PrometheusBuilder::new().install_recorder()?;

    describe_counter!(
        "pubsub_messages",
        "Number of messages published to Google Pub/Sub"
    );
    describe_counter!(
        "contact_lists_processed",
        "Number of contact lists processed"
    );
    describe_counter!("follows", "Number of follows");
    describe_counter!("unfollows", "Number of unfollows");
    describe_counter!(
        "worker_lagged",
        "Number of times a worker lagged behind and missed messages, consider increasing worker pool size or channel buffer size"
    );
    describe_counter!("worker_closed", "Number of times a worker channel closed");
    describe_counter!(
        "worker_failures",
        "Number of times a worker failed to process an item"
    );
    describe_counter!("worker_timeouts", "Number of times a worker timed out");
    describe_counter!("verified_nip05", "Number of verified NIP05 ids fetched");

    describe_counter!(
        "individual_follow_messages",
        "Total number of individual follow messages sent"
    );
    describe_counter!(
        "aggregated_follow_messages",
        "Total number of aggregated follow messages sent"
    );

    describe_histogram!(
        "followers_per_message",
        "Number of followers per aggregated message"
    );
    describe_histogram!(
        "unfollowers_per_message",
        "Number of unfollowers per aggregated message"
    );
    describe_gauge!(
        "retained_follow_changes",
        "Number of retained follow changes"
    );

    Ok(prometheus_handle)
}

async fn serve_root_page(_headers: HeaderMap) -> impl IntoResponse {
// TODO: Some stats or useful info about the server here?
let body = r#"
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ mod domain;
mod follow_change_handler;
mod google_pubsub_client;
mod http_server;
mod metrics;
mod migrations;
mod publisher;
mod rate_counter;
105 changes: 105 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use std::sync::OnceLock;

/// Pre-registered handles for the metrics this service emits without labels.
///
/// Each field holds the handle created in `Metrics::new` for the metric whose
/// name matches the field name.
pub struct Metrics {
/// Number of messages published to Google Pub/Sub.
pub pubsub_messages: Counter,
/// Number of contact lists processed.
pub contact_lists_processed: Counter,
/// Number of follows.
pub follows: Counter,
/// Number of unfollows.
pub unfollows: Counter,
/// Number of times a worker lagged behind and missed messages.
pub worker_lagged: Counter,
/// Number of times a worker channel closed.
pub worker_closed: Counter,
/// Number of verified NIP05 ids fetched.
pub verified_nip05: Counter,
/// Total number of individual follow messages sent.
pub individual_follow_messages: Counter,
/// Total number of aggregated follow messages sent.
pub aggregated_follow_messages: Counter,
/// Number of followers per aggregated message.
pub followers_per_message: Histogram,
/// Number of unfollowers per aggregated message.
pub unfollowers_per_message: Histogram,
/// Number of retained follow changes.
pub retained_follow_changes: Gauge,
}

// Process-wide storage for the shared `Metrics` handles. It starts empty and
// is filled in lazily by `get_metrics` through `OnceLock::get_or_init`.
static METRICS: OnceLock<Metrics> = OnceLock::new();

impl Metrics {
    /// Creates one handle per unlabeled metric, in the same order as the
    /// fields are declared, and bundles them into a `Metrics` value.
    fn new() -> Self {
        let pubsub_messages = metrics::counter!("pubsub_messages");
        let contact_lists_processed = metrics::counter!("contact_lists_processed");
        let follows = metrics::counter!("follows");
        let unfollows = metrics::counter!("unfollows");
        let worker_lagged = metrics::counter!("worker_lagged");
        let worker_closed = metrics::counter!("worker_closed");
        let verified_nip05 = metrics::counter!("verified_nip05");
        let individual_follow_messages = metrics::counter!("individual_follow_messages");
        let aggregated_follow_messages = metrics::counter!("aggregated_follow_messages");
        let followers_per_message = metrics::histogram!("followers_per_message");
        let unfollowers_per_message = metrics::histogram!("unfollowers_per_message");
        let retained_follow_changes = metrics::gauge!("retained_follow_changes");

        Self {
            pubsub_messages,
            contact_lists_processed,
            follows,
            unfollows,
            worker_lagged,
            worker_closed,
            verified_nip05,
            individual_follow_messages,
            aggregated_follow_messages,
            followers_per_message,
            unfollowers_per_message,
            retained_follow_changes,
        }
    }

    /// Returns the `worker_failures` counter labeled with the given worker
    /// `name` and the worker `id` rendered as a string.
    ///
    /// The handle is looked up on every call rather than stored in a field.
    pub fn worker_failures(&self, name: String, id: usize) -> Counter {
        let id_label = id.to_string();
        metrics::counter!("worker_failures", "name" => name, "id" => id_label)
    }

    /// Returns the `worker_timeouts` counter labeled with the given worker
    /// `name` and the worker `id` rendered as a string.
    ///
    /// The handle is looked up on every call rather than stored in a field.
    pub fn worker_timeouts(&self, name: String, id: usize) -> Counter {
        let id_label = id.to_string();
        metrics::counter!("worker_timeouts", "name" => name, "id" => id_label)
    }
}

/// Returns the process-wide `Metrics` instance.
///
/// The first call builds the instance with `Metrics::new` and stores it in
/// `METRICS`; later calls return the same `'static` reference.
///
/// NOTE(review): the handles are created on that first call. Presumably they
/// bind to whichever recorder is installed at that moment, so a call made
/// before `setup_metrics` has run may leave them attached to a no-op
/// recorder. Confirm the startup order against the callers.
pub fn get_metrics() -> &'static Metrics {
METRICS.get_or_init(Metrics::new)
}

/// Installs the global Prometheus recorder and registers a description for
/// every metric the service emits.
///
/// Returns the handle of the installed recorder, which callers can use to
/// render the collected metrics.
///
/// # Errors
///
/// Returns an error when the Prometheus recorder cannot be installed.
pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
    // The recorder has to be installed before anything is described: the
    // `describe_*!` macros forward to whichever recorder is current, and
    // until one is installed that is the no-op recorder, which drops the
    // descriptions instead of exposing them on the Prometheus endpoint.
    let prometheus_handle = PrometheusBuilder::new().install_recorder()?;

    describe_counter!(
        "pubsub_messages",
        "Number of messages published to Google Pub/Sub"
    );
    describe_counter!(
        "contact_lists_processed",
        "Number of contact lists processed"
    );
    describe_counter!("follows", "Number of follows");
    describe_counter!("unfollows", "Number of unfollows");
    describe_counter!(
        "worker_lagged",
        "Number of times a worker lagged behind and missed messages, consider increasing worker pool size or channel buffer size"
    );
    describe_counter!("worker_closed", "Number of times a worker channel closed");
    describe_counter!(
        "worker_failures",
        "Number of times a worker failed to process an item"
    );
    describe_counter!("worker_timeouts", "Number of times a worker timed out");
    describe_counter!("verified_nip05", "Number of verified NIP05 ids fetched");

    describe_counter!(
        "individual_follow_messages",
        "Total number of individual follow messages sent"
    );
    describe_counter!(
        "aggregated_follow_messages",
        "Total number of aggregated follow messages sent"
    );

    describe_histogram!(
        "followers_per_message",
        "Number of followers per aggregated message"
    );
    describe_histogram!(
        "unfollowers_per_message",
        "Number of unfollowers per aggregated message"
    );
    describe_gauge!(
        "retained_follow_changes",
        "Number of retained follow changes"
    );

    Ok(prometheus_handle)
}
12 changes: 7 additions & 5 deletions src/worker_pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::metrics::get_metrics;
use futures::Future;
use metrics::counter;
use std::error::Error;
use std::fmt::Debug;
use std::sync::Arc;
@@ -46,6 +46,7 @@ impl WorkerPool {
let name_clone = name.to_string();
let worker_name = format!("{}-{}", name, i);
tracker.spawn(async move {
let metrics = get_metrics();
loop {
tokio::select! {
_ = token_clone.cancelled() => {
@@ -62,11 +63,11 @@ impl WorkerPool {
trace!("{}: Worker task finished successfully processing item", worker_name);
},
Ok(Err(e)) => {
counter!("worker_failures", "name" => name_clone.to_string(), "id" => i.to_string()).increment(1);
metrics.worker_failures(name_clone.to_string(), i).increment(1);
error!("{}: Worker failed: {}", worker_name, e);
},
Err(_) => {
counter!("worker_timeouts", "name" => name_clone.to_string(), "id" => i.to_string()).increment(1);
metrics.worker_timeouts(name_clone.to_string(), i).increment(1);
error!("{}: Worker task timed out after {} seconds", worker_name, worker_timeout_secs);
}
}
@@ -86,6 +87,7 @@ impl WorkerPool {
tracker.spawn(async move {
// Simple cycle iterator to distribute work to workers in a round-robin fashion.
let mut worker_txs_cycle = worker_txs.iter().cycle();
let metrics = get_metrics();

loop {
tokio::select! {
@@ -111,11 +113,11 @@ impl WorkerPool {
}
}
Err(RecvError::Lagged(n)) => {
counter!("worker_lagged").increment(1);
metrics.worker_lagged.increment(1);
warn!("{}: Receiver lagged and missed {} messages", name_clone, n);
}
Err(RecvError::Closed) => {
counter!("worker_closed").increment(1);
metrics.worker_closed.increment(1);
error!("{}: Item receiver channel closed", name_clone);
break;
}

0 comments on commit 9ba9b3a

Please sign in to comment.