diff --git a/Cargo.lock b/Cargo.lock index 63677f1eb6..ab33e5327e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4099,7 +4099,7 @@ dependencies = [ [[package]] name = "hotshot" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-broadcast", @@ -4147,7 +4147,7 @@ dependencies = [ [[package]] name = "hotshot-builder-api" version = "0.1.7" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "async-trait", "clap", @@ -4248,7 +4248,7 @@ dependencies = [ [[package]] name = "hotshot-example-types" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-broadcast", @@ -4281,7 +4281,7 @@ dependencies = [ [[package]] name = "hotshot-fakeapi" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-lock 2.8.0", @@ -4300,7 +4300,7 @@ dependencies = [ [[package]] name = "hotshot-macros" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "derive_builder", "proc-macro2", @@ -4311,7 +4311,7 @@ dependencies = [ [[package]] name = "hotshot-orchestrator" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-compatibility-layer", @@ -4397,7 +4397,7 @@ dependencies = [ [[package]] name = "hotshot-stake-table" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "ark-bn254", "ark-ed-on-bn254", @@ -4459,7 +4459,7 @@ dependencies = [ [[package]] name = "hotshot-task" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-broadcast", @@ -4474,7 +4474,7 @@ dependencies = [ [[package]] name = "hotshot-task-impls" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-broadcast", @@ -4511,7 +4511,7 @@ dependencies = [ [[package]] name = "hotshot-testing" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-broadcast", @@ -4611,7 +4611,7 @@ dependencies = [ [[package]] name = "hotshot-types" version = "0.1.11" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "ark-bn254", @@ -5902,7 +5902,7 @@ dependencies = [ [[package]] name = "libp2p-networking" version = "0.5.77" -source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#7ed26b6326b07e7faea1e1b568e9619439d8fb6f" +source = "git+https://www.github.com/EspressoSystems/HotShot.git?branch=jb/state-catchup-patch#3fd360ad48794e6f90f09187a4d30f6e1f8fe94d" dependencies = [ "anyhow", "async-compatibility-layer", @@ -7644,7 +7644,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.77", diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index 2f343c83b5..2f8441d501 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -1,10 +1,12 @@ use std::fmt::Display; use anyhow::Context; +use async_compatibility_layer::art::async_timeout; use async_std::{ sync::{Arc, RwLock}, - task::{spawn, JoinHandle}, + task::{sleep, spawn, JoinHandle}, }; +use committable::{Commitment, Committable}; use derivative::Derivative; use espresso_types::{ v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, @@ -30,10 +32,13 @@ use hotshot_types::{ election::Membership, metrics::Metrics, network::{ConnectedNetwork, Topic}, - node_implementation::Versions, + node_implementation::{ConsensusTime, Versions}, + ValidatedState as _, }, + utils::{View, ViewInner}, PeerConfig, }; +use std::time::Duration; use url::Url; use crate::{ @@ -205,6 +210,10 @@ impl, P: SequencerPersistence, V: Versions> Sequence node_state, config, }; + ctx.spawn( + "proposal fetcher", + fetch_proposals(ctx.handle.clone(), persistence.clone()), + ); ctx.spawn( "main event handler", handle_events( @@ -399,6 +408,138 @@ async fn handle_events( } } +#[tracing::instrument(skip_all)] +async fn fetch_proposals( + consensus: Arc>>, + persistence: Arc, +) where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + let mut tasks = TaskList::default(); + let mut events = consensus.read().await.event_stream(); + while let Some(event) = events.next().await { + let EventType::QuorumProposal { proposal, .. } = event.event else { + continue; + }; + // Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back + // to the anchor. This allows state replay from the decided state. + let parent_view = proposal.data.justify_qc.view_number; + let parent_leaf = proposal.data.justify_qc.data.leaf_commit; + tasks.spawn( + format!("fetch proposal {parent_view:?},{parent_leaf}"), + fetch_proposal_chain( + consensus.clone(), + persistence.clone(), + parent_view, + parent_leaf, + ), + ); + } +} + +#[tracing::instrument(skip(consensus, persistence))] +async fn fetch_proposal_chain( + consensus: Arc>>, + persistence: Arc, + mut view: ViewNumber, + mut leaf: Commitment>, +) where + N: ConnectedNetwork, + P: SequencerPersistence, + V: Versions, +{ + let anchor_view = loop { + match persistence.load_anchor_leaf().await { + Ok(Some((leaf, _))) => break leaf.view_number(), + Ok(None) => break ViewNumber::genesis(), + Err(err) => { + tracing::warn!("error loading anchor view: {err:#}"); + sleep(Duration::from_secs(1)).await; + } + } + }; + while view > anchor_view { + match persistence.load_quorum_proposal(view).await { + Ok(proposal) => { + // If we already have the proposal in storage, keep traversing the chain to its + // parent. + view = proposal.data.view_number; + leaf = proposal.data.justify_qc.data.leaf_commit; + continue; + } + Err(err) => { + tracing::info!(?view, %leaf, "proposal missing from storage; fetching from network: {err:#}"); + } + } + + let proposal = loop { + let future = match consensus.read().await.request_proposal(view, leaf) { + Ok(future) => future, + Err(err) => { + tracing::warn!(?view, %leaf, "failed to request proposal: {err:#}"); + sleep(Duration::from_secs(1)).await; + continue; + } + }; + match async_timeout(Duration::from_secs(30), future).await { + Ok(Ok(proposal)) => break proposal, + Ok(Err(err)) => { + tracing::warn!("error fetching proposal: {err:#}"); + } + Err(_) => { + tracing::warn!("timed out fetching proposal"); + } + } + + // Sleep before retrying to avoid hot loop. + sleep(Duration::from_secs(1)).await; + }; + + while let Err(err) = persistence.append_quorum_proposal(&proposal).await { + tracing::warn!("error saving fetched proposal: {err:#}"); + sleep(Duration::from_secs(1)).await; + } + + // Add the fetched leaf to HotShot state, so consensus can make use of it. + { + let leaf = Leaf::from_quorum_proposal(&proposal.data); + let handle = consensus.read().await; + let consensus = handle.consensus(); + let mut consensus = consensus.write().await; + if matches!( + consensus.validated_state_map().get(&view), + None | Some(View { + // Replace a Da-only view with a Leaf view, which has strictly more information. + view_inner: ViewInner::Da { .. } + }) + ) { + let v = View { + view_inner: ViewInner::Leaf { + leaf: Committable::commit(&leaf), + state: Arc::new(ValidatedState::from_header(leaf.block_header())), + delta: None, + }, + }; + if let Err(err) = consensus.update_validated_state_map(view, v) { + tracing::warn!(?view, "unable to update validated state map: {err:#}"); + } + consensus + .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock) + .await; + tracing::info!( + ?view, + "added view to validated state map view proposal fetcher" + ); + } + } + + view = proposal.data.justify_qc.view_number; + leaf = proposal.data.justify_qc.data.leaf_commit; + } +} + #[derive(Debug, Default)] pub(crate) struct TaskList(Vec<(String, JoinHandle<()>)>); diff --git a/sequencer/src/persistence/fs.rs b/sequencer/src/persistence/fs.rs index 2f346be266..9a1a6025bd 100644 --- a/sequencer/src/persistence/fs.rs +++ b/sequencer/src/persistence/fs.rs @@ -703,6 +703,18 @@ impl SequencerPersistence for Persistence { Ok(map) } + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>> { + let inner = self.inner.read().await; + let dir_path = inner.quorum_proposals_dir_path(); + let file_path = dir_path.join(view.to_string()).with_extension("txt"); + let bytes = fs::read(file_path)?; + let proposal = bincode::deserialize(&bytes)?; + Ok(proposal) + } + async fn load_upgrade_certificate( &self, ) -> anyhow::Result>> { diff --git a/sequencer/src/persistence/no_storage.rs b/sequencer/src/persistence/no_storage.rs index 63193e606c..af9b32b689 100644 --- a/sequencer/src/persistence/no_storage.rs +++ b/sequencer/src/persistence/no_storage.rs @@ -1,6 +1,7 @@ //! Mock implementation of persistence, for testing. #![cfg(any(test, feature = "testing"))] +use anyhow::bail; use async_std::sync::Arc; use async_trait::async_trait; use espresso_types::{ @@ -113,6 +114,12 @@ impl SequencerPersistence for NoStorage { ) -> anyhow::Result>>> { Ok(Default::default()) } + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>> { + bail!("proposal {view:?} not available"); + } async fn load_upgrade_certificate( &self, ) -> anyhow::Result>> { diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs index 7a1441c9e3..2f670ef065 100644 --- a/sequencer/src/persistence/sql.rs +++ b/sequencer/src/persistence/sql.rs @@ -492,6 +492,24 @@ impl SequencerPersistence for Persistence { )) } + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>> { + let row = self + .db + .read() + .await? + .query_one( + "SELECT * FROM quorum_proposals WHERE view = $1 LIMIT 1", + [view.u64() as i64], + ) + .await?; + let data: Vec = row.try_get("data")?; + let proposal = bincode::deserialize(&data)?; + Ok(proposal) + } + async fn append_vid( &self, proposal: &Proposal>, diff --git a/types/src/v0/traits.rs b/types/src/v0/traits.rs index 22382eff22..a4893cac09 100644 --- a/types/src/v0/traits.rs +++ b/types/src/v0/traits.rs @@ -370,6 +370,11 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static { &self, ) -> anyhow::Result>>>; + async fn load_quorum_proposal( + &self, + view: ViewNumber, + ) -> anyhow::Result>>; + async fn load_vid_share( &self, view: ViewNumber,