Skip to content

Commit

Permalink
geyser: fix x-request-snapshot handler (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Aug 23, 2024
1 parent 0bdedb5 commit e3b8ab7
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 30 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,17 @@ The minor version will be incremented upon a breaking change and the patch versi

### Breaking

## 2024-08-23

- yellowstone-grpc-client-1.15.4+solana.1.18.22
- yellowstone-grpc-geyser-1.15.2+solana.1.18.22
- yellowstone-grpc-proto-1.14.2+solana.1.18.22
- yellowstone-grpc-tools-1.0.0-rc.11+solana.1.18.22

### Fixes

- geyser: fix `x-request-snapshot` handler ([#412](https://github.com/rpcpool/yellowstone-grpc/pull/412))

## 2024-08-22

- yellowstone-grpc-client-1.15.3+solana.1.18.22
Expand Down
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
resolver = "2"
members = [
"examples/rust", # 1.13.0+solana.1.18.22
"yellowstone-grpc-client", # 1.15.3+solana.1.18.22
"yellowstone-grpc-geyser", # 1.15.1+solana.1.18.22
"yellowstone-grpc-client", # 1.15.4+solana.1.18.22
"yellowstone-grpc-geyser", # 1.15.2+solana.1.18.22
"yellowstone-grpc-proto", # 1.14.2+solana.1.18.22
"yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.22
]
Expand Down Expand Up @@ -69,7 +69,7 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.17"
uuid = "1.8.0"
vergen = "8.2.1"
yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.3+solana.1.18.22" }
yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.4+solana.1.18.22" }
yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.2+solana.1.18.22" }

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client"
version = "1.15.3+solana.1.18.22"
version = "1.15.4+solana.1.18.22"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Simple Client"
Expand Down
6 changes: 3 additions & 3 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,11 +305,11 @@ impl GeyserGrpcBuilder {
}

// Include `x-request-snapshot`
pub fn set_x_request_snapshot(self, value: bool) -> GeyserGrpcBuilderResult<Self> {
Ok(Self {
pub fn set_x_request_snapshot(self, value: bool) -> Self {
Self {
x_request_snapshot: value,
..self
})
}
}

// Endpoint options
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.15.1+solana.1.18.22"
version = "1.15.2+solana.1.18.22"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
36 changes: 18 additions & 18 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,6 @@ impl GrpcService {
async fn client_loop(
id: usize,
endpoint: String,
x_request_snapshot: bool,
config_filters: Arc<ConfigGrpcFilters>,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
Expand Down Expand Up @@ -1145,19 +1144,17 @@ impl GrpcService {
info!("client #{id}: new");

let mut is_alive = true;
if x_request_snapshot {
if let Some(snapshot_rx) = snapshot_rx.take() {
Self::client_loop_snapshot(
id,
&endpoint,
&stream_tx,
&mut client_rx,
snapshot_rx,
&mut is_alive,
&mut filter,
)
.await;
}
if let Some(snapshot_rx) = snapshot_rx.take() {
Self::client_loop_snapshot(
id,
&endpoint,
&stream_tx,
&mut client_rx,
snapshot_rx,
&mut is_alive,
&mut filter,
)
.await;
}

if is_alive {
Expand Down Expand Up @@ -1261,7 +1258,6 @@ impl GrpcService {
if stream_tx.send(Ok(msg)).await.is_err() {
error!("client #{id}: stream closed");
*is_alive = false;
break;
}
continue;
}
Expand Down Expand Up @@ -1320,7 +1316,13 @@ impl Geyser for GrpcService {
mut request: Request<Streaming<SubscribeRequest>>,
) -> TonicResult<Response<Self::SubscribeStream>> {
let id = self.subscribe_id.fetch_add(1, Ordering::Relaxed);
let snapshot_rx = self.snapshot_rx.lock().await.take();

let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");
let snapshot_rx = if x_request_snapshot {
self.snapshot_rx.lock().await.take()
} else {
None
};
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config_snapshot_client_channel_capacity
} else {
Expand Down Expand Up @@ -1366,7 +1368,6 @@ impl Geyser for GrpcService {
.get("x-endpoint")
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.unwrap_or_else(|| "".to_owned());
let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");

let config_filters = Arc::clone(&self.config_filters);
let incoming_stream_tx = stream_tx.clone();
Expand Down Expand Up @@ -1413,7 +1414,6 @@ impl Geyser for GrpcService {
tokio::spawn(Self::client_loop(
id,
endpoint,
x_request_snapshot,
Arc::clone(&self.config_filters),
stream_tx,
client_rx,
Expand Down
19 changes: 17 additions & 2 deletions yellowstone-grpc-geyser/src/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ use {
ReplicaEntryInfoVersions, ReplicaTransactionInfoVersions, Result as PluginResult,
SlotStatus,
},
std::{concat, env, sync::Arc, time::Duration},
std::{
concat, env,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
},
tokio::{
runtime::{Builder, Runtime},
sync::{mpsc, Notify},
Expand All @@ -20,6 +27,7 @@ use {
pub struct PluginInner {
runtime: Runtime,
snapshot_channel: Option<crossbeam_channel::Sender<Option<Message>>>,
snapshot_channel_closed: AtomicBool,
grpc_channel: mpsc::UnboundedSender<Arc<Message>>,
grpc_shutdown: Arc<Notify>,
prometheus: PrometheusService,
Expand Down Expand Up @@ -93,6 +101,7 @@ impl GeyserPlugin for Plugin {
self.inner = Some(PluginInner {
runtime,
snapshot_channel,
snapshot_channel_closed: AtomicBool::new(false),
grpc_channel,
grpc_shutdown,
prometheus,
Expand Down Expand Up @@ -132,7 +141,13 @@ impl GeyserPlugin for Plugin {
if let Some(channel) = &inner.snapshot_channel {
match channel.send(Some(message)) {
Ok(()) => MESSAGE_QUEUE_SIZE.inc(),
Err(_) => panic!("failed to send message to startup queue: channel closed"),
Err(_) => {
if !inner.snapshot_channel_closed.swap(true, Ordering::Relaxed) {
log::error!(
"failed to send message to startup queue: channel closed"
)
}
}
}
}
} else {
Expand Down

0 comments on commit e3b8ab7

Please sign in to comment.