From 14ca9a056e73912f6575094511b978f46de69aaa Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 12 Nov 2024 13:24:54 +0100 Subject: [PATCH] complete run method --- crates/engine/tree/src/tree/root.rs | 141 +++++++++++++++++++++++++--- crates/trie/common/Cargo.toml | 1 + crates/trie/common/src/proofs.rs | 89 +++++++++++++++++- crates/trie/parallel/src/root.rs | 5 + 4 files changed, 221 insertions(+), 15 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 75d4dc62d2f68..dfb1879c9b0fe 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -1,5 +1,6 @@ //! State root task related functionality. +use reth_errors::ProviderResult; use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory}; use reth_trie::{updates::TrieUpdates, HashedPostState, HashedStorage, MultiProof, TrieInput}; use reth_trie_parallel::{proof::ParallelProof, root::ParallelStateRootError}; @@ -10,6 +11,7 @@ use std::{ mpsc::{self, Receiver, RecvError}, Arc, }, + time::{Duration, Instant}, }; use tracing::debug; @@ -63,6 +65,24 @@ impl StdReceiverStream { } } +type StateRootProofResult = (B256, MultiProof, TrieUpdates, Duration); +type StateRootProofReceiver = mpsc::Receiver>; + +enum StateRootTaskState { + Idle(MultiProof, B256), + Pending(MultiProof, StateRootProofReceiver), +} + +impl StateRootTaskState { + fn add_proofs(&mut self, proofs: MultiProof) { + match self { + Self::Idle(multiproof, _) | Self::Pending(multiproof, _) => { + multiproof.extend(proofs); + } + } + } +} + /// Standalone task that receives a transaction state stream and updates relevant /// data structures to calculate state root. /// @@ -165,24 +185,117 @@ where } fn run(mut self) -> StateRootResult { - while let Ok(update) = self.state_stream.recv() { - Self::on_state_update( - self.config.consistent_view.clone(), - self.config.input.clone(), - update, - &mut self.state, - &mut self.pending_proofs, - ); - } + let mut task_state = StateRootTaskState::Idle(MultiProof::default(), B256::default()); + let mut trie_updates = TrieUpdates::default(); + + loop { + // try to receive state updates without blocking + match self.state_stream.rx.try_recv() { + Ok(update) => { + debug!(target: "engine::root", len = update.len(), "Received new state update"); + Self::on_state_update( + self.config.consistent_view.clone(), + self.config.input.clone(), + update, + &mut self.state, + &mut self.pending_proofs, + ); + continue; + } + Err(mpsc::TryRecvError::Empty) => { + // no new state updates available, continue with other operations + } + Err(mpsc::TryRecvError::Disconnected) => { + // state stream closed, check if we can finish + if self.pending_proofs.is_empty() { + if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state { + return Ok((*state_root, trie_updates)); + } + } + } + } - // TODO: - // * keep track of proof calculation - // * keep track of intermediate root computation - // * return final state root result - Ok((B256::default(), TrieUpdates::default())) + // check pending proofs + while let Some(proof_rx) = self.pending_proofs.front() { + match proof_rx.try_recv() { + Ok(result) => { + let multiproof = result?; + task_state.add_proofs(multiproof); + self.pending_proofs.pop_front(); + continue; + } + Err(mpsc::TryRecvError::Empty) => { + // this proof is not ready yet + break; + } + Err(mpsc::TryRecvError::Disconnected) => { + // channel was closed without sending a result + return Err(ParallelStateRootError::Other( + "Proof calculation task terminated unexpectedly".into(), + )); + } + } + } + + // handle task state transitions + match &mut task_state { + StateRootTaskState::Pending(multiproof, rx) => { + match rx.try_recv() { + Ok(result) => match result { + Ok((state_root, mut new_multiproof, new_trie_updates, elapsed)) => { + debug!(target: "engine::root", %state_root, ?elapsed, "Computed intermediate root"); + trie_updates.extend(new_trie_updates); + new_multiproof.extend(std::mem::take(multiproof)); + task_state = StateRootTaskState::Idle(new_multiproof, state_root); + continue; + } + Err(e) => return Err(ParallelStateRootError::Provider(e)), + }, + Err(mpsc::TryRecvError::Empty) => { + // root calculation not ready yet + } + Err(mpsc::TryRecvError::Disconnected) => { + return Err(ParallelStateRootError::Other( + "Root calculation task terminated unexpectedly".into(), + )); + } + } + } + StateRootTaskState::Idle(multiproof, _) => { + debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); + let view = self.config.consistent_view.clone(); + let input = self.config.input.clone(); + let multiproof = std::mem::take(multiproof); + let state = self.state.clone(); + let (tx, rx) = mpsc::sync_channel(1); + + rayon::spawn(move || { + let result = + calculate_state_root_from_proofs(view, input, multiproof, state); + let _ = tx.send(result); + }); + + task_state = StateRootTaskState::Pending(Default::default(), rx); + continue; + } + } + } } } +fn calculate_state_root_from_proofs( + _view: ConsistentDbView, + _input: Arc, + _multiproof: MultiProof, + _state: HashedPostState, +) -> ProviderResult<(B256, MultiProof, TrieUpdates, Duration)> +where + Factory: DatabaseProviderFactory + Clone, +{ + let started_at = Instant::now(); + Ok((B256::default(), MultiProof::default(), Default::default(), started_at.elapsed())) +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/trie/common/Cargo.toml b/crates/trie/common/Cargo.toml index 0616e2597109d..491f4a218cd89 100644 --- a/crates/trie/common/Cargo.toml +++ b/crates/trie/common/Cargo.toml @@ -39,6 +39,7 @@ proptest.workspace = true proptest-arbitrary-interop.workspace = true hash-db = "=0.15.2" plain_hasher = "0.2" +alloy-primitives = { workspace = true, features = ["getrandom"] } [features] test-utils = [ diff --git a/crates/trie/common/src/proofs.rs b/crates/trie/common/src/proofs.rs index a94b2b96fbdfd..c23e012cfc4bc 100644 --- a/crates/trie/common/src/proofs.rs +++ b/crates/trie/common/src/proofs.rs @@ -12,7 +12,7 @@ use alloy_trie::{ use itertools::Itertools; use reth_primitives_traits::Account; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{hash_map, HashMap}; /// The state multiproof of target accounts and multiproofs of their storage tries. /// Multiproof is effectively a state subtrie that only contains the nodes @@ -76,6 +76,35 @@ impl MultiProof { } Ok(AccountProof { address, info, proof, storage_root, storage_proofs }) } + + /// Extends this multiproof with another one, merging both account and storage + /// proofs. + pub fn extend(&mut self, other: Self) { + self.account_subtree = self + .account_subtree + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .chain(other.account_subtree.into_inner()) + .collect::(); + + for (hashed_address, storage) in other.storages { + match self.storages.entry(hashed_address) { + hash_map::Entry::Occupied(mut entry) => { + debug_assert_eq!(entry.get().root, storage.root); + entry.get_mut().subtree = entry + .get() + .subtree + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .chain(storage.subtree.into_inner()) + .collect::(); + } + hash_map::Entry::Vacant(entry) => { + entry.insert(storage); + } + } + } + } } /// The merkle multiproof of storage trie. @@ -254,3 +283,61 @@ pub mod triehash { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_multiproof_extend_account_proofs() { + let mut proof1 = MultiProof::default(); + let mut proof2 = MultiProof::default(); + + let addr1 = B256::random(); + let addr2 = B256::random(); + + proof1.account_subtree.insert( + Nibbles::unpack(addr1), + alloy_rlp::encode_fixed_size(&U256::from(42)).to_vec().into(), + ); + proof2.account_subtree.insert( + Nibbles::unpack(addr2), + alloy_rlp::encode_fixed_size(&U256::from(43)).to_vec().into(), + ); + + proof1.extend(proof2); + + assert!(proof1.account_subtree.contains_key(&Nibbles::unpack(addr1))); + assert!(proof1.account_subtree.contains_key(&Nibbles::unpack(addr2))); + } + + #[test] + fn test_multiproof_extend_storage_proofs() { + let mut proof1 = MultiProof::default(); + let mut proof2 = MultiProof::default(); + + let addr = B256::random(); + let root = B256::random(); + + let mut subtree1 = ProofNodes::default(); + subtree1.insert( + Nibbles::from_nibbles(vec![0]), + alloy_rlp::encode_fixed_size(&U256::from(42)).to_vec().into(), + ); + proof1.storages.insert(addr, StorageMultiProof { root, subtree: subtree1 }); + + let mut subtree2 = ProofNodes::default(); + subtree2.insert( + Nibbles::from_nibbles(vec![1]), + alloy_rlp::encode_fixed_size(&U256::from(43)).to_vec().into(), + ); + proof2.storages.insert(addr, StorageMultiProof { root, subtree: subtree2 }); + + proof1.extend(proof2); + + let storage = proof1.storages.get(&addr).unwrap(); + assert_eq!(storage.root, root); + assert!(storage.subtree.contains_key(&Nibbles::from_nibbles(vec![0]))); + assert!(storage.subtree.contains_key(&Nibbles::from_nibbles(vec![1]))); + } +} diff --git a/crates/trie/parallel/src/root.rs b/crates/trie/parallel/src/root.rs index e432b91062ca8..18efe6fab2441 100644 --- a/crates/trie/parallel/src/root.rs +++ b/crates/trie/parallel/src/root.rs @@ -4,6 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets use alloy_primitives::B256; use alloy_rlp::{BufMut, Encodable}; use itertools::Itertools; +use reth_db::DatabaseError; use reth_execution_errors::StorageRootError; use reth_provider::{ providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, @@ -228,6 +229,9 @@ pub enum ParallelStateRootError { /// Provider error. #[error(transparent)] Provider(#[from] ProviderError), + /// Other unspecified error. + #[error("{_0}")] + Other(String), } impl From for ProviderError { @@ -237,6 +241,7 @@ impl From for ProviderError { ParallelStateRootError::StorageRoot(StorageRootError::Database(error)) => { Self::Database(error) } + ParallelStateRootError::Other(other) => Self::Database(DatabaseError::Other(other)), } } }