Skip to content

Commit

Permalink
complete run method
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed Nov 12, 2024
1 parent f3727c1 commit da06971
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 15 deletions.
141 changes: 127 additions & 14 deletions crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! State root task related functionality.
use reth_errors::ProviderResult;
use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory};
use reth_trie::{updates::TrieUpdates, HashedPostState, HashedStorage, MultiProof, TrieInput};
use reth_trie_parallel::{proof::ParallelProof, root::ParallelStateRootError};
Expand All @@ -10,6 +11,7 @@ use std::{
mpsc::{self, Receiver, RecvError},
Arc,
},
time::{Duration, Instant},
};
use tracing::debug;

Expand Down Expand Up @@ -63,6 +65,24 @@ impl StdReceiverStream {
}
}

type StateRootProofResult = (B256, MultiProof, TrieUpdates, Duration);
type StateRootProofReceiver = mpsc::Receiver<ProviderResult<StateRootProofResult>>;

enum StateRootTaskState {
Idle(MultiProof, B256),
Pending(MultiProof, StateRootProofReceiver),
}

impl StateRootTaskState {
fn add_proofs(&mut self, proofs: MultiProof) {
match self {
Self::Idle(multiproof, _) | Self::Pending(multiproof, _) => {
multiproof.extend(proofs);
}
}
}
}

/// Standalone task that receives a transaction state stream and updates relevant
/// data structures to calculate state root.
///
Expand Down Expand Up @@ -165,24 +185,117 @@ where
}

fn run(mut self) -> StateRootResult {
while let Ok(update) = self.state_stream.recv() {
Self::on_state_update(
self.config.consistent_view.clone(),
self.config.input.clone(),
update,
&mut self.state,
&mut self.pending_proofs,
);
}
let mut task_state = StateRootTaskState::Idle(MultiProof::default(), B256::default());
let mut trie_updates = TrieUpdates::default();

loop {
// try to receive state updates without blocking
match self.state_stream.rx.try_recv() {
Ok(update) => {
debug!(target: "engine::root", len = update.len(), "Received new state update");
Self::on_state_update(
self.config.consistent_view.clone(),
self.config.input.clone(),
update,
&mut self.state,
&mut self.pending_proofs,
);
continue;
}
Err(mpsc::TryRecvError::Empty) => {
// no new state updates available, continue with other operations
}
Err(mpsc::TryRecvError::Disconnected) => {
// state stream closed, check if we can finish
if self.pending_proofs.is_empty() {
if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state {
return Ok((*state_root, trie_updates));
}
}
}
}

// TODO:
// * keep track of proof calculation
// * keep track of intermediate root computation
// * return final state root result
Ok((B256::default(), TrieUpdates::default()))
// check pending proofs
while let Some(proof_rx) = self.pending_proofs.front() {
match proof_rx.try_recv() {
Ok(result) => {
let multiproof = result?;
task_state.add_proofs(multiproof);
self.pending_proofs.pop_front();
continue;
}
Err(mpsc::TryRecvError::Empty) => {
// this proof is not ready yet
break;
}
Err(mpsc::TryRecvError::Disconnected) => {
// channel was closed without sending a result
return Err(ParallelStateRootError::Other(
"Proof calculation task terminated unexpectedly".into(),
));
}
}
}

// handle task state transitions
match &mut task_state {
StateRootTaskState::Pending(multiproof, rx) => {
match rx.try_recv() {
Ok(result) => match result {
Ok((state_root, mut new_multiproof, new_trie_updates, elapsed)) => {
debug!(target: "engine::root", %state_root, ?elapsed, "Computed intermediate root");
trie_updates.extend(new_trie_updates);
new_multiproof.extend(std::mem::take(multiproof));
task_state = StateRootTaskState::Idle(new_multiproof, state_root);
continue;
}
Err(e) => return Err(ParallelStateRootError::Provider(e)),
},
Err(mpsc::TryRecvError::Empty) => {
// root calculation not ready yet
}
Err(mpsc::TryRecvError::Disconnected) => {
return Err(ParallelStateRootError::Other(
"Root calculation task terminated unexpectedly".into(),
));
}
}
}
StateRootTaskState::Idle(multiproof, _) => {
debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task");
let view = self.config.consistent_view.clone();
let input = self.config.input.clone();
let multiproof = std::mem::take(multiproof);
let state = self.state.clone();
let (tx, rx) = mpsc::sync_channel(1);

rayon::spawn(move || {
let result =
calculate_state_root_from_proofs(view, input, multiproof, state);
let _ = tx.send(result);
});

task_state = StateRootTaskState::Pending(Default::default(), rx);
continue;
}
}
}
}
}

