Skip to content

Commit

Permalink
backport snapshot flow change by @kespinola to 1.18
Browse files Browse the repository at this point in the history
  • Loading branch information
gritsly committed Aug 22, 2024
1 parent 6295d58 commit 565258c
Showing 1 changed file with 33 additions and 39 deletions.
72 changes: 33 additions & 39 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1146,58 +1146,52 @@ impl GrpcService {
let mut is_alive = true;
if let Some(snapshot_rx) = snapshot_rx.take() {
info!("client #{id}: going to receive snapshot data");

// we start with default filter, for snapshot we need wait actual filter first
while is_alive {
match client_rx.recv().await {
Some(Some(filter_new)) => {
if let Some(msg) = filter_new.get_pong_msg() {
if stream_tx.send(Ok(msg)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
break;
// Handle new filters and connection status
if let Ok(filter_msg) = client_rx.try_recv() {
match filter_msg {
Some(filter_new) => {
if let Some(msg) = filter_new.get_pong_msg() {
if stream_tx.send(Ok(msg)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
}
continue;
}
continue;
}

prom::update_subscriptions(&endpoint, Some(&filter), Some(&filter_new));
filter = filter_new;
info!("client #{id}: filter updated");
}
Some(None) => {
is_alive = false;
}
None => {
is_alive = false;
metrics::update_subscriptions(
&endpoint,
Some(&filter),
Some(&filter_new),
);
filter = filter_new;
info!("client #{id}: filter updated");
}
None => {
is_alive = false;
}
}
};
}
}

while is_alive {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
// Handle snapshot messages
match snapshot_rx.try_recv() {
Ok(Some(message)) => {
MESSAGE_QUEUE_SIZE.dec();
match message {
Some(message) => message,
None => break,
for message in filter.get_update(&message, None) {
if stream_tx.send(Ok(message)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
break;
}
}
}
Ok(None) => break,
Err(crossbeam_channel::TryRecvError::Empty) => {
sleep(Duration::from_millis(1)).await;
continue;
tokio::time::sleep(tokio::time::Duration::from_millis(1)).await;
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("client #{id}: snapshot channel disconnected");
is_alive = false;
break;
}
};

for message in filter.get_update(&message, None) {
if stream_tx.send(Ok(message)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
break;
}
}
}
Expand Down

0 comments on commit 565258c

Please sign in to comment.