geyser: handle /debug_clients on prometheus endpoint (#314)
fanatid authored Apr 4, 2024
1 parent e3e3f61 commit a5dc933
Showing 5 changed files with 203 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -20,6 +20,7 @@ The minor version will be incremented upon a breaking change and the patch versi
 - geyser: add `transactions_status` filter ([#310](https://github.com/rpcpool/yellowstone-grpc/pull/310))
 - geyser: add metric `slot_status_plugin` ([#312](https://github.com/rpcpool/yellowstone-grpc/pull/312))
 - geyser: wrap `geyser_loop` with `unconstrained` ([#313](https://github.com/rpcpool/yellowstone-grpc/pull/313))
+- geyser: handle `/debug_clients` on prometheus endpoint ([#314](https://github.com/rpcpool/yellowstone-grpc/pull/314))
 - geyser: wrap messages to `Arc` ([#315](https://github.com/rpcpool/yellowstone-grpc/pull/315))
 
 ### Breaking
3 changes: 3 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
@@ -20,6 +20,9 @@ pub struct Config {
     /// Action on block re-construction error
     #[serde(default)]
     pub block_fail_action: ConfigBlockFailAction,
+    /// Collect client filters, processed slot and make it available on prometheus port `/debug_clients`
+    #[serde(default)]
+    pub debug_clients_http: bool,
 }
 
 impl Config {
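The new flag is opt-in and defaults to `false` via `#[serde(default)]`. For reference, a minimal plugin config sketch enabling it — the `libpath` and addresses here are illustrative placeholders, not values from this commit:

```json
{
  "libpath": "target/release/libyellowstone_grpc_geyser.so",
  "grpc": {
    "address": "0.0.0.0:10000"
  },
  "prometheus": {
    "address": "0.0.0.0:8999"
  },
  "debug_clients_http": true
}
```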
24 changes: 22 additions & 2 deletions yellowstone-grpc-geyser/src/grpc.rs
@@ -2,7 +2,7 @@ use {
     crate::{
         config::{ConfigBlockFailAction, ConfigGrpc},
         filters::{Filter, FilterAccountsDataSlice},
-        prom::{self, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
+        prom::{self, DebugClientMessage, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
         version::GrpcVersionInfo,
     },
     anyhow::Context,
@@ -714,13 +714,15 @@ pub struct GrpcService {
     subscribe_id: AtomicUsize,
     snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Option<Message>>>>,
     broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
+    debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
 }
 
 impl GrpcService {
     #[allow(clippy::type_complexity)]
     pub async fn create(
         config: ConfigGrpc,
         block_fail_action: ConfigBlockFailAction,
+        debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
         is_reload: bool,
     ) -> anyhow::Result<(
         Option<crossbeam_channel::Sender<Option<Message>>>,
@@ -777,6 +779,7 @@ impl GrpcService {
                 subscribe_id: AtomicUsize::new(0),
                 snapshot_rx: Mutex::new(snapshot_rx),
                 broadcast_tx: broadcast_tx.clone(),
+                debug_clients_tx,
             })
             .accept_compressed(CompressionEncoding::Gzip)
             .send_compressed(CompressionEncoding::Gzip)
@@ -1075,16 +1078,22 @@ impl GrpcService {
         }
     }
 
+    #[allow(clippy::too_many_arguments)]
     async fn client_loop(
         id: usize,
         mut filter: Filter,
         stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
         mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
         mut snapshot_rx: Option<crossbeam_channel::Receiver<Option<Message>>>,
         mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
+        debug_client_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
         drop_client: impl FnOnce(),
     ) {
         CONNECTIONS_TOTAL.inc();
+        DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter {
+            id,
+            filter: Box::new(filter.clone()),
+        });
         info!("client #{id}: new");
 
         let mut is_alive = true;
@@ -1161,6 +1170,7 @@ impl GrpcService {
                         }
 
                         filter = filter_new;
+                        DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { id, filter: Box::new(filter.clone()) });
                         info!("client #{id}: filter updated");
                     }
                     Some(None) => {
@@ -1206,13 +1216,22 @@ impl GrpcService {
                             }
                         }
                     }
+
+                    if commitment == CommitmentLevel::Processed && debug_client_tx.is_some() {
+                        for message in messages.iter() {
+                            if let Message::Slot(slot_message) = message.as_ref() {
+                                DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateSlot { id, slot: slot_message.slot });
+                            }
+                        }
+                    }
                 }
             }
         }
 
-        info!("client #{id}: removed");
         CONNECTIONS_TOTAL.dec();
+        DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::Removed { id });
+        info!("client #{id}: removed");
         drop_client();
     }
 }
@@ -1332,6 +1351,7 @@ impl Geyser for GrpcService {
                 client_rx,
                 snapshot_rx,
                 self.broadcast_tx.subscribe(),
+                self.debug_clients_tx.clone(),
                 move || {
                     notify_exit1.notify_one();
                     notify_exit2.notify_one();
20 changes: 14 additions & 6 deletions yellowstone-grpc-geyser/src/plugin.rs
@@ -79,12 +79,20 @@ impl GeyserPlugin for Plugin {
 
         let (snapshot_channel, grpc_channel, grpc_shutdown, prometheus) =
             runtime.block_on(async move {
-                let (snapshot_channel, grpc_channel, grpc_shutdown) =
-                    GrpcService::create(config.grpc, config.block_fail_action, is_reload)
-                        .await
-                        .map_err(|error| GeyserPluginError::Custom(format!("{error:?}").into()))?;
-                let prometheus = PrometheusService::new(config.prometheus)
-                    .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
+                let (debug_client_tx, debug_client_rx) = mpsc::unbounded_channel();
+                let (snapshot_channel, grpc_channel, grpc_shutdown) = GrpcService::create(
+                    config.grpc,
+                    config.block_fail_action,
+                    config.debug_clients_http.then_some(debug_client_tx),
+                    is_reload,
+                )
+                .await
+                .map_err(|error| GeyserPluginError::Custom(format!("{error:?}").into()))?;
+                let prometheus = PrometheusService::new(
+                    config.prometheus,
+                    config.debug_clients_http.then_some(debug_client_rx),
+                )
+                .map_err(|error| GeyserPluginError::Custom(Box::new(error)))?;
                 Ok::<_, GeyserPluginError>((
                     snapshot_channel,
                     grpc_channel,
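Note how the wiring above creates the channel unconditionally and gates each endpoint with `bool::then_some`: when `debug_clients_http` is `false`, both halves are dropped and every `maybe_send` call becomes a no-op. A minimal standalone sketch of the same pattern, with illustrative names not taken from this commit:

```rust
use tokio::sync::mpsc;

fn wire(debug_enabled: bool) -> (
    Option<mpsc::UnboundedSender<String>>,
    Option<mpsc::UnboundedReceiver<String>>,
) {
    // Endpoints always exist; `then_some` keeps them only when the flag is set,
    // otherwise both halves are dropped here and the channel closes immediately.
    let (tx, rx) = mpsc::unbounded_channel();
    (debug_enabled.then_some(tx), debug_enabled.then_some(rx))
}
```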
181 changes: 163 additions & 18 deletions yellowstone-grpc-geyser/src/prom.rs
@@ -1,5 +1,5 @@
 use {
-    crate::{config::ConfigPrometheus, version::VERSION as VERSION_INFO},
+    crate::{config::ConfigPrometheus, filters::Filter, version::VERSION as VERSION_INFO},
     futures::future::FutureExt,
     hyper::{
         server::conn::AddrStream,
@@ -9,8 +9,15 @@ use {
     log::error,
     prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
     solana_geyser_plugin_interface::geyser_plugin_interface::SlotStatus,
-    std::sync::Once,
-    tokio::sync::oneshot,
+    solana_sdk::clock::Slot,
+    std::{
+        collections::{hash_map::Entry as HashMapEntry, HashMap},
+        sync::{Arc, Once},
+    },
+    tokio::{
+        sync::{mpsc, oneshot},
+        task::JoinHandle,
+    },
     yellowstone_grpc_proto::prelude::CommitmentLevel,
 };
 
@@ -46,13 +53,119 @@ lazy_static::lazy_static! {
     ).unwrap();
 }
 
+#[derive(Debug)]
+pub enum DebugClientMessage {
+    UpdateFilter { id: usize, filter: Box<Filter> },
+    UpdateSlot { id: usize, slot: Slot },
+    Removed { id: usize },
+}
+
+impl DebugClientMessage {
+    pub fn maybe_send(tx: &Option<mpsc::UnboundedSender<Self>>, get_msg: impl FnOnce() -> Self) {
+        if let Some(tx) = tx {
+            let _ = tx.send(get_msg());
+        }
+    }
+}
+
+#[derive(Debug)]
+struct DebugClientStatus {
+    filter: Box<Filter>,
+    processed_slot: Slot,
+}
+
+#[derive(Debug)]
+struct DebugClientStatuses {
+    requests_tx: mpsc::UnboundedSender<oneshot::Sender<String>>,
+    jh: JoinHandle<()>,
+}
+
+impl Drop for DebugClientStatuses {
+    fn drop(&mut self) {
+        self.jh.abort();
+    }
+}
+
+impl DebugClientStatuses {
+    fn new(clients_rx: mpsc::UnboundedReceiver<DebugClientMessage>) -> Arc<Self> {
+        let (requests_tx, requests_rx) = mpsc::unbounded_channel();
+        let jh = tokio::spawn(Self::run(clients_rx, requests_rx));
+        Arc::new(Self { requests_tx, jh })
+    }
+
+    async fn run(
+        mut clients_rx: mpsc::UnboundedReceiver<DebugClientMessage>,
+        mut requests_rx: mpsc::UnboundedReceiver<oneshot::Sender<String>>,
+    ) {
+        let mut clients = HashMap::<usize, DebugClientStatus>::new();
+        loop {
+            tokio::select! {
+                Some(message) = clients_rx.recv() => match message {
+                    DebugClientMessage::UpdateFilter { id, filter } => {
+                        match clients.entry(id) {
+                            HashMapEntry::Occupied(mut entry) => {
+                                entry.get_mut().filter = filter;
+                            }
+                            HashMapEntry::Vacant(entry) => {
+                                entry.insert(DebugClientStatus {
+                                    filter,
+                                    processed_slot: 0,
+                                });
+                            }
+                        }
+                    }
+                    DebugClientMessage::UpdateSlot { id, slot } => {
+                        if let Some(status) = clients.get_mut(&id) {
+                            status.processed_slot = slot;
+                        }
+                    }
+                    DebugClientMessage::Removed { id } => {
+                        clients.remove(&id);
+                    }
+                },
+                Some(tx) = requests_rx.recv() => {
+                    let mut statuses: Vec<(usize, String)> = clients.iter().map(|(id, status)| {
+                        (*id, format!("client#{id:06}, {}, {:?}", status.processed_slot, status.filter))
+                    }).collect();
+                    statuses.sort();
+
+                    let mut status = statuses.into_iter().fold(String::new(), |mut acc: String, (_id, status)| {
+                        if !acc.is_empty() {
+                            acc += "\n";
+                        }
+                        acc + &status
+                    });
+                    if !status.is_empty() {
+                        status += "\n";
+                    }
+
+                    let _ = tx.send(status);
+                },
+            }
+        }
+    }
+
+    async fn get_statuses(&self) -> anyhow::Result<String> {
+        let (tx, rx) = oneshot::channel();
+        self.requests_tx
+            .send(tx)
+            .map_err(|_error| anyhow::anyhow!("failed to send request"))?;
+        rx.await
+            .map_err(|_error| anyhow::anyhow!("failed to wait response"))
+    }
+}
+
 #[derive(Debug)]
 pub struct PrometheusService {
+    debug_clients_statuses: Option<Arc<DebugClientStatuses>>,
     shutdown_signal: oneshot::Sender<()>,
 }
 
 impl PrometheusService {
-    pub fn new(config: Option<ConfigPrometheus>) -> hyper::Result<Self> {
+    pub fn new(
+        config: Option<ConfigPrometheus>,
+        debug_clients_rx: Option<mpsc::UnboundedReceiver<DebugClientMessage>>,
+    ) -> hyper::Result<Self> {
         static REGISTER: Once = Once::new();
         REGISTER.call_once(|| {
             macro_rules! register {
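`DebugClientStatuses` above is a small actor: `run` is a spawned task that exclusively owns the `HashMap`, and `get_statuses` requests a snapshot by sending a `oneshot::Sender` as the message, so no lock is needed and `Drop` only has to abort the task. A standalone sketch of that request/response idiom, with illustrative names not taken from this commit:

```rust
use tokio::sync::{mpsc, oneshot};

// The actor task owns the state; callers pass a oneshot sender as the "request".
async fn actor(mut requests_rx: mpsc::UnboundedReceiver<oneshot::Sender<usize>>) {
    let mut counter = 0usize;
    while let Some(reply_tx) = requests_rx.recv().await {
        counter += 1;
        // Reply with a snapshot; ignore requesters that already went away.
        let _ = reply_tx.send(counter);
    }
}

#[tokio::main]
async fn main() {
    let (requests_tx, requests_rx) = mpsc::unbounded_channel();
    tokio::spawn(actor(requests_rx));

    let (tx, rx) = oneshot::channel();
    requests_tx.send(tx).expect("actor alive");
    println!("snapshot #{}", rx.await.expect("actor replied")); // snapshot #1
}
```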
@@ -83,15 +196,42 @@ impl PrometheusService {
         });
 
         let (shutdown_signal, shutdown) = oneshot::channel();
+        let mut debug_clients_statuses = None;
         if let Some(ConfigPrometheus { address }) = config {
-            let make_service = make_service_fn(move |_: &AddrStream| async move {
-                Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| async move {
-                    let response = match req.uri().path() {
-                        "/metrics" => metrics_handler(),
-                        _ => not_found_handler(),
-                    };
-                    Ok::<_, hyper::Error>(response)
-                }))
+            if let Some(debug_clients_rx) = debug_clients_rx {
+                debug_clients_statuses = Some(DebugClientStatuses::new(debug_clients_rx));
+            }
+            let debug_clients_statuses2 = debug_clients_statuses.clone();
+            let make_service = make_service_fn(move |_: &AddrStream| {
+                let debug_clients_statuses = debug_clients_statuses2.clone();
+                async move {
+                    let debug_clients_statuses = debug_clients_statuses.clone();
+                    Ok::<_, hyper::Error>(service_fn(move |req: Request<Body>| {
+                        let debug_clients_statuses = debug_clients_statuses.clone();
+                        async move {
+                            let response = match req.uri().path() {
+                                "/metrics" => metrics_handler(),
+                                "/debug_clients" => {
+                                    if let Some(debug_clients_statuses) = &debug_clients_statuses {
+                                        let (status, body) =
+                                            match debug_clients_statuses.get_statuses().await {
+                                                Ok(body) => (StatusCode::OK, body),
+                                                Err(error) => (
+                                                    StatusCode::INTERNAL_SERVER_ERROR,
+                                                    error.to_string(),
+                                                ),
+                                            };
+                                        build_http_response(status, Body::from(body))
+                                    } else {
+                                        not_found_handler()
+                                    }
+                                }
+                                _ => not_found_handler(),
+                            };
+                            Ok::<_, hyper::Error>(response)
+                        }
+                    }))
+                }
             });
             let server = Server::try_bind(&address)?.serve(make_service);
             let shutdown = shutdown.map(|_| Ok(()));
@@ -102,29 +242,34 @@ impl PrometheusService {
             });
         }
 
-        Ok(PrometheusService { shutdown_signal })
+        Ok(PrometheusService {
+            debug_clients_statuses,
+            shutdown_signal,
+        })
     }
 
     pub fn shutdown(self) {
+        drop(self.debug_clients_statuses);
         let _ = self.shutdown_signal.send(());
     }
 }
 
+fn build_http_response(status: StatusCode, body: Body) -> Response<Body> {
+    Response::builder().status(status).body(body).unwrap()
+}
+
 fn metrics_handler() -> Response<Body> {
     let metrics = TextEncoder::new()
         .encode_to_string(&REGISTRY.gather())
         .unwrap_or_else(|error| {
             error!("could not encode custom metrics: {}", error);
             String::new()
         });
-    Response::builder().body(Body::from(metrics)).unwrap()
+    build_http_response(StatusCode::OK, Body::from(metrics))
 }
 
 fn not_found_handler() -> Response<Body> {
-    Response::builder()
-        .status(StatusCode::NOT_FOUND)
-        .body(Body::empty())
-        .unwrap()
+    build_http_response(StatusCode::NOT_FOUND, Body::empty())
 }
 
 pub fn update_slot_status(status: SlotStatus, slot: u64) {
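With the flag enabled, the debug page shares the prometheus port and sits next to `/metrics`. A usage sketch — the port, the client id, and the slot are illustrative, while the line layout follows the `format!("client#{id:06}, {}, {:?}", ...)` call above (client id, processed slot, then the `Debug` rendering of the client's filter):

```
$ curl http://127.0.0.1:8999/debug_clients
client#000000, 254914056, Filter { accounts: ..., slots: ..., transactions: ... }
```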
