Skip to content

Commit

Permalink
geyser: fix replay_stored_slots (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Dec 15, 2024
1 parent 1b98d7c commit 693a10f
Show file tree
Hide file tree
Showing 7 changed files with 27 additions and 17 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi

### Breaking

## 2024-12-15

- yellowstone-grpc-geyser-3.2.1

### Fixes

- geyser: fix `replay_stored_slots` ([#496](https://github.com/rpcpool/yellowstone-grpc/pull/496))

## 2024-12-13

- yellowstone-grpc-client-simple-3.2.0
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ resolver = "2"
members = [
"examples/rust", # 3.2.0
"yellowstone-grpc-client", # 3.1.0
"yellowstone-grpc-geyser", # 3.2.0
"yellowstone-grpc-geyser", # 3.2.1
"yellowstone-grpc-proto", # 3.1.0
]
exclude = [
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "3.2.0"
version = "3.2.1"
authors = { workspace = true }
edition = { workspace = true }
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"libpath": "../target/debug/libyellowstone_grpc_geyser.so",
"libpath": "../target/release/libyellowstone_grpc_geyser.so",
"log": {
"level": "info"
},
Expand Down
4 changes: 2 additions & 2 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,9 @@ pub struct ConfigGrpc {
pub replay_stored_slots: u64,
#[serde(default)]
pub server_http2_adaptive_window: Option<bool>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
pub server_http2_keepalive_interval: Option<Duration>,
#[serde(with = "humantime_serde")]
#[serde(default, with = "humantime_serde")]
pub server_http2_keepalive_timeout: Option<Duration>,
#[serde(default)]
pub server_initial_connection_window_size: Option<u32>,
Expand Down
24 changes: 13 additions & 11 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ pub struct GrpcService {
subscribe_id: AtomicUsize,
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Box<Message>>>>,
broadcast_tx: broadcast::Sender<BroadcastedMessage>,
replay_stored_slots_tx: mpsc::Sender<ReplayStoredSlotsRequest>,
replay_stored_slots_tx: Option<mpsc::Sender<ReplayStoredSlotsRequest>>,
debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
filter_names: Arc<Mutex<FilterNames>>,
}
Expand Down Expand Up @@ -373,12 +373,12 @@ impl GrpcService {
// Messages to clients combined by commitment
let (broadcast_tx, _) = broadcast::channel(config.channel_capacity);
// attempt to prevent spam of geyser loop with capacity eq 1
let (replay_stored_slots_tx, replay_stored_slots_rx) =
mpsc::channel(if config.replay_stored_slots == 0 {
0
} else {
1
});
let (replay_stored_slots_tx, replay_stored_slots_rx) = if config.replay_stored_slots == 0 {
(None, None)
} else {
let (tx, rx) = mpsc::channel(1);
(Some(tx), Some(rx))
};

// gRPC server builder with optional TLS
let mut server_builder = Server::builder();
Expand Down Expand Up @@ -495,7 +495,7 @@ impl GrpcService {
mut messages_rx: mpsc::UnboundedReceiver<Message>,
blocks_meta_tx: Option<mpsc::UnboundedSender<Message>>,
broadcast_tx: broadcast::Sender<BroadcastedMessage>,
mut replay_stored_slots_rx: mpsc::Receiver<ReplayStoredSlotsRequest>,
replay_stored_slots_rx: Option<mpsc::Receiver<ReplayStoredSlotsRequest>>,
replay_stored_slots: u64,
) {
const PROCESSED_MESSAGES_MAX: usize = 31;
Expand All @@ -507,6 +507,8 @@ impl GrpcService {
let mut processed_first_slot = None;
let processed_sleep = sleep(PROCESSED_MESSAGES_SLEEP);
tokio::pin!(processed_sleep);
let (_tx, rx) = mpsc::channel(1);
let mut replay_stored_slots_rx = replay_stored_slots_rx.unwrap_or(rx);

loop {
tokio::select! {
Expand Down Expand Up @@ -825,7 +827,7 @@ impl GrpcService {
mut client_rx: mpsc::UnboundedReceiver<Option<(Option<u64>, Filter)>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Box<Message>>>,
mut messages_rx: broadcast::Receiver<BroadcastedMessage>,
replay_stored_slots_tx: mpsc::Sender<ReplayStoredSlotsRequest>,
replay_stored_slots_tx: Option<mpsc::Sender<ReplayStoredSlotsRequest>>,
debug_client_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
drop_client: impl FnOnce(),
) {
Expand Down Expand Up @@ -887,13 +889,13 @@ impl GrpcService {
info!("client #{id}: filter updated");

if let Some(from_slot) = from_slot {
if replay_stored_slots_tx.max_capacity() == 0 {
let Some(replay_stored_slots_tx) = &replay_stored_slots_tx else {
info!("client #{id}: from_slot is not supported");
tokio::spawn(async move {
let _ = stream_tx.send(Err(Status::internal("from_slot is not supported"))).await;
});
break 'outer;
}
};

let (tx, rx) = oneshot::channel();
let commitment = filter.get_commitment_level();
Expand Down

0 comments on commit 693a10f

Please sign in to comment.