Skip to content

Commit

Permalink
error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dimazhornyk committed Jan 7, 2025
1 parent d8a97f6 commit cf10dbc
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use zksync_da_client::DataAvailabilityClient;
use zksync_node_sync::data_availability_fetcher::DataAvailabilityFetcher;
use zksync_types::{Address, L2ChainId};

use crate::{
implementations::resources::{
da_client::DAClientResource,
eth_interface::{EthInterfaceResource, GatewayEthInterfaceResource},
healthcheck::AppHealthCheckResource,
main_node_client::MainNodeClientResource,
Expand All @@ -26,7 +28,7 @@ pub struct DataAvailabilityFetcherLayer {
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
pub main_node_client: MainNodeClientResource,
pub l1_client: EthInterfaceResource,
pub da_client: DAClientResource,
#[context(default)]
pub app_health: AppHealthCheckResource,
}
Expand Down Expand Up @@ -59,10 +61,10 @@ impl WiringLayer for DataAvailabilityFetcherLayer {
async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let pool = input.master_pool.get().await?;
let MainNodeClientResource(client) = input.main_node_client;
let EthInterfaceResource(l1_client) = input.l1_client;
let DAClientResource(da_client) = input.da_client;

tracing::info!("Running data availability fetcher.");
let task = DataAvailabilityFetcher::new(client, pool);
let task = DataAvailabilityFetcher::new(client, pool, da_client);

// Insert healthcheck
input
Expand Down
65 changes: 52 additions & 13 deletions core/node/node_sync/src/data_availability_fetcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::time::Duration;

use anyhow::Context as _;
use anyhow::{bail, Context as _};
use serde::Serialize;
#[cfg(test)]
use tokio::sync::mpsc;
Expand Down Expand Up @@ -32,6 +32,16 @@ enum DataAvailabilityFetcherHealth {
},
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum DataAvailabilityFetcherError {
#[error("error fetching data from main node: {0}")]
Rpc(#[from] anyhow::Error),
#[error("error calling DA layer: {0}")]
DALayer(#[from] anyhow::Error),
#[error("internal error")]
Internal(#[from] anyhow::Error),
}

#[derive(Debug)]
enum StepOutcome {
UpdatedBatch(L1BatchNumber),
Expand Down Expand Up @@ -122,16 +132,30 @@ impl DataAvailabilityFetcher {
})
}

async fn step(&mut self) -> Result<StepOutcome, anyhow::Error> {
let Some(l1_batch_to_fetch) = self.get_batch_to_fetch().await? else {
async fn step(&mut self) -> Result<StepOutcome, DataAvailabilityFetcherError> {
let l1_batch_to_fetch = self
.get_batch_to_fetch()
.await
.map_err(|err| DataAvailabilityFetcherError::Internal(err))?;

let Some(l1_batch_to_fetch) = l1_batch_to_fetch else {
return Ok(StepOutcome::NoProgress);
};

tracing::debug!("Fetching DA info for L1 batch #{l1_batch_to_fetch}");
let da_details = self
.client
.get_data_availability_details(l1_batch_to_fetch)
.await?;
.await
.map_err(|err| DataAvailabilityFetcherError::Rpc(err.into()))?;

if da_details.pubdata_type.to_string() != self.da_client.name() {
return Err(DataAvailabilityFetcherError::Internal(anyhow::anyhow!(
"DA client mismatch, used in config: {}, received from main node: {}",
self.da_client.name(),
da_details.pubdata_type
)));
}

if da_details.inclusion_data.is_none() {
return Ok(StepOutcome::NoInclusionDataFromMainNode);
Expand All @@ -140,7 +164,12 @@ impl DataAvailabilityFetcher {
let inclusion_data = self
.da_client
.get_inclusion_data(da_details.blob_id.as_str())
.await?;
.await
.map_err(|err| {
DataAvailabilityFetcherError::DALayer(anyhow::anyhow!(
"Error fetching inclusion data: {err}"
))
})?;

let Some(inclusion_data) = inclusion_data else {
Ok(StepOutcome::UnableToFetchInclusionData)
Expand All @@ -149,7 +178,8 @@ impl DataAvailabilityFetcher {
let mut connection = self
.pool
.connection_tagged("data_availability_fetcher")
.await?;
.await
.map_err(|err| DataAvailabilityFetcherError::Internal(err.generalize()))?;
connection
.data_availability_dal()
.insert_l1_batch_da(
Expand All @@ -159,7 +189,8 @@ impl DataAvailabilityFetcher {
da_details.pubdata_type,
Some(inclusion_data.data.as_slice()),
)
.await?;
.await
.map_err(|err| DataAvailabilityFetcherError::Internal(err.generalize()))?;

tracing::debug!(
"Updated L1 batch #{} with DA blob id: {}",
Expand Down Expand Up @@ -191,13 +222,25 @@ impl DataAvailabilityFetcher {
self.update_health(last_updated_l1_batch);
false
}
Ok(StepOutcome::NoProgress | StepOutcome::RemoteHashMissing) => {
Ok(StepOutcome::NoProgress) | Ok(StepOutcome::NoInclusionDataFromMainNode) => {
// Update health status even if no progress was made to timely clear a previously set
// "affected" health.
self.update_health(last_updated_l1_batch);
true
}
Err(err) if err.is_retriable() => {
Ok(StepOutcome::UnableToFetchInclusionData) => {
tracing::warn!(
"No inclusion data for the batch from DA layer, will retry later"
);
self.update_health(last_updated_l1_batch);
true
}
Err(err) => {
match err {
DataAvailabilityFetcherError::Rpc(e) => {}
DataAvailabilityFetcherError::DALayer(e) => {}
DataAvailabilityFetcherError::Internal(e) => {}
}
tracing::warn!(
"Transient error in tree data fetcher, will retry after a delay: {err:?}"
);
Expand All @@ -207,10 +250,6 @@ impl DataAvailabilityFetcher {
self.health_updater.update(health.into());
true
}
Err(err) => {
tracing::error!("Fatal error in tree data fetcher: {err:?}");
return Err(err.into());
}
};

if need_to_sleep
Expand Down

0 comments on commit cf10dbc

Please sign in to comment.