Fix fee catchup after restart (#2160)
* Add regression test for fee catchup after restart

This test reproduces an issue discovered in staging, in which, after
a staggered restart of all nodes, the network has forgotten the fee
state for all accounts except those which actively produced blocks
during the restart. Since the state is needed for undecided blocks,
it cannot directly be obtained from persisted merklized state, which
is only available for decided blocks.

* Add bulk endpoint for account catchup

* Enable account catchup from persisted decided storage

* Cache fetched state in validated state map after fetching from database

* Support migration from existing nodes populating leaf_hash column

* Update HotShot to enable caching of fetched account states

* Rename migration after merge

* Add proposal fetching task

* Return leaf when fetching account state so we can always add it to the HotShot state map

* Fix deadlock in restart tests

During experimentation I changed the restart tests from starting all
nodes in parallel to starting them one at a time. This broke the no-CDN
tests because, in some cases, libp2p cannot become ready until multiple
nodes have restarted, so sequential startup blocks indefinitely on the
initialization of the first restarted node.

Switched it back to starting up in parallel and things are working
again.
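
As a rough sketch of the difference (the `restart` helper and node IDs here are stand-ins,
not the actual test code):

```rust
use futures::future::join_all;
use std::future::Future;

// Hypothetical helper signature: `restart(id)` brings node `id` back up and
// resolves once that node reports ready.
async fn restart_all<F, Fut>(node_ids: Vec<usize>, restart: F)
where
    F: Fn(usize) -> Fut,
    Fut: Future<Output = ()>,
{
    // A sequential restart can deadlock in the no-CDN tests: the first node's
    // readiness may depend on peers that have not been restarted yet.
    //
    //     for id in node_ids { restart(id).await; }
    //
    // A parallel restart lets libp2p form a mesh as soon as enough nodes are up.
    join_all(node_ids.into_iter().map(restart)).await;
}
```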

* Update sequencer/api/catchup.toml

Co-authored-by: Mathis <[email protected]>

* Avoid dropping TaskList, so that its drop handler does not block the executor thread

* Exit proposal fetching task if it becomes obsolete

The simplest, most robust change was to make the task a single flat
retry loop, instead of nesting a retry loop for the fetch itself, and
to refresh the anchor view on each iteration, so that the loop exits
once a decide makes the anchor view more recent than the view we are
fetching.
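
Roughly, the loop now has this shape (illustrative only; the helper names, types, and
backoff are placeholders rather than the actual task code):

```rust
use std::time::Duration;

// Illustrative flat retry loop: the anchor (latest decided) view is re-read on
// every iteration, so the task exits on its own once a decide passes the view
// it is trying to fetch.
async fn fetch_proposal_task(
    target_view: u64,
    anchor_view: impl Fn() -> u64,
    try_fetch: impl Fn(u64) -> anyhow::Result<()>,
) {
    loop {
        if anchor_view() > target_view {
            tracing::info!(target_view, "proposal fetch obsolete, exiting");
            return;
        }
        match try_fetch(target_view) {
            Ok(()) => return,
            Err(err) => {
                tracing::warn!(target_view, "proposal fetch failed, will retry: {err:#}");
                async_std::task::sleep(Duration::from_secs(1)).await;
            }
        }
    }
}
```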

* Enable frontier catchup from storage (#2183)

* Enable frontier catchup from storage

* Prevent recursive state reconstruction

* Prefetch accounts used for paying fees during state reconstruction

* Remove no-longer-necessary catchup impl for DB transactions

* Completely disable catchup when not required (NullStateCatchup)

* Fix backoff CLI parsing

* Use the proper chain config during state reconstruction

Closes #2186

* Add comment

* Fix migration conflict

* Make migrations consistent with release-gambit

---------

Co-authored-by: Mathis <[email protected]>
jbearer and sveitser authored Oct 22, 2024
1 parent d31da37 commit 3563c90
Showing 29 changed files with 1,661 additions and 420 deletions.
15 changes: 3 additions & 12 deletions Cargo.lock


14 changes: 14 additions & 0 deletions sequencer/api/catchup.toml
@@ -27,6 +27,20 @@ the proof is a Merkle _non-membership_ proof.
```
"""

[route.accounts]
PATH = ["/:height/:view/accounts"]
":height" = "Integer"
":view" = "Integer"
METHOD = "POST"
DOC = """
Bulk version of `/:height/:view/account`. The request body should be a JSON array consisting of
TaggedBase64-encoded fee accounts.
The response is a `FeeMerkleTree` containing sub-trees for each of the requested accounts, which is
a more condensed way to represent the union of account proofs for each requested account. Individual
Merkle proofs for each account can be extracted from this tree.
"""

[route.blocks]
PATH = ["/:height/:view/blocks"]
":height" = "Integer"
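
For illustration, a hypothetical standalone client for the new bulk route might look like the
following (the base URL, the `/catchup` prefix, and the use of `reqwest` are assumptions, and
the account strings are placeholders):

```rust
use anyhow::Result;

// Hypothetical client for the bulk account catchup route.
async fn fetch_accounts(height: u64, view: u64, accounts: &[String]) -> Result<serde_json::Value> {
    // Assumed mount point for the catchup API; the real prefix and port may differ.
    let url = format!("http://localhost:8080/catchup/{height}/{view}/accounts");
    let resp = reqwest::Client::new()
        .post(url)
        // Request body: a JSON array of TaggedBase64-encoded fee accounts.
        .json(&accounts)
        .send()
        .await?
        .error_for_status()?;
    // Response body: a serialized `FeeMerkleTree` with sub-trees for each
    // requested account, from which individual proofs can be extracted.
    Ok(resp.json().await?)
}
```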
2 changes: 2 additions & 0 deletions sequencer/api/migrations/V38__add_quorum_proposal_hash.sql
@@ -0,0 +1,2 @@
ALTER TABLE quorum_proposals
ADD COLUMN leaf_hash VARCHAR;
165 changes: 131 additions & 34 deletions sequencer/src/api.rs
@@ -4,14 +4,14 @@ use anyhow::{bail, Context};
use async_once_cell::Lazy;
use async_std::sync::{Arc, RwLock};
use async_trait::async_trait;
use committable::Commitment;
use committable::{Commitment, Committable};
use data_source::{CatchupDataSource, SubmitDataSource};
use derivative::Derivative;
use espresso_types::{
v0::traits::SequencerPersistence, v0_3::ChainConfig, AccountQueryData, BlockMerkleTree,
FeeAccountProof, MockSequencerVersions, NodeState, PubKey, Transaction,
retain_accounts, v0::traits::SequencerPersistence, v0_3::ChainConfig, AccountQueryData,
BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleTree, MockSequencerVersions, NodeState,
PubKey, Transaction, ValidatedState,
};
use ethers::prelude::Address;
use futures::{
future::{BoxFuture, Future, FutureExt},
stream::BoxStream,
@@ -26,14 +26,17 @@ use hotshot_types::{
event::Event,
light_client::StateSignatureRequestBody,
network::NetworkConfig,
traits::{network::ConnectedNetwork, node_implementation::Versions},
traits::{network::ConnectedNetwork, node_implementation::Versions, ValidatedState as _},
utils::{View, ViewInner},
};
use jf_merkle_tree::MerkleTreeScheme;

use self::data_source::{HotShotConfigDataSource, PublicNetworkConfig, StateSignatureDataSource};
use self::data_source::{
HotShotConfigDataSource, NodeStateDataSource, PublicNetworkConfig, StateSignatureDataSource,
};
use crate::{
context::Consensus, network, state_signature::StateSigner, SeqTypes, SequencerApiVersion,
SequencerContext,
catchup::CatchupStorage, context::Consensus, network, state_signature::StateSigner, SeqTypes,
SequencerApiVersion, SequencerContext,
};

pub mod data_source;
@@ -105,10 +108,6 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> ApiState
Arc::clone(&self.consensus.as_ref().get().await.get_ref().handle)
}

async fn node_state(&self) -> &NodeState {
&self.consensus.as_ref().get().await.get_ref().node_state
}

async fn network_config(&self) -> NetworkConfig<PubKey> {
self.consensus.as_ref().get().await.get_ref().config.clone()
}
@@ -189,44 +188,128 @@ impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> SubmitDa
}
}

impl<N, P, D, V> NodeStateDataSource for StorageState<N, P, D, V>
where
N: ConnectedNetwork<PubKey>,
V: Versions,
P: SequencerPersistence,
D: Sync,
{
async fn node_state(&self) -> &NodeState {
self.as_ref().node_state().await
}
}

impl<
N: ConnectedNetwork<PubKey>,
V: Versions,
P: SequencerPersistence,
D: CatchupDataSource + Send + Sync,
D: CatchupStorage + Send + Sync,
> CatchupDataSource for StorageState<N, P, D, V>
{
#[tracing::instrument(skip(self))]
async fn get_account(
#[tracing::instrument(skip(self, instance))]
async fn get_accounts(
&self,
instance: &NodeState,
height: u64,
view: ViewNumber,
account: Address,
) -> anyhow::Result<AccountQueryData> {
accounts: &[FeeAccount],
) -> anyhow::Result<FeeMerkleTree> {
// Check if we have the desired state in memory.
match self.as_ref().get_account(height, view, account).await {
Ok(account) => return Ok(account),
match self
.as_ref()
.get_accounts(instance, height, view, accounts)
.await
{
Ok(accounts) => return Ok(accounts),
Err(err) => {
tracing::info!("account is not in memory, trying storage: {err:#}");
tracing::info!("accounts not in memory, trying storage: {err:#}");
}
}

// Try storage.
self.inner().get_account(height, view, account).await
let (tree, leaf) = self
.inner()
.get_accounts(instance, height, view, accounts)
.await
.context("accounts not in memory, and could not fetch from storage")?;
// If we successfully fetched accounts from storage, try to add them back into the in-memory
// state.
let handle = self.as_ref().consensus().await;
let handle = handle.read().await;
let consensus = handle.consensus();
let mut consensus = consensus.write().await;
let (state, delta, leaf_commit) = match consensus.validated_state_map().get(&view) {
Some(View {
view_inner: ViewInner::Leaf { state, delta, leaf },
}) => {
let mut state = (**state).clone();

// Add the fetched accounts to the state.
for account in accounts {
if let Some((proof, _)) = FeeAccountProof::prove(&tree, (*account).into()) {
if let Err(err) = proof.remember(&mut state.fee_merkle_tree) {
tracing::warn!(
?view,
%account,
"cannot update fetched account state: {err:#}"
);
}
} else {
tracing::warn!(?view, %account, "cannot update fetched account state because account is not in the merkle tree");
};
}

(Arc::new(state), delta.clone(), *leaf)
}
_ => {
// If we don't already have a leaf for this view, or if we don't have the view
// at all, we can create a new view based on the recovered leaf and add it to
// our state map. In this case, we must also add the leaf to the saved leaves
// map to ensure consistency.
let mut state = ValidatedState::from_header(leaf.block_header());
state.fee_merkle_tree = tree.clone();
let res = (Arc::new(state), None, Committable::commit(&leaf));
consensus
.update_saved_leaves(leaf, &handle.hotshot.upgrade_lock)
.await;
res
}
};
if let Err(err) = consensus.update_validated_state_map(
view,
View {
view_inner: ViewInner::Leaf {
state,
delta,
leaf: leaf_commit,
},
},
) {
tracing::warn!(?view, "cannot update fetched account state: {err:#}");
}
tracing::info!(?view, "updated with fetched account state");

Ok(tree)
}

#[tracing::instrument(skip(self))]
async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result<BlocksFrontier> {
async fn get_frontier(
&self,
instance: &NodeState,
height: u64,
view: ViewNumber,
) -> anyhow::Result<BlocksFrontier> {
// Check if we have the desired state in memory.
match self.as_ref().get_frontier(height, view).await {
match self.as_ref().get_frontier(instance, height, view).await {
Ok(frontier) => return Ok(frontier),
Err(err) => {
tracing::info!("frontier is not in memory, trying storage: {err:#}");
}
}

// Try storage.
self.inner().get_frontier(height, view).await
self.inner().get_frontier(instance, height, view).await
}

async fn get_chain_config(
@@ -265,16 +348,28 @@ impl<
// }
// }

impl<N, V, P> NodeStateDataSource for ApiState<N, P, V>
where
N: ConnectedNetwork<PubKey>,
V: Versions,
P: SequencerPersistence,
{
async fn node_state(&self) -> &NodeState {
&self.consensus.as_ref().get().await.get_ref().node_state
}
}

impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupDataSource
for ApiState<N, P, V>
{
#[tracing::instrument(skip(self))]
async fn get_account(
#[tracing::instrument(skip(self, _instance))]
async fn get_accounts(
&self,
_instance: &NodeState,
height: u64,
view: ViewNumber,
account: Address,
) -> anyhow::Result<AccountQueryData> {
accounts: &[FeeAccount],
) -> anyhow::Result<FeeMerkleTree> {
let state = self
.consensus()
.await
@@ -285,14 +380,16 @@ impl<N: ConnectedNetwork<PubKey>, V: Versions, P: SequencerPersistence> CatchupD
.context(format!(
"state not available for height {height}, view {view:?}"
))?;
let (proof, balance) = FeeAccountProof::prove(&state.fee_merkle_tree, account).context(
format!("account {account} not available for height {height}, view {view:?}"),
)?;
Ok(AccountQueryData { balance, proof })
retain_accounts(&state.fee_merkle_tree, accounts.iter().copied())
}

#[tracing::instrument(skip(self))]
async fn get_frontier(&self, height: u64, view: ViewNumber) -> anyhow::Result<BlocksFrontier> {
async fn get_frontier(
&self,
_instance: &NodeState,
height: u64,
view: ViewNumber,
) -> anyhow::Result<BlocksFrontier> {
let state = self
.consensus()
.await
@@ -1925,7 +2022,7 @@ mod test {

// Fetch the config from node 1, a different node than the one running the service.
let validator = ValidatorConfig::generated_from_seed_indexed([0; 32], 1, 1, false);
let mut config = peers.fetch_config(validator.clone()).await;
let mut config = peers.fetch_config(validator.clone()).await.unwrap();

// Check the node-specific information in the recovered config is correct.
assert_eq!(config.node_index, 1);