Add metrics for proposal fetcher
jbearer committed Dec 9, 2024
1 parent 7ffbd17 commit d2cadca
Showing 5 changed files with 262 additions and 178 deletions.
6 changes: 3 additions & 3 deletions justfile
@@ -18,13 +18,13 @@ lint:
cargo clippy --workspace --features testing --all-targets -- -D warnings
cargo clippy --workspace --all-targets --manifest-path sequencer-sqlite/Cargo.toml -- -D warnings
build:
build profile="test":
#!/usr/bin/env bash
set -euxo pipefail
# Use the same target dir for both `build` invocations
export CARGO_TARGET_DIR=${CARGO_TARGET_DIR:-target}
cargo build --profile test
cargo build --profile test --manifest-path ./sequencer-sqlite/Cargo.toml
cargo build --profile {{profile}}
cargo build --profile {{profile}} --manifest-path ./sequencer-sqlite/Cargo.toml
demo-native-mp *args: build
scripts/demo-native -f process-compose.yaml -f process-compose-mp.yml {{args}}
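A note on the justfile change above: the build recipe now takes a profile parameter that defaults to "test", so a plain just build behaves exactly as before, while an invocation such as just build release would presumably forward --profile release to both cargo build commands, including the sequencer-sqlite one.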
184 changes: 11 additions & 173 deletions sequencer/src/context.rs
@@ -1,13 +1,9 @@
use std::{fmt::Display, sync::Arc};

use anyhow::Context;
use async_channel::{Receiver, Sender};
use async_lock::RwLock;
use clap::Parser;
use committable::Commitment;
use derivative::Derivative;
use espresso_types::{
parse_duration,
v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
NodeState, PubKey, Transaction, ValidatedState,
};
@@ -22,63 +18,33 @@ use hotshot::{
};
use hotshot_events_service::events_source::{EventConsumer, EventsStreamer};
use parking_lot::Mutex;
use tokio::{
spawn,
task::JoinHandle,
time::{sleep, timeout},
};
use tokio::{spawn, task::JoinHandle};

use hotshot_orchestrator::client::OrchestratorClient;
use hotshot_types::{
consensus::ConsensusMetricsValue,
data::{EpochNumber, Leaf2, ViewNumber},
data::{Leaf2, ViewNumber},
network::NetworkConfig,
traits::{
metrics::Metrics,
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeType, Versions},
ValidatedState as _,
node_implementation::{NodeType, Versions},
},
utils::{View, ViewInner},
PeerConfig, ValidatorConfig,
};
use std::time::Duration;
use tracing::{Instrument, Level};
use url::Url;

use crate::{
external_event_handler::{self, ExternalEventHandler},
proposal_fetcher::ProposalFetcherConfig,
state_signature::StateSigner,
static_stake_table_commitment, Node, SeqTypes, SequencerApiVersion,
};

/// The consensus handle
pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;

#[derive(Clone, Copy, Debug, Parser)]
pub struct ProposalFetcherConfig {
#[clap(
long = "proposal-fetcher-num-workers",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
default_value = "2"
)]
pub num_workers: usize,

#[clap(
long = "proposal-fetcher-fetch-timeout",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
default_value = "2s",
value_parser = parse_duration,
)]
pub fetch_timeout: Duration,
}

impl Default for ProposalFetcherConfig {
fn default() -> Self {
Self::parse_from(std::iter::empty::<String>())
}
}

/// The sequencer context contains a consensus handle and other sequencer specific information.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
@@ -203,6 +169,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
event_consumer,
anchor_view,
proposal_fetcher_cfg,
metrics,
)
.with_task_list(tasks))
}
@@ -221,6 +188,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
event_consumer: impl PersistenceEventConsumer + 'static,
anchor_view: Option<ViewNumber>,
proposal_fetcher_cfg: ProposalFetcherConfig,
metrics: &dyn Metrics,
) -> Self {
let events = handle.event_stream();

@@ -238,23 +206,12 @@
};

// Spawn proposal fetching tasks.
let (send, recv) = async_channel::unbounded();
ctx.spawn(
"proposal scanner",
scan_proposals(ctx.handle.clone(), send.clone()),
proposal_fetcher_cfg.spawn(
&mut ctx.tasks,
ctx.handle.clone(),
persistence.clone(),
metrics,
);
for i in 0..proposal_fetcher_cfg.num_workers {
ctx.spawn(
format!("proposal fetcher {i}"),
fetch_proposals(
ctx.handle.clone(),
persistence.clone(),
send.clone(),
recv.clone(),
proposal_fetcher_cfg.fetch_timeout,
),
);
}

// Spawn event handling loop.
ctx.spawn(
@@ -472,125 +429,6 @@ async fn handle_events<V: Versions>(
}
}

#[tracing::instrument(skip_all)]
async fn scan_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
fetcher: Sender<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let mut events = consensus.read().await.event_stream();
while let Some(event) = events.next().await {
let EventType::QuorumProposal { proposal, .. } = event.event else {
continue;
};
// Whenever we see a quorum proposal, ensure we have the chain of proposals stretching back
// to the anchor. This allows state replay from the decided state.
let parent_view = proposal.data.justify_qc.view_number;
let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
fetcher.send((parent_view, parent_leaf)).await.ok();
}
}

#[tracing::instrument(skip_all)]
async fn fetch_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
persistence: Arc<impl SequencerPersistence>,
sender: Sender<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
receiver: Receiver<(ViewNumber, Commitment<Leaf2<SeqTypes>>)>,
fetch_timeout: Duration,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let mut receiver = std::pin::pin!(receiver);
while let Some((view, leaf)) = receiver.next().await {
let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
let res: anyhow::Result<()> = async {
let anchor_view = load_anchor_view(&*persistence).await;
if view <= anchor_view {
tracing::debug!(?anchor_view, "skipping already-decided proposal");
return Ok(());
}

match persistence.load_quorum_proposal(view).await {
Ok(proposal) => {
// If we already have the proposal in storage, keep traversing the chain to its
// parent.
let view = proposal.data.justify_qc.view_number;
let leaf = proposal.data.justify_qc.data.leaf_commit;
sender.send((view, leaf)).await.ok();
return Ok(());
}
Err(err) => {
tracing::info!("proposal missing from storage; fetching from network: {err:#}");
}
}

let future =
consensus
.read()
.await
.request_proposal(view, EpochNumber::genesis(), leaf)?;
let proposal = timeout(fetch_timeout, future)
.await
.context("timed out fetching proposal")?
.context("error fetching proposal")?;
persistence
.append_quorum_proposal(&proposal)
.await
.context("error saving fetched proposal")?;

// Add the fetched leaf to HotShot state, so consensus can make use of it.
let leaf = Leaf2::from_quorum_proposal(&proposal.data);
let handle = consensus.read().await;
let consensus = handle.consensus();
let mut consensus = consensus.write().await;
if matches!(
consensus.validated_state_map().get(&view),
None | Some(View {
// Replace a Da-only view with a Leaf view, which has strictly more information.
view_inner: ViewInner::Da { .. }
})
) {
let state = Arc::new(ValidatedState::from_header(leaf.block_header()));
if let Err(err) = consensus.update_leaf(leaf, state, None) {
tracing::warn!("unable to update leaf: {err:#}");
}
}

Ok(())
}
.instrument(span)
.await;
if let Err(err) = res {
tracing::warn!("failed to fetch proposal: {err:#}");

// Avoid busy loop when operations are failing.
sleep(Duration::from_secs(1)).await;

// If we fail fetching the proposal, don't let it clog up the fetching task. Just push
// it back onto the queue and move onto the next proposal.
sender.send((view, leaf)).await.ok();
}
}
}

async fn load_anchor_view(persistence: &impl SequencerPersistence) -> ViewNumber {
loop {
match persistence.load_anchor_view().await {
Ok(view) => break view,
Err(err) => {
tracing::warn!("error loading anchor view: {err:#}");
sleep(Duration::from_secs(1)).await;
}
}
}
}

#[derive(Debug, Default, Clone)]
#[allow(clippy::type_complexity)]
pub(crate) struct TaskList(Arc<Mutex<Vec<(String, JoinHandle<()>)>>>);
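Taken together, the context.rs changes remove the in-file proposal fetching machinery (the ProposalFetcherConfig struct and the scan_proposals, fetch_proposals, and load_anchor_view tasks) and instead have init/new accept a metrics: &dyn Metrics handle, which is passed along with the consensus handle and persistence to proposal_fetcher_cfg.spawn. Threading the caller's metrics registry through, rather than letting the fetcher create its own, presumably keeps the new fetcher counters in the same registry and exporter as the existing consensus metrics.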
4 changes: 3 additions & 1 deletion sequencer/src/lib.rs
@@ -2,6 +2,7 @@ pub mod api;
pub mod catchup;
pub mod context;
pub mod genesis;
mod proposal_fetcher;

mod external_event_handler;
pub mod options;
@@ -13,14 +14,15 @@ mod message_compat_tests;

use anyhow::Context;
use catchup::StatePeers;
use context::{ProposalFetcherConfig, SequencerContext};
use context::SequencerContext;
use espresso_types::{
traits::EventConsumer, BackoffParams, L1ClientOptions, NodeState, PubKey, SeqTypes,
SolverAuctionResultsProvider, ValidatedState,
};
use genesis::L1Finalized;
use hotshot::traits::election::static_committee::StaticCommittee;
use hotshot_types::traits::election::Membership;
use proposal_fetcher::ProposalFetcherConfig;
use std::sync::Arc;
use tokio::select;
// Should move `STAKE_TABLE_CAPACITY` in the sequencer repo when we have variate stake table support
2 changes: 1 addition & 1 deletion sequencer/src/options.rs
@@ -21,7 +21,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey};
use libp2p::Multiaddr;
use url::Url;

use crate::{api, context::ProposalFetcherConfig, persistence};
use crate::{api, persistence, proposal_fetcher::ProposalFetcherConfig};

// This options struct is a bit unconventional. The sequencer has multiple optional modules which
// can be added, in any combination, to the service. These include, for example, the API server.
(The fifth changed file did not load in this view.)
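Given the mod proposal_fetcher; declaration added in lib.rs and the new spawn call site in context.rs, the unrendered file is presumably a new sequencer/src/proposal_fetcher.rs that absorbs the logic removed from context.rs and registers the metrics this commit is named for. The sketch below is only an illustration of how that wiring might look; the ProposalFetcherMetrics struct, the metric names, and the exact Metrics-trait calls are assumptions, not code taken from the commit.

    // Rough sketch only -- the real module is not rendered in this diff view.
    // Assumes the hotshot_types Metrics trait exposes subgroup/create_counter
    // constructors roughly as used below.
    use std::sync::Arc;

    use hotshot_types::traits::metrics::{Counter, Metrics};

    // Illustrative metrics bundle; the names actually used in the commit may differ.
    #[derive(Clone)]
    struct ProposalFetcherMetrics {
        // Proposals successfully fetched from the network and persisted.
        fetched: Arc<dyn Counter>,
        // Fetch attempts that failed or timed out and were re-queued.
        failed: Arc<dyn Counter>,
    }

    impl ProposalFetcherMetrics {
        fn new(metrics: &dyn Metrics) -> Self {
            // Group the counters under their own namespace so they are exported
            // alongside the existing consensus metrics.
            let metrics = metrics.subgroup("proposal_fetcher".into());
            Self {
                fetched: metrics.create_counter("fetched".into(), None).into(),
                failed: metrics.create_counter("failed".into(), None).into(),
            }
        }
    }

    // The spawn entry point's signature can be inferred from the call site in
    // context.rs (body elided; shown as a comment because it depends on
    // crate-internal types):
    //
    //     impl ProposalFetcherConfig {
    //         pub(crate) fn spawn<N, P, V>(
    //             self,
    //             tasks: &mut TaskList,
    //             consensus: Arc<RwLock<Consensus<N, P, V>>>,
    //             persistence: Arc<P>,
    //             metrics: &dyn Metrics,
    //         ) where
    //             N: ConnectedNetwork<PubKey>,
    //             P: SequencerPersistence,
    //             V: Versions,
    //         { /* spawn scanner + num_workers fetcher tasks */ }
    //     }
    //
    // Presumably it spawns the scanner task plus num_workers fetcher workers
    // (the loops previously defined in context.rs), bumping fetched / failed as
    // each proposal is retrieved or a fetch attempt errors out.

The configuration surface shown in the removed struct should still apply, so the fetcher can presumably still be tuned via ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS and ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT (or the corresponding --proposal-fetcher-* flags).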
