From 4f4efbaee341f679fa33bf676e3b0f90904ce295 Mon Sep 17 00:00:00 2001
From: Jeb Bearer
Date: Mon, 18 Nov 2024 11:16:30 -0800
Subject: [PATCH] Decrease load on DB (#2291)

* Limit number of simultaneous proposal fetches

We have seen that in some cases, when storage is slow or the node is far
behind on its event stream, it can trigger many fetches of old proposals,
which other nodes have all garbage collected. These fetches accumulate over
time, making the problem steadily worse.

This change limits the parallelism of proposal fetching. Instead of spawning
a new task dynamically each time we need to fetch a proposal, we spawn a
fixed number of worker tasks, each of which fetches only one proposal at a
time. A scanner task follows the event stream, detects when a proposal needs
to be fetched, and broadcasts the fetch request to the worker tasks; the
request is picked up as soon as a worker is free.

* Decrease DB load by not storing full payload with anchor leaf

* Handle genesis payload edge case

* Avoid busy loop
---
 sequencer/api/public-env-vars.toml |   3 +
 sequencer/src/context.rs           | 204 ++++++++++++++++++-----------
 sequencer/src/lib.rs               |   5 +-
 sequencer/src/main.rs              |   3 +
 sequencer/src/options.rs           |   5 +-
 sequencer/src/persistence/sql.rs   |  13 +-
 6 files changed, 150 insertions(+), 83 deletions(-)

diff --git a/sequencer/api/public-env-vars.toml b/sequencer/api/public-env-vars.toml
index 74fbe2495..9c9d20346 100644
--- a/sequencer/api/public-env-vars.toml
+++ b/sequencer/api/public-env-vars.toml
@@ -83,6 +83,9 @@ variables = [
   "ESPRESSO_SEQUENCER_POSTGRES_PRUNE",
   "ESPRESSO_SEQUENCER_POSTGRES_USE_TLS",
   "ESPRESSO_SEQUENCER_POSTGRES_USER",
+  "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
+  "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
+  "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
   "ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE",
   "ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
   "ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE",
diff --git a/sequencer/src/context.rs b/sequencer/src/context.rs
index 47db1c874..9e3a84776 100644
--- a/sequencer/src/context.rs
+++ b/sequencer/src/context.rs
@@ -1,10 +1,13 @@
 use std::{fmt::Display, sync::Arc};
 
 use anyhow::Context;
+use async_broadcast::{broadcast, Receiver, Sender};
 use async_lock::RwLock;
+use clap::Parser;
 use committable::{Commitment, Committable};
 use derivative::Derivative;
 use espresso_types::{
+    parse_duration,
     v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
     NodeState, PubKey, Transaction, ValidatedState,
 };
@@ -52,6 +55,37 @@ use crate::{
 /// The consensus handle
 pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;
 
+#[derive(Clone, Copy, Debug, Parser)]
+pub struct ProposalFetcherConfig {
+    #[clap(
+        long = "proposal-fetcher-num-workers",
+        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
+        default_value = "2"
+    )]
+    pub num_workers: usize,
+
+    #[clap(
+        long = "proposal-fetcher-channel-capacity",
+        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
+        default_value = "100"
+    )]
+    pub channel_capacity: usize,
+
+    #[clap(
+        long = "proposal-fetcher-fetch-timeout",
+        env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
+        default_value = "2s",
+        value_parser = parse_duration,
+    )]
+    pub fetch_timeout: Duration,
+}
+
+impl Default for ProposalFetcherConfig {
+    fn default() -> Self {
+        Self::parse_from(std::iter::empty::<String>())
+    }
+}
+
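The Default impl above leans on clap: parsing an empty argument list yields the declared defaults, unless the corresponding environment variables override them. A minimal sketch of what that gives, assuming the ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_* variables are unset:

    use std::time::Duration;

    #[test]
    fn proposal_fetcher_defaults() {
        // Hedged sketch: relies on the default_value attributes declared above and
        // assumes no ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_* overrides in the environment.
        let cfg = ProposalFetcherConfig::default();
        assert_eq!(cfg.num_workers, 2);
        assert_eq!(cfg.channel_capacity, 100);
        assert_eq!(cfg.fetch_timeout, Duration::from_secs(2));
    }
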
 /// The sequencer context contains a consensus handle and other sequencer specific information.
 #[derive(Derivative, Clone)]
 #[derivative(Debug(bound = ""))]
@@ -100,6 +134,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
         event_consumer: impl PersistenceEventConsumer + 'static,
         _: V,
         marketplace_config: MarketplaceConfig<SeqTypes, Node<N, P>>,
+        proposal_fetcher_cfg: ProposalFetcherConfig,
     ) -> anyhow::Result<Self> {
         let config = &network_config.config;
         let pub_key = validator_config.public_key;
@@ -174,6 +209,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
             validator_config,
             event_consumer,
             anchor_view,
+            proposal_fetcher_cfg,
         )
         .with_task_list(tasks))
     }
@@ -191,11 +227,12 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
         validator_config: ValidatorConfig<<SeqTypes as NodeType>::SignatureKey>,
         event_consumer: impl PersistenceEventConsumer + 'static,
         anchor_view: Option<ViewNumber>,
+        proposal_fetcher_cfg: ProposalFetcherConfig,
     ) -> Self {
         let events = handle.event_stream();
         let node_id = node_state.node_id;
 
-        let ctx = Self {
+        let mut ctx = Self {
             handle: Arc::new(RwLock::new(handle)),
             state_signer: Arc::new(state_signer),
             tasks: Default::default(),
@@ -207,20 +244,35 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
             validator_config,
         };
 
-        // Spawn event handling loops. These can run in the background (detached from `ctx.tasks`
-        // and thus not explicitly cancelled on `shut_down`) because they each exit on their own
-        // when the consensus event stream ends.
-        spawn(fetch_proposals(ctx.handle.clone(), persistence.clone()));
-        spawn(handle_events(
-            node_id,
-            events,
-            persistence,
-            ctx.state_signer.clone(),
-            external_event_handler,
-            Some(event_streamer.clone()),
-            event_consumer,
-            anchor_view,
-        ));
+        // Spawn proposal fetching tasks.
+        let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity);
+        ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send));
+        for i in 0..proposal_fetcher_cfg.num_workers {
+            ctx.spawn(
+                format!("proposal fetcher {i}"),
+                fetch_proposals(
+                    ctx.handle.clone(),
+                    persistence.clone(),
+                    recv.clone(),
+                    proposal_fetcher_cfg.fetch_timeout,
+                ),
+            );
+        }
+
+        // Spawn event handling loop.
+        ctx.spawn(
+            "event handler",
+            handle_events(
+                node_id,
+                events,
+                persistence,
+                ctx.state_signer.clone(),
+                external_event_handler,
+                Some(event_streamer.clone()),
+                event_consumer,
+                anchor_view,
+            ),
+        );
 
         ctx
     }
@@ -424,15 +476,14 @@ async fn handle_events(
 }
 
 #[tracing::instrument(skip_all)]
-async fn fetch_proposals<N, P, V>(
+async fn scan_proposals<N, P, V>(
     consensus: Arc<RwLock<Consensus<N, P, V>>>,
-    persistence: Arc<P>,
+    fetcher: Sender<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
 ) where
     N: ConnectedNetwork<PubKey>,
     P: SequencerPersistence,
     V: Versions,
 {
-    let mut tasks = TaskList::default();
     let mut events = consensus.read().await.event_stream();
     while let Some(event) = events.next().await {
         let EventType::QuorumProposal { proposal, .. } = event.event else {
@@ -442,78 +493,63 @@ async fn fetch_proposals(
         // to the anchor. This allows state replay from the decided state.
         let parent_view = proposal.data.justify_qc.view_number;
         let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
-        tasks.spawn_short_lived(
-            format!("fetch proposal {parent_view:?},{parent_leaf}"),
-            fetch_proposal_chain(
-                consensus.clone(),
-                persistence.clone(),
-                parent_view,
-                parent_leaf,
-            ),
-        );
+        fetcher
+            .broadcast_direct((parent_view, parent_leaf))
+            .await
+            .ok();
     }
-    tasks.shut_down();
 }
 
-#[tracing::instrument(skip(consensus, persistence))]
-async fn fetch_proposal_chain<N, P, V>(
+#[tracing::instrument(skip_all)]
+async fn fetch_proposals<N, P, V>(
     consensus: Arc<RwLock<Consensus<N, P, V>>>,
     persistence: Arc<P>,
-    mut view: ViewNumber,
-    mut leaf: Commitment<Leaf<SeqTypes>>,
+    mut scanner: Receiver<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
+    fetch_timeout: Duration,
 ) where
     N: ConnectedNetwork<PubKey>,
     P: SequencerPersistence,
     V: Versions,
 {
-    while view > load_anchor_view(&*persistence).await {
-        match persistence.load_quorum_proposal(view).await {
-            Ok(proposal) => {
-                // If we already have the proposal in storage, keep traversing the chain to its
-                // parent.
-                view = proposal.data.justify_qc.view_number;
-                leaf = proposal.data.justify_qc.data.leaf_commit;
-                continue;
+    let sender = scanner.new_sender();
+    while let Some((view, leaf)) = scanner.next().await {
+        let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
+        let res: anyhow::Result<()> = async {
+            let anchor_view = load_anchor_view(&*persistence).await;
+            if view <= anchor_view {
+                tracing::debug!(?anchor_view, "skipping already-decided proposal");
+                return Ok(());
             }
-            Err(err) => {
-                tracing::info!(?view, %leaf, "proposal missing from storage; fetching from network: {err:#}");
-            }
-        }
-        let future =
-            match consensus
-                .read()
-                .await
-                .request_proposal(view, EpochNumber::genesis(), leaf)
-            {
-                Ok(future) => future,
+            match persistence.load_quorum_proposal(view).await {
+                Ok(proposal) => {
+                    // If we already have the proposal in storage, keep traversing the chain to its
+                    // parent.
+                    let view = proposal.data.justify_qc.view_number;
+                    let leaf = proposal.data.justify_qc.data.leaf_commit;
+                    sender.broadcast_direct((view, leaf)).await.ok();
+                    return Ok(());
+                }
                 Err(err) => {
-                    tracing::info!(?view, %leaf, "failed to request proposal: {err:#}");
-                    sleep(Duration::from_secs(1)).await;
-                    continue;
+                    tracing::info!("proposal missing from storage; fetching from network: {err:#}");
                 }
-            };
-        let proposal = match timeout(Duration::from_secs(30), future).await {
-            Ok(Ok(proposal)) => proposal,
-            Ok(Err(err)) => {
-                tracing::info!("error fetching proposal: {err:#}");
-                sleep(Duration::from_secs(1)).await;
-                continue;
             }
-            Err(_) => {
-                tracing::info!("timed out fetching proposal");
-                sleep(Duration::from_secs(1)).await;
-                continue;
-            }
-        };
-        while let Err(err) = persistence.append_quorum_proposal(&proposal).await {
-            tracing::warn!("error saving fetched proposal: {err:#}");
-            sleep(Duration::from_secs(1)).await;
-        }
+            let future =
+                consensus
+                    .read()
+                    .await
+                    .request_proposal(view, EpochNumber::genesis(), leaf)?;
+            let proposal = timeout(fetch_timeout, future)
+                .await
+                .context("timed out fetching proposal")?
+                .context("error fetching proposal")?;
+            persistence
+                .append_quorum_proposal(&proposal)
+                .await
+                .context("error saving fetched proposal")?;
 
-        // Add the fetched leaf to HotShot state, so consensus can make use of it.
-        {
+            // Add the fetched leaf to HotShot state, so consensus can make use of it.
             let leaf = Leaf::from_quorum_proposal(&proposal.data);
             let handle = consensus.read().await;
             let consensus = handle.consensus();
@@ -533,20 +569,28 @@ async fn fetch_proposal_chain(
                 },
             };
             if let Err(err) = consensus.update_validated_state_map(view, v) {
-                tracing::warn!(?view, "unable to update validated state map: {err:#}");
+                tracing::warn!("unable to update validated state map: {err:#}");
             }
             consensus
                 .update_saved_leaves(leaf, &handle.hotshot.upgrade_lock)
                 .await;
-            tracing::debug!(
-                ?view,
-                "added view to validated state map view proposal fetcher"
-            );
+            tracing::debug!("added view to validated state map view proposal fetcher");
         }
+
+            Ok(())
         }
+        .instrument(span)
+        .await;
+        if let Err(err) = res {
+            tracing::warn!("failed to fetch proposal: {err:#}");
-        view = proposal.data.justify_qc.view_number;
-        leaf = proposal.data.justify_qc.data.leaf_commit;
+            // Avoid busy loop when operations are failing.
+            sleep(Duration::from_secs(1)).await;
+
+            // If we fail fetching the proposal, don't let it clog up the fetching task. Just push
+            // it back onto the queue and move onto the next proposal.
+            sender.broadcast_direct((view, leaf)).await.ok();
+        }
     }
 }
diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs
index d150d9989..fe07de294 100644
--- a/sequencer/src/lib.rs
+++ b/sequencer/src/lib.rs
@@ -12,7 +12,7 @@ mod message_compat_tests;
 use anyhow::Context;
 use async_lock::RwLock;
 use catchup::StatePeers;
-use context::SequencerContext;
+use context::{ProposalFetcherConfig, SequencerContext};
 use espresso_types::{
     traits::EventConsumer, BackoffParams, L1Client, L1ClientOptions, NodeState, PubKey, SeqTypes,
     SolverAuctionResultsProvider, ValidatedState,
@@ -198,6 +198,7 @@ pub async fn init_node(
     is_da: bool,
     identity: Identity,
     marketplace_config: MarketplaceConfig<SeqTypes, Node<network::Production, P::Persistence>>,
+    proposal_fetcher_config: ProposalFetcherConfig,
 ) -> anyhow::Result<SequencerContext<network::Production, P::Persistence, V>> {
     // Expose git information via status API.
     metrics
@@ -535,6 +536,7 @@ pub async fn init_node(
         event_consumer,
         seq_versions,
         marketplace_config,
+        proposal_fetcher_config,
     )
     .await?;
     if wait_for_orchestrator {
@@ -1051,6 +1053,7 @@ pub mod testing {
                 auction_results_provider: Arc::new(SolverAuctionResultsProvider::default()),
                 fallback_builder_url: marketplace_builder_url,
             },
+            Default::default(),
         )
         .await
         .unwrap()
diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs
index 4ddf31fff..c49492998 100644
--- a/sequencer/src/main.rs
+++ b/sequencer/src/main.rs
@@ -179,6 +179,7 @@ where
         }),
         fallback_builder_url: opt.fallback_builder_url,
     };
+    let proposal_fetcher_config = opt.proposal_fetcher_config;
 
     // Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in
     // a special way, in order to populate the API with consensus metrics. Otherwise, we initialize
@@ -227,6 +228,7 @@ where
                 opt.is_da,
                 opt.identity,
                 marketplace_config,
+                proposal_fetcher_config,
             )
             .await
         }
@@ -246,6 +248,7 @@ where
                 opt.is_da,
                 opt.identity,
                 marketplace_config,
+                proposal_fetcher_config,
             )
             .await?
         }
diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs
index 07f78614b..88544ec0f 100644
--- a/sequencer/src/options.rs
+++ b/sequencer/src/options.rs
@@ -19,7 +19,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey};
 use libp2p::Multiaddr;
 use url::Url;
 
-use crate::{api, persistence};
+use crate::{api, context::ProposalFetcherConfig, persistence};
 
 // This options struct is a bit unconventional. The sequencer has multiple optional modules which
 // can be added, in any combination, to the service. These include, for example, the API server.
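Zooming out from the wiring above: the per-worker loop in the new fetch_proposals bounds each fetch with the configured timeout and, on any failure, backs off briefly and re-queues the request rather than retrying in place. A condensed, free-standing sketch of that shape, using tokio only for illustration; fetch_one is a placeholder, not a function from this patch:

    use std::time::Duration;

    use async_broadcast::Receiver;

    async fn fetch_worker(mut requests: Receiver<(u64, u64)>, fetch_timeout: Duration) {
        // Failed requests are re-queued on the same channel the scanner publishes to.
        let sender = requests.new_sender();
        while let Ok((view, leaf)) = requests.recv().await {
            let ok = matches!(
                tokio::time::timeout(fetch_timeout, fetch_one(view, leaf)).await,
                Ok(Ok(()))
            );
            if !ok {
                // Back off to avoid a busy loop, then push the request to the back of
                // the queue so one bad proposal cannot clog this worker.
                tokio::time::sleep(Duration::from_secs(1)).await;
                sender.broadcast_direct((view, leaf)).await.ok();
            }
        }
    }

    // Placeholder for the real fetch-and-store step (request_proposal + append_quorum_proposal).
    async fn fetch_one(_view: u64, _leaf: u64) -> anyhow::Result<()> {
        Ok(())
    }
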
@@ -376,6 +376,9 @@ pub struct Options {
 
     #[clap(flatten)]
     pub identity: Identity,
+
+    #[clap(flatten)]
+    pub proposal_fetcher_config: ProposalFetcherConfig,
 }
 
 impl Options {
diff --git a/sequencer/src/persistence/sql.rs b/sequencer/src/persistence/sql.rs
index 4cd370438..a736d1c0a 100644
--- a/sequencer/src/persistence/sql.rs
+++ b/sequencer/src/persistence/sql.rs
@@ -377,8 +377,15 @@ impl SequencerPersistence for Persistence {
         let values = leaf_chain
             .into_iter()
             .map(|(info, qc)| {
+                // The leaf may come with a large payload attached. We don't care about this payload
+                // because we already store it separately, as part of the DA proposal. Storing it
+                // here contributes to load on the DB for no reason, so we remove it before
+                // serializing the leaf.
+                let mut leaf = info.leaf.clone();
+                leaf.unfill_block_payload();
+
                 let view = qc.view_number.u64() as i64;
-                let leaf_bytes = bincode::serialize(&info.leaf)?;
+                let leaf_bytes = bincode::serialize(&leaf)?;
                 let qc_bytes = bincode::serialize(&qc)?;
                 Ok((view, leaf_bytes, qc_bytes))
             })
@@ -800,6 +807,10 @@ async fn collect_garbage(
         if let Some(proposal) = da_proposals.remove(&view) {
             let payload = Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
             leaf.fill_block_payload_unchecked(payload);
+        } else if view == ViewNumber::genesis().u64() {
+            // We don't get a DA proposal for the genesis view, but we know what the payload always
+            // is.
+            leaf.fill_block_payload_unchecked(Payload::empty().0);
         } else {
             tracing::debug!(view, "DA proposal not available at decide");
         }
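For reference, the storage round trip that the last two hunks set up, written out as free functions over the crate's existing types. This is a sketch, not code from the patch; StoredDaProposal is a stand-in for the row loaded from the DA proposal table:

    // Write path: persist the anchor leaf without its payload, since the payload is
    // already stored with the DA proposal.
    fn anchor_leaf_bytes(leaf: &Leaf<SeqTypes>) -> anyhow::Result<Vec<u8>> {
        let mut leaf = leaf.clone();
        leaf.unfill_block_payload();
        Ok(bincode::serialize(&leaf)?)
    }

    // Read path (decide time): reattach the payload from the DA proposal if we have
    // one, falling back to the empty payload for the genesis view, which never has a
    // DA proposal.
    fn reattach_payload(leaf: &mut Leaf<SeqTypes>, view: u64, da: Option<&StoredDaProposal>) {
        if let Some(proposal) = da {
            let payload = Payload::from_bytes(&proposal.encoded_transactions, &proposal.metadata);
            leaf.fill_block_payload_unchecked(payload);
        } else if view == ViewNumber::genesis().u64() {
            leaf.fill_block_payload_unchecked(Payload::empty().0);
        } else {
            tracing::debug!(view, "DA proposal not available at decide");
        }
    }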