diff --git a/core/node/node_framework/src/implementations/layers/data_availability_fetcher.rs b/core/node/node_framework/src/implementations/layers/data_availability_fetcher.rs index 67fdb647ed78..c73105c0ecd1 100644 --- a/core/node/node_framework/src/implementations/layers/data_availability_fetcher.rs +++ b/core/node/node_framework/src/implementations/layers/data_availability_fetcher.rs @@ -1,8 +1,10 @@ +use zksync_da_client::DataAvailabilityClient; use zksync_node_sync::data_availability_fetcher::DataAvailabilityFetcher; use zksync_types::{Address, L2ChainId}; use crate::{ implementations::resources::{ + da_client::DAClientResource, eth_interface::{EthInterfaceResource, GatewayEthInterfaceResource}, healthcheck::AppHealthCheckResource, main_node_client::MainNodeClientResource, @@ -26,7 +28,7 @@ pub struct DataAvailabilityFetcherLayer { pub struct Input { pub master_pool: PoolResource, pub main_node_client: MainNodeClientResource, - pub l1_client: EthInterfaceResource, + pub da_client: DAClientResource, #[context(default)] pub app_health: AppHealthCheckResource, } @@ -59,10 +61,10 @@ impl WiringLayer for DataAvailabilityFetcherLayer { async fn wire(self, input: Self::Input) -> Result { let pool = input.master_pool.get().await?; let MainNodeClientResource(client) = input.main_node_client; - let EthInterfaceResource(l1_client) = input.l1_client; + let DAClientResource(da_client) = input.da_client; tracing::info!("Running data availability fetcher."); - let task = DataAvailabilityFetcher::new(client, pool); + let task = DataAvailabilityFetcher::new(client, pool, da_client); // Insert healthcheck input diff --git a/core/node/node_sync/src/data_availability_fetcher/mod.rs b/core/node/node_sync/src/data_availability_fetcher/mod.rs index f0e56ff4cb61..ed0abb64bca1 100644 --- a/core/node/node_sync/src/data_availability_fetcher/mod.rs +++ b/core/node/node_sync/src/data_availability_fetcher/mod.rs @@ -2,7 +2,7 @@ use std::time::Duration; -use anyhow::Context as _; +use anyhow::{bail, Context as _}; use serde::Serialize; #[cfg(test)] use tokio::sync::mpsc; @@ -32,6 +32,16 @@ enum DataAvailabilityFetcherHealth { }, } +#[derive(Debug, thiserror::Error)] +pub(crate) enum DataAvailabilityFetcherError { + #[error("error fetching data from main node: {0}")] + Rpc(#[from] anyhow::Error), + #[error("error calling DA layer: {0}")] + DALayer(#[from] anyhow::Error), + #[error("internal error")] + Internal(#[from] anyhow::Error), +} + #[derive(Debug)] enum StepOutcome { UpdatedBatch(L1BatchNumber), @@ -122,8 +132,13 @@ impl DataAvailabilityFetcher { }) } - async fn step(&mut self) -> Result { - let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else { + async fn step(&mut self) -> Result { + let l1_batch_to_fetch = self + .get_batch_to_fetch() + .await + .map_err(|err| DataAvailabilityFetcherError::Internal(err))?; + + let Some(l1_batch_to_fetch) = l1_batch_to_fetch else { return Ok(StepOutcome::NoProgress); }; @@ -131,7 +146,16 @@ impl DataAvailabilityFetcher { let da_details = self .client .get_data_availability_details(l1_batch_to_fetch) - .await?; + .await + .map_err(|err| DataAvailabilityFetcherError::Rpc(err.into()))?; + + if da_details.pubdata_type.to_string() != self.da_client.name() { + return Err(DataAvailabilityFetcherError::Internal(anyhow::anyhow!( + "DA client mismatch, used in config: {}, received from main node: {}", + self.da_client.name(), + da_details.pubdata_type + ))); + } if da_details.inclusion_data.is_none() { return Ok(StepOutcome::NoInclusionDataFromMainNode); @@ -140,7 +164,12 @@ impl DataAvailabilityFetcher { let inclusion_data = self .da_client .get_inclusion_data(da_details.blob_id.as_str()) - .await?; + .await + .map_err(|err| { + DataAvailabilityFetcherError::DALayer(anyhow::anyhow!( + "Error fetching inclusion data: {err}" + )) + })?; let Some(inclusion_data) = inclusion_data else { Ok(StepOutcome::UnableToFetchInclusionData) @@ -149,7 +178,8 @@ impl DataAvailabilityFetcher { let mut connection = self .pool .connection_tagged("data_availability_fetcher") - .await?; + .await + .map_err(|err| DataAvailabilityFetcherError::Internal(err.generalize()))?; connection .data_availability_dal() .insert_l1_batch_da( @@ -159,7 +189,8 @@ impl DataAvailabilityFetcher { da_details.pubdata_type, Some(inclusion_data.data.as_slice()), ) - .await?; + .await + .map_err(|err| DataAvailabilityFetcherError::Internal(err.generalize()))?; tracing::debug!( "Updated L1 batch #{} with DA blob id: {}", @@ -191,13 +222,25 @@ impl DataAvailabilityFetcher { self.update_health(last_updated_l1_batch); false } - Ok(StepOutcome::NoProgress | StepOutcome::RemoteHashMissing) => { + Ok(StepOutcome::NoProgress) | Ok(StepOutcome::NoInclusionDataFromMainNode) => { // Update health status even if no progress was made to timely clear a previously set // "affected" health. self.update_health(last_updated_l1_batch); true } - Err(err) if err.is_retriable() => { + Ok(StepOutcome::UnableToFetchInclusionData) => { + tracing::warn!( + "No inclusion data for the batch from DA layer, will retry later" + ); + self.update_health(last_updated_l1_batch); + true + } + Err(err) => { + match err { + DataAvailabilityFetcherError::Rpc(e) => {} + DataAvailabilityFetcherError::DALayer(e) => {} + DataAvailabilityFetcherError::Internal(e) => {} + } tracing::warn!( "Transient error in tree data fetcher, will retry after a delay: {err:?}" ); @@ -207,10 +250,6 @@ impl DataAvailabilityFetcher { self.health_updater.update(health.into()); true } - Err(err) => { - tracing::error!("Fatal error in tree data fetcher: {err:?}"); - return Err(err.into()); - } }; if need_to_sleep