Skip to content

Commit

Permalink
geyser: use Vec::binary_search instead of HashSet::contains in the fi…
Browse files Browse the repository at this point in the history
…lters (rpcpool#284)
  • Loading branch information
fanatid authored Jan 27, 2024
1 parent 01faea2 commit 84a2548
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The minor version will be incremented upon a breaking change and the patch versi
### Features

- proto: add `entries_count` to block meta message ([#283](https://github.com/rpcpool/yellowstone-grpc/pull/283))
- geyser: use `Vec::binary_search` instead of `HashSet::contains` in the filters ([#284](https://github.com/rpcpool/yellowstone-grpc/pull/284))

### Breaking

Expand Down
1 change: 0 additions & 1 deletion examples/rust/src/bin/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,6 @@ async fn geyser_subscribe(
while let Some(message) = stream.next().await {
match message {
Ok(msg) => {
#[allow(clippy::single_match)]
match msg.update_oneof {
Some(UpdateOneof::Account(account)) => {
let account: AccountPretty = account.into();
Expand Down
157 changes: 86 additions & 71 deletions yellowstone-grpc-geyser/src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use {
spl_token_2022::{generic_token_account::GenericTokenAccount, state::Account as TokenAccount},
std::{
collections::{HashMap, HashSet},
iter::FromIterator,
str::FromStr,
},
yellowstone_grpc_proto::prelude::{
Expand Down Expand Up @@ -64,20 +63,27 @@ impl Filter {
})
}

fn decode_pubkeys<T: FromIterator<Pubkey>>(
fn decode_pubkeys<'a>(
pubkeys: &'a [String],
limit: &'a HashSet<Pubkey>,
) -> impl Iterator<Item = anyhow::Result<Pubkey>> + 'a {
pubkeys.iter().map(|value| match Pubkey::from_str(value) {
Ok(pubkey) => {
ConfigGrpcFilters::check_pubkey_reject(&pubkey, limit)?;
Ok::<Pubkey, anyhow::Error>(pubkey)
}
Err(error) => Err(error.into()),
})
}

fn decode_pubkeys_into_vec(
pubkeys: &[String],
limit: &HashSet<Pubkey>,
) -> anyhow::Result<T> {
pubkeys
.iter()
.map(|value| match Pubkey::from_str(value) {
Ok(pubkey) => {
ConfigGrpcFilters::check_pubkey_reject(&pubkey, limit)?;
Ok(pubkey)
}
Err(error) => Err(error.into()),
})
.collect::<_>()
) -> anyhow::Result<Vec<Pubkey>> {
let mut vec =
Self::decode_pubkeys(pubkeys, limit).collect::<anyhow::Result<Vec<Pubkey>>>()?;
vec.sort();
Ok(vec)
}

pub const fn get_commitment_level(&self) -> CommitmentLevel {
Expand Down Expand Up @@ -156,15 +162,15 @@ impl FilterAccounts {
&mut this.account,
&mut this.account_required,
name,
Filter::decode_pubkeys(&filter.account, &limit.account_reject)?,
);
Filter::decode_pubkeys(&filter.account, &limit.account_reject),
)?;

Self::set(
&mut this.owner,
&mut this.owner_required,
name,
Filter::decode_pubkeys(&filter.owner, &limit.owner_reject)?,
);
Filter::decode_pubkeys(&filter.owner, &limit.owner_reject),
)?;

this.filters
.push((name.clone(), FilterAccountsData::new(&filter.filters)?));
Expand All @@ -176,19 +182,19 @@ impl FilterAccounts {
map: &mut HashMap<Pubkey, HashSet<String>>,
map_required: &mut HashSet<String>,
name: &str,
keys: Vec<Pubkey>,
) -> bool {
keys: impl Iterator<Item = anyhow::Result<Pubkey>>,
) -> anyhow::Result<bool> {
let mut required = false;
for key in keys.into_iter() {
if map.entry(key).or_default().insert(name.to_string()) {
for maybe_key in keys {
if map.entry(maybe_key?).or_default().insert(name.to_string()) {
required = true;
}
}

if required {
map_required.insert(name.to_string());
}
required
Ok(required)
}

fn get_filters<'a>(&self, message: &'a MessageAccount) -> Vec<(Vec<String>, MessageRef<'a>)> {
Expand Down Expand Up @@ -305,9 +311,7 @@ impl<'a> FilterAccountsMatch<'a> {
fn extend(set: &mut HashSet<&'a str>, map: &'a HashMap<Pubkey, HashSet<String>>, key: &Pubkey) {
if let Some(names) = map.get(key) {
for name in names {
if !set.contains(name.as_str()) {
set.insert(name);
}
set.insert(name);
}
}
}
Expand Down Expand Up @@ -412,9 +416,9 @@ pub struct FilterTransactionsInner {
vote: Option<bool>,
failed: Option<bool>,
signature: Option<Signature>,
account_include: HashSet<Pubkey>,
account_exclude: HashSet<Pubkey>,
account_required: HashSet<Pubkey>,
account_include: Vec<Pubkey>,
account_exclude: Vec<Pubkey>,
account_required: Vec<Pubkey>,
}

#[derive(Debug, Default, Clone)]
Expand Down Expand Up @@ -466,15 +470,15 @@ impl FilterTransactions {
.map_err(|error| anyhow::anyhow!("invalid signature: {error}"))
})
.transpose()?,
account_include: Filter::decode_pubkeys(
account_include: Filter::decode_pubkeys_into_vec(
&filter.account_include,
&limit.account_include_reject,
)?,
account_exclude: Filter::decode_pubkeys(
account_exclude: Filter::decode_pubkeys_into_vec(
&filter.account_exclude,
&HashSet::new(),
)?,
account_required: Filter::decode_pubkeys(
account_required: Filter::decode_pubkeys_into_vec(
&filter.account_required,
&HashSet::new(),
)?,
Expand Down Expand Up @@ -517,7 +521,7 @@ impl FilterTransactions {
.message()
.account_keys()
.iter()
.all(|pubkey| !inner.account_include.contains(pubkey))
.all(|pubkey| inner.account_include.binary_search(pubkey).is_err())
{
return None;
}
Expand All @@ -529,25 +533,33 @@ impl FilterTransactions {
.message()
.account_keys()
.iter()
.any(|pubkey| inner.account_exclude.contains(pubkey))
.any(|pubkey| inner.account_exclude.binary_search(pubkey).is_ok())
{
return None;
}

// check if transaction contains all required account keys
if !inner.account_required.is_empty()
&& !inner.account_required.is_subset(
&message
.transaction
.transaction
.message()
.account_keys()
if !inner.account_required.is_empty() {
let mut other: Vec<&Pubkey> = message
.transaction
.transaction
.message()
.account_keys()
.iter()
.collect();

let is_subset = if inner.account_required.len() <= other.len() {
other.sort();
inner
.account_required
.iter()
.cloned()
.collect(),
)
{
return None;
.all(|pubkey| other.binary_search(&pubkey).is_ok())
} else {
false
};

if !is_subset {
return None;
}
}

Some(name.clone())
Expand Down Expand Up @@ -585,7 +597,7 @@ impl FilterEntry {

#[derive(Debug, Clone)]
pub struct FilterBlocksInner {
account_include: HashSet<Pubkey>,
account_include: Vec<Pubkey>,
include_transactions: Option<bool>,
include_accounts: Option<bool>,
include_entries: Option<bool>,
Expand Down Expand Up @@ -629,7 +641,7 @@ impl FilterBlocks {
this.filters.insert(
name.clone(),
FilterBlocksInner {
account_include: Filter::decode_pubkeys(
account_include: Filter::decode_pubkeys_into_vec(
&filter.account_include,
&limit.account_include_reject,
)?,
Expand All @@ -647,28 +659,28 @@ impl FilterBlocks {
.iter()
.map(|(filter, inner)| {
#[allow(clippy::unnecessary_filter_map)]
let transactions = if matches!(inner.include_transactions, None | Some(true)) {
message
.transactions
.iter()
.filter_map(|tx| {
if !inner.account_include.is_empty()
&& tx
.transaction
.message()
.account_keys()
.iter()
.all(|pubkey| !inner.account_include.contains(pubkey))
{
return None;
}

Some(tx)
})
.collect::<Vec<_>>()
} else {
vec![]
};
let transactions =
if matches!(inner.include_transactions, None | Some(true)) {
message
.transactions
.iter()
.filter_map(|tx| {
if !inner.account_include.is_empty()
&& tx.transaction.message().account_keys().iter().all(
|pubkey| {
inner.account_include.binary_search(pubkey).is_err()
},
)
{
return None;
}

Some(tx)
})
.collect::<Vec<_>>()
} else {
vec![]
};

#[allow(clippy::unnecessary_filter_map)]
let accounts = if inner.include_accounts == Some(true) {
Expand All @@ -677,7 +689,10 @@ impl FilterBlocks {
.iter()
.filter_map(|account| {
if !inner.account_include.is_empty()
&& !inner.account_include.contains(&account.pubkey)
&& inner
.account_include
.binary_search(&account.pubkey)
.is_err()
{
return None;
}
Expand Down

0 comments on commit 84a2548

Please sign in to comment.