Skip to content

Commit

Permalink
Add pool name to metric, import script
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 10, 2024
1 parent a676686 commit f8c2e87
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 33 deletions.
53 changes: 53 additions & 0 deletions scripts/import_from_relay.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/bin/bash

# Requires nak and moreutils (pee)

if [ -z "$1" ]; then
echo "Usage: $0 <relay_url> [seconds_to_wait]"
exit 1
fi

relay_url=$1
created_at_file="$relay_url-checkpoint.txt"
wait_seconds=${2:-2}
counter=0

record_created_at() {
while read -r json; do
created_at=$(jq -r '.created_at' <<< "$json" 2>/dev/null)
formatted_date=$(jq -r '.created_at | strftime("%Y-%m-%d %H:%M:%S")' <<< "$json" 2>/dev/null)

counter=$((counter + 1))
echo -ne "\rSaving checkpoint: $created_at ($formatted_date) - Items so far: $counter" >&2
tput el >&2
echo "$created_at" > "$created_at_file"
done
}

filter_events() {
jq -cr 'select(
(.tags | length > 1)
or
(.tags[0][0] != "p" or .tags[0][1] != "0497384b57b43c107a778870462901bf68e0e8583b32e2816563543c059784a4")
)'
}


# Infinite loop
while true; do
if [ -f "$created_at_file" ]; then
initial_date=$(cat "$created_at_file")
until_option="--until $initial_date"
else
until_option=""
fi

# Tee before jq filter to record all dates, even skipped entries
nak req -k 3 $until_option --paginate --paginate-interval ${wait_seconds}s "$relay_url" \
| filter_events \
| tee >(record_created_at) \
| nc localhost 3001

echo "Command failed or completed, waiting for $wait_seconds seconds before retrying..." >&2
sleep $wait_seconds
done
24 changes: 4 additions & 20 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,77 +1,62 @@
use metrics::{describe_counter, describe_gauge, describe_histogram, Counter, Gauge, Histogram};
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};

// PubSub messages counter
pub fn pubsub_messages() -> Counter {
metrics::counter!("pubsub_messages")
}

// Contact lists processed counter
pub fn contact_lists_processed() -> Counter {
metrics::counter!("contact_lists_processed")
}

// Follows counter
pub fn follows() -> Counter {
metrics::counter!("follows")
}

// Unfollows counter
pub fn unfollows() -> Counter {
metrics::counter!("unfollows")
}

// Worker lagged counter
pub fn worker_lagged() -> Counter {
metrics::counter!("worker_lagged")
pub fn worker_lagged(name: String) -> Counter {
metrics::counter!("worker_lagged", "name" => name)
}

// Worker closed counter
pub fn worker_closed() -> Counter {
metrics::counter!("worker_closed")
pub fn worker_closed(name: String) -> Counter {
metrics::counter!("worker_closed", "name" => name)
}

// Verified NIP05 counter
pub fn verified_nip05() -> Counter {
metrics::counter!("verified_nip05")
}

// Individual follow messages counter
pub fn individual_follow_messages() -> Counter {
metrics::counter!("individual_follow_messages")
}

// Aggregated follow messages counter
pub fn aggregated_follow_messages() -> Counter {
metrics::counter!("aggregated_follow_messages")
}

// Worker failures counter (with labels)
pub fn worker_failures(name: String, id: usize) -> Counter {
metrics::counter!("worker_failures", "name" => name, "id" => id.to_string())
}

// Worker timeouts counter (with labels)
pub fn worker_timeouts(name: String, id: usize) -> Counter {
metrics::counter!("worker_timeouts", "name" => name, "id" => id.to_string())
}

// Followers per message histogram
pub fn followers_per_message() -> Histogram {
metrics::histogram!("followers_per_message")
}

// Unfollowers per message histogram
pub fn unfollowers_per_message() -> Histogram {
metrics::histogram!("unfollowers_per_message")
}

// Retained follow changes gauge
pub fn retained_follow_changes() -> Gauge {
metrics::gauge!("retained_follow_changes")
}

// Setup metrics with descriptions
pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
describe_counter!(
"pubsub_messages",
Expand Down Expand Up @@ -113,7 +98,6 @@ pub fn setup_metrics() -> Result<PrometheusHandle, anyhow::Error> {
"Number of retained follow changes"
);

// Prometheus setup
let prometheus_builder = PrometheusBuilder::new();
let prometheus_handle = prometheus_builder.install_recorder()?;
Ok(prometheus_handle)
Expand Down
28 changes: 15 additions & 13 deletions src/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl WorkerPool {

fn create_dispatcher_task<Item>(
tracker: &TaskTracker,
name: String,
pool_name: String,
mut item_receiver: broadcast::Receiver<Item>,
worker_txs: Vec<Sender<Item>>,
cancellation_token: CancellationToken,
Expand All @@ -78,31 +78,31 @@ fn create_dispatcher_task<Item>(
loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
info!("{}: Cancellation cancellation_token is cancelled, stopping worker pool", name);
info!("{}: Cancellation cancellation_token is cancelled, stopping worker pool", pool_name);
break;
}

result = item_receiver.recv() => {
match result {
Ok(item) => {
trace!("{}: Worker pool dispatching item {:?}", name, item);
trace!("{}: Worker pool dispatching item {:?}", pool_name, item);
let Some(worker_tx) = worker_txs_cycle.next() else {
error!("{}: Failed to get worker", name);
error!("{}: Failed to get worker", pool_name);
break;
};

if let Err(e) = worker_tx.send(item).await {
error!("{}: Failed to send to worker: {}", name, e);
error!("{}: Failed to send to worker: {}", pool_name, e);
break;
}
}
Err(RecvError::Lagged(n)) => {
metrics::worker_lagged().increment(1);
warn!("{}: Receiver lagged and missed {} messages", name, n);
metrics::worker_lagged(pool_name.to_string()).increment(1);
warn!("{}: Receiver lagged and missed {} messages", pool_name, n);
}
Err(RecvError::Closed) => {
metrics::worker_closed().increment(1);
error!("{}: Item receiver channel closed", name);
metrics::worker_closed(pool_name.to_string()).increment(1);
error!("{}: Item receiver channel closed", pool_name);
break;
}
}
Expand All @@ -111,7 +111,7 @@ fn create_dispatcher_task<Item>(
}

cancellation_token.cancel();
info!("{}: Worker pool finished", name);
info!("{}: Worker pool finished", pool_name);
});
}

Expand Down Expand Up @@ -162,10 +162,12 @@ fn create_worker_task<Item, Worker>(
});
}

/// The worker task trait that workers must implement to process items.
/// The same instance will be used for all items so common global state can be shared.
#[async_trait]
pub trait WorkerTask<T>
pub trait WorkerTask<Item>
where
T: Debug + Send + Sync + Clone + 'static,
Item: Debug + Send + Sync + Clone + 'static,
{
async fn call(&self, args: T) -> Result<(), Box<dyn Error>>;
async fn call(&self, args: Item) -> Result<(), Box<dyn Error>>;
}

0 comments on commit f8c2e87

Please sign in to comment.