Skip to content

Commit

Permalink
geyser: add metric subscriptions_total (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Jun 5, 2024
1 parent f3f830b commit 6dc8f03
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ The minor version will be incremented upon a breaking change and the patch versi

### Features

- geyser: add metric `subscriptions_total` ([#355](https://github.com/rpcpool/yellowstone-grpc/pull/355))

### Breaking

## 2024-06-02
Expand Down
25 changes: 25 additions & 0 deletions yellowstone-grpc-geyser/src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,31 @@ impl Filter {
Ok(vec)
}

pub fn get_metrics(&self) -> [(&'static str, usize); 8] {
[
("accounts", self.accounts.filters.len()),
("slots", self.slots.filters.len()),
("transactions", self.transactions.filters.len()),
(
"transactions_status",
self.transactions_status.filters.len(),
),
("entry", self.entry.filters.len()),
("blocks", self.blocks.filters.len()),
("blocks_meta", self.blocks_meta.filters.len()),
(
"all",
self.accounts.filters.len()
+ self.slots.filters.len()
+ self.transactions.filters.len()
+ self.transactions_status.filters.len()
+ self.entry.filters.len()
+ self.blocks.filters.len()
+ self.blocks_meta.filters.len(),
),
]
}

pub const fn get_commitment_level(&self) -> CommitmentLevel {
self.commitment
}
Expand Down
50 changes: 29 additions & 21 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use {
crate::{
config::{ConfigBlockFailAction, ConfigGrpc},
config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters},
filters::{Filter, FilterAccountsDataSlice},
prom::{self, DebugClientMessage, CONNECTIONS_TOTAL, MESSAGE_QUEUE_SIZE},
version::GrpcVersionInfo,
Expand Down Expand Up @@ -713,6 +713,7 @@ impl SlotMessages {
#[derive(Debug)]
pub struct GrpcService {
config: ConfigGrpc,
config_filters: Arc<ConfigGrpcFilters>,
blocks_meta: Option<BlockMetaStorage>,
subscribe_id: AtomicUsize,
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Option<Message>>>>,
Expand Down Expand Up @@ -777,8 +778,10 @@ impl GrpcService {
// Create Server
let max_decoding_message_size = config.max_decoding_message_size;
let x_token = XTokenChecker::new(config.x_token.clone());
let config_filters = Arc::new(config.filters.clone());
let service = GeyserServer::new(Self {
config,
config_filters,
blocks_meta,
subscribe_id: AtomicUsize::new(0),
snapshot_rx: Mutex::new(snapshot_rx),
Expand Down Expand Up @@ -1094,14 +1097,32 @@ impl GrpcService {
#[allow(clippy::too_many_arguments)]
async fn client_loop(
id: usize,
mut filter: Filter,
config_filters: Arc<ConfigGrpcFilters>,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Option<Message>>>,
mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Arc<Message>>>)>,
debug_client_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
drop_client: impl FnOnce(),
) {
let mut filter = Filter::new(
&SubscribeRequest {
accounts: HashMap::new(),
slots: HashMap::new(),
transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
entry: HashMap::new(),
commitment: None,
accounts_data_slice: Vec::new(),
ping: None,
},
&config_filters,
)
.expect("empty filter");
prom::update_subscriptions(None, Some(&filter));

CONNECTIONS_TOTAL.inc();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter {
id,
Expand All @@ -1126,6 +1147,7 @@ impl GrpcService {
continue;
}

prom::update_subscriptions(Some(&filter), Some(&filter_new));
filter = filter_new;
info!("client #{id}: filter updated");
}
Expand Down Expand Up @@ -1182,6 +1204,7 @@ impl GrpcService {
continue;
}

prom::update_subscriptions(Some(&filter), Some(&filter_new));
filter = filter_new;
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { id, filter: Box::new(filter.clone()) });
info!("client #{id}: filter updated");
Expand Down Expand Up @@ -1244,6 +1267,7 @@ impl GrpcService {

CONNECTIONS_TOTAL.dec();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::Removed { id });
prom::update_subscriptions(Some(&filter), None);
info!("client #{id}: removed");
drop_client();
}
Expand All @@ -1258,22 +1282,6 @@ impl Geyser for GrpcService {
mut request: Request<Streaming<SubscribeRequest>>,
) -> TonicResult<Response<Self::SubscribeStream>> {
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);
let filter = Filter::new(
&SubscribeRequest {
accounts: HashMap::new(),
slots: HashMap::new(),
transactions: HashMap::new(),
transactions_status: HashMap::new(),
blocks: HashMap::new(),
blocks_meta: HashMap::new(),
entry: HashMap::new(),
commitment: None,
accounts_data_slice: Vec::new(),
ping: None,
},
&self.config.filters,
)
.expect("empty filter");
let snapshot_rx = self.snapshot_rx.lock().await.take();
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config.snapshot_client_channel_capacity
Expand Down Expand Up @@ -1315,7 +1323,7 @@ impl Geyser for GrpcService {
}
});

let config_filters_limit = self.config.filters.clone();
let config_filters = Arc::clone(&self.config_filters);
let incoming_stream_tx = stream_tx.clone();
let incoming_client_tx = client_tx;
let incoming_exit = Arc::clone(&notify_exit2);
Expand All @@ -1330,7 +1338,7 @@ impl Geyser for GrpcService {
}
message = request.get_mut().message() => match message {
Ok(Some(request)) => {
if let Err(error) = match Filter::new(&request, &config_filters_limit) {
if let Err(error) = match Filter::new(&request, &config_filters) {
Ok(filter) => match incoming_client_tx.send(Some(filter)) {
Ok(()) => Ok(()),
Err(error) => Err(error.to_string()),
Expand Down Expand Up @@ -1359,7 +1367,7 @@ impl Geyser for GrpcService {

tokio::spawn(Self::client_loop(
id,
filter,
Arc::clone(&self.config_filters),
stream_tx,
client_rx,
snapshot_rx,
Expand Down
24 changes: 23 additions & 1 deletion yellowstone-grpc-geyser/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ lazy_static::lazy_static! {
).unwrap();

pub static ref CONNECTIONS_TOTAL: IntGauge = IntGauge::new(
"connections_total", "Total number of connections to GRPC service"
"connections_total", "Total number of connections to gRPC service"
).unwrap();

static ref SUBSCRIPTIONS_TOTAL: IntGaugeVec = IntGaugeVec::new(
Opts::new("subscriptions_total", "Total number of subscriptions to gRPC service"),
&["subscription"]
).unwrap();
}

Expand Down Expand Up @@ -181,6 +186,7 @@ impl PrometheusService {
register!(INVALID_FULL_BLOCKS);
register!(MESSAGE_QUEUE_SIZE);
register!(CONNECTIONS_TOTAL);
register!(SUBSCRIPTIONS_TOTAL);

VERSION
.with_label_values(&[
Expand Down Expand Up @@ -298,3 +304,19 @@ pub fn update_invalid_blocks(reason: impl AsRef<str>) {
.inc();
INVALID_FULL_BLOCKS.with_label_values(&["all"]).inc();
}

pub fn update_subscriptions(old: Option<&Filter>, new: Option<&Filter>) {
for (multiplier, filter) in [(-1, old), (1, new)] {
if let Some(filter) = filter {
SUBSCRIPTIONS_TOTAL
.with_label_values(&["grpc_total"])
.add(multiplier);

for (name, value) in filter.get_metrics() {
SUBSCRIPTIONS_TOTAL
.with_label_values(&[name])
.add((value as i64) * multiplier);
}
}
}
}

0 comments on commit 6dc8f03

Please sign in to comment.