Decrease load on DB #2291

Merged 4 commits on Nov 18, 2024.

Changes from 3 commits.
3 changes: 3 additions & 0 deletions sequencer/api/public-env-vars.toml
@@ -79,6 +79,9 @@ variables = [
"ESPRESSO_SEQUENCER_POSTGRES_PRUNE",
"ESPRESSO_SEQUENCER_POSTGRES_USE_TLS",
"ESPRESSO_SEQUENCER_POSTGRES_USER",
"ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
"ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
"ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
"ESPRESSO_SEQUENCER_PRUNER_BATCH_SIZE",
"ESPRESSO_SEQUENCER_PRUNER_INTERVAL",
"ESPRESSO_SEQUENCER_PRUNER_MAX_USAGE",
203 changes: 122 additions & 81 deletions sequencer/src/context.rs
@@ -1,10 +1,13 @@
use std::{fmt::Display, sync::Arc};

use anyhow::Context;
use async_broadcast::{broadcast, Receiver, Sender};
use async_lock::RwLock;
use clap::Parser;
use committable::{Commitment, Committable};
use derivative::Derivative;
use espresso_types::{
parse_duration,
v0::traits::{EventConsumer as PersistenceEventConsumer, SequencerPersistence},
NodeState, PubKey, Transaction, ValidatedState,
};
@@ -52,6 +55,37 @@ use crate::{
/// The consensus handle
pub type Consensus<N, P, V> = SystemContextHandle<SeqTypes, Node<N, P>, V>;

#[derive(Clone, Copy, Debug, Parser)]
pub struct ProposalFetcherConfig {
#[clap(
long = "proposal-fetcher-num-workers",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_NUM_WORKERS",
default_value = "2"
)]
pub num_workers: usize,

#[clap(
long = "proposal-fetcher-channel-capacity",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_CHANNEL_CAPACITY",
default_value = "100"
)]
pub channel_capacity: usize,

#[clap(
long = "proposal-fetcher-fetch-timeout",
env = "ESPRESSO_SEQUENCER_PROPOSAL_FETCHER_FETCH_TIMEOUT",
default_value = "2s",
value_parser = parse_duration,
)]
pub fetch_timeout: Duration,
}
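The `fetch_timeout` flag parses human-readable durations via `parse_duration` from `espresso_types`, which is imported above but not shown in this diff. A minimal stand-in sketch (hypothetical, std-only, accepting only whole seconds like the `"2s"` default; the real parser is presumably richer):

```rust
use std::time::Duration;

// Hypothetical stand-in for `espresso_types::parse_duration`. Signature
// `fn(&str) -> Result<T, E>` is what clap's `value_parser` expects.
fn parse_duration(s: &str) -> Result<Duration, String> {
    let secs = s
        .strip_suffix('s')
        .ok_or_else(|| format!("expected a trailing 's' in {s:?}"))?
        .parse::<u64>()
        .map_err(|err| err.to_string())?;
    Ok(Duration::from_secs(secs))
}

fn main() {
    assert_eq!(parse_duration("2s"), Ok(Duration::from_secs(2)));
}
```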

impl Default for ProposalFetcherConfig {
fn default() -> Self {
Self::parse_from(std::iter::empty::<String>())
}
}
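The `Default` impl is derived from clap itself: parsing an empty argument list makes every field fall back to its `default_value` (or its environment variable, if set), so the programmatic defaults can never drift from the CLI defaults. A self-contained sketch of the trick (hypothetical struct; assumes clap 4 with the `derive` and `env` features):

```rust
use clap::Parser;

#[derive(Clone, Copy, Debug, Parser)]
struct FetcherConfig {
    // With no flag given, clap falls back to FETCHER_NUM_WORKERS, then "2".
    #[clap(long, env = "FETCHER_NUM_WORKERS", default_value = "2")]
    num_workers: usize,
}

impl Default for FetcherConfig {
    fn default() -> Self {
        // No binary name, no flags: only env vars and defaults apply.
        Self::parse_from(std::iter::empty::<String>())
    }
}

fn main() {
    // Holds as long as FETCHER_NUM_WORKERS is unset in the environment.
    assert_eq!(FetcherConfig::default().num_workers, 2);
}
```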

/// The sequencer context contains a consensus handle and other sequencer specific information.
#[derive(Derivative, Clone)]
#[derivative(Debug(bound = ""))]
@@ -100,6 +134,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
event_consumer: impl PersistenceEventConsumer + 'static,
_: V,
marketplace_config: MarketplaceConfig<SeqTypes, Node<N, P>>,
proposal_fetcher_cfg: ProposalFetcherConfig,
) -> anyhow::Result<Self> {
let config = &network_config.config;
let pub_key = validator_config.public_key;
@@ -174,6 +209,7 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
validator_config,
event_consumer,
anchor_view,
proposal_fetcher_cfg,
)
.with_task_list(tasks))
}
@@ -191,11 +227,12 @@ impl<N: ConnectedNetwork<PubKey>, P: SequencerPersistence, V: Versions> Sequence
validator_config: ValidatorConfig<<SeqTypes as NodeType>::SignatureKey>,
event_consumer: impl PersistenceEventConsumer + 'static,
anchor_view: Option<ViewNumber>,
proposal_fetcher_cfg: ProposalFetcherConfig,
) -> Self {
let events = handle.event_stream();

let node_id = node_state.node_id;
let ctx = Self {
let mut ctx = Self {
handle: Arc::new(RwLock::new(handle)),
state_signer: Arc::new(state_signer),
tasks: Default::default(),
@@ -207,20 +244,35 @@
validator_config,
};

// Spawn event handling loops. These can run in the background (detached from `ctx.tasks`
// and thus not explicitly cancelled on `shut_down`) because they each exit on their own
// when the consensus event stream ends.
spawn(fetch_proposals(ctx.handle.clone(), persistence.clone()));
spawn(handle_events(
node_id,
events,
persistence,
ctx.state_signer.clone(),
external_event_handler,
Some(event_streamer.clone()),
event_consumer,
anchor_view,
));
// Spawn proposal fetching tasks.
let (send, recv) = broadcast(proposal_fetcher_cfg.channel_capacity);
ctx.spawn("proposal scanner", scan_proposals(ctx.handle.clone(), send));
for i in 0..proposal_fetcher_cfg.num_workers {
ctx.spawn(
format!("proposal fetcher {i}"),
fetch_proposals(
ctx.handle.clone(),
persistence.clone(),
recv.clone(),
proposal_fetcher_cfg.fetch_timeout,
),
);
}

// Spawn event handling loop.
ctx.spawn(
"event handler",
handle_events(
node_id,
events,
persistence,
ctx.state_signer.clone(),
external_event_handler,
Some(event_streamer.clone()),
event_consumer,
anchor_view,
),
);

ctx
}
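The wiring above replaces the old detached task-per-proposal scheme with a bounded `async_broadcast` channel feeding a fixed pool of fetcher workers, which is what caps the load on the DB. A minimal sketch of that shape (hypothetical `u64` work items and plain `tokio::spawn` in place of `ctx.spawn`; note `async_broadcast` is a broadcast channel, so each cloned receiver observes every message and the pool bounds concurrency rather than strictly partitioning work):

```rust
use std::time::Duration;

use async_broadcast::broadcast;

#[tokio::main]
async fn main() {
    // Bounded capacity, like `proposal_fetcher_cfg.channel_capacity`.
    let (send, recv) = broadcast::<u64>(100);

    // Fixed worker pool, like `proposal_fetcher_cfg.num_workers`.
    for i in 0..2 {
        let mut recv = recv.clone();
        tokio::spawn(async move {
            while let Ok(view) = recv.recv().await {
                println!("worker {i} fetching proposal for view {view}");
            }
        });
    }

    // The "scanner" side enqueues work items, mirroring `scan_proposals`.
    for view in 0..3u64 {
        send.broadcast_direct(view).await.ok();
    }
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```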
@@ -424,15 +476,14 @@ async fn handle_events<V: Versions>(
}

#[tracing::instrument(skip_all)]
async fn fetch_proposals<N, P, V>(
async fn scan_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
persistence: Arc<impl SequencerPersistence>,
fetcher: Sender<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
let mut tasks = TaskList::default();
let mut events = consensus.read().await.event_stream();
while let Some(event) = events.next().await {
let EventType::QuorumProposal { proposal, .. } = event.event else {
@@ -442,78 +493,63 @@ async fn fetch_proposals<N, P, V>(
// to the anchor. This allows state replay from the decided state.
let parent_view = proposal.data.justify_qc.view_number;
let parent_leaf = proposal.data.justify_qc.data.leaf_commit;
tasks.spawn_short_lived(
format!("fetch proposal {parent_view:?},{parent_leaf}"),
fetch_proposal_chain(
consensus.clone(),
persistence.clone(),
parent_view,
parent_leaf,
),
);
fetcher
.broadcast_direct((parent_view, parent_leaf))
.await
.ok();
}
tasks.shut_down();
}

#[tracing::instrument(skip(consensus, persistence))]
async fn fetch_proposal_chain<N, P, V>(
#[tracing::instrument(skip_all)]
async fn fetch_proposals<N, P, V>(
consensus: Arc<RwLock<Consensus<N, P, V>>>,
persistence: Arc<impl SequencerPersistence>,
mut view: ViewNumber,
mut leaf: Commitment<Leaf<SeqTypes>>,
mut scanner: Receiver<(ViewNumber, Commitment<Leaf<SeqTypes>>)>,
fetch_timeout: Duration,
) where
N: ConnectedNetwork<PubKey>,
P: SequencerPersistence,
V: Versions,
{
while view > load_anchor_view(&*persistence).await {
match persistence.load_quorum_proposal(view).await {
Ok(proposal) => {
// If we already have the proposal in storage, keep traversing the chain to its
// parent.
view = proposal.data.justify_qc.view_number;
leaf = proposal.data.justify_qc.data.leaf_commit;
continue;
}
Err(err) => {
tracing::info!(?view, %leaf, "proposal missing from storage; fetching from network: {err:#}");
let sender = scanner.new_sender();
while let Some((view, leaf)) = scanner.next().await {
let span = tracing::warn_span!("fetch proposal", ?view, %leaf);
let res: anyhow::Result<()> = async {
let anchor_view = load_anchor_view(&*persistence).await;
if view <= anchor_view {
tracing::debug!(?anchor_view, "skipping already-decided proposal");
return Ok(());
}
}

let future =
match consensus
.read()
.await
.request_proposal(view, EpochNumber::genesis(), leaf)
{
Ok(future) => future,
match persistence.load_quorum_proposal(view).await {
Ok(proposal) => {
// If we already have the proposal in storage, keep traversing the chain to its
// parent.
let view = proposal.data.justify_qc.view_number;
let leaf = proposal.data.justify_qc.data.leaf_commit;
sender.broadcast_direct((view, leaf)).await.ok();
return Ok(());
}
Err(err) => {
tracing::info!(?view, %leaf, "failed to request proposal: {err:#}");
sleep(Duration::from_secs(1)).await;
continue;
tracing::info!("proposal missing from storage; fetching from network: {err:#}");
}
};
let proposal = match timeout(Duration::from_secs(30), future).await {
Ok(Ok(proposal)) => proposal,
Ok(Err(err)) => {
tracing::info!("error fetching proposal: {err:#}");
sleep(Duration::from_secs(1)).await;
continue;
}
Err(_) => {
tracing::info!("timed out fetching proposal");
sleep(Duration::from_secs(1)).await;
continue;
}
};

while let Err(err) = persistence.append_quorum_proposal(&proposal).await {
tracing::warn!("error saving fetched proposal: {err:#}");
sleep(Duration::from_secs(1)).await;
}
let future =
consensus
.read()
.await
.request_proposal(view, EpochNumber::genesis(), leaf)?;
let proposal = timeout(fetch_timeout, future)
.await
.context("timed out fetching proposal")?
.context("error fetching proposal")?;
persistence
.append_quorum_proposal(&proposal)
.await
.context("error saving fetched proposal")?;

// Add the fetched leaf to HotShot state, so consensus can make use of it.
{
// Add the fetched leaf to HotShot state, so consensus can make use of it.
let leaf = Leaf::from_quorum_proposal(&proposal.data);
let handle = consensus.read().await;
let consensus = handle.consensus();
Expand All @@ -533,20 +569,25 @@ async fn fetch_proposal_chain<N, P, V>(
},
};
if let Err(err) = consensus.update_validated_state_map(view, v) {
tracing::warn!(?view, "unable to update validated state map: {err:#}");
tracing::warn!("unable to update validated state map: {err:#}");
}
consensus
.update_saved_leaves(leaf, &handle.hotshot.upgrade_lock)
.await;
tracing::debug!(
?view,
"added view to validated state map view proposal fetcher"
);
tracing::debug!("added view to validated state map view proposal fetcher");
}
}

view = proposal.data.justify_qc.view_number;
leaf = proposal.data.justify_qc.data.leaf_commit;
Ok(())
}
.instrument(span)
.await;
if let Err(err) = res {
tracing::warn!("failed to fetch proposal: {err:#}");

// If we fail fetching the proposal, don't let it clog up the fetching task. Just push
// it back onto the queue and move onto the next proposal.
sender.broadcast_direct((view, leaf)).await.ok();
[Review comment · Collaborator]
Does this create a busy loop if the task is failing consistently?

[Reply · Member, author]
I guess it kind of does. I was thinking we would be rate limited by the finite number of workers and time spent waiting on I/O for each failure. But I guess that's not guaranteed, depending on what the failure is. I'll add a sleep here.

}
}
}
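Regarding the busy-loop concern in the review thread above: a hedged sketch of the mitigation the author describes (a sleep before re-queuing), using hypothetical names rather than the code merged in the follow-up commit:

```rust
use std::time::Duration;

use async_broadcast::Sender;
use tokio::time::sleep;

// Hypothetical helper: back off before pushing a failed work item back
// onto the queue, so a consistently failing fetch cannot spin the finite
// worker pool at full speed.
async fn requeue_after_failure<T: Clone>(sender: &Sender<T>, item: T) {
    sleep(Duration::from_secs(1)).await;
    sender.broadcast_direct(item).await.ok();
}

#[tokio::main]
async fn main() {
    let (send, mut recv) = async_broadcast::broadcast::<u64>(8);
    requeue_after_failure(&send, 42).await;
    assert_eq!(recv.recv().await.ok(), Some(42));
}
```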

5 changes: 4 additions & 1 deletion sequencer/src/lib.rs
@@ -12,7 +12,7 @@ mod message_compat_tests;
use anyhow::Context;
use async_lock::RwLock;
use catchup::StatePeers;
use context::SequencerContext;
use context::{ProposalFetcherConfig, SequencerContext};
use espresso_types::{
traits::EventConsumer, BackoffParams, L1Client, L1ClientOptions, NodeState, PubKey, SeqTypes,
SolverAuctionResultsProvider, ValidatedState,
@@ -203,6 +203,7 @@ pub async fn init_node<P: PersistenceOptions, V: Versions>(
is_da: bool,
identity: Identity,
marketplace_config: MarketplaceConfig<SeqTypes, Node<network::Production, P::Persistence>>,
proposal_fetcher_config: ProposalFetcherConfig,
) -> anyhow::Result<SequencerContext<network::Production, P::Persistence, V>> {
// Expose git information via status API.
metrics
@@ -550,6 +551,7 @@
event_consumer,
seq_versions,
marketplace_config,
proposal_fetcher_config,
)
.await?;
if wait_for_orchestrator {
@@ -1066,6 +1068,7 @@ pub mod testing {
auction_results_provider: Arc::new(SolverAuctionResultsProvider::default()),
fallback_builder_url: marketplace_builder_url,
},
Default::default(),
)
.await
.unwrap()
3 changes: 3 additions & 0 deletions sequencer/src/main.rs
@@ -179,6 +179,7 @@
}),
fallback_builder_url: opt.fallback_builder_url,
};
let proposal_fetcher_config = opt.proposal_fetcher_config;

// Initialize HotShot. If the user requested the HTTP module, we must initialize the handle in
// a special way, in order to populate the API with consensus metrics. Otherwise, we initialize
@@ -227,6 +228,7 @@
opt.is_da,
opt.identity,
marketplace_config,
proposal_fetcher_config,
)
.await
}
@@ -246,6 +248,7 @@
opt.is_da,
opt.identity,
marketplace_config,
proposal_fetcher_config,
)
.await?
}
5 changes: 4 additions & 1 deletion sequencer/src/options.rs
@@ -19,7 +19,7 @@ use hotshot_types::{light_client::StateSignKey, signature_key::BLSPrivKey};
use libp2p::Multiaddr;
use url::Url;

use crate::{api, persistence};
use crate::{api, context::ProposalFetcherConfig, persistence};

// This options struct is a bit unconventional. The sequencer has multiple optional modules which
// can be added, in any combination, to the service. These include, for example, the API server.
@@ -376,6 +376,9 @@ pub struct Options {

#[clap(flatten)]
pub identity: Identity,

#[clap(flatten)]
pub proposal_fetcher_config: ProposalFetcherConfig,
}
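`#[clap(flatten)]` splices every flag of `ProposalFetcherConfig` into the top-level `Options`, so the new `--proposal-fetcher-*` flags need no per-flag plumbing here. A self-contained sketch of the mechanism (hypothetical structs; assumes clap 4 with the `derive` feature):

```rust
use clap::Parser;

#[derive(Clone, Copy, Debug, Parser)]
struct ProposalFetcherConfig {
    #[clap(long = "proposal-fetcher-num-workers", default_value = "2")]
    num_workers: usize,
}

#[derive(Parser)]
struct Options {
    // All of `ProposalFetcherConfig`'s flags parse as if declared here.
    #[clap(flatten)]
    proposal_fetcher_config: ProposalFetcherConfig,
}

fn main() {
    let opt = Options::parse_from(["seq", "--proposal-fetcher-num-workers", "4"]);
    assert_eq!(opt.proposal_fetcher_config.num_workers, 4);
}
```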

impl Options {