From c98b041588fe4c53dccd07538090b960d078cc2d Mon Sep 17 00:00:00 2001 From: jeffhelius Date: Wed, 4 Sep 2024 16:31:42 +0000 Subject: [PATCH 1/4] add alessandrod custom runtime --- rpc/src/rpc.rs | 64 ++++++++++++++++++++++++++++++++++++++---- rpc/src/rpc_service.rs | 47 +++++++++++++++++++++++++------ validator/src/cli.rs | 11 ++++++++ validator/src/main.rs | 1 + 4 files changed, 109 insertions(+), 14 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 250848300e3b84..2cf27c779ae0a3 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -3,6 +3,7 @@ use { crate::{ max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, parsed_token_accounts::*, rpc_cache::LargestAccountsCache, rpc_health::*, + rpc_service::service_runtime, }, base64::{prelude::BASE64_STANDARD, Engine}, bincode::{config::Options, serialize}, @@ -111,6 +112,7 @@ use { }, time::Duration, }, + tokio::runtime::Runtime, }; pub mod account_resolver; @@ -137,7 +139,7 @@ fn is_finalized( && (blockstore.is_root(slot) || bank.status_cache_ancestors().contains(&slot)) } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] pub struct JsonRpcConfig { pub enable_rpc_transaction_history: bool, pub enable_extended_tx_metadata_storage: bool, @@ -148,6 +150,7 @@ pub struct JsonRpcConfig { pub max_multiple_accounts: Option, pub account_indexes: AccountSecondaryIndexes, pub rpc_threads: usize, + pub rpc_blocking_threads: usize, pub rpc_niceness_adj: i8, pub full_api: bool, pub obsolete_v1_7_api: bool, @@ -157,6 +160,28 @@ pub struct JsonRpcConfig { pub disable_health_check: bool, } +impl Default for JsonRpcConfig { + fn default() -> Self { + Self { + enable_rpc_transaction_history: Default::default(), + enable_extended_tx_metadata_storage: Default::default(), + faucet_addr: Option::default(), + health_check_slot_distance: Default::default(), + rpc_bigtable_config: Option::default(), + max_multiple_accounts: Option::default(), + account_indexes: AccountSecondaryIndexes::default(), + rpc_threads: 1, + rpc_blocking_threads: 1, + rpc_niceness_adj: Default::default(), + full_api: Default::default(), + obsolete_v1_7_api: Default::default(), + rpc_scan_and_fix_roots: Default::default(), + max_request_body_size: Option::default(), + disable_health_check: Default::default(), + } + } +} + impl JsonRpcConfig { pub fn default_for_test() -> Self { Self { @@ -211,6 +236,7 @@ pub struct JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, } impl Metadata for JsonRpcRequestProcessor {} @@ -318,6 +344,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, + runtime: Arc, ) -> (Self, Receiver) { let (sender, receiver) = unbounded(); ( @@ -340,6 +367,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + runtime, }, receiver, ) @@ -385,8 +413,17 @@ impl JsonRpcRequestProcessor { let slot = bank.slot(); let optimistically_confirmed_bank = Arc::new(RwLock::new(OptimisticallyConfirmedBank { bank })); + let config = JsonRpcConfig::default(); + + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; + Self { - config: JsonRpcConfig::default(), + config, snapshot_config: None, bank_forks, block_commitment_cache: Arc::new(RwLock::new(BlockCommitmentCache::new( @@ -414,6 +451,7 @@ impl JsonRpcRequestProcessor { max_complete_transaction_status_slot: Arc::new(AtomicU64::default()), max_complete_rewards_slot: Arc::new(AtomicU64::default()), prioritization_fee_cache: Arc::new(PrioritizationFeeCache::default()), + runtime: service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), } } @@ -6834,8 +6872,15 @@ pub mod tests { .my_contact_info() .tpu(connection_cache.protocol()) .unwrap(); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (meta, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -7108,8 +7153,15 @@ pub mod tests { .unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let config = JsonRpcConfig::default(); + let JsonRpcConfig { + rpc_threads, + rpc_blocking_threads, + rpc_niceness_adj, + .. + } = config; let (request_processor, receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -7126,6 +7178,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); SendTransactionService::new::( tpu_address, @@ -8741,7 +8794,7 @@ pub mod tests { )); let (meta, _receiver) = JsonRpcRequestProcessor::new( - JsonRpcConfig::default(), + config, None, bank_forks.clone(), block_commitment_cache, @@ -8758,6 +8811,7 @@ pub mod tests { max_complete_transaction_status_slot, max_complete_rewards_slot, Arc::new(PrioritizationFeeCache::default()), + service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj), ); let mut io = MetaIoHandler::default(); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index d8791ab6c3bf6b..5bf1ceb3fda7f2 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -364,6 +364,7 @@ impl JsonRpcService { info!("rpc bound to {:?}", rpc_addr); info!("rpc configuration: {:?}", config); let rpc_threads = 1.max(config.rpc_threads); + let rpc_blocking_threads = 1.max(config.rpc_blocking_threads); let rpc_niceness_adj = config.rpc_niceness_adj; let health = Arc::new(RpcHealth::new( @@ -389,15 +390,8 @@ impl JsonRpcService { // So create a (shared) multi-threaded event_loop for jsonrpc and set its .threads() to 1, // so that we avoid the single-threaded event loops from being created automatically by // jsonrpc for threads when .threads(N > 1) is given. - let runtime = Arc::new( - tokio::runtime::Builder::new_multi_thread() - .worker_threads(rpc_threads) - .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) - .thread_name("solRpcEl") - .enable_all() - .build() - .expect("Runtime"), - ); + + let runtime = service_runtime(rpc_threads, rpc_blocking_threads, rpc_niceness_adj); let exit_bigtable_ledger_upload_service = Arc::new(AtomicBool::new(false)); @@ -476,6 +470,7 @@ impl JsonRpcService { max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache, + Arc::clone(&runtime), ); let leader_info = @@ -583,6 +578,40 @@ impl JsonRpcService { } } +pub fn service_runtime( + rpc_threads: usize, + rpc_blocking_threads: usize, + rpc_niceness_adj: i8, +) -> Arc { + // The jsonrpc_http_server crate supports two execution models: + // + // - By default, it spawns a number of threads - configured with .threads(N) - and runs a + // single-threaded futures executor in each thread. + // - Alternatively when configured with .event_loop_executor(executor) and .threads(1), + // it executes all the tasks on the given executor, not spawning any extra internal threads. + // + // We use the latter configuration, using a multi threaded tokio runtime as the executor. We + // do this so we can configure the number of worker threads, the number of blocking threads + // and then use tokio::task::spawn_blocking() to avoid blocking the worker threads on CPU + // bound operations like getMultipleAccounts. This results in reduced latency, since fast + // rpc calls (the majority) are not blocked by slow CPU bound ones. + // + // NB: `rpc_blocking_threads` shouldn't be set too high (defaults to num_cpus / 2). Too many + // (busy) blocking threads could compete with CPU time with other validator threads and + // negatively impact performance. + let runtime = Arc::new( + tokio::runtime::Builder::new_multi_thread() + .worker_threads(rpc_threads) + .max_blocking_threads(rpc_blocking_threads) + .on_thread_start(move || renice_this_thread(rpc_niceness_adj).unwrap()) + .thread_name("solRpcEl") + .enable_all() + .build() + .expect("Runtime"), + ); + runtime +} + #[cfg(test)] mod tests { use { diff --git a/validator/src/cli.rs b/validator/src/cli.rs index d828f93be909c7..c1581f2f1be36d 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -859,6 +859,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { .default_value(&default_args.rpc_threads) .help("Number of threads to use for servicing RPC requests"), ) + .arg( + Arg::with_name("rpc_blocking_threads") + .long("rpc-blocking-threads") + .value_name("NUMBER") + .validator(is_parsable::) + .takes_value(true) + .default_value(&default_args.rpc_blocking_threads) + .help("Number of blocking threads to use for servicing CPU bound RPC requests (eg getMultipleAccounts)"), + ) .arg( Arg::with_name("rpc_niceness_adj") .long("rpc-niceness-adjustment") @@ -1990,6 +1999,7 @@ pub struct DefaultArgs { pub rpc_send_transaction_service_max_retries: String, pub rpc_send_transaction_batch_size: String, pub rpc_threads: String, + pub rpc_blocking_threads: String, pub rpc_niceness_adjustment: String, pub rpc_bigtable_timeout: String, pub rpc_bigtable_instance_name: String, @@ -2075,6 +2085,7 @@ impl DefaultArgs { .batch_size .to_string(), rpc_threads: num_cpus::get().to_string(), + rpc_blocking_threads: (num_cpus::get() / 2).to_string(), rpc_niceness_adjustment: "0".to_string(), rpc_bigtable_timeout: "30".to_string(), rpc_bigtable_instance_name: solana_storage_bigtable::DEFAULT_INSTANCE_NAME.to_string(), diff --git a/validator/src/main.rs b/validator/src/main.rs index f1e68fdc7f959a..6c7eb72fb6117c 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1338,6 +1338,7 @@ pub fn main() { ), disable_health_check: false, rpc_threads: value_t_or_exit!(matches, "rpc_threads", usize), + rpc_blocking_threads: value_t_or_exit!(matches, "rpc_blocking_threads", usize), rpc_niceness_adj: value_t_or_exit!(matches, "rpc_niceness_adj", i8), account_indexes: account_indexes.clone(), rpc_scan_and_fix_roots: matches.is_present("rpc_scan_and_fix_roots"), From 1bcee8c1d20f98211af1447a5514e1b782223a6c Mon Sep 17 00:00:00 2001 From: jeffhelius Date: Wed, 4 Sep 2024 17:44:47 +0000 Subject: [PATCH 2/4] add async changes to accounts --- rpc/src/parsed_token_accounts.rs | 14 +- rpc/src/rpc.rs | 385 +++++++++++++++++++------------ rpc/src/rpc_subscriptions.rs | 2 +- 3 files changed, 241 insertions(+), 160 deletions(-) diff --git a/rpc/src/parsed_token_accounts.rs b/rpc/src/parsed_token_accounts.rs index d93cda521e65bd..5ffa1b7c2e1894 100644 --- a/rpc/src/parsed_token_accounts.rs +++ b/rpc/src/parsed_token_accounts.rs @@ -19,19 +19,11 @@ pub fn get_parsed_token_account( bank: &Bank, pubkey: &Pubkey, account: AccountSharedData, - // only used for simulation results - overwrite_accounts: Option<&HashMap>, ) -> UiAccount { let additional_data = get_token_account_mint(account.data()) - .and_then(|mint_pubkey| { - account_resolver::get_account_from_overwrites_or_bank( - &mint_pubkey, - bank, - overwrite_accounts, - ) - }) - .map(|mint_account| AccountAdditionalData { - spl_token_decimals: get_mint_decimals(mint_account.data()).ok(), + .and_then(|mint_pubkey| get_mint_owner_and_decimals(bank, &mint_pubkey).ok()) + .map(|(_, decimals)| AccountAdditionalData { + spl_token_decimals: Some(decimals), }); UiAccount::encode( diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 2cf27c779ae0a3..e85dc23d409f87 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -455,9 +455,9 @@ impl JsonRpcRequestProcessor { } } - pub fn get_account_info( + pub async fn get_account_info( &self, - pubkey: &Pubkey, + pubkey: Pubkey, config: Option, ) -> Result>> { let RpcAccountInfoConfig { @@ -472,11 +472,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); - let response = get_encoded_account(&bank, pubkey, encoding, data_slice, None)?; + let response = self + .runtime + .spawn_blocking({ + let bank = Arc::clone(&bank); + move || get_encoded_account(&bank, &pubkey, encoding, data_slice) + }) + .await + .expect("rpc: get_encoded_account panicked")?; Ok(new_response(&bank, response)) } - pub fn get_multiple_accounts( + pub async fn get_multiple_accounts( &self, pubkeys: Vec, config: Option, @@ -493,10 +500,18 @@ impl JsonRpcRequestProcessor { })?; let encoding = encoding.unwrap_or(UiAccountEncoding::Base64); - let accounts = pubkeys - .into_iter() - .map(|pubkey| get_encoded_account(&bank, &pubkey, encoding, data_slice, None)) - .collect::>>()?; + let mut accounts = Vec::with_capacity(pubkeys.len()); + for pubkey in pubkeys { + let bank = Arc::clone(&bank); + accounts.push( + self.runtime + .spawn_blocking(move || { + get_encoded_account(&bank, &pubkey, encoding, data_slice) + }) + .await + .expect("rpc: get_encoded_account panicked")?, + ); + } Ok(new_response(&bank, accounts)) } @@ -509,9 +524,9 @@ impl JsonRpcRequestProcessor { .get_minimum_balance_for_rent_exemption(data_len) } - pub fn get_program_accounts( + pub async fn get_program_accounts( &self, - program_id: &Pubkey, + program_id: Pubkey, config: Option, mut filters: Vec, with_context: bool, @@ -529,18 +544,31 @@ impl JsonRpcRequestProcessor { let encoding = encoding.unwrap_or(UiAccountEncoding::Binary); optimize_filters(&mut filters); let keyed_accounts = { - if let Some(owner) = get_spl_token_owner_filter(program_id, &filters) { - self.get_filtered_spl_token_accounts_by_owner(&bank, program_id, &owner, filters)? - } else if let Some(mint) = get_spl_token_mint_filter(program_id, &filters) { - self.get_filtered_spl_token_accounts_by_mint(&bank, program_id, &mint, filters)? + if let Some(owner) = get_spl_token_owner_filter(&program_id, &filters) { + self.get_filtered_spl_token_accounts_by_owner( + Arc::clone(&bank), + program_id, + owner, + filters, + ) + .await? + } else if let Some(mint) = get_spl_token_mint_filter(&program_id, &filters) { + self.get_filtered_spl_token_accounts_by_mint( + Arc::clone(&bank), + program_id, + mint, + filters, + ) + .await? } else { - self.get_filtered_program_accounts(&bank, program_id, filters)? + self.get_filtered_program_accounts(Arc::clone(&bank), program_id, filters) + .await? } }; - let accounts = if is_known_spl_token_id(program_id) + let accounts = if is_known_spl_token_id(&program_id) && encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() + get_parsed_token_accounts(Arc::clone(&bank), keyed_accounts.into_iter()).collect() } else { keyed_accounts .into_iter() @@ -1883,20 +1911,21 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, supply)) } - pub fn get_token_largest_accounts( + pub async fn get_token_largest_accounts( &self, - mint: &Pubkey, + mint: Pubkey, commitment: Option, ) -> Result>> { let bank = self.bank(commitment); - let (mint_owner, decimals) = get_mint_owner_and_decimals(&bank, mint)?; + let (mint_owner, decimals) = get_mint_owner_and_decimals(&bank, &mint)?; if !is_known_spl_token_id(&mint_owner) { return Err(Error::invalid_params( "Invalid param: not a Token mint".to_string(), )); } let mut token_balances: Vec = self - .get_filtered_spl_token_accounts_by_mint(&bank, &mint_owner, mint, vec![])? + .get_filtered_spl_token_accounts_by_mint(Arc::clone(&bank), mint_owner, mint, vec![]) + .await? .into_iter() .map(|(address, account)| { let amount = StateWithExtensions::::unpack(account.data()) @@ -1921,9 +1950,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, token_balances)) } - pub fn get_token_accounts_by_owner( + pub async fn get_token_accounts_by_owner( &self, - owner: &Pubkey, + owner: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, ) -> Result>> { @@ -1949,12 +1978,14 @@ impl JsonRpcRequestProcessor { ))); } - let keyed_accounts = self.get_filtered_spl_token_accounts_by_owner( - &bank, - &token_program_id, - owner, - filters, - )?; + let keyed_accounts = self + .get_filtered_spl_token_accounts_by_owner( + Arc::clone(&bank), + token_program_id, + owner, + filters, + ) + .await?; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() } else { @@ -1971,9 +2002,9 @@ impl JsonRpcRequestProcessor { Ok(new_response(&bank, accounts)) } - pub fn get_token_accounts_by_delegate( + pub async fn get_token_accounts_by_delegate( &self, - delegate: &Pubkey, + delegate: Pubkey, token_account_filter: TokenAccountsFilter, config: Option, ) -> Result>> { @@ -2001,11 +2032,18 @@ impl JsonRpcRequestProcessor { ]; // Optional filter on Mint address, uses mint account index for scan let keyed_accounts = if let Some(mint) = mint { - self.get_filtered_spl_token_accounts_by_mint(&bank, &token_program_id, &mint, filters)? + self.get_filtered_spl_token_accounts_by_mint( + Arc::clone(&bank), + token_program_id, + mint, + filters, + ) + .await? } else { // Filter on Token Account state filters.push(RpcFilterType::TokenAccountState); - self.get_filtered_program_accounts(&bank, &token_program_id, filters)? + self.get_filtered_program_accounts(Arc::clone(&bank), token_program_id, filters) + .await? }; let accounts = if encoding == UiAccountEncoding::JsonParsed { get_parsed_token_accounts(bank.clone(), keyed_accounts.into_iter()).collect() @@ -2024,14 +2062,14 @@ impl JsonRpcRequestProcessor { } /// Use a set of filters to get an iterator of keyed program accounts from a bank - fn get_filtered_program_accounts( + async fn get_filtered_program_accounts( &self, - bank: &Bank, - program_id: &Pubkey, + bank: Arc, + program_id: Pubkey, mut filters: Vec, ) -> RpcCustomResult> { optimize_filters(&mut filters); - let filter_closure = |account: &AccountSharedData| { + let filter_closure = move |account: &AccountSharedData| { filters .iter() .all(|filter_type| filter_type.allows(account)) @@ -2041,44 +2079,56 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::ProgramId) { - if !self.config.account_indexes.include_key(program_id) { + if !self.config.account_indexes.include_key(&program_id) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: program_id.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::ProgramId(*program_id), - |account| { - // The program-id account index checks for Account owner on inclusion. However, due - // to the current AccountsDb implementation, an account may remain in storage as a - // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later - // updates. We include the redundant filters here to avoid returning these - // accounts. - account.owner() == program_id && filter_closure(account) - }, - &ScanConfig::default(), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::ProgramId(program_id), + |account| { + // The program-id account index checks for Account owner on inclusion. However, due + // to the current AccountsDb implementation, an account may remain in storage as a + // zero-lamport AccountSharedData::Default() after being wiped and reinitialized in later + // updates. We include the redundant filters here to avoid returning these + // accounts. + account.owner() == &program_id && filter_closure(account) + }, + &ScanConfig::default(), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("Failed to spawn blocking task") } else { // this path does not need to provide a mb limit because we only want to support secondary indexes - Ok(bank - .get_filtered_program_accounts(program_id, filter_closure, &ScanConfig::default()) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_program_accounts( + &program_id, + filter_closure, + &ScanConfig::default(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("Failed to spawn blocking task") } } /// Get an iterator of spl-token accounts by owner address - fn get_filtered_spl_token_accounts_by_owner( + async fn get_filtered_spl_token_accounts_by_owner( &self, - bank: &Bank, - program_id: &Pubkey, - owner_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + owner_key: Pubkey, mut filters: Vec, ) -> RpcCustomResult> { // The by-owner accounts index checks for Token Account state and Owner address on @@ -2099,37 +2149,42 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenOwner) { - if !self.config.account_indexes.include_key(owner_key) { + if !self.config.account_indexes.include_key(&owner_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: owner_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenOwner(*owner_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_type.allows(account)) - }, - &ScanConfig::default(), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::SplTokenOwner(owner_key), + |account| { + account.owner() == &program_id + && filters + .iter() + .all(|filter_type| filter_type.allows(account)) + }, + &ScanConfig::default(), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("rpc: get_filtered_indexed_account panicked") } else { self.get_filtered_program_accounts(bank, program_id, filters) + .await } } /// Get an iterator of spl-token accounts by mint address - fn get_filtered_spl_token_accounts_by_mint( + async fn get_filtered_spl_token_accounts_by_mint( &self, - bank: &Bank, - program_id: &Pubkey, - mint_key: &Pubkey, + bank: Arc, + program_id: Pubkey, + mint_key: Pubkey, mut filters: Vec, ) -> RpcCustomResult> { // The by-mint accounts index checks for Token Account state and Mint address on inclusion. @@ -2149,28 +2204,33 @@ impl JsonRpcRequestProcessor { .account_indexes .contains(&AccountIndex::SplTokenMint) { - if !self.config.account_indexes.include_key(mint_key) { + if !self.config.account_indexes.include_key(&mint_key) { return Err(RpcCustomError::KeyExcludedFromSecondaryIndex { index_key: mint_key.to_string(), }); } - Ok(bank - .get_filtered_indexed_accounts( - &IndexKey::SplTokenMint(*mint_key), - |account| { - account.owner() == program_id - && filters - .iter() - .all(|filter_type| filter_type.allows(account)) - }, - &ScanConfig::default(), - bank.byte_limit_for_scans(), - ) - .map_err(|e| RpcCustomError::ScanError { - message: e.to_string(), - })?) + self.runtime + .spawn_blocking(move || { + bank.get_filtered_indexed_accounts( + &IndexKey::SplTokenMint(mint_key), + |account| { + account.owner() == &program_id + && filters + .iter() + .all(|filter_type| filter_type.allows(account)) + }, + &ScanConfig::default(), + bank.byte_limit_for_scans(), + ) + .map_err(|e| RpcCustomError::ScanError { + message: e.to_string(), + }) + }) + .await + .expect("rpc: get_filtered_indexed_account panicked") } else { self.get_filtered_program_accounts(bank, program_id, filters) + .await } } @@ -2323,15 +2383,13 @@ fn get_encoded_account( pubkey: &Pubkey, encoding: UiAccountEncoding, data_slice: Option, - // only used for simulation results - overwrite_accounts: Option<&HashMap>, ) -> Result> { - match account_resolver::get_account_from_overwrites_or_bank(pubkey, bank, overwrite_accounts) { + match bank.get_account(pubkey) { Some(account) => { let response = if is_known_spl_token_id(account.owner()) && encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_account(bank, pubkey, account, overwrite_accounts) + get_parsed_token_account(bank, pubkey, account) } else { encode_account(&account, pubkey, encoding, data_slice)? }; @@ -2999,7 +3057,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getMultipleAccounts")] fn get_multiple_accounts( @@ -3007,7 +3065,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>>; + ) -> BoxFuture>>>>; #[rpc(meta, name = "getBlockCommitment")] fn get_block_commitment( @@ -3046,10 +3104,13 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!("get_account_info rpc request received: {:?}", pubkey_str); - let pubkey = verify_pubkey(&pubkey_str)?; - meta.get_account_info(&pubkey, config) + let pubkey = match verify_pubkey(&pubkey_str) { + Ok(pubkey) => pubkey, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { meta.get_account_info(pubkey, config).await }) } fn get_multiple_accounts( @@ -3057,7 +3118,7 @@ pub mod rpc_accounts { meta: Self::Metadata, pubkey_strs: Vec, config: Option, - ) -> Result>>> { + ) -> BoxFuture>>>> { debug!( "get_multiple_accounts rpc request received: {:?}", pubkey_strs.len() @@ -3068,15 +3129,19 @@ pub mod rpc_accounts { .max_multiple_accounts .unwrap_or(MAX_MULTIPLE_ACCOUNTS); if pubkey_strs.len() > max_multiple_accounts { - return Err(Error::invalid_params(format!( + return Box::pin(future::err(Error::invalid_params(format!( "Too many inputs provided; max {max_multiple_accounts}" - ))); + )))); } let pubkeys = pubkey_strs .into_iter() .map(|pubkey_str| verify_pubkey(&pubkey_str)) - .collect::>>()?; - meta.get_multiple_accounts(pubkeys, config) + .collect::>>(); + let pubkeys = match pubkeys { + Ok(pubkeys) => pubkeys, + Err(err) => return Box::pin(future::err(err)), + }; + Box::pin(async move { meta.get_multiple_accounts(pubkeys, config).await }) } fn get_block_commitment( @@ -3130,7 +3195,7 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getLargestAccounts")] fn get_largest_accounts( @@ -3156,7 +3221,7 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByOwner")] fn get_token_accounts_by_owner( @@ -3165,7 +3230,7 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; #[rpc(meta, name = "getTokenAccountsByDelegate")] fn get_token_accounts_by_delegate( @@ -3174,7 +3239,7 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>>; + ) -> BoxFuture>>>; } pub struct AccountsScanImpl; @@ -3186,12 +3251,15 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, program_id_str: String, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_program_accounts rpc request received: {:?}", program_id_str ); - let program_id = verify_pubkey(&program_id_str)?; + let program_id = match verify_pubkey(&program_id_str) { + Ok(program_id) => program_id, + Err(e) => return Box::pin(future::err(e)), + }; let (config, filters, with_context) = if let Some(config) = config { ( Some(config.account_config), @@ -3202,14 +3270,19 @@ pub mod rpc_accounts_scan { (None, vec![], false) }; if filters.len() > MAX_GET_PROGRAM_ACCOUNT_FILTERS { - return Err(Error::invalid_params(format!( + return Box::pin(future::err(Error::invalid_params(format!( "Too many filters provided; max {MAX_GET_PROGRAM_ACCOUNT_FILTERS}" - ))); + )))); } for filter in &filters { - verify_filter(filter)?; + if let Err(e) = verify_filter(filter) { + return Box::pin(future::err(e)); + } } - meta.get_program_accounts(&program_id, config, filters, with_context) + Box::pin(async move { + meta.get_program_accounts(program_id, config, filters, with_context) + .await + }) } fn get_largest_accounts( @@ -3235,13 +3308,16 @@ pub mod rpc_accounts_scan { meta: Self::Metadata, mint_str: String, commitment: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_largest_accounts rpc request received: {:?}", mint_str ); - let mint = verify_pubkey(&mint_str)?; - meta.get_token_largest_accounts(&mint, commitment) + let mint = match verify_pubkey(&mint_str) { + Ok(mint) => mint, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { meta.get_token_largest_accounts(mint, commitment).await }) } fn get_token_accounts_by_owner( @@ -3250,14 +3326,23 @@ pub mod rpc_accounts_scan { owner_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_owner rpc request received: {:?}", owner_str ); - let owner = verify_pubkey(&owner_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_owner(&owner, token_account_filter, config) + let owner = match verify_pubkey(&owner_str) { + Ok(owner) => owner, + Err(e) => return Box::pin(future::err(e)), + }; + let token_account_filter = match verify_token_account_filter(token_account_filter) { + Ok(token_account_filter) => token_account_filter, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { + meta.get_token_accounts_by_owner(owner, token_account_filter, config) + .await + }) } fn get_token_accounts_by_delegate( @@ -3266,14 +3351,23 @@ pub mod rpc_accounts_scan { delegate_str: String, token_account_filter: RpcTokenAccountsFilter, config: Option, - ) -> Result>> { + ) -> BoxFuture>>> { debug!( "get_token_accounts_by_delegate rpc request received: {:?}", delegate_str ); - let delegate = verify_pubkey(&delegate_str)?; - let token_account_filter = verify_token_account_filter(token_account_filter)?; - meta.get_token_accounts_by_delegate(&delegate, token_account_filter, config) + let delegate = match verify_pubkey(&delegate_str) { + Ok(delegate) => delegate, + Err(e) => return Box::pin(future::err(e)), + }; + let token_account_filter = match verify_token_account_filter(token_account_filter) { + Ok(token_account_filter) => token_account_filter, + Err(e) => return Box::pin(future::err(e)), + }; + Box::pin(async move { + meta.get_token_accounts_by_delegate(delegate, token_account_filter, config) + .await + }) } } } @@ -3813,24 +3907,19 @@ pub mod rpc_full { if result.is_err() { Some(vec![None; config_accounts.addresses.len()]) } else { - let mut post_simulation_accounts_map = HashMap::new(); - for (pubkey, data) in post_simulation_accounts { - post_simulation_accounts_map.insert(pubkey, data); - } - Some( config_accounts .addresses .iter() .map(|address_str| { - let pubkey = verify_pubkey(address_str)?; - get_encoded_account( - bank, - &pubkey, - accounts_encoding, - None, - Some(&post_simulation_accounts_map), - ) + let address = verify_pubkey(address_str)?; + post_simulation_accounts + .iter() + .find(|(key, _account)| key == &address) + .map(|(pubkey, account)| { + encode_account(account, pubkey, accounts_encoding, None) + }) + .transpose() }) .collect::>>()?, ) diff --git a/rpc/src/rpc_subscriptions.rs b/rpc/src/rpc_subscriptions.rs index fa833f2179cc58..4f2a81ae39b174 100644 --- a/rpc/src/rpc_subscriptions.rs +++ b/rpc/src/rpc_subscriptions.rs @@ -385,7 +385,7 @@ fn filter_account_result( if is_known_spl_token_id(account.owner()) && params.encoding == UiAccountEncoding::JsonParsed { - get_parsed_token_account(&bank, ¶ms.pubkey, account, None) + get_parsed_token_account(&bank, ¶ms.pubkey, account) } else { UiAccount::encode(¶ms.pubkey, &account, params.encoding, None, None) } From 050471a9df2db3577d959eb287a90927b7b5fe7c Mon Sep 17 00:00:00 2001 From: jeffhelius Date: Fri, 6 Sep 2024 20:52:59 +0000 Subject: [PATCH 3/4] make it compile --- rpc/src/rpc.rs | 1 + rpc/src/rpc_pubsub.rs | 11 ++++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index e85dc23d409f87..23968893a1dbf4 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -178,6 +178,7 @@ impl Default for JsonRpcConfig { rpc_scan_and_fix_roots: Default::default(), max_request_body_size: Option::default(), disable_health_check: Default::default(), + skip_preflight_health_check: Default::default(), } } } diff --git a/rpc/src/rpc_pubsub.rs b/rpc/src/rpc_pubsub.rs index ee9f64a23e1cb2..ff246e40f8182c 100644 --- a/rpc/src/rpc_pubsub.rs +++ b/rpc/src/rpc_pubsub.rs @@ -456,22 +456,23 @@ impl RpcSolPubSubImpl { })?; let id = token.id(); self.current_subscriptions.insert(id, token); - let num_connections = self + let connection_count = self .project_connections .entry(self.project_id.clone()) .and_modify(|count| *count += 1) .or_insert(1); - datapoint_info!("rpc-pubsub-connections-by-project", "project_id" => self.project_id, "plan" => self.plan, ("num_connections", *num_connections, i64)); + datapoint_info!("rpc-pubsub-connections-by-project", "project_id" => self.project_id, "plan" => self.plan, ("num_connections", *connection_count, i64)); Ok(id) } fn unsubscribe(&self, id: SubscriptionId) -> Result { if self.current_subscriptions.remove(&id).is_some() { - let num_connections = self + let connection_count = self .project_connections .entry(self.project_id.clone()) - .and_modify(|count| *count -= 1); - datapoint_info!("rpc-pubsub-connections-by-project", "project_id" => self.project_id, "plan" => self.plan, ("num_connections", *num_connections, i64)); + .and_modify(|count| *count = count.saturating_sub(1)) + .or_insert(0); + datapoint_info!("rpc-pubsub-connections-by-project", "project_id" => self.project_id, "plan" => self.plan, ("num_connections", *connection_count, i64)); Ok(true) } else { Err(Error { From d94297313e5d4d0130152a09367966bc8341f906 Mon Sep 17 00:00:00 2001 From: jeffhelius Date: Mon, 9 Sep 2024 13:29:04 -0700 Subject: [PATCH 4/4] don't duplicate --- validator/src/cli.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/validator/src/cli.rs b/validator/src/cli.rs index c1581f2f1be36d..cf1eaf40e976ed 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -270,14 +270,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { "Skip health check when running a preflight check", ), ) - .arg( - Arg::with_name("skip_preflight_health_check") - .long("skip-preflight-health-check") - .takes_value(false) - .help( - "Skip health check when running a preflight check", - ), - ) .arg( Arg::with_name("rpc_faucet_addr") .long("rpc-faucet-address")