From 3563c9057fcc3568cff4c67194554251710572bb Mon Sep 17 00:00:00 2001
From: Jeb Bearer
Date: Tue, 22 Oct 2024 10:46:01 -0700
Subject: [PATCH] Fix fee catchup after restart (#2160)

* Add regression test for fee catchup after restart

  This test reproduces an issue discovered in staging, in which, after a staggered restart of all
  nodes, the network has forgotten the fee state for all accounts except those which actively
  produced blocks during the restart. Since the state is needed for undecided blocks, it cannot be
  obtained directly from persisted merklized state, which is only available for decided blocks.

* Add bulk endpoint for account catchup
* Enable account catchup from persisted decided storage
* Cache fetched state in validated state map after fetching from database
* Support migration from existing nodes by populating the leaf_hash column
* Update HotShot to enable caching of fetched account states
* Rename migration after merge
* Add proposal fetching task
* Return leaf when fetching account state so we can always add it to the HotShot state map
* Fix deadlock in restart tests

  During experimentation I changed the restart tests from starting up all nodes in parallel to one
  at a time. This broke the no-CDN tests because, in some cases, you need multiple nodes to restart
  before libp2p can become ready, but this blocks the initialization of the first restarted node.
  Switched it back to starting up in parallel and things are working again.

* Update sequencer/api/catchup.toml

Co-authored-by: Mathis

* Avoid dropping TaskList, to avoid blocking the executor thread in its drop handler
* Exit proposal fetching task if it becomes obsolete

  The simplest, most robust change was to make the task a flat retry loop, instead of having a
  nested retry loop for the fetch itself, and to update the anchor view on each iteration of the
  loop, so that the loop exits if we reach a decide and the anchor view becomes more recent than
  the view we are fetching.
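As an illustrative aside (not part of the patch), here is a minimal sketch of the flat retry loop
described in the bullet above. The real implementation is `fetch_proposal_chain` in
`sequencer/src/context.rs` further down in this diff; `anchor_view` and `try_fetch` here are
hypothetical stand-ins for the persistence lookup and the network/storage fetch.

```rust
// Sketch only -- not part of this patch. The anchor view is re-checked on every
// iteration, so the task exits as soon as a decide makes the remaining fetches
// obsolete.
async fn fetch_chain_sketch<A, F, Fut>(mut view: u64, anchor_view: A, mut try_fetch: F)
where
    A: Fn() -> u64,
    F: FnMut(u64) -> Fut,
    Fut: std::future::Future<Output = Option<u64>>, // Some(parent_view) on success
{
    while view > anchor_view() {
        match try_fetch(view).await {
            // Success: walk back to the parent proposal and continue.
            Some(parent_view) => view = parent_view,
            // Failure: back off and retry the same view on the next iteration.
            None => async_std::task::sleep(std::time::Duration::from_secs(1)).await,
        }
    }
}
```

Keeping the retries flat, rather than nesting a retry loop inside the fetch itself, is what lets
the obsolescence check run between every attempt.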
* Enable frontier catchup from storage (#2183) * Enable frontier catchup from storage * Prevent recursive state reconstruction * Prefetch accounts used for paying fees during state reconstruction * Remove no-longer-necessary catchup impl for DB transactions * Completely disable catchup when not required (NullStateCatchup) * Fix backoff CLI parsing * Use the proper chain config during state reconstruction Closes #2186 * Add comment * Fix migration conflict * Make migrations consistent with release-gambit --------- Co-authored-by: Mathis --- Cargo.lock | 15 +- sequencer/api/catchup.toml | 14 + .../V38__add_quorum_proposal_hash.sql | 2 + ...icate.sql => V39__upgrade_certificate.sql} | 0 ...f_chain.sql => V40__anchor_leaf_chain.sql} | 0 sequencer/src/api.rs | 165 +++++-- sequencer/src/api/data_source.rs | 73 +-- sequencer/src/api/endpoints.rs | 50 ++- sequencer/src/api/fs.rs | 6 +- sequencer/src/api/options.rs | 8 +- sequencer/src/api/sql.rs | 421 +++++++++++++++--- sequencer/src/block/full_payload/payload.rs | 2 +- sequencer/src/catchup.rs | 276 +++++++++--- sequencer/src/context.rs | 182 +++++++- sequencer/src/lib.rs | 2 +- sequencer/src/persistence/fs.rs | 12 + sequencer/src/persistence/no_storage.rs | 7 + sequencer/src/persistence/sql.rs | 141 +++++- sequencer/src/restart_tests.rs | 270 +++++++++-- sequencer/src/state.rs | 69 ++- .../v0/impls/block/full_payload/payload.rs | 15 +- types/src/v0/impls/fee_info.rs | 28 ++ types/src/v0/impls/header.rs | 95 +++- types/src/v0/impls/instance_state.rs | 25 +- types/src/v0/impls/mod.rs | 7 +- types/src/v0/impls/state.rs | 36 +- types/src/v0/mod.rs | 4 +- types/src/v0/traits.rs | 138 +++--- types/src/v0/utils.rs | 18 +- 29 files changed, 1661 insertions(+), 420 deletions(-) create mode 100644 sequencer/api/migrations/V38__add_quorum_proposal_hash.sql rename sequencer/api/migrations/{V38__upgrade_certificate.sql => V39__upgrade_certificate.sql} (100%) rename sequencer/api/migrations/{V37__anchor_leaf_chain.sql => V40__anchor_leaf_chain.sql} (100%) diff --git a/Cargo.lock b/Cargo.lock index 65c36f33b..18f76f800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4928,7 +4928,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core 0.52.0", + "windows-core", ] [[package]] @@ -9247,7 +9247,7 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" dependencies = [ - "heck 0.5.0", + "heck 0.4.1", "proc-macro2", "quote", "syn 2.0.77", @@ -11174,7 +11174,7 @@ version = "0.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" dependencies = [ - "windows-core 0.51.1", + "windows-core", "windows-targets 0.48.5", ] @@ -11187,15 +11187,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "windows-core" -version = "0.52.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" -dependencies = [ - "windows-targets 0.52.6", -] - [[package]] name = "windows-registry" version = "0.2.0" diff --git a/sequencer/api/catchup.toml b/sequencer/api/catchup.toml index af7d942ba..8de2f5eda 100644 --- a/sequencer/api/catchup.toml +++ b/sequencer/api/catchup.toml @@ -27,6 +27,20 @@ the proof is a Merkle _non-membership_ proof. 
``` """ +[route.accounts] +PATH = ["/:height/:view/accounts"] +":height" = "Integer" +":view" = "Integer" +METHOD = "POST" +DOC = """ +Bulk version of `/:height/:view/account`. The request body should be a JSON array consisting of +TaggedBase64-encoded fee accounts. + +The response is a `FeeMerkleTree` containing sub-trees for each of the requested accounts, which is +a more condensed way to represent the union of account proofs for each requested account. Individual +Merkle proofs for each account can be extracted from this tree. +""" + [route.blocks] PATH = ["/:height/:view/blocks"] ":height" = "Integer" diff --git a/sequencer/api/migrations/V38__add_quorum_proposal_hash.sql b/sequencer/api/migrations/V38__add_quorum_proposal_hash.sql new file mode 100644 index 000000000..a38893ec3 --- /dev/null +++ b/sequencer/api/migrations/V38__add_quorum_proposal_hash.sql @@ -0,0 +1,2 @@ +ALTER TABLE quorum_proposals + ADD COLUMN leaf_hash VARCHAR; diff --git a/sequencer/api/migrations/V38__upgrade_certificate.sql b/sequencer/api/migrations/V39__upgrade_certificate.sql similarity index 100% rename from sequencer/api/migrations/V38__upgrade_certificate.sql rename to sequencer/api/migrations/V39__upgrade_certificate.sql diff --git a/sequencer/api/migrations/V37__anchor_leaf_chain.sql b/sequencer/api/migrations/V40__anchor_leaf_chain.sql similarity index 100% rename from sequencer/api/migrations/V37__anchor_leaf_chain.sql rename to sequencer/api/migrations/V40__anchor_leaf_chain.sql diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 3ed59729c..1af0cab08 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -4,14 +4,14 @@ use anyhow::{bail, Context}; use async_once_cell::Lazy; use async_std::sync::{Arc, RwLock}; use async_trait::async_trait; -use committable::Commitment; +use committable::{Commitment, Committable}; use data_source::{CatchupDataSource, SubmitDataSource}; use derivative::Derivative; use espresso_types::{ - v0::traits::SequencerPersistence, v0_3::ChainConfig, AccountQueryData, BlockMerkleTree, - FeeAccountProof, MockSequencerVersions, NodeState, PubKey, Transaction, + retain_accounts, v0::traits::SequencerPersistence, v0_3::ChainConfig, AccountQueryData, + BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleTree, MockSequencerVersions, NodeState, + PubKey, Transaction, ValidatedState, }; -use ethers::prelude::Address; use futures::{ future::{BoxFuture, Future, FutureExt}, stream::BoxStream, @@ -26,14 +26,17 @@ use hotshot_types::{ event::Event, light_client::StateSignatureRequestBody, network::NetworkConfig, - traits::{network::ConnectedNetwork, node_implementation::Versions}, + traits::{network::ConnectedNetwork, node_implementation::Versions, ValidatedState as _}, + utils::{View, ViewInner}, }; use jf_merkle_tree::MerkleTreeScheme; -use self::data_source::{HotShotConfigDataSource, PublicNetworkConfig, StateSignatureDataSource}; +use self::data_source::{ + HotShotConfigDataSource, NodeStateDataSource, PublicNetworkConfig, StateSignatureDataSource, +}; use crate::{ - context::Consensus, network, state_signature::StateSigner, SeqTypes, SequencerApiVersion, - SequencerContext, + catchup::CatchupStorage, context::Consensus, network, state_signature::StateSigner, SeqTypes, + SequencerApiVersion, SequencerContext, }; pub mod data_source; @@ -105,10 +108,6 @@ impl, P: SequencerPersistence, V: Versions> ApiState Arc::clone(&self.consensus.as_ref().get().await.get_ref().handle) } - async fn node_state(&self) -> &NodeState { - 
&self.consensus.as_ref().get().await.get_ref().node_state - } - async fn network_config(&self) -> NetworkConfig { self.consensus.as_ref().get().await.get_ref().config.clone() } @@ -189,36 +188,120 @@ impl, V: Versions, P: SequencerPersistence> SubmitDa } } +impl NodeStateDataSource for StorageState +where + N: ConnectedNetwork, + V: Versions, + P: SequencerPersistence, + D: Sync, +{ + async fn node_state(&self) -> &NodeState { + self.as_ref().node_state().await + } +} + impl< N: ConnectedNetwork, V: Versions, P: SequencerPersistence, - D: CatchupDataSource + Send + Sync, + D: CatchupStorage + Send + Sync, > CatchupDataSource for StorageState { - #[tracing::instrument(skip(self))] - async fn get_account( + #[tracing::instrument(skip(self, instance))] + async fn get_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, - account: Address, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { // Check if we have the desired state in memory. - match self.as_ref().get_account(height, view, account).await { - Ok(account) => return Ok(account), + match self + .as_ref() + .get_accounts(instance, height, view, accounts) + .await + { + Ok(accounts) => return Ok(accounts), Err(err) => { - tracing::info!("account is not in memory, trying storage: {err:#}"); + tracing::info!("accounts not in memory, trying storage: {err:#}"); } } // Try storage. - self.inner().get_account(height, view, account).await + let (tree, leaf) = self + .inner() + .get_accounts(instance, height, view, accounts) + .await + .context("accounts not in memory, and could not fetch from storage")?; + // If we successfully fetched accounts from storage, try to add them back into the in-memory + // state. + let handle = self.as_ref().consensus().await; + let handle = handle.read().await; + let consensus = handle.consensus(); + let mut consensus = consensus.write().await; + let (state, delta, leaf_commit) = match consensus.validated_state_map().get(&view) { + Some(View { + view_inner: ViewInner::Leaf { state, delta, leaf }, + }) => { + let mut state = (**state).clone(); + + // Add the fetched accounts to the state. + for account in accounts { + if let Some((proof, _)) = FeeAccountProof::prove(&tree, (*account).into()) { + if let Err(err) = proof.remember(&mut state.fee_merkle_tree) { + tracing::warn!( + ?view, + %account, + "cannot update fetched account state: {err:#}" + ); + } + } else { + tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree"); + }; + } + + (Arc::new(state), delta.clone(), *leaf) + } + _ => { + // If we don't already have a leaf for this view, or if we don't have the view + // at all, we can create a new view based on the recovered leaf and add it to + // our state map. In this case, we must also add the leaf to the saved leaves + // map to ensure consistency. 
+ let mut state = ValidatedState::from_header(leaf.block_header()); + state.fee_merkle_tree = tree.clone(); + let res = (Arc::new(state), None, Committable::commit(&leaf)); + consensus + .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock) + .await; + res + } + }; + if let Err(err) = consensus.update_validated_state_map( + view, + View { + view_inner: ViewInner::Leaf { + state, + delta, + leaf: leaf_commit, + }, + }, + ) { + tracing::warn!(?view, "cannot update fetched account state: {err:#}"); + } + tracing::info!(?view, "updated with fetched account state"); + + Ok(tree) } #[tracing::instrument(skip(self))] - async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result { + async fn get_frontier( + &self, + instance: &NodeState, + height: u64, + view: ViewNumber, + ) -> anyhow::Result { // Check if we have the desired state in memory. - match self.as_ref().get_frontier(height, view).await { + match self.as_ref().get_frontier(instance, height, view).await { Ok(frontier) => return Ok(frontier), Err(err) => { tracing::info!("frontier is not in memory, trying storage: {err:#}"); @@ -226,7 +309,7 @@ impl< } // Try storage. - self.inner().get_frontier(height, view).await + self.inner().get_frontier(instance, height, view).await } async fn get_chain_config( @@ -265,16 +348,28 @@ impl< // } // } +impl NodeStateDataSource for ApiState +where + N: ConnectedNetwork, + V: Versions, + P: SequencerPersistence, +{ + async fn node_state(&self) -> &NodeState { + &self.consensus.as_ref().get().await.get_ref().node_state + } +} + impl, V: Versions, P: SequencerPersistence> CatchupDataSource for ApiState { - #[tracing::instrument(skip(self))] - async fn get_account( + #[tracing::instrument(skip(self, _instance))] + async fn get_accounts( &self, + _instance: &NodeState, height: u64, view: ViewNumber, - account: Address, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { let state = self .consensus() .await @@ -285,14 +380,16 @@ impl, V: Versions, P: SequencerPersistence> CatchupD .context(format!( "state not available for height {height}, view {view:?}" ))?; - let (proof, balance) = FeeAccountProof::prove(&state.fee_merkle_tree, account).context( - format!("account {account} not available for height {height}, view {view:?}"), - )?; - Ok(AccountQueryData { balance, proof }) + retain_accounts(&state.fee_merkle_tree, accounts.iter().copied()) } #[tracing::instrument(skip(self))] - async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result { + async fn get_frontier( + &self, + _instance: &NodeState, + height: u64, + view: ViewNumber, + ) -> anyhow::Result { let state = self .consensus() .await @@ -1925,7 +2022,7 @@ mod test { // Fetch the config from node 1, a different node than the one running the service. let validator = ValidatorConfig::generated_from_seed_indexed([0; 32], 1, 1, false); - let mut config = peers.fetch_config(validator.clone()).await; + let mut config = peers.fetch_config(validator.clone()).await.unwrap(); // Check the node-specific information in the recovered config is correct. 
assert_eq!(config.node_index, 1); diff --git a/sequencer/src/api/data_source.rs b/sequencer/src/api/data_source.rs index 507b0b6f6..7e16e4c9d 100644 --- a/sequencer/src/api/data_source.rs +++ b/sequencer/src/api/data_source.rs @@ -1,18 +1,17 @@ use std::{num::NonZeroUsize, time::Duration}; -use anyhow::{bail, Context}; +use anyhow::Context; use async_trait::async_trait; use committable::Commitment; use espresso_types::{ v0::traits::{PersistenceOptions, SequencerPersistence}, v0_3::ChainConfig, - PubKey, Transaction, + FeeAccount, FeeAccountProof, FeeMerkleTree, NodeState, PubKey, Transaction, }; -use ethers::prelude::Address; use futures::future::Future; use hotshot_query_service::{ availability::AvailabilityDataSource, - data_source::{MetricsDataSource, VersionedDataSource}, + data_source::VersionedDataSource, fetching::provider::{AnyProvider, QueryServiceProvider}, node::NodeDataSource, status::StatusDataSource, @@ -110,7 +109,11 @@ pub(crate) trait StateSignatureDataSource> { async fn get_state_signature(&self, height: u64) -> Option; } -pub(crate) trait CatchupDataSource { +pub(crate) trait NodeStateDataSource { + fn node_state(&self) -> impl Send + Future; +} + +pub(crate) trait CatchupDataSource: Sync { /// Get the state of the requested `account`. /// /// The state is fetched from a snapshot at the given height and view, which _must_ correspond! @@ -119,19 +122,36 @@ pub(crate) trait CatchupDataSource { /// decided view. fn get_account( &self, - _height: u64, - _view: ViewNumber, - _account: Address, + instance: &NodeState, + height: u64, + view: ViewNumber, + account: FeeAccount, ) -> impl Send + Future> { - // Merklized state catchup is only supported by persistence backends that provide merklized - // state storage. This default implementation is overridden for those that do. Otherwise, - // catchup can still be provided by fetching undecided merklized state from consensus - // memory. - async { - bail!("merklized state catchup is not supported for this data source"); + async move { + let tree = self + .get_accounts(instance, height, view, &[account]) + .await?; + let (proof, balance) = FeeAccountProof::prove(&tree, account.into()).context( + format!("account {account} not available for height {height}, view {view:?}"), + )?; + Ok(AccountQueryData { balance, proof }) } } + /// Get the state of the requested `accounts`. + /// + /// The state is fetched from a snapshot at the given height and view, which _must_ correspond! + /// `height` is provided to simplify lookups for backends where data is not indexed by view. + /// This function is intended to be used for catchup, so `view` should be no older than the last + /// decided view. + fn get_accounts( + &self, + instance: &NodeState, + height: u64, + view: ViewNumber, + accounts: &[FeeAccount], + ) -> impl Send + Future>; + /// Get the blocks Merkle tree frontier. /// /// The state is fetched from a snapshot at the given height and view, which _must_ correspond! @@ -140,30 +160,17 @@ pub(crate) trait CatchupDataSource { /// decided view. fn get_frontier( &self, - _height: u64, - _view: ViewNumber, - ) -> impl Send + Future> { - // Merklized state catchup is only supported by persistence backends that provide merklized - // state storage. This default implementation is overridden for those that do. Otherwise, - // catchup can still be provided by fetching undecided merklized state from consensus - // memory. 
- async { - bail!("merklized state catchup is not supported for this data source"); - } - } + instance: &NodeState, + height: u64, + view: ViewNumber, + ) -> impl Send + Future>; fn get_chain_config( &self, - _commitment: Commitment, - ) -> impl Send + Future> { - async { - bail!("chain config catchup is not supported for this data source"); - } - } + commitment: Commitment, + ) -> impl Send + Future>; } -impl CatchupDataSource for MetricsDataSource {} - /// This struct defines the public Hotshot validator configuration. /// Private key and state key pairs are excluded for security reasons. diff --git a/sequencer/src/api/endpoints.rs b/sequencer/src/api/endpoints.rs index e3a097d92..8b22fe985 100644 --- a/sequencer/src/api/endpoints.rs +++ b/sequencer/src/api/endpoints.rs @@ -7,7 +7,7 @@ use std::{ use anyhow::Result; use committable::Committable; -use espresso_types::{NamespaceId, NsProof, PubKey, Transaction}; +use espresso_types::{FeeAccount, NamespaceId, NsProof, PubKey, Transaction}; use futures::{try_join, FutureExt}; use hotshot_query_service::{ availability::{self, AvailabilityDataSource, CustomSnafu, FetchBlockSnafu}, @@ -32,8 +32,8 @@ use vbs::version::StaticVersionType; use super::{ data_source::{ - CatchupDataSource, HotShotConfigDataSource, SequencerDataSource, StateSignatureDataSource, - SubmitDataSource, + CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, SequencerDataSource, + StateSignatureDataSource, SubmitDataSource, }, StorageState, }; @@ -215,7 +215,7 @@ pub(super) fn catchup( ) -> Result> where S: 'static + Send + Sync + ReadState, - S::State: Send + Sync + CatchupDataSource, + S::State: Send + Sync + NodeStateDataSource + CatchupDataSource, { let toml = toml::from_str::(include_str!("../../api/catchup.toml"))?; let mut api = Api::::new(toml)?; @@ -239,12 +239,50 @@ where })?; state - .get_account(height, ViewNumber::new(view), account) + .get_account( + state.node_state().await, + height, + ViewNumber::new(view), + account, + ) .await .map_err(|err| Error::catch_all(StatusCode::NOT_FOUND, format!("{err:#}"))) } .boxed() })? + .at("accounts", |req, state| { + async move { + let height = req + .integer_param("height") + .map_err(Error::from_request_error)?; + let view = req + .integer_param("view") + .map_err(Error::from_request_error)?; + let accounts = req + .body_auto::, ApiVer>(ApiVer::instance()) + .map_err(Error::from_request_error)?; + + state + .read(|state| { + async move { + state + .get_accounts( + state.node_state().await, + height, + ViewNumber::new(view), + &accounts, + ) + .await + .map_err(|err| { + Error::catch_all(StatusCode::NOT_FOUND, format!("{err:#}")) + }) + } + .boxed() + }) + .await + } + .boxed() + })? 
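As an illustrative aside (not part of the diff): a hedged sketch of calling the new bulk `accounts`
route from a client, modeled on the peer-catchup client added to `sequencer/src/catchup.rs` later
in this patch. The route shape comes from `catchup.toml`; the function name and error handling are
assumptions for illustration only.

```rust
use anyhow::bail;
use espresso_types::{FeeAccount, FeeAccountProof, FeeMerkleTree};
use surf_disco::Client;
use tide_disco::error::ServerError;
use vbs::version::StaticVersionType;

// Sketch only: POST the needed accounts as one request and check that each
// account can be proven against the returned combined tree.
async fn fetch_accounts_sketch<ApiVer: StaticVersionType>(
    client: &Client<ServerError, ApiVer>,
    height: u64,
    view: u64,
    accounts: &[FeeAccount],
) -> anyhow::Result<FeeMerkleTree> {
    let snapshot = client
        .post::<FeeMerkleTree>(&format!("catchup/{height}/{view}/accounts"))
        .body_binary(&accounts.to_vec())
        .map_err(|err| anyhow::anyhow!("constructing request: {err}"))?
        .send()
        .await
        .map_err(|err| anyhow::anyhow!("fetching accounts: {err}"))?;
    // Individual Merkle proofs can be extracted from the returned sub-trees.
    for account in accounts {
        if FeeAccountProof::prove(&snapshot, (*account).into()).is_none() {
            bail!("response missing account {account}");
        }
    }
    Ok(snapshot)
}
```

One round trip over this route covers what previously took a separate `/account` request per fee
account, with the server returning a single condensed `FeeMerkleTree` for all of them.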
.get("blocks", |req, state| { async move { let height = req @@ -255,7 +293,7 @@ where .map_err(Error::from_request_error)?; state - .get_frontier(height, ViewNumber::new(view)) + .get_frontier(state.node_state().await, height, ViewNumber::new(view)) .await .map_err(|err| Error::catch_all(StatusCode::NOT_FOUND, format!("{err:#}"))) } diff --git a/sequencer/src/api/fs.rs b/sequencer/src/api/fs.rs index 542eeb7fe..004831f8a 100644 --- a/sequencer/src/api/fs.rs +++ b/sequencer/src/api/fs.rs @@ -3,8 +3,8 @@ use std::path::Path; use async_trait::async_trait; use hotshot_query_service::data_source::FileSystemDataSource; -use super::data_source::{CatchupDataSource, Provider, SequencerDataSource}; -use crate::{persistence::fs::Options, SeqTypes}; +use super::data_source::{Provider, SequencerDataSource}; +use crate::{catchup::CatchupStorage, persistence::fs::Options, SeqTypes}; pub type DataSource = FileSystemDataSource; @@ -26,7 +26,7 @@ impl SequencerDataSource for DataSource { } } -impl CatchupDataSource for DataSource {} +impl CatchupStorage for DataSource {} #[cfg(test)] mod impl_testable_data_source { diff --git a/sequencer/src/api/options.rs b/sequencer/src/api/options.rs index e053b6711..79f35d7af 100644 --- a/sequencer/src/api/options.rs +++ b/sequencer/src/api/options.rs @@ -27,14 +27,15 @@ use vbs::version::StaticVersionType; use super::{ data_source::{ - provider, CatchupDataSource, HotShotConfigDataSource, SequencerDataSource, - StateSignatureDataSource, SubmitDataSource, + provider, CatchupDataSource, HotShotConfigDataSource, NodeStateDataSource, + SequencerDataSource, StateSignatureDataSource, SubmitDataSource, }, endpoints, fs, sql, update::ApiEventConsumer, ApiState, StorageState, }; use crate::{ + catchup::CatchupStorage, context::{SequencerContext, TaskList}, persistence, state::update_state_storage_loop, @@ -265,7 +266,7 @@ impl Options { where N: ConnectedNetwork, P: SequencerPersistence, - D: SequencerDataSource + CatchupDataSource + Send + Sync + 'static, + D: SequencerDataSource + CatchupStorage + Send + Sync + 'static, for<'a> D::Transaction<'a>: UpdateDataSource, { let metrics = ds.populate_metrics(); @@ -391,6 +392,7 @@ impl Options { + Sync + SubmitDataSource + StateSignatureDataSource + + NodeStateDataSource + CatchupDataSource + HotShotConfigDataSource, N: ConnectedNetwork, diff --git a/sequencer/src/api/sql.rs b/sequencer/src/api/sql.rs index 6e2b1c441..fd9458b41 100644 --- a/sequencer/src/api/sql.rs +++ b/sequencer/src/api/sql.rs @@ -1,28 +1,45 @@ -use anyhow::{bail, Context}; +use anyhow::{bail, ensure, Context}; use async_trait::async_trait; -use committable::Commitment; - -use espresso_types::{v0_3::ChainConfig, BlockMerkleTree, FeeAccountProof, FeeMerkleTree}; -use ethers::prelude::Address; -use hotshot_query_service::data_source::storage::sql::Write; +use committable::{Commitment, Committable}; +use espresso_types::{ + get_l1_deposits, + v0_3::{ChainConfig, IterableFeeInfo}, + BlockMerkleTree, FeeAccount, FeeMerkleTree, Leaf, NodeState, ValidatedState, +}; +use hotshot::traits::ValidatedState as _; use hotshot_query_service::{ + availability::LeafId, data_source::{ sql::{Config, SqlDataSource, Transaction}, - storage::{sql::query_as, MerklizedStateStorage, SqlStorage}, + storage::{ + sql::{query_as, Db, TransactionMode, Write}, + AvailabilityStorage, MerklizedStateStorage, NodeStorage, SqlStorage, + }, VersionedDataSource, }, merklized_state::Snapshot, Resolvable, }; -use hotshot_types::data::ViewNumber; -use jf_merkle_tree::{prelude::MerkleNode, 
MerkleTreeScheme}; +use hotshot_types::{ + data::{QuorumProposal, ViewNumber}, + message::Proposal, + traits::node_implementation::ConsensusTime, +}; +use jf_merkle_tree::{ + prelude::MerkleNode, ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, + LookupResult, MerkleTreeScheme, +}; +use sqlx::{Encode, Type}; +use std::collections::{HashSet, VecDeque}; use super::{ - data_source::{CatchupDataSource, Provider, SequencerDataSource}, - AccountQueryData, BlocksFrontier, + data_source::{Provider, SequencerDataSource}, + BlocksFrontier, }; use crate::{ + catchup::{CatchupStorage, NullStateCatchup}, persistence::{sql::Options, ChainConfigPersistence}, + state::compute_state_update, SeqTypes, }; @@ -58,56 +75,73 @@ impl SequencerDataSource for DataSource { } } -impl CatchupDataSource for SqlStorage { - async fn get_account( +impl CatchupStorage for SqlStorage { + async fn get_accounts( &self, + instance: &NodeState, height: u64, - _view: ViewNumber, - account: Address, - ) -> anyhow::Result { - let proof = self - .read() - .await - .context(format!( - "opening transaction to fetch account {account}; height {height}" - ))? - .get_path( - Snapshot::::Index(height), - account.into(), - ) + view: ViewNumber, + accounts: &[FeeAccount], + ) -> anyhow::Result<(FeeMerkleTree, Leaf)> { + let mut tx = self.read().await.context(format!( + "opening transaction to fetch account {accounts:?}; height {height}" + ))?; + + let block_height = NodeStorage::::block_height(&mut tx) .await - .context(format!("fetching account {account}; height {height}"))?; + .context("getting block height")? as u64; + ensure!( + block_height > 0, + "cannot get accounts for height {height}: no blocks available" + ); - match proof.proof.first().context(format!( - "empty proof for account {account}; height {height}" - ))? { - MerkleNode::Leaf { pos, elem, .. } => Ok(AccountQueryData { - balance: (*elem).into(), - proof: FeeAccountProof::presence(*pos, proof), - }), - - MerkleNode::Empty => Ok(AccountQueryData { - balance: 0_u64.into(), - proof: FeeAccountProof::absence(account.into(), proof), - }), - _ => { - bail!("Invalid proof"); - } + // Check if we have the desired state snapshot. If so, we can load the desired accounts + // directly. + if height < block_height { + load_accounts(&mut tx, height, accounts).await + } else { + // If we do not have the exact snapshot we need, we can try going back to the last + // snapshot we _do_ have and replaying subsequent blocks to compute the desired state. + let (state, leaf) = + reconstruct_state(instance, &mut tx, block_height - 1, view, accounts).await?; + Ok((state.fee_merkle_tree, leaf)) } } - async fn get_frontier(&self, height: u64, _view: ViewNumber) -> anyhow::Result { - self.read() - .await - .context(format!( - "opening transaction to fetch frontier at height {height}" - ))? - .get_path( - Snapshot::::Index(height), - height - 1, - ) + async fn get_frontier( + &self, + instance: &NodeState, + height: u64, + view: ViewNumber, + ) -> anyhow::Result { + let mut tx = self.read().await.context(format!( + "opening transaction to fetch frontier at height {height}" + ))?; + + let block_height = NodeStorage::::block_height(&mut tx) .await - .context(format!("fetching frontier at height {height}")) + .context("getting block height")? as u64; + ensure!( + block_height > 0, + "cannot get frontier for height {height}: no blocks available" + ); + + // Check if we have the desired state snapshot. If so, we can load the desired frontier + // directly. 
+ if height < block_height { + load_frontier(&mut tx, height).await + } else { + // If we do not have the exact snapshot we need, we can try going back to the last + // snapshot we _do_ have and replaying subsequent blocks to compute the desired state. + let (state, _) = + reconstruct_state(instance, &mut tx, block_height - 1, view, &[]).await?; + match state.block_merkle_tree.lookup(height - 1) { + LookupResult::Ok(_, proof) => Ok(proof), + _ => { + bail!("state snapshot {view:?},{height} was found but does not contain frontier at height {}; this should not be possible", height - 1); + } + } + } } async fn get_chain_config( @@ -117,29 +151,37 @@ impl CatchupDataSource for SqlStorage { let mut tx = self.read().await.context(format!( "opening transaction to fetch chain config {commitment}" ))?; + load_chain_config(&mut tx, commitment).await + } +} - let (data,) = query_as::<(Vec,)>("SELECT * from chain_config where commitment = $1") - .bind(commitment.to_string()) - .fetch_one(tx.as_mut()) +impl CatchupStorage for DataSource { + async fn get_accounts( + &self, + instance: &NodeState, + height: u64, + view: ViewNumber, + accounts: &[FeeAccount], + ) -> anyhow::Result<(FeeMerkleTree, Leaf)> { + self.as_ref() + .get_accounts(instance, height, view, accounts) .await - .unwrap(); - - bincode::deserialize(&data[..]).context("failed to deserialize") } -} -impl CatchupDataSource for DataSource { - async fn get_account( + async fn get_frontier( &self, + instance: &NodeState, height: u64, view: ViewNumber, - account: Address, - ) -> anyhow::Result { - self.as_ref().get_account(height, view, account).await + ) -> anyhow::Result { + self.as_ref().get_frontier(instance, height, view).await } - async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result { - self.as_ref().get_frontier(height, view).await + async fn get_chain_config( + &self, + commitment: Commitment, + ) -> anyhow::Result { + self.as_ref().get_chain_config(commitment).await } } @@ -159,6 +201,251 @@ impl ChainConfigPersistence for Transaction { } } +async fn load_frontier( + tx: &mut Transaction, + height: u64, +) -> anyhow::Result { + tx.get_path( + Snapshot::::Index(height), + height - 1, + ) + .await + .context(format!("fetching frontier at height {height}")) +} + +async fn load_accounts( + tx: &mut Transaction, + height: u64, + accounts: &[FeeAccount], +) -> anyhow::Result<(FeeMerkleTree, Leaf)> { + let leaf = tx + .get_leaf(LeafId::::from(height as usize)) + .await + .context(format!("leaf {height} not available"))?; + let header = leaf.header(); + + let mut snapshot = FeeMerkleTree::from_commitment(header.fee_merkle_tree_root()); + for account in accounts { + let proof = tx + .get_path( + Snapshot::::Index( + header.height(), + ), + *account, + ) + .await + .context(format!( + "fetching account {account}; height {}", + header.height() + ))?; + match proof.proof.first().context(format!( + "empty proof for account {account}; height {}", + header.height() + ))? { + MerkleNode::Leaf { pos, elem, .. 
} => { + snapshot.remember(*pos, *elem, proof)?; + } + MerkleNode::Empty => { + snapshot.non_membership_remember(*account, proof)?; + } + _ => { + bail!("Invalid proof"); + } + } + } + + Ok((snapshot, leaf.leaf().clone())) +} + +async fn load_chain_config( + tx: &mut Transaction, + commitment: Commitment, +) -> anyhow::Result { + let (data,) = query_as::<(Vec,)>("SELECT data from chain_config where commitment = $1") + .bind(commitment.to_string()) + .fetch_one(tx.as_mut()) + .await + .unwrap(); + + bincode::deserialize(&data[..]).context("failed to deserialize") +} + +#[tracing::instrument(skip(instance, tx))] +async fn reconstruct_state( + instance: &NodeState, + tx: &mut Transaction, + from_height: u64, + to_view: ViewNumber, + accounts: &[FeeAccount], +) -> anyhow::Result<(ValidatedState, Leaf)> { + tracing::info!("attempting to reconstruct fee state"); + let from_leaf = tx + .get_leaf((from_height as usize).into()) + .await + .context(format!("leaf {from_height} not available"))?; + let from_leaf = from_leaf.leaf(); + ensure!( + from_leaf.view_number() < to_view, + "state reconstruction: starting state {:?} must be before ending state {to_view:?}", + from_leaf.view_number(), + ); + + // Get the sequence of headers we will be applying to compute the latest state. + let mut leaves = VecDeque::new(); + let to_leaf = get_leaf_from_proposal(tx, "view = $1", &(to_view.u64() as i64)) + .await + .context(format!( + "unable to reconstruct state because leaf {to_view:?} is not available" + ))?; + let mut parent = to_leaf.parent_commitment(); + tracing::debug!(?to_leaf, ?parent, view = ?to_view, "have required leaf"); + leaves.push_front(to_leaf.clone()); + while parent != Committable::commit(from_leaf) { + let leaf = get_leaf_from_proposal(tx, "leaf_hash = $1", &parent.to_string()) + .await + .context(format!( + "unable to reconstruct state because leaf {parent} is not available" + ))?; + parent = leaf.parent_commitment(); + tracing::debug!(?leaf, ?parent, "have required leaf"); + leaves.push_front(leaf); + } + + // Get the initial state. + let mut parent = from_leaf; + let mut state = ValidatedState::from_header(parent.block_header()); + + // Pre-load the state with the accounts we care about to ensure they will be present in the + // final state. + let mut accounts = accounts.iter().copied().collect::>(); + // Add in all the accounts we will need to replay any of the headers, to ensure that we don't + // need to do catchup recursively. + let (catchup, dependencies) = header_dependencies(tx, instance, parent, &leaves).await?; + accounts.extend(dependencies); + let accounts = accounts.into_iter().collect::>(); + state.fee_merkle_tree = load_accounts(tx, from_height, &accounts) + .await + .context("unable to reconstruct state because accounts are not available at origin")? + .0; + ensure!( + state.fee_merkle_tree.commitment() == parent.block_header().fee_merkle_tree_root(), + "loaded fee state does not match parent header" + ); + + // We need the blocks frontier as well, to apply the STF. + let frontier = load_frontier(tx, from_height) + .await + .context("unable to reconstruct state because frontier is not available at origin")?; + match frontier + .proof + .first() + .context("empty proof for frontier at origin")? + { + MerkleNode::Leaf { pos, elem, .. } => state + .block_merkle_tree + .remember(*pos, *elem, frontier) + .context("failed to remember frontier")?, + _ => bail!("invalid frontier proof"), + } + + // Apply subsequent headers to compute the later state. 
+ for proposal in &leaves { + state = compute_state_update(&state, instance, &catchup, parent, proposal) + .await + .context(format!( + "unable to reconstruct state because state update {} failed", + proposal.height(), + ))? + .0; + parent = proposal; + } + + tracing::info!(from_height, ?to_view, "successfully reconstructed state"); + Ok((state, to_leaf)) +} + +/// Get the dependencies needed to apply the STF to the given list of headers. +/// +/// Returns +/// * A state catchup implementation seeded with all the chain configs required to apply the headers +/// in `leaves` +/// * The set of accounts that must be preloaded to apply these headers +async fn header_dependencies( + tx: &mut Transaction, + instance: &NodeState, + mut parent: &Leaf, + leaves: impl IntoIterator, +) -> anyhow::Result<(NullStateCatchup, HashSet)> { + let mut catchup = NullStateCatchup::default(); + let mut accounts = HashSet::default(); + + for proposal in leaves { + let header = proposal.block_header(); + let height = header.height(); + let view = proposal.view_number(); + tracing::debug!(height, ?view, "fetching dependencies for proposal"); + + let header_cf = header.chain_config(); + let chain_config = if header_cf.commit() == instance.chain_config.commit() { + instance.chain_config + } else { + match header_cf.resolve() { + Some(cf) => cf, + None => { + tracing::info!( + height, + ?view, + commit = %header_cf.commit(), + "chain config not available, attempting to load from storage", + ); + let cf = load_chain_config(tx, header_cf.commit()) + .await + .context(format!( + "loading chain config {} for header {},{:?}", + header_cf.commit(), + header.height(), + proposal.view_number() + ))?; + + // If we had to fetch a chain config now, store it in the catchup implementation + // so the STF will be able to look it up later. + catchup.add_chain_config(cf); + cf + } + } + }; + + accounts.insert(chain_config.fee_recipient); + accounts.extend( + get_l1_deposits(instance, header, parent, chain_config.fee_contract) + .await + .into_iter() + .map(|fee| fee.account()), + ); + accounts.extend(header.fee_info().accounts()); + parent = proposal; + } + Ok((catchup, accounts)) +} + +async fn get_leaf_from_proposal( + tx: &mut Transaction, + where_clause: &str, + param: P, +) -> anyhow::Result +where + P: Type + for<'q> Encode<'q, Db>, +{ + let (data,) = query_as::<(Vec,)>(&format!( + "SELECT data FROM quorum_proposals WHERE {where_clause} LIMIT 1", + )) + .bind(param) + .fetch_one(tx.as_mut()) + .await?; + let proposal: Proposal> = bincode::deserialize(&data)?; + Ok(Leaf::from_quorum_proposal(&proposal.data)) +} + #[cfg(any(test, feature = "testing"))] mod impl_testable_data_source { diff --git a/sequencer/src/block/full_payload/payload.rs b/sequencer/src/block/full_payload/payload.rs index b0078ff09..52c45abe4 100644 --- a/sequencer/src/block/full_payload/payload.rs +++ b/sequencer/src/block/full_payload/payload.rs @@ -165,7 +165,7 @@ impl BlockPayload for Payload { .peers .as_ref() .fetch_chain_config(validated_state_cf.commit()) - .await + .await? 
} } }; diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index 4cdb8ac2f..7998ce449 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -7,25 +7,24 @@ use committable::Committable; use espresso_types::{ v0::traits::{PersistenceOptions, StateCatchup}, v0_3::ChainConfig, - AccountQueryData, BackoffParams, BlockMerkleTree, FeeAccount, FeeMerkleCommitment, + BackoffParams, BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment, + FeeMerkleTree, Leaf, NodeState, }; -use futures::future::FutureExt; -use hotshot_types::network::NetworkConfig; +use futures::future::{Future, FutureExt}; use hotshot_types::{ - data::ViewNumber, traits::node_implementation::ConsensusTime as _, ValidatorConfig, + data::ViewNumber, network::NetworkConfig, traits::node_implementation::ConsensusTime as _, + ValidatorConfig, }; use jf_merkle_tree::{prelude::MerkleNode, ForgetableMerkleTreeScheme, MerkleTreeScheme}; use serde::de::DeserializeOwned; +use std::collections::HashMap; use surf_disco::Request; use tide_disco::error::ServerError; use url::Url; use vbs::version::StaticVersionType; use crate::{ - api::{ - data_source::{CatchupDataSource, PublicNetworkConfig}, - BlocksFrontier, - }, + api::{data_source::PublicNetworkConfig, BlocksFrontier}, PubKey, }; @@ -86,7 +85,7 @@ impl StatePeers { pub async fn fetch_config( &self, my_own_validator_config: ValidatorConfig, - ) -> NetworkConfig { + ) -> anyhow::Result> { self.backoff() .retry(self, move |provider| { let my_own_validator_config = my_own_validator_config.clone(); @@ -117,39 +116,49 @@ impl StatePeers { #[async_trait] impl StateCatchup for StatePeers { - #[tracing::instrument(skip(self))] - async fn try_fetch_account( + #[tracing::instrument(skip(self, _instance))] + async fn try_fetch_accounts( &self, + _instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { for client in self.clients.iter() { - tracing::info!("Fetching account {account:?} from {}", client.url); - match client - .get::(&format!( - "catchup/{height}/{}/account/{account}", - view.u64(), - )) - .send() - .await + tracing::info!("Fetching accounts from {}", client.url); + let req = match client + .inner + .post::(&format!("catchup/{height}/{}/accounts", view.u64(),)) + .body_binary(&accounts.to_vec()) { - Ok(res) => { - if res.proof.account != account.into() { - tracing::warn!("Invalid proof received from peer {:?}", client.url); - continue; - } - - match res.proof.verify(&fee_merkle_tree_root) { - Ok(_) => return Ok(res), - Err(err) => tracing::warn!("Error verifying account proof: {}", err), - } + Ok(req) => req, + Err(err) => { + tracing::warn!("failed to construct accounts catchup request: {err:#}"); + continue; } + }; + let snapshot = match req.send().await { + Ok(res) => res, Err(err) => { - tracing::warn!("Error fetching account from peer: {}", err); + tracing::warn!("Error fetching accounts from peer: {err:#}"); + continue; + } + }; + + // Verify proofs. 
+ for account in accounts { + let Some((proof, _)) = FeeAccountProof::prove(&snapshot, (*account).into()) else { + tracing::warn!("response from peer missing account {account}"); + continue; + }; + if let Err(err) = proof.verify(&fee_merkle_tree_root) { + tracing::warn!("peer gave invalid proof for account {account}: {err:#}"); + continue; } } + + return Ok(snapshot); } bail!("Could not fetch account from any peer"); } @@ -157,6 +166,7 @@ impl StateCatchup for StatePeers { #[tracing::instrument(skip(self, mt), height = mt.num_leaves())] async fn try_remember_blocks_merkle_tree( &self, + _instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, @@ -225,6 +235,100 @@ impl StateCatchup for StatePeers { } } +pub(crate) trait CatchupStorage: Sync { + /// Get the state of the requested `accounts`. + /// + /// The state is fetched from a snapshot at the given height and view, which _must_ correspond! + /// `height` is provided to simplify lookups for backends where data is not indexed by view. + /// This function is intended to be used for catchup, so `view` should be no older than the last + /// decided view. + /// + /// If successful, this function also returns the leaf from `view`, if it is available. This can + /// be used to add the recovered state to HotShot's state map, so that future requests can get + /// the state from memory rather than storage. + fn get_accounts( + &self, + _instance: &NodeState, + _height: u64, + _view: ViewNumber, + _accounts: &[FeeAccount], + ) -> impl Send + Future> { + // Merklized state catchup is only supported by persistence backends that provide merklized + // state storage. This default implementation is overridden for those that do. Otherwise, + // catchup can still be provided by fetching undecided merklized state from consensus + // memory. + async { + bail!("merklized state catchup is not supported for this data source"); + } + } + + /// Get the blocks Merkle tree frontier. + /// + /// The state is fetched from a snapshot at the given height and view, which _must_ correspond! + /// `height` is provided to simplify lookups for backends where data is not indexed by view. + /// This function is intended to be used for catchup, so `view` should be no older than the last + /// decided view. + fn get_frontier( + &self, + _instance: &NodeState, + _height: u64, + _view: ViewNumber, + ) -> impl Send + Future> { + // Merklized state catchup is only supported by persistence backends that provide merklized + // state storage. This default implementation is overridden for those that do. Otherwise, + // catchup can still be provided by fetching undecided merklized state from consensus + // memory. 
+ async { + bail!("merklized state catchup is not supported for this data source"); + } + } + + fn get_chain_config( + &self, + _commitment: Commitment, + ) -> impl Send + Future> { + async { + bail!("chain config catchup is not supported for this data source"); + } + } +} + +impl CatchupStorage for hotshot_query_service::data_source::MetricsDataSource {} + +impl CatchupStorage for hotshot_query_service::data_source::ExtensibleDataSource +where + T: CatchupStorage, + S: Sync, +{ + async fn get_accounts( + &self, + instance: &NodeState, + height: u64, + view: ViewNumber, + accounts: &[FeeAccount], + ) -> anyhow::Result<(FeeMerkleTree, Leaf)> { + self.inner() + .get_accounts(instance, height, view, accounts) + .await + } + + async fn get_frontier( + &self, + instance: &NodeState, + height: u64, + view: ViewNumber, + ) -> anyhow::Result { + self.inner().get_frontier(instance, height, view).await + } + + async fn get_chain_config( + &self, + commitment: Commitment, + ) -> anyhow::Result { + self.inner().get_chain_config(commitment).await + } +} + #[derive(Debug)] pub(crate) struct SqlStateCatchup { db: Arc, @@ -240,43 +344,30 @@ impl SqlStateCatchup { #[async_trait] impl StateCatchup for SqlStateCatchup where - T: CatchupDataSource + std::fmt::Debug + Send + Sync, + T: CatchupStorage + std::fmt::Debug + Send + Sync, { // TODO: add a test for the account proof validation // issue # 2102 (https://github.com/EspressoSystems/espresso-sequencer/issues/2102) - #[tracing::instrument(skip(self))] - async fn try_fetch_account( + #[tracing::instrument(skip(self, instance))] + async fn try_fetch_accounts( &self, + instance: &NodeState, block_height: u64, view: ViewNumber, - fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result { - let res = self + _fee_merkle_tree_root: FeeMerkleCommitment, + accounts: &[FeeAccount], + ) -> anyhow::Result { + Ok(self .db - .get_account(block_height, view, account.into()) - .await?; - - if res.proof.account != account.into() { - panic!( - "Critical error: Mismatched fee account proof: expected {:?}, got {:?}. - This may indicate a compromised database", - account, res.proof.account - ); - } - - match res.proof.verify(&fee_merkle_tree_root) { - Ok(_) => return Ok(res), - Err(err) => panic!( - "Critical error: failed to verify account proof from the database: {}", - err - ), - } + .get_accounts(instance, block_height, view, accounts) + .await? + .0) } #[tracing::instrument(skip(self))] async fn try_remember_blocks_merkle_tree( &self, + instance: &NodeState, bh: u64, view: ViewNumber, mt: &mut BlockMerkleTree, @@ -285,7 +376,7 @@ where return Ok(()); } - let proof = self.db.get_frontier(bh, view).await?; + let proof = self.db.get_frontier(instance, bh, view).await?; match proof .proof .first() @@ -320,3 +411,72 @@ where &self.backoff } } + +/// Disable catchup entirely. +#[derive(Clone, Debug)] +pub struct NullStateCatchup { + backoff: BackoffParams, + chain_configs: HashMap, ChainConfig>, +} + +impl Default for NullStateCatchup { + fn default() -> Self { + Self { + backoff: BackoffParams::disabled(), + chain_configs: Default::default(), + } + } +} + +impl NullStateCatchup { + /// Add a chain config preimage which can be fetched by hash during STF evaluation. + /// + /// [`NullStateCatchup`] is used to disable catchup entirely when evaluating the STF, which + /// requires the [`ValidatedState`](espresso_types::ValidatedState) to be pre-seeded with all + /// the dependencies of STF evaluation. 
However, the STF also depends on having the preimage of + /// various [`ChainConfig`] commitments, which are not stored in the + /// [`ValidatedState`](espresso_types::ValidatedState), but which instead must be supplied by a + /// separate preimage oracle. Thus, [`NullStateCatchup`] may be populated with a set of + /// [`ChainConfig`]s, which it can feed to the STF during evaluation. + pub fn add_chain_config(&mut self, cf: ChainConfig) { + self.chain_configs.insert(cf.commit(), cf); + } +} + +#[async_trait] +impl StateCatchup for NullStateCatchup { + async fn try_fetch_accounts( + &self, + _instance: &NodeState, + _height: u64, + _view: ViewNumber, + _fee_merkle_tree_root: FeeMerkleCommitment, + _account: &[FeeAccount], + ) -> anyhow::Result { + bail!("state catchup is disabled"); + } + + async fn try_remember_blocks_merkle_tree( + &self, + _instance: &NodeState, + _height: u64, + _view: ViewNumber, + _mt: &mut BlockMerkleTree, + ) -> anyhow::Result<()> { + bail!("state catchup is disabled"); + } + + async fn try_fetch_chain_config( + &self, + commitment: Commitment, + ) -> anyhow::Result { + self.chain_configs + .get(&commitment) + .copied() + .context(format!("chain config {commitment} not available")) + } + + fn backoff(&self) -> &BackoffParams { + &self.backoff + } +} diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index 992005e54..2b51b8989 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -1,10 +1,12 @@ use std::fmt::Display; use anyhow::Context; +use async_compatibility_layer::art::async_timeout; use async_std::{ sync::{Arc, RwLock}, - task::{spawn, JoinHandle}, + task::{sleep, spawn, JoinHandle}, }; +use committable::{Commitment, Committable}; use derivative::Derivative; use espresso_types::{ v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, @@ -25,16 +27,19 @@ use hotshot_orchestrator::client::OrchestratorClient; use hotshot_query_service::Leaf; use hotshot_types::{ consensus::ConsensusMetricsValue, - data::ViewNumber, + data::{EpochNumber, ViewNumber}, network::NetworkConfig, traits::{ election::Membership, metrics::Metrics, network::{ConnectedNetwork, Topic}, - node_implementation::Versions, + node_implementation::{ConsensusTime, Versions}, + ValidatedState as _, }, + utils::{View, ViewInner}, PeerConfig, }; +use std::time::Duration; use url::Url; use crate::{ @@ -196,7 +201,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence let events = handle.event_stream(); let node_id = node_state.node_id; - let mut ctx = Self { + let ctx = Self { handle: Arc::new(RwLock::new(handle)), state_signer: Arc::new(state_signer), tasks: Default::default(), @@ -206,19 +211,21 @@ impl, P: SequencerPersistence, V: Versions> Sequence node_state, config, }; - ctx.spawn( - "main event handler", - handle_events( - node_id, - events, - persistence, - ctx.state_signer.clone(), - external_event_handler, - Some(event_streamer.clone()), - event_consumer, - anchor_view, - ), - ); + + // Spawn event handling loops. These can run in the background (detached from `ctx.tasks` + // and thus not explicitly cancelled on `shut_down`) because they each exit on their own + // when the consensus event stream ends. 
+ spawn(fetch_proposals(ctx.handle.clone(), persistence.clone())); + spawn(handle_events( + node_id, + events, + persistence, + ctx.state_signer.clone(), + external_event_handler, + Some(event_streamer.clone()), + event_consumer, + anchor_view, + )); ctx } @@ -349,6 +356,7 @@ impl, P: SequencerPersistence, V: Versions> Drop } } +#[tracing::instrument(skip_all, fields(node_id))] #[allow(clippy::too_many_arguments)] async fn handle_events( node_id: u64, @@ -396,6 +404,146 @@ async fn handle_events( } } +#[tracing::instrument(skip_all)] +async fn fetch_proposals( + consensus: Arc>>, + persistence: Arc, +) where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + let mut tasks = TaskList::default(); + let mut events = consensus.read().await.event_stream(); + while let Some(event) = events.next().await { + let EventType::QuorumProposal { proposal, .. } = event.event else { + continue; + }; + // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back + // to the anchor. This allows state replay from the decided state. + let parent_view = proposal.data.justify_qc.view_number; + let parent_leaf = proposal.data.justify_qc.data.leaf_commit; + tasks.spawn( + format!("fetch proposal {parent_view:?},{parent_leaf}"), + fetch_proposal_chain( + consensus.clone(), + persistence.clone(), + parent_view, + parent_leaf, + ), + ); + } + tasks.shut_down().await; +} + +#[tracing::instrument(skip(consensus, persistence))] +async fn fetch_proposal_chain( + consensus: Arc>>, + persistence: Arc, + mut view: ViewNumber, + mut leaf: Commitment>, +) where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + while view > load_anchor_view(&*persistence).await { + match persistence.load_quorum_proposal(view).await { + Ok(proposal) => { + // If we already have the proposal in storage, keep traversing the chain to its + // parent. + view = proposal.data.justify_qc.view_number; + leaf = proposal.data.justify_qc.data.leaf_commit; + continue; + } + Err(err) => { + tracing::info!(?view, %leaf, "proposal missing from storage; fetching from network: {err:#}"); + } + } + + let future = + match consensus + .read() + .await + .request_proposal(view, EpochNumber::genesis(), leaf) + { + Ok(future) => future, + Err(err) => { + tracing::warn!(?view, %leaf, "failed to request proposal: {err:#}"); + sleep(Duration::from_secs(1)).await; + continue; + } + }; + let proposal = match async_timeout(Duration::from_secs(30), future).await { + Ok(Ok(proposal)) => proposal, + Ok(Err(err)) => { + tracing::warn!("error fetching proposal: {err:#}"); + sleep(Duration::from_secs(1)).await; + continue; + } + Err(_) => { + tracing::warn!("timed out fetching proposal"); + sleep(Duration::from_secs(1)).await; + continue; + } + }; + + while let Err(err) = persistence.append_quorum_proposal(&proposal).await { + tracing::warn!("error saving fetched proposal: {err:#}"); + sleep(Duration::from_secs(1)).await; + } + + // Add the fetched leaf to HotShot state, so consensus can make use of it. + { + let leaf = Leaf::from_quorum_proposal(&proposal.data); + let handle = consensus.read().await; + let consensus = handle.consensus(); + let mut consensus = consensus.write().await; + if matches!( + consensus.validated_state_map().get(&view), + None | Some(View { + // Replace a Da-only view with a Leaf view, which has strictly more information. + view_inner: ViewInner::Da { .. 
} + }) + ) { + let v = View { + view_inner: ViewInner::Leaf { + leaf: Committable::commit(&leaf), + state: Arc::new(ValidatedState::from_header(leaf.block_header())), + delta: None, + }, + }; + if let Err(err) = consensus.update_validated_state_map(view, v) { + tracing::warn!(?view, "unable to update validated state map: {err:#}"); + } + consensus + .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock) + .await; + tracing::info!( + ?view, + "added view to validated state map view proposal fetcher" + ); + } + } + + view = proposal.data.justify_qc.view_number; + leaf = proposal.data.justify_qc.data.leaf_commit; + } +} + +async fn load_anchor_view(persistence: &impl SequencerPersistence) -> ViewNumber { + loop { + match persistence.load_anchor_leaf().await { + Ok(Some((leaf, _))) => break leaf.view_number(), + Ok(None) => break ViewNumber::genesis(), + Err(err) => { + tracing::warn!("error loading anchor view: {err:#}"); + sleep(Duration::from_secs(1)).await; + } + } + } +} + #[derive(Debug, Default)] pub(crate) struct TaskList(Vec<(String, JoinHandle<()>)>); diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index fab292c6d..f0f990838 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -309,7 +309,7 @@ pub async fn init_node( tracing::info!(?peers, "loading network config from peers"); let peers = StatePeers::::from_urls(peers, network_params.catchup_backoff); - let config = peers.fetch_config(my_config.clone()).await; + let config = peers.fetch_config(my_config.clone()).await?; tracing::info!( node_id = config.node_index, diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index 2f346be26..9a1a6025b 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -703,6 +703,18 @@ impl SequencerPersistence for Persistence { Ok(map) } + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>> { + let inner = self.inner.read().await; + let dir_path = inner.quorum_proposals_dir_path(); + let file_path = dir_path.join(view.to_string()).with_extension("txt"); + let bytes = fs::read(file_path)?; + let proposal = bincode::deserialize(&bytes)?; + Ok(proposal) + } + async fn load_upgrade_certificate( &self, ) -> anyhow::Result>> { diff --git a/sequencer/src/persistence/no_storage.rs b/sequencer/src/persistence/no_storage.rs index 63193e606..af9b32b68 100644 --- a/sequencer/src/persistence/no_storage.rs +++ b/sequencer/src/persistence/no_storage.rs @@ -1,6 +1,7 @@ //! Mock implementation of persistence, for testing. 
#![cfg(any(test, feature = "testing"))] +use anyhow::bail; use async_std::sync::Arc; use async_trait::async_trait; use espresso_types::{ @@ -113,6 +114,12 @@ impl SequencerPersistence for NoStorage { ) -> anyhow::Result>>> { Ok(Default::default()) } + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>> { + bail!("proposal {view:?} not available"); + } async fn load_upgrade_certificate( &self, ) -> anyhow::Result>> { diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index 1fb8213f4..71136343d 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -2,17 +2,19 @@ use anyhow::Context; use async_std::sync::Arc; use async_trait::async_trait; use clap::Parser; +use committable::Committable; use derivative::Derivative; use espresso_types::{ parse_duration, v0::traits::{EventConsumer, PersistenceOptions, SequencerPersistence, StateCatchup}, BackoffParams, Leaf, NetworkConfig, Payload, }; +use futures::stream::StreamExt; use hotshot_query_service::data_source::storage::sql::Write; use hotshot_query_service::data_source::{ storage::{ pruning::PrunerCfg, - sql::{include_migrations, Config, SqlStorage, Transaction}, + sql::{include_migrations, query_as, Config, SqlStorage, Transaction}, }, Transaction as _, VersionedDataSource, }; @@ -236,10 +238,12 @@ impl PersistenceOptions for Options { type Persistence = Persistence; async fn create(self) -> anyhow::Result { - Ok(Persistence { + let persistence = Persistence { store_undecided_state: self.store_undecided_state, db: SqlStorage::connect(self.try_into()?).await?, - }) + }; + persistence.migrate_quorum_proposal_leaf_hashes().await?; + Ok(persistence) } async fn reset(self) -> anyhow::Result<()> { @@ -254,6 +258,39 @@ pub struct Persistence { store_undecided_state: bool, } +impl Persistence { + /// Ensure the `leaf_hash` column is populated for all existing quorum proposals. + /// + /// This column was added in a migration, but because it requires computing a commitment of the + /// existing data, it is not easy to populate in the SQL migration itself. Thus, on startup, we + /// check if there are any just-migrated quorum proposals with a `NULL` value for this column, + /// and if so we populate the column manually. 
+ async fn migrate_quorum_proposal_leaf_hashes(&self) -> anyhow::Result<()> { + let mut tx = self.db.write().await?; + let mut proposals = tx.fetch("SELECT * FROM quorum_proposals"); + let mut updates = vec![]; + while let Some(row) = proposals.next().await { + let row = row?; + let hash: Option = row.try_get("leaf_hash")?; + if hash.is_none() { + let view: i64 = row.try_get("view")?; + let data: Vec = row.try_get("data")?; + let proposal: Proposal> = + bincode::deserialize(&data)?; + let leaf = Leaf::from_quorum_proposal(&proposal.data); + let leaf_hash = Committable::commit(&leaf); + tracing::info!(view, %leaf_hash, "populating quorum proposal leaf hash"); + updates.push((view, leaf_hash.to_string())); + } + } + drop(proposals); + + tx.upsert("quorum_proposals", ["view", "leaf_hash"], ["view"], updates) + .await?; + tx.commit().await + } +} + #[async_trait] impl SequencerPersistence for Persistence { fn into_catchup_provider( @@ -452,6 +489,20 @@ impl SequencerPersistence for Persistence { )) } + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>> { + let mut tx = self.db.read().await?; + let (data,) = + query_as::<(Vec,)>("SELECT data FROM quorum_proposals WHERE view = $1 LIMIT 1") + .bind(view.u64() as i64) + .fetch_one(tx.as_mut()) + .await?; + let proposal = bincode::deserialize(&data)?; + Ok(proposal) + } + async fn append_vid( &self, proposal: &Proposal>, @@ -530,12 +581,13 @@ impl SequencerPersistence for Persistence { ) -> anyhow::Result<()> { let view_number = proposal.data.view_number().u64(); let proposal_bytes = bincode::serialize(&proposal).context("serializing proposal")?; + let leaf_hash = Committable::commit(&Leaf::from_quorum_proposal(&proposal.data)); let mut tx = self.db.write().await?; tx.upsert( "quorum_proposals", - ["view", "data"], + ["view", "leaf_hash", "data"], ["view"], - [(view_number as i64, proposal_bytes)], + [(view_number as i64, leaf_hash.to_string(), proposal_bytes)], ) .await?; tx.commit().await @@ -794,3 +846,82 @@ mod generic_tests { instantiate_persistence_tests!(Persistence); } + +#[cfg(test)] +mod test { + use super::*; + use crate::{persistence::testing::TestablePersistence, BLSPubKey, PubKey}; + use espresso_types::{NodeState, ValidatedState}; + use futures::stream::TryStreamExt; + use hotshot_example_types::node_types::TestVersions; + use hotshot_types::traits::signature_key::SignatureKey; + + #[async_std::test] + async fn test_quorum_proposals_leaf_hash_migration() { + // Create some quorum proposals to test with. + let leaf = Leaf::genesis(&ValidatedState::default(), &NodeState::mock()).await; + let privkey = BLSPubKey::generated_from_seed_indexed([0; 32], 1).1; + let signature = PubKey::sign(&privkey, &[]).unwrap(); + let mut quorum_proposal = Proposal { + data: QuorumProposal:: { + block_header: leaf.block_header().clone(), + view_number: ViewNumber::genesis(), + justify_qc: QuorumCertificate::genesis::( + &ValidatedState::default(), + &NodeState::mock(), + ) + .await, + upgrade_certificate: None, + proposal_certificate: None, + }, + signature, + _pd: Default::default(), + }; + + let qp1 = quorum_proposal.clone(); + + quorum_proposal.data.view_number = ViewNumber::new(1); + let qp2 = quorum_proposal.clone(); + + let qps = [qp1, qp2]; + + // Create persistence and add the quorum proposals with NULL leaf hash. 
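+        // Upserting only `view` and `data` leaves `leaf_hash` NULL, mimicking rows written by a
+        // node from before this migration; reconnecting below should backfill the hashes.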
+ let db = Persistence::tmp_storage().await; + let persistence = Persistence::connect(&db).await; + let mut tx = persistence.db.write().await.unwrap(); + let params = qps + .iter() + .map(|qp| { + ( + qp.data.view_number.u64() as i64, + bincode::serialize(&qp).unwrap(), + ) + }) + .collect::>(); + tx.upsert("quorum_proposals", ["view", "data"], ["view"], params) + .await + .unwrap(); + tx.commit().await.unwrap(); + + // Create a new persistence and ensure the commitments get populated. + let persistence = Persistence::connect(&db).await; + let mut tx = persistence.db.read().await.unwrap(); + let rows = tx + .fetch("SELECT * FROM quorum_proposals ORDER BY view ASC") + .try_collect::>() + .await + .unwrap(); + assert_eq!(rows.len(), qps.len()); + for (row, qp) in rows.into_iter().zip(qps) { + assert_eq!(row.get::("view"), qp.data.view_number.u64() as i64); + assert_eq!( + row.get::, _>("data"), + bincode::serialize(&qp).unwrap() + ); + assert_eq!( + row.get::("leaf_hash"), + Committable::commit(&Leaf::from_quorum_proposal(&qp.data)).to_string() + ); + } + } +} diff --git a/sequencer/src/restart_tests.rs b/sequencer/src/restart_tests.rs index 76c7a3665..82bca2aea 100644 --- a/sequencer/src/restart_tests.rs +++ b/sequencer/src/restart_tests.rs @@ -8,7 +8,8 @@ use cdn_broker::{reexports::crypto::signature::KeyPair, Broker, Config as Broker use cdn_marshal::{Config as MarshalConfig, Marshal}; use derivative::Derivative; use espresso_types::{ - traits::PersistenceOptions, MockSequencerVersions, PrivKey, PubKey, SeqTypes, + eth_signature_key::EthKeyPair, traits::PersistenceOptions, v0_3::ChainConfig, FeeAccount, + MockSequencerVersions, PrivKey, PubKey, SeqTypes, Transaction, }; use ethers::utils::{Anvil, AnvilInstance}; use futures::{ @@ -17,6 +18,10 @@ use futures::{ }; use hotshot::traits::implementations::derive_libp2p_peer_id; use hotshot_orchestrator::run_orchestrator; +use hotshot_testing::{ + block_builder::{SimpleBuilderImplementation, TestBuilderImplementation}, + test_builder::BuilderChange, +}; use hotshot_types::network::{Libp2pConfig, NetworkConfig}; use hotshot_types::{ event::{Event, EventType}, @@ -26,14 +31,22 @@ use hotshot_types::{ use itertools::Itertools; use portpicker::pick_unused_port; use sequencer::{ - api::{self, data_source::testing::TestableSequencerDataSource, options::Http}, + api::{ + self, + data_source::testing::TestableSequencerDataSource, + options::{Http, Query}, + }, genesis::{L1Finalized, StakeTableConfig}, network::cdn::{TestingDef, WrappedSignatureKey}, + testing::wait_for_decide_on_handle, + SequencerApiVersion, }; use sequencer_utils::test_utils::setup_test; use std::{collections::HashSet, path::Path, time::Duration}; +use surf_disco::{error::ClientError, Url}; use tempfile::TempDir; use vbs::version::Version; +use vec1::vec1; async fn test_restart_helper(network: (usize, usize), restart: (usize, usize), cdn: bool) { setup_test(); @@ -150,6 +163,34 @@ async fn slow_test_restart_all_da_without_cdn() { test_restart_helper((2, 8), (2, 0), false).await; } +#[ignore] +#[async_std::test] +async fn slow_test_restart_staggered() { + setup_test(); + + let mut network = TestNetwork::new(4, 6, false).await; + + // Check that the builder works at the beginning. + network.check_builder().await; + + // Restart nodes in a staggered fashion, so that progress never halts, but eventually every node + // has been restarted. 
This can lead to a situation where no node has the full validated state + // in memory, so we will need a pretty advanced form of catchup in order to make progress and + // process blocks after this. + for i in 0..4 { + network.restart_and_progress([i], []).await; + } + // Restart the remaining regular nodes. + for i in 0..6 { + network.restart_and_progress([], [i]).await; + } + + // Check that we can still build blocks after the restart. + network.check_builder().await; + + network.shut_down().await; +} + #[derive(Clone, Copy, Debug)] struct NetworkParams<'a> { genesis_file: &'a Path, @@ -157,6 +198,7 @@ struct NetworkParams<'a> { cdn_port: u16, l1_provider: &'a str, peer_ports: &'a [u16], + api_ports: &'a [u16], } #[derive(Clone, Debug)] @@ -208,7 +250,13 @@ impl TestNode { ..Default::default() }; if node.is_da { - modules.query = Some(Default::default()); + modules.query = Some(Query { + peers: network + .api_ports + .iter() + .map(|port| format!("http://127.0.0.1:{port}").parse().unwrap()) + .collect(), + }); modules.state = Some(Default::default()); } @@ -270,16 +318,7 @@ impl TestNode { // with a backoff. let mut retries = 5; let mut delay = Duration::from_secs(1); - let genesis = Genesis { - chain_config: Default::default(), - stake_table: StakeTableConfig { capacity: 10 }, - accounts: Default::default(), - l1_finalized: L1Finalized::Number { number: 0 }, - header: Default::default(), - upgrades: Default::default(), - base_version: Version { major: 0, minor: 1 }, - upgrade_version: Version { major: 0, minor: 2 }, - }; + let genesis = Genesis::from_file(&self.opt.genesis_file).unwrap(); let ctx = loop { match init_with_storage( genesis.clone(), @@ -392,6 +431,51 @@ impl TestNode { bail!("node {node_id} event stream ended unexpectedly"); } + + async fn check_builder(&self, port: u16) { + tracing::info!("testing builder liveness"); + + // Configure the builder to shut down in 50 views, so we don't leak resources or ports. + let ctx = self.context.as_ref().unwrap(); + let down_view = ctx.consensus().read().await.cur_view().await + 50; + + // Start a builder. + let url: Url = format!("http://localhost:{port}").parse().unwrap(); + let task = >::start( + self.num_nodes, + format!("http://0.0.0.0:{port}").parse().unwrap(), + (), + [(down_view.u64(), BuilderChange::Down)] + .into_iter() + .collect(), + ) + .await; + task.start(Box::new(ctx.event_stream().await)); + + // Wait for the API to start serving. + let client = surf_disco::Client::::new(url); + assert!( + client.connect(Some(Duration::from_secs(60))).await, + "timed out connecting to builder API" + ); + + // Submit a transaction and wait for it to be sequenced. + let mut events = ctx.event_stream().await; + let tx = Transaction::random(&mut rand::thread_rng()); + ctx.submit_transaction(tx.clone()).await.unwrap(); + let block = async_timeout( + Duration::from_secs(60), + wait_for_decide_on_handle(&mut events, &tx), + ) + .await + .expect("timed out waiting for transaction to be sequenced"); + tracing::info!(block, "transaction sequenced"); + + // Wait until the builder is cleaned up. 
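+        // The builder was configured above (via `BuilderChange::Down`) to go down at `down_view`,
+        // so once the current view passes that point its port should be free to reuse.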
+ while ctx.consensus().read().await.cur_view().await <= down_view { + sleep(Duration::from_secs(1)).await; + } + } } #[derive(Derivative)] @@ -400,6 +484,7 @@ struct TestNetwork { da_nodes: Vec>, regular_nodes: Vec>, tmp: TempDir, + builder_port: u16, orchestrator_task: Option>, broker_task: Option>, marshal_task: Option>, @@ -428,14 +513,21 @@ impl TestNetwork { let tmp = TempDir::new().unwrap(); let genesis_file = tmp.path().join("genesis.toml"); let genesis = Genesis { - chain_config: Default::default(), + chain_config: ChainConfig { + base_fee: 1.into(), + ..Default::default() + }, stake_table: StakeTableConfig { capacity: 10 }, - accounts: Default::default(), l1_finalized: L1Finalized::Number { number: 0 }, header: Default::default(), upgrades: Default::default(), base_version: Version { major: 0, minor: 1 }, upgrade_version: Version { major: 0, minor: 2 }, + + // Start with a funded account, so we can test catchup after restart. + accounts: [(builder_account(), 1000000000.into())] + .into_iter() + .collect(), }; genesis.to_file(&genesis_file).unwrap(); @@ -444,7 +536,12 @@ impl TestNetwork { .collect::>(); let orchestrator_port = ports.pick(); - let orchestrator_task = Some(start_orchestrator(orchestrator_port, &node_params)); + let builder_port = ports.pick(); + let orchestrator_task = Some(start_orchestrator( + orchestrator_port, + &node_params, + builder_port, + )); let cdn_dir = tmp.path().join("cdn"); let cdn_port = ports.pick(); @@ -463,6 +560,11 @@ impl TestNetwork { let anvil = Anvil::new().port(anvil_port).spawn(); let anvil_endpoint = anvil.endpoint(); + let api_ports = node_params + .iter() + .take(da_nodes) + .map(|node| node.api_port) + .collect::>(); let peer_ports = node_params .iter() .map(|node| node.api_port) @@ -472,6 +574,7 @@ impl TestNetwork { orchestrator_port, cdn_port, l1_provider: &anvil_endpoint, + api_ports: &api_ports, peer_ports: &peer_ports, }; @@ -486,6 +589,7 @@ impl TestNetwork { ) .await, tmp, + builder_port, orchestrator_task, broker_task, marshal_task, @@ -519,22 +623,71 @@ impl TestNetwork { .unwrap(); } + async fn check_builder(&self) { + self.da_nodes[0].check_builder(self.builder_port).await; + } + /// Restart indicated number of DA and non-DA nodes. /// /// If possible (less than a quorum of nodes have been stopped), check that remaining nodes can /// still make progress without the restarted nodes. In any case, check that the network as a /// whole makes progress once the restarted nodes are back online. async fn restart(&mut self, da_nodes: usize, regular_nodes: usize) { - tracing::info!(da_nodes, regular_nodes, "shutting down some nodes"); + self.restart_helper(0..da_nodes, 0..regular_nodes, false) + .await; + self.check_progress().await; + } + + /// Restart indicated nodes, ensuring progress is maintained at all times. + /// + /// This is a lighter weight version of [`restart`](Self::restart). While the former includes + /// heavy checks that all nodes are progressing, which makes it useful as a stress test, this + /// function does the minimum required to check that progress is maintained at all times across + /// the network as a whole. This makes it a useful building block for more complex patterns, + /// like a staggered restart. + async fn restart_and_progress( + &mut self, + da_nodes: impl IntoIterator, + regular_nodes: impl IntoIterator, + ) { + self.restart_helper(da_nodes, regular_nodes, true).await; + + // Just wait for one decide after the restart, so we don't restart subsequent nodes too + // quickly. 
+ tracing::info!("waiting for progress after restart"); + let mut events = self.da_nodes[0].event_stream().await.unwrap(); + let timeout = Duration::from_secs((2 * self.num_nodes()) as u64); + async_timeout(timeout, async { + loop { + let event = events + .next() + .await + .expect("event stream terminated unexpectedly"); + let EventType::Decide { leaf_chain, .. } = event.event else { + continue; + }; + tracing::info!(?leaf_chain, "got decide, chain is progressing"); + break; + } + }) + .await + .expect("timed out waiting for progress after restart"); + } + + async fn restart_helper( + &mut self, + da_nodes: impl IntoIterator, + regular_nodes: impl IntoIterator, + assert_progress: bool, + ) { + let da_nodes = da_nodes.into_iter().collect::>(); + let regular_nodes = regular_nodes.into_iter().collect::>(); + tracing::info!(?da_nodes, ?regular_nodes, "shutting down nodes"); + join_all( - self.da_nodes[..da_nodes] - .iter_mut() + select(&mut self.da_nodes, &da_nodes) .map(TestNode::stop) - .chain( - self.regular_nodes[..regular_nodes] - .iter_mut() - .map(TestNode::stop), - ), + .chain(select(&mut self.regular_nodes, ®ular_nodes).map(TestNode::stop)), ) .await; @@ -545,8 +698,8 @@ impl TestNetwork { // and have one dishonest leader every 4, thus preventing consensus from progressing. let quorum_threshold = 3 * self.num_nodes() / 4 + 1; let da_threshold = 2 * self.da_nodes.len() / 3 + 1; - if self.num_nodes() - da_nodes - regular_nodes > quorum_threshold - && self.da_nodes.len() - da_nodes >= da_threshold + if self.num_nodes() - da_nodes.len() - regular_nodes.len() > quorum_threshold + && self.da_nodes.len() - da_nodes.len() >= da_threshold { // If we are shutting down less than f nodes, the remaining nodes should be able to make // progress, and we will check that that is the case. @@ -557,16 +710,23 @@ impl TestNetwork { // after it and will be able to commit. Thus, we just grab an event stream and look for // any decide. tracing::info!("waiting for remaining nodes to progress"); - let mut events = if da_nodes < self.da_nodes.len() { - self.da_nodes[da_nodes].event_stream().await.unwrap() - } else { - self.regular_nodes[regular_nodes] - .event_stream() - .await - .unwrap() - }; + // Find the first DA node we _didn't_ shut down. + let da_node = self + .da_nodes + .iter() + .enumerate() + .find_map(|(i, node)| { + if da_nodes.contains(&i) { + None + } else { + Some(node) + } + }) + .unwrap(); + let mut events = da_node.event_stream().await.unwrap(); + // Wait for a few decides, the first couple may be from before the restart. - for _ in 0..self.num_nodes() { + for _ in 0..5 { let timeout = Duration::from_secs((2 * self.num_nodes()) as u64); async_timeout(timeout, async { loop { @@ -585,6 +745,15 @@ impl TestNetwork { .expect("timed out waiting for progress with nodes down"); } } else { + assert!( + !assert_progress, + "test requested that progress continue after shutdown, but also requested that too many nodes be shut down: {}/{} DA, {}/{} regular", + da_nodes.len(), + self.da_nodes.len(), + regular_nodes.len(), + self.regular_nodes.len(), + ); + // Make sure there is a brief delay before restarting the nodes; we need the OS to // have time to clean up the ports they were using. 
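+            // As a concrete example of the threshold check above: with the 4 DA + 6 regular nodes
+            // used by `slow_test_restart_staggered`, quorum_threshold = 3 * 10 / 4 + 1 = 8 and
+            // da_threshold = 2 * 4 / 3 + 1 = 3 (integer division), so restarting one node at a
+            // time leaves 9 > 8 live nodes and at least 3 live DA nodes, keeping the progress
+            // check enabled for every step of the staggered restart.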
tracing::info!( @@ -594,17 +763,11 @@ impl TestNetwork { } join_all( - self.da_nodes[..da_nodes] - .iter_mut() + select(&mut self.da_nodes, &da_nodes) .map(TestNode::start) - .chain( - self.regular_nodes[..regular_nodes] - .iter_mut() - .map(TestNode::start), - ), + .chain(select(&mut self.regular_nodes, ®ular_nodes).map(TestNode::start)), ) .await; - self.check_progress().await; } async fn shut_down(mut self) { @@ -623,10 +786,10 @@ impl TestNetwork { } } -fn start_orchestrator(port: u16, nodes: &[NodeParams]) -> JoinHandle<()> { +fn start_orchestrator(port: u16, nodes: &[NodeParams], builder_port: u16) -> JoinHandle<()> { // We don't run a builder in these tests, so use a very short timeout before nodes decide to // build an empty block on their own. - let builder_timeout = Duration::from_millis(10); + let builder_timeout = Duration::from_millis(100); // These tests frequently have nodes down and views failing, so we use a fairly short view // timeout. let view_timeout = Duration::from_secs(2); @@ -656,6 +819,7 @@ fn start_orchestrator(port: u16, nodes: &[NodeParams]) -> JoinHandle<()> { config.config.known_nodes_without_stake = vec![]; config.config.next_view_timeout = view_timeout.as_millis() as u64; config.config.builder_timeout = builder_timeout; + config.config.builder_urls = vec1![format!("http://localhost:{builder_port}").parse().unwrap()]; let bind = format!("http://0.0.0.0:{port}").parse().unwrap(); spawn(async move { @@ -746,3 +910,19 @@ impl PortPicker { } } } + +fn builder_key_pair() -> EthKeyPair { + use hotshot_types::traits::signature_key::BuilderSignatureKey; + FeeAccount::generated_from_seed_indexed([1; 32], 0).1 +} + +fn builder_account() -> FeeAccount { + builder_key_pair().fee_account() +} + +fn select<'a, T>(nodes: &'a mut [T], is: &'a [usize]) -> impl Iterator { + nodes + .iter_mut() + .enumerate() + .filter_map(|(i, elem)| if is.contains(&i) { Some(elem) } else { None }) +} diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index ebfaa32a9..ca5598538 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -4,7 +4,8 @@ use std::{sync::Arc, time::Duration}; use anyhow::{bail, ensure, Context}; use async_std::stream::StreamExt; use espresso_types::{ - v0_3::ChainConfig, BlockMerkleTree, Delta, FeeAccount, FeeMerkleTree, ValidatedState, + traits::StateCatchup, v0_3::ChainConfig, BlockMerkleTree, Delta, FeeAccount, FeeMerkleTree, + Leaf, ValidatedState, }; use futures::future::Future; use hotshot::traits::ValidatedState as HotShotState; @@ -18,18 +19,18 @@ use hotshot_query_service::{ use jf_merkle_tree::{LookupResult, MerkleTreeScheme, ToTraversalPath, UniversalMerkleTreeScheme}; use crate::{ - api::data_source::CatchupDataSource, catchup::SqlStateCatchup, - persistence::ChainConfigPersistence, NodeState, SeqTypes, + catchup::{CatchupStorage, SqlStateCatchup}, + persistence::ChainConfigPersistence, + NodeState, SeqTypes, }; -async fn compute_state_update( +pub(crate) async fn compute_state_update( state: &ValidatedState, instance: &NodeState, - parent_leaf: &LeafQueryData, - proposed_leaf: &LeafQueryData, + peers: &impl StateCatchup, + parent_leaf: &Leaf, + proposed_leaf: &Leaf, ) -> anyhow::Result<(ValidatedState, Delta)> { - let proposed_leaf = proposed_leaf.leaf(); - let parent_leaf = parent_leaf.leaf(); let header = proposed_leaf.block_header(); // Check internal consistency. 
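For orientation, here is a rough sketch of the bulk account-catchup round trip that the new `peers` parameter enables. The helper name, argument types, and import paths below are assumptions for illustration, not code from this patch; it only mirrors the `retain_accounts` helper and the default `StateCatchup::fetch_accounts` logic introduced elsewhere in this change.

    use anyhow::Context;
    use espresso_types::{retain_accounts, FeeAccount, FeeAccountProof, FeeMerkleTree, ValidatedState};

    // Hypothetical helper showing both sides of bulk account catchup.
    fn catchup_accounts_round_trip(
        full_fee_state: &FeeMerkleTree,
        requested: &[FeeAccount],
        local_state: &mut ValidatedState,
    ) -> anyhow::Result<()> {
        // Serving side: build a partial snapshot containing only the requested accounts
        // (membership or non-membership paths), as `retain_accounts` does.
        let partial = retain_accounts(full_fee_state, requested.iter().copied())?;

        // Requesting side (mirrors the default `StateCatchup::fetch_accounts`): re-prove each
        // account against the returned tree, then remember the proof into the local fee state.
        for account in requested {
            let (proof, _balance) = FeeAccountProof::prove(&partial, (*account).into())
                .context(format!("missing account {account}"))?;
            proof
                .remember(&mut local_state.fee_merkle_tree)
                .context("remembering fee account")?;
        }
        Ok(())
    }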
@@ -54,7 +55,7 @@ async fn compute_state_update( ); state - .apply_header(instance, parent_leaf, header, header.version()) + .apply_header(instance, peers, parent_leaf, header, header.version()) .await } @@ -84,6 +85,7 @@ async fn store_state_update( fee_merkle_tree.height(), ); + tracing::debug!(%delta, "inserting fee account"); UpdateStateData::::insert_merkle_nodes( tx, proof, @@ -105,6 +107,7 @@ async fn store_state_update( ); { + tracing::debug!("inserting blocks frontier"); UpdateStateData::::insert_merkle_nodes( tx, proof, @@ -115,6 +118,7 @@ async fn store_state_update( .context("failed to store block merkle nodes")?; } + tracing::debug!(block_number, "updating state height"); UpdateStateData::::set_last_state_height( tx, block_number as usize, @@ -136,6 +140,7 @@ async fn update_state_storage( parent_state: &ValidatedState, storage: &Arc, instance: &NodeState, + peers: &impl StateCatchup, parent_leaf: &LeafQueryData, proposed_leaf: &LeafQueryData, ) -> anyhow::Result @@ -145,10 +150,17 @@ where { let parent_chain_config = parent_state.chain_config; - let (state, delta) = compute_state_update(parent_state, instance, parent_leaf, proposed_leaf) - .await - .context("computing state update")?; + let (state, delta) = compute_state_update( + parent_state, + instance, + peers, + parent_leaf.leaf(), + proposed_leaf.leaf(), + ) + .await + .context("computing state update")?; + tracing::debug!("storing state update"); let mut tx = storage .write() .await @@ -207,6 +219,7 @@ where Ok(()) } +#[tracing::instrument(skip_all)] pub(crate) async fn update_state_storage_loop( storage: Arc, instance: impl Future, @@ -215,14 +228,19 @@ where T: SequencerStateDataSource, for<'a> T::Transaction<'a>: SequencerStateUpdate, { - let mut instance = instance.await; - instance.peers = Arc::new(SqlStateCatchup::new(storage.clone(), Default::default())); + let instance = instance.await; + let peers = SqlStateCatchup::new(storage.clone(), Default::default()); // get last saved merklized state let (last_height, parent_leaf, mut leaves) = { let last_height = storage.get_last_state_height().await?; let current_height = storage.block_height().await?; - tracing::info!(last_height, current_height, "updating state storage"); + tracing::info!( + node_id = instance.node_id, + last_height, + current_height, + "updating state storage" + ); let parent_leaf = storage.get_leaf(last_height).await; let leaves = storage.subscribe_leaves(last_height + 1).await; @@ -248,8 +266,21 @@ where while let Some(leaf) = leaves.next().await { loop { - match update_state_storage(&parent_state, &storage, &instance, &parent_leaf, &leaf) - .await + tracing::debug!( + height = leaf.height(), + node_id = instance.node_id, + ?leaf, + "updating persistent merklized state" + ); + match update_state_storage( + &parent_state, + &storage, + &instance, + &peers, + &parent_leaf, + &leaf, + ) + .await { Ok(state) => { parent_leaf = leaf; @@ -274,7 +305,7 @@ pub(crate) trait SequencerStateDataSource: + AvailabilityDataSource + StatusDataSource + VersionedDataSource - + CatchupDataSource + + CatchupStorage + MerklizedStateHeightPersistence { } @@ -285,7 +316,7 @@ impl SequencerStateDataSource for T where + AvailabilityDataSource + StatusDataSource + VersionedDataSource - + CatchupDataSource + + CatchupStorage + MerklizedStateHeightPersistence { } diff --git a/types/src/v0/impls/block/full_payload/payload.rs b/types/src/v0/impls/block/full_payload/payload.rs index 06b7bae51..d40be7d4e 100644 --- a/types/src/v0/impls/block/full_payload/payload.rs +++ 
b/types/src/v0/impls/block/full_payload/payload.rs @@ -32,6 +32,8 @@ pub enum BlockBuildingError { MissingGenesis, #[error("Genesis transaction in non-genesis block")] UnexpectedGenesis, + #[error("ChainConfig is not available")] + MissingChainConfig(String), } impl Payload { @@ -153,13 +155,12 @@ impl BlockPayload for Payload { } else { match validated_state_cf.resolve() { Some(cf) => cf, - None => { - instance_state - .peers - .as_ref() - .fetch_chain_config(validated_state_cf.commit()) - .await - } + None => instance_state + .peers + .as_ref() + .fetch_chain_config(validated_state_cf.commit()) + .await + .map_err(|err| BlockBuildingError::MissingChainConfig(format!("{err:#}")))?, } }; diff --git a/types/src/v0/impls/fee_info.rs b/types/src/v0/impls/fee_info.rs index ca9e5f625..b1bce9e9e 100644 --- a/types/src/v0/impls/fee_info.rs +++ b/types/src/v0/impls/fee_info.rs @@ -420,6 +420,34 @@ impl From<(FeeAccountProof, U256)> for AccountQueryData { } } +/// Get a partial snapshot of the given fee state, which contains only the specified accounts. +/// +/// Fails if one of the requested accounts is not represented in the original `state`. +pub fn retain_accounts( + state: &FeeMerkleTree, + accounts: impl IntoIterator, +) -> anyhow::Result { + let mut snapshot = FeeMerkleTree::from_commitment(state.commitment()); + for account in accounts { + match state.universal_lookup(account) { + LookupResult::Ok(elem, proof) => { + // This remember cannot fail, since we just constructed a valid proof, and are + // remembering into a tree with the same commitment. + snapshot.remember(account, *elem, proof).unwrap(); + } + LookupResult::NotFound(proof) => { + // Likewise this cannot fail. + snapshot.non_membership_remember(account, proof).unwrap() + } + LookupResult::NotInMemory => { + bail!("missing account {account}"); + } + } + } + + Ok(snapshot) +} + #[cfg(test)] mod test { use ethers::abi::Address; diff --git a/types/src/v0/impls/header.rs b/types/src/v0/impls/header.rs index 6315a4dda..1fe9eaffa 100644 --- a/types/src/v0/impls/header.rs +++ b/types/src/v0/impls/header.rs @@ -10,9 +10,10 @@ use hotshot_types::{ BlockPayload, ValidatedState as _, }, utils::BuilderCommitment, - vid::{VidCommitment, VidCommon}, + vid::{VidCommitment, VidCommon, VidSchemeType}, }; use jf_merkle_tree::{AppendableMerkleTreeScheme, MerkleTreeScheme}; +use jf_vid::VidScheme; use serde::{ de::{self, MapAccess, SeqAccess, Visitor}, Deserialize, Deserializer, Serialize, Serializer, @@ -521,16 +522,16 @@ impl Header { async fn get_chain_config( validated_state: &ValidatedState, instance_state: &NodeState, - ) -> ChainConfig { + ) -> anyhow::Result { let validated_cf = validated_state.chain_config; let instance_cf = instance_state.chain_config; if validated_cf.commit() == instance_cf.commit() { - return instance_cf; + return Ok(instance_cf); } match validated_cf.resolve() { - Some(cf) => cf, + Some(cf) => Ok(cf), None => { tracing::info!("fetching chain config {} from peers", validated_cf.commit()); @@ -732,6 +733,17 @@ impl BlockHeader for Header { /// Build a header with the parent validate state, instance-level state, parent leaf, payload /// commitment, metadata, and auction results. 
This is only used in post-marketplace versions + #[tracing::instrument( + skip_all, + fields( + height = parent_leaf.block_header().block_number() + 1, + parent_view = ?parent_leaf.view_number(), + payload_commitment, + payload_size = VidSchemeType::get_payload_byte_len(&_vid_common), + ?auction_results, + version, + ) + )] async fn new_marketplace( parent_state: &::ValidatedState, instance_state: &<::ValidatedState as hotshot_types::traits::ValidatedState>::Instance, @@ -744,6 +756,8 @@ impl BlockHeader for Header { auction_results: Option, version: Version, ) -> Result { + tracing::info!("preparing to propose marketplace header"); + let height = parent_leaf.height(); let view = parent_leaf.view_number(); @@ -755,10 +769,10 @@ impl BlockHeader for Header { UpgradeType::Marketplace { chain_config } => chain_config, UpgradeType::Fee { chain_config } => chain_config, }, - None => Header::get_chain_config(&validated_state, instance_state).await, + None => Header::get_chain_config(&validated_state, instance_state).await?, } } else { - Header::get_chain_config(&validated_state, instance_state).await + Header::get_chain_config(&validated_state, instance_state).await? }; validated_state.chain_config = chain_config.into(); @@ -807,6 +821,7 @@ impl BlockHeader for Header { .peers .as_ref() .fetch_accounts( + instance_state, height, view, parent_state.fee_merkle_tree.commitment(), @@ -815,9 +830,8 @@ impl BlockHeader for Header { .await?; // Insert missing fee state entries - for account in missing_account_proofs.iter() { - account - .proof + for proof in missing_account_proofs.iter() { + proof .remember(&mut validated_state.fee_merkle_tree) .context("remembering fee account")?; } @@ -829,7 +843,12 @@ impl BlockHeader for Header { instance_state .peers .as_ref() - .remember_blocks_merkle_tree(height, view, &mut validated_state.block_merkle_tree) + .remember_blocks_merkle_tree( + instance_state, + height, + view, + &mut validated_state.block_merkle_tree, + ) .await .context("remembering block proof")?; } @@ -850,6 +869,16 @@ impl BlockHeader for Header { )?) } + #[tracing::instrument( + skip_all, + fields( + height = parent_leaf.block_header().block_number() + 1, + parent_view = ?parent_leaf.view_number(), + payload_commitment, + payload_size = VidSchemeType::get_payload_byte_len(&_vid_common), + version, + ) + )] async fn new_legacy( parent_state: &ValidatedState, instance_state: &NodeState, @@ -861,6 +890,8 @@ impl BlockHeader for Header { _vid_common: VidCommon, version: Version, ) -> Result { + tracing::info!("preparing to propose legacy header"); + let height = parent_leaf.height(); let view = parent_leaf.view_number(); @@ -870,12 +901,12 @@ impl BlockHeader for Header { match instance_state.upgrades.get(&version) { Some(upgrade) => match upgrade.upgrade_type { UpgradeType::Fee { chain_config } => chain_config, - _ => Header::get_chain_config(&validated_state, instance_state).await, + _ => Header::get_chain_config(&validated_state, instance_state).await?, }, - None => Header::get_chain_config(&validated_state, instance_state).await, + None => Header::get_chain_config(&validated_state, instance_state).await?, } } else { - Header::get_chain_config(&validated_state, instance_state).await + Header::get_chain_config(&validated_state, instance_state).await? 
}; validated_state.chain_config = chain_config.into(); @@ -921,6 +952,7 @@ impl BlockHeader for Header { .peers .as_ref() .fetch_accounts( + instance_state, height, view, parent_state.fee_merkle_tree.commitment(), @@ -929,9 +961,8 @@ impl BlockHeader for Header { .await?; // Insert missing fee state entries - for account in missing_account_proofs.iter() { - account - .proof + for proof in missing_account_proofs.iter() { + proof .remember(&mut validated_state.fee_merkle_tree) .context("remembering fee account")?; } @@ -943,7 +974,12 @@ impl BlockHeader for Header { instance_state .peers .as_ref() - .remember_blocks_merkle_tree(height, view, &mut validated_state.block_merkle_tree) + .remember_blocks_merkle_tree( + instance_state, + height, + view, + &mut validated_state.block_merkle_tree, + ) .await .context("remembering block proof")?; } @@ -1398,7 +1434,13 @@ mod test_headers { // Pass a different chain config to trigger a chain config validation error. let state = validated_state - .apply_header(&genesis.instance_state, &parent_leaf, &proposal, ver) + .apply_header( + &genesis.instance_state, + &genesis.instance_state.peers, + &parent_leaf, + &proposal, + ver, + ) .await .unwrap() .0; @@ -1421,7 +1463,13 @@ mod test_headers { // Advance `proposal.height` to trigger validation error. let validated_state = validated_state - .apply_header(&genesis.instance_state, &parent_leaf, &proposal, ver) + .apply_header( + &genesis.instance_state, + &genesis.instance_state.peers, + &parent_leaf, + &proposal, + ver, + ) .await .unwrap() .0; @@ -1445,7 +1493,13 @@ mod test_headers { *proposal.height_mut() += 1; let validated_state = validated_state - .apply_header(&genesis.instance_state, &parent_leaf, &proposal, ver) + .apply_header( + &genesis.instance_state, + &genesis.instance_state.peers, + &parent_leaf, + &proposal, + ver, + ) .await .unwrap() .0; @@ -1550,6 +1604,7 @@ mod test_headers { let proposal_state = proposal_state .apply_header( &genesis_state, + &genesis_state.peers, &parent_leaf, &proposal, StaticVersion::<0, 1>::version(), diff --git a/types/src/v0/impls/instance_state.rs b/types/src/v0/impls/instance_state.rs index 0b6ebcf26..10e851b39 100644 --- a/types/src/v0/impls/instance_state.rs +++ b/types/src/v0/impls/instance_state.rs @@ -1,6 +1,6 @@ -use crate::{ - v0::traits::StateCatchup, v0_3::ChainConfig, GenesisHeader, L1BlockInfo, L1Client, PubKey, - Timestamp, Upgrade, UpgradeMode, +use crate::v0::{ + retain_accounts, traits::StateCatchup, v0_3::ChainConfig, FeeMerkleTree, GenesisHeader, + L1BlockInfo, L1Client, PubKey, Timestamp, Upgrade, UpgradeMode, }; use hotshot_types::traits::states::InstanceState; use hotshot_types::HotShotConfig; @@ -173,10 +173,7 @@ pub mod mock { use jf_merkle_tree::{ForgetableMerkleTreeScheme, MerkleTreeScheme}; use super::*; - use crate::{ - v0_1::{AccountQueryData, FeeAccountProof}, - BackoffParams, BlockMerkleTree, FeeAccount, FeeMerkleCommitment, - }; + use crate::{BackoffParams, BlockMerkleTree, FeeAccount, FeeMerkleCommitment}; #[derive(Debug, Clone, Default)] pub struct MockStateCatchup { @@ -195,24 +192,24 @@ pub mod mock { #[async_trait] impl StateCatchup for MockStateCatchup { - async fn try_fetch_account( + async fn try_fetch_accounts( &self, + _instance: &NodeState, _height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { let src = &self.state[&view].fee_merkle_tree; assert_eq!(src.commitment(), fee_merkle_tree_root); - 
tracing::info!("catchup: fetching account {account:?} for view {view:?}"); - Ok(FeeAccountProof::prove(src, account.into()) - .unwrap_or_else(|| panic!("Account {account:?} not in memory")) - .into()) + tracing::info!("catchup: fetching accounts {accounts:?} for view {view:?}"); + retain_accounts(src, accounts.iter().copied()) } async fn try_remember_blocks_merkle_tree( &self, + _instance: &NodeState, _height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, diff --git a/types/src/v0/impls/mod.rs b/types/src/v0/impls/mod.rs index 43f25a047..3190f54d5 100644 --- a/types/src/v0/impls/mod.rs +++ b/types/src/v0/impls/mod.rs @@ -12,7 +12,10 @@ mod state; mod transaction; pub use auction::SolverAuctionResultsProvider; -pub use fee_info::FeeError; +pub use fee_info::{retain_accounts, FeeError}; pub use instance_state::{mock, NodeState}; pub use state::ProposalValidationError; -pub use state::{validate_proposal, BuilderValidationError, StateValidationError, ValidatedState}; +pub use state::{ + get_l1_deposits, validate_proposal, BuilderValidationError, StateValidationError, + ValidatedState, +}; diff --git a/types/src/v0/impls/state.rs b/types/src/v0/impls/state.rs index fc2d8fb78..5bf766fc0 100644 --- a/types/src/v0/impls/state.rs +++ b/types/src/v0/impls/state.rs @@ -31,6 +31,7 @@ use super::{ BlockSize, FeeMerkleCommitment, }; use crate::{ + traits::StateCatchup, v0_3::{ChainConfig, FullNetworkTx, IterableFeeInfo, ResolvableChainConfig}, BlockMerkleTree, Delta, FeeAccount, FeeAmount, FeeInfo, FeeMerkleTree, Header, Leaf, NsTableValidationError, PayloadByteLen, SeqTypes, UpgradeType, BLOCK_MERKLE_TREE_HEIGHT, @@ -430,6 +431,7 @@ impl ValidatedState { pub async fn apply_header( &self, instance: &NodeState, + peers: &impl StateCatchup, parent_leaf: &Leaf, proposed_header: &Header, version: Version, @@ -441,7 +443,7 @@ impl ValidatedState { validated_state.apply_upgrade(instance, version); let chain_config = validated_state - .get_chain_config(instance, &proposed_header.chain_config()) + .get_chain_config(instance, peers, &proposed_header.chain_config()) .await?; if Some(chain_config) != validated_state.chain_config.resolve() { @@ -476,10 +478,9 @@ impl ValidatedState { ?parent_view, "fetching block frontier from peers" ); - instance - .peers - .as_ref() + peers .remember_blocks_merkle_tree( + instance, parent_height, parent_view, &mut validated_state.block_merkle_tree, @@ -496,10 +497,9 @@ impl ValidatedState { "fetching missing accounts from peers" ); - let missing_account_proofs = instance - .peers - .as_ref() + let missing_account_proofs = peers .fetch_accounts( + instance, parent_height, parent_view, validated_state.fee_merkle_tree.commitment(), @@ -508,9 +508,8 @@ impl ValidatedState { .await?; // Remember the fee state entries - for account in missing_account_proofs.iter() { - account - .proof + for proof in missing_account_proofs.iter() { + proof .remember(&mut validated_state.fee_merkle_tree) .expect("proof previously verified"); } @@ -560,6 +559,7 @@ impl ValidatedState { pub(crate) async fn get_chain_config( &self, instance: &NodeState, + peers: &impl StateCatchup, header_cf: &ResolvableChainConfig, ) -> anyhow::Result { let state_cf = self.chain_config; @@ -571,13 +571,7 @@ impl ValidatedState { let cf = match (state_cf.resolve(), header_cf.resolve()) { (Some(cf), _) => cf, (_, Some(cf)) if cf.commit() == state_cf.commit() => cf, - (_, Some(_)) | (None, None) => { - instance - .peers - .as_ref() - .fetch_chain_config(state_cf.commit()) - .await - } + (_, Some(_)) | (None, None) => 
peers.fetch_chain_config(state_cf.commit()).await?, }; Ok(cf) @@ -701,7 +695,13 @@ impl HotShotState for ValidatedState { // Unwrapping here is okay as we retry in a loop //so we should either get a validated state or until hotshot cancels the task let (validated_state, delta) = self - .apply_header(instance, parent_leaf, proposed_header, version) + .apply_header( + instance, + &instance.peers, + parent_leaf, + proposed_header, + version, + ) .await .unwrap(); diff --git a/types/src/v0/mod.rs b/types/src/v0/mod.rs index 11dc166ab..147880984 100644 --- a/types/src/v0/mod.rs +++ b/types/src/v0/mod.rs @@ -17,8 +17,8 @@ pub mod traits; mod utils; pub use header::Header; pub use impls::{ - mock, validate_proposal, BuilderValidationError, FeeError, ProposalValidationError, - StateValidationError, + get_l1_deposits, mock, retain_accounts, validate_proposal, BuilderValidationError, FeeError, + ProposalValidationError, StateValidationError, }; pub use utils::*; use vbs::version::{StaticVersion, StaticVersionType}; diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index a7931402d..32bc3aa09 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -24,54 +24,62 @@ use hotshot_types::{ use serde::{de::DeserializeOwned, Serialize}; use crate::{ - v0::impls::ValidatedState, v0_3::ChainConfig, AccountQueryData, BackoffParams, BlockMerkleTree, - Event, FeeAccount, FeeMerkleCommitment, Leaf, NetworkConfig, SeqTypes, + v0::impls::ValidatedState, v0_3::ChainConfig, BackoffParams, BlockMerkleTree, Event, + FeeAccount, FeeAccountProof, FeeMerkleCommitment, FeeMerkleTree, Leaf, NetworkConfig, SeqTypes, }; use super::impls::NodeState; #[async_trait] pub trait StateCatchup: Send + Sync + std::fmt::Debug { - /// Try to fetch the given account state, failing without retrying if unable. - async fn try_fetch_account( + /// Try to fetch the given accounts state, failing without retrying if unable. + async fn try_fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result; + account: &[FeeAccount], + ) -> anyhow::Result; /// Fetch the given list of accounts, retrying on transient errors. async fn fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, accounts: Vec, - ) -> anyhow::Result> { - let mut ret = vec![]; - for account in accounts { - let account = self - .backoff() - .retry(self, |provider| { - provider - .try_fetch_account(height, view, fee_merkle_tree_root, account) + ) -> anyhow::Result> { + self.backoff() + .retry(self, |provider| { + async { + let tree = provider + .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, &accounts) + .await .map_err(|err| { err.context(format!( - "fetching account {account}, height {height}, view {view:?}" + "fetching accounts {accounts:?}, height {height}, view {view:?}" )) + })?; + accounts + .iter() + .map(|account| { + FeeAccountProof::prove(&tree, (*account).into()) + .context(format!("missing account {account}")) + .map(|(proof, _)| proof) }) - .boxed() - }) - .await; - ret.push(account); - } - Ok(ret) + .collect::>>() + } + .boxed() + }) + .await } /// Try to fetch and remember the blocks frontier, failing without retrying if unable. 
async fn try_remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, @@ -80,18 +88,18 @@ pub trait StateCatchup: Send + Sync + std::fmt::Debug { /// Fetch and remember the blocks frontier, retrying on transient errors. async fn remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { self.backoff() .retry(mt, |mt| { - self.try_remember_blocks_merkle_tree(height, view, mt) + self.try_remember_blocks_merkle_tree(instance, height, view, mt) .map_err(|err| err.context("fetching frontier")) .boxed() }) - .await; - Ok(()) + .await } async fn try_fetch_chain_config( @@ -99,7 +107,10 @@ pub trait StateCatchup: Send + Sync + std::fmt::Debug { commitment: Commitment, ) -> anyhow::Result; - async fn fetch_chain_config(&self, commitment: Commitment) -> ChainConfig { + async fn fetch_chain_config( + &self, + commitment: Commitment, + ) -> anyhow::Result { self.backoff() .retry(self, |provider| { provider @@ -115,48 +126,54 @@ pub trait StateCatchup: Send + Sync + std::fmt::Debug { #[async_trait] impl StateCatchup for Box { - async fn try_fetch_account( + async fn try_fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { (**self) - .try_fetch_account(height, view, fee_merkle_tree_root, account) + .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) .await } async fn fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, accounts: Vec, - ) -> anyhow::Result> { + ) -> anyhow::Result> { (**self) - .fetch_accounts(height, view, fee_merkle_tree_root, accounts) + .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) .await } async fn try_remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { (**self) - .try_remember_blocks_merkle_tree(height, view, mt) + .try_remember_blocks_merkle_tree(instance, height, view, mt) .await } async fn remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { - (**self).remember_blocks_merkle_tree(height, view, mt).await + (**self) + .remember_blocks_merkle_tree(instance, height, view, mt) + .await } async fn try_fetch_chain_config( @@ -166,7 +183,10 @@ impl StateCatchup for Box { (**self).try_fetch_chain_config(commitment).await } - async fn fetch_chain_config(&self, commitment: Commitment) -> ChainConfig { + async fn fetch_chain_config( + &self, + commitment: Commitment, + ) -> anyhow::Result { (**self).fetch_chain_config(commitment).await } @@ -177,48 +197,54 @@ impl StateCatchup for Box { #[async_trait] impl StateCatchup for Arc { - async fn try_fetch_account( + async fn try_fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { (**self) - .try_fetch_account(height, view, fee_merkle_tree_root, account) + .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) .await } async fn fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, accounts: Vec, - ) 
-> anyhow::Result> { + ) -> anyhow::Result> { (**self) - .fetch_accounts(height, view, fee_merkle_tree_root, accounts) + .fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) .await } async fn try_remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { (**self) - .try_remember_blocks_merkle_tree(height, view, mt) + .try_remember_blocks_merkle_tree(instance, height, view, mt) .await } async fn remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { - (**self).remember_blocks_merkle_tree(height, view, mt).await + (**self) + .remember_blocks_merkle_tree(instance, height, view, mt) + .await } async fn try_fetch_chain_config( @@ -228,7 +254,10 @@ impl StateCatchup for Arc { (**self).try_fetch_chain_config(commitment).await } - async fn fetch_chain_config(&self, commitment: Commitment) -> ChainConfig { + async fn fetch_chain_config( + &self, + commitment: Commitment, + ) -> anyhow::Result { (**self).fetch_chain_config(commitment).await } @@ -240,22 +269,23 @@ impl StateCatchup for Arc { /// Catchup from multiple providers tries each provider in a round robin fashion until it succeeds. #[async_trait] impl StateCatchup for Vec { - #[tracing::instrument(skip(self))] - async fn try_fetch_account( + #[tracing::instrument(skip(self, instance))] + async fn try_fetch_accounts( &self, + instance: &NodeState, height: u64, view: ViewNumber, fee_merkle_tree_root: FeeMerkleCommitment, - account: FeeAccount, - ) -> anyhow::Result { + accounts: &[FeeAccount], + ) -> anyhow::Result { for provider in self { match provider - .try_fetch_account(height, view, fee_merkle_tree_root, account) + .try_fetch_accounts(instance, height, view, fee_merkle_tree_root, accounts) .await { - Ok(account) => return Ok(account), + Ok(tree) => return Ok(tree), Err(err) => { - tracing::warn!(%account, ?provider, "failed to fetch account: {err:#}"); + tracing::warn!(?accounts, ?provider, "failed to fetch accounts: {err:#}"); } } } @@ -266,13 +296,14 @@ impl StateCatchup for Vec { #[tracing::instrument(skip(self, mt))] async fn try_remember_blocks_merkle_tree( &self, + instance: &NodeState, height: u64, view: ViewNumber, mt: &mut BlockMerkleTree, ) -> anyhow::Result<()> { for provider in self { match provider - .try_remember_blocks_merkle_tree(height, view, mt) + .try_remember_blocks_merkle_tree(instance, height, view, mt) .await { Ok(()) => return Ok(()), @@ -357,6 +388,11 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { &self, ) -> anyhow::Result>>>; + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>>; + async fn load_vid_share( &self, view: ViewNumber, diff --git a/types/src/v0/utils.rs b/types/src/v0/utils.rs index 6fb254733..ff7652f36 100644 --- a/types/src/v0/utils.rs +++ b/types/src/v0/utils.rs @@ -216,6 +216,10 @@ pub struct BackoffParams { default_value = "1:10" )] jitter: Ratio, + + /// Disable retries and just fail after one failed attempt. 
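+    ///
+    /// The same behavior is available programmatically via [`BackoffParams::disabled`].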
+ #[clap(short, long, env = "ESPRESSO_SEQUENCER_CATCHUP_BACKOFF_DISABLE")] + disable: bool, } impl Default for BackoffParams { @@ -225,15 +229,25 @@ impl Default for BackoffParams { } impl BackoffParams { + pub fn disabled() -> Self { + Self { + disable: true, + ..Default::default() + } + } + pub async fn retry( &self, mut state: S, f: impl for<'a> Fn(&'a mut S) -> BoxFuture<'a, anyhow::Result>, - ) -> T { + ) -> anyhow::Result { let mut delay = self.base; loop { match f(&mut state).await { - Ok(res) => break res, + Ok(res) => break Ok(res), + Err(err) if self.disable => { + return Err(err.context("Retryable operation failed; retries disabled")); + } Err(err) => { tracing::warn!( "Retryable operation failed, will retry after {delay:?}: {err:#}"