Skip to content

Commit

Permalink
Add proposal fetching task
Browse files Browse the repository at this point in the history
  • Loading branch information
jbearer committed Oct 11, 2024
1 parent 6cde9e7 commit 79d17b9
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 15 deletions.
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

145 changes: 143 additions & 2 deletions sequencer/src/context.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::fmt::Display;

use anyhow::Context;
use async_compatibility_layer::art::async_timeout;
use async_std::{
sync::{Arc, RwLock},
task::{spawn, JoinHandle},
task::{sleep, spawn, JoinHandle},
};
use committable::{Commitment, Committable};
use derivative::Derivative;
use espresso_types::{
v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
Expand All @@ -30,10 +32,13 @@ use hotshot_types::{
election::Membership,
metrics::Metrics,
network::{ConnectedNetwork, Topic},
node_implementation::Versions,
node_implementation::{ConsensusTime, Versions},
ValidatedState as _,
},
utils::{View, ViewInner},
PeerConfig,
};
use std::time::Duration;
use url::Url;

use crate::{
Expand Down Expand Up @@ -205,6 +210,10 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
node_state,
config,
};
ctx.spawn(
"proposal fetcher",
fetch_proposals(ctx.handle.clone(), persistence.clone()),
);
ctx.spawn(
"main event handler",
handle_events(
Expand Down Expand Up @@ -399,6 +408,138 @@ async fn handle_events<V: Versions>(
}
}

#[tracing::instrument(skip_all)]
async fn fetch_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
persistence: Arc<impl SequencerPersistence>,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let mut tasks = TaskList::default();
let mut events = consensus.read().await.event_stream();
while let Some(event) = events.next().await {
let EventType::QuorumProposal { proposal, .. } = event.event else {
continue;
};
// Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back
// to the anchor. This allows state replay from the decided state.
let parent_view = proposal.data.justify_qc.view_number;
let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
tasks.spawn(
format!("fetch proposal {parent_view:?},{parent_leaf}"),
fetch_proposal_chain(
consensus.clone(),
persistence.clone(),
parent_view,
parent_leaf,
),
);
}
}

#[tracing::instrument(skip(consensus, persistence))]
async fn fetch_proposal_chain<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
persistence: Arc<impl SequencerPersistence>,
mut view: ViewNumber,
mut leaf: Commitment<Leaf<SeqTypes>>,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let anchor_view = loop {
match persistence.load_anchor_leaf().await {
Ok(Some((leaf, _))) => break leaf.view_number(),
Ok(None) => break ViewNumber::genesis(),
Err(err) => {
tracing::warn!("error loading anchor view: {err:#}");
sleep(Duration::from_secs(1)).await;
}
}
};
while view > anchor_view {
match persistence.load_quorum_proposal(view).await {
Ok(proposal) => {
// If we already have the proposal in storage, keep traversing the chain to its
// parent.
view = proposal.data.view_number;
leaf = proposal.data.justify_qc.data.leaf_commit;
continue;
}
Err(err) => {
tracing::info!(?view, %leaf, "proposal missing from storage; fetching from network: {err:#}");
}
}

let proposal = loop {
let future = match consensus.read().await.request_proposal(view, leaf) {
Ok(future) => future,
Err(err) => {
tracing::warn!(?view, %leaf, "failed to request proposal: {err:#}");
sleep(Duration::from_secs(1)).await;
continue;
}
};
match async_timeout(Duration::from_secs(30), future).await {
Ok(Ok(proposal)) => break proposal,
Ok(Err(err)) => {
tracing::warn!("error fetching proposal: {err:#}");
}
Err(_) => {
tracing::warn!("timed out fetching proposal");
}
}

// Sleep before retrying to avoid hot loop.
sleep(Duration::from_secs(1)).await;
};

while let Err(err) = persistence.append_quorum_proposal(&proposal).await {
tracing::warn!("error saving fetched proposal: {err:#}");
sleep(Duration::from_secs(1)).await;
}

// Add the fetched leaf to HotShot state, so consensus can make use of it.
{
let leaf = Leaf::from_quorum_proposal(&proposal.data);
let handle = consensus.read().await;
let consensus = handle.consensus();
let mut consensus = consensus.write().await;
if matches!(
consensus.validated_state_map().get(&view),
None | Some(View {
// Replace a Da-only view with a Leaf view, which has strictly more information.
view_inner: ViewInner::Da { .. }
})
) {
let v = View {
view_inner: ViewInner::Leaf {
leaf: Committable::commit(&leaf),
state: Arc::new(ValidatedState::from_header(leaf.block_header())),
delta: None,
},
};
if let Err(err) = consensus.update_validated_state_map(view, v) {
tracing::warn!(?view, "unable to update validated state map: {err:#}");
}
consensus
.update_saved_leaves(leaf, &handle.hotshot.upgrade_lock)
.await;
tracing::info!(
?view,
"added view to validated state map view proposal fetcher"
);
}
}

view = proposal.data.justify_qc.view_number;
leaf = proposal.data.justify_qc.data.leaf_commit;
}
}

