diff --git a/CHANGELOG.md b/CHANGELOG.md index dd94afd9..44c021f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The minor version will be incremented upon a breaking change and the patch versi - geyser: add `transactions_status` filter ([#310](https://github.com/rpcpool/yellowstone-grpc/pull/310)) - geyser: add metric `slot_status_plugin` ([#312](https://github.com/rpcpool/yellowstone-grpc/pull/312)) - geyser: wrap `geyser_loop` with `unconstrained` ([#313](https://github.com/rpcpool/yellowstone-grpc/pull/313)) +- geyser: handle `/debug_clients` on prometheus endpoint ([#314](https://github.com/rpcpool/yellowstone-grpc/pull/314)) - geyser: wrap messages to `Arc` ([#315](https://github.com/rpcpool/yellowstone-grpc/pull/315)) ### Breaking diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index 065d991d..2ff31155 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -20,6 +20,9 @@ pub struct Config { /// Action on block re-construction error #[serde(default)] pub block_fail_action: ConfigBlockFailAction, + /// Collect client filters, processed slot and make it available on prometheus port `/debug_clients` + #[serde(default)] + pub debug_clients_http: bool, } impl Config { diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 9bcf7f2b..cdbcea14 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -2,7 +2,7 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc}, filters::{Filter, FilterAccountsDataSlice}, - prom::{self, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE}, + prom::{self, DebugClientMessage, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE}, version::GrpcVersionInfo, }, anyhow::Context, @@ -714,6 +714,7 @@ pub struct GrpcService { subscribe_id: AtomicUsize, snapshot_rx: Mutex>>>, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>>)>, + debug_clients_tx: Option>, } impl GrpcService { @@ -721,6 +722,7 @@ impl GrpcService { pub async fn create( config: ConfigGrpc, block_fail_action: ConfigBlockFailAction, + debug_clients_tx: Option>, is_reload: bool, ) -> anyhow::Result<( Option>>, @@ -777,6 +779,7 @@ impl GrpcService { subscribe_id: AtomicUsize::new(0), snapshot_rx: Mutex::new(snapshot_rx), broadcast_tx: broadcast_tx.clone(), + debug_clients_tx, }) .accept_compressed(CompressionEncoding::Gzip) .send_compressed(CompressionEncoding::Gzip) @@ -1075,6 +1078,7 @@ impl GrpcService { } } + #[allow(clippy::too_many_arguments)] async fn client_loop( id: usize, mut filter: Filter, @@ -1082,9 +1086,14 @@ impl GrpcService { mut client_rx: mpsc::UnboundedReceiver>, mut snapshot_rx: Option>>, mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc>>)>, + debug_client_tx: Option>, drop_client: impl FnOnce(), ) { CONNECTIONS_TOTAL.inc(); + DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { + id, + filter: Box::new(filter.clone()), + }); info!("client #{id}: new"); let mut is_alive = true; @@ -1161,6 +1170,7 @@ impl GrpcService { } filter = filter_new; + DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { id, filter: Box::new(filter.clone()) }); info!("client #{id}: filter updated"); } Some(None) => { @@ -1206,13 +1216,22 @@ impl GrpcService { } } } + + if commitment == CommitmentLevel::Processed && debug_client_tx.is_some() { + for message in messages.iter() { + if let Message::Slot(slot_message) = message.as_ref() { + DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateSlot { id, slot: slot_message.slot }); + } + } + } } } } } - info!("client #{id}: removed"); CONNECTIONS_TOTAL.dec(); + DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::Removed { id }); + info!("client #{id}: removed"); drop_client(); } } @@ -1332,6 +1351,7 @@ impl Geyser for GrpcService { client_rx, snapshot_rx, self.broadcast_tx.subscribe(), + self.debug_clients_tx.clone(), move || { notify_exit1.notify_one(); notify_exit2.notify_one(); diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index a5359b25..362d7ffc 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -79,12 +79,20 @@ impl GeyserPlugin for Plugin { let (snapshot_channel, grpc_channel, grpc_shutdown, prometheus) = runtime.block_on(async move { - let (snapshot_channel, grpc_channel, grpc_shutdown) = - GrpcService::create(config.grpc, config.block_fail_action, is_reload) - .await - .map_err(|error| GeyserPluginError::Custom(format!("{error:?}").into()))?; - let prometheus = PrometheusService::new(config.prometheus) - .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; + let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel(); + let (snapshot_channel, grpc_channel, grpc_shutdown) = GrpcService::create( + config.grpc, + config.block_fail_action, + config.debug_clients_http.then_some(debug_client_tx), + is_reload, + ) + .await + .map_err(|error| GeyserPluginError::Custom(format!("{error:?}").into()))?; + let prometheus = PrometheusService::new( + config.prometheus, + config.debug_clients_http.then_some(debug_client_rx), + ) + .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?; Ok::<_, GeyserPluginError>(( snapshot_channel, grpc_channel, diff --git a/yellowstone-grpc-geyser/src/prom.rs b/yellowstone-grpc-geyser/src/prom.rs index ddcf9745..aed2158b 100644 --- a/yellowstone-grpc-geyser/src/prom.rs +++ b/yellowstone-grpc-geyser/src/prom.rs @@ -1,5 +1,5 @@ use { - crate::{config::ConfigPrometheus, version::VERSION as VERSION_INFO}, + crate::{config::ConfigPrometheus, filters::Filter, version::VERSION as VERSION_INFO}, futures::future::FutureExt, hyper::{ server::conn::AddrStream, @@ -9,8 +9,15 @@ use { log::error, prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, solana_geyser_plugin_interface::geyser_plugin_interface::SlotStatus, - std::sync::Once, - tokio::sync::oneshot, + solana_sdk::clock::Slot, + std::{ + collections::{hash_map::Entry as HashMapEntry, HashMap}, + sync::{Arc, Once}, + }, + tokio::{ + sync::{mpsc, oneshot}, + task::JoinHandle, + }, yellowstone_grpc_proto::prelude::CommitmentLevel, }; @@ -46,13 +53,119 @@ lazy_static::lazy_static! { ).unwrap(); } +#[derive(Debug)] +pub enum DebugClientMessage { + UpdateFilter { id: usize, filter: Box }, + UpdateSlot { id: usize, slot: Slot }, + Removed { id: usize }, +} + +impl DebugClientMessage { + pub fn maybe_send(tx: &Option>, get_msg: impl FnOnce() -> Self) { + if let Some(tx) = tx { + let _ = tx.send(get_msg()); + } + } +} + +#[derive(Debug)] +struct DebugClientStatus { + filter: Box, + processed_slot: Slot, +} + +#[derive(Debug)] +struct DebugClientStatuses { + requests_tx: mpsc::UnboundedSender>, + jh: JoinHandle<()>, +} + +impl Drop for DebugClientStatuses { + fn drop(&mut self) { + self.jh.abort(); + } +} + +impl DebugClientStatuses { + fn new(clients_rx: mpsc::UnboundedReceiver) -> Arc { + let (requests_tx, requests_rx) = mpsc::unbounded_channel(); + let jh = tokio::spawn(Self::run(clients_rx, requests_rx)); + Arc::new(Self { requests_tx, jh }) + } + + async fn run( + mut clients_rx: mpsc::UnboundedReceiver, + mut requests_rx: mpsc::UnboundedReceiver>, + ) { + let mut clients = HashMap::::new(); + loop { + tokio::select! { + Some(message) = clients_rx.recv() => match message { + DebugClientMessage::UpdateFilter { id, filter } => { + match clients.entry(id) { + HashMapEntry::Occupied(mut entry) => { + entry.get_mut().filter = filter; + } + HashMapEntry::Vacant(entry) => { + entry.insert(DebugClientStatus { + filter, + processed_slot: 0, + }); + } + } + } + DebugClientMessage::UpdateSlot { id, slot } => { + if let Some(status) = clients.get_mut(&id) { + status.processed_slot = slot; + } + } + DebugClientMessage::Removed { id } => { + clients.remove(&id); + } + }, + Some(tx) = requests_rx.recv() => { + let mut statuses: Vec<(usize, String)> = clients.iter().map(|(id, status)| { + (*id, format!("client#{id:06}, {}, {:?}", status.processed_slot, status.filter)) + }).collect(); + statuses.sort(); + + let mut status = statuses.into_iter().fold(String::new(), |mut acc: String, (_id, status)| { + if !acc.is_empty() { + acc += "\n"; + } + acc + &status + }); + if !status.is_empty() { + status += "\n"; + } + + let _ = tx.send(status); + }, + } + } + } + + async fn get_statuses(&self) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + self.requests_tx + .send(tx) + .map_err(|_error| anyhow::anyhow!("failed to send request"))?; + rx.await + .map_err(|_error| anyhow::anyhow!("failed to wait response")) + } +} + #[derive(Debug)] pub struct PrometheusService { + debug_clients_statuses: Option>, shutdown_signal: oneshot::Sender<()>, } impl PrometheusService { - pub fn new(config: Option) -> hyper::Result { + pub fn new( + config: Option, + debug_clients_rx: Option>, + ) -> hyper::Result { static REGISTER: Once = Once::new(); REGISTER.call_once(|| { macro_rules! register { @@ -83,15 +196,42 @@ impl PrometheusService { }); let (shutdown_signal, shutdown) = oneshot::channel(); + let mut debug_clients_statuses = None; if let Some(ConfigPrometheus { address }) = config { - let make_service = make_service_fn(move |_: &AddrStream| async move { - Ok::<_, hyper::Error>(service_fn(move |req: Request| async move { - let response = match req.uri().path() { - "/metrics" => metrics_handler(), - _ => not_found_handler(), - }; - Ok::<_, hyper::Error>(response) - })) + if let Some(debug_clients_rx) = debug_clients_rx { + debug_clients_statuses = Some(DebugClientStatuses::new(debug_clients_rx)); + } + let debug_clients_statuses2 = debug_clients_statuses.clone(); + let make_service = make_service_fn(move |_: &AddrStream| { + let debug_clients_statuses = debug_clients_statuses2.clone(); + async move { + let debug_clients_statuses = debug_clients_statuses.clone(); + Ok::<_, hyper::Error>(service_fn(move |req: Request| { + let debug_clients_statuses = debug_clients_statuses.clone(); + async move { + let response = match req.uri().path() { + "/metrics" => metrics_handler(), + "/debug_clients" => { + if let Some(debug_clients_statuses) = &debug_clients_statuses { + let (status, body) = + match debug_clients_statuses.get_statuses().await { + Ok(body) => (StatusCode::OK, body), + Err(error) => ( + StatusCode::INTERNAL_SERVER_ERROR, + error.to_string(), + ), + }; + build_http_response(status, Body::from(body)) + } else { + not_found_handler() + } + } + _ => not_found_handler(), + }; + Ok::<_, hyper::Error>(response) + } + })) + } }); let server = Server::try_bind(&address)?.serve(make_service); let shutdown = shutdown.map(|_| Ok(())); @@ -102,14 +242,22 @@ impl PrometheusService { }); } - Ok(PrometheusService { shutdown_signal }) + Ok(PrometheusService { + debug_clients_statuses, + shutdown_signal, + }) } pub fn shutdown(self) { + drop(self.debug_clients_statuses); let _ = self.shutdown_signal.send(()); } } +fn build_http_response(status: StatusCode, body: Body) -> Response { + Response::builder().status(status).body(body).unwrap() +} + fn metrics_handler() -> Response { let metrics = TextEncoder::new() .encode_to_string(®ISTRY.gather()) @@ -117,14 +265,11 @@ fn metrics_handler() -> Response { error!("could not encode custom metrics: {}", error); String::new() }); - Response::builder().body(Body::from(metrics)).unwrap() + build_http_response(StatusCode::OK, Body::from(metrics)) } fn not_found_handler() -> Response { - Response::builder() - .status(StatusCode::NOT_FOUND) - .body(Body::empty()) - .unwrap() + build_http_response(StatusCode::NOT_FOUND, Body::empty()) } pub fn update_slot_status(status: SlotStatus, slot: u64) {