diff --git a/sequencer/api/public-env-vars.toml b/sequencer/api/public-env-vars.toml index 0ee5aca64..45070b283 100644 --- a/sequencer/api/public-env-vars.toml +++ b/sequencer/api/public-env-vars.toml @@ -79,6 +79,9 @@ variables = [ "ESPRESSO_SEQUENCER_POSTGRES_PRUNE", "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS", "ESPRESSO_SEQUENCER_POSTGRES_USER", + "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY", + "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT", + "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS", "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE", "ESPRESSO_SEQUENCER_PRUNER_INTERVAL", "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE", diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs index a7351e9ef..72de7d7db 100644 --- a/sequencer/src/context.rs +++ b/sequencer/src/context.rs @@ -1,10 +1,13 @@ use std::{fmt::Display, sync::Arc}; use anyhow::Context; +use async_broadcast::{broadcast, Receiver, Sender}; use async_lock::RwLock; +use clap::Parser; use committable::{Commitment, Committable}; use derivative::Derivative; use espresso_types::{ + parse_duration, v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence}, NodeState, PubKey, Transaction, ValidatedState, }; @@ -52,6 +55,37 @@ use crate::{ /// The consensus handle pub type Consensus = SystemContextHandle, V>; +#[derive(Clone, Copy, Debug, Parser)] +pub struct ProposalFetcherConfig { + #[clap( + long = "proposal-fetcher-num-workers", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS", + default_value = "2" + )] + pub num_workers: usize, + + #[clap( + long = "proposal-fetcher-channel-capacity", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY", + default_value = "100" + )] + pub channel_capacity: usize, + + #[clap( + long = "proposal-fetcher-fetch-timeout", + env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT", + default_value = "2s", + value_parser = parse_duration, + )] + pub fetch_timeout: Duration, +} + +impl Default for ProposalFetcherConfig { + fn default() -> Self { + Self::parse_from(std::iter::empty::()) + } +} + /// The sequencer context contains a consensus handle and other sequencer specific information. #[derive(Derivative, Clone)] #[derivative(Debug(bound = ""))] @@ -100,6 +134,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence event_consumer: impl PersistenceEventConsumer + 'static, _: V, marketplace_config: MarketplaceConfig>, + proposal_fetcher_cfg: ProposalFetcherConfig, ) -> anyhow::Result { let config = &network_config.config; let pub_key = validator_config.public_key; @@ -174,6 +209,7 @@ impl, P: SequencerPersistence, V: Versions> Sequence validator_config, event_consumer, anchor_view, + proposal_fetcher_cfg, ) .with_task_list(tasks)) } @@ -191,11 +227,12 @@ impl, P: SequencerPersistence, V: Versions> Sequence validator_config: ValidatorConfig<::SignatureKey>, event_consumer: impl PersistenceEventConsumer + 'static, anchor_view: Option, + proposal_fetcher_cfg: ProposalFetcherConfig, ) -> Self { let events = handle.event_stream(); let node_id = node_state.node_id; - let ctx = Self { + let mut ctx = Self { handle: Arc::new(RwLock::new(handle)), state_signer: Arc::new(state_signer), tasks: Default::default(), @@ -207,20 +244,35 @@ impl, P: SequencerPersistence, V: Versions> Sequence validator_config, }; - // Spawn event handling loops. These can run in the background (detached from `ctx.tasks` - // and thus not explicitly cancelled on `shut_down`) because they each exit on their own - // when the consensus event stream ends. - spawn(fetch_proposals(ctx.handle.clone(), persistence.clone())); - spawn(handle_events( - node_id, - events, - persistence, - ctx.state_signer.clone(), - external_event_handler, - Some(event_streamer.clone()), - event_consumer, - anchor_view, - )); + // Spawn proposal fetching tasks. + let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity); + ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send)); + for i in 0..proposal_fetcher_cfg.num_workers { + ctx.spawn( + format!("proposal fetcher {i}"), + fetch_proposals( + ctx.handle.clone(), + persistence.clone(), + recv.clone(), + proposal_fetcher_cfg.fetch_timeout, + ), + ); + } + + // Spawn event handling loop. + ctx.spawn( + "event handler", + handle_events( + node_id, + events, + persistence, + ctx.state_signer.clone(), + external_event_handler, + Some(event_streamer.clone()), + event_consumer, + anchor_view, + ), + ); ctx } @@ -424,15 +476,14 @@ async fn handle_events( } #[tracing::instrument(skip_all)] -async fn fetch_proposals( +async fn scan_proposals( consensus: Arc>>, - persistence: Arc, + fetcher: Sender<(ViewNumber, Commitment>)>, ) where N: ConnectedNetwork, P: SequencerPersistence, V: Versions, { - let mut tasks = TaskList::default(); let mut events = consensus.read().await.event_stream(); while let Some(event) = events.next().await { let EventType::QuorumProposal { proposal, .. } = event.event else { @@ -442,78 +493,63 @@ async fn fetch_proposals( // to the anchor. This allows state replay from the decided state. let parent_view = proposal.data.justify_qc.view_number; let parent_leaf = proposal.data.justify_qc.data.leaf_commit; - tasks.spawn_short_lived( - format!("fetch proposal {parent_view:?},{parent_leaf}"), - fetch_proposal_chain( - consensus.clone(), - persistence.clone(), - parent_view, - parent_leaf, - ), - ); + fetcher + .broadcast_direct((parent_view, parent_leaf)) + .await + .ok(); } - tasks.shut_down(); } -#[tracing::instrument(skip(consensus, persistence))] -async fn fetch_proposal_chain( +#[tracing::instrument(skip_all)] +async fn fetch_proposals( consensus: Arc>>, persistence: Arc, - mut view: ViewNumber, - mut leaf: Commitment>, + mut scanner: Receiver<(ViewNumber, Commitment>)>, + fetch_timeout: Duration, ) where N: ConnectedNetwork, P: SequencerPersistence, V: Versions, { - while view > load_anchor_view(&*persistence).await { - match persistence.load_quorum_proposal(view).await { - Ok(proposal) => { - // If we already have the proposal in storage, keep traversing the chain to its - // parent. - view = proposal.data.justify_qc.view_number; - leaf = proposal.data.justify_qc.data.leaf_commit; - continue; - } - Err(err) => { - tracing::info!(?view, %leaf, "proposal missing from storage; fetching from network: {err:#}"); + let sender = scanner.new_sender(); + while let Some((view, leaf)) = scanner.next().await { + let span = tracing::warn_span!("fetch proposal", ?view, %leaf); + let res: anyhow::Result<()> = async { + let anchor_view = load_anchor_view(&*persistence).await; + if view <= anchor_view { + tracing::debug!(?anchor_view, "skipping already-decided proposal"); + return Ok(()); } - } - let future = - match consensus - .read() - .await - .request_proposal(view, EpochNumber::genesis(), leaf) - { - Ok(future) => future, + match persistence.load_quorum_proposal(view).await { + Ok(proposal) => { + // If we already have the proposal in storage, keep traversing the chain to its + // parent. + let view = proposal.data.justify_qc.view_number; + let leaf = proposal.data.justify_qc.data.leaf_commit; + sender.broadcast_direct((view, leaf)).await.ok(); + return Ok(()); + } Err(err) => { - tracing::info!(?view, %leaf, "failed to request proposal: {err:#}"); - sleep(Duration::from_secs(1)).await; - continue; + tracing::info!("proposal missing from storage; fetching from network: {err:#}"); } - }; - let proposal = match timeout(Duration::from_secs(30), future).await { - Ok(Ok(proposal)) => proposal, - Ok(Err(err)) => { - tracing::info!("error fetching proposal: {err:#}"); - sleep(Duration::from_secs(1)).await; - continue; - } - Err(_) => { - tracing::info!("timed out fetching proposal"); - sleep(Duration::from_secs(1)).await; - continue; } - }; - while let Err(err) = persistence.append_quorum_proposal(&proposal).await { - tracing::warn!("error saving fetched proposal: {err:#}"); - sleep(Duration::from_secs(1)).await; - } + let future = + consensus + .read() + .await + .request_proposal(view, EpochNumber::genesis(), leaf)?; + let proposal = timeout(fetch_timeout, future) + .await + .context("timed out fetching proposal")? + .context("error fetching proposal")?; + persistence + .append_quorum_proposal(&proposal) + .await + .context("error saving fetched proposal")?; - // Add the fetched leaf to HotShot state, so consensus can make use of it. - { + // Add the fetched leaf to HotShot state, so consensus can make use of it. let leaf = Leaf::from_quorum_proposal(&proposal.data); let handle = consensus.read().await; let consensus = handle.consensus(); @@ -533,20 +569,25 @@ async fn fetch_proposal_chain( }, }; if let Err(err) = consensus.update_validated_state_map(view, v) { - tracing::warn!(?view, "unable to update validated state map: {err:#}"); + tracing::warn!("unable to update validated state map: {err:#}"); } consensus .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock) .await; - tracing::debug!( - ?view, - "added view to validated state map view proposal fetcher" - ); + tracing::debug!("added view to validated state map view proposal fetcher"); } - } - view = proposal.data.justify_qc.view_number; - leaf = proposal.data.justify_qc.data.leaf_commit; + Ok(()) + } + .instrument(span) + .await; + if let Err(err) = res { + tracing::warn!("failed to fetch proposal: {err:#}"); + + // If we fail fetching the proposal, don't let it clog up the fetching task. Just push + // it back onto the queue and move onto the next proposal. + sender.broadcast_direct((view, leaf)).await.ok(); + } } } diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index a4e7ed4fc..2d368b8c9 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -12,7 +12,7 @@ mod message_compat_tests; use anyhow::Context; use async_lock::RwLock; use catchup::StatePeers; -use context::SequencerContext; +use context::{ProposalFetcherConfig, SequencerContext}; use espresso_types::{ traits::EventConsumer, BackoffParams, L1Client, L1ClientOptions, NodeState, PubKey, SeqTypes, SolverAuctionResultsProvider, ValidatedState, @@ -203,6 +203,7 @@ pub async fn init_node( is_da: bool, identity: Identity, marketplace_config: MarketplaceConfig>, + proposal_fetcher_config: ProposalFetcherConfig, ) -> anyhow::Result> { // Expose git information via status API. metrics @@ -550,6 +551,7 @@ pub async fn init_node( event_consumer, seq_versions, marketplace_config, + proposal_fetcher_config, ) .await?; if wait_for_orchestrator { @@ -1066,6 +1068,7 @@ pub mod testing { auction_results_provider: Arc::new(SolverAuctionResultsProvider::default()), fallback_builder_url: marketplace_builder_url, }, + Default::default(), ) .await .unwrap() diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index 4ddf31fff..c49492998 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -179,6 +179,7 @@ where }), fallback_builder_url: opt.fallback_builder_url, }; + let proposal_fetcher_config = opt.proposal_fetcher_config; // Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in // a special way, in order to populate the API with consensus metrics. Otherwise, we initialize @@ -227,6 +228,7 @@ where opt.is_da, opt.identity, marketplace_config, + proposal_fetcher_config, ) .await } @@ -246,6 +248,7 @@ where opt.is_da, opt.identity, marketplace_config, + proposal_fetcher_config, ) .await? } diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs index 07f78614b..88544ec0f 100644 --- a/sequencer/src/options.rs +++ b/sequencer/src/options.rs @@ -19,7 +19,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey}; use libp2p::Multiaddr; use url::Url; -use crate::{api, persistence}; +use crate::{api, context::ProposalFetcherConfig, persistence}; // This options struct is a bit unconventional. The sequencer has multiple optional modules which // can be added, in any combination, to the service. These include, for example, the API server. @@ -376,6 +376,9 @@ pub struct Options { #[clap(flatten)] pub identity: Identity, + + #[clap(flatten)] + pub proposal_fetcher_config: ProposalFetcherConfig, } impl Options {