fn calculate_state_root_from_proofs<Factory>(
_view: ConsistentDbView<Factory>,
_input: Arc<TrieInput>,
_multiproof: MultiProof,
_state: HashedPostState,
) -> ProviderResult<(B256, MultiProof, TrieUpdates, Duration)>
where
Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone,
{
let started_at = Instant::now();
Ok((B256::default(), MultiProof::default(), Default::default(), started_at.elapsed()))
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions crates/trie/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ proptest.workspace = true
proptest-arbitrary-interop.workspace = true
hash-db = "=0.15.2"
plain_hasher = "0.2"
alloy-primitives = { workspace = true, features = ["getrandom"] }

[features]
test-utils = [
Expand Down
89 changes: 88 additions & 1 deletion crates/trie/common/src/proofs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use alloy_trie::{
use itertools::Itertools;
use reth_primitives_traits::Account;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::collections::{hash_map, HashMap};

/// The state multiproof of target accounts and multiproofs of their storage tries.
/// Multiproof is effectively a state subtrie that only contains the nodes
Expand Down Expand Up @@ -76,6 +76,35 @@ impl MultiProof {
}
Ok(AccountProof { address, info, proof, storage_root, storage_proofs })
}

/// Extends this multiproof with another one, merging both account and storage
/// proofs.
pub fn extend(&mut self, other: Self) {
self.account_subtree = self
.account_subtree
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.chain(other.account_subtree.into_inner())
.collect::<ProofNodes>();

for (hashed_address, storage) in other.storages {
match self.storages.entry(hashed_address) {
hash_map::Entry::Occupied(mut entry) => {
debug_assert_eq!(entry.get().root, storage.root);
entry.get_mut().subtree = entry
.get()
.subtree
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.chain(storage.subtree.into_inner())
.collect::<ProofNodes>();
}
hash_map::Entry::Vacant(entry) => {
entry.insert(storage);
}
}
}
}
}

/// The merkle multiproof of storage trie.
Expand Down Expand Up @@ -255,3 +284,61 @@ pub mod triehash {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_multiproof_extend_account_proofs() {
let mut proof1 = MultiProof::default();
let mut proof2 = MultiProof::default();

let addr1 = B256::random();
let addr2 = B256::random();

proof1.account_subtree.insert(
Nibbles::unpack(addr1),
alloy_rlp::encode_fixed_size(&U256::from(42)).to_vec().into(),
);
proof2.account_subtree.insert(
Nibbles::unpack(addr2),
alloy_rlp::encode_fixed_size(&U256::from(43)).to_vec().into(),
);

proof1.extend(proof2);

assert!(proof1.account_subtree.contains_key(&Nibbles::unpack(addr1)));
assert!(proof1.account_subtree.contains_key(&Nibbles::unpack(addr2)));
}

#[test]
fn test_multiproof_extend_storage_proofs() {
let mut proof1 = MultiProof::default();
let mut proof2 = MultiProof::default();

let addr = B256::random();
let root = B256::random();

let mut subtree1 = ProofNodes::default();
subtree1.insert(
Nibbles::from_nibbles(vec![0]),
alloy_rlp::encode_fixed_size(&U256::from(42)).to_vec().into(),
);
proof1.storages.insert(addr, StorageMultiProof { root, subtree: subtree1 });

let mut subtree2 = ProofNodes::default();
subtree2.insert(
Nibbles::from_nibbles(vec![1]),
alloy_rlp::encode_fixed_size(&U256::from(43)).to_vec().into(),
);
proof2.storages.insert(addr, StorageMultiProof { root, subtree: subtree2 });

proof1.extend(proof2);

let storage = proof1.storages.get(&addr).unwrap();
assert_eq!(storage.root, root);
assert!(storage.subtree.contains_key(&Nibbles::from_nibbles(vec![0])));
assert!(storage.subtree.contains_key(&Nibbles::from_nibbles(vec![1])));
}
}
5 changes: 5 additions & 0 deletions crates/trie/parallel/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets
use alloy_primitives::B256;
use alloy_rlp::{BufMut, Encodable};
use itertools::Itertools;
use reth_db::DatabaseError;
use reth_execution_errors::StorageRootError;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
Expand Down Expand Up @@ -228,6 +229,9 @@ pub enum ParallelStateRootError {
/// Provider error.
#[error(transparent)]
Provider(#[from] ProviderError),
/// Other unspecified error.
#[error("{_0}")]
Other(String),
}

impl From<ParallelStateRootError> for ProviderError {
Expand All @@ -237,6 +241,7 @@ impl From<ParallelStateRootError> for ProviderError {
ParallelStateRootError::StorageRoot(StorageRootError::Database(error)) => {
Self::Database(error)
}
ParallelStateRootError::Other(other) => Self::Database(DatabaseError::Other(other)),
}
}
}
Expand Down

0 comments on commit da06971

Please sign in to comment.