Skip to content

Commit

Permalink
fix(sync): pending block retrying mechanism & flaky tests (#304)
Browse files Browse the repository at this point in the history
Co-authored-by: Trantorian1 <[email protected]>
  • Loading branch information
cchudant and Trantorian1 authored Oct 7, 2024
1 parent bd185a2 commit 68a60dd
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 104 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- fix(sync): pending block retrying mechanism
- fix:(tests): Add testing feature to mc-db dev dependency (#294)
- feat: new crate gateway client & server
- test: Starknet-js basic tests added
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ mockall = "0.13.0"
serial_test = "3.1.1"
itertools = "0.13.0"
regex = "1.10.5"
bytes = "1.6.0"

[patch.crates-io]
starknet-core = { git = "https://github.com/kasarlabs/starknet-rs.git", branch = "fork" }
Expand Down
1 change: 1 addition & 0 deletions crates/client/gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ starknet-types-core.workspace = true

# Other
anyhow.workspace = true
bytes.workspace = true
hyper.workspace = true
log.workspace = true
reqwest.workspace = true
Expand Down
33 changes: 10 additions & 23 deletions crates/client/gateway/src/client/methods.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
use std::{borrow::Cow, sync::Arc};

use super::{builder::FeederClient, request_builder::RequestBuilder};
use crate::error::SequencerError;
use mp_block::{BlockId, BlockTag};
use mp_class::{CompressedLegacyContractClass, ContractClass, FlattenedSierraClass};
use starknet_core::types::{contract::legacy::LegacyContractClass, Felt};

use crate::error::{SequencerError, StarknetError};

use super::{builder::FeederClient, request_builder::RequestBuilder};

use mp_gateway::{
block::{ProviderBlock, ProviderBlockPending, ProviderBlockPendingMaybe},
state_update::{
ProviderStateUpdate, ProviderStateUpdatePending, ProviderStateUpdatePendingMaybe, ProviderStateUpdateWithBlock,
ProviderStateUpdateWithBlockPending, ProviderStateUpdateWithBlockPendingMaybe,
},
};
use starknet_core::types::{contract::legacy::LegacyContractClass, Felt};
use std::{borrow::Cow, sync::Arc};

impl FeederClient {
pub async fn get_block(&self, block_id: BlockId) -> Result<ProviderBlockPendingMaybe, SequencerError> {
Expand Down Expand Up @@ -76,25 +72,16 @@ impl FeederClient {
.with_block_id(block_id)
.with_class_hash(class_hash);

let response = request.send_get_raw().await?;
let status = response.status();
if status == reqwest::StatusCode::INTERNAL_SERVER_ERROR || status == reqwest::StatusCode::BAD_REQUEST {
let error = match response.json::<StarknetError>().await {
Ok(e) => SequencerError::StarknetError(e),
Err(e) if e.is_decode() => SequencerError::InvalidStarknetErrorVariant(e),
Err(e) => SequencerError::ReqwestError(e),
};
return Err(error);
}

let bytes = response.bytes().await?;
match serde_json::from_slice::<FlattenedSierraClass>(&bytes) {
match request.send_get::<FlattenedSierraClass>().await {
Ok(class_sierra) => Ok(ContractClass::Sierra(Arc::new(class_sierra))),
Err(_) => {
let class_legacy = serde_json::from_slice::<LegacyContractClass>(&bytes)?;
Err(SequencerError::DeserializeBody { serde_error: _, body }) => {
// if it failed with flattebed sierra, it might be a legacy class.
let class_legacy = serde_json::from_slice::<LegacyContractClass>(&body)
.map_err(|serde_error| SequencerError::DeserializeBody { serde_error, body })?;
let class_compressed: CompressedLegacyContractClass = class_legacy.compress()?.into();
Ok(ContractClass::Legacy(Arc::new(class_compressed)))
}
Err(err) => Err(err),
}
}
}
Expand Down
23 changes: 13 additions & 10 deletions crates/client/gateway/src/client/request_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,20 @@ async fn unpack<T>(response: reqwest::Response) -> Result<T, SequencerError>
where
T: ::serde::de::DeserializeOwned,
{
let status = response.status();
if status == reqwest::StatusCode::INTERNAL_SERVER_ERROR || status == reqwest::StatusCode::BAD_REQUEST {
let error = match response.json::<StarknetError>().await {
Ok(e) => SequencerError::StarknetError(e),
Err(e) if e.is_decode() => SequencerError::InvalidStarknetErrorVariant(e),
Err(e) => SequencerError::ReqwestError(e),
};
return Err(error);
} else if status == reqwest::StatusCode::TOO_MANY_REQUESTS {
let http_status = response.status();
if http_status == reqwest::StatusCode::TOO_MANY_REQUESTS {
return Err(SequencerError::StarknetError(StarknetError::rate_limited()));
} else if !http_status.is_success() {
let body = response.bytes().await?;
let starknet_error = serde_json::from_slice::<StarknetError>(&body)
.map_err(|serde_error| SequencerError::InvalidStarknetError { http_status, serde_error, body })?;

return Err(starknet_error.into());
}

response.json::<T>().await.map_err(SequencerError::InvalidStarknetErrorVariant)
let body = response.bytes().await?;
let res =
serde_json::from_slice(&body).map_err(|serde_error| SequencerError::DeserializeBody { serde_error, body })?;

Ok(res)
}
16 changes: 9 additions & 7 deletions crates/client/gateway/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use bytes::Bytes;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use starknet_core::types::Felt;
use starknet_types_core::felt::FromStrError;

#[derive(Debug, thiserror::Error)]
pub enum SequencerError {
#[error("starknet error: {0}")]
#[error("Starknet error: {0:#}")]
StarknetError(#[from] StarknetError),
#[error("reqwest error: {0}")]
#[error("Reqwest error: {0:#}")]
ReqwestError(#[from] reqwest::Error),
#[error("error deserializing response: {0}")]
SerdeError(#[from] serde_json::Error),
#[error("error compressing class: {0}")]
#[error("Error deserializing response: {serde_error:#}")]
DeserializeBody { serde_error: serde_json::Error, body: Bytes },
#[error("Error compressing class: {0:#}")]
CompressError(#[from] starknet_core::types::contract::CompressProgramError),
#[error("invalid error variant: {0}")]
InvalidStarknetErrorVariant(reqwest::Error),
#[error("Failed to parse returned error with http status {http_status}: {serde_error:#}")]
InvalidStarknetError { http_status: StatusCode, serde_error: serde_json::Error, body: Bytes },
}

#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
Expand Down
71 changes: 49 additions & 22 deletions crates/client/sync/src/fetch/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,36 +86,53 @@ impl From<FetchBlockId> for starknet_core::types::BlockId {
}

pub async fn fetch_pending_block_and_updates(
parent_block_hash: Felt,
chain_id: &ChainId,
provider: &FeederClient,
) -> Result<UnverifiedPendingFullBlock, FetchError> {
) -> Result<Option<UnverifiedPendingFullBlock>, FetchError> {
let block_id = FetchBlockId::Pending;

let sw = PerfStopwatch::new();
let (state_update, block) = retry(
let block = retry(
|| async {
let (state_update, block) = provider
.get_state_update_with_block(block_id.into())
.await
.map(ProviderStateUpdateWithBlockPendingMaybe::as_update_and_block)?;
Ok((state_update, block))
match provider.get_state_update_with_block(block_id.into()).await {
Ok(block) => Ok(Some(block)),
// Ignore (this is the case where we returned a closed block when we asked for a pending one)
// When the FGW does not have a pending block, it can return the latest block instead
Err(SequencerError::DeserializeBody { body: _, serde_error }) => {
log::debug!("Serde error when fetching the pending block: {serde_error:#}");
Ok(None)
}
Err(err) => Err(err),
}
},
MAX_RETRY,
BASE_DELAY,
)
.await?;

let class_update = fetch_class_updates(chain_id, state_update.state_diff(), block_id, provider).await?;
let Some(block) = block else { return Ok(None) };

let (state_update, block) = block.as_update_and_block();
let (state_update, block) = (
state_update.pending_owned().expect("Block should be pending (checked via serde above)"),
block.pending_owned().expect("Block should be pending (checked via serde above)"),
);

if block.parent_block_hash != parent_block_hash {
log::debug!(
"Fetched a pending block, but mismatched parent block hash: parent_block_hash={:#x}",
block.parent_block_hash
);
return Ok(None);
}
let class_update = fetch_class_updates(chain_id, &state_update.state_diff, block_id, provider).await?;

stopwatch_end!(sw, "fetching {:?}: {:?}", block_id);

let converted = convert_sequencer_block_pending(
block.pending_owned().expect("Block called on block tag pending should be pending"),
state_update.pending_owned().expect("State update called on block tag pending should be pending"),
class_update,
)
.context("Parsing the FGW pending block format")?;
Ok(converted)
let converted = convert_sequencer_block_pending(block, state_update, class_update)
.context("Parsing the FGW pending block format")?;

Ok(Some(converted))
}

pub async fn fetch_block_and_updates(
Expand Down Expand Up @@ -254,7 +271,7 @@ async fn fetch_class(
provider: &FeederClient,
) -> Result<(Felt, ContractClass), SequencerError> {
let contract_class = provider.get_class_by_hash(class_hash, block_id.into()).await?;
log::debug!("Got the contract class {:?}", contract_class);
log::debug!("Got the contract class {:?}", class_hash);
Ok((class_hash, contract_class))
}

Expand Down Expand Up @@ -346,9 +363,14 @@ mod test_l2_fetchers {
// Mock class hash
ctx.mock_class_hash("../../../cairo/target/dev/madara_contracts_TestContract.contract_class.json");

let result = fetch_pending_block_and_updates(&ctx.backend.chain_config().chain_id, &ctx.provider).await;
let result = fetch_pending_block_and_updates(
Felt::from_hex_unchecked("0x1db054847816dbc0098c88915430c44da2c1e3f910fbcb454e14282baba0e75"),
&ctx.backend.chain_config().chain_id,
&ctx.provider,
)
.await;

let pending_block = result.expect("Failed to fetch pending block");
let pending_block = result.expect("Failed to fetch pending block").expect("No pending block");

assert!(
matches!(pending_block, UnverifiedPendingFullBlock { .. }),
Expand Down Expand Up @@ -426,7 +448,12 @@ mod test_l2_fetchers {
// Mock a "pending block not found" scenario
ctx.mock_block_pending_not_found();

let result = fetch_pending_block_and_updates(&ctx.backend.chain_config().chain_id, &ctx.provider).await;
let result = fetch_pending_block_and_updates(
Felt::from_hex_unchecked("0x1db054847816dbc0098c88915430c44da2c1e3f910fbcb454e14282baba0e75"),
&ctx.backend.chain_config().chain_id,
&ctx.provider,
)
.await;

assert!(
matches!(
Expand All @@ -436,7 +463,7 @@ mod test_l2_fetchers {
..
})))
),
"Expected BlockNotFound error, but got: {:?}",
"Expected no block, but got: {:?}",
result
);
}
Expand Down Expand Up @@ -602,7 +629,7 @@ mod test_l2_fetchers {
let result = ctx.provider.get_state_update_with_block(FetchBlockId::BlockN(5).into()).await;

assert!(
matches!(result, Err(SequencerError::InvalidStarknetErrorVariant(_))),
matches!(result, Err(SequencerError::DeserializeBody { .. })),
"Expected error about mismatched data, but got: {:?}",
result
);
Expand Down
2 changes: 1 addition & 1 deletion crates/client/sync/src/fetch/fetchers_real_fgw_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ fn client_mainnet_fixture() -> FeederClient {
#[rstest]
#[tokio::test]
async fn test_can_fetch_pending_block(client_mainnet_fixture: FeederClient) {
let block = fetch_pending_block_and_updates(&ChainId::Mainnet, &client_mainnet_fixture).await.unwrap();
let block = fetch_pending_block_and_updates(Felt::ZERO, &ChainId::Mainnet, &client_mainnet_fixture).await.unwrap();
// ignore as we can't check much here :/
drop(block);
}
Expand Down
24 changes: 17 additions & 7 deletions crates/client/sync/src/l2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use mc_db::MadaraStorageError;
use mc_gateway::client::builder::FeederClient;
use mc_gateway::error::SequencerError;
use mc_telemetry::{TelemetryHandle, VerbosityLevel};
use mp_block::BlockId;
use mp_block::BlockTag;
use mp_utils::{channel_wait_or_graceful_shutdown, wait_or_graceful_shutdown, PerfStopwatch};
use starknet_api::core::ChainId;
use starknet_types_core::felt::Felt;
Expand Down Expand Up @@ -61,7 +63,7 @@ async fn l2_verify_and_apply_task(
trim_hash(&header.global_state_root)
);
log::debug!(
"Block import #{} ({}) has state root {}",
"Block import #{} ({:#x}) has state root {:#x}",
header.block_number,
block_hash,
header.global_state_root
Expand Down Expand Up @@ -140,16 +142,24 @@ async fn l2_pending_block_task(
return Ok(());
}

log::debug!("start pending block poll");
log::debug!("Start pending block poll");

let mut interval = tokio::time::interval(pending_block_poll_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
while wait_or_graceful_shutdown(interval.tick()).await.is_some() {
log::debug!("getting pending block...");

let block = fetch_pending_block_and_updates(&backend.chain_config().chain_id, &provider)
.await
.context("Getting pending block from FGW")?;
log::debug!("Getting pending block...");

let current_block_hash = backend
.get_block_hash(&BlockId::Tag(BlockTag::Latest))
.context("Getting latest block hash")?
.unwrap_or(/* genesis parent block hash */ Felt::ZERO);
let Some(block) =
fetch_pending_block_and_updates(current_block_hash, &backend.chain_config().chain_id, &provider)
.await
.context("Getting pending block from FGW")?
else {
continue;
};

// HACK(see issue #239): The latest block in db may not match the pending parent block hash
// Just silently ignore it for now and move along.
Expand Down
Loading

0 comments on commit 68a60dd

Please sign in to comment.