From cd235aa11b115e0762abe37ffeb985d3acfe50bc Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 14 Nov 2024 12:09:37 +0200 Subject: [PATCH 1/3] proto: change error type in mod `convert_from` (#457) --- CHANGELOG.md | 1 + yellowstone-grpc-proto/src/lib.rs | 213 +++++++++++++++--------------- 2 files changed, 107 insertions(+), 107 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cac52036..dd2d16bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ The minor version will be incremented upon a breaking change and the patch versi - proto: add txn_signature filter ([#445](https://github.com/rpcpool/yellowstone-grpc/pull/445)) - geyser: limit length of filter name ([#448](https://github.com/rpcpool/yellowstone-grpc/pull/448)) - examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456)) +- proto: change error type in mod `convert_from` ([#457](https://github.com/rpcpool/yellowstone-grpc/pull/457)) ### Breaking diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index 1c83762d..6bf96447 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -36,7 +36,7 @@ pub mod convert_to { }, pubkey::Pubkey, signature::Signature, - transaction::SanitizedTransaction, + transaction::{SanitizedTransaction, TransactionError}, transaction_context::TransactionReturnData, }, solana_transaction_status::{ @@ -62,11 +62,7 @@ pub mod convert_to { header: Some(create_header(&message.header)), account_keys: create_pubkeys(&message.account_keys), recent_blockhash: message.recent_blockhash.to_bytes().into(), - instructions: message - .instructions - .iter() - .map(create_instruction) - .collect(), + instructions: create_instructions(&message.instructions), versioned: false, address_table_lookups: vec![], }, @@ -137,12 +133,7 @@ pub mod convert_to { return_data, compute_units_consumed, } = meta; - let err = match status { - Ok(()) => None, - Err(err) => Some(proto::TransactionError { - err: bincode::serialize(&err).expect("transaction error to serialize to bytes"), - }), - }; + let err = create_transaction_error(status); let inner_instructions_none = inner_instructions.is_none(); let inner_instructions = inner_instructions .as_deref() @@ -182,6 +173,17 @@ pub mod convert_to { } } + pub fn create_transaction_error( + status: &Result<(), TransactionError>, + ) -> Option { + match status { + Ok(()) => None, + Err(err) => Some(proto::TransactionError { + err: bincode::serialize(&err).expect("transaction error to serialize to bytes"), + }), + } + } + pub fn create_inner_instructions_vec( ixs: &[InnerInstructions], ) -> Vec { @@ -296,20 +298,15 @@ pub mod convert_from { }, }; - fn ensure_some(maybe_value: Option, message: impl Into) -> Result { - match maybe_value { - Some(value) => Ok(value), - None => Err(message.into()), - } - } + type CreateResult = Result; - pub fn create_block(block: proto::SubscribeUpdateBlock) -> Result { + pub fn create_block(block: proto::SubscribeUpdateBlock) -> CreateResult { let mut transactions = vec![]; for tx in block.transactions { transactions.push(create_tx_with_meta(tx)?); } - let block_rewards = ensure_some(block.rewards, "failed to get rewards")?; + let block_rewards = block.rewards.ok_or("failed to get rewards")?; let mut rewards = vec![]; for reward in block_rewards.rewards { rewards.push(create_reward(reward)?); @@ -322,22 +319,28 @@ pub mod convert_from { transactions, rewards, num_partitions: block_rewards.num_partitions.map(|msg| msg.num_partitions), - block_time: Some(ensure_some( - block.block_time.map(|wrapper| wrapper.timestamp), - "failed to get block_time", - )?), - block_height: Some(ensure_some( - block.block_height.map(|wrapper| wrapper.block_height), - "failed to get block_height", - )?), + block_time: Some( + block + .block_time + .map(|wrapper| wrapper.timestamp) + .ok_or("failed to get block_time")?, + ), + block_height: Some( + block + .block_height + .map(|wrapper| wrapper.block_height) + .ok_or("failed to get block_height")?, + ), }) } pub fn create_tx_with_meta( tx: proto::SubscribeUpdateTransactionInfo, - ) -> Result { - let meta = ensure_some(tx.meta, "failed to get transaction meta")?; - let tx = ensure_some(tx.transaction, "failed to get transaction transaction")?; + ) -> CreateResult { + let meta = tx.meta.ok_or("failed to get transaction meta")?; + let tx = tx + .transaction + .ok_or("failed to get transaction transaction")?; Ok(TransactionWithStatusMeta::Complete( VersionedTransactionWithStatusMeta { @@ -347,50 +350,48 @@ pub mod convert_from { )) } - pub fn create_tx_versioned(tx: proto::Transaction) -> Result { + pub fn create_tx_versioned(tx: proto::Transaction) -> CreateResult { let mut signatures = Vec::with_capacity(tx.signatures.len()); for signature in tx.signatures { signatures.push(match Signature::try_from(signature.as_slice()) { Ok(signature) => signature, - Err(_error) => return Err("failed to parse Signature".to_owned()), + Err(_error) => return Err("failed to parse Signature"), }); } Ok(VersionedTransaction { signatures, - message: create_message(ensure_some(tx.message, "failed to get message")?)?, + message: create_message(tx.message.ok_or("failed to get message")?)?, }) } - pub fn create_message(message: proto::Message) -> Result { - let header = ensure_some(message.header, "failed to get MessageHeader")?; + pub fn create_message(message: proto::Message) -> CreateResult { + let header = message.header.ok_or("failed to get MessageHeader")?; let header = MessageHeader { - num_required_signatures: ensure_some( - header.num_required_signatures.try_into().ok(), - "failed to parse num_required_signatures", - )?, - num_readonly_signed_accounts: ensure_some( - header.num_readonly_signed_accounts.try_into().ok(), - "failed to parse num_readonly_signed_accounts", - )?, - num_readonly_unsigned_accounts: ensure_some( - header.num_readonly_unsigned_accounts.try_into().ok(), - "failed to parse num_readonly_unsigned_accounts", - )?, + num_required_signatures: header + .num_required_signatures + .try_into() + .map_err(|_| "failed to parse num_required_signatures")?, + num_readonly_signed_accounts: header + .num_readonly_signed_accounts + .try_into() + .map_err(|_| "failed to parse num_readonly_signed_accounts")?, + num_readonly_unsigned_accounts: header + .num_readonly_unsigned_accounts + .try_into() + .map_err(|_| "failed to parse num_readonly_unsigned_accounts")?, }; if message.recent_blockhash.len() != HASH_BYTES { - return Err("failed to parse hash".to_owned()); + return Err("failed to parse hash"); } Ok(if message.versioned { let mut address_table_lookups = Vec::with_capacity(message.address_table_lookups.len()); for table in message.address_table_lookups { address_table_lookups.push(MessageAddressTableLookup { - account_key: ensure_some( - Pubkey::try_from(table.account_key.as_slice()).ok(), - "failed to parse Pubkey", - )?, + account_key: Pubkey::try_from(table.account_key.as_slice()) + .map_err(|_| "failed to parse Pubkey")?, writable_indexes: table.writable_indexes, readonly_indexes: table.readonly_indexes, }); @@ -415,18 +416,18 @@ pub mod convert_from { pub fn create_message_instructions( ixs: Vec, - ) -> Result, String> { + ) -> CreateResult> { ixs.into_iter().map(create_message_instruction).collect() } pub fn create_message_instruction( ix: proto::CompiledInstruction, - ) -> Result { + ) -> CreateResult { Ok(CompiledInstruction { - program_id_index: ensure_some( - ix.program_id_index.try_into().ok(), - "failed to decode CompiledInstruction.program_id_index)", - )?, + program_id_index: ix + .program_id_index + .try_into() + .map_err(|_| "failed to decode CompiledInstruction.program_id_index)")?, accounts: ix.accounts, data: ix.data, }) @@ -434,7 +435,7 @@ pub mod convert_from { pub fn create_tx_meta( meta: proto::TransactionStatusMeta, - ) -> Result { + ) -> CreateResult { let meta_status = match create_tx_error(meta.err.as_ref())? { Some(err) => Err(err), None => Ok(()), @@ -462,12 +463,10 @@ pub mod convert_from { return_data: if meta.return_data_none { None } else { - let data = ensure_some(meta.return_data, "failed to get return_data")?; + let data = meta.return_data.ok_or("failed to get return_data")?; Some(TransactionReturnData { - program_id: ensure_some( - Pubkey::try_from(data.program_id.as_slice()).ok(), - "failed to parse program_id", - )?, + program_id: Pubkey::try_from(data.program_id.as_slice()) + .map_err(|_| "failed to parse program_id")?, data: data.data, }) }, @@ -477,32 +476,29 @@ pub mod convert_from { pub fn create_tx_error( err: Option<&proto::TransactionError>, - ) -> Result, String> { - ensure_some( - err.map(|err| bincode::deserialize::(&err.err)) - .transpose() - .ok(), - "failed to decode TransactionError", - ) + ) -> CreateResult> { + err.map(|err| bincode::deserialize::(&err.err)) + .transpose() + .map_err(|_| "failed to decode TransactionError") } pub fn create_meta_inner_instructions( ixs: Vec, - ) -> Result, String> { + ) -> CreateResult> { ixs.into_iter().map(create_meta_inner_instruction).collect() } pub fn create_meta_inner_instruction( ix: proto::InnerInstructions, - ) -> Result { + ) -> CreateResult { let mut instructions = vec![]; for ix in ix.instructions { instructions.push(InnerInstruction { instruction: CompiledInstruction { - program_id_index: ensure_some( - ix.program_id_index.try_into().ok(), - "failed to decode CompiledInstruction.program_id_index)", - )?, + program_id_index: ix + .program_id_index + .try_into() + .map_err(|_| "failed to decode CompiledInstruction.program_id_index)")?, accounts: ix.accounts, data: ix.data, }, @@ -510,15 +506,15 @@ pub mod convert_from { }); } Ok(InnerInstructions { - index: ensure_some( - ix.index.try_into().ok(), - "failed to decode InnerInstructions.index", - )?, + index: ix + .index + .try_into() + .map_err(|_| "failed to decode InnerInstructions.index")?, instructions, }) } - pub fn create_rewards_obj(rewards: proto::Rewards) -> Result { + pub fn create_rewards_obj(rewards: proto::Rewards) -> CreateResult { Ok(RewardsAndNumPartitions { rewards: rewards .rewards @@ -529,15 +525,14 @@ pub mod convert_from { }) } - pub fn create_reward(reward: proto::Reward) -> Result { + pub fn create_reward(reward: proto::Reward) -> CreateResult { Ok(Reward { pubkey: reward.pubkey, lamports: reward.lamports, post_balance: reward.post_balance, - reward_type: match ensure_some( - proto::RewardType::try_from(reward.reward_type).ok(), - "failed to parse reward_type", - )? { + reward_type: match proto::RewardType::try_from(reward.reward_type) + .map_err(|_| "failed to parse reward_type")? + { proto::RewardType::Unspecified => None, proto::RewardType::Fee => Some(RewardType::Fee), proto::RewardType::Rent => Some(RewardType::Rent), @@ -547,32 +542,36 @@ pub mod convert_from { commission: if reward.commission.is_empty() { None } else { - Some(ensure_some( - reward.commission.parse().ok(), - "failed to parse reward commission", - )?) + Some( + reward + .commission + .parse() + .map_err(|_| "failed to parse reward commission")?, + ) }, }) } pub fn create_token_balances( balances: Vec, - ) -> Result, String> { + ) -> CreateResult> { let mut vec = Vec::with_capacity(balances.len()); for balance in balances { - let ui_amount = ensure_some(balance.ui_token_amount, "failed to get ui_token_amount")?; + let ui_amount = balance + .ui_token_amount + .ok_or("failed to get ui_token_amount")?; vec.push(TransactionTokenBalance { - account_index: ensure_some( - balance.account_index.try_into().ok(), - "failed to parse account_index", - )?, + account_index: balance + .account_index + .try_into() + .map_err(|_| "failed to parse account_index")?, mint: balance.mint, ui_token_amount: UiTokenAmount { ui_amount: Some(ui_amount.ui_amount), - decimals: ensure_some( - ui_amount.decimals.try_into().ok(), - "failed to parse decimals", - )?, + decimals: ui_amount + .decimals + .try_into() + .map_err(|_| "failed to parse decimals")?, amount: ui_amount.amount, ui_amount_string: ui_amount.ui_amount_string, }, @@ -586,27 +585,27 @@ pub mod convert_from { pub fn create_loaded_addresses( writable: Vec>, readonly: Vec>, - ) -> Result { + ) -> CreateResult { Ok(LoadedAddresses { writable: create_pubkey_vec(writable)?, readonly: create_pubkey_vec(readonly)?, }) } - pub fn create_pubkey_vec(pubkeys: Vec>) -> Result, String> { + pub fn create_pubkey_vec(pubkeys: Vec>) -> CreateResult> { pubkeys .iter() .map(|pubkey| create_pubkey(pubkey.as_slice())) .collect() } - pub fn create_pubkey(pubkey: &[u8]) -> Result { - ensure_some(Pubkey::try_from(pubkey).ok(), "failed to parse Pubkey") + pub fn create_pubkey(pubkey: &[u8]) -> CreateResult { + Pubkey::try_from(pubkey).map_err(|_| "failed to parse Pubkey") } pub fn create_account( account: proto::SubscribeUpdateAccountInfo, - ) -> Result<(Pubkey, Account), String> { + ) -> CreateResult<(Pubkey, Account)> { let pubkey = create_pubkey(&account.pubkey)?; let account = Account { lamports: account.lamports, From 32d21df349d5c45e92f943b6840a7ad031cf8545 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 14 Nov 2024 12:44:19 +0200 Subject: [PATCH 2/3] proto: add mod `plugin` with `FilterNames` cache (#458) --- CHANGELOG.md | 1 + Cargo.lock | 1 + yellowstone-grpc-geyser/Cargo.toml | 2 +- yellowstone-grpc-geyser/config.json | 3 +- yellowstone-grpc-geyser/src/config.rs | 2 + yellowstone-grpc-geyser/src/filters.rs | 167 +++++--------------- yellowstone-grpc-geyser/src/grpc.rs | 20 ++- yellowstone-grpc-proto/Cargo.toml | 8 +- yellowstone-grpc-proto/src/lib.rs | 3 + yellowstone-grpc-proto/src/plugin/filter.rs | 110 +++++++++++++ yellowstone-grpc-proto/src/plugin/mod.rs | 1 + 11 files changed, 178 insertions(+), 140 deletions(-) create mode 100644 yellowstone-grpc-proto/src/plugin/filter.rs create mode 100644 yellowstone-grpc-proto/src/plugin/mod.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index dd2d16bf..2ffcbf88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ The minor version will be incremented upon a breaking change and the patch versi - geyser: limit length of filter name ([#448](https://github.com/rpcpool/yellowstone-grpc/pull/448)) - examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456)) - proto: change error type in mod `convert_from` ([#457](https://github.com/rpcpool/yellowstone-grpc/pull/457)) +- proto: add mod `plugin` with `FilterNames` cache ([#458](https://github.com/rpcpool/yellowstone-grpc/pull/458)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index 693c4277..5410f80d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4595,6 +4595,7 @@ dependencies = [ "solana-account-decoder", "solana-sdk", "solana-transaction-status", + "thiserror", "tonic", "tonic-build", ] diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 84636d05..a2bb09e0 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -45,7 +45,7 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] } tokio-stream = { workspace = true } tonic = { workspace = true, features = ["gzip", "zstd", "tls", "tls-roots"] } tonic-health = { workspace = true } -yellowstone-grpc-proto = { workspace = true, features = ["convert"] } +yellowstone-grpc-proto = { workspace = true, features = ["convert", "plugin"] } [build-dependencies] anyhow = { workspace = true } diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index ea9a5962..7a23fa0e 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -30,7 +30,8 @@ "account_max": 10, "account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"], "owner_max": 10, - "owner_reject": ["11111111111111111111111111111111"] + "owner_reject": ["11111111111111111111111111111111"], + "data_slice_max": 2 }, "slots": { "max": 1 diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index 9774c205..a84d5d59 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -273,6 +273,7 @@ pub struct ConfigGrpcFiltersAccounts { pub owner_max: usize, #[serde(deserialize_with = "deserialize_pubkey_set")] pub owner_reject: HashSet, + pub data_slice_max: usize, } impl Default for ConfigGrpcFiltersAccounts { @@ -284,6 +285,7 @@ impl Default for ConfigGrpcFiltersAccounts { account_reject: HashSet::new(), owner_max: usize::MAX, owner_reject: HashSet::new(), + data_slice_max: usize::MAX, } } } diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index 29fc14c8..f6d361c3 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -14,14 +14,14 @@ use { solana_sdk::{pubkey::Pubkey, signature::Signature}, spl_token_2022::{generic_token_account::GenericTokenAccount, state::Account as TokenAccount}, std::{ - borrow::Borrow, collections::{HashMap, HashSet}, + ops::Range, str::FromStr, sync::Arc, - time::{Duration, Instant}, }, yellowstone_grpc_proto::{ convert_to, + plugin::filter::{FilterAccountsDataSlice, FilterName, FilterNames}, prelude::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, @@ -40,93 +40,6 @@ use { }, }; -#[derive(Debug, thiserror::Error)] -pub enum FilterError { - #[error("invalid filter name (max allowed size {limit}), found {size}")] - OversizedFilterName { limit: usize, size: usize }, -} - -pub type FilterResult = Result; - -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct FilterName(Arc); - -impl AsRef for FilterName { - #[inline] - fn as_ref(&self) -> &str { - &self.0 - } -} - -impl Borrow for FilterName { - #[inline] - fn borrow(&self) -> &str { - &self.0[..] - } -} - -impl FilterName { - pub fn new(name: impl Into) -> Self { - Self(Arc::new(name.into())) - } - - pub fn is_uniq(&self) -> bool { - Arc::strong_count(&self.0) == 1 - } -} - -#[derive(Debug)] -pub struct FilterNames { - name_size_limit: usize, - names: HashSet, - names_size_limit: usize, - cleanup_ts: Instant, - cleanup_interval: Duration, -} - -impl FilterNames { - pub fn new( - name_size_limit: usize, - names_size_limit: usize, - cleanup_interval: Duration, - ) -> Self { - Self { - name_size_limit, - names: HashSet::with_capacity(names_size_limit), - names_size_limit, - cleanup_ts: Instant::now(), - cleanup_interval, - } - } - - pub fn try_clean(&mut self) { - if self.names.len() > self.names_size_limit - && self.cleanup_ts.elapsed() > self.cleanup_interval - { - self.names.retain(|name| !name.is_uniq()); - self.cleanup_ts = Instant::now(); - } - } - - pub fn get(&mut self, name: &str) -> FilterResult { - match self.names.get(name) { - Some(name) => Ok(name.clone()), - None => { - if name.len() > self.name_size_limit { - Err(FilterError::OversizedFilterName { - limit: self.name_size_limit, - size: name.len(), - }) - } else { - let name = FilterName::new(name); - self.names.insert(name.clone()); - Ok(name) - } - } - } - } -} - #[derive(Debug, Clone)] pub enum FilteredMessage<'a> { Slot(&'a MessageSlot), @@ -141,13 +54,14 @@ pub enum FilteredMessage<'a> { impl<'a> FilteredMessage<'a> { fn as_proto_account( message: &MessageAccountInfo, - accounts_data_slice: &[FilterAccountsDataSlice], + data_slice: &FilterAccountsDataSlice, ) -> SubscribeUpdateAccountInfo { - let data = if accounts_data_slice.is_empty() { + let data_slice = data_slice.as_ref(); + let data = if data_slice.is_empty() { message.data.clone() } else { - let mut data = Vec::with_capacity(accounts_data_slice.iter().map(|ds| ds.length).sum()); - for data_slice in accounts_data_slice { + let mut data = Vec::with_capacity(data_slice.iter().map(|ds| ds.end - ds.start).sum()); + for data_slice in data_slice { if message.data.len() >= data_slice.end { data.extend_from_slice(&message.data[data_slice.start..data_slice.end]); } @@ -187,7 +101,7 @@ impl<'a> FilteredMessage<'a> { } } - pub fn as_proto(&self, accounts_data_slice: &[FilterAccountsDataSlice]) -> UpdateOneof { + pub fn as_proto(&self, accounts_data_slice: &FilterAccountsDataSlice) -> UpdateOneof { match self { Self::Slot(message) => UpdateOneof::Slot(SubscribeUpdateSlot { slot: message.slot, @@ -283,7 +197,7 @@ pub struct Filter { blocks: FilterBlocks, blocks_meta: FilterBlocksMeta, commitment: CommitmentLevel, - accounts_data_slice: Vec, + accounts_data_slice: FilterAccountsDataSlice, ping: Option, } @@ -304,7 +218,7 @@ impl Default for Filter { blocks: FilterBlocks::default(), blocks_meta: FilterBlocksMeta::default(), commitment: CommitmentLevel::Processed, - accounts_data_slice: vec![], + accounts_data_slice: FilterAccountsDataSlice::default(), ping: None, } } @@ -335,7 +249,10 @@ impl Filter { blocks: FilterBlocks::new(&config.blocks, &limit.blocks, names)?, blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta, names)?, commitment: Self::decode_commitment(config.commitment)?, - accounts_data_slice: FilterAccountsDataSlice::create(&config.accounts_data_slice)?, + accounts_data_slice: parse_accounts_data_slice_create( + &config.accounts_data_slice, + limit.accounts.data_slice_max, + )?, ping: config.ping.as_ref().map(|msg| msg.id), }) } @@ -1189,41 +1106,37 @@ impl FilterBlocksMeta { } } -#[derive(Debug, Clone, Copy)] -pub struct FilterAccountsDataSlice { - pub start: usize, - pub end: usize, - pub length: usize, -} +pub fn parse_accounts_data_slice_create( + slices: &[SubscribeRequestAccountsDataSlice], + limit: usize, +) -> anyhow::Result { + anyhow::ensure!( + slices.len() <= limit, + "Max amount of data_slices reached, only {} allowed", + limit + ); + + let slices = slices + .iter() + .map(|s| Range { + start: s.offset as usize, + end: (s.offset + s.length) as usize, + }) + .collect::>(); -impl From<&SubscribeRequestAccountsDataSlice> for FilterAccountsDataSlice { - fn from(data_slice: &SubscribeRequestAccountsDataSlice) -> Self { - Self { - start: data_slice.offset as usize, - end: (data_slice.offset + data_slice.length) as usize, - length: data_slice.length as usize, + for (i, slice_a) in slices.iter().enumerate() { + // check order + for slice_b in slices[i + 1..].iter() { + anyhow::ensure!(slice_a.start <= slice_b.start, "data slices out of order"); } - } -} - -impl FilterAccountsDataSlice { - pub fn create(slices: &[SubscribeRequestAccountsDataSlice]) -> anyhow::Result> { - let slices = slices.iter().map(Into::into).collect::>(); - for (i, slice_a) in slices.iter().enumerate() { - // check order - for slice_b in slices[i + 1..].iter() { - anyhow::ensure!(slice_a.start <= slice_b.start, "data slices out of order"); - } - - // check overlap - for slice_b in slices[0..i].iter() { - anyhow::ensure!(slice_a.start >= slice_b.end, "data slices overlap"); - } + // check overlap + for slice_b in slices[0..i].iter() { + anyhow::ensure!(slice_a.start >= slice_b.end, "data slices overlap"); } - - Ok(slices) } + + Ok(FilterAccountsDataSlice::new(slices)) } #[cfg(test)] diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 66de0d35..c81613e1 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1,7 +1,7 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters}, - filters::{Filter, FilterNames}, + filters::Filter, message::{Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo}, metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, @@ -36,13 +36,17 @@ use { Request, Response, Result as TonicResult, Status, Streaming, }, tonic_health::server::health_reporter, - yellowstone_grpc_proto::prelude::{ - geyser_server::{Geyser, GeyserServer}, - subscribe_update::UpdateOneof, - CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, - GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, - GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, - PongResponse, SubscribeRequest, SubscribeUpdate, SubscribeUpdatePing, + yellowstone_grpc_proto::{ + plugin::filter::FilterNames, + prelude::{ + geyser_server::{Geyser, GeyserServer}, + subscribe_update::UpdateOneof, + CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, + GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, + GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, + IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, + SubscribeUpdatePing, + }, }, }; diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 59fe5e92..04814dc6 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -11,11 +11,12 @@ keywords = { workspace = true } publish = true [dependencies] -bincode = { workspace = true } +bincode = { workspace = true, optional = true } prost = { workspace = true } solana-account-decoder = { workspace = true, optional = true } solana-sdk = { workspace = true, optional = true } solana-transaction-status = { workspace = true, optional = true } +thiserror = { workspace = true, optional = true } tonic = { workspace = true } [build-dependencies] @@ -24,9 +25,10 @@ protobuf-src = { workspace = true } tonic-build = { workspace = true } [features] -convert = ["dep:solana-account-decoder", "dep:solana-sdk", "dep:solana-transaction-status"] -tonic-compression = ["tonic/gzip", "tonic/zstd"] default = ["convert", "tonic-compression"] +convert = ["dep:bincode", "dep:solana-account-decoder", "dep:solana-sdk", "dep:solana-transaction-status"] +plugin = ["dep:thiserror"] +tonic-compression = ["tonic/gzip", "tonic/zstd"] [lints] workspace = true diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index 6bf96447..ee61332d 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -23,6 +23,9 @@ pub mod prelude { pub use {prost, tonic}; +#[cfg(feature = "plugin")] +pub mod plugin; + #[cfg(feature = "convert")] pub mod convert_to { use { diff --git a/yellowstone-grpc-proto/src/plugin/filter.rs b/yellowstone-grpc-proto/src/plugin/filter.rs new file mode 100644 index 00000000..c9c4235e --- /dev/null +++ b/yellowstone-grpc-proto/src/plugin/filter.rs @@ -0,0 +1,110 @@ +use std::{ + borrow::Borrow, + collections::HashSet, + ops::Range, + sync::Arc, + time::{Duration, Instant}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum FilterNameError { + #[error("oversized filter name (max allowed size {limit}), found {size}")] + Oversized { limit: usize, size: usize }, +} + +pub type FilterNameResult = Result; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FilterName(Arc); + +impl AsRef for FilterName { + #[inline] + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Borrow for FilterName { + #[inline] + fn borrow(&self) -> &str { + &self.0[..] + } +} + +impl FilterName { + pub fn new(name: impl Into) -> Self { + Self(Arc::new(name.into())) + } + + pub fn is_uniq(&self) -> bool { + Arc::strong_count(&self.0) == 1 + } +} + +#[derive(Debug)] +pub struct FilterNames { + name_size_limit: usize, + names: HashSet, + names_size_limit: usize, + cleanup_ts: Instant, + cleanup_interval: Duration, +} + +impl FilterNames { + pub fn new( + name_size_limit: usize, + names_size_limit: usize, + cleanup_interval: Duration, + ) -> Self { + Self { + name_size_limit, + names: HashSet::with_capacity(names_size_limit), + names_size_limit, + cleanup_ts: Instant::now(), + cleanup_interval, + } + } + + pub fn try_clean(&mut self) { + if self.names.len() > self.names_size_limit + && self.cleanup_ts.elapsed() > self.cleanup_interval + { + self.names.retain(|name| !name.is_uniq()); + self.cleanup_ts = Instant::now(); + } + } + + pub fn get(&mut self, name: &str) -> FilterNameResult { + match self.names.get(name) { + Some(name) => Ok(name.clone()), + None => { + if name.len() > self.name_size_limit { + Err(FilterNameError::Oversized { + limit: self.name_size_limit, + size: name.len(), + }) + } else { + let name = FilterName::new(name); + self.names.insert(name.clone()); + Ok(name) + } + } + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct FilterAccountsDataSlice(Arc>>); + +impl AsRef<[Range]> for FilterAccountsDataSlice { + #[inline] + fn as_ref(&self) -> &[Range] { + &self.0 + } +} + +impl FilterAccountsDataSlice { + pub fn new(slices: Vec>) -> Self { + Self(Arc::new(slices)) + } +} diff --git a/yellowstone-grpc-proto/src/plugin/mod.rs b/yellowstone-grpc-proto/src/plugin/mod.rs new file mode 100644 index 00000000..34d102d7 --- /dev/null +++ b/yellowstone-grpc-proto/src/plugin/mod.rs @@ -0,0 +1 @@ +pub mod filter; From 6d56e68a6d2643f390f8e4a8e91028dbc5d46744 Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 14 Nov 2024 13:35:33 +0200 Subject: [PATCH 3/3] proto: move enum Message from geyser crate (#459) --- .github/workflows/test.yml | 2 + CHANGELOG.md | 1 + Cargo.lock | 1 + yellowstone-grpc-geyser/config.json | 2 +- yellowstone-grpc-geyser/src/config.rs | 6 +- yellowstone-grpc-geyser/src/filters.rs | 69 +++---- yellowstone-grpc-geyser/src/grpc.rs | 37 ++-- yellowstone-grpc-geyser/src/lib.rs | 1 - yellowstone-grpc-geyser/src/metrics.rs | 2 +- yellowstone-grpc-geyser/src/plugin.rs | 2 +- yellowstone-grpc-proto/Cargo.toml | 14 +- .../src/plugin}/message.rs | 169 +++++++++++------- yellowstone-grpc-proto/src/plugin/mod.rs | 1 + 13 files changed, 187 insertions(+), 120 deletions(-) rename {yellowstone-grpc-geyser/src => yellowstone-grpc-proto/src/plugin}/message.rs (78%) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 09f11a1a..9ae6dbe3 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -96,6 +96,8 @@ jobs: run: cargo check -p yellowstone-grpc-geyser --all-targets --tests - name: check features in `proto` run: cargo check -p yellowstone-grpc-proto --all-targets --tests + - name: check features in `proto` + run: cargo check -p yellowstone-grpc-proto --all-targets --tests --all-features - name: Build run: ./ci/cargo-build-test.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ffcbf88..06e1100b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The minor version will be incremented upon a breaking change and the patch versi - examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456)) - proto: change error type in mod `convert_from` ([#457](https://github.com/rpcpool/yellowstone-grpc/pull/457)) - proto: add mod `plugin` with `FilterNames` cache ([#458](https://github.com/rpcpool/yellowstone-grpc/pull/458)) +- proto: move enum Message from geyser crate ([#459](https://github.com/rpcpool/yellowstone-grpc/pull/459)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index 5410f80d..7e1ca08f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4588,6 +4588,7 @@ dependencies = [ name = "yellowstone-grpc-proto" version = "2.0.0" dependencies = [ + "agave-geyser-plugin-interface", "anyhow", "bincode", "prost", diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 7a23fa0e..2e15d719 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -70,7 +70,7 @@ "blocks_meta": { "max": 1 }, - "entry": { + "entries": { "max": 1 } } diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index a84d5d59..31024b32 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -222,7 +222,7 @@ pub struct ConfigGrpcFilters { pub transactions_status: ConfigGrpcFiltersTransactions, pub blocks: ConfigGrpcFiltersBlocks, pub blocks_meta: ConfigGrpcFiltersBlocksMeta, - pub entry: ConfigGrpcFiltersEntry, + pub entries: ConfigGrpcFiltersEntries, } impl ConfigGrpcFilters { @@ -376,12 +376,12 @@ impl Default for ConfigGrpcFiltersBlocksMeta { #[derive(Debug, Clone, Deserialize)] #[serde(default, deny_unknown_fields)] -pub struct ConfigGrpcFiltersEntry { +pub struct ConfigGrpcFiltersEntries { #[serde(deserialize_with = "deserialize_usize_str")] pub max: usize, } -impl Default for ConfigGrpcFiltersEntry { +impl Default for ConfigGrpcFiltersEntries { fn default() -> Self { Self { max: usize::MAX } } diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index f6d361c3..dd74b333 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -1,14 +1,8 @@ use { - crate::{ - config::{ - ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks, - ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntry, ConfigGrpcFiltersSlots, - ConfigGrpcFiltersTransactions, - }, - message::{ - Message, MessageAccount, MessageAccountInfo, MessageBlock, MessageBlockMeta, - MessageEntry, MessageSlot, MessageTransaction, MessageTransactionInfo, - }, + crate::config::{ + ConfigGrpcFilters, ConfigGrpcFiltersAccounts, ConfigGrpcFiltersBlocks, + ConfigGrpcFiltersBlocksMeta, ConfigGrpcFiltersEntries, ConfigGrpcFiltersSlots, + ConfigGrpcFiltersTransactions, }, base64::{engine::general_purpose::STANDARD as base64_engine, Engine}, solana_sdk::{pubkey::Pubkey, signature::Signature}, @@ -21,13 +15,20 @@ use { }, yellowstone_grpc_proto::{ convert_to, - plugin::filter::{FilterAccountsDataSlice, FilterName, FilterNames}, + plugin::{ + filter::{FilterAccountsDataSlice, FilterName, FilterNames}, + message::{ + CommitmentLevel, Message, MessageAccount, MessageAccountInfo, MessageBlock, + MessageBlockMeta, MessageEntry, MessageSlot, MessageTransaction, + MessageTransactionInfo, + }, + }, prelude::{ subscribe_request_filter_accounts_filter::Filter as AccountsFilterDataOneof, subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, - SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + subscribe_update::UpdateOneof, CommitmentLevel as CommitmentLevelProto, + SubscribeRequest, SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, @@ -193,7 +194,7 @@ pub struct Filter { slots: FilterSlots, transactions: FilterTransactions, transactions_status: FilterTransactions, - entry: FilterEntry, + entries: FilterEntries, blocks: FilterBlocks, blocks_meta: FilterBlocksMeta, commitment: CommitmentLevel, @@ -214,7 +215,7 @@ impl Default for Filter { filter_type: FilterTransactionsType::TransactionStatus, filters: HashMap::new(), }, - entry: FilterEntry::default(), + entries: FilterEntries::default(), blocks: FilterBlocks::default(), blocks_meta: FilterBlocksMeta::default(), commitment: CommitmentLevel::Processed, @@ -245,7 +246,7 @@ impl Filter { FilterTransactionsType::TransactionStatus, names, )?, - entry: FilterEntry::new(&config.entry, &limit.entry, names)?, + entries: FilterEntries::new(&config.entry, &limit.entries, names)?, blocks: FilterBlocks::new(&config.blocks, &limit.blocks, names)?, blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta, names)?, commitment: Self::decode_commitment(config.commitment)?, @@ -258,10 +259,12 @@ impl Filter { } fn decode_commitment(commitment: Option) -> anyhow::Result { - let commitment = commitment.unwrap_or(CommitmentLevel::Processed as i32); - CommitmentLevel::try_from(commitment).map_err(|_error| { - anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}") - }) + let commitment = commitment.unwrap_or(CommitmentLevelProto::Processed as i32); + CommitmentLevelProto::try_from(commitment) + .map(Into::into) + .map_err(|_error| { + anyhow::anyhow!("failed to create CommitmentLevel from {commitment:?}") + }) } fn decode_pubkeys<'a>( @@ -296,7 +299,7 @@ impl Filter { "transactions_status", self.transactions_status.filters.len(), ), - ("entry", self.entry.filters.len()), + ("entries", self.entries.filters.len()), ("blocks", self.blocks.filters.len()), ("blocks_meta", self.blocks_meta.filters.len()), ( @@ -305,7 +308,7 @@ impl Filter { + self.slots.filters.len() + self.transactions.filters.len() + self.transactions_status.filters.len() - + self.entry.filters.len() + + self.entries.filters.len() + self.blocks.filters.len() + self.blocks_meta.filters.len(), ), @@ -329,7 +332,7 @@ impl Filter { .get_filters(message) .chain(self.transactions_status.get_filters(message)), ), - Message::Entry(message) => self.entry.get_filters(message), + Message::Entry(message) => self.entries.get_filters(message), Message::Block(message) => self.blocks.get_filters(message), Message::BlockMeta(message) => self.blocks_meta.get_filters(message), } @@ -912,14 +915,14 @@ impl FilterTransactions { } #[derive(Debug, Default, Clone)] -struct FilterEntry { +struct FilterEntries { filters: Vec, } -impl FilterEntry { +impl FilterEntries { fn new( configs: &HashMap, - limit: &ConfigGrpcFiltersEntry, + limit: &ConfigGrpcFiltersEntries, names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; @@ -1143,11 +1146,7 @@ pub fn parse_accounts_data_slice_create( mod tests { use { super::{FilterName, FilterNames, FilteredMessage}, - crate::{ - config::ConfigGrpcFilters, - filters::Filter, - message::{Message, MessageTransaction, MessageTransactionInfo}, - }, + crate::{config::ConfigGrpcFilters, filters::Filter}, solana_sdk::{ hash::Hash, message::{v0::LoadedAddresses, Message as SolMessage, MessageHeader}, @@ -1157,8 +1156,12 @@ mod tests { }, solana_transaction_status::TransactionStatusMeta, std::{collections::HashMap, sync::Arc, time::Duration}, - yellowstone_grpc_proto::geyser::{ - SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions, + yellowstone_grpc_proto::{ + geyser::{ + SubscribeRequest, SubscribeRequestFilterAccounts, + SubscribeRequestFilterTransactions, + }, + plugin::message::{Message, MessageTransaction, MessageTransactionInfo}, }, }; diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index c81613e1..b398e0a1 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -2,7 +2,6 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters}, filters::Filter, - message::{Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo}, metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, }, @@ -37,11 +36,17 @@ use { }, tonic_health::server::health_reporter, yellowstone_grpc_proto::{ - plugin::filter::FilterNames, + plugin::{ + filter::FilterNames, + message::{ + CommitmentLevel, Message, MessageBlockMeta, MessageEntry, MessageSlot, + MessageTransactionInfo, + }, + }, prelude::{ geyser_server::{Geyser, GeyserServer}, subscribe_update::UpdateOneof, - CommitmentLevel, GetBlockHeightRequest, GetBlockHeightResponse, + CommitmentLevel as CommitmentLevelProto, GetBlockHeightRequest, GetBlockHeightResponse, GetLatestBlockhashRequest, GetLatestBlockhashResponse, GetSlotRequest, GetSlotResponse, GetVersionRequest, GetVersionResponse, IsBlockhashValidRequest, IsBlockhashValidResponse, PingRequest, PongResponse, SubscribeRequest, SubscribeUpdate, @@ -156,11 +161,13 @@ impl BlockMetaStorage { } fn parse_commitment(commitment: Option) -> Result { - let commitment = commitment.unwrap_or(CommitmentLevel::Processed as i32); - CommitmentLevel::try_from(commitment).map_err(|_error| { - let msg = format!("failed to create CommitmentLevel from {commitment:?}"); - Status::unknown(msg) - }) + let commitment = commitment.unwrap_or(CommitmentLevelProto::Processed as i32); + CommitmentLevelProto::try_from(commitment) + .map(Into::into) + .map_err(|_error| { + let msg = format!("failed to create CommitmentLevel from {commitment:?}"); + Status::unknown(msg) + }) } async fn get_block( @@ -533,13 +540,21 @@ impl GrpcService { // If we already build Block message, new message will be a problem if slot_messages.sealed && !(matches!(&message, Message::Entry(_)) && slot_messages.entries_count == 0) { - metrics::update_invalid_blocks(format!("unexpected message {}", message.kind())); + let kind = match &message { + Message::Slot(_) => "Slot", + Message::Account(_) => "Account", + Message::Transaction(_) => "Transaction", + Message::Entry(_) => "Entry", + Message::BlockMeta(_) => "BlockMeta", + Message::Block(_) => "Block", + }; + metrics::update_invalid_blocks(format!("unexpected message {kind}")); match block_fail_action { ConfigBlockFailAction::Log => { - error!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind()); + error!("unexpected message #{} -- {kind} (invalid order)", message.get_slot()); } ConfigBlockFailAction::Panic => { - panic!("unexpected message #{} -- {} (invalid order)", message.get_slot(), message.kind()); + panic!("unexpected message #{} -- {kind} (invalid order)", message.get_slot()); } } } diff --git a/yellowstone-grpc-geyser/src/lib.rs b/yellowstone-grpc-geyser/src/lib.rs index c043be3c..f5f4d3f2 100644 --- a/yellowstone-grpc-geyser/src/lib.rs +++ b/yellowstone-grpc-geyser/src/lib.rs @@ -1,7 +1,6 @@ pub mod config; pub mod filters; pub mod grpc; -pub mod message; pub mod metrics; pub mod plugin; pub mod version; diff --git a/yellowstone-grpc-geyser/src/metrics.rs b/yellowstone-grpc-geyser/src/metrics.rs index 447002a7..16f50164 100644 --- a/yellowstone-grpc-geyser/src/metrics.rs +++ b/yellowstone-grpc-geyser/src/metrics.rs @@ -24,7 +24,7 @@ use { sync::{mpsc, oneshot, Notify}, task::JoinHandle, }, - yellowstone_grpc_proto::prelude::CommitmentLevel, + yellowstone_grpc_proto::plugin::message::CommitmentLevel, }; lazy_static::lazy_static! { diff --git a/yellowstone-grpc-geyser/src/plugin.rs b/yellowstone-grpc-geyser/src/plugin.rs index 59f76ea2..6927ec7d 100644 --- a/yellowstone-grpc-geyser/src/plugin.rs +++ b/yellowstone-grpc-geyser/src/plugin.rs @@ -2,7 +2,6 @@ use { crate::{ config::Config, grpc::GrpcService, - message::Message, metrics::{self, PrometheusService}, }, agave_geyser_plugin_interface::geyser_plugin_interface::{ @@ -22,6 +21,7 @@ use { runtime::{Builder, Runtime}, sync::{mpsc, Notify}, }, + yellowstone_grpc_proto::plugin::message::Message, }; #[derive(Debug)] diff --git a/yellowstone-grpc-proto/Cargo.toml b/yellowstone-grpc-proto/Cargo.toml index 04814dc6..7cc95a80 100644 --- a/yellowstone-grpc-proto/Cargo.toml +++ b/yellowstone-grpc-proto/Cargo.toml @@ -11,6 +11,7 @@ keywords = { workspace = true } publish = true [dependencies] +agave-geyser-plugin-interface = { workspace = true, optional = true } bincode = { workspace = true, optional = true } prost = { workspace = true } solana-account-decoder = { workspace = true, optional = true } @@ -26,8 +27,17 @@ tonic-build = { workspace = true } [features] default = ["convert", "tonic-compression"] -convert = ["dep:bincode", "dep:solana-account-decoder", "dep:solana-sdk", "dep:solana-transaction-status"] -plugin = ["dep:thiserror"] +convert = [ + "dep:bincode", + "dep:solana-account-decoder", + "dep:solana-sdk", + "dep:solana-transaction-status" +] +plugin = [ + "convert", + "dep:agave-geyser-plugin-interface", + "dep:thiserror" +] tonic-compression = ["tonic/gzip", "tonic/zstd"] [lints] diff --git a/yellowstone-grpc-geyser/src/message.rs b/yellowstone-grpc-proto/src/plugin/message.rs similarity index 78% rename from yellowstone-grpc-geyser/src/message.rs rename to yellowstone-grpc-proto/src/plugin/message.rs index 6438a2cb..3eaa9cee 100644 --- a/yellowstone-grpc-geyser/src/message.rs +++ b/yellowstone-grpc-proto/src/plugin/message.rs @@ -1,4 +1,5 @@ use { + crate::geyser::CommitmentLevel as CommitmentLevelProto, agave_geyser_plugin_interface::geyser_plugin_interface::{ ReplicaAccountInfoV3, ReplicaBlockInfoV4, ReplicaEntryInfoV2, ReplicaTransactionInfoV2, SlotStatus, @@ -9,9 +10,73 @@ use { }, solana_transaction_status::{Reward, TransactionStatusMeta}, std::sync::Arc, - yellowstone_grpc_proto::prelude::CommitmentLevel, }; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CommitmentLevel { + Processed, + Confirmed, + Finalized, +} + +impl From for CommitmentLevel { + fn from(status: SlotStatus) -> Self { + match status { + SlotStatus::Processed => Self::Processed, + SlotStatus::Confirmed => Self::Confirmed, + SlotStatus::Rooted => Self::Finalized, + } + } +} + +impl From for CommitmentLevelProto { + fn from(commitment: CommitmentLevel) -> Self { + match commitment { + CommitmentLevel::Processed => Self::Processed, + CommitmentLevel::Confirmed => Self::Confirmed, + CommitmentLevel::Finalized => Self::Finalized, + } + } +} + +impl From for CommitmentLevel { + fn from(status: CommitmentLevelProto) -> Self { + match status { + CommitmentLevelProto::Processed => Self::Processed, + CommitmentLevelProto::Confirmed => Self::Confirmed, + CommitmentLevelProto::Finalized => Self::Finalized, + } + } +} + +impl PartialEq for CommitmentLevel { + fn eq(&self, other: &CommitmentLevelProto) -> bool { + matches!( + (self, other), + (Self::Processed, CommitmentLevelProto::Processed) + | (Self::Confirmed, CommitmentLevelProto::Confirmed) + | (Self::Finalized, CommitmentLevelProto::Finalized) + ) + } +} + +#[derive(Debug, Clone, Copy)] +pub struct MessageSlot { + pub slot: u64, + pub parent: Option, + pub status: CommitmentLevel, +} + +impl From<(u64, Option, SlotStatus)> for MessageSlot { + fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { + Self { + slot, + parent, + status: status.into(), + } + } +} + #[derive(Debug, Clone)] pub struct MessageAccountInfo { pub pubkey: Pubkey, @@ -50,27 +115,6 @@ impl<'a> From<(&'a ReplicaAccountInfoV3<'a>, u64, bool)> for MessageAccount { } } -#[derive(Debug, Clone, Copy)] -pub struct MessageSlot { - pub slot: u64, - pub parent: Option, - pub status: CommitmentLevel, -} - -impl From<(u64, Option, SlotStatus)> for MessageSlot { - fn from((slot, parent, status): (u64, Option, SlotStatus)) -> Self { - Self { - slot, - parent, - status: match status { - SlotStatus::Processed => CommitmentLevel::Processed, - SlotStatus::Confirmed => CommitmentLevel::Confirmed, - SlotStatus::Rooted => CommitmentLevel::Finalized, - }, - } - } -} - #[derive(Debug, Clone)] pub struct MessageTransactionInfo { pub signature: Signature, @@ -117,7 +161,9 @@ impl From<&ReplicaEntryInfoV2<'_>> for MessageEntry { slot: entry.slot, index: entry.index, num_hashes: entry.num_hashes, - hash: entry.hash[0..32].try_into().expect("failed to create hash"), + hash: entry.hash[0..HASH_BYTES] + .try_into() + .expect("failed to create hash"), executed_transaction_count: entry.executed_transaction_count, starting_transaction_index: entry .starting_transaction_index @@ -127,6 +173,37 @@ impl From<&ReplicaEntryInfoV2<'_>> for MessageEntry { } } +#[derive(Debug, Clone)] +pub struct MessageBlockMeta { + pub parent_slot: u64, + pub slot: u64, + pub parent_blockhash: String, + pub blockhash: String, + pub rewards: Vec, + pub num_partitions: Option, + pub block_time: Option, + pub block_height: Option, + pub executed_transaction_count: u64, + pub entries_count: u64, +} + +impl<'a> From<&'a ReplicaBlockInfoV4<'a>> for MessageBlockMeta { + fn from(blockinfo: &'a ReplicaBlockInfoV4<'a>) -> Self { + Self { + parent_slot: blockinfo.parent_slot, + slot: blockinfo.slot, + parent_blockhash: blockinfo.parent_blockhash.to_string(), + blockhash: blockinfo.blockhash.to_string(), + rewards: blockinfo.rewards.rewards.clone(), + num_partitions: blockinfo.rewards.num_partitions, + block_time: blockinfo.block_time, + block_height: blockinfo.block_height, + executed_transaction_count: blockinfo.executed_transaction_count, + entries_count: blockinfo.entry_count, + } + } +} + #[derive(Debug, Clone)] pub struct MessageBlock { pub meta: Arc, @@ -162,45 +239,14 @@ impl } } -#[derive(Debug, Clone)] -pub struct MessageBlockMeta { - pub parent_slot: u64, - pub slot: u64, - pub parent_blockhash: String, - pub blockhash: String, - pub rewards: Vec, - pub num_partitions: Option, - pub block_time: Option, - pub block_height: Option, - pub executed_transaction_count: u64, - pub entries_count: u64, -} - -impl<'a> From<&'a ReplicaBlockInfoV4<'a>> for MessageBlockMeta { - fn from(blockinfo: &'a ReplicaBlockInfoV4<'a>) -> Self { - Self { - parent_slot: blockinfo.parent_slot, - slot: blockinfo.slot, - parent_blockhash: blockinfo.parent_blockhash.to_string(), - blockhash: blockinfo.blockhash.to_string(), - rewards: blockinfo.rewards.rewards.clone(), - num_partitions: blockinfo.rewards.num_partitions, - block_time: blockinfo.block_time, - block_height: blockinfo.block_height, - executed_transaction_count: blockinfo.executed_transaction_count, - entries_count: blockinfo.entry_count, - } - } -} - #[derive(Debug, Clone)] pub enum Message { Slot(MessageSlot), Account(MessageAccount), Transaction(MessageTransaction), Entry(Arc), - Block(Arc), BlockMeta(Arc), + Block(Arc), } impl Message { @@ -210,19 +256,8 @@ impl Message { Self::Account(msg) => msg.slot, Self::Transaction(msg) => msg.slot, Self::Entry(msg) => msg.slot, - Self::Block(msg) => msg.meta.slot, Self::BlockMeta(msg) => msg.slot, - } - } - - pub const fn kind(&self) -> &'static str { - match self { - Self::Slot(_) => "Slot", - Self::Account(_) => "Account", - Self::Transaction(_) => "Transaction", - Self::Entry(_) => "Entry", - Self::Block(_) => "Block", - Self::BlockMeta(_) => "BlockMeta", + Self::Block(msg) => msg.meta.slot, } } } diff --git a/yellowstone-grpc-proto/src/plugin/mod.rs b/yellowstone-grpc-proto/src/plugin/mod.rs index 34d102d7..39a8a487 100644 --- a/yellowstone-grpc-proto/src/plugin/mod.rs +++ b/yellowstone-grpc-proto/src/plugin/mod.rs @@ -1 +1,2 @@ pub mod filter; +pub mod message;