diff --git a/rpc/src/rpc_pubsub_service.rs b/rpc/src/rpc_pubsub_service.rs index 76227b9708e8e6..50c9200bb7b668 100644 --- a/rpc/src/rpc_pubsub_service.rs +++ b/rpc/src/rpc_pubsub_service.rs @@ -25,6 +25,7 @@ use { }, stream_cancel::{Trigger, Tripwire}, thiserror::Error, + tokio::time::Instant, tokio::{net::TcpStream, pin, select, sync::broadcast}, tokio_util::compat::TokioAsyncReadCompatExt, }; @@ -399,11 +400,13 @@ async fn handle_connection( Err(err) => return Err(err.into()), }, result = broadcast_receiver.recv() => { - + let time = Instant::now(); // In both possible error cases (closed or lagged) we disconnect the client. if let Some(json) = broadcast_handler.handle(result?)? { sender.send_text(&*json).await?; } + let send_time = time.elapsed().as_micros(); + datapoint_info!("rpc-pubsub-broadcast-receive-send-us", ("send_time", send_time, i64)); }, _ = &mut tripwire => { warn!("disconnecting websocket client: shutting down"); diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index 7ecfd6a31a42cc..dccc6d9e3231db 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -208,62 +208,62 @@ struct RpcNotificationContext { const RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS: Duration = Duration::from_millis(2_000); -struct RecentItems { - queue: VecDeque>, - total_bytes: usize, - max_len: usize, - max_total_bytes: usize, - last_metrics_submission: Instant, -} - -impl RecentItems { - fn new(max_len: usize, max_total_bytes: usize) -> Self { - Self { - queue: VecDeque::new(), - total_bytes: 0, - max_len, - max_total_bytes, - last_metrics_submission: Instant::now(), - } - } - - fn push(&mut self, item: Arc) { - self.total_bytes = self - .total_bytes - .checked_add(item.len()) - .expect("total bytes overflow"); - self.queue.push_back(item); - - while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len { - let item = self.queue.pop_front().expect("can't be empty"); - self.total_bytes = self - .total_bytes - .checked_sub(item.len()) - .expect("total bytes underflow"); - } - - let now = Instant::now(); - let last_metrics_ago = now.duration_since(self.last_metrics_submission); - if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS { - datapoint_info!( - "rpc_subscriptions_recent_items", - ("num", self.queue.len(), i64), - ("total_bytes", self.total_bytes, i64), - ); - self.last_metrics_submission = now; - } else { - trace!( - "rpc_subscriptions_recent_items num={} total_bytes={}", - self.queue.len(), - self.total_bytes, - ); - } - } -} +// struct RecentItems { +// queue: VecDeque>, +// total_bytes: usize, +// max_len: usize, +// max_total_bytes: usize, +// last_metrics_submission: Instant, +// } + +// impl RecentItems { +// fn new(max_len: usize, max_total_bytes: usize) -> Self { +// Self { +// queue: VecDeque::new(), +// total_bytes: 0, +// max_len, +// max_total_bytes, +// last_metrics_submission: Instant::now(), +// } +// } + +// fn push(&mut self, item: Arc) { +// self.total_bytes = self +// .total_bytes +// .checked_add(item.len()) +// .expect("total bytes overflow"); +// self.queue.push_back(item); + +// while self.total_bytes > self.max_total_bytes || self.queue.len() > self.max_len { +// let item = self.queue.pop_front().expect("can't be empty"); +// self.total_bytes = self +// .total_bytes +// .checked_sub(item.len()) +// .expect("total bytes underflow"); +// } + +// let now = Instant::now(); +// let last_metrics_ago = now.duration_since(self.last_metrics_submission); +// if last_metrics_ago > RPC_NOTIFICATIONS_METRICS_SUBMISSION_INTERVAL_MS { +// datapoint_info!( +// "rpc_subscriptions_recent_items", +// ("num", self.queue.len(), i64), +// ("total_bytes", self.total_bytes, i64), +// ); +// self.last_metrics_submission = now; +// } else { +// trace!( +// "rpc_subscriptions_recent_items num={} total_bytes={}", +// self.queue.len(), +// self.total_bytes, +// ); +// } +// } +// } struct RpcNotifier { sender: broadcast::Sender, - recent_items: Mutex, + // recent_items: Mutex, } thread_local! { @@ -313,12 +313,18 @@ impl RpcNotifier { }; // There is an unlikely case where this can fail: if the last subscription is closed // just as the notifier generates a notification for it. + let time = Instant::now(); let _ = self.sender.send(notification); + let send_time = time.elapsed().as_micros(); + datapoint_info!( + "rpc-pubsub-broadcast-send-time-us", + ("send_time", send_time, i64) + ); inc_new_counter_info!("rpc-pubsub-messages", 1); inc_new_counter_info!("rpc-pubsub-bytes", buf_arc.len()); - self.recent_items.lock().unwrap().push(buf_arc); + // self.recent_items.lock().unwrap().push(buf_arc); } } @@ -626,10 +632,10 @@ impl RpcSubscriptions { let notifier = RpcNotifier { sender: broadcast_sender.clone(), - recent_items: Mutex::new(RecentItems::new( - config.queue_capacity_items, - config.queue_capacity_bytes, - )), + // recent_items: Mutex::new(RecentItems::new( + // config.queue_capacity_items, + // config.queue_capacity_bytes, + // )), }; let notification_threads = config.notification_threads.unwrap_or_else(get_thread_count); let t_cleanup = if notification_threads == 0 { @@ -757,6 +763,15 @@ impl RpcSubscriptions { fn enqueue_notification(&self, notification_entry: NotificationEntry) { if let Some(ref notification_sender) = self.notification_sender { + let queue_size = notification_sender.len() as i64; + let max_capacity = notification_sender.capacity().unwrap_or(1_123_456) as i64; + datapoint_info!( + "rpc_pubsub_queue", + ("size", queue_size, i64), + ("remaining_capacity", max_capacity - queue_size, i64), + ("capacity", max_capacity, i64) + ); + match notification_sender.send(notification_entry.into()) { Ok(()) => (), Err(SendError(notification)) => { diff --git a/sdk/program/src/serde_varint.rs b/sdk/program/src/serde_varint.rs index 419dfe209d4502..8c4cfbf9e85e20 100644 --- a/sdk/program/src/serde_varint.rs +++ b/sdk/program/src/serde_varint.rs @@ -74,8 +74,8 @@ macro_rules! impl_var_int { let mut shift = 0u32; while shift < <$type>::BITS { let Some(byte) = seq.next_element::()? else { - return Err(A::Error::custom("Invalid Sequence")); - }; + return Err(A::Error::custom("Invalid Sequence")); + }; out |= ((byte & 0x7F) as Self) << shift; if byte & 0x80 == 0 { // Last byte should not have been truncated when it was