Skip to content

Commit

Permalink
prevent state root calculation and returning final result without the…
Browse files Browse the repository at this point in the history
… ongoing proof calculations received
  • Loading branch information
fgimenez committed Nov 20, 2024
1 parent 8ee65e6 commit 52e1366
Showing 1 changed file with 37 additions and 23 deletions.
60 changes: 37 additions & 23 deletions crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ pub(crate) struct StateRootTask<Factory> {
state: HashedPostState,
/// Channels to retrieve proof calculation results from.
pending_proofs: VecDeque<Receiver<Result<MultiProof, ParallelStateRootError>>>,
/// Prevents triggering state root calculation and returning the final result
/// without all the ongoing proof calculations received.
pending_calculation: bool,
}

#[allow(dead_code)]
Expand All @@ -125,7 +128,13 @@ where
{
/// Creates a new `StateRootTask`.
pub(crate) fn new(config: StateRootConfig<Factory>, state_stream: StdReceiverStream) -> Self {
Self { config, state_stream, state: Default::default(), pending_proofs: Default::default() }
Self {
config,
state_stream,
state: Default::default(),
pending_proofs: Default::default(),
pending_calculation: false,
}
}

/// Spawns the state root task and returns a handle to await its result.
Expand Down Expand Up @@ -222,7 +231,7 @@ where
}
Err(mpsc::TryRecvError::Disconnected) => {
// state stream closed, check if we can finish
if self.pending_proofs.is_empty() {
if self.pending_proofs.is_empty() && !self.pending_calculation {
if let StateRootTaskState::Idle(_multiproof, state_root) = &task_state {
return Ok((*state_root, trie_updates));
}
Expand All @@ -237,6 +246,7 @@ where
let multiproof = result?;
task_state.add_proofs(multiproof);
self.pending_proofs.pop_front();
self.pending_calculation = true;
continue;
}
Err(mpsc::TryRecvError::Empty) => {
Expand Down Expand Up @@ -277,27 +287,31 @@ where
}
}
StateRootTaskState::Idle(multiproof, _) => {
debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task");
let view = self.config.consistent_view.clone();
let input_nodes_sorted = self.config.input.nodes.clone().into_sorted();
let input_state_sorted = self.config.input.state.clone().into_sorted();
let multiproof = std::mem::take(multiproof);
let state = self.state.clone();
let (tx, rx) = mpsc::sync_channel(1);

rayon::spawn(move || {
let result = calculate_state_root_from_proofs(
view,
&input_nodes_sorted,
&input_state_sorted,
multiproof,
state,
);
let _ = tx.send(result);
});

task_state = StateRootTaskState::Pending(Default::default(), rx);
continue;
if self.pending_calculation {
debug!(target: "engine::root", accounts_len = self.state.accounts.len(), "Spawning state root calculation from proofs task");
let view = self.config.consistent_view.clone();
let input_nodes_sorted = self.config.input.nodes.clone().into_sorted();
let input_state_sorted = self.config.input.state.clone().into_sorted();
let multiproof = std::mem::take(multiproof);
let state = self.state.clone();
let (tx, rx) = mpsc::sync_channel(1);

rayon::spawn(move || {
let result = calculate_state_root_from_proofs(
view,
&input_nodes_sorted,
&input_state_sorted,
multiproof,
state,
);
let _ = tx.send(result);
});

self.pending_calculation = false;

task_state = StateRootTaskState::Pending(Default::default(), rx);
continue;
}
}
}
}
Expand Down

0 comments on commit 52e1366

Please sign in to comment.