#[derive(Debug, Default)]
pub(crate) struct TaskList(Vec<(String, JoinHandle<()>)>);

Expand Down
12 changes: 12 additions & 0 deletions sequencer/src/persistence/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,18 @@ impl SequencerPersistence for Persistence {
Ok(map)
}

async fn load_quorum_proposal(
&self,
view: ViewNumber,
) -> anyhow::Result<Proposal<SeqTypes, QuorumProposal<SeqTypes>>> {
let inner = self.inner.read().await;
let dir_path = inner.quorum_proposals_dir_path();
let file_path = dir_path.join(view.to_string()).with_extension("txt");
let bytes = fs::read(file_path)?;
let proposal = bincode::deserialize(&bytes)?;
Ok(proposal)
}

async fn load_upgrade_certificate(
&self,
) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
Expand Down
7 changes: 7 additions & 0 deletions sequencer/src/persistence/no_storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Mock implementation of persistence, for testing.
#![cfg(any(test, feature = "testing"))]

use anyhow::bail;
use async_std::sync::Arc;
use async_trait::async_trait;
use espresso_types::{
Expand Down Expand Up @@ -113,6 +114,12 @@ impl SequencerPersistence for NoStorage {
) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposal<SeqTypes>>>> {
Ok(Default::default())
}
async fn load_quorum_proposal(
&self,
view: ViewNumber,
) -> anyhow::Result<Proposal<SeqTypes, QuorumProposal<SeqTypes>>> {
bail!("proposal {view:?} not available");
}
async fn load_upgrade_certificate(
&self,
) -> anyhow::Result<Option<UpgradeCertificate<SeqTypes>>> {
Expand Down
18 changes: 18 additions & 0 deletions sequencer/src/persistence/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,24 @@ impl SequencerPersistence for Persistence {
))
}

async fn load_quorum_proposal(
&self,
view: ViewNumber,
) -> anyhow::Result<Proposal<SeqTypes, QuorumProposal<SeqTypes>>> {
let row = self
.db
.read()
.await?
.query_one(
"SELECT * FROM quorum_proposals WHERE view = $1 LIMIT 1",
[view.u64() as i64],
)
.await?;
let data: Vec<u8> = row.try_get("data")?;
let proposal = bincode::deserialize(&data)?;
Ok(proposal)
}

async fn append_vid(
&self,
proposal: &Proposal<SeqTypes, VidDisperseShare<SeqTypes>>,
Expand Down
5 changes: 5 additions & 0 deletions types/src/v0/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,11 @@ pub trait SequencerPersistence: Sized + Send + Sync + 'static {
&self,
) -> anyhow::Result<BTreeMap<ViewNumber, Proposal<SeqTypes, QuorumProposal<SeqTypes>>>>;

async fn load_quorum_proposal(
&self,
view: ViewNumber,
) -> anyhow::Result<Proposal<SeqTypes, QuorumProposal<SeqTypes>>>;

async fn load_vid_share(
&self,
view: ViewNumber,
Expand Down

0 comments on commit 79d17b9

Please sign in to comment.