diff --git a/blockchain-interface/src/error.rs b/blockchain-interface/src/error.rs index 478c53cc91..1796ae511a 100644 --- a/blockchain-interface/src/error.rs +++ b/blockchain-interface/src/error.rs @@ -28,6 +28,23 @@ pub enum BlockchainEvent { EpochFinalized(Blake2bHash), } +impl BlockchainEvent { + /// Returns all added block hashes by this event + pub fn added_hashes(&self) -> Vec { + match self { + BlockchainEvent::EpochFinalized(_) => vec![], + BlockchainEvent::Extended(hash) => vec![hash.clone()], + BlockchainEvent::Finalized(_) => vec![], + BlockchainEvent::HistoryAdopted(hash) => vec![hash.clone()], + BlockchainEvent::Rebranched(adopted_blocks, _reverted_blocks) => adopted_blocks + .iter() + .map(|(hash, _block)| hash.clone()) + .collect(), + BlockchainEvent::Stored(block) => vec![block.hash()], + } + } +} + #[derive(Error, Debug, Clone, PartialEq, Eq)] pub enum BlockchainError { #[error("Invalid genesis block stored. Verify you are on the correct network or reset your consensus database.")] diff --git a/consensus/src/consensus/remote_data_store.rs b/consensus/src/consensus/remote_data_store.rs index 1bdcfee2d4..0bd4faf5b1 100644 --- a/consensus/src/consensus/remote_data_store.rs +++ b/consensus/src/consensus/remote_data_store.rs @@ -3,11 +3,14 @@ use std::{ sync::Arc, }; +use futures::StreamExt; use nimiq_account::{ Account, DataStoreReadOps, Staker, StakingContract, StakingContractStore, Tombstone, Validator, }; +use nimiq_block::Block; use nimiq_blockchain_interface::AbstractBlockchain; use nimiq_blockchain_proxy::BlockchainProxy; +use nimiq_hash::Blake2bHash; use nimiq_keys::Address; use nimiq_network_interface::{ network::{CloseReason, Network}, @@ -48,6 +51,69 @@ enum RemoteDataStoreOps { } impl RemoteDataStore { + const MAX_BLOCK_DRIFT: usize = 10; + + /// Waits for the block with given `block_hash` to be encountered and returns it. Will only wait for + /// [RemoteDataStore::MAX_BLOCK_DRIFT] events that add blocks. + /// + /// Returns None if the block cannot be retrieved even after waiting + async fn get_or_await_block( + blockchain: &BlockchainProxy, + block_hash: &Blake2bHash, + ) -> Option { + // Either get the block if it exists yet, or get the notifier stream. + // Blockchain must be dropped here, as it will be held across an await otherwise. + let blockchain_event_stream = { + let blockchain = blockchain.read(); + // First try to obtain, from our chain store, the block that was used to generate the proof + if let Ok(block) = blockchain.get_block(block_hash, false) { + return Some(block); + } + blockchain.notifier_as_stream() + }; + + // Wait for MAX_BLOCK_DRIFT events that add blocks, while checking if the added blocks contain the + // block hash in question + let item = blockchain_event_stream + .filter_map(|event| { + let hashes = event.added_hashes(); + if hashes.is_empty() { + std::future::ready(None) + } else { + std::future::ready(Some(hashes)) + } + }) + .take(Self::MAX_BLOCK_DRIFT) + .filter(|hashes| std::future::ready(hashes.contains(block_hash))) + .next() + .await; + + if item.is_none() { + // If we couldn't find the block within the events limit, then we cannot verify the proof + // A malicious peer could just send random hashes. + log::debug!( + block_hash = %block_hash, + max_block_drift = Self::MAX_BLOCK_DRIFT, + "Received an accounts proof, but we could not find the block that was used to generate the proof.", + ); + + return None; + }; + + // Obtain, from our chain store, the block that was used to generate the proof, which should now be available. + let blockchain = blockchain.read(); + if let Ok(block) = blockchain.get_block(block_hash, false) { + return Some(block); + } + + log::error!( + block_hash = %block_hash, + "Block hash was encountered, but could not be retrieved", + ); + + None + } + /// Gets a proof for a deserializable item in a remote accounts trie and returns /// the item if a valid proof was obtained or `None` if not. pub(crate) async fn get_trie( @@ -86,35 +152,32 @@ impl RemoteDataStore { match response { Ok(Ok(response)) => { - let blockchain = blockchain.read(); - // First try to obtain, from our chain store, the block that was used to generate the proof - let block = blockchain.get_block(&response.block_hash, false).ok(); + // Get the block referenced by the proof, or discard the proof as it cannot be verified + let Some(block) = + Self::get_or_await_block(&blockchain, &response.block_hash).await + else { + // If the block for the proof couldn't be found, use another peer + continue; + }; - if let Some(block) = block { - // Now we need to verify the proof - if let Ok(values) = response - .proof - .verify_values(block.state_root(), &keys.iter().collect::>()) - { - return Ok(values - .into_iter() - .map(|(key, value)| { - (key, value.map(|v| T::deserialize_from_vec(&v).unwrap())) - }) - .collect()); - } else { - // If the proof does not verify, we disconnect from the peer - log::warn!(%peer_id, "Banning peer because the accounts proof didn't verify"); - network - .disconnect_peer(peer_id, CloseReason::MaliciousPeer) - .await; - break; - } - } else { - // If we couldn't find the block, then we cannot verify the proof - // A malicious peer could just send random hashes. - log::debug!(block_hash = %response.block_hash, "Received an accounts proof, but we could not find the block that was used to generate the proof"); + // Now we need to verify the proof + if let Ok(values) = response + .proof + .verify_values(block.state_root(), &keys.iter().collect::>()) + { + return Ok(values + .into_iter() + .map(|(key, value)| { + (key, value.map(|v| T::deserialize_from_vec(&v).unwrap())) + }) + .collect()); } + + // If the proof does not verify, we disconnect from the peer + log::warn!(%peer_id, "Banning peer because the accounts proof didn't verify"); + network + .disconnect_peer(peer_id, CloseReason::MaliciousPeer) + .await; } Ok(Err(error)) => { // If there is no proof, then we just continue with the next peer