Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wrap from_reader to block_in_place #1

Open
wants to merge 4 commits into
base: accounts-from-path
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ The minor version will be incremented upon a breaking change and the patch versi
### Fixes

- deps: make cargo-deny happy about openssl, unsafe-libyaml, h2, ahash ([#278](https://github.com/rpcpool/yellowstone-grpc/pull/278))
- geyser: allow to set custom filter size in the config ([#288](https://github.com/rpcpool/yellowstone-grpc/pull/288))

### Features

- proto: add `entries_count` to block meta message ([#283](https://github.com/rpcpool/yellowstone-grpc/pull/283))
- geyser: use `Vec::binary_search` instead of `HashSet::contains` in the filters ([#284](https://github.com/rpcpool/yellowstone-grpc/pull/284))
- proto: add `starting_transaction_index` to entry message ([#289](https://github.com/rpcpool/yellowstone-grpc/pull/289))

### Breaking

Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

210 changes: 111 additions & 99 deletions examples/golang/proto/geyser.pb.go

Large diffs are not rendered by default.

19 changes: 17 additions & 2 deletions examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use {
std::{
collections::HashMap,
env, fmt,
fs::File,
sync::{Arc, Mutex},
time::Duration,
},
Expand Down Expand Up @@ -106,6 +107,10 @@ struct ActionSubscribe {
#[clap(long)]
accounts_account: Vec<String>,

/// Path to a JSON array of account addresses
#[clap(long)]
accounts_account_path: Option<String>,

/// Filter by Owner Pubkey
#[clap(long)]
accounts_owner: Vec<String>,
Expand Down Expand Up @@ -199,14 +204,23 @@ struct ActionSubscribe {
}

impl Action {
fn get_subscribe_request(
async fn get_subscribe_request(
&self,
commitment: Option<CommitmentLevel>,
) -> anyhow::Result<Option<(SubscribeRequest, usize)>> {
Ok(match self {
Self::Subscribe(args) => {
let mut accounts: AccountFilterMap = HashMap::new();
if args.accounts {
let mut accounts_account = args.accounts_account.clone();
if let Some(path) = args.accounts_account_path.clone() {
let accounts = tokio::task::block_in_place(move || {
let file = File::open(path)?;
Ok::<Vec<String>, anyhow::Error>(serde_json::from_reader(file)?)
})?;
accounts_account.extend(accounts);
}

let mut filters = vec![];
for filter in args.accounts_memcmp.iter() {
match filter.split_once(',') {
Expand Down Expand Up @@ -241,7 +255,7 @@ impl Action {
accounts.insert(
"client".to_owned(),
SubscribeRequestFilterAccounts {
account: args.accounts_account.clone(),
account: accounts_account,
owner: args.accounts_owner.clone(),
filters,
},
Expand Down Expand Up @@ -463,6 +477,7 @@ async fn main() -> anyhow::Result<()> {
let (request, resub) = args
.action
.get_subscribe_request(commitment)
.await
.map_err(backoff::Error::Permanent)?
.expect("expect subscribe action");

Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"cert_path": "",
"key_path": ""
},
"max_decoding_message_size": "4_194_304",
"snapshot_plugin_channel_capacity": null,
"snapshot_client_channel_capacity": "50_000_000",
"channel_capacity": "100_000",
Expand Down
10 changes: 10 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ pub struct ConfigGrpc {
pub address: SocketAddr,
/// TLS config
pub tls_config: Option<ConfigGrpcServerTls>,
/// Limits the maximum size of a decoded message, default is 4MiB
#[serde(
default = "ConfigGrpc::max_decoding_message_size_default",
deserialize_with = "deserialize_usize_str"
)]
pub max_decoding_message_size: usize,
/// Capacity of the channel used for accounts from snapshot,
/// on reaching the limit Sender block validator startup.
#[serde(
Expand Down Expand Up @@ -98,6 +104,10 @@ pub struct ConfigGrpc {
}

impl ConfigGrpc {
const fn max_decoding_message_size_default() -> usize {
4 * 1024 * 1024
}

const fn snapshot_plugin_channel_capacity_default() -> Option<usize> {
None
}
Expand Down
7 changes: 6 additions & 1 deletion yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ pub struct MessageEntry {
pub num_hashes: u64,
pub hash: Vec<u8>,
pub executed_transaction_count: u64,
pub starting_transaction_index: u64,
}

impl From<&ReplicaEntryInfo<'_>> for MessageEntry {
Expand All @@ -202,6 +203,7 @@ impl From<&ReplicaEntryInfo<'_>> for MessageEntry {
num_hashes: entry.num_hashes,
hash: entry.hash.into(),
executed_transaction_count: entry.executed_transaction_count,
starting_transaction_index: 0,
}
}
}
Expand All @@ -214,6 +216,7 @@ impl MessageEntry {
num_hashes: self.num_hashes,
hash: self.hash.clone(),
executed_transaction_count: self.executed_transaction_count,
starting_transaction_index: self.starting_transaction_index,
}
}
}
Expand Down Expand Up @@ -745,6 +748,7 @@ impl GrpcService {
}

// Create Server
let max_decoding_message_size = config.max_decoding_message_size;
let service = GeyserServer::new(Self {
config,
blocks_meta,
Expand All @@ -753,7 +757,8 @@ impl GrpcService {
broadcast_tx: broadcast_tx.clone(),
})
.accept_compressed(CompressionEncoding::Gzip)
.send_compressed(CompressionEncoding::Gzip);
.send_compressed(CompressionEncoding::Gzip)
.max_decoding_message_size(max_decoding_message_size);

// Run geyser message loop
let (messages_tx, messages_rx) = mpsc::unbounded_channel();
Expand Down
1 change: 1 addition & 0 deletions yellowstone-grpc-proto/proto/geyser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ message SubscribeUpdateEntry {
uint64 num_hashes = 3;
bytes hash = 4;
uint64 executed_transaction_count = 5;
uint64 starting_transaction_index = 6; // added in v1.18, for solana 1.17 value is always 0
}

message SubscribeUpdatePing {}
Expand Down