Skip to content

Commit

Permalink
geyser: add x-endpoint to subscriptions_total
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Jun 7, 2024
1 parent 87fc3c1 commit 1000bd9
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features

- geyser: add compression option to config ([#356](https://github.com/rpcpool/yellowstone-grpc/pull/356))
- geyser: add `x-endpoint` to `subscriptions_total` ([#357](https://github.com/rpcpool/yellowstone-grpc/pull/357))

### Breaking

Expand Down
16 changes: 12 additions & 4 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,7 @@ impl GrpcService {
#[allow(clippy::too_many_arguments)]
async fn client_loop(
id: usize,
endpoint: String,
config_filters: Arc<ConfigGrpcFilters>,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
Expand All @@ -1124,7 +1125,7 @@ impl GrpcService {
&config_filters,
)
.expect("empty filter");
prom::update_subscriptions(None, Some(&filter));
prom::update_subscriptions(&endpoint, None, Some(&filter));

CONNECTIONS_TOTAL.inc();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter {
Expand All @@ -1150,7 +1151,7 @@ impl GrpcService {
continue;
}

prom::update_subscriptions(Some(&filter), Some(&filter_new));
prom::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new));
filter = filter_new;
info!("client #{id}: filter updated");
}
Expand Down Expand Up @@ -1207,7 +1208,7 @@ impl GrpcService {
continue;
}

prom::update_subscriptions(Some(&filter), Some(&filter_new));
prom::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new));
filter = filter_new;
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { id, filter: Box::new(filter.clone()) });
info!("client #{id}: filter updated");
Expand Down Expand Up @@ -1270,7 +1271,7 @@ impl GrpcService {

CONNECTIONS_TOTAL.dec();
DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::Removed { id });
prom::update_subscriptions(Some(&filter), None);
prom::update_subscriptions(&endpoint, Some(&filter), None);
info!("client #{id}: removed");
drop_client();
}
Expand Down Expand Up @@ -1326,6 +1327,12 @@ impl Geyser for GrpcService {
}
});

let endpoint = request
.metadata()
.get("x-endpoint")
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.unwrap_or_else(|| "".to_owned());

let config_filters = Arc::clone(&self.config_filters);
let incoming_stream_tx = stream_tx.clone();
let incoming_client_tx = client_tx;
Expand Down Expand Up @@ -1370,6 +1377,7 @@ impl Geyser for GrpcService {

tokio::spawn(Self::client_loop(
id,
endpoint,
Arc::clone(&self.config_filters),
stream_tx,
client_rx,
Expand Down
8 changes: 4 additions & 4 deletions yellowstone-grpc-geyser/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ lazy_static::lazy_static! {

static ref SUBSCRIPTIONS_TOTAL: IntGaugeVec = IntGaugeVec::new(
Opts::new("subscriptions_total", "Total number of subscriptions to gRPC service"),
&["subscription"]
&["endpoint", "subscription"]
).unwrap();
}

Expand Down Expand Up @@ -305,16 +305,16 @@ pub fn update_invalid_blocks(reason: impl AsRef<str>) {
INVALID_FULL_BLOCKS.with_label_values(&["all"]).inc();
}

pub fn update_subscriptions(old: Option<&Filter>, new: Option<&Filter>) {
pub fn update_subscriptions(endpoint: &str, old: Option<&Filter>, new: Option<&Filter>) {
for (multiplier, filter) in [(-1, old), (1, new)] {
if let Some(filter) = filter {
SUBSCRIPTIONS_TOTAL
.with_label_values(&["grpc_total"])
.with_label_values(&[endpoint, "grpc_total"])
.add(multiplier);

for (name, value) in filter.get_metrics() {
SUBSCRIPTIONS_TOTAL
.with_label_values(&[name])
.with_label_values(&[endpoint, name])
.add((value as i64) * multiplier);
}
}
Expand Down

0 comments on commit 1000bd9

Please sign in to comment.