Skip to content

Commit

Permalink
geyser: handle x-request-snapshot on client request (#409)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Aug 23, 2024
1 parent fff9534 commit 0bdedb5
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 70 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ The minor version will be incremented upon a breaking change and the patch versi

## 2024-08-22

- yellowstone-grpc-client-1.15.2+solana.1.18.22
- yellowstone-grpc-client-1.15.3+solana.1.18.22
- yellowstone-grpc-geyser-1.15.1+solana.1.18.22
- yellowstone-grpc-proto-1.14.2+solana.1.18.22
- yellowstone-grpc-tools-1.0.0-rc.11+solana.1.18.22
Expand All @@ -30,6 +30,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features

- solana: bump proto/client version for publishing crates
- geyser: handle `x-request-snapshot` on client request ([#409](https://github.com/rpcpool/yellowstone-grpc/pull/409))

## 2024-08-09

Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
resolver = "2"
members = [
"examples/rust", # 1.13.0+solana.1.18.22
"yellowstone-grpc-client", # 1.15.2+solana.1.18.22
"yellowstone-grpc-client", # 1.15.3+solana.1.18.22
"yellowstone-grpc-geyser", # 1.15.1+solana.1.18.22
"yellowstone-grpc-proto", # 1.14.2+solana.1.18.22
"yellowstone-grpc-tools", # 1.0.0-rc.11+solana.1.18.22
Expand Down Expand Up @@ -69,7 +69,7 @@ tracing = "0.1.37"
tracing-subscriber = "0.3.17"
uuid = "1.8.0"
vergen = "8.2.1"
yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.2+solana.1.18.22" }
yellowstone-grpc-client = { path = "yellowstone-grpc-client", version = "=1.15.3+solana.1.18.22" }
yellowstone-grpc-proto = { path = "yellowstone-grpc-proto", version = "=1.14.2+solana.1.18.22" }

[profile.release]
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-client"
version = "1.15.2+solana.1.18.22"
version = "1.15.3+solana.1.18.22"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Simple Client"
Expand Down
29 changes: 21 additions & 8 deletions yellowstone-grpc-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
std::time::Duration,
tonic::{
codec::{CompressionEncoding, Streaming},
metadata::{errors::InvalidMetadataValue, AsciiMetadataValue},
metadata::{errors::InvalidMetadataValue, AsciiMetadataValue, MetadataValue},
service::interceptor::InterceptedService,
transport::channel::{Channel, ClientTlsConfig, Endpoint},
Request, Response, Status,
Expand All @@ -27,19 +27,19 @@ use {
#[derive(Debug, Clone)]
pub struct InterceptorXToken {
pub x_token: Option<AsciiMetadataValue>,
}

impl From<Option<AsciiMetadataValue>> for InterceptorXToken {
fn from(x_token: Option<AsciiMetadataValue>) -> Self {
Self { x_token }
}
pub x_request_snapshot: bool,
}

impl Interceptor for InterceptorXToken {
fn call(&mut self, mut request: Request<()>) -> Result<Request<()>, Status> {
if let Some(x_token) = self.x_token.clone() {
request.metadata_mut().insert("x-token", x_token);
}
if self.x_request_snapshot {
request
.metadata_mut()
.insert("x-request-snapshot", MetadataValue::from_static("true"));
}
Ok(request)
}
}
Expand Down Expand Up @@ -215,6 +215,7 @@ pub type GeyserGrpcBuilderResult<T> = Result<T, GeyserGrpcBuilderError>;
pub struct GeyserGrpcBuilder {
pub endpoint: Endpoint,
pub x_token: Option<AsciiMetadataValue>,
pub x_request_snapshot: bool,
pub send_compressed: Option<CompressionEncoding>,
pub accept_compressed: Option<CompressionEncoding>,
pub max_decoding_message_size: Option<usize>,
Expand All @@ -227,6 +228,7 @@ impl GeyserGrpcBuilder {
Self {
endpoint,
x_token: None,
x_request_snapshot: false,
send_compressed: None,
accept_compressed: None,
max_decoding_message_size: None,
Expand All @@ -247,7 +249,10 @@ impl GeyserGrpcBuilder {
self,
channel: Channel,
) -> GeyserGrpcBuilderResult<GeyserGrpcClient<impl Interceptor>> {
let interceptor: InterceptorXToken = self.x_token.into();
let interceptor = InterceptorXToken {
x_token: self.x_token,
x_request_snapshot: self.x_request_snapshot,
};

let mut geyser = GeyserClient::with_interceptor(channel.clone(), interceptor.clone());
if let Some(encoding) = self.send_compressed {
Expand Down Expand Up @@ -299,6 +304,14 @@ impl GeyserGrpcBuilder {
})
}

// Include `x-request-snapshot`
pub fn set_x_request_snapshot(self, value: bool) -> GeyserGrpcBuilderResult<Self> {
Ok(Self {
x_request_snapshot: value,
..self
})
}

// Endpoint options
pub fn connect_timeout(self, dur: Duration) -> Self {
Self {
Expand Down
140 changes: 83 additions & 57 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,7 @@ impl GrpcService {
async fn client_loop(
id: usize,
endpoint: String,
x_request_snapshot: bool,
config_filters: Arc<ConfigGrpcFilters>,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
Expand Down Expand Up @@ -1144,63 +1145,18 @@ impl GrpcService {
info!("client #{id}: new");

let mut is_alive = true;
if let Some(snapshot_rx) = snapshot_rx.take() {
info!("client #{id}: going to receive snapshot data");

// we start with default filter, for snapshot we need wait actual filter first
while is_alive {
match client_rx.recv().await {
Some(Some(filter_new)) => {
if let Some(msg) = filter_new.get_pong_msg() {
if stream_tx.send(Ok(msg)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
break;
}
continue;
}

prom::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new));
filter = filter_new;
info!("client #{id}: filter updated");
break;
}
Some(None) => {
is_alive = false;
}
None => {
is_alive = false;
}
};
}

while is_alive {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
MESSAGE_QUEUE_SIZE.dec();
match message {
Some(message) => message,
None => break,
}
}
Err(crossbeam_channel::TryRecvError::Empty) => {
sleep(Duration::from_millis(1)).await;
continue;
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("client #{id}: snapshot channel disconnected");
is_alive = false;
break;
}
};

for message in filter.get_update(&message, None) {
if stream_tx.send(Ok(message)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
break;
}
}
if x_request_snapshot {
if let Some(snapshot_rx) = snapshot_rx.take() {
Self::client_loop_snapshot(
id,
&endpoint,
&stream_tx,
&mut client_rx,
snapshot_rx,
&mut is_alive,
&mut filter,
)
.await;
}
}

Expand Down Expand Up @@ -1285,6 +1241,74 @@ impl GrpcService {
info!("client #{id}: removed");
drop_client();
}

async fn client_loop_snapshot(
id: usize,
endpoint: &str,
stream_tx: &mpsc::Sender<TonicResult<SubscribeUpdate>>,
client_rx: &mut mpsc::UnboundedReceiver<Option<Filter>>,
snapshot_rx: crossbeam_channel::Receiver<Option<Message>>,
is_alive: &mut bool,
filter: &mut Filter,
) {
info!("client #{id}: going to receive snapshot data");

// we start with default filter, for snapshot we need wait actual filter first
while *is_alive {
match client_rx.recv().await {
Some(Some(filter_new)) => {
if let Some(msg) = filter_new.get_pong_msg() {
if stream_tx.send(Ok(msg)).await.is_err() {
error!("client #{id}: stream closed");
*is_alive = false;
break;
}
continue;
}

prom::update_subscriptions(endpoint, Some(filter), Some(&filter_new));
*filter = filter_new;
info!("client #{id}: filter updated");
break;
}
Some(None) => {
*is_alive = false;
}
None => {
*is_alive = false;
}
};
}

while *is_alive {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
MESSAGE_QUEUE_SIZE.dec();
match message {
Some(message) => message,
None => break,
}
}
Err(crossbeam_channel::TryRecvError::Empty) => {
sleep(Duration::from_millis(1)).await;
continue;
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("client #{id}: snapshot channel disconnected");
*is_alive = false;
break;
}
};

for message in filter.get_update(&message, None) {
if stream_tx.send(Ok(message)).await.is_err() {
error!("client #{id}: stream closed");
*is_alive = false;
break;
}
}
}
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -1342,6 +1366,7 @@ impl Geyser for GrpcService {
.get("x-endpoint")
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.unwrap_or_else(|| "".to_owned());
let x_request_snapshot = request.metadata().contains_key("x-request-snapshot");

let config_filters = Arc::clone(&self.config_filters);
let incoming_stream_tx = stream_tx.clone();
Expand Down Expand Up @@ -1388,6 +1413,7 @@ impl Geyser for GrpcService {
tokio::spawn(Self::client_loop(
id,
endpoint,
x_request_snapshot,
Arc::clone(&self.config_filters),
stream_tx,
client_rx,
Expand Down

0 comments on commit 0bdedb5

Please sign in to comment.