From f24557257f1b2a089336b647e07cabab04d41a87 Mon Sep 17 00:00:00 2001 From: hrxi Date: Sat, 23 Nov 2024 21:45:30 +0100 Subject: [PATCH] Intern BLS lazy public keys This ensures that for each compressed public key there's at most one lazy public key, which means everyone profits when it is uncompressed. This drops the (unused: never read from) BLS public key cache. --- Cargo.lock | 1 + blockchain/tests/signed.rs | 4 +- bls/Cargo.toml | 4 +- bls/src/cache.rs | 75 ------- bls/src/lazy.rs | 137 ++++++------ bls/src/lib.rs | 8 +- bls/tests/cache.rs | 76 ------- consensus/src/bls_cache.rs | 25 +++ consensus/src/lib.rs | 3 + .../src/sync/live/block_queue/live_sync.rs | 16 +- consensus/src/sync/live/block_queue/proxy.rs | 4 +- consensus/src/sync/live/mod.rs | 6 +- consensus/src/sync/live/queue.rs | 33 ++- .../src/sync/live/state_queue/live_sync.rs | 4 +- consensus/src/sync/syncer_proxy.rs | 8 +- consensus/tests/consensus_proxy.rs | 18 +- consensus/tests/history.rs | 23 +- consensus/tests/history_sync.rs | 11 +- consensus/tests/state_live_sync.rs | 11 +- consensus/tests/sync_utils.rs | 19 +- lib/src/client.rs | 7 +- primitives/Cargo.toml | 5 +- primitives/block/Cargo.toml | 2 +- primitives/block/src/block.rs | 24 +- primitives/src/slots_allocation.rs | 2 +- test-utils/src/node.rs | 9 +- utils/Cargo.toml | 1 + utils/src/interner.rs | 208 ++++++++++++++++++ utils/src/lib.rs | 1 + validator/src/proposal_buffer.rs | 5 +- web-client/tests/wasm.rs | 7 +- 31 files changed, 389 insertions(+), 368 deletions(-) delete mode 100644 bls/src/cache.rs delete mode 100644 bls/tests/cache.rs create mode 100644 consensus/src/bls_cache.rs create mode 100644 utils/src/interner.rs diff --git a/Cargo.lock b/Cargo.lock index f83a27b644..ec23606c8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4992,6 +4992,7 @@ dependencies = [ "nimiq-serde", "nimiq-test-log", "nimiq-test-utils", + "parking_lot", "pin-project", "rand", "rand_core", diff --git a/blockchain/tests/signed.rs b/blockchain/tests/signed.rs index c222613cdf..3b2b5e06b3 100644 --- a/blockchain/tests/signed.rs +++ b/blockchain/tests/signed.rs @@ -5,7 +5,7 @@ use nimiq_block::{ }; use nimiq_blockchain::{Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::AbstractBlockchain; -use nimiq_bls::{lazy::LazyPublicKey, AggregateSignature, KeyPair}; +use nimiq_bls::{AggregateSignature, KeyPair, LazyPublicKey as BlsLazyPublicKey}; use nimiq_collections::bitset::BitSet; use nimiq_database::mdbx::MdbxDatabase; use nimiq_keys::{Address, Ed25519PublicKey}; @@ -56,7 +56,7 @@ fn test_skip_block_single_signature() { // verify skip block proof let validators = Validators::new(vec![Validator::new( Address::default(), - LazyPublicKey::from(key_pair.public_key), + BlsLazyPublicKey::from(key_pair.public_key), Ed25519PublicKey::from([0u8; 32]), 0..Policy::SLOTS, )]); diff --git a/bls/Cargo.toml b/bls/Cargo.toml index 4869f7ccea..a701badf36 100644 --- a/bls/Cargo.toml +++ b/bls/Cargo.toml @@ -39,7 +39,5 @@ nimiq-test-log = { workspace = true } nimiq-test-utils = { workspace = true } [features] -cache = ["lazy"] -default = ["lazy", "serde-derive"] -lazy = ["parking_lot"] +default = ["serde-derive"] serde-derive = ["nimiq-serde", "serde"] diff --git a/bls/src/cache.rs b/bls/src/cache.rs deleted file mode 100644 index 2cb61b6f7e..0000000000 --- a/bls/src/cache.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::collections::HashMap; - -use crate::{lazy::LazyPublicKey, CompressedPublicKey, PublicKey}; - -/// An implementation of a max capacity cache using a hashmap for the public keys. -/// The replacement policy in use removes an arbitrary element. -pub struct PublicKeyCache { - // FIXME: Change to a map with good caching strategy. - cache: HashMap, - max_capacity: usize, -} - -impl PublicKeyCache { - /// Creates a new cache with the specified maximum capacity. - pub fn new(max_capacity: usize) -> Self { - PublicKeyCache { - cache: HashMap::with_capacity(max_capacity), - max_capacity, - } - } - - /// Gets the corresponding uncompressed key by retrieving it from the cache. - /// If the value isn't cached, it uncompresses the pk and caches it. - pub fn get_or_uncompress(&mut self, compressed_key: &CompressedPublicKey) -> Option { - // First check if we have the uncompressed key cached. - if let Some(uncompressed_key) = self.cache.get(compressed_key) { - Some(*uncompressed_key) - } else { - // If not, we try uncompressing it. - let uncompressed_key = compressed_key.uncompress().ok(); - if let Some(uncompressed_key) = uncompressed_key { - // Upon success, we store the uncompressed key. - self.put_if_absent(compressed_key.clone(), uncompressed_key); - } - uncompressed_key - } - } - - /// Gets the corresponding uncompressed key by retrieving it from the lazy key cache and storing it on this cache. - /// If there is no lazy cached uncompressed pk, it will do the same behavior as in `uncompress`. - pub fn get_or_uncompress_lazy_public_key(&mut self, compressed_key: &LazyPublicKey) { - let mut uncompressed_key = compressed_key.cache.write(); - - // If the lazy public key is already uncompressed, we store the result in our cache. - if let Some(key) = uncompressed_key.as_ref() { - self.put_if_absent(compressed_key.compressed.clone(), *key); - } else { - *uncompressed_key = self.get_or_uncompress(&compressed_key.compressed); - } - } - - /// Put if absent for the uncompressed key. - fn put_if_absent(&mut self, compressed_key: CompressedPublicKey, uncompressed_key: PublicKey) { - // Only add to cache if not present yet. - if !self.cache.contains_key(&compressed_key) { - // If the capacity is reached, we delete a key. - // Currently, it is just the first key in the iterator. - if self.cache.len() == self.max_capacity { - let key_to_delete = self.cache.keys().next().unwrap().clone(); - self.cache.remove(&key_to_delete); - } - self.cache.insert(compressed_key, uncompressed_key); - } - } - - /// Returns the number of elements inside the cache. - pub fn len(&self) -> usize { - self.cache.len() - } - - /// Returns true if cache has no elements inside. - pub fn is_empty(&self) -> bool { - self.cache.is_empty() - } -} diff --git a/bls/src/lazy.rs b/bls/src/lazy.rs index 45c56ee910..9dbdcb31c6 100644 --- a/bls/src/lazy.rs +++ b/bls/src/lazy.rs @@ -1,117 +1,116 @@ -use std::{cmp::Ordering, fmt}; +use std::{ + cmp::Ordering, + fmt, + hash::Hasher, + sync::{Arc, OnceLock}, +}; use nimiq_hash::Hash; -use parking_lot::{ - MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard, -}; +use nimiq_utils::interner::{Interned, Interner, InternerKey}; use crate::{CompressedPublicKey, PublicKey, SigHash, Signature}; -pub struct LazyPublicKey { - pub(crate) compressed: CompressedPublicKey, - pub(crate) cache: RwLock>, +static CACHE: OnceLock> = OnceLock::new(); + +impl Interned for LazyPublicKey { + type Key = CompressedPublicKey; + fn from_interner_key(compressed: InternerKey) -> LazyPublicKey { + LazyPublicKey(Arc::new(LazyPublicKeyInner { + compressed, + uncompressed: OnceLock::new(), + })) + } + fn as_interner_key(&self) -> &InternerKey { + &self.0.compressed + } + fn interned_strong_count(&self) -> usize { + Arc::strong_count(&self.0) + } +} + +impl Drop for LazyPublicKey { + fn drop(&mut self) { + InternerKey::notify_deletion(&self.0.compressed, Arc::strong_count(&self.0)) + } } +#[derive(Clone)] +pub struct LazyPublicKey(Arc); + impl fmt::Debug for LazyPublicKey { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - write!(f, "LazyPublicKey({})", self.compressed) + f.debug_tuple("LazyPublicKey") + .field(self.compressed()) + .finish() } } impl fmt::Display for LazyPublicKey { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { - fmt::Display::fmt(&self.compressed, f) + fmt::Display::fmt(self.compressed(), f) } } -impl Clone for LazyPublicKey { - fn clone(&self) -> Self { - LazyPublicKey { - compressed: self.compressed.clone(), - cache: RwLock::new(*self.cache.read()), - } - } +struct LazyPublicKeyInner { + compressed: InternerKey, // Essentially a `CompressedPublicKey` + uncompressed: OnceLock>, } impl PartialEq for LazyPublicKey { - fn eq(&self, other: &LazyPublicKey) -> bool { - self.compressed.eq(&other.compressed) + fn eq(&self, other: &Self) -> bool { + self.compressed().eq(&other.compressed()) } } impl Eq for LazyPublicKey {} -impl PartialOrd for LazyPublicKey { - fn partial_cmp(&self, other: &LazyPublicKey) -> Option { - Some(self.cmp(other)) +impl std::hash::Hash for LazyPublicKey { + fn hash(&self, state: &mut H) { + std::hash::Hash::hash(self.compressed(), state) } } -impl Ord for LazyPublicKey { - fn cmp(&self, other: &Self) -> Ordering { - self.compressed.cmp(&other.compressed) +impl PartialOrd for LazyPublicKey { + fn partial_cmp(&self, other: &Self) -> Option { + self.compressed().partial_cmp(other.compressed()) } } -impl AsRef<[u8]> for LazyPublicKey { - fn as_ref(&self) -> &[u8] { - self.compressed.as_ref() +impl Ord for LazyPublicKey { + fn cmp(&self, other: &Self) -> Ordering { + self.compressed().cmp(other.compressed()) } } impl LazyPublicKey { pub fn from_compressed(compressed: &CompressedPublicKey) -> Self { - LazyPublicKey { - compressed: compressed.clone(), - cache: RwLock::new(None), - } + CACHE.get_or_init(Default::default).intern(compressed) } - pub fn uncompress(&self) -> Option> { - let read_guard: RwLockReadGuard>; - - let upgradable = self.cache.upgradable_read(); - if upgradable.is_some() { - // Fast path, downgrade and return - read_guard = RwLockUpgradableReadGuard::downgrade(upgradable); - } else { - // Slow path, upgrade, write, downgrade and return - let mut upgraded = RwLockUpgradableReadGuard::upgrade(upgradable); - *upgraded = Some(match self.compressed.uncompress() { - Ok(p) => p, - _ => return None, - }); - read_guard = RwLockWriteGuard::downgrade(upgraded); - } - - Some(RwLockReadGuard::map(read_guard, |opt| { - opt.as_ref().unwrap() - })) - } - - pub fn uncompress_unchecked(&self) -> MappedRwLockReadGuard { - self.uncompress().expect("Invalid public key") + pub fn uncompress(&self) -> Option<&PublicKey> { + self.0 + .uncompressed + .get_or_init(|| self.0.compressed.uncompress().ok()) + .as_ref() } pub fn compressed(&self) -> &CompressedPublicKey { - &self.compressed + &self.0.compressed } pub fn has_uncompressed(&self) -> bool { - self.cache.read().is_some() + self.0.uncompressed.get().is_some() } pub fn verify(&self, msg: &M, signature: &Signature) -> bool { - let cached = self.uncompress(); - if let Some(public_key) = cached.as_ref() { + if let Some(public_key) = self.uncompress() { return public_key.verify(msg, signature); } false } pub fn verify_hash(&self, hash: SigHash, signature: &Signature) -> bool { - let cached = self.uncompress(); - if let Some(public_key) = cached.as_ref() { + if let Some(public_key) = self.uncompress() { return public_key.verify_hash(hash, signature); } false @@ -120,22 +119,22 @@ impl LazyPublicKey { impl From for LazyPublicKey { fn from(key: PublicKey) -> Self { - LazyPublicKey { - compressed: key.compress(), - cache: RwLock::new(Some(key)), - } + let result = LazyPublicKey::from_compressed(&key.compress()); + // TODO: this might block + let _ = result.0.uncompressed.set(Some(key)); + result } } impl From for LazyPublicKey { - fn from(key: CompressedPublicKey) -> Self { - LazyPublicKey::from_compressed(&key) + fn from(compressed: CompressedPublicKey) -> Self { + LazyPublicKey::from_compressed(&compressed) } } impl From for CompressedPublicKey { fn from(key: LazyPublicKey) -> Self { - key.compressed + key.0.compressed.as_ref().clone() } } @@ -155,7 +154,7 @@ mod serialization { where S: serde::Serializer, { - Serialize::serialize(&self.compressed, serializer) + Serialize::serialize(&self.compressed(), serializer) } } diff --git a/bls/src/lib.rs b/bls/src/lib.rs index 8257051ee1..c45272f65d 100644 --- a/bls/src/lib.rs +++ b/bls/src/lib.rs @@ -1,9 +1,9 @@ +pub use lazy::LazyPublicKey; use nimiq_hash::Blake2sHash; pub use types::*; // Implements the LazyPublicKey type. Which is a faster, cached version of PublicKey. -#[cfg(feature = "lazy")] -pub mod lazy; +mod lazy; // Implements all of the types needed to do BLS signatures. mod types; @@ -11,10 +11,6 @@ mod types; // Specifies the hash algorithm used for signatures pub type SigHash = Blake2sHash; -// A simple cache implementation for the (un)compressed keys. -#[cfg(feature = "cache")] -pub mod cache; - // Implements the tagged-signing traits #[cfg(feature = "serde-derive")] mod tagged_signing; diff --git a/bls/tests/cache.rs b/bls/tests/cache.rs deleted file mode 100644 index 1d5c92b15a..0000000000 --- a/bls/tests/cache.rs +++ /dev/null @@ -1,76 +0,0 @@ -use std::cmp; - -use nimiq_bls::{cache::PublicKeyCache, lazy::LazyPublicKey, KeyPair}; -use nimiq_test_utils::test_rng::test_rng; -use nimiq_utils::key_rng::SecureGenerate; - -#[test] -fn removes_items_on_low_capacity() { - let mut cache = PublicKeyCache::new(2); - - assert_eq!(cache.len(), 0, "should start empty"); - - let rng = &mut test_rng(false); - - for i in 0..3 { - let keypair = KeyPair::generate(rng); - assert_eq!( - cache - .get_or_uncompress(&keypair.public_key.compress()) - .unwrap(), - keypair.public_key - ); - assert_eq!(cache.len(), cmp::min(2, i + 1), "should enforce maximum"); - } -} - -#[test] -fn can_update_lazy_public_key() { - let mut cache = PublicKeyCache::new(2); - - assert_eq!(cache.len(), 0, "should start empty"); - - let rng = &mut test_rng(false); - - let keypair = KeyPair::generate(rng); - assert_eq!( - cache - .get_or_uncompress(&keypair.public_key.compress()) - .unwrap(), - keypair.public_key - ); - - let lazy_key = LazyPublicKey::from_compressed(&keypair.public_key.compress()); - - cache.get_or_uncompress_lazy_public_key(&lazy_key); - - assert!( - lazy_key.has_uncompressed(), - "should be uncompressed from cache" - ); -} - -#[test] -fn does_not_store_duplicates() { - let mut cache = PublicKeyCache::new(2); - - assert_eq!(cache.len(), 0, "should start empty"); - - let rng = &mut test_rng(false); - - let keypair = KeyPair::generate(rng); - assert_eq!( - cache - .get_or_uncompress(&keypair.public_key.compress()) - .unwrap(), - keypair.public_key - ); - - assert_eq!( - cache - .get_or_uncompress(&keypair.public_key.compress()) - .unwrap(), - keypair.public_key - ); - assert_eq!(cache.len(), 1, "should not store duplicates"); -} diff --git a/consensus/src/bls_cache.rs b/consensus/src/bls_cache.rs new file mode 100644 index 0000000000..ec5c00fd69 --- /dev/null +++ b/consensus/src/bls_cache.rs @@ -0,0 +1,25 @@ +use nimiq_bls::LazyPublicKey as BlsLazyPublicKey; +use nimiq_primitives::policy::Policy; + +// TODO: implement some caching strategy +pub struct BlsCache; + +impl Default for BlsCache { + fn default() -> BlsCache { + BlsCache::with_capacity(Policy::BLS_CACHE_MAX_CAPACITY) + } +} + +impl BlsCache { + fn with_capacity(capacity: usize) -> BlsCache { + let _ = capacity; + BlsCache + } + pub fn new_test() -> BlsCache { + BlsCache::with_capacity(100) + } +} + +impl BlsCache { + pub fn cache(&mut self, _data: &BlsLazyPublicKey) {} +} diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index e44dc75202..e66704e46a 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -1,9 +1,12 @@ #[macro_use] extern crate log; +pub use bls_cache::BlsCache; pub use consensus::{consensus_proxy::ConsensusProxy, Consensus, ConsensusEvent, RemoteEvent}; pub use error::{Error, SubscribeToAddressesError}; +mod bls_cache; + pub mod consensus; pub mod error; pub mod messages; diff --git a/consensus/src/sync/live/block_queue/live_sync.rs b/consensus/src/sync/live/block_queue/live_sync.rs index 3315feaa6b..571e24869c 100644 --- a/consensus/src/sync/live/block_queue/live_sync.rs +++ b/consensus/src/sync/live/block_queue/live_sync.rs @@ -11,19 +11,21 @@ use futures::{ }; use nimiq_blockchain_interface::{PushError, PushResult}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_hash::Blake2bHash; use nimiq_network_interface::network::Network; use nimiq_utils::WakerExt; use parking_lot::Mutex; use super::{BlockAndSource, QueuedBlock}; -use crate::sync::{ - live::{ - block_queue::queue::BlockQueue, - queue::{self, LiveSyncQueue}, +use crate::{ + sync::{ + live::{ + block_queue::queue::BlockQueue, + queue::{self, LiveSyncQueue}, + }, + syncer::{LiveSyncEvent, LiveSyncPeerEvent, LiveSyncPushEvent}, }, - syncer::{LiveSyncEvent, LiveSyncPeerEvent, LiveSyncPushEvent}, + BlsCache, }; pub enum PushOpResult { @@ -44,7 +46,7 @@ impl LiveSyncQueue for BlockQueue { fn push_queue_result( network: Arc, blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, result: Self::QueueResult, ) -> VecDeque> { let mut future_results = VecDeque::new(); diff --git a/consensus/src/sync/live/block_queue/proxy.rs b/consensus/src/sync/live/block_queue/proxy.rs index 3e2a738e96..053b288306 100644 --- a/consensus/src/sync/live/block_queue/proxy.rs +++ b/consensus/src/sync/live/block_queue/proxy.rs @@ -9,7 +9,6 @@ use std::{ use futures::{future::BoxFuture, Stream, StreamExt}; use nimiq_block::Block; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_hash::Blake2bHash; use nimiq_network_interface::network::Network; use nimiq_utils::spawn; @@ -32,6 +31,7 @@ use crate::{ }, syncer::LiveSyncEvent, }, + BlsCache, }; pub struct BlockQueueProxy { @@ -109,7 +109,7 @@ impl LiveSyncQueue for BlockQueueProxy { fn push_queue_result( network: Arc, blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, result: Self::QueueResult, ) -> VecDeque> { BlockQueue::push_queue_result(network, blockchain, bls_cache, result) diff --git a/consensus/src/sync/live/mod.rs b/consensus/src/sync/live/mod.rs index 3c8bac76be..8ea1d6760c 100644 --- a/consensus/src/sync/live/mod.rs +++ b/consensus/src/sync/live/mod.rs @@ -8,7 +8,6 @@ use std::{ use futures::{future::BoxFuture, Stream, StreamExt}; use nimiq_block::Block; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_network_interface::network::Network; use parking_lot::Mutex; use tokio::sync::mpsc; @@ -21,6 +20,7 @@ use super::syncer::{LiveSync, LiveSyncEvent}; use crate::{ consensus::ResolveBlockRequest, sync::live::block_queue::{BlockAndSource, BlockSource}, + BlsCache, }; pub mod block_queue; @@ -48,7 +48,7 @@ pub struct LiveSyncer> { pending: VecDeque>, /// Cache for BLS public keys to avoid repetitive uncompressing. - bls_cache: Arc>, + bls_cache: Arc>, /// Channel used to communicate additional blocks to the queue. /// We use this to wake up the queue and pass in new, unknown blocks @@ -61,7 +61,7 @@ impl> LiveSyncer { blockchain: BlockchainProxy, network: Arc, mut queue: Q, - bls_cache: Arc>, + bls_cache: Arc>, ) -> Self { let (tx, rx) = mpsc::channel(MAX_BLOCK_STREAM_BUFFER); queue.add_block_stream(ReceiverStream::new(rx)); diff --git a/consensus/src/sync/live/queue.rs b/consensus/src/sync/live/queue.rs index 760fbfab6a..467f70c935 100644 --- a/consensus/src/sync/live/queue.rs +++ b/consensus/src/sync/live/queue.rs @@ -12,7 +12,6 @@ use nimiq_blockchain::{Blockchain, PostValidationHook}; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_blockchain_interface::{ChunksPushError, ChunksPushResult, PushError, PushResult}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_hash::Blake2bHash; use nimiq_light_blockchain::LightBlockchain; use nimiq_network_interface::network::{MsgAcceptance, Network}; @@ -32,6 +31,7 @@ use crate::{ live::block_queue::{BlockAndSource, BlockSource}, syncer::LiveSyncEvent, }, + BlsCache, }; async fn spawn_blocking R + Send + 'static>(f: F) -> R { @@ -89,7 +89,7 @@ pub trait LiveSyncQueue: Stream + Send + U fn push_queue_result( network: Arc, blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, result: Self::QueueResult, ) -> VecDeque>; @@ -218,7 +218,7 @@ impl BlockchainPushResult { pub async fn push_block_and_chunks( network: Arc, blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, block: Block, block_source: BlockSource, diff: Option, @@ -256,7 +256,7 @@ pub async fn push_block_and_chunks( pub async fn push_block_only( network: Arc, blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, block: Block, block_source: BlockSource, ) -> (Result, Blake2bHash) { @@ -286,7 +286,7 @@ pub async fn push_block_only( /// because an invalid block automatically invalidates the remainder of the sequence. pub async fn push_multiple_blocks_impl( blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, blocks: Vec<(BlockAndSource, Option, Vec>)>, ) -> ( Result, @@ -366,7 +366,7 @@ pub async fn push_multiple_blocks_impl( pub async fn push_multiple_blocks_with_chunks( blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, blocks: Vec<(BlockAndSource, Option, Vec>)>, ) -> ( Result, @@ -382,7 +382,7 @@ pub async fn push_multiple_blocks_with_chunks( /// because an invalid block automatically invalidates the remainder of the sequence. pub async fn push_multiple_blocks( blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, blocks: Vec>, ) -> ( Result, @@ -404,7 +404,7 @@ pub async fn push_multiple_blocks( #[cfg(feature = "full")] pub async fn push_chunks_only( blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, chunks: Vec>, ) -> (Result, Blake2bHash) { let push_results = @@ -441,6 +441,18 @@ impl PostValidationHook for MessageValidator { } } +fn update_cache(block: &Block, bls_cache: &mut BlsCache) { + let Block::Macro(block) = block else { + return; + }; + let Some(validators) = &block.header.validators else { + return; + }; + for validator in validators.iter() { + bls_cache.cache(&validator.voting_key); + } +} + /// Pushes the a single block and the respective chunks into the blockchain. If a light /// blockchain was supplied, no chunks are committed. /// The return value consists of the result of pushing the block, the error of pushing @@ -449,7 +461,7 @@ impl PostValidationHook for MessageValidator { /// Note: this function doesn't prevent a new block from being committed before applying the chunks. fn blockchain_push( blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, block: Option, diff: Option, chunks: Vec>, @@ -463,8 +475,7 @@ fn blockchain_push( let blockchain_push_result; if let Some(block) = block { let block_hash = block.hash(); - // Update validator keys from BLS public key cache. - block.update_validator_keys(&mut bls_cache.lock()); + update_cache(&block, &mut bls_cache.lock()); match blockchain { #[cfg(feature = "full")] BlockchainProxy::Full(ref blockchain) => { diff --git a/consensus/src/sync/live/state_queue/live_sync.rs b/consensus/src/sync/live/state_queue/live_sync.rs index 8811463975..731fed2e90 100644 --- a/consensus/src/sync/live/state_queue/live_sync.rs +++ b/consensus/src/sync/live/state_queue/live_sync.rs @@ -9,7 +9,6 @@ use futures::{ }; use nimiq_blockchain_interface::{ChunksPushError, ChunksPushResult, PushError, PushResult}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_hash::Blake2bHash; use nimiq_network_interface::network::Network; use parking_lot::Mutex; @@ -24,6 +23,7 @@ use crate::{ }, syncer::{LiveSyncEvent, LiveSyncPeerEvent, LiveSyncPushEvent}, }, + BlsCache, }; pub enum PushOpResult { @@ -103,7 +103,7 @@ impl LiveSyncQueue for StateQueue { fn push_queue_result( network: Arc, blockchain: BlockchainProxy, - bls_cache: Arc>, + bls_cache: Arc>, result: Self::QueueResult, ) -> VecDeque> { let mut future_results = VecDeque::new(); diff --git a/consensus/src/sync/syncer_proxy.rs b/consensus/src/sync/syncer_proxy.rs index 4d82ae8abd..9bc4f83e41 100644 --- a/consensus/src/sync/syncer_proxy.rs +++ b/consensus/src/sync/syncer_proxy.rs @@ -9,7 +9,6 @@ use std::{ use futures::{Stream, StreamExt}; use nimiq_block::Block; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_network_interface::network::{Network, SubscribeEvents}; #[cfg(feature = "full")] use nimiq_primitives::policy::Policy; @@ -33,6 +32,7 @@ use crate::{ }, syncer::{LiveSyncPushEvent, Syncer}, }, + BlsCache, }; macro_rules! gen_syncer_match { @@ -66,7 +66,7 @@ impl SyncerProxy { pub async fn new_history( blockchain_proxy: BlockchainProxy, network: Arc, - bls_cache: Arc>, + bls_cache: Arc>, network_event_rx: SubscribeEvents, ) -> Self { assert!( @@ -108,7 +108,7 @@ impl SyncerProxy { pub async fn new_full( blockchain_proxy: BlockchainProxy, network: Arc, - bls_cache: Arc>, + bls_cache: Arc>, zkp_component_proxy: ZKPComponentProxy, network_event_rx: SubscribeEvents, full_sync_threshold: u32, @@ -168,7 +168,7 @@ impl SyncerProxy { pub async fn new_light( blockchain_proxy: BlockchainProxy, network: Arc, - bls_cache: Arc>, + bls_cache: Arc>, zkp_component_proxy: ZKPComponentProxy, network_event_rx: SubscribeEvents, ) -> Self { diff --git a/consensus/tests/consensus_proxy.rs b/consensus/tests/consensus_proxy.rs index dd4d6886cd..5286624cc9 100644 --- a/consensus/tests/consensus_proxy.rs +++ b/consensus/tests/consensus_proxy.rs @@ -2,19 +2,15 @@ use std::{str::FromStr, sync::Arc}; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; -use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, Consensus}; +use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, BlsCache, Consensus}; use nimiq_database::mdbx::MdbxDatabase; use nimiq_keys::{Address, KeyPair, PrivateKey}; use nimiq_network_interface::network::Network; use nimiq_network_mock::MockHub; use nimiq_primitives::{networks::NetworkId, policy::Policy}; use nimiq_test_log::test; -use nimiq_test_utils::{ - blockchain::{ - fill_micro_blocks_with_txns, produce_macro_blocks, signing_key, voting_key, REWARD_KEY, - }, - node::TESTING_BLS_CACHE_MAX_CAPACITY, +use nimiq_test_utils::blockchain::{ + fill_micro_blocks_with_txns, produce_macro_blocks, signing_key, voting_key, REWARD_KEY, }; use nimiq_transaction::{ historic_transaction::HistoricTransactionData, ExecutedTransaction, TransactionFormat, @@ -54,9 +50,7 @@ async fn test_request_transactions_by_address() { let syncer1 = SyncerProxy::new_history( blockchain1_proxy.clone(), Arc::clone(&net1), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), net1.subscribe_events(), ) .await; @@ -73,9 +67,7 @@ async fn test_request_transactions_by_address() { let syncer2 = SyncerProxy::new_history( blockchain1_proxy.clone(), Arc::clone(&net2), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), net2.subscribe_events(), ) .await; diff --git a/consensus/tests/history.rs b/consensus/tests/history.rs index c854d3c154..8a43a060b8 100644 --- a/consensus/tests/history.rs +++ b/consensus/tests/history.rs @@ -9,13 +9,13 @@ use nimiq_block::Block; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::{AbstractBlockchain, Direction}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_consensus::{ messages::{RequestMissingBlocks, ResponseBlocks}, sync::{ live::{block_queue::BlockQueue, queue::QueueConfig, BlockLiveSync}, syncer::{LiveSync, MacroSync, MacroSyncReturn, Syncer}, }, + BlsCache, }; use nimiq_database::mdbx::MdbxDatabase; use nimiq_network_interface::{network::Network, request::RequestCommon}; @@ -27,7 +27,6 @@ use nimiq_test_utils::{ next_micro_block, produce_macro_blocks, push_micro_block, signing_key, voting_key, }, mock_node::MockNode, - node::TESTING_BLS_CACHE_MAX_CAPACITY, test_rng::test_rng, }; use nimiq_utils::time::OffsetTime; @@ -77,12 +76,6 @@ fn blockchain() -> Arc> { )) } -fn bls_cache() -> Arc> { - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))) -} - #[test(tokio::test)] async fn send_single_micro_block_to_block_queue() { let blockchain2 = blockchain(); @@ -103,7 +96,7 @@ async fn send_single_micro_block_to_block_queue() { blockchain_proxy.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( @@ -165,7 +158,7 @@ async fn send_two_micro_blocks_out_of_order() { blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( @@ -269,7 +262,7 @@ async fn send_micro_blocks_out_of_order() { blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( @@ -369,7 +362,7 @@ async fn send_invalid_block() { blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( @@ -479,7 +472,7 @@ async fn send_block_with_gap_and_respond_to_missing_request() { blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( @@ -569,7 +562,7 @@ async fn request_missing_blocks_across_macro_block() { blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( @@ -713,7 +706,7 @@ async fn put_peer_back_into_sync_mode() { blockchain_proxy_1.clone(), Arc::clone(&network), block_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); let mut syncer = Syncer::new( diff --git a/consensus/tests/history_sync.rs b/consensus/tests/history_sync.rs index 79554e5e87..69f4336ef8 100644 --- a/consensus/tests/history_sync.rs +++ b/consensus/tests/history_sync.rs @@ -4,7 +4,6 @@ use futures::StreamExt; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_consensus::{ consensus::Consensus, sync::{ @@ -14,6 +13,7 @@ use nimiq_consensus::{ }, syncer_proxy::SyncerProxy, }, + BlsCache, }; use nimiq_database::mdbx::MdbxDatabase; use nimiq_genesis::NetworkId; @@ -24,7 +24,6 @@ use nimiq_primitives::policy::Policy; use nimiq_test_log::test; use nimiq_test_utils::{ blockchain::{produce_macro_blocks, signing_key, voting_key}, - node::TESTING_BLS_CACHE_MAX_CAPACITY, test_network::TestNetwork, }; use nimiq_time::sleep; @@ -205,9 +204,7 @@ async fn sync_ingredients() { let syncer1 = SyncerProxy::new_history( blockchain1_proxy.clone(), Arc::clone(&net1), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), net1.subscribe_events(), ) .await; @@ -244,9 +241,7 @@ async fn sync_ingredients() { let syncer2 = SyncerProxy::new_history( blockchain2_proxy.clone(), Arc::clone(&net2), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), net2.subscribe_events(), ) .await; diff --git a/consensus/tests/state_live_sync.rs b/consensus/tests/state_live_sync.rs index d13567b5e4..3dcfde9883 100644 --- a/consensus/tests/state_live_sync.rs +++ b/consensus/tests/state_live_sync.rs @@ -6,7 +6,6 @@ use nimiq_block::Block; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::{AbstractBlockchain, PushResult}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_consensus::{ messages::RequestMissingBlocks, sync::{ @@ -19,6 +18,7 @@ use nimiq_consensus::{ }, syncer::{LiveSync, LiveSyncEvent, LiveSyncPeerEvent, LiveSyncPushEvent}, }, + BlsCache, }; use nimiq_database::{ mdbx::MdbxDatabase, @@ -40,7 +40,6 @@ use nimiq_test_utils::{ block_production::TemporaryBlockProducer, blockchain::{produce_macro_blocks, push_micro_block, signing_key, voting_key}, mock_node::MockNode, - node::TESTING_BLS_CACHE_MAX_CAPACITY, }; use nimiq_transaction::ExecutedTransaction; use nimiq_transaction_builder::TransactionBuilder; @@ -75,12 +74,6 @@ fn blockchain(complete: bool) -> Blockchain { blockchain } -fn bls_cache() -> Arc> { - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))) -} - fn get_incomplete_live_sync( hub: &mut MockHub, ) -> ( @@ -115,7 +108,7 @@ fn get_incomplete_live_sync( incomplete_blockchain_proxy, Arc::clone(&network), state_queue, - bls_cache(), + Arc::new(Mutex::new(BlsCache::new_test())), ); (incomplete_blockchain, live_sync, network, block_tx) diff --git a/consensus/tests/sync_utils.rs b/consensus/tests/sync_utils.rs index 10023fc20f..d060ba8d20 100644 --- a/consensus/tests/sync_utils.rs +++ b/consensus/tests/sync_utils.rs @@ -4,11 +4,11 @@ use futures::{future, StreamExt}; use nimiq_blockchain::{BlockProducer, Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::{AbstractBlockchain, BlockchainEvent, Direction}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; use nimiq_consensus::{ consensus::Consensus, messages::{BlockBodyTopic, BlockHeaderMessage, BlockHeaderTopic}, sync::{syncer::MacroSyncReturn, syncer_proxy::SyncerProxy}, + BlsCache, }; use nimiq_database::mdbx::MdbxDatabase; use nimiq_genesis::NetworkId; @@ -19,7 +19,6 @@ use nimiq_network_mock::MockHub; use nimiq_primitives::policy::Policy; use nimiq_test_utils::{ blockchain::{produce_macro_blocks_with_txns, signing_key, voting_key}, - node::TESTING_BLS_CACHE_MAX_CAPACITY, test_network::TestNetwork, }; use nimiq_utils::{spawn, time::OffsetTime}; @@ -45,9 +44,7 @@ async fn syncer( SyncerProxy::new_history( blockchain.clone(), Arc::clone(network), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), network.subscribe_events(), ) .await @@ -56,9 +53,7 @@ async fn syncer( SyncerProxy::new_full( blockchain.clone(), Arc::clone(network), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), zkp_prover.proxy(), network.subscribe_events(), 0, @@ -69,9 +64,7 @@ async fn syncer( SyncerProxy::new_light( blockchain.clone(), Arc::clone(network), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), zkp_prover.proxy(), network.subscribe_events(), ) @@ -116,9 +109,7 @@ pub async fn sync_two_peers( let syncer1 = SyncerProxy::new_history( blockchain1_proxy.clone(), Arc::clone(&net1), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), net1.subscribe_events(), ) .await; diff --git a/lib/src/client.rs b/lib/src/client.rs index 281dd1e2e0..54a6f38b8a 100644 --- a/lib/src/client.rs +++ b/lib/src/client.rs @@ -6,11 +6,10 @@ use nimiq_block::Block; use nimiq_blockchain::{Blockchain, BlockchainConfig}; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; #[cfg(feature = "full-consensus")] use nimiq_consensus::Error::BlockchainError; use nimiq_consensus::{ - sync::syncer_proxy::SyncerProxy, Consensus as AbstractConsensus, + sync::syncer_proxy::SyncerProxy, BlsCache, Consensus as AbstractConsensus, ConsensusProxy as AbstractConsensusProxy, }; #[cfg(feature = "full-consensus")] @@ -371,9 +370,7 @@ impl ClientInner { config.database, )?; - let bls_cache = Arc::new(Mutex::new(PublicKeyCache::new( - Policy::BLS_CACHE_MAX_CAPACITY, - ))); + let bls_cache = Arc::new(Mutex::new(BlsCache::default())); #[cfg(feature = "full-consensus")] let mut blockchain_config = BlockchainConfig { diff --git a/primitives/Cargo.toml b/primitives/Cargo.toml index 63e790fa8b..7a6a9dba08 100644 --- a/primitives/Cargo.toml +++ b/primitives/Cargo.toml @@ -40,10 +40,7 @@ tsify = { git = "https://github.com/sisou/tsify", branch = "sisou/comments", def ], optional = true } wasm-bindgen = { version = "0.2", optional = true } -nimiq-bls = { workspace = true, features = [ - "lazy", - "serde-derive", -], optional = true } +nimiq-bls = { workspace = true, features = ["serde-derive"], optional = true } nimiq-database-value = { workspace = true, optional = true } nimiq-database-value-derive = { workspace = true, optional = true } nimiq-hash = { workspace = true } diff --git a/primitives/block/Cargo.toml b/primitives/block/Cargo.toml index d7f3f4e731..1a94ac797a 100644 --- a/primitives/block/Cargo.toml +++ b/primitives/block/Cargo.toml @@ -27,7 +27,7 @@ serde = "1.0" serde_repr = "0.1" thiserror = "2.0" -nimiq-bls = { workspace = true, features = ["cache", "serde-derive"]} +nimiq-bls = { workspace = true, features = ["serde-derive"] } nimiq-collections = { workspace = true } nimiq-database-value = { workspace = true } nimiq-database-value-derive = { workspace = true } diff --git a/primitives/block/src/block.rs b/primitives/block/src/block.rs index a31922125e..dafbc35556 100644 --- a/primitives/block/src/block.rs +++ b/primitives/block/src/block.rs @@ -1,6 +1,5 @@ use std::fmt; -use nimiq_bls::cache::PublicKeyCache; use nimiq_database_value_derive::DbSerializable; use nimiq_hash::{Blake2bHash, Blake2sHash, Hash}; use nimiq_keys::Ed25519PublicKey; @@ -11,9 +10,7 @@ use nimiq_serde::{Deserialize, Serialize, SerializedMaxSize}; use nimiq_transaction::ExecutedTransaction; use nimiq_vrf::VrfSeed; -use crate::{ - macro_block::MacroBlock, micro_block::MicroBlock, BlockError, MacroBody, MacroHeader, MicroBody, -}; +use crate::{macro_block::MacroBlock, micro_block::MicroBlock, BlockError, MacroBody, MicroBody}; /// Defines the type of the block, either Micro or Macro (which includes both checkpoint and /// election blocks). @@ -338,25 +335,6 @@ impl Block { } } - /// Updates validator keys from a public key cache. - // TODO remove this function - pub fn update_validator_keys(&self, cache: &mut PublicKeyCache) { - // Prepare validator keys from BLS cache. - if let Block::Macro(MacroBlock { - header: - MacroHeader { - validators: Some(validators), - .. - }, - .. - }) = self - { - for validator in validators.iter() { - cache.get_or_uncompress_lazy_public_key(&validator.voting_key); - } - } - } - /// Verifies the block. /// Note that only intrinsic verifications are performed and further checks /// are needed when completely verifying a block. diff --git a/primitives/src/slots_allocation.rs b/primitives/src/slots_allocation.rs index 3be7955c49..20748ed3bb 100644 --- a/primitives/src/slots_allocation.rs +++ b/primitives/src/slots_allocation.rs @@ -18,7 +18,7 @@ use std::{cmp::max, collections::BTreeMap, ops::Range, slice::Iter}; use ark_ec::CurveGroup; use ark_serialize::CanonicalSerialize; -use nimiq_bls::{lazy::LazyPublicKey as LazyBlsPublicKey, G2Projective, PublicKey as BlsPublicKey}; +use nimiq_bls::{G2Projective, LazyPublicKey as LazyBlsPublicKey, PublicKey as BlsPublicKey}; use nimiq_hash::{Hash, HashOutput}; use nimiq_keys::{Address, Ed25519PublicKey as SchnorrPublicKey}; #[cfg(feature = "parallel")] diff --git a/test-utils/src/node.rs b/test-utils/src/node.rs index 568cae8353..85c3cadbfa 100644 --- a/test-utils/src/node.rs +++ b/test-utils/src/node.rs @@ -3,8 +3,7 @@ use std::{path::PathBuf, sync::Arc}; use nimiq_block::Block; use nimiq_blockchain::{Blockchain, BlockchainConfig}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; -use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, Consensus}; +use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, BlsCache, Consensus}; use nimiq_database::mdbx::MdbxDatabase; use nimiq_genesis_builder::GenesisInfo; use nimiq_network_interface::network::Network as NetworkInterface; @@ -22,8 +21,6 @@ use crate::{ zkp_test_data::{zkp_test_exe, ZKP_TEST_KEYS_PATH}, }; -pub const TESTING_BLS_CACHE_MAX_CAPACITY: usize = 100; - pub struct Node { pub network: Arc, pub blockchain: Arc>, @@ -93,9 +90,7 @@ impl Node { let syncer = SyncerProxy::new_history( blockchain_proxy.clone(), Arc::clone(&network), - Arc::new(Mutex::new(PublicKeyCache::new( - TESTING_BLS_CACHE_MAX_CAPACITY, - ))), + Arc::new(Mutex::new(BlsCache::new_test())), network.subscribe_events(), ) .await; diff --git a/utils/Cargo.toml b/utils/Cargo.toml index 1d55dbabb8..e2117c599e 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -25,6 +25,7 @@ futures = { workspace = true, optional = true } hex = { version = "0.4", optional = true } libp2p-identity = { version = "0.2", optional = true } log = { workspace = true, optional = true } +parking_lot = "0.12" pin-project = "1.1" rand = { version = "0.8", optional = true } rand_core = { version = "0.6", optional = true } diff --git a/utils/src/interner.rs b/utils/src/interner.rs new file mode 100644 index 0000000000..9fabfe8514 --- /dev/null +++ b/utils/src/interner.rs @@ -0,0 +1,208 @@ +use std::{ + borrow::Borrow, + collections::HashSet, + hash::{Hash, Hasher}, + ops::Deref, + sync::{Arc, Weak}, +}; + +use parking_lot::RwLock; + +// TODO: document that you need to call notify_deletion in the `Drop` impl +#[allow(drop_bounds)] +pub trait Interned: Clone + Drop + Eq + Hash { + type Key: Clone + Eq + Hash; + fn from_interner_key(key: InternerKey) -> Self; + fn as_interner_key(&self) -> &InternerKey; + fn interned_strong_count(&self) -> usize; +} + +#[derive(Eq, Hash, PartialEq)] +struct Entry(I); + +/// Makes sure there's only one instance of an object `I` per key `I::Key`. +pub struct Interner(Arc>>); + +struct InternerInner { + cache: HashSet>, +} + +/// Needs to be used to store the key inside the interned value. +/// +/// Equivalent to a [`Interned::Key`]. +/// +/// On destruction of the interned value, [`InternerKey::notify_deletion`] +/// needs to be called. +pub struct InternerKey { + key: I::Key, + interner: Weak>>, +} + +impl InternerKey { + fn from_key_no_interner(key: I::Key) -> InternerKey { + InternerKey { + key, + interner: Weak::new(), + } + } +} + +impl Clone for InternerKey { + fn clone(&self) -> InternerKey { + InternerKey { + key: self.key.clone(), + interner: self.interner.clone(), + } + } +} + +impl Eq for InternerKey {} + +impl Hash for InternerKey { + fn hash(&self, state: &mut H) { + self.key.hash(state) + } +} + +impl PartialEq for InternerKey { + fn eq(&self, other: &Self) -> bool { + self.key.eq(&other.key) + } +} + +impl Deref for InternerKey { + type Target = I::Key; + fn deref(&self) -> &I::Key { + &self.key + } +} + +impl AsRef for InternerKey { + fn as_ref(&self) -> &I::Key { + &self.key + } +} + +impl Borrow> for Entry { + fn borrow(&self) -> &InternerKey { + self.0.as_interner_key() + } +} + +impl Default for Interner { + fn default() -> Interner { + Interner(Arc::new(RwLock::new(InternerInner { + cache: Default::default(), + }))) + } +} + +impl Interner { + pub fn intern(&self, key: &I::Key) -> I { + // Fast path: the key is already in the cache. + let tmp_key = InternerKey::from_key_no_interner(key.clone()); + if let Some(entry) = self.0.read().cache.get(&tmp_key) { + return entry.0.clone(); + } + // Slow path: we need to acquire a write lock for the cache. + let cache = &mut self.0.write().cache; + // We need to re-check whether the key got added already since we + // dropped the lock. + if let Some(entry) = cache.get(&tmp_key) { + return entry.0.clone(); + } + // If it's not in the cache yet, we need to create a new instance. + let interned = I::from_interner_key(InternerKey { + key: tmp_key.key, + interner: Arc::downgrade(&self.0), + }); + assert!(cache.insert(Entry(interned.clone()))); + interned + } + pub fn len(&self) -> usize { + self.0.read().cache.len() + } +} + +impl InternerKey { + pub fn notify_deletion(this: &Self, strong_count: usize) { + assert!(strong_count != 0); + // If the strong count is exactly two, it means that there is the copy + // in the cache and the copy who called `notify_deletion`. + if strong_count != 2 { + return; + } + // If the relevant interner is already gone, just ignore the message. + if let Some(inner) = this.interner.upgrade() { + let mut inner = inner.write(); + // If the interner doesn't have the key anymore, we might be in a + // recursive call to `notify_deletion` since we also drop an + // interned value in this function. + // + // Do nothing in this case. + let Some(entry) = inner + .cache + .take(&InternerKey::from_key_no_interner(this.key.clone())) + else { + return; + }; + // We need to check the strong count again, as there might have + // been calls to the interner from before we took the lock. + let strong_count = entry.0.interned_strong_count(); + if strong_count != 2 { + assert!(inner.cache.insert(entry)); + return; + } + // We need to drop the lock first because dropping `entry` is going + // to cause a recursive call. + drop(inner); + drop(entry); + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use super::{Interned, Interner, InternerKey}; + + #[derive(Clone, Eq, Hash, PartialEq)] + struct Value(Arc>); + + impl Interned for Value { + type Key = u32; + fn from_interner_key(key: InternerKey) -> Value { + Value(Arc::new(key)) + } + fn as_interner_key(&self) -> &InternerKey { + &self.0 + } + fn interned_strong_count(&self) -> usize { + Arc::strong_count(&self.0) + } + } + + impl Drop for Value { + fn drop(&mut self) { + InternerKey::notify_deletion(&self.0, Arc::strong_count(&self.0)); + } + } + + #[test] + fn interner() { + let interner: Interner = Interner::default(); + + let n0 = interner.intern(&0); + assert!(Arc::ptr_eq(&n0.0, &interner.intern(&0).0)); + } + + #[test] + fn interner_drop() { + let interner: Interner = Interner::default(); + let n0 = interner.intern(&0); + + drop(interner); + drop(n0); + } +} diff --git a/utils/src/lib.rs b/utils/src/lib.rs index f4c3f341de..4e2dbbd113 100644 --- a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -2,6 +2,7 @@ pub mod crc; #[cfg(feature = "key-store")] pub mod file_store; +pub mod interner; #[cfg(feature = "key-rng")] pub mod key_rng; pub mod math; diff --git a/validator/src/proposal_buffer.rs b/validator/src/proposal_buffer.rs index 4bd543c927..be81fd80dc 100644 --- a/validator/src/proposal_buffer.rs +++ b/validator/src/proposal_buffer.rs @@ -565,9 +565,8 @@ mod test { use nimiq_block::MacroHeader; use nimiq_blockchain::Blockchain; use nimiq_blockchain_proxy::BlockchainProxy; - use nimiq_bls::cache::PublicKeyCache; use nimiq_consensus::{ - sync::syncer_proxy::SyncerProxy, Consensus, ConsensusEvent, ConsensusProxy, + sync::syncer_proxy::SyncerProxy, BlsCache, Consensus, ConsensusEvent, ConsensusProxy, }; use nimiq_keys::{KeyPair as SchnorrKeyPair, PrivateKey as SchnorrPrivateKey}; use nimiq_network_interface::network::Network as NetworkInterface; @@ -604,7 +603,7 @@ mod test { let syncer = SyncerProxy::new_history( blockchain_proxy.clone(), Arc::clone(&net), - Arc::new(Mutex::new(PublicKeyCache::new(10))), + Arc::new(Mutex::new(BlsCache::new_test())), net.subscribe_events(), ) .await; diff --git a/web-client/tests/wasm.rs b/web-client/tests/wasm.rs index 4f97864041..9a5d1b03af 100644 --- a/web-client/tests/wasm.rs +++ b/web-client/tests/wasm.rs @@ -2,8 +2,7 @@ use std::{sync::Arc, task::Poll}; use futures::{poll, Stream, StreamExt}; use nimiq_blockchain_proxy::BlockchainProxy; -use nimiq_bls::cache::PublicKeyCache; -use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, Consensus}; +use nimiq_consensus::{sync::syncer_proxy::SyncerProxy, BlsCache, Consensus}; use nimiq_genesis::NetworkId; use nimiq_light_blockchain::LightBlockchain; use nimiq_network_interface::network::{Network, Topic}; @@ -30,9 +29,7 @@ pub async fn it_can_initialize_with_mock_network() { let zkp_component = ZKPComponent::new(blockchain_proxy.clone(), Arc::clone(&mock_network), None).await; - let bls_cache = Arc::new(Mutex::new(PublicKeyCache::new( - Policy::BLS_CACHE_MAX_CAPACITY, - ))); + let bls_cache = Arc::new(Mutex::new(BlsCache::default())); let network_events = mock_network.subscribe_events();