From 9e7b00b641249e94d6a3c72e77b818d8e78c967f Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Wed, 21 Feb 2024 20:14:06 -0500 Subject: [PATCH] rpc: add config option to getRPF (#161) --- Cargo.lock | 1 - rpc-client-api/src/config.rs | 6 + rpc/src/rpc.rs | 28 +- runtime/Cargo.toml | 1 - runtime/src/prioritization_fee.rs | 178 ++++---- runtime/src/prioritization_fee_cache.rs | 563 +++++++++++++----------- 6 files changed, 436 insertions(+), 341 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 14ea5d3a8a..e6a2c59fd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7115,7 +7115,6 @@ dependencies = [ "lazy_static", "libsecp256k1", "log", - "lru", "lz4", "memmap2", "memoffset 0.9.0", diff --git a/rpc-client-api/src/config.rs b/rpc-client-api/src/config.rs index ae2e3037a5..2b297226b9 100644 --- a/rpc-client-api/src/config.rs +++ b/rpc-client-api/src/config.rs @@ -352,3 +352,9 @@ pub struct RpcContextConfig { pub commitment: Option, pub min_context_slot: Option, } + +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RpcRecentPrioritizationFeesConfig { + pub percentile: Option, +} diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 28f25784f7..414a2332f9 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2184,10 +2184,18 @@ impl JsonRpcRequestProcessor { fn get_recent_prioritization_fees( &self, pubkeys: Vec, + percentile: Option, ) -> Result> { - Ok(self - .prioritization_fee_cache - .get_prioritization_fees(&pubkeys) + let cache = match percentile { + Some(percentile) => self + .prioritization_fee_cache + .get_prioritization_fees2(&pubkeys, percentile), + None => self + .prioritization_fee_cache + .get_prioritization_fees(&pubkeys), + }; + + Ok(cache .into_iter() .map(|(slot, prioritization_fee)| RpcPrioritizationFee { slot, @@ -3420,6 +3428,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, pubkey_strs: Option>, + config: Option, ) -> Result>; } @@ -4104,6 +4113,7 @@ pub mod rpc_full { &self, meta: Self::Metadata, pubkey_strs: Option>, + config: Option, ) -> Result> { let pubkey_strs = pubkey_strs.unwrap_or_default(); debug!( @@ -4119,7 +4129,17 @@ pub mod rpc_full { .into_iter() .map(|pubkey_str| verify_pubkey(&pubkey_str)) .collect::>>()?; - meta.get_recent_prioritization_fees(pubkeys) + + let RpcRecentPrioritizationFeesConfig { percentile } = config.unwrap_or_default(); + if let Some(percentile) = percentile { + if percentile > 10_000 { + return Err(Error::invalid_params( + "Percentile is too big; max value is 10000".to_owned(), + )); + } + } + + meta.get_recent_prioritization_fees(pubkeys, percentile) } } } diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 355c858597..a3cc5f188b 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -29,7 +29,6 @@ index_list = { workspace = true } itertools = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } -lru = { workspace = true } lz4 = { workspace = true } memmap2 = { workspace = true } mockall = { workspace = true } diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs index bb5f7632c9..61f3397ad4 100644 --- a/runtime/src/prioritization_fee.rs +++ b/runtime/src/prioritization_fee.rs @@ -136,13 +136,13 @@ pub enum PrioritizationFeeError { /// block; and the minimum fee for each writable account in all transactions in this block. The only relevant /// write account minimum fees are those greater than the block minimum transaction fee, because the minimum fee needed to land /// a transaction is determined by Max( min_transaction_fee, min_writable_account_fees(key), ...) -#[derive(Debug)] +#[derive(Debug, Default)] pub struct PrioritizationFee { - // The minimum prioritization fee of transactions that landed in this block. - min_transaction_fee: u64, + // Prioritization fee of transactions that landed in this block. + transaction_fees: Vec, - // The minimum prioritization fee of each writable account in transactions in this block. - min_writable_account_fees: HashMap, + // Prioritization fee of each writable account in transactions in this block. + writable_account_fees: HashMap>, // Default to `false`, set to `true` when a block is completed, therefore the minimum fees recorded // are finalized, and can be made available for use (e.g., RPC query) @@ -152,38 +152,19 @@ pub struct PrioritizationFee { metrics: PrioritizationFeeMetrics, } -impl Default for PrioritizationFee { - fn default() -> Self { - PrioritizationFee { - min_transaction_fee: u64::MAX, - min_writable_account_fees: HashMap::new(), - is_finalized: false, - metrics: PrioritizationFeeMetrics::default(), - } - } -} - impl PrioritizationFee { /// Update self for minimum transaction fee in the block and minimum fee for each writable account. - pub fn update( - &mut self, - transaction_fee: u64, - writable_accounts: &[Pubkey], - ) -> Result<(), PrioritizationFeeError> { + pub fn update(&mut self, transaction_fee: u64, writable_accounts: Vec) { let (_, update_time) = measure!( { if !self.is_finalized { - if transaction_fee < self.min_transaction_fee { - self.min_transaction_fee = transaction_fee; - } + self.transaction_fees.push(transaction_fee); - for write_account in writable_accounts.iter() { - self.min_writable_account_fees - .entry(*write_account) - .and_modify(|write_lock_fee| { - *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) - }) - .or_insert(transaction_fee); + for write_account in writable_accounts { + self.writable_account_fees + .entry(write_account) + .or_default() + .push(transaction_fee); } self.metrics @@ -199,41 +180,56 @@ impl PrioritizationFee { self.metrics .accumulate_total_update_elapsed_us(update_time.as_us()); - Ok(()) - } - - /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are - /// removed to reduce memory footprint when mark_block_completed() is called. - fn prune_irrelevant_writable_accounts(&mut self) { - self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; - self.min_writable_account_fees - .retain(|_, account_fee| account_fee > &mut self.min_transaction_fee); - self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; } pub fn mark_block_completed(&mut self) -> Result<(), PrioritizationFeeError> { if self.is_finalized { return Err(PrioritizationFeeError::BlockIsAlreadyFinalized); } - self.prune_irrelevant_writable_accounts(); self.is_finalized = true; + + self.transaction_fees.sort(); + for fees in self.writable_account_fees.values_mut() { + fees.sort() + } + + self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; + self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; + Ok(()) } pub fn get_min_transaction_fee(&self) -> Option { - (self.min_transaction_fee != u64::MAX).then_some(self.min_transaction_fee) + self.transaction_fees.first().copied() + } + + fn get_percentile(fees: &[u64], percentile: u16) -> Option { + let index = (percentile as usize).min(9_999) * fees.len() / 10_000; + fees.get(index).copied() + } + + pub fn get_transaction_fee(&self, percentile: u16) -> Option { + Self::get_percentile(&self.transaction_fees, percentile) } - pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option { - self.min_writable_account_fees.get(key).copied() + pub fn get_min_writable_account_fee(&self, key: &Pubkey) -> Option { + self.writable_account_fees + .get(key) + .and_then(|fees| fees.first().copied()) } - pub fn get_writable_account_fees(&self) -> impl Iterator { - self.min_writable_account_fees.iter() + pub fn get_writable_account_fee(&self, key: &Pubkey, percentile: u16) -> Option { + self.writable_account_fees + .get(key) + .and_then(|fees| Self::get_percentile(fees, percentile)) + } + + pub fn get_writable_account_fees(&self) -> impl Iterator)> { + self.writable_account_fees.iter() } pub fn get_writable_accounts_count(&self) -> usize { - self.min_writable_account_fees.len() + self.writable_account_fees.len() } pub fn is_finalized(&self) -> bool { @@ -245,20 +241,28 @@ impl PrioritizationFee { // report this slot's min_transaction_fee and top 10 min_writable_account_fees let min_transaction_fee = self.get_min_transaction_fee().unwrap_or(0); - let mut accounts_fees: Vec<_> = self.get_writable_account_fees().collect(); - accounts_fees.sort_by(|lh, rh| rh.1.cmp(lh.1)); datapoint_info!( "block_min_prioritization_fee", ("slot", slot as i64, i64), ("entity", "block", String), ("min_prioritization_fee", min_transaction_fee as i64, i64), ); - for (account_key, fee) in accounts_fees.iter().take(10) { + + let mut accounts_fees: Vec<(&Pubkey, u64)> = self + .get_writable_account_fees() + .filter_map(|(account, fees)| { + fees.first() + .copied() + .map(|min_account_fee| (account, min_transaction_fee.min(min_account_fee))) + }) + .collect(); + accounts_fees.sort_by(|lh, rh| rh.1.cmp(&lh.1)); + for (account_key, fee) in accounts_fees.into_iter().take(10) { datapoint_trace!( "block_min_prioritization_fee", ("slot", slot as i64, i64), ("entity", account_key.to_string(), String), - ("min_prioritization_fee", **fee as i64, i64), + ("min_prioritization_fee", fee as i64, i64), ); } } @@ -283,25 +287,27 @@ mod tests { // ----------------------------------------------------------------------- // [5, a, b ] --> [5, 5, 5, nil ] { - assert!(prioritization_fee - .update(5, &[write_account_a, write_account_b]) - .is_ok()); + prioritization_fee.update(5, vec![write_account_a, write_account_b]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert!(prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .is_none()); + + prioritization_fee.is_finalized = false; } // Assert for second transaction: @@ -309,28 +315,30 @@ mod tests { // ----------------------------------------------------------------------- // [9, b, c ] --> [5, 5, 5, 9 ] { - assert!(prioritization_fee - .update(9, &[write_account_b, write_account_c]) - .is_ok()); + prioritization_fee.update(9, vec![write_account_b, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert_eq!( 9, prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .unwrap() ); + + prioritization_fee.is_finalized = false; } // Assert for third transaction: @@ -338,47 +346,57 @@ mod tests { // ----------------------------------------------------------------------- // [2, a, c ] --> [2, 2, 5, 2 ] { - assert!(prioritization_fee - .update(2, &[write_account_a, write_account_c]) - .is_ok()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!( 2, prioritization_fee - .get_writable_account_fee(&write_account_a) + .get_min_writable_account_fee(&write_account_a) .unwrap() ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) .unwrap() ); assert_eq!( 2, prioritization_fee - .get_writable_account_fee(&write_account_c) + .get_min_writable_account_fee(&write_account_c) .unwrap() ); + + prioritization_fee.is_finalized = false; } - // assert after prune, account a and c should be removed from cache to save space + // assert after sort { - prioritization_fee.prune_irrelevant_writable_accounts(); - assert_eq!(1, prioritization_fee.min_writable_account_fees.len()); + prioritization_fee.update(2, vec![write_account_a, write_account_c]); + assert!(prioritization_fee.mark_block_completed().is_ok()); + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); - assert!(prioritization_fee - .get_writable_account_fee(&write_account_a) - .is_none()); + assert_eq!(3, prioritization_fee.writable_account_fees.len()); + assert_eq!( + 2, + prioritization_fee + .get_min_writable_account_fee(&write_account_a) + .unwrap() + ); assert_eq!( 5, prioritization_fee - .get_writable_account_fee(&write_account_b) + .get_min_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 2, + prioritization_fee + .get_min_writable_account_fee(&write_account_c) .unwrap() ); - assert!(prioritization_fee - .get_writable_account_fee(&write_account_c) - .is_none()); } } diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index ece749387a..317a7906da 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -4,9 +4,7 @@ use { transaction_priority_details::GetTransactionPriorityDetails, }, crossbeam_channel::{unbounded, Receiver, Sender}, - dashmap::DashMap, log::*, - lru::LruCache, solana_measure::measure, solana_sdk::{ clock::{BankId, Slot}, @@ -14,7 +12,7 @@ use { transaction::SanitizedTransaction, }, std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{ atomic::{AtomicU64, Ordering}, Arc, RwLock, @@ -121,12 +119,18 @@ impl PrioritizationFeeCacheMetrics { } } +#[derive(Debug)] +struct CacheTransactionUpdate { + transaction_fee: u64, + writable_accounts: Vec, +} + +#[derive(Debug)] enum CacheServiceUpdate { - TransactionUpdate { + TransactionsUpdate { slot: Slot, bank_id: BankId, - transaction_fee: u64, - writable_accounts: Arc>, + updates: Vec, }, BankFinalized { slot: Slot, @@ -137,14 +141,14 @@ enum CacheServiceUpdate { /// Potentially there are more than one bank that updates Prioritization Fee /// for a slot. The updates are tracked and finalized by bank_id. -type SlotPrioritizationFee = DashMap; +type SlotPrioritizationFee = HashMap; /// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, /// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, /// and collecting stats and reporting metrics. #[derive(Debug)] pub struct PrioritizationFeeCache { - cache: Arc>>>, + cache: Arc>>, service_thread: Option>, sender: Sender, metrics: Arc, @@ -169,17 +173,17 @@ impl Drop for PrioritizationFeeCache { impl PrioritizationFeeCache { pub fn new(capacity: u64) -> Self { - let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); + let cache = Arc::new(RwLock::new(BTreeMap::new())); let (sender, receiver) = unbounded(); - let cache = Arc::new(RwLock::new(LruCache::new(capacity as usize))); + let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); - let cache_clone = cache.clone(); - let metrics_clone = metrics.clone(); let service_thread = Some( Builder::new() .name("solPrFeeCachSvc".to_string()) - .spawn(move || { - Self::service_loop(cache_clone, receiver, metrics_clone); + .spawn({ + let cache = cache.clone(); + let metrics = metrics.clone(); + move || Self::service_loop(cache, capacity as usize, receiver, metrics) }) .unwrap(), ); @@ -192,75 +196,63 @@ impl PrioritizationFeeCache { } } - /// Get prioritization fee entry, create new entry if necessary - fn get_prioritization_fee( - cache: Arc>>>, - slot: &Slot, - ) -> Arc { - let mut cache = cache.write().unwrap(); - match cache.get(slot) { - Some(entry) => Arc::clone(entry), - None => { - let entry = Arc::new(SlotPrioritizationFee::default()); - cache.put(*slot, Arc::clone(&entry)); - entry - } - } - } - /// Update with a list of non-vote transactions' tx_priority_details and tx_account_locks; Only /// transactions have both valid priority_detail and account_locks will be used to update /// fee_cache asynchronously. pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator) { let (_, send_updates_time) = measure!( { - for sanitized_transaction in txs { - // Vote transactions are not prioritized, therefore they are excluded from - // updating fee_cache. - if sanitized_transaction.is_simple_vote_transaction() { - continue; - } - - let round_compute_unit_price_enabled = false; // TODO: bank.feture_set.is_active(round_compute_unit_price) - let priority_details = sanitized_transaction - .get_transaction_priority_details(round_compute_unit_price_enabled); - let account_locks = sanitized_transaction - .get_account_locks(bank.get_transaction_account_lock_limit()); - - if priority_details.is_none() || account_locks.is_err() { - continue; - } - let priority_details = priority_details.unwrap(); - - // filter out any transaction that requests zero compute_unit_limit - // since its priority fee amount is not instructive - if priority_details.compute_unit_limit == 0 { - continue; - } - - let writable_accounts = Arc::new( - account_locks + let updates = txs + .filter_map(|sanitized_transaction| { + // Vote transactions are not prioritized, therefore they are excluded from + // updating fee_cache. + if sanitized_transaction.is_simple_vote_transaction() { + return None; + } + + let round_compute_unit_price_enabled = false; // TODO: bank.feture_set.is_active(round_compute_unit_price) + let priority_details = sanitized_transaction + .get_transaction_priority_details(round_compute_unit_price_enabled); + let account_locks = sanitized_transaction + .get_account_locks(bank.get_transaction_account_lock_limit()); + + if priority_details.is_none() || account_locks.is_err() { + return None; + } + let priority_details = priority_details.unwrap(); + + // filter out any transaction that requests zero compute_unit_limit + // since its priority fee amount is not instructive + if priority_details.compute_unit_limit == 0 { + return None; + } + + let writable_accounts = account_locks .unwrap() .writable .iter() .map(|key| **key) - .collect::>(), - ); + .collect::>(); - self.sender - .send(CacheServiceUpdate::TransactionUpdate { - slot: bank.slot(), - bank_id: bank.bank_id(), + Some(CacheTransactionUpdate { transaction_fee: priority_details.priority, writable_accounts, }) - .unwrap_or_else(|err| { - warn!( - "prioritization fee cache transaction updates failed: {:?}", - err - ); - }); - } + }) + .collect(); + + self.sender + .send(CacheServiceUpdate::TransactionsUpdate { + slot: bank.slot(), + bank_id: bank.bank_id(), + updates, + }) + .unwrap_or_else(|err| { + warn!( + "prioritization fee cache transaction updates failed: {:?}", + err + ); + }); }, "send_updates", ); @@ -282,51 +274,66 @@ impl PrioritizationFeeCache { }); } - /// Internal function is invoked by worker thread to update slot's minimum prioritization fee, - /// Cache lock contends here. + /// Internal function is invoked by worker thread to update slot's minimum prioritization fee. fn update_cache( - cache: Arc>>>, - slot: &Slot, - bank_id: &BankId, - transaction_fee: u64, - writable_accounts: Arc>, - metrics: Arc, + unfinalized: &mut BTreeMap, + slot: Slot, + bank_id: BankId, + updates: Vec, + metrics: &PrioritizationFeeCacheMetrics, ) { - let (slot_prioritization_fee, cache_lock_time) = - measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); - + let transaction_update_count = updates.len() as u64; let (_, entry_update_time) = measure!( { - let mut block_prioritization_fee = slot_prioritization_fee - .entry(*bank_id) - .or_insert(PrioritizationFee::default()); - block_prioritization_fee.update(transaction_fee, &writable_accounts) + let block_prioritization_fee = unfinalized + .entry(slot) + .or_default() + .entry(bank_id) + .or_default(); + + for CacheTransactionUpdate { + transaction_fee, + writable_accounts, + } in updates + { + block_prioritization_fee.update(transaction_fee, writable_accounts); + } }, "entry_update_time" ); - metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); - metrics.accumulate_successful_transaction_update_count(1); + metrics.accumulate_successful_transaction_update_count(transaction_update_count); } fn finalize_slot( - cache: Arc>>>, - slot: &Slot, - bank_id: &BankId, - metrics: Arc, + unfinalized: &mut BTreeMap, + cache: &RwLock>, + cache_max_size: usize, + slot: Slot, + bank_id: BankId, + metrics: &PrioritizationFeeCacheMetrics, ) { - let (slot_prioritization_fee, cache_lock_time) = - measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); + // remove unfinalized slots + // TODO: do we need to keep slots in buffer? or they always come in order? + loop { + match unfinalized.keys().next().cloned() { + Some(unfinalized_slot) if unfinalized_slot < slot - 128 => unfinalized.pop_first(), + _ => break, + }; + } + + let Some(mut slot_prioritization_fee) = unfinalized.remove(&slot) else { return }; // prune cache by evicting write account entry from prioritization fee if its fee is less // or equal to block's minimum transaction fee, because they are irrelevant in calculating // block minimum fee. - let (result, slot_finalize_time) = measure!( + let (slot_prioritization_fee, slot_finalize_time) = measure!( { // Only retain priority fee reported from optimistically confirmed bank let pre_purge_bank_count = slot_prioritization_fee.len() as u64; - slot_prioritization_fee.retain(|id, _| id == bank_id); - let post_purge_bank_count = slot_prioritization_fee.len() as u64; + let mut slot_prioritization_fee = slot_prioritization_fee.remove(&bank_id); + let post_purge_bank_count = + slot_prioritization_fee.as_ref().map(|_| 1).unwrap_or(1); metrics.accumulate_total_purged_duplicated_bank_count( pre_purge_bank_count.saturating_sub(post_purge_bank_count), ); @@ -336,49 +343,61 @@ impl PrioritizationFeeCache { warn!("Finalized bank has empty prioritization fee cache. slot {slot} bank id {bank_id}"); } - let mut block_prioritization_fee = slot_prioritization_fee - .entry(*bank_id) - .or_insert(PrioritizationFee::default()); - let result = block_prioritization_fee.mark_block_completed(); - block_prioritization_fee.report_metrics(*slot); - result + if let Some(slot_prioritization_fee) = &mut slot_prioritization_fee { + if let Err(err) = slot_prioritization_fee.mark_block_completed() { + error!( + "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", + err + ); + } + slot_prioritization_fee.report_metrics(slot); + } + slot_prioritization_fee }, "slot_finalize_time" ); - metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); - if let Err(err) = result { - error!( - "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", - err + // Create new cache entry + if let Some(slot_prioritization_fee) = slot_prioritization_fee { + let (_, cache_lock_time) = measure!( + { + let mut cache = cache.write().unwrap(); + while cache.len() >= cache_max_size { + cache.pop_first(); + } + cache.insert(slot, slot_prioritization_fee); + }, + "cache_lock_time" ); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); } } fn service_loop( - cache: Arc>>>, + cache: Arc>>, + cache_max_size: usize, receiver: Receiver, metrics: Arc, ) { + let mut unfinalized = BTreeMap::::new(); + for update in receiver.iter() { match update { - CacheServiceUpdate::TransactionUpdate { + CacheServiceUpdate::TransactionsUpdate { slot, bank_id, - transaction_fee, - writable_accounts, - } => Self::update_cache( - cache.clone(), - &slot, - &bank_id, - transaction_fee, - writable_accounts, - metrics.clone(), - ), + updates, + } => Self::update_cache(&mut unfinalized, slot, bank_id, updates, &metrics), CacheServiceUpdate::BankFinalized { slot, bank_id } => { - Self::finalize_slot(cache.clone(), &slot, &bank_id, metrics.clone()); - + Self::finalize_slot( + &mut unfinalized, + &cache, + cache_max_size, + slot, + bank_id, + &metrics, + ); metrics.report(slot); } CacheServiceUpdate::Exit => { @@ -390,43 +409,52 @@ impl PrioritizationFeeCache { /// Returns number of blocks that have finalized minimum fees collection pub fn available_block_count(&self) -> usize { + self.cache.read().unwrap().len() + } + + pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> Vec<(Slot, u64)> { self.cache .read() .unwrap() .iter() - .filter(|(_slot, slot_prioritization_fee)| { - slot_prioritization_fee - .iter() - .any(|prioritization_fee| prioritization_fee.is_finalized()) + .map(|(slot, slot_prioritization_fee)| { + let mut fee = slot_prioritization_fee + .get_min_transaction_fee() + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + slot_prioritization_fee.get_min_writable_account_fee(account_key) + { + fee = std::cmp::max(fee, account_fee); + } + } + (*slot, fee) }) - .count() + .collect() } - pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> HashMap { + pub fn get_prioritization_fees2( + &self, + account_keys: &[Pubkey], + percentile: u16, + ) -> Vec<(Slot, u64)> { self.cache .read() .unwrap() .iter() - .filter_map(|(slot, slot_prioritization_fee)| { - slot_prioritization_fee - .iter() - .find_map(|prioritization_fee| { - prioritization_fee.is_finalized().then(|| { - let mut fee = prioritization_fee - .get_min_transaction_fee() - .unwrap_or_default(); - for account_key in account_keys { - if let Some(account_fee) = - prioritization_fee.get_writable_account_fee(account_key) - { - fee = std::cmp::max(fee, account_fee); - } - } - Some((*slot, fee)) - }) - }) + .map(|(slot, slot_prioritization_fee)| { + let mut fee = slot_prioritization_fee + .get_transaction_fee(percentile) + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + slot_prioritization_fee.get_writable_account_fee(account_key, percentile) + { + fee = std::cmp::max(fee, account_fee); + } + } + (*slot, fee) }) - .flatten() .collect() } } @@ -486,7 +514,7 @@ mod tests { .load(Ordering::Relaxed) != expected_update_count { - std::thread::sleep(std::time::Duration::from_millis(100)); + std::thread::sleep(std::time::Duration::from_millis(10)); } } @@ -496,18 +524,20 @@ mod tests { slot: Slot, bank_id: BankId, ) { + // mark as finalized prioritization_fee_cache.finalize_priority_fee(slot, bank_id); - let fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); // wait till finalization is done - while !fee - .get(&bank_id) - .map_or(false, |block_fee| block_fee.is_finalized()) - { - std::thread::sleep(std::time::Duration::from_millis(100)); + loop { + let cache = prioritization_fee_cache.cache.read().unwrap(); + if let Some(slot_cache) = cache.get(&slot) { + if slot_cache.is_finalized() { + return; + } + } + drop(cache); + + std::thread::sleep(std::time::Duration::from_millis(10)); } } @@ -541,33 +571,38 @@ mod tests { // assert block minimum fee and account a, b, c fee accordingly { - let fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - let fee = fee.get(&bank.bank_id()).unwrap(); - assert_eq!(2, fee.get_min_transaction_fee().unwrap()); - assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); - assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); - assert_eq!(2, fee.get_writable_account_fee(&write_account_c).unwrap()); - // assert unknown account d fee - assert!(fee - .get_writable_account_fee(&Pubkey::new_unique()) - .is_none()); + // Not possible to check the state in the thread + // let lock = prioritization_fee_cache.cache.read().unwrap(); + // let fee = lock.get(&slot).unwrap(); + // let fee = fee.get(&bank.bank_id()).unwrap(); + // assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + // assert_eq!(2, fee.get_min_writable_account_fee(&write_account_a).unwrap()); + // assert_eq!(5, fee.get_min_writable_account_fee(&write_account_b).unwrap()); + // assert_eq!(2, fee.get_min_writable_account_fee(&write_account_c).unwrap()); + // // assert unknown account d fee + // assert!(fee + // .get_min_writable_account_fee(&Pubkey::new_unique()) + // .is_none()); } - // assert after prune, account a and c should be removed from cache to save space + // assert block minimum fee and account a, b, c fee accordingly { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); - let fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - let fee = fee.get(&bank.bank_id()).unwrap(); + let lock = prioritization_fee_cache.cache.read().unwrap(); + let fee = lock.get(&slot).unwrap(); assert_eq!(2, fee.get_min_transaction_fee().unwrap()); - assert!(fee.get_writable_account_fee(&write_account_a).is_none()); - assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); - assert!(fee.get_writable_account_fee(&write_account_c).is_none()); + assert_eq!( + 2, + fee.get_min_writable_account_fee(&write_account_a).unwrap() + ); + assert_eq!( + 5, + fee.get_min_writable_account_fee(&write_account_b).unwrap() + ); + assert_eq!( + 2, + fee.get_min_writable_account_fee(&write_account_c).unwrap() + ); } } @@ -575,35 +610,52 @@ mod tests { fn test_available_block_count() { let prioritization_fee_cache = PrioritizationFeeCache::default(); - assert!(PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &1 - ) - .entry(1) - .or_default() - .mark_block_completed() - .is_ok()); - assert!(PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &2 - ) - .entry(2) - .or_default() - .mark_block_completed() - .is_ok()); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new_rw_arc(bank0); + let bank = bank_forks.read().unwrap().working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + + let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 1)); + sync_update( + &prioritization_fee_cache, + bank1.clone(), + vec![build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + )] + .iter(), + ); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id()); + + let bank2 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 2)); + sync_update( + &prioritization_fee_cache, + bank2.clone(), + vec![build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + )] + .iter(), + ); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id()); + // add slot 3 entry to cache, but not finalize it - PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3) - .entry(3) - .or_default(); + let bank3 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 3)); + let txs = vec![build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + )]; + sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter()); // assert available block count should be 2 finalized blocks + std::thread::sleep(std::time::Duration::from_millis(1_000)); assert_eq!(2, prioritization_fee_cache.available_block_count()); } - fn hashmap_of(vec: Vec<(Slot, u64)>) -> HashMap { - vec.into_iter().collect() - } - #[test] fn test_get_prioritization_fees() { solana_logger::setup(); @@ -675,28 +727,28 @@ mod tests { // after block is completed sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id()); assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -718,28 +770,28 @@ mod tests { sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); // before block is marked as completed assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 1)]), + vec![(1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]) ); assert_eq!( - hashmap_of(vec![(1, 2)]), + vec![(1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -749,28 +801,28 @@ mod tests { // after block is completed sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id()); assert_eq!( - hashmap_of(vec![(2, 3), (1, 1)]), + vec![(2, 3), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]), ); assert_eq!( - hashmap_of(vec![(2, 3), (1, 2)]), + vec![(2, 3), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 1)]), + vec![(2, 4), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -792,28 +844,28 @@ mod tests { sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter()); // before block is marked as completed assert_eq!( - hashmap_of(vec![(2, 3), (1, 1)]), + vec![(2, 3), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]), ); assert_eq!( - hashmap_of(vec![(2, 3), (1, 2)]), + vec![(2, 3), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 1)]), + vec![(2, 4), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]), ); assert_eq!( - hashmap_of(vec![(2, 4), (1, 2)]), + vec![(2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -823,28 +875,28 @@ mod tests { // after block is completed sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id()); assert_eq!( - hashmap_of(vec![(3, 5), (2, 3), (1, 1)]), + vec![(3, 5), (2, 3), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 3), (1, 2)]), + vec![(3, 6), (2, 3), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), ); assert_eq!( - hashmap_of(vec![(3, 5), (2, 4), (1, 2)]), + vec![(3, 5), (2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 4), (1, 1)]), + vec![(3, 6), (2, 4), (1, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 4), (1, 2)]), + vec![(3, 6), (2, 4), (1, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]), ); assert_eq!( - hashmap_of(vec![(3, 6), (2, 4), (1, 2)]), + vec![(3, 6), (2, 4), (1, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b, @@ -886,12 +938,13 @@ mod tests { ]; sync_update(&prioritization_fee_cache, bank1.clone(), txs.iter()); - let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - assert_eq!(1, slot_prioritization_fee.len()); - assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + // Not possible to check the state in the thread + // let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + // prioritization_fee_cache.cache.clone(), + // &slot, + // ); + // assert_eq!(1, slot_prioritization_fee.len()); + // assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); } // Assert after add transactions for bank2 of slot 1 @@ -906,50 +959,50 @@ mod tests { ]; sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); - let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - assert_eq!(2, slot_prioritization_fee.len()); - assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); - assert!(slot_prioritization_fee.contains_key(&bank2.bank_id())); + // Not possible to check the state in the thread + // let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + // prioritization_fee_cache.cache.clone(), + // &slot, + // ); + // assert_eq!(2, slot_prioritization_fee.len()); + // assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + // assert!(slot_prioritization_fee.contains_key(&bank2.bank_id())); } // Assert after finalize with bank1 of slot 1, { sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank1.bank_id()); - let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &slot, - ); - assert_eq!(1, slot_prioritization_fee.len()); - assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + // Not possible to check bank_id + // let cache_lock = prioritization_fee_cache.cache.read().unwrap(); + // let slot_prioritization_fee = cache_lock.get(&slot).unwrap(); + // assert_eq!(1, slot_prioritization_fee.len()); + // assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); // and data available for query are from bank1 assert_eq!( - hashmap_of(vec![(slot, 1)]), + vec![(slot, 1)], prioritization_fee_cache.get_prioritization_fees(&[]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) ); assert_eq!( - hashmap_of(vec![(slot, 1)]), + vec![(slot, 1)], prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache .get_prioritization_fees(&[write_account_a, write_account_b]) ); assert_eq!( - hashmap_of(vec![(slot, 2)]), + vec![(slot, 2)], prioritization_fee_cache.get_prioritization_fees(&[ write_account_a, write_account_b,