From f8c2e87e04a3e79298be0e5edb208aac447311af Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Mon, 9 Sep 2024 18:07:30 -0300 Subject: [PATCH] Add pool name to metric, import script --- scripts/import_from_relay.sh | 53 ++++++++++++++++++++++++++++++++++++ src/metrics.rs | 24 +++------------- src/worker_pool.rs | 28 ++++++++++--------- 3 files changed, 72 insertions(+), 33 deletions(-) create mode 100755 scripts/import_from_relay.sh diff --git a/scripts/import_from_relay.sh b/scripts/import_from_relay.sh new file mode 100755 index 0000000..c552589 --- /dev/null +++ b/scripts/import_from_relay.sh @@ -0,0 +1,53 @@ +#!/bin/bash + +# Requires nak and moreutils (pee) + +if [ -z "$1" ]; then + echo "Usage: $0 [seconds_to_wait]" + exit 1 +fi + +relay_url=$1 +created_at_file="$relay_url-checkpoint.txt" +wait_seconds=${2:-2} +counter=0 + +record_created_at() { + while read -r json; do + created_at=$(jq -r '.created_at' <<< "$json" 2>/dev/null) + formatted_date=$(jq -r '.created_at | strftime("%Y-%m-%d %H:%M:%S")' <<< "$json" 2>/dev/null) + + counter=$((counter + 1)) + echo -ne "\rSaving checkpoint: $created_at ($formatted_date) - Items so far: $counter" >&2 + tput el >&2 + echo "$created_at" > "$created_at_file" + done +} + +filter_events() { + jq -cr 'select( + (.tags | length > 1) + or + (.tags[0][0] != "p" or .tags[0][1] != "0497384b57b43c107a778870462901bf68e0e8583b32e2816563543c059784a4") + )' +} + + +# Infinite loop +while true; do + if [ -f "$created_at_file" ]; then + initial_date=$(cat "$created_at_file") + until_option="--until $initial_date" + else + until_option="" + fi + + # Tee before jq filter to record all dates, even skipped entries + nak req -k 3 $until_option --paginate --paginate-interval ${wait_seconds}s "$relay_url" \ + | filter_events \ + | tee >(record_created_at) \ + | nc localhost 3001 + + echo "Command failed or completed, waiting for $wait_seconds seconds before retrying..." >&2 + sleep $wait_seconds +done diff --git a/src/metrics.rs b/src/metrics.rs index de7d004..b7504a0 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,77 +1,62 @@ use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram}; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; -// PubSub messages counter pub fn pubsub_messages() -> Counter { metrics::counter!("pubsub_messages") } -// Contact lists processed counter pub fn contact_lists_processed() -> Counter { metrics::counter!("contact_lists_processed") } -// Follows counter pub fn follows() -> Counter { metrics::counter!("follows") } -// Unfollows counter pub fn unfollows() -> Counter { metrics::counter!("unfollows") } -// Worker lagged counter -pub fn worker_lagged() -> Counter { - metrics::counter!("worker_lagged") +pub fn worker_lagged(name: String) -> Counter { + metrics::counter!("worker_lagged", "name" => name) } -// Worker closed counter -pub fn worker_closed() -> Counter { - metrics::counter!("worker_closed") +pub fn worker_closed(name: String) -> Counter { + metrics::counter!("worker_closed", "name" => name) } -// Verified NIP05 counter pub fn verified_nip05() -> Counter { metrics::counter!("verified_nip05") } -// Individual follow messages counter pub fn individual_follow_messages() -> Counter { metrics::counter!("individual_follow_messages") } -// Aggregated follow messages counter pub fn aggregated_follow_messages() -> Counter { metrics::counter!("aggregated_follow_messages") } -// Worker failures counter (with labels) pub fn worker_failures(name: String, id: usize) -> Counter { metrics::counter!("worker_failures", "name" => name, "id" => id.to_string()) } -// Worker timeouts counter (with labels) pub fn worker_timeouts(name: String, id: usize) -> Counter { metrics::counter!("worker_timeouts", "name" => name, "id" => id.to_string()) } -// Followers per message histogram pub fn followers_per_message() -> Histogram { metrics::histogram!("followers_per_message") } -// Unfollowers per message histogram pub fn unfollowers_per_message() -> Histogram { metrics::histogram!("unfollowers_per_message") } -// Retained follow changes gauge pub fn retained_follow_changes() -> Gauge { metrics::gauge!("retained_follow_changes") } -// Setup metrics with descriptions pub fn setup_metrics() -> Result { describe_counter!( "pubsub_messages", @@ -113,7 +98,6 @@ pub fn setup_metrics() -> Result { "Number of retained follow changes" ); - // Prometheus setup let prometheus_builder = PrometheusBuilder::new(); let prometheus_handle = prometheus_builder.install_recorder()?; Ok(prometheus_handle) diff --git a/src/worker_pool.rs b/src/worker_pool.rs index c37662a..70d765a 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -64,7 +64,7 @@ impl WorkerPool { fn create_dispatcher_task( tracker: &TaskTracker, - name: String, + pool_name: String, mut item_receiver: broadcast::Receiver, worker_txs: Vec>, cancellation_token: CancellationToken, @@ -78,31 +78,31 @@ fn create_dispatcher_task( loop { tokio::select! { _ = cancellation_token.cancelled() => { - info!("{}: Cancellation cancellation_token is cancelled, stopping worker pool", name); + info!("{}: Cancellation cancellation_token is cancelled, stopping worker pool", pool_name); break; } result = item_receiver.recv() => { match result { Ok(item) => { - trace!("{}: Worker pool dispatching item {:?}", name, item); + trace!("{}: Worker pool dispatching item {:?}", pool_name, item); let Some(worker_tx) = worker_txs_cycle.next() else { - error!("{}: Failed to get worker", name); + error!("{}: Failed to get worker", pool_name); break; }; if let Err(e) = worker_tx.send(item).await { - error!("{}: Failed to send to worker: {}", name, e); + error!("{}: Failed to send to worker: {}", pool_name, e); break; } } Err(RecvError::Lagged(n)) => { - metrics::worker_lagged().increment(1); - warn!("{}: Receiver lagged and missed {} messages", name, n); + metrics::worker_lagged(pool_name.to_string()).increment(1); + warn!("{}: Receiver lagged and missed {} messages", pool_name, n); } Err(RecvError::Closed) => { - metrics::worker_closed().increment(1); - error!("{}: Item receiver channel closed", name); + metrics::worker_closed(pool_name.to_string()).increment(1); + error!("{}: Item receiver channel closed", pool_name); break; } } @@ -111,7 +111,7 @@ fn create_dispatcher_task( } cancellation_token.cancel(); - info!("{}: Worker pool finished", name); + info!("{}: Worker pool finished", pool_name); }); } @@ -162,10 +162,12 @@ fn create_worker_task( }); } +/// The worker task trait that workers must implement to process items. +/// The same instance will be used for all items so common global state can be shared. #[async_trait] -pub trait WorkerTask +pub trait WorkerTask where - T: Debug + Send + Sync + Clone + 'static, + Item: Debug + Send + Sync + Clone + 'static, { - async fn call(&self, args: T) -> Result<(), Box>; + async fn call(&self, args: Item) -> Result<(), Box>; }