Skip to content

Commit

Permalink
feat(engine): wire StateRootTask in EngineApiTreeHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed Nov 29, 2024
1 parent 88bde87 commit 41dd658
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 29 deletions.
78 changes: 52 additions & 26 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::root::{ParallelStateRoot, ParallelStateRootError};
use revm_primitives::ResultAndState;
use root::{StateRootConfig, StateRootMessage, StateRootTask};
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, VecDeque},
Expand Down Expand Up @@ -2210,13 +2211,26 @@ where

let exec_time = Instant::now();

// TODO: create StateRootTask with the receiving end of a channel and
// pass the sending end of the channel to the state hook.
let noop_state_hook = |_result_and_state: &ResultAndState| {};
let (state_root_tx, state_root_rx) = std::sync::mpsc::channel();

let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;

let input = self
.compute_trie_input(consistent_view.clone(), block.parent_hash)
.map_err(|e| InsertBlockErrorKindTwo::Other(Box::new(e)))?;
let state_root_config = StateRootConfig { consistent_view, input: Arc::new(input) };
let state_root_task =
StateRootTask::new(state_root_config, state_root_tx.clone(), state_root_rx);
let state_root_handle = state_root_task.spawn();
let state_hook = move |result_and_state: &ResultAndState| {
let _ =
state_root_tx.send(StateRootMessage::StateUpdate(result_and_state.state.clone()));
};

let output = self.metrics.executor.execute_metered(
executor,
(&block, U256::MAX).into(),
Box::new(noop_state_hook),
Box::new(state_hook),
)?;

trace!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");
Expand All @@ -2241,18 +2255,21 @@ where
let root_time = Instant::now();
let mut state_root_result = None;

// TODO: switch to calculate state root using `StateRootTask`.

// We attempt to compute state root in parallel if we are currently not persisting anything
// to database. This is safe, because the database state cannot change until we
// finish parallel computation. It is important that nothing is being persisted as
// we are computing in parallel, because we initialize a different database transaction
// per thread and it might end up with a different view of the database.
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_parallel(block.parent_hash, &hashed_state)
{
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut input = self
.compute_trie_input(consistent_view.clone(), block.parent_hash)
.map_err(|e| InsertBlockErrorKindTwo::Other(Box::new(e)))?;
// Extend with block we are validating root for.
input.append_ref(&hashed_state);

state_root_result = match self.compute_state_root_parallel(consistent_view, input) {
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
Expand All @@ -2263,6 +2280,14 @@ where
}

let (state_root, trie_output) = if let Some(result) = state_root_result {
match state_root_handle.wait_for_result() {
Ok(state_root_task_result) => {
info!(target: "engine::tree", block=?sealed_block.num_hash(), state_root_task_result=?state_root_task_result.0, regular_state_root_result = ?result.0);
}
Err(e) => {
info!(target: "engine::tree", error=?e, "on state root task wait_for_result")
}
}
result
} else {
debug!(target: "engine::tree", block=?sealed_block.num_hash(), persistence_in_progress, "Failed to compute state root in parallel");
Expand Down Expand Up @@ -2317,23 +2342,11 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
}

/// Compute state root for the given hashed post state in parallel.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
fn compute_state_root_parallel(
fn compute_trie_input(
&self,
consistent_view: ConsistentDbView<P>,
parent_hash: B256,
hashed_state: &HashedPostState,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
// TODO: when we switch to calculate state root using `StateRootTask` this
// method can be still useful to calculate the required `TrieInput` to
// create the task.
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
) -> Result<TrieInput, ParallelStateRootError> {
let mut input = TrieInput::default();

if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
Expand All @@ -2353,9 +2366,22 @@ where
input.append(revert_state);
}

// Extend with block we are validating root for.
input.append_ref(hashed_state);
Ok(input)
}

/// Compute state root for the given hashed post state in parallel.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
fn compute_state_root_parallel(
&self,
consistent_view: ConsistentDbView<P>,
input: TrieInput,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
}

Expand Down
3 changes: 0 additions & 3 deletions crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ pub(crate) type StateRootResult = Result<(B256, TrieUpdates), ParallelStateRootE

/// Handle to a spawned state root task.
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) struct StateRootHandle {
/// Channel for receiving the final result.
rx: mpsc::Receiver<StateRootResult>,
}

#[allow(dead_code)]
impl StateRootHandle {
/// Creates a new handle from a receiver.
pub(crate) const fn new(rx: mpsc::Receiver<StateRootResult>) -> Self {
Expand All @@ -60,7 +58,6 @@ pub(crate) struct StateRootConfig<Factory> {

/// Messages used internally by the state root task
#[derive(Debug)]
#[allow(dead_code)]
pub(crate) enum StateRootMessage {
/// New state update from transaction execution
StateUpdate(EvmState),
Expand Down

0 comments on commit 41dd658

Please sign in to comment.