Skip to content

Commit

Permalink
chore: Generic data prims EngineSyncController (#13037)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <[email protected]>
  • Loading branch information
TropicalDog17 and mattsse authored Dec 11, 2024
1 parent 6b7bf2a commit b34f23d
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use alloy_primitives::{BlockNumber, B256};
use futures::FutureExt;
use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
EthBlockClient,
BlockClient,
};
use reth_node_types::{BodyTy, HeaderTy};
use reth_primitives::{BlockBody, EthPrimitives, NodePrimitives, SealedBlock};
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
Expand All @@ -35,7 +36,7 @@ use tracing::trace;
pub(crate) struct EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: EthBlockClient,
Client: BlockClient,
{
/// A downloader that can download full blocks from the network.
full_block_client: FullBlockClient<Client>,
Expand All @@ -51,10 +52,10 @@ where
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Sender for engine events.
event_sender: EventSender<BeaconConsensusEngineEvent>,
event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock<HeaderTy<N>, BodyTy<N>>>>,
/// Max block after which the consensus engine would terminate the sync. Used for debugging
/// purposes.
max_block: Option<BlockNumber>,
Expand All @@ -65,7 +66,7 @@ where
impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: EthBlockClient + 'static,
Client: BlockClient,
{
/// Create a new instance
pub(crate) fn new(
Expand All @@ -74,7 +75,7 @@ where
pipeline_task_spawner: Box<dyn TaskSpawner>,
max_block: Option<BlockNumber>,
chain_spec: Arc<N::ChainSpec>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
event_sender: EventSender<BeaconConsensusEngineEvent<N::Primitives>>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
Expand All @@ -92,7 +93,13 @@ where
metrics: EngineSyncMetrics::default(),
}
}
}

impl<N, Client> EngineSyncController<N, Client>
where
N: ProviderNodeTypes,
Client: BlockClient<Header = HeaderTy<N>, Body = BodyTy<N>> + 'static,
{
/// Sets the metrics for the active downloads
fn update_block_download_metrics(&self) {
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
Expand Down Expand Up @@ -234,7 +241,7 @@ where
/// Advances the pipeline state.
///
/// This checks for the result in the channel, or returns pending if the pipeline is idle.
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent> {
fn poll_pipeline(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
let res = match self.pipeline_state {
PipelineState::Idle(_) => return Poll::Pending,
PipelineState::Running(ref mut fut) => {
Expand All @@ -259,7 +266,7 @@ where

/// This will spawn the pipeline if it is idle and a target is set or if the pipeline is set to
/// run continuously.
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent> {
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent<N::Primitives>> {
match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take()?;
Expand All @@ -286,7 +293,7 @@ where
}

/// Advances the sync process.
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent> {
pub(crate) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<EngineSyncEvent<N::Primitives>> {
// try to spawn a pipeline if a target is set
if let Some(event) = self.try_spawn_pipeline() {
return Poll::Ready(event)
Expand Down Expand Up @@ -423,7 +430,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::poll;
use reth_chainspec::{ChainSpec, ChainSpecBuilder, MAINNET};
use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient};
use reth_network_p2p::{either::Either, test_utils::TestFullBlockClient, EthBlockClient};
use reth_primitives::{BlockBody, SealedHeader};
use reth_provider::{
test_utils::{create_test_provider_factory_with_chain_spec, MockNodeTypesWithDB},
Expand Down

0 comments on commit b34f23d

Please sign in to comment.