From 768e0d18755d0409e75208a357212352fbda0405 Mon Sep 17 00:00:00 2001 From: Abdul Basit Date: Thu, 11 Apr 2024 16:37:17 +0300 Subject: [PATCH] separate functions for validate and apply proposal --- Cargo.lock | 2 +- sequencer/src/block.rs | 4 +- sequencer/src/header.rs | 38 ++--- sequencer/src/state.rs | 325 +++++++++++++++------------------------- 4 files changed, 133 insertions(+), 236 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e78a2bc2d..31c0b1c82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4612,7 +4612,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite 0.2.14", - "socket2 0.4.10", + "socket2 0.5.6", "tokio", "tower-service", "tracing", diff --git a/sequencer/src/block.rs b/sequencer/src/block.rs index d12d4aec3..c4a391d6a 100644 --- a/sequencer/src/block.rs +++ b/sequencer/src/block.rs @@ -90,7 +90,9 @@ impl BlockPayload for Payload { BuilderCommitment::from_raw_digest(digest.finalize()) } - fn get_transactions(&self, _: &Self::Metadata) -> &Vec { + fn get_transactions(&self, _metadata: &Self::Metadata) -> &Vec { + // self.enumerate(metadata).map(|(i, t)| t).collect() + unimplemented!() } } diff --git a/sequencer/src/header.rs b/sequencer/src/header.rs index 22cac7599..e1369072d 100644 --- a/sequencer/src/header.rs +++ b/sequencer/src/header.rs @@ -373,7 +373,10 @@ mod test_headers { use crate::{ catchup::mock::MockStateCatchup, l1_client::L1Client, - state::{validate_and_apply_proposal, BlockMerkleTree, Delta, FeeMerkleTree}, + state::{ + apply_proposal, get_l1_deposits, validate_proposal, BlockMerkleTree, Delta, + FeeMerkleTree, + }, NodeState, }; use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; @@ -655,17 +658,11 @@ mod test_headers { validated_state.block_merkle_tree = block_merkle_tree.clone(); parent_header.block_merkle_tree_root = block_merkle_tree_root; let mut proposal = parent_header.clone(); - let mut delta = Delta::default(); // Advance `proposal.height` to trigger validation error. - let result = validate_and_apply_proposal( - &mut validated_state, - &mut delta, - &parent_leaf, - &proposal, - vec![], - ) - .unwrap_err(); + let mut delta = Delta::default(); + apply_proposal(&mut validated_state, &mut delta, &parent_leaf, vec![]); + let result = validate_proposal(&validated_state, &parent_leaf, &proposal).unwrap_err(); assert_eq!( format!("{}", result.root_cause()), "Invalid Height Error: 0, 0" @@ -674,14 +671,7 @@ mod test_headers { // proposed `Header` root should include parent + // parent.commit proposal.height += 1; - let result = validate_and_apply_proposal( - &mut validated_state, - &mut delta, - &parent_leaf, - &proposal, - vec![], - ) - .unwrap_err(); + let result = validate_proposal(&validated_state, &parent_leaf, &proposal).unwrap_err(); // Fails b/c `proposal` has not advanced from `parent` assert!(format!("{}", result.root_cause()).contains("Invalid Block Root Error")); } @@ -749,15 +739,11 @@ mod test_headers { let mut block_merkle_tree = proposal_state.block_merkle_tree.clone(); block_merkle_tree.push(proposal.commit()).unwrap(); + let l1_deposits = get_l1_deposits(&genesis_state, &proposal, &parent_leaf).await; + let mut delta = Delta::default(); - validate_and_apply_proposal( - &mut proposal_state, - &mut delta, - &parent_leaf, - &proposal.clone(), - vec![], - ) - .unwrap(); + apply_proposal(&mut proposal_state, &mut delta, &parent_leaf, l1_deposits); + validate_proposal(&proposal_state, &parent_leaf, &proposal.clone()).unwrap(); assert_eq!( proposal_state.block_merkle_tree.commitment(), proposal.block_merkle_tree_root diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index eae875433..148aeeeb1 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -8,6 +8,7 @@ use ark_serialize::{ CanonicalDeserialize, CanonicalSerialize, Compress, Read, SerializationError, Valid, Validate, }; use async_std::stream::StreamExt; +use async_std::sync::RwLock; use async_trait::async_trait; use committable::{Commitment, Committable, RawCommitmentBuilder}; use contract_bindings::fee_contract::DepositFilter; @@ -44,6 +45,7 @@ use jf_primitives::{ }; use num_traits::CheckedSub; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use std::time::Duration; use std::{collections::HashSet, ops::Add, str::FromStr}; @@ -150,13 +152,10 @@ impl ValidatedState { } } } - -pub fn validate_and_apply_proposal( - state: &mut ValidatedState, - delta: &mut Delta, +pub fn validate_proposal( + state: &ValidatedState, parent_leaf: &Leaf, proposal: &Header, - receipts: Vec, ) -> anyhow::Result<()> { let parent_header = parent_leaf.get_block_header(); // validate height @@ -174,8 +173,6 @@ pub fn validate_and_apply_proposal( fee_merkle_tree, } = state; - // validate proposal is descendent of parent by appending to parent - block_merkle_tree.push(parent_header.commit()).unwrap(); let block_merkle_tree_root = block_merkle_tree.commitment(); anyhow::ensure!( proposal.block_merkle_tree_root == block_merkle_tree_root, @@ -186,17 +183,6 @@ pub fn validate_and_apply_proposal( ) ); - // Insert the fee deposits - for FeeInfo { account, amount } in receipts { - fee_merkle_tree - .update_with(account, |balance| { - Some(balance.cloned().unwrap_or_default().add(amount)) - }) - .expect("update_with succeeds"); - - delta.fees_delta.insert(account); - } - let fee_merkle_tree_root = fee_merkle_tree.commitment(); anyhow::ensure!( proposal.fee_merkle_tree_root == fee_merkle_tree_root, @@ -251,12 +237,8 @@ fn charge_fee( } } -/// Validate builder account by verifying signature and charging the account. -fn validate_and_charge_builder( - fee_merkle_tree: &mut FeeMerkleTree, - delta: &mut Delta, - proposed_header: &Header, -) -> anyhow::Result<()> { +/// Validate builder account by verifying signature +fn validate_builder(proposed_header: &Header) -> anyhow::Result<()> { // Beware of Malice! let builder_signature = proposed_header .builder_signature @@ -274,61 +256,17 @@ fn validate_and_charge_builder( "Invalid Builder Signature" ); - // charge the fee to the builder - if charge_fee(fee_merkle_tree, fee_info).is_err() { - bail!("Insufficient funds") - }; - - delta.fees_delta.insert(fee_info.account); - Ok(()) -} - -/// A pure function to validate and apply a header to the state. -/// -/// It assumes that all state required to validate and apply the header -/// is available in the `validated_state`. -fn validate_and_apply_header( - validated_state: &mut ValidatedState, - delta: &mut Delta, - parent_leaf: &Leaf, - proposed_header: &Header, - l1_deposits: Vec, -) -> Result<(), BlockError> { - // validate proposed header against parent - match validate_and_apply_proposal( - validated_state, - delta, - parent_leaf, - proposed_header, - l1_deposits, - ) { - // Note that currently only block state is updated. - Ok(validated_state) => validated_state, - Err(e) => { - tracing::warn!("Invalid Proposal: {}", e); - return Err(BlockError::InvalidBlockHeader); - } - }; - - // Validate builder by verifying signature and charging account - if let Err(e) = - validate_and_charge_builder(&mut validated_state.fee_merkle_tree, delta, proposed_header) - { - tracing::warn!("Invalid Builder: {}", e); - return Err(BlockError::InvalidBlockHeader); - }; - Ok(()) } #[derive(Debug)] -struct SqlStateCatchup<'a> { - db: &'a mut SqlDataSource, +struct SqlStateCatchup { + db: Arc>>, block_height: u64, } #[async_trait] -impl<'a> StateCatchup for SqlStateCatchup<'a> { +impl StateCatchup for SqlStateCatchup { async fn fetch_accounts( &self, _view: ViewNumber, @@ -341,6 +279,8 @@ impl<'a> StateCatchup for SqlStateCatchup<'a> { let proof = self .db + .read() + .await .get_path( Snapshot::::Index(block_height), account, @@ -388,6 +328,8 @@ impl<'a> StateCatchup for SqlStateCatchup<'a> { let proof = self .db + .read() + .await .get_path(Snapshot::::Index(bh), bh - 1) .await?; @@ -401,23 +343,25 @@ impl<'a> StateCatchup for SqlStateCatchup<'a> { } pub async fn update_state_storage( - storage: &mut SqlDataSource, - instance: &NodeState, + storage: Arc>>, + instance: &mut NodeState, parent_leaf: &LeafQueryData, proposed_leaf: &LeafQueryData, ) -> anyhow::Result<()> { let proposed_leaf = proposed_leaf.leaf(); let parent_leaf = parent_leaf.leaf(); - let header = parent_leaf.get_block_header(); + let header = proposed_leaf.get_block_header(); let validated_state = ValidatedState::from_header(header); let catchup = SqlStateCatchup { - db: storage, + db: storage.clone(), block_height: parent_leaf.get_height(), }; + instance.peers = Arc::new(catchup); + let (state, delta) = validated_state - .apply_header(instance, parent_leaf, header, catchup) + .apply_header(instance, parent_leaf, header) .await?; let block_number = proposed_leaf.get_height(); @@ -440,6 +384,8 @@ pub async fn update_state_storage( ); storage + .write() + .await .store_state::(proof.proof, path, block_number) .await .context("failed to insert merkle nodes for block merkle tree")?; @@ -456,11 +402,15 @@ pub async fn update_state_storage( ); storage + .write() + .await .store_state::(proof.proof, path, block_number) .await .context("failed to insert merkle nodes for block merkle tree")?; storage + .write() + .await .set_last_state_height(block_number as usize) .await .context("failed to set last state height")?; @@ -469,21 +419,29 @@ pub async fn update_state_storage( } pub async fn update_state_storage_loop( - mut storage: SqlDataSource, - instance: NodeState, + storage: SqlDataSource, + mut instance: NodeState, ) -> anyhow::Result<()> { // get last saved merklized state let last_height = storage.get_last_state_height().await?; - let mut parent_leaf = storage.get_leaf(last_height).await.resolve().await; - let mut leaves = storage.subscribe_leaves(last_height + 1).await; + let storage = Arc::new(RwLock::new(storage)); + + let mut parent_leaf = storage + .read() + .await + .get_leaf(last_height) + .await + .resolve() + .await; + let mut leaves = storage.read().await.subscribe_leaves(last_height + 1).await; while let Some(leaf) = leaves.next().await { loop { - match update_state_storage(&mut storage, &instance, &parent_leaf, &leaf).await { + match update_state_storage(storage.clone(), &mut instance, &parent_leaf, &leaf).await { Ok(()) => { - if let Err(e) = storage.commit().await { + if let Err(e) = storage.write().await.commit().await { tracing::error!("failed to commit for leaf {leaf:?} {e}",); async_std::task::sleep(Duration::from_secs(2)).await; continue; @@ -492,7 +450,7 @@ pub async fn update_state_storage_loop( break; } Err(e) => { - storage.revert().await; + storage.write().await.revert().await; tracing::error!( "failed to update state storage for leaf {leaf:?} {e} retrying..", ); @@ -512,56 +470,13 @@ impl ValidatedState { instance: &NodeState, parent_leaf: &Leaf, proposed_header: &Header, - catchup: impl StateCatchup, ) -> anyhow::Result<(Self, Delta)> { // Clone state to avoid mutation. Consumer can take update // through returned value. - let mut validated_state = self.clone(); + let l1_deposits = get_l1_deposits(instance, proposed_header, parent_leaf).await; - // Fetch the new L1 deposits between parent and current finalized L1 block. - let l1_deposits = if let Some(block_info) = proposed_header.l1_finalized { - instance - .l1_client - .get_finalized_deposits( - parent_leaf - .get_block_header() - .l1_finalized - .map(|block_info| block_info.number), - block_info.number, - ) - .await - } else { - vec![] - }; - - // pushing a block into merkle tree shouldn't fail - validated_state - .block_merkle_tree - .push(parent_leaf.get_block_header().commit()) - .unwrap(); - - let mut delta = Delta::default(); - for FeeInfo { account, amount } in l1_deposits.iter() { - validated_state - .fee_merkle_tree - .update_with(account, |balance| { - Some(balance.cloned().unwrap_or_default().add(*amount)) - }) - .expect("update_with succeeds"); - delta.fees_delta.insert(*account); - } - - if charge_fee( - &mut validated_state.fee_merkle_tree, - proposed_header.fee_info, - ) - .is_err() - { - bail!("Insufficient funds") - }; - - delta.fees_delta.insert(proposed_header.fee_info.account); + let mut validated_state = self.clone(); let accounts = std::iter::once(proposed_header.fee_info.account); @@ -576,11 +491,11 @@ impl ValidatedState { if self.need_to_fetch_blocks_mt_frontier() { tracing::warn!("fetching block frontier from peers"); - // Unwrapping here is okay as we retry until we get the proof or until the task is canceled. - catchup + instance + .peers + .as_ref() .remember_blocks_merkle_tree(view, &mut validated_state.block_merkle_tree) - .await - .unwrap(); + .await?; } // Fetch missing fee state entries @@ -590,14 +505,15 @@ impl ValidatedState { missing_accounts.len() ); - let missing_account_proofs = catchup + let missing_account_proofs = instance + .peers + .as_ref() .fetch_accounts( view, validated_state.fee_merkle_tree.commitment(), missing_accounts, ) - .await - .expect("failed to fetch accounts"); + .await?; // Remember the fee state entries for account in missing_account_proofs.iter() { @@ -608,10 +524,69 @@ impl ValidatedState { } } + let mut delta = Delta::default(); + + apply_proposal(&mut validated_state, &mut delta, parent_leaf, l1_deposits); + // validate_builder(&mut self.fee_merkle_tree, &mut delta, proposed_header)?; + if charge_fee( + &mut validated_state.fee_merkle_tree, + proposed_header.fee_info, + ) + .is_err() + { + bail!("Insufficient funds") + }; + + delta.fees_delta.insert(proposed_header.fee_info.account); + Ok((validated_state, delta)) } } +pub async fn get_l1_deposits( + instance: &NodeState, + header: &Header, + parent_leaf: &Leaf, +) -> Vec { + if let Some(block_info) = header.l1_finalized { + instance + .l1_client + .get_finalized_deposits( + parent_leaf + .get_block_header() + .l1_finalized + .map(|block_info| block_info.number), + block_info.number, + ) + .await + } else { + vec![] + } +} + +pub fn apply_proposal( + validated_state: &mut ValidatedState, + delta: &mut Delta, + parent_leaf: &Leaf, + l1_deposits: Vec, +) { + // pushing a block into merkle tree shouldn't fail + validated_state + .block_merkle_tree + .push(parent_leaf.get_block_header().commit()) + .unwrap(); + + for FeeInfo { account, amount } in l1_deposits.iter() { + validated_state + .fee_merkle_tree + .update_with(account, |balance| { + Some(balance.cloned().unwrap_or_default().add(*amount)) + }) + .expect("update_with succeeds"); + delta.fees_delta.insert(*account); + } +} + impl HotShotState for ValidatedState { type Error = BlockError; type Instance = NodeState; @@ -632,83 +607,17 @@ impl HotShotState for ValidatedState { parent_leaf: &Leaf, proposed_header: &Header, ) -> Result<(Self, Self::Delta), Self::Error> { - // Clone state to avoid mutation. Consumer can take update - // through returned value. - let mut validated_state = self.clone(); - - let accounts = std::iter::once(proposed_header.fee_info.account); - + // convert to function // Fetch the new L1 deposits between parent and current finalized L1 block. - let l1_deposits = if let Some(block_info) = proposed_header.l1_finalized { - instance - .l1_client - .get_finalized_deposits( - parent_leaf - .get_block_header() - .l1_finalized - .map(|block_info| block_info.number), - block_info.number, - ) - .await - } else { - vec![] - }; - - // Find missing state entries - let missing_accounts = self.forgotten_accounts( - accounts.chain(l1_deposits.iter().map(|fee_info| fee_info.account)), - ); - let view = parent_leaf.get_view_number(); - - // Ensure merkle tree has frontier - if self.need_to_fetch_blocks_mt_frontier() { - tracing::warn!("fetching block frontier from peers"); - instance - .peers - .as_ref() - .remember_blocks_merkle_tree(view, &mut validated_state.block_merkle_tree) - .await - .unwrap(); - } - - // Fetch missing fee state entries - if !missing_accounts.is_empty() { - tracing::warn!( - "fetching {} missing accounts from peers", - missing_accounts.len() - ); - - let missing_account_proofs = instance - .peers - .as_ref() - .fetch_accounts( - view, - validated_state.fee_merkle_tree.commitment(), - missing_accounts, - ) - .await - .expect("failed to fetch accounts"); - - // Remember the fee state entries - for account in missing_account_proofs.iter() { - account - .proof - .remember(&mut validated_state.fee_merkle_tree) - .expect("proof previously verified"); - } - } + validate_builder(proposed_header).unwrap(); - let mut delta = Delta::default(); + let (validated_state, delta) = self + .apply_header(instance, parent_leaf, proposed_header) + .await + .unwrap(); - // Lastly validate and apply the header - validate_and_apply_header( - &mut validated_state, - &mut delta, - parent_leaf, - proposed_header, - l1_deposits, - )?; + validate_proposal(self, parent_leaf, proposed_header).unwrap(); Ok((validated_state, delta)) }