diff --git a/CHANGELOG.md b/CHANGELOG.md index 287d70b8..069b6585 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ The minor version will be incremented upon a breaking change and the patch versi - geyser: use Arc wrapped messages in block message ([#446](https://github.com/rpcpool/yellowstone-grpc/pull/446)) - node: remove generated grpc files ([#447](https://github.com/rpcpool/yellowstone-grpc/pull/447)) - proto: add txn_signature filter ([#445](https://github.com/rpcpool/yellowstone-grpc/pull/445)) +- geyser: limit length of filter name ([#448](https://github.com/rpcpool/yellowstone-grpc/pull/448)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index 4fef7186..f468ff7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1625,6 +1625,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "humantime-serde" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57a3db5ea5923d99402c94e9feb261dc5ee9b4efa158b0315f788cf549cc200c" +dependencies = [ + "humantime", + "serde", +] + [[package]] name = "hyper" version = "0.14.30" @@ -2408,7 +2418,7 @@ checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1" dependencies = [ "bytes", "heck", - "itertools 0.10.5", + "itertools 0.12.1", "log", "multimap", "once_cell", @@ -2428,7 +2438,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.72", @@ -4486,6 +4496,7 @@ dependencies = [ "hostname", "http 1.1.0", "http-body-util", + "humantime-serde", "hyper 1.4.1", "hyper-util", "lazy_static", @@ -4497,6 +4508,7 @@ dependencies = [ "solana-sdk", "solana-transaction-status", "spl-token-2022", + "thiserror", "tokio", "tokio-stream", "tonic", diff --git a/Cargo.toml b/Cargo.toml index 9dc48b0c..81ecef70 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ hex = "0.4.3" hostname = "0.4.0" http = "1.1.0" http-body-util = "0.1.2" +humantime-serde = "1.1.1" hyper = "1.4.1" hyper-util = "0.1.7" lazy_static = "1.4.0" diff --git a/yellowstone-grpc-geyser/Cargo.toml b/yellowstone-grpc-geyser/Cargo.toml index 1fa921dc..84636d05 100644 --- a/yellowstone-grpc-geyser/Cargo.toml +++ b/yellowstone-grpc-geyser/Cargo.toml @@ -28,6 +28,7 @@ futures = { workspace = true } hostname = { workspace = true } http = { workspace = true } http-body-util = { workspace = true } +humantime-serde = { workspace = true } hyper = { workspace = true } hyper-util = { workspace = true } lazy_static = { workspace = true } @@ -39,6 +40,7 @@ solana-logger = { workspace = true } solana-sdk = { workspace = true } solana-transaction-status = { workspace = true } spl-token-2022 = { workspace = true, features = ["no-entrypoint"] } +thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "fs"] } tokio-stream = { workspace = true } tonic = { workspace = true, features = ["gzip", "zstd", "tls", "tls-roots"] } diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index 670d8fad..e9004630 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -24,6 +24,9 @@ "unary_concurrency_limit": 100, "unary_disabled": false, "x_token": null, + "filter_name_size_limit": 32, + "filter_names_size_limit": 1024, + "filter_names_cleanup_interval": "1s", "filters": { "accounts": { "max": 1, diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs index 954785e6..9774c205 100644 --- a/yellowstone-grpc-geyser/src/config.rs +++ b/yellowstone-grpc-geyser/src/config.rs @@ -4,7 +4,7 @@ use { }, serde::{de, Deserialize, Deserializer}, solana_sdk::pubkey::Pubkey, - std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path}, + std::{collections::HashSet, fs::read_to_string, net::SocketAddr, path::Path, time::Duration}, tokio::sync::Semaphore, tonic::codec::CompressionEncoding, }; @@ -110,6 +110,18 @@ pub struct ConfigGrpc { pub filters: ConfigGrpcFilters, /// x_token to enforce on connections pub x_token: Option, + /// Filter name size limit + #[serde(default = "ConfigGrpc::default_filter_name_size_limit")] + pub filter_name_size_limit: usize, + /// Number of cached filter names before doing cleanup + #[serde(default = "ConfigGrpc::default_filter_names_size_limit")] + pub filter_names_size_limit: usize, + /// Cleanup interval once filter names reached `filter_names_size_limit` + #[serde( + default = "ConfigGrpc::default_filter_names_cleanup_interval", + with = "humantime_serde" + )] + pub filter_names_cleanup_interval: Duration, } impl ConfigGrpc { @@ -132,6 +144,18 @@ impl ConfigGrpc { const fn unary_concurrency_limit_default() -> usize { Semaphore::MAX_PERMITS } + + const fn default_filter_name_size_limit() -> usize { + 32 + } + + const fn default_filter_names_size_limit() -> usize { + 1_024 + } + + const fn default_filter_names_cleanup_interval() -> Duration { + Duration::from_secs(1) + } } #[derive(Debug, Clone, Deserialize)] diff --git a/yellowstone-grpc-geyser/src/filters.rs b/yellowstone-grpc-geyser/src/filters.rs index c40b4e56..29fc14c8 100644 --- a/yellowstone-grpc-geyser/src/filters.rs +++ b/yellowstone-grpc-geyser/src/filters.rs @@ -14,9 +14,11 @@ use { solana_sdk::{pubkey::Pubkey, signature::Signature}, spl_token_2022::{generic_token_account::GenericTokenAccount, state::Account as TokenAccount}, std::{ + borrow::Borrow, collections::{HashMap, HashSet}, str::FromStr, sync::Arc, + time::{Duration, Instant}, }, yellowstone_grpc_proto::{ convert_to, @@ -38,6 +40,93 @@ use { }, }; +#[derive(Debug, thiserror::Error)] +pub enum FilterError { + #[error("invalid filter name (max allowed size {limit}), found {size}")] + OversizedFilterName { limit: usize, size: usize }, +} + +pub type FilterResult = Result; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FilterName(Arc); + +impl AsRef for FilterName { + #[inline] + fn as_ref(&self) -> &str { + &self.0 + } +} + +impl Borrow for FilterName { + #[inline] + fn borrow(&self) -> &str { + &self.0[..] + } +} + +impl FilterName { + pub fn new(name: impl Into) -> Self { + Self(Arc::new(name.into())) + } + + pub fn is_uniq(&self) -> bool { + Arc::strong_count(&self.0) == 1 + } +} + +#[derive(Debug)] +pub struct FilterNames { + name_size_limit: usize, + names: HashSet, + names_size_limit: usize, + cleanup_ts: Instant, + cleanup_interval: Duration, +} + +impl FilterNames { + pub fn new( + name_size_limit: usize, + names_size_limit: usize, + cleanup_interval: Duration, + ) -> Self { + Self { + name_size_limit, + names: HashSet::with_capacity(names_size_limit), + names_size_limit, + cleanup_ts: Instant::now(), + cleanup_interval, + } + } + + pub fn try_clean(&mut self) { + if self.names.len() > self.names_size_limit + && self.cleanup_ts.elapsed() > self.cleanup_interval + { + self.names.retain(|name| !name.is_uniq()); + self.cleanup_ts = Instant::now(); + } + } + + pub fn get(&mut self, name: &str) -> FilterResult { + match self.names.get(name) { + Some(name) => Ok(name.clone()), + None => { + if name.len() > self.name_size_limit { + Err(FilterError::OversizedFilterName { + limit: self.name_size_limit, + size: name.len(), + }) + } else { + let name = FilterName::new(name); + self.names.insert(name.clone()); + Ok(name) + } + } + } + } +} + #[derive(Debug, Clone)] pub enum FilteredMessage<'a> { Slot(&'a MessageSlot), @@ -198,24 +287,53 @@ pub struct Filter { ping: Option, } +impl Default for Filter { + fn default() -> Self { + Self { + accounts: FilterAccounts::default(), + slots: FilterSlots::default(), + transactions: FilterTransactions { + filter_type: FilterTransactionsType::Transaction, + filters: HashMap::new(), + }, + transactions_status: FilterTransactions { + filter_type: FilterTransactionsType::TransactionStatus, + filters: HashMap::new(), + }, + entry: FilterEntry::default(), + blocks: FilterBlocks::default(), + blocks_meta: FilterBlocksMeta::default(), + commitment: CommitmentLevel::Processed, + accounts_data_slice: vec![], + ping: None, + } + } +} + impl Filter { - pub fn new(config: &SubscribeRequest, limit: &ConfigGrpcFilters) -> anyhow::Result { + pub fn new( + config: &SubscribeRequest, + limit: &ConfigGrpcFilters, + names: &mut FilterNames, + ) -> anyhow::Result { Ok(Self { - accounts: FilterAccounts::new(&config.accounts, &limit.accounts)?, - slots: FilterSlots::new(&config.slots, &limit.slots)?, + accounts: FilterAccounts::new(&config.accounts, &limit.accounts, names)?, + slots: FilterSlots::new(&config.slots, &limit.slots, names)?, transactions: FilterTransactions::new( &config.transactions, &limit.transactions, FilterTransactionsType::Transaction, + names, )?, transactions_status: FilterTransactions::new( &config.transactions_status, &limit.transactions_status, FilterTransactionsType::TransactionStatus, + names, )?, - entry: FilterEntry::new(&config.entry, &limit.entry)?, - blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?, - blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?, + entry: FilterEntry::new(&config.entry, &limit.entry, names)?, + blocks: FilterBlocks::new(&config.blocks, &limit.blocks, names)?, + blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta, names)?, commitment: Self::decode_commitment(config.commitment)?, accounts_data_slice: FilterAccountsDataSlice::create(&config.accounts_data_slice)?, ping: config.ping.as_ref().map(|msg| msg.id), @@ -285,7 +403,7 @@ impl Filter { &'a self, message: &'a Message, commitment: Option, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { match message { Message::Account(message) => self.accounts.get_filters(message), Message::Slot(message) => self.slots.get_filters(message, commitment), @@ -312,7 +430,10 @@ impl Filter { None } else { Some(SubscribeUpdate { - filters, + filters: filters + .iter() + .map(|name| name.as_ref().to_string()) + .collect(), update_oneof: Some(message.as_proto(&self.accounts_data_slice)), }) } @@ -330,28 +451,30 @@ impl Filter { #[derive(Debug, Default, Clone)] struct FilterAccounts { - nonempty_txn_signature: Vec<(String, Option)>, - nonempty_txn_signature_required: HashSet, - account: HashMap>, - account_required: HashSet, - owner: HashMap>, - owner_required: HashSet, - filters: Vec<(String, FilterAccountsState)>, + nonempty_txn_signature: Vec<(FilterName, Option)>, + nonempty_txn_signature_required: HashSet, + account: HashMap>, + account_required: HashSet, + owner: HashMap>, + owner_required: HashSet, + filters: Vec<(FilterName, FilterAccountsState)>, } impl FilterAccounts { fn new( configs: &HashMap, limit: &ConfigGrpcFiltersAccounts, + names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; let mut this = Self::default(); for (name, filter) in configs { this.nonempty_txn_signature - .push((name.clone(), filter.nonempty_txn_signature)); + .push((names.get(name)?, filter.nonempty_txn_signature)); if filter.nonempty_txn_signature.is_some() { - this.nonempty_txn_signature_required.insert(name.clone()); + this.nonempty_txn_signature_required + .insert(names.get(name)?); } ConfigGrpcFilters::check_any( @@ -365,6 +488,7 @@ impl FilterAccounts { &mut this.account, &mut this.account_required, name, + names, Filter::decode_pubkeys(&filter.account, &limit.account_reject), )?; @@ -372,30 +496,32 @@ impl FilterAccounts { &mut this.owner, &mut this.owner_required, name, + names, Filter::decode_pubkeys(&filter.owner, &limit.owner_reject), )?; this.filters - .push((name.clone(), FilterAccountsState::new(&filter.filters)?)); + .push((names.get(name)?, FilterAccountsState::new(&filter.filters)?)); } Ok(this) } fn set( - map: &mut HashMap>, - map_required: &mut HashSet, + map: &mut HashMap>, + map_required: &mut HashSet, name: &str, + names: &mut FilterNames, keys: impl Iterator>, ) -> anyhow::Result { let mut required = false; for maybe_key in keys { - if map.entry(maybe_key?).or_default().insert(name.to_string()) { + if map.entry(maybe_key?).or_default().insert(names.get(name)?) { required = true; } } if required { - map_required.insert(name.to_string()); + map_required.insert(names.get(name)?); } Ok(required) } @@ -403,7 +529,7 @@ impl FilterAccounts { fn get_filters<'a>( &'a self, message: &'a MessageAccount, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { let mut filter = FilterAccountsMatch::new(self); filter.match_txn_signature(&message.account.txn_signature); filter.match_account(&message.account.pubkey); @@ -565,10 +691,14 @@ impl<'a> FilterAccountsMatch<'a> { } } - fn extend(set: &mut HashSet<&'a str>, map: &'a HashMap>, key: &Pubkey) { + fn extend( + set: &mut HashSet<&'a str>, + map: &'a HashMap>, + key: &Pubkey, + ) { if let Some(names) = map.get(key) { for name in names { - set.insert(name); + set.insert(name.as_ref()); } } } @@ -577,7 +707,7 @@ impl<'a> FilterAccountsMatch<'a> { for (name, filter) in self.filter.nonempty_txn_signature.iter() { if let Some(nonempty_txn_signature) = filter { if *nonempty_txn_signature == txn_signature.is_some() { - self.nonempty_txn_signature.insert(name); + self.nonempty_txn_signature.insert(name.as_ref()); } } } @@ -594,17 +724,17 @@ impl<'a> FilterAccountsMatch<'a> { pub fn match_data_lamports(&mut self, data: &[u8], lamports: u64) { for (name, filter) in self.filter.filters.iter() { if filter.is_match(data, lamports) { - self.data.insert(name); + self.data.insert(name.as_ref()); } } } - pub fn get_filters(&self) -> Vec { + pub fn get_filters(&self) -> Vec { self.filter .filters .iter() - .filter_map(|(name, filter)| { - let name = name.as_str(); + .filter_map(|(filter_name, filter)| { + let name = filter_name.as_ref(); let af = &self.filter; // If filter name in required but not in matched => return `false` @@ -623,7 +753,7 @@ impl<'a> FilterAccountsMatch<'a> { return None; } - Some(name.to_string()) + Some(filter_name.clone()) }) .collect() } @@ -644,21 +774,26 @@ impl FilterSlotsInner { #[derive(Debug, Default, Clone)] struct FilterSlots { - filters: HashMap, + filters: HashMap, } impl FilterSlots { fn new( configs: &HashMap, limit: &ConfigGrpcFiltersSlots, + names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; Ok(Self { filters: configs .iter() - .map(|(name, filter)| (name.clone(), FilterSlotsInner::new(*filter))) - .collect(), + .map(|(name, filter)| { + names + .get(name) + .map(|name| (name, FilterSlotsInner::new(*filter))) + }) + .collect::>()?, }) } @@ -666,7 +801,7 @@ impl FilterSlots { &'a self, message: &'a MessageSlot, commitment: Option, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(std::iter::once(( self.filters .iter() @@ -702,7 +837,7 @@ pub struct FilterTransactionsInner { #[derive(Debug, Clone)] pub struct FilterTransactions { filter_type: FilterTransactionsType, - filters: HashMap, + filters: HashMap, } impl FilterTransactions { @@ -710,6 +845,7 @@ impl FilterTransactions { configs: &HashMap, limit: &ConfigGrpcFiltersTransactions, filter_type: FilterTransactionsType, + names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; @@ -737,7 +873,7 @@ impl FilterTransactions { )?; filters.insert( - name.clone(), + names.get(name)?, FilterTransactionsInner { vote: filter.vote, failed: filter.failed, @@ -774,7 +910,7 @@ impl FilterTransactions { pub fn get_filters<'a>( &'a self, message: &'a MessageTransaction, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { let filters = self .filters .iter() @@ -860,29 +996,29 @@ impl FilterTransactions { #[derive(Debug, Default, Clone)] struct FilterEntry { - filters: Vec, + filters: Vec, } impl FilterEntry { fn new( configs: &HashMap, limit: &ConfigGrpcFiltersEntry, + names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; Ok(Self { filters: configs .iter() - // .filter_map(|(name, _filter)| Some(name.clone())) - .map(|(name, _filter)| name.clone()) - .collect(), + .map(|(name, _filter)| names.get(name)) + .collect::>()?, }) } fn get_filters<'a>( &'a self, message: &'a MessageEntry, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(std::iter::once(( self.filters.clone(), FilteredMessage::Entry(message), @@ -900,13 +1036,14 @@ pub struct FilterBlocksInner { #[derive(Debug, Default, Clone)] struct FilterBlocks { - filters: HashMap, + filters: HashMap, } impl FilterBlocks { fn new( configs: &HashMap, limit: &ConfigGrpcFiltersBlocks, + names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; @@ -934,7 +1071,7 @@ impl FilterBlocks { ); this.filters.insert( - name.clone(), + names.get(name)?, FilterBlocksInner { account_include: Filter::decode_pubkeys_into_vec( &filter.account_include, @@ -952,7 +1089,7 @@ impl FilterBlocks { fn get_filters<'a>( &'a self, message: &'a MessageBlock, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(self.filters.iter().map(move |(filter, inner)| { #[allow(clippy::unnecessary_filter_map)] let transactions = if matches!(inner.include_transactions, None | Some(true)) { @@ -1022,29 +1159,29 @@ impl FilterBlocks { #[derive(Debug, Default, Clone)] struct FilterBlocksMeta { - filters: Vec, + filters: Vec, } impl FilterBlocksMeta { fn new( configs: &HashMap, limit: &ConfigGrpcFiltersBlocksMeta, + names: &mut FilterNames, ) -> anyhow::Result { ConfigGrpcFilters::check_max(configs.len(), limit.max)?; Ok(Self { filters: configs .iter() - // .filter_map(|(name, _filter)| Some(name.clone())) - .map(|(name, _filter)| name.clone()) - .collect(), + .map(|(name, _filter)| names.get(name)) + .collect::>()?, }) } fn get_filters<'a>( &'a self, message: &'a MessageBlockMeta, - ) -> Box, FilteredMessage<'a>)> + Send + 'a> { + ) -> Box, FilteredMessage<'a>)> + Send + 'a> { Box::new(std::iter::once(( self.filters.clone(), FilteredMessage::BlockMeta(message), @@ -1092,7 +1229,7 @@ impl FilterAccountsDataSlice { #[cfg(test)] mod tests { use { - super::FilteredMessage, + super::{FilterName, FilterNames, FilteredMessage}, crate::{ config::ConfigGrpcFilters, filters::Filter, @@ -1106,12 +1243,16 @@ mod tests { transaction::{SanitizedTransaction, Transaction}, }, solana_transaction_status::TransactionStatusMeta, - std::{collections::HashMap, sync::Arc}, + std::{collections::HashMap, sync::Arc, time::Duration}, yellowstone_grpc_proto::geyser::{ SubscribeRequest, SubscribeRequestFilterAccounts, SubscribeRequestFilterTransactions, }, }; + fn create_filter_names() -> FilterNames { + FilterNames::new(64, 1024, Duration::from_secs(1)) + } + fn create_message_transaction( keypair: &Keypair, account_keys: Vec, @@ -1171,7 +1312,7 @@ mod tests { ping: None, }; let limit = ConfigGrpcFilters::default(); - let filter = Filter::new(&config, &limit); + let filter = Filter::new(&config, &limit, &mut create_filter_names()); assert!(filter.is_ok()); } @@ -1203,7 +1344,7 @@ mod tests { }; let mut limit = ConfigGrpcFilters::default(); limit.accounts.any = false; - let filter = Filter::new(&config, &limit); + let filter = Filter::new(&config, &limit, &mut create_filter_names()); // filter should fail assert!(filter.is_err()); } @@ -1238,7 +1379,7 @@ mod tests { }; let mut limit = ConfigGrpcFilters::default(); limit.transactions.any = false; - let filter = Filter::new(&config, &limit); + let filter = Filter::new(&config, &limit, &mut create_filter_names()); // filter should fail assert!(filter.is_err()); } @@ -1272,7 +1413,7 @@ mod tests { }; let mut limit = ConfigGrpcFilters::default(); limit.transactions.any = false; - let filter_res = Filter::new(&config, &limit); + let filter_res = Filter::new(&config, &limit, &mut create_filter_names()); // filter should succeed assert!(filter_res.is_ok()); } @@ -1311,16 +1452,16 @@ mod tests { ping: None, }; let limit = ConfigGrpcFilters::default(); - let filter = Filter::new(&config, &limit).unwrap(); + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); let updates = filter.get_filters(&message, None).collect::>(); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec!["serum"]); + assert_eq!(updates[0].0, vec![FilterName::new("serum")]); assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!(updates[1].0, Vec::::new()); assert!(matches!( updates[1].1, FilteredMessage::TransactionStatus(_) @@ -1361,16 +1502,16 @@ mod tests { ping: None, }; let limit = ConfigGrpcFilters::default(); - let filter = Filter::new(&config, &limit).unwrap(); + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); let message = Message::Transaction(message_transaction); let updates = filter.get_filters(&message, None).collect::>(); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec!["serum"]); + assert_eq!(updates[0].0, vec![FilterName::new("serum")]); assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!(updates[1].0, Vec::::new()); assert!(matches!( updates[1].1, FilteredMessage::TransactionStatus(_) @@ -1411,7 +1552,7 @@ mod tests { ping: None, }; let limit = ConfigGrpcFilters::default(); - let filter = Filter::new(&config, &limit).unwrap(); + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); let message_transaction = create_message_transaction(&keypair_b, vec![account_key_b, account_key_a]); @@ -1461,7 +1602,7 @@ mod tests { ping: None, }; let limit = ConfigGrpcFilters::default(); - let filter = Filter::new(&config, &limit).unwrap(); + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); let message_transaction = create_message_transaction( &keypair_x, @@ -1470,9 +1611,9 @@ mod tests { let message = Message::Transaction(message_transaction); let updates = filter.get_filters(&message, None).collect::>(); assert_eq!(updates.len(), 2); - assert_eq!(updates[0].0, vec!["serum"]); + assert_eq!(updates[0].0, vec![FilterName::new("serum")]); assert!(matches!(updates[0].1, FilteredMessage::Transaction(_))); - assert_eq!(updates[1].0, Vec::::new()); + assert_eq!(updates[1].0, Vec::::new()); assert!(matches!( updates[1].1, FilteredMessage::TransactionStatus(_) @@ -1519,7 +1660,7 @@ mod tests { ping: None, }; let limit = ConfigGrpcFilters::default(); - let filter = Filter::new(&config, &limit).unwrap(); + let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap(); let message_transaction = create_message_transaction(&keypair_x, vec![account_key_x, account_key_z]); diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs index 54252c67..66de0d35 100644 --- a/yellowstone-grpc-geyser/src/grpc.rs +++ b/yellowstone-grpc-geyser/src/grpc.rs @@ -1,7 +1,7 @@ use { crate::{ config::{ConfigBlockFailAction, ConfigGrpc, ConfigGrpcFilters}, - filters::Filter, + filters::{Filter, FilterNames}, message::{Message, MessageBlockMeta, MessageEntry, MessageSlot, MessageTransactionInfo}, metrics::{self, DebugClientMessage}, version::GrpcVersionInfo, @@ -287,6 +287,7 @@ pub struct GrpcService { snapshot_rx: Mutex>>>, broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc>)>, debug_clients_tx: Option>, + filter_names: Arc>, } impl GrpcService { @@ -343,6 +344,12 @@ impl GrpcService { .context("failed to apply tls_config")?; } + let filter_names = Arc::new(Mutex::new(FilterNames::new( + config.filter_name_size_limit, + config.filter_names_size_limit, + config.filter_names_cleanup_interval, + ))); + // Create Server let max_decoding_message_size = config.max_decoding_message_size; let mut service = GeyserServer::new(Self { @@ -354,6 +361,7 @@ impl GrpcService { snapshot_rx: Mutex::new(snapshot_rx), broadcast_tx: broadcast_tx.clone(), debug_clients_tx, + filter_names, }) .max_decoding_message_size(max_decoding_message_size); for encoding in config.compression.accept { @@ -726,7 +734,6 @@ impl GrpcService { async fn client_loop( id: usize, endpoint: String, - config_filters: Arc, stream_tx: mpsc::Sender>, mut client_rx: mpsc::UnboundedReceiver>, mut snapshot_rx: Option>>, @@ -734,22 +741,7 @@ impl GrpcService { debug_client_tx: Option>, drop_client: impl FnOnce(), ) { - let mut filter = Filter::new( - &SubscribeRequest { - accounts: HashMap::new(), - slots: HashMap::new(), - transactions: HashMap::new(), - transactions_status: HashMap::new(), - blocks: HashMap::new(), - blocks_meta: HashMap::new(), - entry: HashMap::new(), - commitment: None, - accounts_data_slice: Vec::new(), - ping: None, - }, - &config_filters, - ) - .expect("empty filter"); + let mut filter = Filter::default(); metrics::update_subscriptions(&endpoint, None, Some(&filter)); metrics::connections_total_inc(); @@ -996,6 +988,7 @@ impl Geyser for GrpcService { .unwrap_or_else(|| "".to_owned()); let config_filters = Arc::clone(&self.config_filters); + let filter_names = Arc::clone(&self.filter_names); let incoming_stream_tx = stream_tx.clone(); let incoming_client_tx = client_tx; let incoming_exit = Arc::clone(¬ify_exit2); @@ -1010,7 +1003,10 @@ impl Geyser for GrpcService { } message = request.get_mut().message() => match message { Ok(Some(request)) => { - if let Err(error) = match Filter::new(&request, &config_filters) { + let mut filter_names = filter_names.lock().await; + filter_names.try_clean(); + + if let Err(error) = match Filter::new(&request, &config_filters, &mut filter_names) { Ok(filter) => match incoming_client_tx.send(Some(filter)) { Ok(()) => Ok(()), Err(error) => Err(error.to_string()), @@ -1040,7 +1036,6 @@ impl Geyser for GrpcService { tokio::spawn(Self::client_loop( id, endpoint, - Arc::clone(&self.config_filters), stream_tx, client_rx, snapshot_rx,