diff --git a/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs b/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs index 8108a6b25..7ca0ba100 100644 --- a/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs +++ b/node-metrics/src/api/node_validator/v0/create_node_validator_api.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use super::{get_stake_table_from_sequencer, LeafAndBlock, ProcessNodeIdentityUrlStreamTask}; use crate::service::{ @@ -32,6 +32,7 @@ pub struct NodeValidatorAPI { pub process_leaf_stream_handle: Option, pub process_node_identity_stream_handle: Option, pub process_url_stream_handle: Option, + pub submit_public_urls_handle: Option, pub url_sender: K, } @@ -259,6 +260,53 @@ impl ProcessExternalMessageHandlingTask { } } +/// [SubmitPublicUrlsToScrapeTask] is a task that is capable of submitting +/// public urls to a url sender at a regular interval. This task will +/// submit the provided urls to the url sender every 5 minutes. +pub struct SubmitPublicUrlsToScrapeTask { + pub task_handle: Option>, +} + +const PUBLIC_URL_RESUBMIT_INTERVAL: Duration = Duration::from_secs(300); + +impl SubmitPublicUrlsToScrapeTask { + pub fn new(url_sender: S, urls: Vec) -> Self + where + S: Sink + Send + Unpin + 'static, + { + let task_handle = spawn(Self::submit_urls(url_sender, urls)); + + Self { + task_handle: Some(task_handle), + } + } + + pub async fn submit_urls(url_sender: S, urls: Vec) + where + S: Sink + Unpin + 'static, + { + if urls.is_empty() { + tracing::warn!("no urls to send to url sender"); + return; + } + + let mut url_sender = url_sender; + tracing::debug!("sending initial urls to url sender to process node identity"); + loop { + for url in urls.iter() { + let send_result = url_sender.send(url.clone()).await; + if let Err(err) = send_result { + tracing::error!("url sender closed: {}", err); + panic!("SubmitPublicUrlsToScrapeTask url sender is closed, unrecoverable, the node state will stagnate."); + } + } + + // Sleep for 5 minutes before sending the urls again + tokio::time::sleep(PUBLIC_URL_RESUBMIT_INTERVAL).await; + } + } +} + /// [Drop] implementation for [ProcessExternalMessageHandlingTask] that will /// cancel the task when the structure is dropped. impl Drop for ProcessExternalMessageHandlingTask { @@ -303,7 +351,7 @@ pub async fn create_node_validator_processing( let (node_identity_sender_1, node_identity_receiver_1) = mpsc::channel(32); let (node_identity_sender_2, node_identity_receiver_2) = mpsc::channel(32); let (voters_sender, voters_receiver) = mpsc::channel(32); - let (mut url_sender, url_receiver) = mpsc::channel(32); + let (url_sender, url_receiver) = mpsc::channel(32); let process_internal_client_message_handle = InternalClientMessageProcessingTask::new( internal_client_message_receiver, @@ -340,20 +388,12 @@ pub async fn create_node_validator_processing( let process_url_stream_handle = ProcessNodeIdentityUrlStreamTask::new(url_receiver, node_identity_sender_1); - tracing::debug!("sending initial urls to url sender to process node identity"); // Send any initial URLS to the url sender for immediate processing. // These urls are supplied by the configuration of this function - { - let urls = config.initial_node_public_base_urls; - - for url in urls { - let send_result = url_sender.send(url).await; - if let Err(err) = send_result { - tracing::info!("url sender closed: {}", err); - break; - } - } - } + let submit_public_urls_handle = SubmitPublicUrlsToScrapeTask::new( + url_sender.clone(), + config.initial_node_public_base_urls.clone(), + ); Ok(NodeValidatorAPI { process_internal_client_message_handle: Some(process_internal_client_message_handle), @@ -363,7 +403,8 @@ pub async fn create_node_validator_processing( process_leaf_stream_handle: Some(process_leaf_stream_handle), process_node_identity_stream_handle: Some(process_node_identity_stream_handle), process_url_stream_handle: Some(process_url_stream_handle), - url_sender: url_sender.clone(), + submit_public_urls_handle: Some(submit_public_urls_handle), + url_sender, }) }