diff --git a/Cargo.lock b/Cargo.lock
index b3cdfe839..9e451e33b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2603,6 +2603,15 @@ dependencies = [
  "serde",
 ]

+[[package]]
+name = "cache-advisor"
+version = "1.0.16"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f89ab55ca4e6a46a0740a1c5346db1ad66e4a76598bbfa060dc3259935a7450"
+dependencies = [
+ "crossbeam-queue",
+]
+
 [[package]]
 name = "camino"
 version = "1.1.7"
@@ -2877,6 +2886,16 @@ dependencies = [
  "digest 0.10.7",
 ]

+[[package]]
+name = "concurrent-map"
+version = "5.0.37"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f6542c565fbcba786db59307d7840f0bf5cd9e0aba6502755337e15f0e06fd65"
+dependencies = [
+ "ebr",
+ "stack-map",
+]
+
 [[package]]
 name = "concurrent-queue"
 version = "2.5.0"
@@ -3076,6 +3095,15 @@ dependencies = [
  "crossbeam-utils",
 ]

+[[package]]
+name = "crossbeam-queue"
+version = "0.3.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df0346b5d5e76ac2fe4e327c5fd1118d6be7c51dfb18f9b7922923f287471e35"
+dependencies = [
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "crossbeam-utils"
 version = "0.8.20"
@@ -3672,6 +3700,15 @@ version = "1.0.17"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"

+[[package]]
+name = "ebr"
+version = "0.2.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2b1ea3b18359d566f360eaf811a2d69bc6c8eb6faaeecc8839975633860a076e"
+dependencies = [
+ "shared-local-state",
+]
+
 [[package]]
 name = "ecdsa"
 version = "0.16.9"
@@ -5992,6 +6029,16 @@ dependencies = [
  "signature",
 ]

+[[package]]
+name = "kanal"
+version = "0.1.0-pre8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7"
+dependencies = [
+ "futures-core",
+ "lock_api",
+]
+
 [[package]]
 name = "keccak"
 version = "0.1.5"
@@ -6129,7 +6176,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e310b3a6b5907f99202fcdb4960ff45b93735d7c7d96b760fcff8db2dc0e103d"
 dependencies = [
  "cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
 ]

 [[package]]
@@ -12020,6 +12067,15 @@ dependencies = [
  "lazy_static",
 ]

+[[package]]
+name = "shared-local-state"
+version = "0.1.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a50ccb2f45251772ed15abfd1e5f10a305288187b1582ab2e4295b29bbb4929"
+dependencies = [
+ "parking_lot 0.12.3",
+]
+
 [[package]]
 name = "shell-words"
 version = "1.1.0"
@@ -12711,6 +12767,12 @@ version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3"

+[[package]]
+name = "stack-map"
+version = "1.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b49d6d36fee60faad91e23603db2356677b58ec2429237b39d5c60c26868f37c"
+
 [[package]]
 name = "static_assertions"
 version = "1.1.0"
@@ -13663,9 +13725,13 @@ dependencies = [
 name = "strata-storage"
 version = "0.1.0"
 dependencies = [
+ "ahash 0.8.11",
  "anyhow",
  "async-trait",
  "bitcoin",
+ "cache-advisor",
+ "concurrent-map",
+ "kanal",
  "lru",
  "paste",
  "strata-db",
diff --git a/Cargo.toml b/Cargo.toml
index 7e7067ec1..43812f5db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -174,6 +174,7 @@ reth-transaction-pool = { git = "https://github.com/paradigmxyz/reth.git", rev =
"https://github.com/paradigmxyz/reth.git", rev = "v1.1.0" } reth-trie-common = { git = "https://github.com/paradigmxyz/reth.git", rev = "v1.1.0" } +ahash = "0.8.11" anyhow = "1.0.86" arbitrary = { version = "1.3.2", features = ["derive"] } argh = "0.1" @@ -184,8 +185,10 @@ bitcoin = { version = "=0.32.3", features = ["serde"] } bitcoind = { version = "0.36.0", features = ["26_0"] } borsh = { version = "1.5.0", features = ["derive"] } bytes = "1.6.0" +cache-advisor = "1.0.16" chrono = "0.4.38" clap = "4" +concurrent-map = "5.0.37" digest = "0.10" ethnum = "1.5.0" eyre = "0.6" @@ -199,6 +202,7 @@ jmt = "0.10.0" jsonrpsee = "0.24" jsonrpsee-http-client = "0.24" jsonrpsee-types = "0.24" +kanal = "0.1.0-pre8" lazy_static = "1.5.0" lru = "0.12" miniscript = "12.2.0" diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index a1ef433ea..5ec06fbfd 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -8,9 +8,13 @@ strata-db.workspace = true strata-primitives.workspace = true strata-state.workspace = true +ahash.workspace = true anyhow.workspace = true async-trait.workspace = true bitcoin.workspace = true +cache-advisor.workspace = true +concurrent-map.workspace = true +kanal.workspace = true lru.workspace = true paste.workspace = true threadpool.workspace = true diff --git a/crates/storage/src/cache.rs b/crates/storage/src/cache.rs index b255e6aac..99969a630 100644 --- a/crates/storage/src/cache.rs +++ b/crates/storage/src/cache.rs @@ -1,32 +1,50 @@ //! Generic cache utility for what we're inserting into the database. -use std::{hash::Hash, num::NonZeroUsize, sync::Arc}; +use std::{cell::RefCell, cmp::PartialEq, hash::Hash, marker::PhantomData, num::NonZeroUsize}; +use ahash::RandomState; +use cache_advisor::CacheAdvisor; +use concurrent_map::{CasFailure, ConcurrentMap}; +use kanal::{bounded_async, AsyncReceiver}; use strata_db::{DbError, DbResult}; -use tokio::sync::{broadcast, Mutex, RwLock}; use tracing::*; use crate::exec::DbRecv; -/// Entry for something we can put into the cache without actually knowing what it is, and so we can -/// keep the reservation to it. -type CacheSlot = Arc>>; - /// Describes a cache entry that may be occupied, reserved for pending database read, or returned an /// error from a database read. -#[derive(Debug)] -pub enum SlotState { +#[derive(Debug, Clone, PartialEq)] +pub enum Entry { /// Authentic database entry. Ready(T), /// A database fetch is happening in the background and it will be updated. - Pending(broadcast::Receiver), + Pending(Receiver>), +} - /// An unspecified error happened fetching from the database. - Error, +// Do not use outside this module. +// +// A wrapper around a async kanal receiver that implements PartialEq so we can +// do cas ops. We don't actually CAS between SlotState::Pending and +// SlotState::Pending, so this is fine. +#[derive(Debug, Clone)] +pub struct Receiver(AsyncReceiver); + +impl AsRef> for Receiver { + fn as_ref(&self) -> &AsyncReceiver { + &self.0 + } } -impl SlotState { +impl PartialEq for Receiver { + fn eq(&self, _other: &Self) -> bool { + // this is only so we can + false + } +} +impl Eq for Receiver {} + +impl Entry { /// Tries to read a value from the slot, asynchronously. pub async fn get_async(&self) -> DbResult { match self { @@ -37,12 +55,11 @@ impl SlotState { // correctly. 
diff --git a/crates/storage/src/cache.rs b/crates/storage/src/cache.rs
index b255e6aac..99969a630 100644
--- a/crates/storage/src/cache.rs
+++ b/crates/storage/src/cache.rs
@@ -1,32 +1,50 @@
 //! Generic cache utility for what we're inserting into the database.

-use std::{hash::Hash, num::NonZeroUsize, sync::Arc};
+use std::{cell::RefCell, cmp::PartialEq, hash::Hash, marker::PhantomData, num::NonZeroUsize};

+use ahash::RandomState;
+use cache_advisor::CacheAdvisor;
+use concurrent_map::{CasFailure, ConcurrentMap};
+use kanal::{bounded_async, AsyncReceiver};
 use strata_db::{DbError, DbResult};
-use tokio::sync::{broadcast, Mutex, RwLock};
 use tracing::*;

 use crate::exec::DbRecv;

-/// Entry for something we can put into the cache without actually knowing what it is, and so we can
-/// keep the reservation to it.
-type CacheSlot<T> = Arc<RwLock<SlotState<T>>>;
-
-/// Describes a cache entry that may be occupied, reserved for pending database read, or returned an
-/// error from a database read.
-#[derive(Debug)]
-pub enum SlotState<T> {
+/// Describes a cache entry that is either ready or still waiting on a pending database read.
+#[derive(Debug, Clone, PartialEq)]
+pub enum Entry<T> {
     /// Authentic database entry.
     Ready(T),

     /// A database fetch is happening in the background and it will be updated.
-    Pending(broadcast::Receiver<T>),
+    Pending(Receiver<DbResult<T>>),
+}

-    /// An unspecified error happened fetching from the database.
-    Error,
+// Do not use outside this module.
+//
+// A wrapper around an async kanal receiver that implements `PartialEq` so we
+// can do CAS ops on `Entry`. We never actually CAS one `Entry::Pending`
+// against another, so an always-false comparison is fine.
+#[derive(Debug, Clone)]
+pub struct Receiver<T>(AsyncReceiver<T>);
+
+impl<T> AsRef<AsyncReceiver<T>> for Receiver<T> {
+    fn as_ref(&self) -> &AsyncReceiver<T> {
+        &self.0
+    }
 }

-impl<T: Clone> SlotState<T> {
+impl<T> PartialEq for Receiver<T> {
+    fn eq(&self, _other: &Self) -> bool {
+        // This exists only to satisfy `Entry`'s `PartialEq` derive; pending
+        // receivers are never considered equal.
+        false
+    }
+}
+impl<T> Eq for Receiver<T> {}
+
+impl<T: Clone> Entry<T> {
     /// Tries to read a value from the slot, asynchronously.
     pub async fn get_async(&self) -> DbResult<T> {
         match self {
@@ -37,12 +55,11 @@ impl<T: Clone> SlotState<T> {
             // correctly.
             // TODO figure out how to test this
             trace!("waiting for database fetch to complete");
-            match ch.resubscribe().recv().await {
-                Ok(v) => Ok(v),
+            match ch.0.recv().await {
+                Ok(v) => v,
                 Err(_e) => Err(DbError::WorkerFailedStrangely),
             }
         }
-        Self::Error => Err(DbError::CacheLoadFail),
     }
 }

@@ -56,71 +73,71 @@ impl<T: Clone> SlotState<T> {
             // correctly.
             // TODO figure out how to test this
             trace!("waiting for database fetch to complete");
-            match ch.resubscribe().blocking_recv() {
-                Ok(v) => Ok(v),
+            match ch.0.as_sync().recv() {
+                Ok(v) => v,
                 Err(_e) => Err(DbError::WorkerFailedStrangely),
             }
         }
-        Self::Error => Err(DbError::CacheLoadFail),
     }
 }

-/// Wrapper around a LRU cache that handles cache reservations and asynchronously waiting for
-/// database operations in the background without keeping a global lock on the cache.
-pub struct CacheTable<K, V> {
-    cache: Mutex<lru::LruCache<K, CacheSlot<V>>>,
+/// Wrapper around a concurrent map that handles cache reservations and waits for in-flight
+/// database reads in the background without keeping a global lock on the cache.
+#[derive(Debug, Clone)]
+pub struct CacheTable<K, V>
+where
+    K: Hash,
+    V: 'static + CacheTableValue + PartialEq,
+{
+    table: ConcurrentMap<u64, Entry<V>>,
+    advisor: RefCell<CacheAdvisor>,
+    hasher: RandomState,
+    _k: PhantomData<K>,
 }

-impl<K: Clone + Eq + Hash, V: Clone> CacheTable<K, V> {
+pub trait CacheTableValue: Clone + Send + Sync {}
+impl<V> CacheTableValue for V where V: Clone + Send + Sync {}
+
+impl<K, V> CacheTable<K, V>
+where
+    K: Hash,
+    V: 'static + CacheTableValue + PartialEq,
+{
     /// Creates a new cache with some maximum capacity.
     ///
     /// This measures entries by *count* not their (serialized?) size, so ideally entries should
     /// consume similar amounts of memory to help us best reason about real cache capacity.
     pub fn new(size: NonZeroUsize) -> Self {
         Self {
-            cache: Mutex::new(lru::LruCache::new(size)),
+            table: ConcurrentMap::new(),
+            advisor: CacheAdvisor::new(size.get(), 20).into(),
+            hasher: RandomState::new(),
+            _k: PhantomData,
         }
     }

     /// Gets the number of elements in the cache.
-    // TODO replace this with an atomic we update after every op
-    pub async fn get_len_async(&self) -> usize {
-        let cache = self.cache.lock().await;
-        cache.len()
-    }
-
-    /// Gets the number of elements in the cache.
-    // TODO replace this with an atomic we update after every op
-    pub fn get_len_blocking(&self) -> usize {
-        let cache = self.cache.blocking_lock();
-        cache.len()
+    pub fn len(&self) -> usize {
+        self.table.len()
     }

-    /// Removes the entry for a particular cache entry.
-    pub async fn purge_async(&self, k: &K) {
-        let mut cache = self.cache.lock().await;
-        cache.pop(k);
-    }
-
-    /// Removes the entry for a particular cache entry.
-    pub fn purge_blocking(&self, k: &K) {
-        let mut cache = self.cache.blocking_lock();
-        cache.pop(k);
+    /// Removes a particular entry from the cache.
+    pub fn purge(&self, k: &K) {
+        self.table.remove(&self.hasher.hash_one(k));
     }

     /// Inserts an entry into the table, dropping the previous value.
-    pub async fn insert_async(&self, k: K, v: V) {
-        let slot = Arc::new(RwLock::new(SlotState::Ready(v)));
-        let mut cache = self.cache.lock().await;
-        cache.put(k, slot);
-    }
-
-    /// Inserts an entry into the table, dropping the previous value.
-    pub fn insert_blocking(&self, k: K, v: V) {
-        let slot = Arc::new(RwLock::new(SlotState::Ready(v)));
-        let mut cache = self.cache.blocking_lock();
-        cache.put(k, slot);
+    pub fn insert(&self, k: K, v: V) {
+        let hash = self.hasher.hash_one(&k);
+        self.table.insert(hash, Entry::Ready(v));
+        self.record_access_and_evict(hash);
+    }
+
+    fn record_access_and_evict(&self, hash: u64) {
+        // The advisor hands back whichever hashes it wants evicted to stay
+        // within capacity; actually removing them is our job.
+        for (hash, _cost) in self.advisor.borrow_mut().accessed_reuse_buffer(hash, 1) {
+            self.table.remove(hash);
+        }
     }

     /// Returns a clone of an entry from the cache or possibly invoking some function returning a
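`record_access_and_evict` is now the entire eviction policy: every read and write reports the slot's hash to the advisor, and the advisor answers with whatever should be evicted, which the caller must then remove from the map itself. A standalone sketch of that contract (illustration only; the capacity of 10 and flat cost of 1 are made-up values mirroring the `CacheAdvisor::new(size.get(), 20)` and `accessed_reuse_buffer(hash, 1)` calls above):

```rust
use cache_advisor::CacheAdvisor;

// Illustration only: the advisor tracks accesses and returns eviction
// candidates; removing them from the actual table is the caller's job.
fn main() {
    // Capacity of 10 cost units; 20% is reserved for new entries that have
    // not yet proven they get reused.
    let mut advisor = CacheAdvisor::new(10, 20);

    for id in 0..64u64 {
        // `accessed_reuse_buffer` reuses an internal buffer instead of
        // allocating a fresh Vec on every access.
        for (evicted_id, _cost) in advisor.accessed_reuse_buffer(id, 1) {
            println!("evict {evicted_id}");
        }
    }
}
```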
@@ -128,96 +145,80 @@ impl<K: Clone + Eq + Hash, V: Clone> CacheTable<K, V> {
     ///
     /// This is meant to be used with the `_chan` functions generated by the db ops macro in the
     /// `exec` module.
-    pub async fn get_or_fetch_async(&self, k: &K, fetch_fn: impl Fn() -> DbRecv<V>) -> DbResult<V> {
-        // See below comment about control flow.
-        let (mut slot_lock, complete_tx) = {
-            let mut cache = self.cache.lock().await;
-            if let Some(entry_lock) = cache.get(k) {
-                let entry = entry_lock.read().await;
-                return entry.get_async().await;
-            }
-
-            // Create a new cache slot and insert and lock it.
-            let (complete_tx, complete_rx) = broadcast::channel(1);
-            let slot = Arc::new(RwLock::new(SlotState::Pending(complete_rx)));
-            cache.push(k.clone(), slot.clone());
-            let lock = slot
-                .try_write_owned()
-                .expect("cache: lock fresh cache entry");
-
-            (lock, complete_tx)
-        };
+    pub async fn get_or_fetch(&self, k: &K, fetch_fn: impl Fn() -> DbRecv<V>) -> DbResult<V> {
+        let hash = self.hasher.hash_one(k);
+        self.record_access_and_evict(hash);
+
+        let (tx, rx) = bounded_async(1);
+        // Atomically reserve the slot; whoever loses the race just waits on
+        // the winner's pending entry.
+        if let Err(CasFailure { actual, .. }) =
+            self.table
+                .cas(hash, None, Some(Entry::Pending(Receiver(rx))))
+        {
+            return actual.expect("should be Some").get_async().await;
+        }

         // Start the task and get the recv handle.
         let res_fut = fetch_fn();
         let res = match res_fut.await {
-            Ok(Ok(v)) => v,
-            Ok(Err(e)) => {
-                error!(?e, "failed to make database fetch");
-                *slot_lock = SlotState::Error;
-                self.purge_async(k).await;
-                return Err(e);
+            Ok(res) => {
+                if let Err(ref e) = res {
+                    error!(?e, "failed to make database fetch");
+                }
+                res
             }
             Err(_) => {
                 error!("database fetch aborted");
-                self.purge_async(k).await;
+                // Drop the reservation so later callers can retry.
+                self.table.remove(&hash);
                 return Err(DbError::WorkerFailedStrangely);
             }
         };

-        // Fill in the lock state and send down the complete tx.
-        *slot_lock = SlotState::Ready(res.clone());
-        if complete_tx.send(res.clone()).is_err() {
+        // Cache the value on success, or drop the reservation on error so
+        // later callers retry instead of waiting on a dead channel.
+        match &res {
+            Ok(v) => {
+                self.table.insert(hash, Entry::Ready(v.clone()));
+            }
+            Err(_) => {
+                self.table.remove(&hash);
+            }
+        }
+
+        // Update any waiting readers.
+        if tx.send(res.clone()).await.is_err() {
             warn!("failed to notify waiting cache readers");
         }

-        Ok(res)
+        res
     }

     /// Returns a clone of an entry from the cache or invokes some function to load it from
     /// the underlying database.
     pub fn get_or_fetch_blocking(&self, k: &K, fetch_fn: impl Fn() -> DbResult<V>) -> DbResult<V> {
-        // The flow control here is kinda weird, I don't like it. The key here is that we want to
-        // ensure the lock on the whole cache is as short-lived as possible while we check to see if
-        // the entry we're looking for is there. If it's not, then we want to insert a reservation
-        // that we hold a lock to and then release the cache-level lock.
-        let (mut slot_lock, complete_tx) = {
-            let mut cache = self.cache.blocking_lock();
-            if let Some(entry_lock) = cache.get(k) {
-                let entry = entry_lock.blocking_read();
-                return entry.get_blocking();
-            }
-
-            // Create a new cache slot and insert and lock it.
-            let (complete_tx, complete_rx) = broadcast::channel(1);
-            let slot = Arc::new(RwLock::new(SlotState::Pending(complete_rx)));
-            cache.push(k.clone(), slot.clone());
-            let lock = slot
-                .try_write_owned()
-                .expect("cache: lock fresh cache entry");
-
-            (lock, complete_tx)
-        };
+        let hash = self.hasher.hash_one(k);
+        self.record_access_and_evict(hash);
+
+        let (tx, rx) = bounded_async(1);
+        // Atomically reserve the slot; whoever loses the race just waits on
+        // the winner's pending entry.
+        if let Err(CasFailure { actual, .. }) =
+            self.table
+                .cas(hash, None, Some(Entry::Pending(Receiver(rx))))
+        {
+            return actual.expect("should be Some").get_blocking();
+        }

-        // Load the entry and insert it into the slot we've already reserved.
-        let res = match fetch_fn() {
-            Ok(v) => v,
-            Err(e) => {
-                warn!(?e, "failed to make database fetch");
-                *slot_lock = SlotState::Error;
-                self.purge_blocking(k);
-                return Err(e);
-            }
-        };
+        let res = fetch_fn();
+
+        if let Err(ref e) = res {
+            error!(?e, "failed to make database fetch");
+        }

-        // Fill in the lock state and send down the complete tx.
-        *slot_lock = SlotState::Ready(res.clone());
-        if complete_tx.send(res.clone()).is_err() {
+        // Cache the value on success, or drop the reservation on error so
+        // later callers retry instead of waiting on a dead channel.
+        match &res {
+            Ok(v) => {
+                self.table.insert(hash, Entry::Ready(v.clone()));
+            }
+            Err(_) => {
+                self.table.remove(&hash);
+            }
+        }
+
+        // Update any waiting readers.
+        if tx.as_sync().send(res.clone()).is_err() {
             warn!("failed to notify waiting cache readers");
         }

-        Ok(res)
+        res
     }
 }
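The CAS against `None` is what deduplicates concurrent misses: exactly one caller installs the `Pending` reservation and runs `fetch_fn`, while every racer that loses gets the current entry back in `CasFailure::actual` and waits on it. A standalone sketch of the reservation pattern (illustration only, with a plain `&str` value standing in for `Entry`):

```rust
use concurrent_map::{CasFailure, ConcurrentMap};

// Illustration only: the first CAS from `None` claims the slot; later ones
// fail and report the value that is actually there.
fn main() {
    let map: ConcurrentMap<u64, &'static str> = ConcurrentMap::default();

    // No entry yet, so this CAS succeeds and installs the reservation.
    assert!(map.cas(7, None, Some("reserved")).is_ok());

    // A second CAS from `None` loses the race; `actual` carries the entry
    // the winner installed, which is how losers find what to wait on.
    match map.cas(7, None, Some("other")) {
        Err(CasFailure { actual, .. }) => assert_eq!(actual, Some("reserved")),
        Ok(_) => unreachable!("the slot was already claimed"),
    }
}
```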
@@ -232,7 +233,7 @@ mod tests {
         let cache = CacheTable::<u64, u64>::new(3.try_into().unwrap());

         let res = cache
-            .get_or_fetch_async(&42, || {
+            .get_or_fetch(&42, || {
                 let (tx, rx) = tokio::sync::oneshot::channel();
                 tx.send(Ok(10)).expect("test: send init value");
                 rx
@@ -242,7 +243,7 @@ mod tests {
             .await
             .expect("test: load gof");
         assert_eq!(res, 10);

         let res = cache
-            .get_or_fetch_async(&42, || {
+            .get_or_fetch(&42, || {
                 let (tx, rx) = tokio::sync::oneshot::channel();
                 tx.send(Err(DbError::Busy)).expect("test: send init value");
                 rx
@@ -251,9 +252,9 @@ mod tests {
             .expect("test: load gof");
         assert_eq!(res, 10);

-        cache.insert_async(42, 12).await;
+        cache.insert(42, 12);
         let res = cache
-            .get_or_fetch_async(&42, || {
+            .get_or_fetch(&42, || {
                 let (tx, rx) = tokio::sync::oneshot::channel();
                 tx.send(Err(DbError::Busy)).expect("test: send init value");
                 rx
@@ -262,10 +263,10 @@ mod tests {
             .expect("test: load gof");
         assert_eq!(res, 12);

-        let len = cache.get_len_async().await;
+        let len = cache.len();
         assert_eq!(len, 1);

-        cache.purge_async(&42).await;
-        let len = cache.get_len_async().await;
+        cache.purge(&42);
+        let len = cache.len();
         assert_eq!(len, 0);
     }

@@ -283,16 +284,16 @@ mod tests {
             .expect("test: load gof");
         assert_eq!(res, 10);

-        cache.insert_blocking(42, 12);
+        cache.insert(42, 12);
         let res = cache
             .get_or_fetch_blocking(&42, || Err(DbError::Busy))
             .expect("test: load gof");
         assert_eq!(res, 12);

-        let len = cache.get_len_blocking();
+        let len = cache.len();
         assert_eq!(len, 1);

-        cache.purge_blocking(&42);
-        let len = cache.get_len_blocking();
+        cache.purge(&42);
+        let len = cache.len();
         assert_eq!(len, 0);
     }
 }
diff --git a/crates/storage/src/managers/checkpoint.rs b/crates/storage/src/managers/checkpoint.rs
index b95455a80..64046b0c4 100644
--- a/crates/storage/src/managers/checkpoint.rs
+++ b/crates/storage/src/managers/checkpoint.rs
@@ -22,19 +22,19 @@ impl CheckpointDbManager {
     pub async fn put_checkpoint(&self, idx: u64, entry: CheckpointEntry) -> DbResult<()> {
         self.ops.put_batch_checkpoint_async(idx, entry).await?;
-        self.checkpoint_cache.purge_async(&idx).await;
+        self.checkpoint_cache.purge(&idx);
         Ok(())
     }

     pub fn put_checkpoint_blocking(&self, idx: u64, entry: CheckpointEntry) -> DbResult<()> {
         self.ops.put_batch_checkpoint_blocking(idx, entry)?;
-        self.checkpoint_cache.purge_blocking(&idx);
+        self.checkpoint_cache.purge(&idx);
         Ok(())
     }

     pub async fn get_checkpoint(&self, idx: u64) -> DbResult<Option<CheckpointEntry>> {
         self.checkpoint_cache
-            .get_or_fetch_async(&idx, || self.ops.get_batch_checkpoint_chan(idx))
+            .get_or_fetch(&idx, || self.ops.get_batch_checkpoint_chan(idx))
             .await
     }
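One subtlety worth keeping in mind while reading these manager changes: entries are addressed by the 64-bit `ahash` of the logical key rather than by the key itself, so the table's `RandomState` has to live as long as the table, and two keys colliding on 64 bits would silently share a slot. Illustration only:

```rust
use std::hash::BuildHasher;

use ahash::RandomState;

// Illustration only: the cache keys its map by the 64-bit ahash of the
// logical key, so one long-lived `RandomState` must serve every operation
// on a given table.
fn main() {
    let hasher = RandomState::new();

    // The same hasher instance maps a key to the same slot every time.
    assert_eq!(hasher.hash_one(&42u64), hasher.hash_one(&42u64));

    // A fresh `RandomState` is seeded differently and would (almost
    // certainly) address a different slot for the same key.
    let other = RandomState::new();
    assert_ne!(hasher.hash_one(&42u64), other.hash_one(&42u64));
}
```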
diff --git a/crates/storage/src/managers/l2.rs b/crates/storage/src/managers/l2.rs
index 0e511861b..5bd3b9925 100644
--- a/crates/storage/src/managers/l2.rs
+++ b/crates/storage/src/managers/l2.rs
@@ -26,7 +26,7 @@ impl L2BlockManager {
     pub async fn put_block_async(&self, bundle: L2BlockBundle) -> DbResult<()> {
         let id = bundle.block().header().get_blockid();
         self.ops.put_block_async(bundle).await?;
-        self.block_cache.purge_async(&id).await;
+        self.block_cache.purge(&id);
         Ok(())
     }

@@ -34,14 +34,14 @@
     pub fn put_block_blocking(&self, bundle: L2BlockBundle) -> DbResult<()> {
         let id = bundle.block().header().get_blockid();
         self.ops.put_block_blocking(bundle)?;
-        self.block_cache.purge_blocking(&id);
+        self.block_cache.purge(&id);
         Ok(())
     }

     /// Gets a block either in the cache or from the underlying database.
     pub async fn get_block_async(&self, id: &L2BlockId) -> DbResult<Option<L2BlockBundle>> {
         self.block_cache
-            .get_or_fetch_async(id, || self.ops.get_block_chan(*id))
+            .get_or_fetch(id, || self.ops.get_block_chan(*id))
             .await
     }
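Both managers now follow one pattern: read through `get_or_fetch`, purge after writes. A minimal blocking-side usage sketch mirroring the unit tests above (illustration only; `demo` is a hypothetical in-crate caller, since `CacheTable` lives in this crate):

```rust
use std::num::NonZeroUsize;

use strata_db::DbError;

use crate::cache::CacheTable;

// Illustration only: the unified surface the managers above now call. One
// `insert`/`purge`/`len` plus `get_or_fetch{,_blocking}` replaces the old
// *_async/*_blocking method pairs.
fn demo() -> Result<(), DbError> {
    let size = NonZeroUsize::new(64).expect("nonzero");
    let cache = CacheTable::<u64, u64>::new(size);

    // Miss: the fetch closure runs and its result is cached.
    let v = cache.get_or_fetch_blocking(&1, || Ok(100))?;
    assert_eq!(v, 100);

    // Hit: the closure is never invoked, so an erroring fetcher is harmless.
    let v = cache.get_or_fetch_blocking(&1, || Err(DbError::Busy))?;
    assert_eq!(v, 100);

    // Writers purge after touching the database, as the managers above do.
    cache.purge(&1);
    assert_eq!(cache.len(), 0);
    Ok(())
}
```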