From 098a08b09316ec0d5166a1d1cf523b0997969c01 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 18 Nov 2024 16:34:53 +0100 Subject: [PATCH 01/22] feat(engine): proof fetching on state update for StateRootTask --- Cargo.lock | 3 + crates/engine/tree/Cargo.toml | 6 + crates/engine/tree/src/tree/root.rs | 365 +++++++++++++++++++++++++--- crates/trie/parallel/src/proof.rs | 8 +- crates/trie/parallel/src/root.rs | 5 + 5 files changed, 350 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2b754e8af291..3838053d8e8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7251,6 +7251,7 @@ dependencies = [ "crossbeam-channel", "futures", "metrics", + "rayon", "reth-beacon-consensus", "reth-blockchain-tree", "reth-blockchain-tree-api", @@ -7262,6 +7263,7 @@ dependencies = [ "reth-errors", "reth-ethereum-engine-primitives", "reth-evm", + "reth-execution-errors", "reth-exex-types", "reth-metrics", "reth-network-p2p", @@ -7281,6 +7283,7 @@ dependencies = [ "reth-tasks", "reth-tracing", "reth-trie", + "reth-trie-db", "reth-trie-parallel", "reth-trie-sparse", "revm-primitives", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 7a71ce411eb8..a2fed9e7c184 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -21,6 +21,7 @@ reth-consensus.workspace = true reth-engine-primitives.workspace = true reth-errors.workspace = true reth-evm.workspace = true +reth-execution-errors.workspace = true reth-network-p2p.workspace = true reth-payload-builder-primitives.workspace = true reth-payload-builder.workspace = true @@ -32,6 +33,7 @@ reth-prune.workspace = true reth-revm.workspace = true reth-stages-api.workspace = true reth-tasks.workspace = true +reth-trie-db.workspace = true reth-trie-parallel.workspace = true reth-trie-sparse.workspace = true reth-trie.workspace = true @@ -55,6 +57,7 @@ metrics.workspace = true reth-metrics = { workspace = true, features = ["common"] } # misc +rayon.workspace = true tracing.workspace = true # optional deps for test-utils @@ -110,4 +113,7 @@ test-utils = [ "reth-static-file", "reth-tracing", "reth-trie/test-utils", + "reth-prune-types?/test-utils", + "reth-primitives-traits/test-utils", + "reth-trie-db/test-utils", ] diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 27f835ec754b..8ab705326ec9 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -1,16 +1,28 @@ //! State root task related functionality. -use alloy_primitives::map::FbHashMap; +use alloy_primitives::map::{DefaultHashBuilder, FbHashMap, FbHashSet, HashMap, HashSet}; use alloy_rlp::{BufMut, Encodable}; -use reth_provider::providers::ConsistentDbView; +use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use reth_errors::ProviderResult; +use reth_execution_errors::TrieWitnessError; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, +}; use reth_trie::{ - updates::TrieUpdates, HashedPostState, MultiProof, Nibbles, TrieAccount, TrieInput, - EMPTY_ROOT_HASH, + hashed_cursor::HashedPostStateCursorFactory, + proof::Proof, + trie_cursor::InMemoryTrieCursorFactory, + updates::{TrieUpdates, TrieUpdatesSorted}, + witness::{next_root_from_proofs, target_nodes}, + HashedPostState, HashedPostStateSorted, HashedStorage, MultiProof, Nibbles, TrieAccount, + TrieInput, EMPTY_ROOT_HASH, }; -use reth_trie_parallel::root::ParallelStateRootError; +use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; +use reth_trie_parallel::{proof::ParallelProof, root::ParallelStateRootError}; use reth_trie_sparse::{SparseStateTrie, SparseStateTrieResult}; -use revm_primitives::{map::FbHashSet, EvmState, B256}; +use revm_primitives::{keccak256, EvmState, B256}; use std::{ + collections::{BTreeMap, VecDeque}, sync::{ mpsc::{self, Receiver, RecvError}, Arc, @@ -72,6 +84,24 @@ impl StdReceiverStream { } } +type StateRootProofResult = (B256, MultiProof, TrieUpdates, Duration); +type StateRootProofReceiver = mpsc::Receiver>; + +enum StateRootTaskState { + Idle(MultiProof, B256), + Pending(MultiProof, StateRootProofReceiver), +} + +impl StateRootTaskState { + fn add_proofs(&mut self, proofs: MultiProof) { + match self { + Self::Idle(multiproof, _) | Self::Pending(multiproof, _) => { + multiproof.extend(proofs); + } + } + } +} + /// Standalone task that receives a transaction state stream and updates relevant /// data structures to calculate state root. /// @@ -86,19 +116,20 @@ pub(crate) struct StateRootTask { state_stream: StdReceiverStream, /// Task configuration. config: StateRootConfig, + /// Current state. + state: HashedPostState, + /// Channels to retrieve proof calculation results from. + pending_proofs: VecDeque>>, } #[allow(dead_code)] impl StateRootTask where - Factory: Send + 'static, + Factory: DatabaseProviderFactory + Clone + Send + Sync + 'static, { /// Creates a new `StateRootTask`. - pub(crate) const fn new( - config: StateRootConfig, - state_stream: StdReceiverStream, - ) -> Self { - Self { config, state_stream } + pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { + Self { config, state_stream, state: Default::default(), pending_proofs: Default::default() } } /// Spawns the state root task and returns a handle to await its result. @@ -118,31 +149,295 @@ where /// Handles state updates. fn on_state_update( - _view: &reth_provider::providers::ConsistentDbView, - _input: &std::sync::Arc, - _state: EvmState, + view: ConsistentDbView, + input: Arc, + update: EvmState, + state: &mut HashedPostState, + pending_proofs: &mut VecDeque>>, ) { - // Default implementation of state update handling - // TODO: calculate hashed state update and dispatch proof gathering for it. + let mut hashed_state_update = HashedPostState::default(); + for (address, account) in update { + if account.is_touched() { + let hashed_address = keccak256(address); + + let destroyed = account.is_selfdestructed(); + hashed_state_update.accounts.insert( + hashed_address, + if destroyed || account.is_empty() { None } else { Some(account.info.into()) }, + ); + + if destroyed || !account.storage.is_empty() { + let storage = account.storage.into_iter().filter_map(|(slot, value)| { + (!destroyed && value.is_changed()) + .then(|| (keccak256(B256::from(slot)), value.present_value)) + }); + hashed_state_update + .storages + .insert(hashed_address, HashedStorage::from_iter(destroyed, storage)); + } + } + } + + // Dispatch proof gathering for this state update + let targets = hashed_state_update + .accounts + .keys() + .filter(|hashed_address| { + !state.accounts.contains_key(*hashed_address) && + !state.storages.contains_key(*hashed_address) + }) + .map(|hashed_address| (*hashed_address, HashSet::default())) + .chain(hashed_state_update.storages.iter().map(|(hashed_address, storage)| { + (*hashed_address, storage.storage.keys().copied().collect()) + })) + .collect::>(); + + let (tx, rx) = mpsc::sync_channel(1); + rayon::spawn(move || { + let result = ParallelProof::new(view, input).multiproof(targets); + let _ = tx.send(result); + }); + + pending_proofs.push_back(rx); + + state.extend(hashed_state_update); + } + + fn run(mut self) -> StateRootResult { + let mut task_state = StateRootTaskState::Idle(MultiProof::default(), B256::default()); + let mut trie_updates = TrieUpdates::default(); + + loop { + // try to receive state updates without blocking + match self.state_stream.rx.try_recv() { + Ok(update) => { + debug!(target: "engine::root", len = update.len(), "Received new state update"); + Self::on_state_update( + self.config.consistent_view.clone(), + self.config.input.clone(), + update, + &mut self.state, + &mut self.pending_proofs, + ); + continue; + } + Err(mpsc::TryRecvError::Empty) => { + // no new state updates available, continue with other operations + } + Err(mpsc::TryRecvError::Disconnected) => { + // state stream closed, check if we can finish + if self.pending_proofs.is_empty() { + if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state { + return Ok((*state_root, trie_updates)); + } + } + } + } + + // check pending proofs + while let Some(proof_rx) = self.pending_proofs.front() { + match proof_rx.try_recv() { + Ok(result) => { + let multiproof = result?; + task_state.add_proofs(multiproof); + self.pending_proofs.pop_front(); + continue; + } + Err(mpsc::TryRecvError::Empty) => { + // this proof is not ready yet + break; + } + Err(mpsc::TryRecvError::Disconnected) => { + // channel was closed without sending a result + return Err(ParallelStateRootError::Other( + "Proof calculation task terminated unexpectedly".into(), + )); + } + } + } + + // handle task state transitions + match &mut task_state { + StateRootTaskState::Pending(multiproof, rx) => { + match rx.try_recv() { + Ok(result) => match result { + Ok((state_root, mut new_multiproof, new_trie_updates, elapsed)) => { + debug!(target: "engine::root", %state_root, ?elapsed, "Computed intermediate root"); + trie_updates.extend(new_trie_updates); + new_multiproof.extend(std::mem::take(multiproof)); + task_state = StateRootTaskState::Idle(new_multiproof, state_root); + continue; + } + Err(e) => return Err(ParallelStateRootError::Provider(e)), + }, + Err(mpsc::TryRecvError::Empty) => { + // root calculation not ready yet + } + Err(mpsc::TryRecvError::Disconnected) => { + return Err(ParallelStateRootError::Other( + "Root calculation task terminated unexpectedly".into(), + )); + } + } + } + StateRootTaskState::Idle(multiproof, _) => { + debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); + let view = self.config.consistent_view.clone(); + let input_nodes_sorted = self.config.input.nodes.clone().into_sorted(); + let input_state_sorted = self.config.input.state.clone().into_sorted(); + let multiproof = std::mem::take(multiproof); + let state = self.state.clone(); + let (tx, rx) = mpsc::sync_channel(1); + + rayon::spawn(move || { + let result = calculate_state_root_from_proofs( + view, + &input_nodes_sorted, + &input_state_sorted, + multiproof, + state, + ); + let _ = tx.send(result); + }); + + task_state = StateRootTaskState::Pending(Default::default(), rx); + continue; + } + } + } } } -#[allow(dead_code)] -impl StateRootTask +fn calculate_state_root_from_proofs( + view: ConsistentDbView, + input_nodes_sorted: &TrieUpdatesSorted, + input_state_sorted: &HashedPostStateSorted, + multiproof: MultiProof, + state: HashedPostState, +) -> ProviderResult<(B256, MultiProof, TrieUpdates, Duration)> where - Factory: Send + 'static, + Factory: DatabaseProviderFactory + Clone, { - fn run(self) -> StateRootResult { - while let Ok(state) = self.state_stream.recv() { - Self::on_state_update(&self.config.consistent_view, &self.config.input, state); - } + let started_at = Instant::now(); - // TODO: - // * keep track of proof calculation - // * keep track of intermediate root computation - // * return final state root result - Ok((B256::default(), TrieUpdates::default())) - } + let provider_ro = view.provider_ro()?; + + let proof_targets: HashMap> = state + .accounts + .keys() + .map(|hashed_address| (*hashed_address, HashSet::default())) + .chain(state.storages.iter().map(|(hashed_address, storage)| { + (*hashed_address, storage.storage.keys().copied().collect()) + })) + .collect(); + + let account_trie_nodes = proof_targets + .into_par_iter() + .map(|(hashed_address, hashed_slots)| { + // Gather and record storage trie nodes for this account. + let mut storage_trie_nodes = BTreeMap::default(); + let storage = state.storages.get(&hashed_address); + for hashed_slot in hashed_slots { + let slot_key = Nibbles::unpack(hashed_slot); + let slot_value = storage + .and_then(|s| s.storage.get(&hashed_slot)) + .filter(|v| !v.is_zero()) + .map(|v| alloy_rlp::encode_fixed_size(v).to_vec()); + let proof = multiproof + .storages + .get(&hashed_address) + .map(|proof| { + proof + .subtree + .iter() + .filter(|e| slot_key.starts_with(e.0)) + .collect::>() + }) + .unwrap_or_default(); + storage_trie_nodes.extend(target_nodes(slot_key.clone(), slot_value, None, proof)?); + } + + let storage_root = next_root_from_proofs(storage_trie_nodes, |key: Nibbles| { + // Right pad the target with 0s. + let mut padded_key = key.pack(); + padded_key.resize(32, 0); + let mut targets = HashMap::with_hasher(DefaultHashBuilder::default()); + let mut slots = HashSet::with_hasher(DefaultHashBuilder::default()); + slots.insert(B256::from_slice(&padded_key)); + targets.insert(hashed_address, slots); + let proof = Proof::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + input_nodes_sorted, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + input_state_sorted, + ), + ) + .multiproof(targets) + .unwrap(); + + // The subtree only contains the proof for a single target. + let node = proof + .storages + .get(&hashed_address) + .and_then(|storage_multiproof| storage_multiproof.subtree.get(&key)) + .cloned() + .ok_or(TrieWitnessError::MissingTargetNode(key))?; + Ok(node) + })?; + + // Gather and record account trie nodes. + let account = state + .accounts + .get(&hashed_address) + .ok_or(TrieWitnessError::MissingAccount(hashed_address))?; + let value = (account.is_some() || storage_root != EMPTY_ROOT_HASH).then(|| { + let mut encoded = Vec::with_capacity(128); + TrieAccount::from((account.unwrap_or_default(), storage_root)) + .encode(&mut encoded as &mut dyn BufMut); + encoded + }); + let key = Nibbles::unpack(hashed_address); + let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); + Ok(target_nodes(key.clone(), value, None, proof)?) + }) + .collect::>>()?; + + let state_root = + next_root_from_proofs(account_trie_nodes.into_iter().flatten(), |key: Nibbles| { + // Right pad the target with 0s. + let mut padded_key = key.pack(); + padded_key.resize(32, 0); + let mut targets = HashMap::with_hasher(DefaultHashBuilder::default()); + targets.insert( + B256::from_slice(&padded_key), + HashSet::with_hasher(DefaultHashBuilder::default()), + ); + let proof = Proof::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + input_nodes_sorted, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + input_state_sorted, + ), + ) + .multiproof(targets) + .unwrap(); + + // The subtree only contains the proof for a single target. + let node = proof + .account_subtree + .get(&key) + .cloned() + .ok_or(TrieWitnessError::MissingTargetNode(key))?; + Ok(node) + })?; + + Ok((state_root, multiproof, Default::default(), started_at.elapsed())) } /// Updates the sparse trie with the given proofs and state, and returns the updated trie and the @@ -216,7 +511,11 @@ fn update_sparse_trie( #[cfg(test)] mod tests { use super::*; - use reth_provider::{providers::ConsistentDbView, test_utils::MockEthProvider}; + use reth_provider::{ + providers::ConsistentDbView, + test_utils::{create_test_provider_factory, MockNodeTypesWithDB}, + ProviderFactory, + }; use reth_trie::TrieInput; use revm_primitives::{ Account, AccountInfo, AccountStatus, Address, EvmState, EvmStorage, EvmStorageSlot, @@ -224,8 +523,8 @@ mod tests { }; use std::sync::Arc; - fn create_mock_config() -> StateRootConfig { - let factory = MockEthProvider::default(); + fn create_mock_config() -> StateRootConfig> { + let factory = create_test_provider_factory(); let view = ConsistentDbView::new(factory, None); let input = Arc::new(TrieInput::default()); StateRootConfig { consistent_view: view, input } diff --git a/crates/trie/parallel/src/proof.rs b/crates/trie/parallel/src/proof.rs index dcb1a0231dd1..f285079f2526 100644 --- a/crates/trie/parallel/src/proof.rs +++ b/crates/trie/parallel/src/proof.rs @@ -33,7 +33,7 @@ pub struct ParallelProof { /// Consistent view of the database. view: ConsistentDbView, /// Trie input. - input: TrieInput, + input: Arc, /// Parallel state root metrics. #[cfg(feature = "metrics")] metrics: ParallelStateRootMetrics, @@ -41,7 +41,7 @@ pub struct ParallelProof { impl ParallelProof { /// Create new state proof generator. - pub fn new(view: ConsistentDbView, input: TrieInput) -> Self { + pub fn new(view: ConsistentDbView, input: Arc) -> Self { Self { view, input, @@ -62,8 +62,8 @@ where ) -> Result { let mut tracker = ParallelTrieTracker::default(); - let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted()); - let hashed_state_sorted = Arc::new(self.input.state.into_sorted()); + let trie_nodes_sorted = self.input.nodes.clone().into_sorted(); + let hashed_state_sorted = self.input.state.clone().into_sorted(); // Extend prefix sets with targets let mut prefix_sets = self.input.prefix_sets.clone(); diff --git a/crates/trie/parallel/src/root.rs b/crates/trie/parallel/src/root.rs index b4e300c7290e..8d2b18f5e111 100644 --- a/crates/trie/parallel/src/root.rs +++ b/crates/trie/parallel/src/root.rs @@ -4,6 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets use alloy_primitives::B256; use alloy_rlp::{BufMut, Encodable}; use itertools::Itertools; +use reth_db::DatabaseError; use reth_execution_errors::StorageRootError; use reth_provider::{ providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, @@ -225,6 +226,9 @@ pub enum ParallelStateRootError { /// Provider error. #[error(transparent)] Provider(#[from] ProviderError), + /// Other unspecified error. + #[error("{_0}")] + Other(String), } impl From for ProviderError { @@ -234,6 +238,7 @@ impl From for ProviderError { ParallelStateRootError::StorageRoot(StorageRootError::Database(error)) => { Self::Database(error) } + ParallelStateRootError::Other(other) => Self::Database(DatabaseError::Other(other)), } } } From 154aa8c095eb41174a8d9acfee9eaa7e5361cf61 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 18 Nov 2024 16:48:12 +0100 Subject: [PATCH 02/22] fix account trie nodes collection --- crates/engine/tree/src/tree/root.rs | 70 +++++++++++++++-------------- 1 file changed, 37 insertions(+), 33 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 8ab705326ec9..a99c91c57533 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -403,39 +403,43 @@ where let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); Ok(target_nodes(key.clone(), value, None, proof)?) }) - .collect::>>()?; - - let state_root = - next_root_from_proofs(account_trie_nodes.into_iter().flatten(), |key: Nibbles| { - // Right pad the target with 0s. - let mut padded_key = key.pack(); - padded_key.resize(32, 0); - let mut targets = HashMap::with_hasher(DefaultHashBuilder::default()); - targets.insert( - B256::from_slice(&padded_key), - HashSet::with_hasher(DefaultHashBuilder::default()), - ); - let proof = Proof::new( - InMemoryTrieCursorFactory::new( - DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), - input_nodes_sorted, - ), - HashedPostStateCursorFactory::new( - DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), - input_state_sorted, - ), - ) - .multiproof(targets) - .unwrap(); - - // The subtree only contains the proof for a single target. - let node = proof - .account_subtree - .get(&key) - .cloned() - .ok_or(TrieWitnessError::MissingTargetNode(key))?; - Ok(node) - })?; + .collect::>>>() + .into_iter() + .collect::, _>>()? + .into_iter() + .flatten() + .collect::>(); + + let state_root = next_root_from_proofs(account_trie_nodes, |key: Nibbles| { + // Right pad the target with 0s. + let mut padded_key = key.pack(); + padded_key.resize(32, 0); + let mut targets = HashMap::with_hasher(DefaultHashBuilder::default()); + targets.insert( + B256::from_slice(&padded_key), + HashSet::with_hasher(DefaultHashBuilder::default()), + ); + let proof = Proof::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + input_nodes_sorted, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + input_state_sorted, + ), + ) + .multiproof(targets) + .unwrap(); + + // The subtree only contains the proof for a single target. + let node = proof + .account_subtree + .get(&key) + .cloned() + .ok_or(TrieWitnessError::MissingTargetNode(key))?; + Ok(node) + })?; Ok((state_root, multiproof, Default::default(), started_at.elapsed())) } From d9501829dfd93e0cfd841048cc5cbfee55dba85e Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 18 Nov 2024 17:35:48 +0100 Subject: [PATCH 03/22] add state root from proofs bench and simplify account trie nodes collection --- crates/engine/tree/Cargo.toml | 4 + .../tree/benches/state_root_from_proofs.rs | 81 +++++++++++++++++++ crates/engine/tree/src/tree/mod.rs | 1 + crates/engine/tree/src/tree/root.rs | 15 ++-- 4 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 crates/engine/tree/benches/state_root_from_proofs.rs diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index a2fed9e7c184..6f2128f82e24 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -93,6 +93,10 @@ crossbeam-channel = "0.5.13" name = "channel_perf" harness = false +[[bench]] +name = "state_root_from_proofs" +harness = false + [features] test-utils = [ "reth-blockchain-tree/test-utils", diff --git a/crates/engine/tree/benches/state_root_from_proofs.rs b/crates/engine/tree/benches/state_root_from_proofs.rs new file mode 100644 index 000000000000..4c8e85696ead --- /dev/null +++ b/crates/engine/tree/benches/state_root_from_proofs.rs @@ -0,0 +1,81 @@ +#![allow(missing_docs)] + +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use reth_engine_tree::tree::calculate_state_root_from_proofs; +use reth_provider::{providers::ConsistentDbView, test_utils::create_test_provider_factory}; +use reth_trie::{ + updates::TrieUpdatesSorted, HashedPostState, HashedPostStateSorted, HashedStorage, MultiProof, +}; +use revm_primitives::{ + keccak256, Account, AccountInfo, AccountStatus, Address, EvmStorage, EvmStorageSlot, HashMap, + HashSet, B256, U256, +}; + +fn create_test_state(size: usize) -> (HashMap>, HashedPostState) { + let mut state = HashedPostState::default(); + let mut targets = HashMap::default(); + + for i in 0..size { + let address = Address::random(); + let hashed_address = keccak256(address); + + // Create account + let info = AccountInfo { + balance: U256::from(100 + i), + nonce: i as u64, + code_hash: B256::random(), + code: Default::default(), + }; + + // Create storage with multiple slots + let mut storage = EvmStorage::default(); + let mut slots = HashSet::default(); + for j in 0..100 { + let slot = U256::from(j); + let value = U256::from(100 + j); + storage.insert(slot, EvmStorageSlot::new(value)); + slots.insert(keccak256(B256::from(slot))); + } + + let account = Account { info, storage: storage.clone(), status: AccountStatus::Loaded }; + + state.accounts.insert(hashed_address, Some(account.info.into())); + state.storages.insert( + hashed_address, + HashedStorage::from_iter( + false, + storage.into_iter().map(|(k, v)| (keccak256(B256::from(k)), v.present_value)), + ), + ); + targets.insert(hashed_address, slots); + } + + (targets, state) +} + +fn bench_state_root_collection(c: &mut Criterion) { + let factory = create_test_provider_factory(); + let view = ConsistentDbView::new(factory, None); + + let mut group = c.benchmark_group("state_root_collection"); + for size in &[10, 100, 1000] { + let (_targets, state) = create_test_state(*size); + let multiproof = MultiProof::default(); + + group.bench_with_input(format!("size_{}", size), size, |b, _| { + b.iter(|| { + black_box(calculate_state_root_from_proofs( + view.clone(), + &TrieUpdatesSorted::default(), + &HashedPostStateSorted::default(), + multiproof.clone(), + state.clone(), + )) + }); + }); + } + group.finish(); +} + +criterion_group!(benches, bench_state_root_collection); +criterion_main!(benches); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 8819cda966b0..2b5fcf524d44 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -76,6 +76,7 @@ pub use config::TreeConfig; pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook}; pub use persistence_state::PersistenceState; pub use reth_engine_primitives::InvalidBlockHook; +pub use root::calculate_state_root_from_proofs; mod root; diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index a99c91c57533..112c662459fa 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -308,7 +308,8 @@ where } } -fn calculate_state_root_from_proofs( +/// Calculate state root from proofs. +pub fn calculate_state_root_from_proofs( view: ConsistentDbView, input_nodes_sorted: &TrieUpdatesSorted, input_state_sorted: &HashedPostStateSorted, @@ -401,14 +402,12 @@ where }); let key = Nibbles::unpack(hashed_address); let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); - Ok(target_nodes(key.clone(), value, None, proof)?) + target_nodes(key.clone(), value, None, proof) }) - .collect::>>>() - .into_iter() - .collect::, _>>()? - .into_iter() - .flatten() - .collect::>(); + .try_reduce(BTreeMap::new, |mut acc, map| { + acc.extend(map.into_iter()); + Ok(acc) + })?; let state_root = next_root_from_proofs(account_trie_nodes, |key: Nibbles| { // Right pad the target with 0s. From 06db565919dc45208e8563a7bc4487740e023c31 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 19 Nov 2024 12:41:23 +0100 Subject: [PATCH 04/22] extend test to check state root value --- Cargo.lock | 1 + crates/engine/tree/Cargo.toml | 1 + crates/engine/tree/src/tree/root.rs | 172 ++++++++++++++++++++++------ 3 files changed, 142 insertions(+), 32 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3838053d8e8a..a53180e4ff82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7251,6 +7251,7 @@ dependencies = [ "crossbeam-channel", "futures", "metrics", + "rand 0.8.5", "rayon", "reth-beacon-consensus", "reth-blockchain-tree", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 6f2128f82e24..57dead386936 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -88,6 +88,7 @@ alloy-rlp.workspace = true assert_matches.workspace = true criterion.workspace = true crossbeam-channel = "0.5.13" +rand.workspace = true [[bench]] name = "channel_perf" diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 112c662459fa..90319260dbf0 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -514,60 +514,168 @@ fn update_sparse_trie( #[cfg(test)] mod tests { use super::*; + use rand::{prelude::SliceRandom, Rng}; + use reth_primitives::{Account as RethAccount, StorageEntry}; use reth_provider::{ - providers::ConsistentDbView, - test_utils::{create_test_provider_factory, MockNodeTypesWithDB}, - ProviderFactory, + providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter, }; - use reth_trie::TrieInput; + use reth_trie::{test_utils::state_root, TrieInput}; use revm_primitives::{ - Account, AccountInfo, AccountStatus, Address, EvmState, EvmStorage, EvmStorageSlot, - HashMap, B256, U256, + Account as RevmAccount, AccountInfo, AccountStatus, Address, EvmState, EvmStorageSlot, + HashMap, B256, KECCAK_EMPTY, U256, }; use std::sync::Arc; - fn create_mock_config() -> StateRootConfig> { - let factory = create_test_provider_factory(); - let view = ConsistentDbView::new(factory, None); - let input = Arc::new(TrieInput::default()); - StateRootConfig { consistent_view: view, input } + fn convert_revm_to_reth_account(revm_account: &RevmAccount) -> RethAccount { + RethAccount { + balance: revm_account.info.balance, + nonce: revm_account.info.nonce, + bytecode_hash: if revm_account.info.code_hash == KECCAK_EMPTY { + None + } else { + Some(revm_account.info.code_hash) + }, + } } - fn create_mock_state() -> revm_primitives::EvmState { - let mut state_changes: EvmState = HashMap::default(); - let storage = EvmStorage::from_iter([(U256::from(1), EvmStorageSlot::new(U256::from(2)))]); - let account = Account { - info: AccountInfo { - balance: U256::from(100), - nonce: 10, - code_hash: B256::random(), - code: Default::default(), - }, - storage, - status: AccountStatus::Loaded, - }; + fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec { + let mut rng = rand::thread_rng(); + let mut all_addresses: Vec
= + (0..num_accounts).map(|_| Address::random()).collect(); + let mut updates = Vec::new(); + + for _ in 0..updates_per_account { + let num_accounts_in_update = rng.gen_range(1..=num_accounts); + let mut state_update = EvmState::default(); + + all_addresses.shuffle(&mut rng); + let selected_addresses = &all_addresses[0..num_accounts_in_update]; + + for &address in selected_addresses { + let mut storage = HashMap::default(); + if rng.gen_bool(0.7) { + for _ in 0..rng.gen_range(1..10) { + let slot = U256::from(rng.gen::()); + storage.insert( + slot, + EvmStorageSlot::new_changed(U256::ZERO, U256::from(rng.gen::())), + ); + } + } - let address = Address::random(); - state_changes.insert(address, account); + let account = RevmAccount { + info: AccountInfo { + balance: U256::from(rng.gen::()), + nonce: rng.gen::(), + code_hash: KECCAK_EMPTY, + code: Some(Default::default()), + }, + storage, + status: AccountStatus::Touched, + }; + + state_update.insert(address, account); + } + + updates.push(state_update); + } - state_changes + updates } #[test] fn test_state_root_task() { - let config = create_mock_config(); + let factory = create_test_provider_factory(); let (tx, rx) = std::sync::mpsc::channel(); let stream = StdReceiverStream::new(rx); + let state_updates = create_mock_state_updates(100, 10); + let mut hashed_state = HashedPostState::default(); + let mut accumulated_state: HashMap)> = + HashMap::default(); + + { + let provider_rw = factory.provider_rw().expect("failed to get provider"); + + for update in &state_updates { + let account_updates = update.iter().map(|(address, account)| { + (*address, Some(convert_revm_to_reth_account(account))) + }); + provider_rw + .insert_account_for_hashing(account_updates) + .expect("failed to insert accounts"); + + let storage_updates = update.iter().map(|(address, account)| { + let storage_entries = account.storage.iter().map(|(slot, value)| { + StorageEntry { key: B256::from(*slot), value: value.present_value } + }); + (*address, storage_entries) + }); + provider_rw + .insert_storage_for_hashing(storage_updates) + .expect("failed to insert storage"); + } + provider_rw.commit().expect("failed to commit changes"); + } + + for update in &state_updates { + for (address, account) in update { + let hashed_address = keccak256(*address); + + if account.is_touched() { + let destroyed = account.is_selfdestructed(); + hashed_state.accounts.insert( + hashed_address, + if destroyed || account.is_empty() { + None + } else { + Some(account.info.clone().into()) + }, + ); + + if destroyed || !account.storage.is_empty() { + let storage = account + .storage + .iter() + .filter(|&(_slot, value)| (!destroyed && value.is_changed())) + .map(|(slot, value)| { + (keccak256(B256::from(*slot)), value.present_value) + }); + hashed_state + .storages + .insert(hashed_address, HashedStorage::from_iter(destroyed, storage)); + } + } + + let storage: HashMap = account + .storage + .iter() + .map(|(k, v)| (B256::from(*k), v.present_value)) + .collect(); + + accumulated_state + .insert(*address, (convert_revm_to_reth_account(account), storage)); + } + } + + let config = StateRootConfig { + consistent_view: ConsistentDbView::new(factory, None), + input: Arc::new(TrieInput::from_state(hashed_state)), + }; let task = StateRootTask::new(config, stream); let handle = task.spawn(); - for _ in 0..10 { - tx.send(create_mock_state()).expect("failed to send state"); + for update in state_updates { + tx.send(update).expect("failed to send state"); } drop(tx); - let result = handle.wait_for_result(); - assert!(result.is_ok(), "sync block execution failed"); + let (root_from_task, _) = handle.wait_for_result().expect("task failed"); + let root_from_base = state_root(accumulated_state); + + assert_eq!( + root_from_task, root_from_base, + "State root mismatch: task={root_from_task:?}, base={root_from_base:?}" + ); } } From 38d0b668f034294d2c5fadf85a87fdeae8d460c2 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 20 Nov 2024 15:21:23 +0100 Subject: [PATCH 05/22] use reth_testing_utils::generators --- Cargo.lock | 1 + crates/engine/tree/Cargo.toml | 1 + crates/engine/tree/src/tree/root.rs | 8 +++----- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a53180e4ff82..e2a4093d0075 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7282,6 +7282,7 @@ dependencies = [ "reth-stages-api", "reth-static-file", "reth-tasks", + "reth-testing-utils", "reth-tracing", "reth-trie", "reth-trie-db", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 57dead386936..a651843c38ee 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -80,6 +80,7 @@ reth-prune.workspace = true reth-rpc-types-compat.workspace = true reth-stages = { workspace = true, features = ["test-utils"] } reth-static-file.workspace = true +reth-testing-utils.workspace = true reth-tracing.workspace = true # alloy diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 90319260dbf0..7591bf8b7006 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -514,11 +514,11 @@ fn update_sparse_trie( #[cfg(test)] mod tests { use super::*; - use rand::{prelude::SliceRandom, Rng}; use reth_primitives::{Account as RethAccount, StorageEntry}; use reth_provider::{ providers::ConsistentDbView, test_utils::create_test_provider_factory, HashingWriter, }; + use reth_testing_utils::generators::{self, Rng}; use reth_trie::{test_utils::state_root, TrieInput}; use revm_primitives::{ Account as RevmAccount, AccountInfo, AccountStatus, Address, EvmState, EvmStorageSlot, @@ -539,16 +539,14 @@ mod tests { } fn create_mock_state_updates(num_accounts: usize, updates_per_account: usize) -> Vec { - let mut rng = rand::thread_rng(); - let mut all_addresses: Vec
= - (0..num_accounts).map(|_| Address::random()).collect(); + let mut rng = generators::rng(); + let all_addresses: Vec
= (0..num_accounts).map(|_| rng.gen()).collect(); let mut updates = Vec::new(); for _ in 0..updates_per_account { let num_accounts_in_update = rng.gen_range(1..=num_accounts); let mut state_update = EvmState::default(); - all_addresses.shuffle(&mut rng); let selected_addresses = &all_addresses[0..num_accounts_in_update]; for &address in selected_addresses { From acf8254be09559e329e09ff1a5d475a26fa1686a Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 20 Nov 2024 16:21:59 +0100 Subject: [PATCH 06/22] fix test --- crates/engine/tree/src/tree/root.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 7591bf8b7006..3dcadebb8185 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -651,8 +651,9 @@ mod tests { .map(|(k, v)| (B256::from(*k), v.present_value)) .collect(); - accumulated_state - .insert(*address, (convert_revm_to_reth_account(account), storage)); + let entry = accumulated_state.entry(*address).or_default(); + entry.0 = convert_revm_to_reth_account(account); + entry.1.extend(storage); } } From ea292e8109ea572a4da6eeeed0eac61c694a09b7 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 20 Nov 2024 16:34:34 +0100 Subject: [PATCH 07/22] prevent state root calculation and returning final result without the ongoing proof calculations received --- crates/engine/tree/src/tree/root.rs | 60 ++++++++++++++++++----------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 3dcadebb8185..15ceffa35d7e 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -120,6 +120,9 @@ pub(crate) struct StateRootTask { state: HashedPostState, /// Channels to retrieve proof calculation results from. pending_proofs: VecDeque>>, + /// Prevents triggering state root calculation and returning the final result + /// without all the ongoing proof calculations received. + pending_calculation: bool, } #[allow(dead_code)] @@ -129,7 +132,13 @@ where { /// Creates a new `StateRootTask`. pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { - Self { config, state_stream, state: Default::default(), pending_proofs: Default::default() } + Self { + config, + state_stream, + state: Default::default(), + pending_proofs: Default::default(), + pending_calculation: false, + } } /// Spawns the state root task and returns a handle to await its result. @@ -226,7 +235,7 @@ where } Err(mpsc::TryRecvError::Disconnected) => { // state stream closed, check if we can finish - if self.pending_proofs.is_empty() { + if self.pending_proofs.is_empty() && !self.pending_calculation { if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state { return Ok((*state_root, trie_updates)); } @@ -241,6 +250,7 @@ where let multiproof = result?; task_state.add_proofs(multiproof); self.pending_proofs.pop_front(); + self.pending_calculation = true; continue; } Err(mpsc::TryRecvError::Empty) => { @@ -281,27 +291,31 @@ where } } StateRootTaskState::Idle(multiproof, _) => { - debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); - let view = self.config.consistent_view.clone(); - let input_nodes_sorted = self.config.input.nodes.clone().into_sorted(); - let input_state_sorted = self.config.input.state.clone().into_sorted(); - let multiproof = std::mem::take(multiproof); - let state = self.state.clone(); - let (tx, rx) = mpsc::sync_channel(1); - - rayon::spawn(move || { - let result = calculate_state_root_from_proofs( - view, - &input_nodes_sorted, - &input_state_sorted, - multiproof, - state, - ); - let _ = tx.send(result); - }); - - task_state = StateRootTaskState::Pending(Default::default(), rx); - continue; + if self.pending_calculation { + debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); + let view = self.config.consistent_view.clone(); + let input_nodes_sorted = self.config.input.nodes.clone().into_sorted(); + let input_state_sorted = self.config.input.state.clone().into_sorted(); + let multiproof = std::mem::take(multiproof); + let state = self.state.clone(); + let (tx, rx) = mpsc::sync_channel(1); + + rayon::spawn(move || { + let result = calculate_state_root_from_proofs( + view, + &input_nodes_sorted, + &input_state_sorted, + multiproof, + state, + ); + let _ = tx.send(result); + }); + + self.pending_calculation = false; + + task_state = StateRootTaskState::Pending(Default::default(), rx); + continue; + } } } } From 02e75e8f7c158011ddf542c1fbce43d786c1ce66 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 21 Nov 2024 12:23:44 +0100 Subject: [PATCH 08/22] do not share provider among workers --- crates/engine/tree/src/tree/root.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 15ceffa35d7e..c882977ea463 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -335,8 +335,6 @@ where { let started_at = Instant::now(); - let provider_ro = view.provider_ro()?; - let proof_targets: HashMap> = state .accounts .keys() @@ -349,6 +347,7 @@ where let account_trie_nodes = proof_targets .into_par_iter() .map(|(hashed_address, hashed_slots)| { + let provider_ro = view.provider_ro().unwrap(); // Gather and record storage trie nodes for this account. let mut storage_trie_nodes = BTreeMap::default(); let storage = state.storages.get(&hashed_address); @@ -423,6 +422,8 @@ where Ok(acc) })?; + let provider_ro = view.provider_ro()?; + let state_root = next_root_from_proofs(account_trie_nodes, |key: Nibbles| { // Right pad the target with 0s. let mut padded_key = key.pack(); @@ -601,7 +602,7 @@ mod tests { let (tx, rx) = std::sync::mpsc::channel(); let stream = StdReceiverStream::new(rx); - let state_updates = create_mock_state_updates(100, 10); + let state_updates = create_mock_state_updates(400, 20); let mut hashed_state = HashedPostState::default(); let mut accumulated_state: HashMap)> = HashMap::default(); From 74bf59ffb291c2707679ee92041271468eb9129a Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 21 Nov 2024 12:47:39 +0100 Subject: [PATCH 09/22] add error traces for multiproof tasks --- crates/engine/tree/src/tree/root.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index c882977ea463..4df6a7d6fb85 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -29,7 +29,7 @@ use std::{ }, time::{Duration, Instant}, }; -use tracing::debug; +use tracing::{debug, error}; /// The level below which the sparse trie hashes are calculated in [`update_sparse_trie`]. const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2; @@ -204,7 +204,13 @@ where let (tx, rx) = mpsc::sync_channel(1); rayon::spawn(move || { let result = ParallelProof::new(view, input).multiproof(targets); - let _ = tx.send(result); + if let Err(ref e) = result { + error!(target: "engine::root", error = ?e, "Could not calculate multiproof"); + } + + if let Err(e) = tx.send(result) { + error!(target: "engine::root", error = ?e, "Could not send multiproof result"); + } }); pending_proofs.push_back(rx); From 9eadda32a4ed0525988b7c3157723380f633522e Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 21 Nov 2024 13:18:48 +0100 Subject: [PATCH 10/22] use map_init to initialize provider once per thread in calculate_state_root_from_proofs --- crates/engine/tree/src/tree/root.rs | 145 +++++++++++++++------------- 1 file changed, 76 insertions(+), 69 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 4df6a7d6fb85..47f6a8184365 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -352,77 +352,84 @@ where let account_trie_nodes = proof_targets .into_par_iter() - .map(|(hashed_address, hashed_slots)| { - let provider_ro = view.provider_ro().unwrap(); - // Gather and record storage trie nodes for this account. - let mut storage_trie_nodes = BTreeMap::default(); - let storage = state.storages.get(&hashed_address); - for hashed_slot in hashed_slots { - let slot_key = Nibbles::unpack(hashed_slot); - let slot_value = storage - .and_then(|s| s.storage.get(&hashed_slot)) - .filter(|v| !v.is_zero()) - .map(|v| alloy_rlp::encode_fixed_size(v).to_vec()); - let proof = multiproof - .storages - .get(&hashed_address) - .map(|proof| { - proof - .subtree - .iter() - .filter(|e| slot_key.starts_with(e.0)) - .collect::>() - }) - .unwrap_or_default(); - storage_trie_nodes.extend(target_nodes(slot_key.clone(), slot_value, None, proof)?); - } + .map_init( + || view.provider_ro().unwrap(), + |provider_ro, (hashed_address, hashed_slots)| { + // Gather and record storage trie nodes for this account. + let mut storage_trie_nodes = BTreeMap::default(); + let storage = state.storages.get(&hashed_address); + for hashed_slot in hashed_slots { + let slot_key = Nibbles::unpack(hashed_slot); + let slot_value = storage + .and_then(|s| s.storage.get(&hashed_slot)) + .filter(|v| !v.is_zero()) + .map(|v| alloy_rlp::encode_fixed_size(v).to_vec()); + let proof = multiproof + .storages + .get(&hashed_address) + .map(|proof| { + proof + .subtree + .iter() + .filter(|e| slot_key.starts_with(e.0)) + .collect::>() + }) + .unwrap_or_default(); + storage_trie_nodes.extend(target_nodes( + slot_key.clone(), + slot_value, + None, + proof, + )?); + } - let storage_root = next_root_from_proofs(storage_trie_nodes, |key: Nibbles| { - // Right pad the target with 0s. - let mut padded_key = key.pack(); - padded_key.resize(32, 0); - let mut targets = HashMap::with_hasher(DefaultHashBuilder::default()); - let mut slots = HashSet::with_hasher(DefaultHashBuilder::default()); - slots.insert(B256::from_slice(&padded_key)); - targets.insert(hashed_address, slots); - let proof = Proof::new( - InMemoryTrieCursorFactory::new( - DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), - input_nodes_sorted, - ), - HashedPostStateCursorFactory::new( - DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), - input_state_sorted, - ), - ) - .multiproof(targets) - .unwrap(); - - // The subtree only contains the proof for a single target. - let node = proof - .storages + let storage_root = next_root_from_proofs(storage_trie_nodes, |key: Nibbles| { + // Right pad the target with 0s. + let mut padded_key = key.pack(); + padded_key.resize(32, 0); + let mut targets = HashMap::with_hasher(DefaultHashBuilder::default()); + let mut slots = HashSet::with_hasher(DefaultHashBuilder::default()); + slots.insert(B256::from_slice(&padded_key)); + targets.insert(hashed_address, slots); + let proof = Proof::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + input_nodes_sorted, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + input_state_sorted, + ), + ) + .multiproof(targets) + .unwrap(); + + // The subtree only contains the proof for a single target. + let node = proof + .storages + .get(&hashed_address) + .and_then(|storage_multiproof| storage_multiproof.subtree.get(&key)) + .cloned() + .ok_or(TrieWitnessError::MissingTargetNode(key))?; + Ok(node) + })?; + + // Gather and record account trie nodes. + let account = state + .accounts .get(&hashed_address) - .and_then(|storage_multiproof| storage_multiproof.subtree.get(&key)) - .cloned() - .ok_or(TrieWitnessError::MissingTargetNode(key))?; - Ok(node) - })?; - - // Gather and record account trie nodes. - let account = state - .accounts - .get(&hashed_address) - .ok_or(TrieWitnessError::MissingAccount(hashed_address))?; - let value = (account.is_some() || storage_root != EMPTY_ROOT_HASH).then(|| { - let mut encoded = Vec::with_capacity(128); - TrieAccount::from((account.unwrap_or_default(), storage_root)) - .encode(&mut encoded as &mut dyn BufMut); - encoded - }); - let key = Nibbles::unpack(hashed_address); - let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); - target_nodes(key.clone(), value, None, proof) - }) + .ok_or(TrieWitnessError::MissingAccount(hashed_address))?; + let value = (account.is_some() || storage_root != EMPTY_ROOT_HASH).then(|| { + let mut encoded = Vec::with_capacity(128); + TrieAccount::from((account.unwrap_or_default(), storage_root)) + .encode(&mut encoded as &mut dyn BufMut); + encoded + }); + let key = Nibbles::unpack(hashed_address); + let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); + target_nodes(key.clone(), value, None, proof) + }, + ) .try_reduce(BTreeMap::new, |mut acc, map| { acc.extend(map.into_iter()); Ok(acc) From 7032bfe895a989d2b27fc8e90bba907b4138bff4 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 22 Nov 2024 10:11:14 +0100 Subject: [PATCH 11/22] Update crates/engine/tree/src/tree/root.rs Co-authored-by: Roman Krasiuk --- crates/engine/tree/src/tree/root.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 47f6a8184365..2336ef01c40c 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -177,7 +177,7 @@ where if destroyed || !account.storage.is_empty() { let storage = account.storage.into_iter().filter_map(|(slot, value)| { - (!destroyed && value.is_changed()) + value.is_changed() .then(|| (keccak256(B256::from(slot)), value.present_value)) }); hashed_state_update From a4c3bbf4189fb6f86f2a410f4935943255ebbd07 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 22 Nov 2024 10:45:19 +0100 Subject: [PATCH 12/22] fmt --- crates/engine/tree/src/tree/root.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 2336ef01c40c..8f37d4f76652 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -177,7 +177,8 @@ where if destroyed || !account.storage.is_empty() { let storage = account.storage.into_iter().filter_map(|(slot, value)| { - value.is_changed() + value + .is_changed() .then(|| (keccak256(B256::from(slot)), value.present_value)) }); hashed_state_update From cebcbd818a3fd5e3dc91eaac66220c60e1c27ad8 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 22 Nov 2024 15:17:42 +0100 Subject: [PATCH 13/22] refactor: StateRootMessages and single channel for managing task state --- crates/engine/tree/src/tree/root.rs | 420 +++++++++++++++++++--------- 1 file changed, 290 insertions(+), 130 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 8f37d4f76652..39b7b658256c 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -22,14 +22,15 @@ use reth_trie_parallel::{proof::ParallelProof, root::ParallelStateRootError}; use reth_trie_sparse::{SparseStateTrie, SparseStateTrieResult}; use revm_primitives::{keccak256, EvmState, B256}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::BTreeMap, sync::{ - mpsc::{self, Receiver, RecvError}, + mpsc::{self, Receiver, RecvError, Sender}, Arc, }, + thread, time::{Duration, Instant}, }; -use tracing::{debug, error}; +use tracing::{debug, error, trace}; /// The level below which the sparse trie hashes are calculated in [`update_sparse_trie`]. const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2; @@ -84,21 +85,81 @@ impl StdReceiverStream { } } -type StateRootProofResult = (B256, MultiProof, TrieUpdates, Duration); -type StateRootProofReceiver = mpsc::Receiver>; +/// Messages that can be received by the state root task +#[derive(Debug)] +pub(crate) enum StateRootMessage { + /// New state update from transaction execution + StateUpdate(EvmState), + /// Input state stream has closed + InputStreamClosed, + /// Proof calculation completed for a specific state update + ProofCalculated { + /// The calculated proof + proof: MultiProof, + /// The index of this proof in the sequence of state updates + sequence_number: u64, + }, + /// State root calculation completed + RootCalculated { + /// The calculated state root + root: B256, + /// The trie updates produced during calculation + updates: TrieUpdates, + /// Time taken to calculate the root + elapsed: Duration, + }, +} -enum StateRootTaskState { - Idle(MultiProof, B256), - Pending(MultiProof, StateRootProofReceiver), +/// Handle to track proof calculation ordering +#[derive(Debug, Default)] +pub(crate) struct ProofSequencer { + /// The next expected proof sequence number + next_sequence: u64, + /// Buffer for out-of-order proofs + pending_proofs: BTreeMap, } -impl StateRootTaskState { - fn add_proofs(&mut self, proofs: MultiProof) { - match self { - Self::Idle(multiproof, _) | Self::Pending(multiproof, _) => { - multiproof.extend(proofs); +impl ProofSequencer { + /// Creates a new proof sequencer + pub(crate) fn new() -> Self { + Self::default() + } + + /// Gets the next sequence number and increments the counter + pub(crate) fn next_sequence(&mut self) -> u64 { + let seq = self.next_sequence; + self.next_sequence += 1; + seq + } + + /// Adds a proof and returns all sequential proofs if we have a continuous sequence + pub(crate) fn add_proof(&mut self, sequence: u64, proof: MultiProof) -> Vec { + self.pending_proofs.insert(sequence, proof); + + let mut has_next = true; + let mut ready_proofs = Vec::new(); + + while has_next { + has_next = false; + + // find the lowest sequence number in pending proofs + if let Some((&lowest_seq, _)) = self.pending_proofs.iter().next() { + if lowest_seq <= self.next_sequence { + if let Some(proof) = self.pending_proofs.remove(&lowest_seq) { + has_next = true; + ready_proofs.push(proof); + self.next_sequence = lowest_seq + 1; + } + } } } + + ready_proofs + } + + /// Returns true if we still have pending proofs + pub(crate) fn has_pending(&self) -> bool { + !self.pending_proofs.is_empty() } } @@ -110,19 +171,22 @@ impl StateRootTaskState { /// fetches the proofs for relevant accounts from the database and reveal them /// to the tree. /// Then it updates relevant leaves according to the result of the transaction. -#[allow(dead_code)] +#[derive(Debug)] pub(crate) struct StateRootTask { - /// Incoming state updates. - state_stream: StdReceiverStream, - /// Task configuration. + /// Internal channel for receiving all state root related messages + rx: Receiver, + /// Internal channel for sending messages to the state root task + tx: Sender, + /// Task configuration config: StateRootConfig, - /// Current state. + /// Current state state: HashedPostState, - /// Channels to retrieve proof calculation results from. - pending_proofs: VecDeque>>, - /// Prevents triggering state root calculation and returning the final result - /// without all the ongoing proof calculations received. - pending_calculation: bool, + /// Proof sequencing handler + proof_sequencer: ProofSequencer, + /// Whether we're currently calculating a root + calculating_root: bool, + /// Background thread handle for input state stream processing + _stream_handler: Option>, } #[allow(dead_code)] @@ -130,14 +194,42 @@ impl StateRootTask where Factory: DatabaseProviderFactory + Clone + Send + Sync + 'static, { - /// Creates a new `StateRootTask`. + /// Creates a new state root task with the unified message channel pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { + let (tx, rx) = mpsc::channel(); + + // spawn a background thread to forward state updates from StdReceiverStream + // to our internal message channel + let stream_tx = tx.clone(); + let stream_handler = thread::Builder::new() + .name("State Stream Handler".to_string()) + .spawn(move || { + loop { + match state_stream.recv() { + Ok(state) => { + if stream_tx.send(StateRootMessage::StateUpdate(state)).is_err() { + break; + } + } + Err(_) => { + // stream closed normally + debug!(target: "engine::root", "State stream closed normally"); + let _ = stream_tx.send(StateRootMessage::InputStreamClosed); + break; + } + } + } + }) + .expect("failed to spawn stream handler thread"); + Self { config, - state_stream, + rx, + tx, state: Default::default(), - pending_proofs: Default::default(), - pending_calculation: false, + proof_sequencer: ProofSequencer::new(), + calculating_root: false, + _stream_handler: Some(stream_handler), } } @@ -162,7 +254,8 @@ where input: Arc, update: EvmState, state: &mut HashedPostState, - pending_proofs: &mut VecDeque>>, + proof_sequence_number: u64, + state_root_message_sender: Sender, ) { let mut hashed_state_update = HashedPostState::default(); for (address, account) in update { @@ -202,127 +295,192 @@ where })) .collect::>(); - let (tx, rx) = mpsc::sync_channel(1); rayon::spawn(move || { let result = ParallelProof::new(view, input).multiproof(targets); - if let Err(ref e) = result { - error!(target: "engine::root", error = ?e, "Could not calculate multiproof"); - } - - if let Err(e) = tx.send(result) { - error!(target: "engine::root", error = ?e, "Could not send multiproof result"); + match result { + Ok(proof) => { + let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated { + proof, + sequence_number: proof_sequence_number, + }); + } + Err(e) => { + error!(target: "engine::root", error = ?e, "Could not calculate multiproof"); + } } }); - pending_proofs.push_back(rx); - state.extend(hashed_state_update); } - fn run(mut self) -> StateRootResult { - let mut task_state = StateRootTaskState::Idle(MultiProof::default(), B256::default()); - let mut trie_updates = TrieUpdates::default(); + /// Handler for new proof calculated, aggregates all the existing sequential proofs. + fn on_proof(&mut self, proof: MultiProof, sequence_number: u64) -> Option { + let ready_proofs = self.proof_sequencer.add_proof(sequence_number, proof); - loop { - // try to receive state updates without blocking - match self.state_stream.rx.try_recv() { - Ok(update) => { - debug!(target: "engine::root", len = update.len(), "Received new state update"); - Self::on_state_update( - self.config.consistent_view.clone(), - self.config.input.clone(), - update, - &mut self.state, - &mut self.pending_proofs, + if ready_proofs.is_empty() { + None + } else { + // combine all ready proofs into one + ready_proofs.into_iter().reduce(|mut acc, proof| { + acc.extend(proof); + acc + }) + } + } + + /// Spawns root calculation with the current state and proofs + fn spawn_root_calculation(&mut self, multiproof: MultiProof) { + trace!( + target: "engine::root", + account_proofs = multiproof.account_subtree.len(), + storage_proofs = multiproof.storages.len(), + "Spawning root calculation" + ); + + let tx = self.tx.clone(); + let view = self.config.consistent_view.clone(); + let input = self.config.input.clone(); + let state = self.state.clone(); + + self.calculating_root = true; + + rayon::spawn(move || { + let result = calculate_state_root_from_proofs( + view, + &input.nodes.clone().into_sorted(), + &input.state.clone().into_sorted(), + multiproof, + state, + ); + match result { + Ok((root, updates, elapsed)) => { + trace!( + target: "engine::root", + %root, + ?elapsed, + "Root calculation completed, sending result" ); - continue; + let _ = tx.send(StateRootMessage::RootCalculated { root, updates, elapsed }); } - Err(mpsc::TryRecvError::Empty) => { - // no new state updates available, continue with other operations - } - Err(mpsc::TryRecvError::Disconnected) => { - // state stream closed, check if we can finish - if self.pending_proofs.is_empty() && !self.pending_calculation { - if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state { - return Ok((*state_root, trie_updates)); - } - } + Err(e) => { + error!(target: "engine::root", error = ?e, "Could not calculate state root"); } } + }); + } - // check pending proofs - while let Some(proof_rx) = self.pending_proofs.front() { - match proof_rx.try_recv() { - Ok(result) => { - let multiproof = result?; - task_state.add_proofs(multiproof); - self.pending_proofs.pop_front(); - self.pending_calculation = true; - continue; - } - Err(mpsc::TryRecvError::Empty) => { - // this proof is not ready yet - break; - } - Err(mpsc::TryRecvError::Disconnected) => { - // channel was closed without sending a result - return Err(ParallelStateRootError::Other( - "Proof calculation task terminated unexpectedly".into(), - )); + fn run(mut self) -> StateRootResult { + let mut current_multiproof = MultiProof::default(); + let mut trie_updates = TrieUpdates::default(); + let mut current_root = B256::default(); + let mut updates_received = 0; + let mut proofs_processed = 0; + let mut roots_calculated = 0; + let mut input_stream_closed = false; + + loop { + match self.rx.recv() { + Ok(message) => match message { + StateRootMessage::StateUpdate(update) => { + updates_received += 1; + trace!( + target: "engine::root", + len = update.len(), + total_updates = updates_received, + "Received new state update" + ); + Self::on_state_update( + self.config.consistent_view.clone(), + self.config.input.clone(), + update, + &mut self.state, + self.proof_sequencer.next_sequence(), + self.tx.clone(), + ); } - } - } + StateRootMessage::ProofCalculated { proof, sequence_number } => { + proofs_processed += 1; + trace!( + target: "engine::root", + sequence = sequence_number, + total_proofs = proofs_processed, + "Processing calculated proof" + ); - // handle task state transitions - match &mut task_state { - StateRootTaskState::Pending(multiproof, rx) => { - match rx.try_recv() { - Ok(result) => match result { - Ok((state_root, mut new_multiproof, new_trie_updates, elapsed)) => { - debug!(target: "engine::root", %state_root, ?elapsed, "Computed intermediate root"); - trie_updates.extend(new_trie_updates); - new_multiproof.extend(std::mem::take(multiproof)); - task_state = StateRootTaskState::Idle(new_multiproof, state_root); - continue; + if let Some(combined_proof) = self.on_proof(proof, sequence_number) { + if self.calculating_root { + current_multiproof.extend(combined_proof); + } else { + self.spawn_root_calculation(combined_proof); } - Err(e) => return Err(ParallelStateRootError::Provider(e)), - }, - Err(mpsc::TryRecvError::Empty) => { - // root calculation not ready yet - } - Err(mpsc::TryRecvError::Disconnected) => { - return Err(ParallelStateRootError::Other( - "Root calculation task terminated unexpectedly".into(), - )); } } - } - StateRootTaskState::Idle(multiproof, _) => { - if self.pending_calculation { - debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task"); - let view = self.config.consistent_view.clone(); - let input_nodes_sorted = self.config.input.nodes.clone().into_sorted(); - let input_state_sorted = self.config.input.state.clone().into_sorted(); - let multiproof = std::mem::take(multiproof); - let state = self.state.clone(); - let (tx, rx) = mpsc::sync_channel(1); - - rayon::spawn(move || { - let result = calculate_state_root_from_proofs( - view, - &input_nodes_sorted, - &input_state_sorted, - multiproof, - state, + StateRootMessage::RootCalculated { root, updates, elapsed } => { + roots_calculated += 1; + trace!( + target: "engine::root", + %root, + ?elapsed, + roots_calculated, + "Computed intermediate root" + ); + current_root = root; + trie_updates.extend(updates); + self.calculating_root = false; + + // only spawn new calculation if we have accumulated new proofs + if !current_multiproof.account_subtree.is_empty() || + !current_multiproof.storages.is_empty() + { + trace!( + target: "engine::root", + account_proofs = current_multiproof.account_subtree.len(), + storage_proofs = current_multiproof.storages.len(), + "Spawning subsequent root calculation" ); - let _ = tx.send(result); - }); - - self.pending_calculation = false; - - task_state = StateRootTaskState::Pending(Default::default(), rx); - continue; + self.spawn_root_calculation(std::mem::take(&mut current_multiproof)); + } else if input_stream_closed && + proofs_processed >= updates_received && + !self.proof_sequencer.has_pending() + { + debug!( + target: "engine::root", + total_updates = updates_received, + total_proofs = proofs_processed, + roots_calculated, + "All proofs processed, ending calculation" + ); + return Ok((current_root, trie_updates)); + } } + StateRootMessage::InputStreamClosed => { + trace!( + target: "engine::root", + updates = updates_received, + proofs = proofs_processed, + "Input state stream closed" + ); + input_stream_closed = true; + + // check if we can finish immediately + if !self.calculating_root && + !self.proof_sequencer.has_pending() && + proofs_processed >= updates_received + { + return Ok((current_root, trie_updates)); + } + } + }, + Err(_) => { + // this means our internal message channel is closed, which shouldn't happen + // in normal operation since we hold both ends + error!( + target: "engine::root", + "Internal message channel closed unexpectedly" + ); + return Err(ParallelStateRootError::Other( + "Internal message channel closed unexpectedly".into(), + )); } } } @@ -336,7 +494,7 @@ pub fn calculate_state_root_from_proofs( input_state_sorted: &HashedPostStateSorted, multiproof: MultiProof, state: HashedPostState, -) -> ProviderResult<(B256, MultiProof, TrieUpdates, Duration)> +) -> ProviderResult<(B256, TrieUpdates, Duration)> where Factory: DatabaseProviderFactory + Clone, { @@ -469,7 +627,7 @@ where Ok(node) })?; - Ok((state_root, multiproof, Default::default(), started_at.elapsed())) + Ok((state_root, Default::default(), started_at.elapsed())) } /// Updates the sparse trie with the given proofs and state, and returns the updated trie and the @@ -612,6 +770,8 @@ mod tests { #[test] fn test_state_root_task() { + reth_tracing::init_test_tracing(); + let factory = create_test_provider_factory(); let (tx, rx) = std::sync::mpsc::channel(); let stream = StdReceiverStream::new(rx); From 00345a641e736c2cba3a7a82d4672e83cad6486b Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 22 Nov 2024 17:12:18 +0100 Subject: [PATCH 14/22] fix add proof --- crates/engine/tree/src/tree/root.rs | 129 ++++++++++++++++++++++------ 1 file changed, 105 insertions(+), 24 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 39b7b658256c..9eac54d770f9 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -134,23 +134,13 @@ impl ProofSequencer { /// Adds a proof and returns all sequential proofs if we have a continuous sequence pub(crate) fn add_proof(&mut self, sequence: u64, proof: MultiProof) -> Vec { + self.next_sequence = self.next_sequence.max(sequence + 1); self.pending_proofs.insert(sequence, proof); + let mut ready_proofs = Vec::with_capacity(self.pending_proofs.len()); - let mut has_next = true; - let mut ready_proofs = Vec::new(); - - while has_next { - has_next = false; - - // find the lowest sequence number in pending proofs - if let Some((&lowest_seq, _)) = self.pending_proofs.iter().next() { - if lowest_seq <= self.next_sequence { - if let Some(proof) = self.pending_proofs.remove(&lowest_seq) { - has_next = true; - ready_proofs.push(proof); - self.next_sequence = lowest_seq + 1; - } - } + while let Some((&lowest_seq, _)) = self.pending_proofs.iter().next() { + if let Some(proof) = self.pending_proofs.remove(&lowest_seq) { + ready_proofs.push(proof); } } @@ -330,6 +320,11 @@ where /// Spawns root calculation with the current state and proofs fn spawn_root_calculation(&mut self, multiproof: MultiProof) { + if self.calculating_root { + return; + } + self.calculating_root = true; + trace!( target: "engine::root", account_proofs = multiproof.account_subtree.len(), @@ -342,8 +337,6 @@ where let input = self.config.input.clone(); let state = self.state.clone(); - self.calculating_root = true; - rayon::spawn(move || { let result = calculate_state_root_from_proofs( view, @@ -422,16 +415,29 @@ where %root, ?elapsed, roots_calculated, + proofs = proofs_processed, + updates = updates_received, "Computed intermediate root" ); current_root = root; trie_updates.extend(updates); self.calculating_root = false; + let has_new_proofs = !current_multiproof.account_subtree.is_empty() || + !current_multiproof.storages.is_empty(); + let all_proofs_received = proofs_processed >= updates_received; + let no_pending = !self.proof_sequencer.has_pending(); + + trace!( + target: "engine::root", + has_new_proofs, + all_proofs_received, + no_pending, + "State check" + ); + // only spawn new calculation if we have accumulated new proofs - if !current_multiproof.account_subtree.is_empty() || - !current_multiproof.storages.is_empty() - { + if has_new_proofs { trace!( target: "engine::root", account_proofs = current_multiproof.account_subtree.len(), @@ -439,10 +445,7 @@ where "Spawning subsequent root calculation" ); self.spawn_root_calculation(std::mem::take(&mut current_multiproof)); - } else if input_stream_closed && - proofs_processed >= updates_received && - !self.proof_sequencer.has_pending() - { + } else if input_stream_closed && all_proofs_received && no_pending { debug!( target: "engine::root", total_updates = updates_received, @@ -866,4 +869,82 @@ mod tests { "State root mismatch: task={root_from_task:?}, base={root_from_base:?}" ); } + + #[test] + fn test_add_proof_in_sequence() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof2 = MultiProof::default(); + + let ready = sequencer.add_proof(0, proof1); + assert_eq!(ready.len(), 1); + assert!(!sequencer.has_pending()); + + let ready = sequencer.add_proof(1, proof2); + assert_eq!(ready.len(), 1); + assert!(!sequencer.has_pending()); + } + + #[test] + fn test_add_proof_out_of_order() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof2 = MultiProof::default(); + let proof3 = MultiProof::default(); + + let ready = sequencer.add_proof(2, proof3); + assert_eq!(ready.len(), 0); + assert!(sequencer.has_pending()); + + let ready = sequencer.add_proof(0, proof1); + assert_eq!(ready.len(), 1); + assert!(sequencer.has_pending()); + + let ready = sequencer.add_proof(1, proof2); + assert_eq!(ready.len(), 2); + assert!(!sequencer.has_pending()); + } + + #[test] + fn test_add_proof_with_gaps() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof3 = MultiProof::default(); + + let ready = sequencer.add_proof(0, proof1); + assert_eq!(ready.len(), 1); + + let ready = sequencer.add_proof(2, proof3); + assert_eq!(ready.len(), 0); + assert!(sequencer.has_pending()); + } + + #[test] + fn test_add_proof_duplicate_sequence() { + let mut sequencer = ProofSequencer::new(); + let proof1 = MultiProof::default(); + let proof2 = MultiProof::default(); + + let ready = sequencer.add_proof(0, proof1); + assert_eq!(ready.len(), 1); + + let ready = sequencer.add_proof(0, proof2); + assert_eq!(ready.len(), 1); + assert!(!sequencer.has_pending()); + } + + #[test] + fn test_add_proof_batch_processing() { + let mut sequencer = ProofSequencer::new(); + let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect(); + + sequencer.add_proof(4, proofs[4].clone()); + sequencer.add_proof(2, proofs[2].clone()); + sequencer.add_proof(1, proofs[1].clone()); + sequencer.add_proof(3, proofs[3].clone()); + + let ready = sequencer.add_proof(0, proofs[0].clone()); + assert_eq!(ready.len(), 5); + assert!(!sequencer.has_pending()); + } } From 589b1b22e34c71b05473f367fa874969ba6856a1 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 22 Nov 2024 18:46:00 +0100 Subject: [PATCH 15/22] fix add_proof --- crates/engine/tree/src/tree/root.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 9eac54d770f9..4b1f79eba7b2 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -134,17 +134,22 @@ impl ProofSequencer { /// Adds a proof and returns all sequential proofs if we have a continuous sequence pub(crate) fn add_proof(&mut self, sequence: u64, proof: MultiProof) -> Vec { - self.next_sequence = self.next_sequence.max(sequence + 1); + if sequence < self.next_sequence { + return vec![proof]; + } + + // Insert the new proof into pending proofs self.pending_proofs.insert(sequence, proof); - let mut ready_proofs = Vec::with_capacity(self.pending_proofs.len()); - while let Some((&lowest_seq, _)) = self.pending_proofs.iter().next() { - if let Some(proof) = self.pending_proofs.remove(&lowest_seq) { - ready_proofs.push(proof); - } + let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len()); + + // Keep taking proofs from pending_proofs as long as they form a consecutive sequence + while let Some(proof) = self.pending_proofs.remove(&self.next_sequence) { + consecutive_proofs.push(proof); + self.next_sequence += 1; } - ready_proofs + consecutive_proofs } /// Returns true if we still have pending proofs From 227fdc4db38727adb483b6d8c09c8eeb5a332306 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Fri, 22 Nov 2024 20:46:50 +0100 Subject: [PATCH 16/22] reduce test state elements --- crates/engine/tree/src/tree/root.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 4b1f79eba7b2..5f39304c92c4 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -784,7 +784,7 @@ mod tests { let (tx, rx) = std::sync::mpsc::channel(); let stream = StdReceiverStream::new(rx); - let state_updates = create_mock_state_updates(400, 20); + let state_updates = create_mock_state_updates(10, 10); let mut hashed_state = HashedPostState::default(); let mut accumulated_state: HashMap)> = HashMap::default(); From 992e96db326a8022943e9e308f1bd147fb2fcfd3 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 25 Nov 2024 12:47:26 +0100 Subject: [PATCH 17/22] refactor: remove incoming state updates thread --- crates/engine/tree/src/tree/root.rs | 292 ++++++++++++++-------------- 1 file changed, 151 insertions(+), 141 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 5f39304c92c4..cc8d6d70e910 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -27,7 +27,6 @@ use std::{ mpsc::{self, Receiver, RecvError, Sender}, Arc, }, - thread, time::{Duration, Instant}, }; use tracing::{debug, error, trace}; @@ -69,6 +68,7 @@ pub(crate) struct StateRootConfig { } /// Wrapper for std channel receiver to maintain compatibility with `UnboundedReceiverStream` +#[derive(Debug)] #[allow(dead_code)] pub(crate) struct StdReceiverStream { rx: Receiver, @@ -88,10 +88,6 @@ impl StdReceiverStream { /// Messages that can be received by the state root task #[derive(Debug)] pub(crate) enum StateRootMessage { - /// New state update from transaction execution - StateUpdate(EvmState), - /// Input state stream has closed - InputStreamClosed, /// Proof calculation completed for a specific state update ProofCalculated { /// The calculated proof @@ -158,6 +154,14 @@ impl ProofSequencer { } } +/// Statistics tracking. +#[derive(Debug, Default)] +struct TaskStats { + updates_received: u64, + proofs_processed: u64, + roots_calculated: u64, +} + /// Standalone task that receives a transaction state stream and updates relevant /// data structures to calculate state root. /// @@ -180,8 +184,8 @@ pub(crate) struct StateRootTask { proof_sequencer: ProofSequencer, /// Whether we're currently calculating a root calculating_root: bool, - /// Background thread handle for input state stream processing - _stream_handler: Option>, + /// Incoming state updates. + state_stream: StdReceiverStream, } #[allow(dead_code)] @@ -193,30 +197,6 @@ where pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { let (tx, rx) = mpsc::channel(); - // spawn a background thread to forward state updates from StdReceiverStream - // to our internal message channel - let stream_tx = tx.clone(); - let stream_handler = thread::Builder::new() - .name("State Stream Handler".to_string()) - .spawn(move || { - loop { - match state_stream.recv() { - Ok(state) => { - if stream_tx.send(StateRootMessage::StateUpdate(state)).is_err() { - break; - } - } - Err(_) => { - // stream closed normally - debug!(target: "engine::root", "State stream closed normally"); - let _ = stream_tx.send(StateRootMessage::InputStreamClosed); - break; - } - } - } - }) - .expect("failed to spawn stream handler thread"); - Self { config, rx, @@ -224,7 +204,7 @@ where state: Default::default(), proof_sequencer: ProofSequencer::new(), calculating_root: false, - _stream_handler: Some(stream_handler), + state_stream, } } @@ -367,128 +347,158 @@ where }); } + /// Handles internal messages (proofs and root calculations) and tracks state + fn handle_internal_message( + &mut self, + message: StateRootMessage, + stats: &mut TaskStats, + current_multiproof: &mut MultiProof, + trie_updates: &mut TrieUpdates, + current_root: &mut B256, + ) -> Option { + match message { + StateRootMessage::ProofCalculated { proof, sequence_number } => { + stats.proofs_processed += 1; + trace!( + target: "engine::root", + sequence = sequence_number, + total_proofs = stats.proofs_processed, + "Processing calculated proof" + ); + + if let Some(combined_proof) = self.on_proof(proof, sequence_number) { + if self.calculating_root { + current_multiproof.extend(combined_proof); + } else { + self.spawn_root_calculation(combined_proof); + } + } + None + } + StateRootMessage::RootCalculated { root, updates, elapsed } => { + stats.roots_calculated += 1; + trace!( + target: "engine::root", + %root, + ?elapsed, + roots_calculated = stats.roots_calculated, + proofs = stats.proofs_processed, + updates = stats.updates_received, + "Computed intermediate root" + ); + *current_root = root; + trie_updates.extend(updates); + self.calculating_root = false; + + let has_new_proofs = !current_multiproof.account_subtree.is_empty() || + !current_multiproof.storages.is_empty(); + let all_proofs_received = stats.proofs_processed >= stats.updates_received; + let no_pending = !self.proof_sequencer.has_pending(); + + if has_new_proofs { + trace!( + target: "engine::root", + account_proofs = current_multiproof.account_subtree.len(), + storage_proofs = current_multiproof.storages.len(), + "Spawning subsequent root calculation" + ); + self.spawn_root_calculation(std::mem::take(current_multiproof)); + None + } else if all_proofs_received && no_pending { + debug!( + target: "engine::root", + total_updates = stats.updates_received, + total_proofs = stats.proofs_processed, + roots_calculated = stats.roots_calculated, + "All proofs processed, ending calculation" + ); + Some(Ok((*current_root, trie_updates.clone()))) + } else { + None + } + } + } + } + + /// Handle internal message channel errors + fn handle_internal_error() -> StateRootResult { + error!(target: "engine::root", "Internal message channel closed unexpectedly"); + Err(ParallelStateRootError::Other("Internal message channel closed unexpectedly".into())) + } + fn run(mut self) -> StateRootResult { + let mut stats = TaskStats::default(); let mut current_multiproof = MultiProof::default(); let mut trie_updates = TrieUpdates::default(); let mut current_root = B256::default(); - let mut updates_received = 0; - let mut proofs_processed = 0; - let mut roots_calculated = 0; - let mut input_stream_closed = false; loop { - match self.rx.recv() { - Ok(message) => match message { - StateRootMessage::StateUpdate(update) => { - updates_received += 1; - trace!( - target: "engine::root", - len = update.len(), - total_updates = updates_received, - "Received new state update" - ); - Self::on_state_update( - self.config.consistent_view.clone(), - self.config.input.clone(), - update, - &mut self.state, - self.proof_sequencer.next_sequence(), - self.tx.clone(), - ); - } - StateRootMessage::ProofCalculated { proof, sequence_number } => { - proofs_processed += 1; - trace!( - target: "engine::root", - sequence = sequence_number, - total_proofs = proofs_processed, - "Processing calculated proof" - ); - - if let Some(combined_proof) = self.on_proof(proof, sequence_number) { - if self.calculating_root { - current_multiproof.extend(combined_proof); - } else { - self.spawn_root_calculation(combined_proof); + match self.state_stream.rx.try_recv() { + Ok(update) => { + stats.updates_received += 1; + trace!( + target: "engine::root", + len = update.len(), + total_updates = stats.updates_received, + "Received new state update" + ); + Self::on_state_update( + self.config.consistent_view.clone(), + self.config.input.clone(), + update, + &mut self.state, + self.proof_sequencer.next_sequence(), + self.tx.clone(), + ); + } + Err(mpsc::TryRecvError::Empty) => { + // No state updates available, try to process internal messages + match self.rx.recv() { + Ok(message) => { + if let Some(result) = self.handle_internal_message( + message, + &mut stats, + &mut current_multiproof, + &mut trie_updates, + &mut current_root, + ) { + return result; } } + Err(_) => return Self::handle_internal_error(), } - StateRootMessage::RootCalculated { root, updates, elapsed } => { - roots_calculated += 1; - trace!( - target: "engine::root", - %root, - ?elapsed, - roots_calculated, - proofs = proofs_processed, - updates = updates_received, - "Computed intermediate root" - ); - current_root = root; - trie_updates.extend(updates); - self.calculating_root = false; - - let has_new_proofs = !current_multiproof.account_subtree.is_empty() || - !current_multiproof.storages.is_empty(); - let all_proofs_received = proofs_processed >= updates_received; - let no_pending = !self.proof_sequencer.has_pending(); - - trace!( - target: "engine::root", - has_new_proofs, - all_proofs_received, - no_pending, - "State check" - ); + } + Err(mpsc::TryRecvError::Disconnected) => { + trace!( + target: "engine::root", + updates = stats.updates_received, + proofs = stats.proofs_processed, + "State stream closed" + ); - // only spawn new calculation if we have accumulated new proofs - if has_new_proofs { - trace!( - target: "engine::root", - account_proofs = current_multiproof.account_subtree.len(), - storage_proofs = current_multiproof.storages.len(), - "Spawning subsequent root calculation" - ); - self.spawn_root_calculation(std::mem::take(&mut current_multiproof)); - } else if input_stream_closed && all_proofs_received && no_pending { - debug!( - target: "engine::root", - total_updates = updates_received, - total_proofs = proofs_processed, - roots_calculated, - "All proofs processed, ending calculation" - ); - return Ok((current_root, trie_updates)); - } + // Check if we can finish immediately + if !self.calculating_root && + !self.proof_sequencer.has_pending() && + stats.proofs_processed >= stats.updates_received + { + return Ok((current_root, trie_updates)); } - StateRootMessage::InputStreamClosed => { - trace!( - target: "engine::root", - updates = updates_received, - proofs = proofs_processed, - "Input state stream closed" - ); - input_stream_closed = true; - - // check if we can finish immediately - if !self.calculating_root && - !self.proof_sequencer.has_pending() && - proofs_processed >= updates_received - { - return Ok((current_root, trie_updates)); + + // Otherwise, continue processing remaining proofs + match self.rx.recv() { + Ok(message) => { + if let Some(result) = self.handle_internal_message( + message, + &mut stats, + &mut current_multiproof, + &mut trie_updates, + &mut current_root, + ) { + return result; + } } + Err(_) => return Self::handle_internal_error(), } - }, - Err(_) => { - // this means our internal message channel is closed, which shouldn't happen - // in normal operation since we hold both ends - error!( - target: "engine::root", - "Internal message channel closed unexpectedly" - ); - return Err(ParallelStateRootError::Other( - "Internal message channel closed unexpectedly".into(), - )); } } } From 0d4e70bdf59bb87f479aa26c0bd6798d13501dd6 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 25 Nov 2024 12:54:43 +0100 Subject: [PATCH 18/22] StateRootMessage -> InternalMessage --- crates/engine/tree/src/tree/root.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index cc8d6d70e910..ebdf90c7f0b3 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -85,9 +85,9 @@ impl StdReceiverStream { } } -/// Messages that can be received by the state root task +/// Messages used internally by the state root task #[derive(Debug)] -pub(crate) enum StateRootMessage { +pub(crate) enum InternalMessage { /// Proof calculation completed for a specific state update ProofCalculated { /// The calculated proof @@ -173,9 +173,9 @@ struct TaskStats { #[derive(Debug)] pub(crate) struct StateRootTask { /// Internal channel for receiving all state root related messages - rx: Receiver, + rx: Receiver, /// Internal channel for sending messages to the state root task - tx: Sender, + tx: Sender, /// Task configuration config: StateRootConfig, /// Current state @@ -230,7 +230,7 @@ where update: EvmState, state: &mut HashedPostState, proof_sequence_number: u64, - state_root_message_sender: Sender, + state_root_message_sender: Sender, ) { let mut hashed_state_update = HashedPostState::default(); for (address, account) in update { @@ -274,7 +274,7 @@ where let result = ParallelProof::new(view, input).multiproof(targets); match result { Ok(proof) => { - let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated { + let _ = state_root_message_sender.send(InternalMessage::ProofCalculated { proof, sequence_number: proof_sequence_number, }); @@ -338,7 +338,7 @@ where ?elapsed, "Root calculation completed, sending result" ); - let _ = tx.send(StateRootMessage::RootCalculated { root, updates, elapsed }); + let _ = tx.send(InternalMessage::RootCalculated { root, updates, elapsed }); } Err(e) => { error!(target: "engine::root", error = ?e, "Could not calculate state root"); @@ -350,14 +350,14 @@ where /// Handles internal messages (proofs and root calculations) and tracks state fn handle_internal_message( &mut self, - message: StateRootMessage, + message: InternalMessage, stats: &mut TaskStats, current_multiproof: &mut MultiProof, trie_updates: &mut TrieUpdates, current_root: &mut B256, ) -> Option { match message { - StateRootMessage::ProofCalculated { proof, sequence_number } => { + InternalMessage::ProofCalculated { proof, sequence_number } => { stats.proofs_processed += 1; trace!( target: "engine::root", @@ -375,7 +375,7 @@ where } None } - StateRootMessage::RootCalculated { root, updates, elapsed } => { + InternalMessage::RootCalculated { root, updates, elapsed } => { stats.roots_calculated += 1; trace!( target: "engine::root", From 44d3a61672cdaf9ad6e03544a7919fb1e26c2ff4 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 25 Nov 2024 15:41:21 +0000 Subject: [PATCH 19/22] feat(engine): use non-parallel multiproof for state root task (#12849) --- crates/engine/tree/src/tree/root.rs | 16 +++++++++++++--- crates/trie/trie/src/input.rs | 2 +- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index ebdf90c7f0b3..af645d605e43 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -17,8 +17,8 @@ use reth_trie::{ HashedPostState, HashedPostStateSorted, HashedStorage, MultiProof, Nibbles, TrieAccount, TrieInput, EMPTY_ROOT_HASH, }; -use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; -use reth_trie_parallel::{proof::ParallelProof, root::ParallelStateRootError}; +use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseProof, DatabaseTrieCursorFactory}; +use reth_trie_parallel::root::ParallelStateRootError; use reth_trie_sparse::{SparseStateTrie, SparseStateTrieResult}; use revm_primitives::{keccak256, EvmState, B256}; use std::{ @@ -271,7 +271,17 @@ where .collect::>(); rayon::spawn(move || { - let result = ParallelProof::new(view, input).multiproof(targets); + let provider = match view.provider_ro() { + Ok(provider) => provider, + Err(error) => { + error!(target: "engine::root", ?error, "Could not get provider"); + return; + } + }; + + // TODO: replace with parallel proof + let result = + Proof::overlay_multiproof(provider.tx_ref(), input.as_ref().clone(), targets); match result { Ok(proof) => { let _ = state_root_message_sender.send(InternalMessage::ProofCalculated { diff --git a/crates/trie/trie/src/input.rs b/crates/trie/trie/src/input.rs index 18f9ada2f4ab..ea71558c2c1f 100644 --- a/crates/trie/trie/src/input.rs +++ b/crates/trie/trie/src/input.rs @@ -1,7 +1,7 @@ use crate::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState}; /// Inputs for trie-related computations. -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct TrieInput { /// The collection of cached in-memory intermediate trie nodes that /// can be reused for computation. From 776cef8a38944eb1edea9bcdaa2d60b34a8f3496 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 25 Nov 2024 17:08:40 +0100 Subject: [PATCH 20/22] add StateRootMessage::StateUpdate variant and manage internal task state with single channel --- crates/engine/tree/src/tree/root.rs | 277 +++++++++++----------------- 1 file changed, 111 insertions(+), 166 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index af645d605e43..d4a6879f94f9 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -87,7 +87,10 @@ impl StdReceiverStream { /// Messages used internally by the state root task #[derive(Debug)] -pub(crate) enum InternalMessage { +#[allow(dead_code)] +pub(crate) enum StateRootMessage { + /// New state update from transaction execution + StateUpdate(EvmState), /// Proof calculation completed for a specific state update ProofCalculated { /// The calculated proof @@ -154,14 +157,6 @@ impl ProofSequencer { } } -/// Statistics tracking. -#[derive(Debug, Default)] -struct TaskStats { - updates_received: u64, - proofs_processed: u64, - roots_calculated: u64, -} - /// Standalone task that receives a transaction state stream and updates relevant /// data structures to calculate state root. /// @@ -172,10 +167,10 @@ struct TaskStats { /// Then it updates relevant leaves according to the result of the transaction. #[derive(Debug)] pub(crate) struct StateRootTask { - /// Internal channel for receiving all state root related messages - rx: Receiver, - /// Internal channel for sending messages to the state root task - tx: Sender, + /// Receiver for state root related messages + rx: Receiver, + /// Sender for state root related messages + tx: Sender, /// Task configuration config: StateRootConfig, /// Current state @@ -184,8 +179,6 @@ pub(crate) struct StateRootTask { proof_sequencer: ProofSequencer, /// Whether we're currently calculating a root calculating_root: bool, - /// Incoming state updates. - state_stream: StdReceiverStream, } #[allow(dead_code)] @@ -194,9 +187,11 @@ where Factory: DatabaseProviderFactory + Clone + Send + Sync + 'static, { /// Creates a new state root task with the unified message channel - pub(crate) fn new(config: StateRootConfig, state_stream: StdReceiverStream) -> Self { - let (tx, rx) = mpsc::channel(); - + pub(crate) fn new( + config: StateRootConfig, + tx: Sender, + rx: Receiver, + ) -> Self { Self { config, rx, @@ -204,7 +199,6 @@ where state: Default::default(), proof_sequencer: ProofSequencer::new(), calculating_root: false, - state_stream, } } @@ -230,7 +224,7 @@ where update: EvmState, state: &mut HashedPostState, proof_sequence_number: u64, - state_root_message_sender: Sender, + state_root_message_sender: Sender, ) { let mut hashed_state_update = HashedPostState::default(); for (address, account) in update { @@ -284,7 +278,7 @@ where Proof::overlay_multiproof(provider.tx_ref(), input.as_ref().clone(), targets); match result { Ok(proof) => { - let _ = state_root_message_sender.send(InternalMessage::ProofCalculated { + let _ = state_root_message_sender.send(StateRootMessage::ProofCalculated { proof, sequence_number: proof_sequence_number, }); @@ -348,7 +342,7 @@ where ?elapsed, "Root calculation completed, sending result" ); - let _ = tx.send(InternalMessage::RootCalculated { root, updates, elapsed }); + let _ = tx.send(StateRootMessage::RootCalculated { root, updates, elapsed }); } Err(e) => { error!(target: "engine::root", error = ?e, "Could not calculate state root"); @@ -357,158 +351,110 @@ where }); } - /// Handles internal messages (proofs and root calculations) and tracks state - fn handle_internal_message( - &mut self, - message: InternalMessage, - stats: &mut TaskStats, - current_multiproof: &mut MultiProof, - trie_updates: &mut TrieUpdates, - current_root: &mut B256, - ) -> Option { - match message { - InternalMessage::ProofCalculated { proof, sequence_number } => { - stats.proofs_processed += 1; - trace!( - target: "engine::root", - sequence = sequence_number, - total_proofs = stats.proofs_processed, - "Processing calculated proof" - ); - - if let Some(combined_proof) = self.on_proof(proof, sequence_number) { - if self.calculating_root { - current_multiproof.extend(combined_proof); - } else { - self.spawn_root_calculation(combined_proof); - } - } - None - } - InternalMessage::RootCalculated { root, updates, elapsed } => { - stats.roots_calculated += 1; - trace!( - target: "engine::root", - %root, - ?elapsed, - roots_calculated = stats.roots_calculated, - proofs = stats.proofs_processed, - updates = stats.updates_received, - "Computed intermediate root" - ); - *current_root = root; - trie_updates.extend(updates); - self.calculating_root = false; - - let has_new_proofs = !current_multiproof.account_subtree.is_empty() || - !current_multiproof.storages.is_empty(); - let all_proofs_received = stats.proofs_processed >= stats.updates_received; - let no_pending = !self.proof_sequencer.has_pending(); - - if has_new_proofs { - trace!( - target: "engine::root", - account_proofs = current_multiproof.account_subtree.len(), - storage_proofs = current_multiproof.storages.len(), - "Spawning subsequent root calculation" - ); - self.spawn_root_calculation(std::mem::take(current_multiproof)); - None - } else if all_proofs_received && no_pending { - debug!( - target: "engine::root", - total_updates = stats.updates_received, - total_proofs = stats.proofs_processed, - roots_calculated = stats.roots_calculated, - "All proofs processed, ending calculation" - ); - Some(Ok((*current_root, trie_updates.clone()))) - } else { - None - } - } - } - } - - /// Handle internal message channel errors - fn handle_internal_error() -> StateRootResult { - error!(target: "engine::root", "Internal message channel closed unexpectedly"); - Err(ParallelStateRootError::Other("Internal message channel closed unexpectedly".into())) - } - fn run(mut self) -> StateRootResult { - let mut stats = TaskStats::default(); let mut current_multiproof = MultiProof::default(); let mut trie_updates = TrieUpdates::default(); - let mut current_root = B256::default(); + let mut current_root: B256; + let mut updates_received = 0; + let mut proofs_processed = 0; + let mut roots_calculated = 0; loop { - match self.state_stream.rx.try_recv() { - Ok(update) => { - stats.updates_received += 1; - trace!( - target: "engine::root", - len = update.len(), - total_updates = stats.updates_received, - "Received new state update" - ); - Self::on_state_update( - self.config.consistent_view.clone(), - self.config.input.clone(), - update, - &mut self.state, - self.proof_sequencer.next_sequence(), - self.tx.clone(), - ); - } - Err(mpsc::TryRecvError::Empty) => { - // No state updates available, try to process internal messages - match self.rx.recv() { - Ok(message) => { - if let Some(result) = self.handle_internal_message( - message, - &mut stats, - &mut current_multiproof, - &mut trie_updates, - &mut current_root, - ) { - return result; - } - } - Err(_) => return Self::handle_internal_error(), + match self.rx.recv() { + Ok(message) => match message { + StateRootMessage::StateUpdate(update) => { + updates_received += 1; + trace!( + target: "engine::root", + len = update.len(), + total_updates = updates_received, + "Received new state update" + ); + Self::on_state_update( + self.config.consistent_view.clone(), + self.config.input.clone(), + update, + &mut self.state, + self.proof_sequencer.next_sequence(), + self.tx.clone(), + ); } - } - Err(mpsc::TryRecvError::Disconnected) => { - trace!( - target: "engine::root", - updates = stats.updates_received, - proofs = stats.proofs_processed, - "State stream closed" - ); + StateRootMessage::ProofCalculated { proof, sequence_number } => { + proofs_processed += 1; + trace!( + target: "engine::root", + sequence = sequence_number, + total_proofs = proofs_processed, + "Processing calculated proof" + ); - // Check if we can finish immediately - if !self.calculating_root && - !self.proof_sequencer.has_pending() && - stats.proofs_processed >= stats.updates_received - { - return Ok((current_root, trie_updates)); + if let Some(combined_proof) = self.on_proof(proof, sequence_number) { + if self.calculating_root { + current_multiproof.extend(combined_proof); + } else { + self.spawn_root_calculation(combined_proof); + } + } } + StateRootMessage::RootCalculated { root, updates, elapsed } => { + roots_calculated += 1; + trace!( + target: "engine::root", + %root, + ?elapsed, + roots_calculated, + proofs = proofs_processed, + updates = updates_received, + "Computed intermediate root" + ); + current_root = root; + trie_updates.extend(updates); + self.calculating_root = false; + + let has_new_proofs = !current_multiproof.account_subtree.is_empty() || + !current_multiproof.storages.is_empty(); + let all_proofs_received = proofs_processed >= updates_received; + let no_pending = !self.proof_sequencer.has_pending(); + + trace!( + target: "engine::root", + has_new_proofs, + all_proofs_received, + no_pending, + "State check" + ); - // Otherwise, continue processing remaining proofs - match self.rx.recv() { - Ok(message) => { - if let Some(result) = self.handle_internal_message( - message, - &mut stats, - &mut current_multiproof, - &mut trie_updates, - &mut current_root, - ) { - return result; - } + // only spawn new calculation if we have accumulated new proofs + if has_new_proofs { + trace!( + target: "engine::root", + account_proofs = current_multiproof.account_subtree.len(), + storage_proofs = current_multiproof.storages.len(), + "Spawning subsequent root calculation" + ); + self.spawn_root_calculation(std::mem::take(&mut current_multiproof)); + } else if all_proofs_received && no_pending { + debug!( + target: "engine::root", + total_updates = updates_received, + total_proofs = proofs_processed, + roots_calculated, + "All proofs processed, ending calculation" + ); + return Ok((current_root, trie_updates)); } - Err(_) => return Self::handle_internal_error(), } + }, + Err(_) => { + // this means our internal message channel is closed, which shouldn't happen + // in normal operation since we hold both ends + error!( + target: "engine::root", + "Internal message channel closed unexpectedly" + ); + return Err(ParallelStateRootError::Other( + "Internal message channel closed unexpectedly".into(), + )); } } } @@ -802,7 +748,6 @@ mod tests { let factory = create_test_provider_factory(); let (tx, rx) = std::sync::mpsc::channel(); - let stream = StdReceiverStream::new(rx); let state_updates = create_mock_state_updates(10, 10); let mut hashed_state = HashedPostState::default(); @@ -878,11 +823,11 @@ mod tests { consistent_view: ConsistentDbView::new(factory, None), input: Arc::new(TrieInput::from_state(hashed_state)), }; - let task = StateRootTask::new(config, stream); + let task = StateRootTask::new(config, tx.clone(), rx); let handle = task.spawn(); for update in state_updates { - tx.send(update).expect("failed to send state"); + tx.send(StateRootMessage::StateUpdate(update)).expect("failed to send state"); } drop(tx); From 6b3d67ad03789b15a5fc3531ca3b897217b8d843 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 26 Nov 2024 13:20:20 +0100 Subject: [PATCH 21/22] Update crates/engine/tree/src/tree/root.rs Co-authored-by: Alexey Shekhirin --- crates/engine/tree/src/tree/root.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index d4a6879f94f9..64e82f05a75d 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -69,7 +69,6 @@ pub(crate) struct StateRootConfig { /// Wrapper for std channel receiver to maintain compatibility with `UnboundedReceiverStream` #[derive(Debug)] -#[allow(dead_code)] pub(crate) struct StdReceiverStream { rx: Receiver, } From 344a9b880eb13aa16a4296b9e879b3ea5689f644 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 27 Nov 2024 11:01:29 +0100 Subject: [PATCH 22/22] fix add_proof --- crates/engine/tree/Cargo.toml | 1 - crates/engine/tree/src/tree/root.rs | 36 +++++++++++++++++++++-------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index a651843c38ee..01d7e7e20241 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -120,6 +120,5 @@ test-utils = [ "reth-tracing", "reth-trie/test-utils", "reth-prune-types?/test-utils", - "reth-primitives-traits/test-utils", "reth-trie-db/test-utils", ] diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 64e82f05a75d..32bfbf68604b 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -111,8 +111,10 @@ pub(crate) enum StateRootMessage { /// Handle to track proof calculation ordering #[derive(Debug, Default)] pub(crate) struct ProofSequencer { - /// The next expected proof sequence number + /// The next proof sequence number to be produced. next_sequence: u64, + /// The next sequence number expected to be delivered. + next_to_deliver: u64, /// Buffer for out-of-order proofs pending_proofs: BTreeMap, } @@ -132,19 +134,31 @@ impl ProofSequencer { /// Adds a proof and returns all sequential proofs if we have a continuous sequence pub(crate) fn add_proof(&mut self, sequence: u64, proof: MultiProof) -> Vec { - if sequence < self.next_sequence { - return vec![proof]; + if sequence >= self.next_to_deliver { + self.pending_proofs.insert(sequence, proof); } - // Insert the new proof into pending proofs - self.pending_proofs.insert(sequence, proof); + // return early if we don't have the next expected proof + if !self.pending_proofs.contains_key(&self.next_to_deliver) { + return Vec::new() + } let mut consecutive_proofs = Vec::with_capacity(self.pending_proofs.len()); + let mut current_sequence = self.next_to_deliver; - // Keep taking proofs from pending_proofs as long as they form a consecutive sequence - while let Some(proof) = self.pending_proofs.remove(&self.next_sequence) { + // keep collecting proofs as long as we have consecutive sequence numbers + while let Some(proof) = self.pending_proofs.remove(¤t_sequence) { consecutive_proofs.push(proof); - self.next_sequence += 1; + current_sequence += 1; + + // if we don't have the next number, stop collecting + if !self.pending_proofs.contains_key(¤t_sequence) { + break; + } + } + + if !consecutive_proofs.is_empty() { + self.next_to_deliver += consecutive_proofs.len() as u64; } consecutive_proofs @@ -844,6 +858,7 @@ mod tests { let mut sequencer = ProofSequencer::new(); let proof1 = MultiProof::default(); let proof2 = MultiProof::default(); + sequencer.next_sequence = 2; let ready = sequencer.add_proof(0, proof1); assert_eq!(ready.len(), 1); @@ -860,6 +875,7 @@ mod tests { let proof1 = MultiProof::default(); let proof2 = MultiProof::default(); let proof3 = MultiProof::default(); + sequencer.next_sequence = 3; let ready = sequencer.add_proof(2, proof3); assert_eq!(ready.len(), 0); @@ -879,6 +895,7 @@ mod tests { let mut sequencer = ProofSequencer::new(); let proof1 = MultiProof::default(); let proof3 = MultiProof::default(); + sequencer.next_sequence = 3; let ready = sequencer.add_proof(0, proof1); assert_eq!(ready.len(), 1); @@ -898,7 +915,7 @@ mod tests { assert_eq!(ready.len(), 1); let ready = sequencer.add_proof(0, proof2); - assert_eq!(ready.len(), 1); + assert_eq!(ready.len(), 0); assert!(!sequencer.has_pending()); } @@ -906,6 +923,7 @@ mod tests { fn test_add_proof_batch_processing() { let mut sequencer = ProofSequencer::new(); let proofs: Vec<_> = (0..5).map(|_| MultiProof::default()).collect(); + sequencer.next_sequence = 5; sequencer.add_proof(4, proofs[4].clone()); sequencer.add_proof(2, proofs[2].clone());