From 565258c404833d2c8379372a3d1e221ff71e3e09 Mon Sep 17 00:00:00 2001 From: gritsly Date: Thu, 22 Aug 2024 17:47:00 +0200 Subject: [PATCH] backport snapshot flow change by @kespinola to 1.18 --- yellowstone-grpc-geyser/src/grpc.rs | 72 +++++++++++++---------------- 1 file changed, 33 insertions(+), 39 deletions(-) diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 29baa62c..d4fe8297 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1146,58 +1146,52 @@ impl GrpcService { let mut is_alive = true; if let Some(snapshot_rx) = snapshot_rx.take() { info!("client #{id}: going to receive snapshot data"); - - // we start with default filter, for snapshot we need wait actual filter first while is_alive { - match client_rx.recv().await { - Some(Some(filter_new)) => { - if let Some(msg) = filter_new.get_pong_msg() { - if stream_tx.send(Ok(msg)).await.is_err() { - error!("client #{id}: stream closed"); - is_alive = false; - break; + // Handle new filters and connection status + if let Ok(filter_msg) = client_rx.try_recv() { + match filter_msg { + Some(filter_new) => { + if let Some(msg) = filter_new.get_pong_msg() { + if stream_tx.send(Ok(msg)).await.is_err() { + error!("client #{id}: stream closed"); + is_alive = false; + } + continue; } - continue; - } - prom::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new)); - filter = filter_new; - info!("client #{id}: filter updated"); - } - Some(None) => { - is_alive = false; - } - None => { - is_alive = false; + metrics::update_subscriptions( + &endpoint, + Some(&filter), + Some(&filter_new), + ); + filter = filter_new; + info!("client #{id}: filter updated"); + } + None => { + is_alive = false; + } } - }; - } + } - while is_alive { - let message = match snapshot_rx.try_recv() { - Ok(message) => { + // Handle snapshot messages + match snapshot_rx.try_recv() { + Ok(Some(message)) => { MESSAGE_QUEUE_SIZE.dec(); - match message { - Some(message) => message, - None => break, + for message in filter.get_update(&message, None) { + if stream_tx.send(Ok(message)).await.is_err() { + error!("client #{id}: stream closed"); + is_alive = false; + break; + } } } + Ok(None) => break, Err(crossbeam_channel::TryRecvError::Empty) => { - sleep(Duration::from_millis(1)).await; - continue; + tokio::time::sleep(tokio::time::Duration::from_millis(1)).await; } Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("client #{id}: snapshot channel disconnected"); is_alive = false; - break; - } - }; - - for message in filter.get_update(&message, None) { - if stream_tx.send(Ok(message)).await.is_err() { - error!("client #{id}: stream closed"); - is_alive = false; - break; } } }