Skip to content

Commit

Permalink
fix(block_production): continue pending block now reexecutes the prev…
Browse files Browse the repository at this point in the history
…ious transactions (#411)

Co-authored-by: antiyro <[email protected]>
  • Loading branch information
cchudant and antiyro authored Dec 17, 2024
1 parent d3a8367 commit 237f31c
Show file tree
Hide file tree
Showing 48 changed files with 802 additions and 648 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## Next release

- fix(block_production): continue pending block now reexecutes the previous transactions
- feat(services): reworked Madara services for better cancellation control
- feat: fetch eth/strk price and sync strk gas price
- feat(block_production): continue pending block on restart
Expand Down
5 changes: 3 additions & 2 deletions configs/chain_config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,6 @@ sequencer_address: "0x0"
mempool_tx_limit: 10000
# Transaction limit in the mempool, additional limit for declare transactions.
mempool_declare_tx_limit: 20
# Max age of a transaction in the mempool.
mempool_tx_max_age: "5h"
# Max age of a transaction in the mempool. Null for no age limit.
# mempool_tx_max_age: "5h"
mempool_tx_max_age: null
2 changes: 1 addition & 1 deletion configs/presets/devnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ eth_core_contract_address: "0xe7f1725e7734ce288f8367e1bb143e90bb3f0512"
eth_gps_statement_verifier: "0xf294781D719D2F4169cE54469C28908E6FA752C1"
mempool_tx_limit: 10000
mempool_declare_tx_limit: 20
mempool_tx_max_age: "5h"
mempool_tx_max_age: null
2 changes: 1 addition & 1 deletion configs/presets/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ eth_core_contract_address: "0x4737c0c1B4D5b1A687B42610DdabEE781152359c"
eth_gps_statement_verifier: "0x2046B966994Adcb88D83f467a41b75d64C2a619F"
mempool_tx_limit: 10000
mempool_declare_tx_limit: 20
mempool_tx_max_age: "5h"
mempool_tx_max_age: null
2 changes: 1 addition & 1 deletion configs/presets/mainnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ eth_core_contract_address: "0xc662c410C0ECf747543f5bA90660f6ABeBD9C8c4"
eth_gps_statement_verifier: "0x47312450B3Ac8b5b8e247a6bB6d523e7605bDb60"
mempool_tx_limit: 10000
mempool_declare_tx_limit: 20
mempool_tx_max_age: "5h"
mempool_tx_max_age: null
2 changes: 1 addition & 1 deletion configs/presets/sepolia.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ eth_core_contract_address: "0xE2Bb56ee936fd6433DC0F6e7e3b8365C906AA057"
eth_gps_statement_verifier: "0xf294781D719D2F4169cE54469C28908E6FA752C1"
mempool_tx_limit: 10000
mempool_declare_tx_limit: 20
mempool_tx_max_age: "5h"
mempool_tx_max_age: null
2 changes: 0 additions & 2 deletions crates/client/analytics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,6 @@ impl Analytics {

let layer = OpenTelemetryTracingBridge::new(&logger_provider);
tracing_subscriber.with(OpenTelemetryLayer::new(tracer)).with(layer).init();

tracing::info!("OTEL initialized");
Ok(())
}

Expand Down
10 changes: 5 additions & 5 deletions crates/client/block_import/src/tests/block_import_utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use mp_block::header::{GasPrices, L1DataAvailabilityMode};
use mp_block::header::{BlockTimestamp, GasPrices, L1DataAvailabilityMode};
use mp_block::Header;
use mp_chain_config::StarknetVersion;
use mp_state_update::StateDiff;
Expand All @@ -18,7 +18,7 @@ pub fn create_dummy_unverified_header() -> UnverifiedHeader {
UnverifiedHeader {
parent_block_hash: Some(felt!("0x1")),
sequencer_address: felt!("0x2"),
block_timestamp: 12345,
block_timestamp: BlockTimestamp(12345),
protocol_version: StarknetVersion::new(0, 13, 2, 0),
l1_gas_price: GasPrices {
eth_l1_gas_price: 14,
Expand Down Expand Up @@ -77,7 +77,7 @@ pub fn create_dummy_header() -> Header {
state_diff_length: Some(0),
state_diff_commitment: Some(felt!("0x0")),
receipt_commitment: Some(felt!("0x0")),
block_timestamp: 12345,
block_timestamp: BlockTimestamp(12345),
protocol_version: StarknetVersion::new(0, 13, 2, 0),
l1_gas_price: GasPrices {
eth_l1_gas_price: 14,
Expand Down Expand Up @@ -117,7 +117,7 @@ pub fn create_dummy_unverified_full_block() -> UnverifiedFullBlock {
header: UnverifiedHeader {
parent_block_hash: Some(Felt::ZERO),
sequencer_address: Felt::ZERO,
block_timestamp: 0,
block_timestamp: BlockTimestamp(0),
protocol_version: StarknetVersion::default(),
l1_gas_price: GasPrices::default(),
l1_da_mode: L1DataAvailabilityMode::Blob,
Expand All @@ -139,7 +139,7 @@ pub fn create_dummy_pending_block() -> PreValidatedPendingBlock {
header: UnverifiedHeader {
parent_block_hash: Some(felt!("0x1")),
sequencer_address: felt!("0x2"),
block_timestamp: 12345,
block_timestamp: BlockTimestamp(12345),
protocol_version: StarknetVersion::new(0, 13, 2, 0),
l1_gas_price: GasPrices {
eth_l1_gas_price: 14,
Expand Down
4 changes: 2 additions & 2 deletions crates/client/block_import/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! Step 2. verify_apply: [`PreValidatedBlock`] ====[`crate::verify_apply`]===> [`BlockImportResult`]
use mp_block::{
header::{GasPrices, L1DataAvailabilityMode},
header::{BlockTimestamp, GasPrices, L1DataAvailabilityMode},
Header, VisitedSegments,
};
use mp_chain_config::StarknetVersion;
Expand All @@ -24,7 +24,7 @@ pub struct UnverifiedHeader {
/// The Starknet address of the sequencer that created this block.
pub sequencer_address: Felt,
/// The time the sequencer created this block before executing transactions
pub block_timestamp: u64,
pub block_timestamp: BlockTimestamp,
/// The version of the Starknet protocol used when creating this block
pub protocol_version: StarknetVersion,
/// Gas prices for this block
Expand Down
40 changes: 1 addition & 39 deletions crates/client/block_production/src/finalize_execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,51 +12,13 @@ use mp_state_update::{
ContractStorageDiffItem, DeclaredClassItem, DeployedContractItem, NonceUpdate, ReplacedClassItem, StateDiff,
StorageEntry,
};
use starknet_api::core::{ClassHash, CompiledClassHash, ContractAddress, Nonce};
use starknet_api::core::ContractAddress;
use std::collections::{hash_map, HashMap};

#[derive(Debug, thiserror::Error)]
#[error("Error converting state diff to state map")]
pub struct StateDiffToStateMapError;

pub fn state_diff_to_state_map(diff: StateDiff) -> Result<StateMaps, StateDiffToStateMapError> {
let nonces = diff
.nonces
.into_iter()
.map(|entry| Ok((entry.contract_address.try_into().map_err(|_| StateDiffToStateMapError)?, Nonce(entry.nonce))))
.collect::<Result<_, StateDiffToStateMapError>>()?;
let class_hashes = diff
.deployed_contracts
.into_iter()
.map(|entry| Ok((entry.address.try_into().map_err(|_| StateDiffToStateMapError)?, ClassHash(entry.class_hash))))
.chain(diff.replaced_classes.into_iter().map(|entry| {
Ok((entry.contract_address.try_into().map_err(|_| StateDiffToStateMapError)?, ClassHash(entry.class_hash)))
}))
.collect::<Result<_, StateDiffToStateMapError>>()?;
let storage = diff
.storage_diffs
.into_iter()
.flat_map(|d| {
d.storage_entries.into_iter().map(move |e| {
Ok((
(
d.address.try_into().map_err(|_| StateDiffToStateMapError)?,
e.key.try_into().map_err(|_| StateDiffToStateMapError)?,
),
e.value,
))
})
})
.collect::<Result<_, StateDiffToStateMapError>>()?;
let declared_contracts = diff.declared_classes.iter().map(|d| (ClassHash(d.class_hash), true)).collect();
let compiled_class_hashes = diff
.declared_classes
.into_iter()
.map(|d| (ClassHash(d.class_hash), CompiledClassHash(d.compiled_class_hash)))
.collect();
Ok(StateMaps { nonces, class_hashes, storage, declared_contracts, compiled_class_hashes })
}

pub(crate) fn state_map_to_state_diff(
backend: &MadaraBackend,
on_top_of: &Option<DbBlockId>,
Expand Down
124 changes: 47 additions & 77 deletions crates/client/block_production/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,23 @@ use crate::close_block::close_block;
use crate::metrics::BlockProductionMetrics;
use blockifier::blockifier::transaction_executor::{TransactionExecutor, BLOCK_STATE_ACCESS_ERR};
use blockifier::bouncer::{BouncerWeights, BuiltinCount};
use blockifier::state::state_api::UpdatableState;
use blockifier::transaction::errors::TransactionExecutionError;
use finalize_execution_state::{state_diff_to_state_map, StateDiffToStateMapError};
use finalize_execution_state::StateDiffToStateMapError;
use mc_block_import::{BlockImportError, BlockImporter};
use mc_db::db_block_id::DbBlockId;
use mc_db::{MadaraBackend, MadaraStorageError};
use mc_exec::{BlockifierStateAdapter, ExecutionContext};
use mc_mempool::header::make_pending_header;
use mc_mempool::{L1DataProvider, MempoolProvider};
use mp_block::{BlockId, BlockTag, MadaraMaybePendingBlockInfo, MadaraPendingBlock, VisitedSegments};
use mp_block::{BlockId, BlockTag, MadaraPendingBlock, VisitedSegments};
use mp_class::compile::ClassCompilationError;
use mp_class::{ConvertedClass, LegacyConvertedClass, SierraConvertedClass};
use mp_class::ConvertedClass;
use mp_convert::ToFelt;
use mp_receipt::from_blockifier_execution_info;
use mp_state_update::{ContractStorageDiffItem, StateDiff, StorageEntry};
use mp_transactions::TransactionWithHash;
use mp_utils::service::ServiceContext;
use opentelemetry::KeyValue;
use starknet_api::core::ClassHash;
use starknet_types_core::felt::Felt;
use std::borrow::Cow;
use std::collections::VecDeque;
Expand All @@ -48,6 +46,7 @@ use std::time::Instant;
mod close_block;
mod finalize_execution_state;
pub mod metrics;
mod re_add_finalized_to_blockifier;

#[derive(Default, Clone)]
struct ContinueBlockStats {
Expand Down Expand Up @@ -103,85 +102,54 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
self.current_pending_tick = n;
}

/// Continue the pending block state by re-adding all of its transactions back into the mempool.
/// This function will always clear the pending block in db, even if the transactions could not be added to the mempool.
pub fn re_add_pending_block_txs_to_mempool(
backend: &MadaraBackend,
mempool: &Mempool,
) -> Result<(), Cow<'static, str>> {
let Some(current_pending_block) =
backend.get_block(&DbBlockId::Pending).map_err(|err| format!("Getting pending block: {err:#}"))?
else {
// No pending block
return Ok(());
};
backend.clear_pending_block().map_err(|err| format!("Clearing pending block: {err:#}"))?;

let n_txs = re_add_finalized_to_blockifier::re_add_txs_to_mempool(current_pending_block, mempool, backend)
.map_err(|err| format!("Re-adding transactions to mempool: {err:#}"))?;

if n_txs > 0 {
tracing::info!("🔁 Re-added {n_txs} transactions from the pending block back into the mempool");
}
Ok(())
}

pub fn new(
backend: Arc<MadaraBackend>,
importer: Arc<BlockImporter>,
mempool: Arc<Mempool>,
metrics: Arc<BlockProductionMetrics>,
l1_data_provider: Arc<dyn L1DataProvider>,
) -> Result<Self, Error> {
let (pending_block, state_diff, pcs) = match backend.get_block(&DbBlockId::Pending)? {
Some(pending) => {
let MadaraMaybePendingBlockInfo::Pending(info) = pending.info else {
return Err(Error::Unexpected("Get a pending block".into()));
};
let pending_state_update = backend.get_pending_block_state_update()?;
(MadaraPendingBlock { info, inner: pending.inner }, pending_state_update, Default::default())
}
None => {
let parent_block_hash = backend
.get_block_hash(&BlockId::Tag(BlockTag::Latest))?
.unwrap_or(/* genesis block's parent hash */ Felt::ZERO);

(
MadaraPendingBlock::new_empty(make_pending_header(
parent_block_hash,
backend.chain_config(),
l1_data_provider.as_ref(),
)),
StateDiff::default(),
Default::default(),
)
}
};
if let Err(err) = Self::re_add_pending_block_txs_to_mempool(&backend, &mempool) {
// This error should not stop block production from working. If it happens, that's too bad. We drop the pending state and start from
// a fresh one.
tracing::error!("Failed to continue the pending block state: {err:#}");
}

let declared_classes: Vec<ConvertedClass> = state_diff
.declared_classes
.iter()
.map(|item| {
let class_info = backend.get_class_info(&DbBlockId::Pending, &item.class_hash)?.ok_or_else(|| {
Error::Unexpected(format!("No class info for declared class {:#x}", item.class_hash).into())
})?;
let converted_class = match class_info {
mp_class::ClassInfo::Sierra(info) => {
let compiled =
backend.get_sierra_compiled(&DbBlockId::Pending, &item.class_hash)?.ok_or_else(|| {
Error::Unexpected(
format!("No compiled class for declared class {:#x}", item.class_hash).into(),
)
})?;
let compiled = Arc::new(compiled);
ConvertedClass::Sierra(SierraConvertedClass { class_hash: item.class_hash, info, compiled })
}
mp_class::ClassInfo::Legacy(info) => {
ConvertedClass::Legacy(LegacyConvertedClass { class_hash: item.class_hash, info })
}
};

Ok(converted_class)
})
.collect::<Result<_, Error>>()?;

let class_hash_to_class = declared_classes
.iter()
.map(|c| {
Ok((
ClassHash(c.class_hash()),
match c {
ConvertedClass::Legacy(class) => class.info.contract_class.to_blockifier_class()?,
ConvertedClass::Sierra(class) => class.compiled.to_blockifier_class()?,
},
))
})
.collect::<Result<_, Error>>()?;

let mut executor =
ExecutionContext::new_in_block(Arc::clone(&backend), &pending_block.info.clone().into())?.tx_executor();
let block_state =
executor.block_state.as_mut().expect("Block state can not be None unless we take ownership of it");
let parent_block_hash = backend
.get_block_hash(&BlockId::Tag(BlockTag::Latest))?
.unwrap_or(/* genesis block's parent hash */ Felt::ZERO);

// Apply pending state
block_state.apply_writes(&state_diff_to_state_map(state_diff)?, &class_hash_to_class, &pcs);
let pending_block = MadaraPendingBlock::new_empty(make_pending_header(
parent_block_hash,
backend.chain_config(),
l1_data_provider.as_ref(),
));

let executor =
ExecutionContext::new_in_block(Arc::clone(&backend), &pending_block.info.clone().into())?.tx_executor();

Ok(Self {
importer,
Expand All @@ -190,7 +158,7 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {
executor,
current_pending_tick: 0,
block: pending_block,
declared_classes,
declared_classes: Default::default(),
l1_data_provider,
metrics,
})
Expand Down Expand Up @@ -294,7 +262,9 @@ impl<Mempool: MempoolProvider> BlockProductionTask<Mempool> {

// Add back the unexecuted transactions to the mempool.
stats.n_re_added_to_mempool = txs_to_process.len();
self.mempool.re_add_txs(txs_to_process, executed_txs);
self.mempool
.re_add_txs(txs_to_process, executed_txs)
.map_err(|err| Error::Unexpected(format!("Mempool error: {err:#}").into()))?;

tracing::debug!(
"Finished tick with {} new transactions, now at {} - re-adding {} txs to mempool",
Expand Down
Loading

0 comments on commit 237f31c

Please sign in to comment.