Skip to content

Commit

Permalink
Add public url resubmission
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayiga committed Dec 3, 2024
1 parent 16ac2b3 commit 6476c13
Showing 1 changed file with 56 additions and 15 deletions.
71 changes: 56 additions & 15 deletions node-metrics/src/api/node_validator/v0/create_node_validator_api.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Duration};

use super::{get_stake_table_from_sequencer, LeafAndBlock, ProcessNodeIdentityUrlStreamTask};
use crate::service::{
Expand Down Expand Up @@ -32,6 +32,7 @@ pub struct NodeValidatorAPI<K> {
pub process_leaf_stream_handle: Option<ProcessLeafAndBlockPairStreamTask>,
pub process_node_identity_stream_handle: Option<ProcessNodeIdentityStreamTask>,
pub process_url_stream_handle: Option<ProcessNodeIdentityUrlStreamTask>,
pub submit_public_urls_handle: Option<SubmitPublicUrlsToScrapeTask>,
pub url_sender: K,
}

Expand Down Expand Up @@ -259,6 +260,53 @@ impl ProcessExternalMessageHandlingTask {
}
}

/// [SubmitPublicUrlsToScrapeTask] is a task that is capable of submitting
/// public urls to a url sender at a regular interval. This task will
/// submit the provided urls to the url sender every 5 minutes.
pub struct SubmitPublicUrlsToScrapeTask {
pub task_handle: Option<JoinHandle<()>>,
}

const PUBLIC_URL_RESUBMIT_INTERVAL: Duration = Duration::from_secs(300);

impl SubmitPublicUrlsToScrapeTask {
pub fn new<S>(url_sender: S, urls: Vec<Url>) -> Self
where
S: Sink<Url, Error = SendError> + Send + Unpin + 'static,
{
let task_handle = spawn(Self::submit_urls(url_sender, urls));

Self {
task_handle: Some(task_handle),
}
}

pub async fn submit_urls<S>(url_sender: S, urls: Vec<Url>)
where
S: Sink<Url, Error = SendError> + Unpin + 'static,
{
if urls.is_empty() {
tracing::warn!("no urls to send to url sender");
return;
}

let mut url_sender = url_sender;
tracing::debug!("sending initial urls to url sender to process node identity");
loop {
for url in urls.iter() {
let send_result = url_sender.send(url.clone()).await;
if let Err(err) = send_result {
tracing::error!("url sender closed: {}", err);
panic!("SubmitPublicUrlsToScrapeTask url sender is closed, unrecoverable, the node state will stagnate.");
}
}

// Sleep for 5 minutes before sending the urls again
tokio::time::sleep(PUBLIC_URL_RESUBMIT_INTERVAL).await;
}
}
}

/// [Drop] implementation for [ProcessExternalMessageHandlingTask] that will
/// cancel the task when the structure is dropped.
impl Drop for ProcessExternalMessageHandlingTask {
Expand Down Expand Up @@ -303,7 +351,7 @@ pub async fn create_node_validator_processing(
let (node_identity_sender_1, node_identity_receiver_1) = mpsc::channel(32);
let (node_identity_sender_2, node_identity_receiver_2) = mpsc::channel(32);
let (voters_sender, voters_receiver) = mpsc::channel(32);
let (mut url_sender, url_receiver) = mpsc::channel(32);
let (url_sender, url_receiver) = mpsc::channel(32);

let process_internal_client_message_handle = InternalClientMessageProcessingTask::new(
internal_client_message_receiver,
Expand Down Expand Up @@ -340,20 +388,12 @@ pub async fn create_node_validator_processing(
let process_url_stream_handle =
ProcessNodeIdentityUrlStreamTask::new(url_receiver, node_identity_sender_1);

tracing::debug!("sending initial urls to url sender to process node identity");
// Send any initial URLS to the url sender for immediate processing.
// These urls are supplied by the configuration of this function
{
let urls = config.initial_node_public_base_urls;

for url in urls {
let send_result = url_sender.send(url).await;
if let Err(err) = send_result {
tracing::info!("url sender closed: {}", err);
break;
}
}
}
let submit_public_urls_handle = SubmitPublicUrlsToScrapeTask::new(
url_sender.clone(),
config.initial_node_public_base_urls.clone(),
);

Ok(NodeValidatorAPI {
process_internal_client_message_handle: Some(process_internal_client_message_handle),
Expand All @@ -363,7 +403,8 @@ pub async fn create_node_validator_processing(
process_leaf_stream_handle: Some(process_leaf_stream_handle),
process_node_identity_stream_handle: Some(process_node_identity_stream_handle),
process_url_stream_handle: Some(process_url_stream_handle),
url_sender: url_sender.clone(),
submit_public_urls_handle: Some(submit_public_urls_handle),
url_sender,
})
}

Expand Down

0 comments on commit 6476c13

Please sign in to comment.