From 3a999830259a7b1b511da4338340439b3f306ae6 Mon Sep 17 00:00:00 2001 From: Wolfgang Welz Date: Mon, 22 Jan 2024 22:10:46 +0100 Subject: [PATCH] Fix batch queue handling in Optimism derivation (#73) * sort batches in a multimap * update copyright --- lib/src/optimism/batcher.rs | 173 ++++++++++++++++------------ lib/src/optimism/batcher_channel.rs | 24 ++-- lib/src/optimism/mod.rs | 76 ++++++------ primitives/src/batch.rs | 81 +++++-------- 4 files changed, 183 insertions(+), 171 deletions(-) diff --git a/lib/src/optimism/batcher.rs b/lib/src/optimism/batcher.rs index 266b0ab47..cf38c0452 100644 --- a/lib/src/optimism/batcher.rs +++ b/lib/src/optimism/batcher.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RISC Zero, Inc. +// Copyright 2024 RISC Zero, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -13,14 +13,11 @@ // limitations under the License. use core::cmp::Ordering; -use std::{ - cmp::Reverse, - collections::{BinaryHeap, VecDeque}, -}; +use std::collections::{BTreeMap, VecDeque}; -use anyhow::{ensure, Context, Result}; +use anyhow::{bail, ensure, Context, Result}; use zeth_primitives::{ - batch::Batch, + batch::{Batch, BatchEssence}, transactions::{ ethereum::EthereumTxEssence, optimism::{OptimismTxEssence, OPTIMISM_DEPOSITED_TX_TYPE}, @@ -34,9 +31,16 @@ use super::{ }; #[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] -pub struct BlockInfo { +pub struct BlockId { + pub hash: B256, + pub number: BlockNumber, +} + +#[derive(Debug, Clone, Copy, Eq, PartialEq, Default)] +pub struct L2BlockInfo { pub hash: B256, pub timestamp: u64, + pub l1_origin: BlockId, } #[derive(Clone, Debug, Default, PartialEq, Eq)] @@ -52,7 +56,7 @@ pub struct Epoch { pub struct State { pub current_l1_block_number: BlockNumber, pub current_l1_block_hash: BlockHash, - pub safe_head: BlockInfo, + pub safe_head: L2BlockInfo, pub epoch: Epoch, pub op_epoch_queue: VecDeque, pub next_epoch: Option, @@ -62,7 +66,7 @@ impl State { pub fn new( current_l1_block_number: BlockNumber, current_l1_block_hash: BlockHash, - safe_head: BlockInfo, + safe_head: L2BlockInfo, epoch: Epoch, ) -> Self { State { @@ -75,19 +79,19 @@ impl State { } } - pub fn do_next_epoch(&mut self) -> anyhow::Result<()> { - self.epoch = self.next_epoch.take().expect("No next epoch!"); + pub fn do_next_epoch(&mut self) -> Result<()> { + self.epoch = self.next_epoch.take().context("no next epoch!")?; self.deque_next_epoch_if_none()?; Ok(()) } - pub fn push_epoch(&mut self, epoch: Epoch) -> anyhow::Result<()> { + pub fn push_epoch(&mut self, epoch: Epoch) -> Result<()> { self.op_epoch_queue.push_back(epoch); self.deque_next_epoch_if_none()?; Ok(()) } - fn deque_next_epoch_if_none(&mut self) -> anyhow::Result<()> { + fn deque_next_epoch_if_none(&mut self) -> Result<()> { if self.next_epoch.is_none() { while let Some(next_epoch) = self.op_epoch_queue.pop_front() { if next_epoch.number <= self.epoch.number { @@ -96,7 +100,7 @@ impl State { self.next_epoch = Some(next_epoch); break; } else { - anyhow::bail!("Epoch gap!"); + bail!("Epoch gap!"); } } } @@ -112,8 +116,15 @@ enum BatchStatus { Future, } +/// A [Batch] with inclusion information. +pub struct BatchWithInclusion { + pub essence: BatchEssence, + pub inclusion_block_number: BlockNumber, +} + pub struct Batcher { - batches: BinaryHeap>, + /// Multimap of batches, keyed by timestamp + batches: BTreeMap>, batcher_channel: BatcherChannels, pub state: State, pub config: ChainConfig, @@ -122,7 +133,7 @@ pub struct Batcher { impl Batcher { pub fn new( config: ChainConfig, - op_head: BlockInfo, + op_head: L2BlockInfo, eth_block: &BlockInput, ) -> Result { let eth_block_hash = eth_block.block_header.hash(); @@ -143,7 +154,7 @@ impl Batcher { ); Ok(Batcher { - batches: BinaryHeap::new(), + batches: BTreeMap::new(), batcher_channel, state, config, @@ -198,7 +209,10 @@ impl Batcher { batch.essence.parent_hash, batch.essence.epoch_num ); - self.batches.push(Reverse(batch)); + self.batches + .entry(batch.essence.timestamp) + .or_default() + .push_back(batch); }); } @@ -209,78 +223,85 @@ impl Batcher { } pub fn read_batch(&mut self) -> Result> { - let mut out = None; + let epoch = &self.state.epoch; + let safe_l2_head = self.state.safe_head; + + ensure!( + safe_l2_head.l1_origin.hash == epoch.hash + || safe_l2_head.l1_origin.number == epoch.number - 1, + "buffered L1 chain epoch does not match safe head origin" + ); + + let mut next_batch = None; // Grab the first accepted batch. From the spec: // "The batches are processed in order of the inclusion on L1: if multiple batches can be // accept-ed the first is applied. An implementation can defer future batches a later // derivation step to reduce validation work." - while let Some(Reverse(batch)) = self.batches.pop() { - match self.batch_status(&batch) { - BatchStatus::Accept => { - out = Some(batch); - break; - } - BatchStatus::Drop => { - #[cfg(not(target_os = "zkvm"))] - log::debug!("Dropping batch"); - } - BatchStatus::Future => { - #[cfg(not(target_os = "zkvm"))] - log::debug!("Encountered future batch"); - - self.batches.push(Reverse(batch)); - break; - } - BatchStatus::Undecided => { - #[cfg(not(target_os = "zkvm"))] - log::debug!("Encountered undecided batch"); - - self.batches.push(Reverse(batch)); - break; + 'outer: while let Some((ts, mut batches)) = self.batches.pop_first() { + // iterate over all batches, in order of inclusion and find the first accepted batch + // retain batches that may be processed in the future, or those we are undecided on + while let Some(batch) = batches.pop_front() { + match self.batch_status(&batch) { + BatchStatus::Accept => { + next_batch = Some(batch); + // if there are still batches left, insert them back into the map + if !batches.is_empty() { + self.batches.insert(ts, batches); + } + break 'outer; + } + BatchStatus::Drop => {} + BatchStatus::Future | BatchStatus::Undecided => { + batches.push_front(batch); + self.batches.insert(ts, batches); + break 'outer; + } } } } + if let Some(batch) = next_batch { + return Ok(Some(Batch(batch.essence))); + } + // If there are no accepted batches, attempt to generate the default batch. From the spec: // "If no batch can be accept-ed, and the stage has completed buffering of all batches // that can fully be read from the L1 block at height epoch.number + - // sequence_window_size, and the next_epoch is available, then an empty batch can - // be derived." - if out.is_none() { - let current_l1_block = self.state.current_l1_block_number; - let safe_head = self.state.safe_head; - let current_epoch = &self.state.epoch; - let next_epoch = &self.state.next_epoch; - let seq_window_size = self.config.seq_window_size; - - if let Some(next_epoch) = next_epoch { - if current_l1_block > current_epoch.number + seq_window_size { - let next_timestamp = safe_head.timestamp + self.config.blocktime; - let epoch = if next_timestamp < next_epoch.timestamp { - // From the spec: - // "If next_timestamp < next_epoch.time: the current L1 origin is repeated, - // to preserve the L2 time invariant." - current_epoch - } else { - next_epoch - }; - - out = Some(Batch::new( - current_l1_block, - safe_head.hash, - epoch.number, - epoch.hash, - next_timestamp, - )) - } + // sequence_window_size, and the next_epoch is available, then an empty batch can be + // derived." + let current_l1_block = self.state.current_l1_block_number; + let sequence_window_size = self.config.seq_window_size; + let first_of_epoch = epoch.number == safe_l2_head.l1_origin.number + 1; + + if current_l1_block > epoch.number + sequence_window_size { + if let Some(next_epoch) = &self.state.next_epoch { + let next_timestamp = safe_l2_head.timestamp + self.config.blocktime; + let batch_epoch = if next_timestamp < next_epoch.timestamp || first_of_epoch { + // From the spec: + // "If next_timestamp < next_epoch.time: the current L1 origin is repeated, + // to preserve the L2 time invariant." + // "If the batch is the first batch of the epoch, that epoch is used instead + // of advancing the epoch to ensure that there is at least one L2 block per + // epoch." + epoch + } else { + next_epoch + }; + + return Ok(Some(Batch::new( + safe_l2_head.hash, + batch_epoch.number, + batch_epoch.hash, + next_timestamp, + ))); } } - Ok(out) + Ok(None) } - fn batch_status(&self, batch: &Batch) -> BatchStatus { + fn batch_status(&self, batch: &BatchWithInclusion) -> BatchStatus { // Apply the batch status rules. The spec describes a precise order for these checks. let epoch = &self.state.epoch; @@ -295,7 +316,7 @@ impl Batcher { Ordering::Greater => { #[cfg(not(target_os = "zkvm"))] log::debug!( - "Future batch: {} = batch.essence.timestamp > next_timestamp = {}", + "Future batch: {} = batch.timestamp > next_timestamp = {}", &batch.essence.timestamp, &next_timestamp ); @@ -304,7 +325,7 @@ impl Batcher { Ordering::Less => { #[cfg(not(target_os = "zkvm"))] log::debug!( - "Batch too old: {} = batch.essence.timestamp < next_timestamp = {}", + "Batch too old: {} = batch.timestamp < next_timestamp = {}", &batch.essence.timestamp, &next_timestamp ); diff --git a/lib/src/optimism/batcher_channel.rs b/lib/src/optimism/batcher_channel.rs index 4770670e9..8655b5803 100644 --- a/lib/src/optimism/batcher_channel.rs +++ b/lib/src/optimism/batcher_channel.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RISC Zero, Inc. +// Copyright 2024 RISC Zero, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,7 +27,7 @@ use zeth_primitives::{ Address, BlockNumber, }; -use super::config::ChainConfig; +use super::{batcher::BatchWithInclusion, config::ChainConfig}; use crate::utils::MultiReader; pub const MAX_RLP_BYTES_PER_CHANNEL: u64 = 10_000_000; @@ -37,7 +37,7 @@ pub struct BatcherChannels { max_channel_bank_size: u64, channel_timeout: u64, channels: VecDeque, - batches: VecDeque>, + batches: VecDeque>, } impl BatcherChannels { @@ -126,7 +126,7 @@ impl BatcherChannels { Ok(()) } - pub fn read_batches(&mut self) -> Option> { + pub fn read_batches(&mut self) -> Option> { self.batches.pop_front() } @@ -282,7 +282,7 @@ impl Channel { /// Reads all batches from an ready channel. If there is an invalid batch, the rest of /// the channel is skipped, but previous batches are returned. - fn read_batches(&self, block_number: BlockNumber) -> Vec { + fn read_batches(&self, block_number: BlockNumber) -> Vec { debug_assert!(self.is_ready()); let mut batches = Vec::new(); @@ -297,18 +297,24 @@ impl Channel { batches } - fn decode_batches(&self, block_number: BlockNumber, batches: &mut Vec) -> Result<()> { + fn decode_batches( + &self, + block_number: BlockNumber, + batches: &mut Vec, + ) -> Result<()> { let decompressed = self .decompress() .context("failed to decompress channel data")?; let mut channel_data = decompressed.as_slice(); while !channel_data.is_empty() { - let mut batch = Batch::decode(&mut channel_data) + let batch = Batch::decode(&mut channel_data) .with_context(|| format!("failed to decode batch {}", batches.len()))?; - batch.inclusion_block_number = block_number; - batches.push(batch); + batches.push(BatchWithInclusion { + essence: batch.0, + inclusion_block_number: block_number, + }); } Ok(()) diff --git a/lib/src/optimism/mod.rs b/lib/src/optimism/mod.rs index 51894e71c..e36a18ba0 100644 --- a/lib/src/optimism/mod.rs +++ b/lib/src/optimism/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RISC Zero, Inc. +// Copyright 2024 RISC Zero, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ use zeth_primitives::{ use crate::{ consts::ONE, optimism::{ - batcher::{Batcher, BlockInfo}, + batcher::{Batcher, BlockId, L2BlockInfo}, batcher_db::BatcherDb, config::ChainConfig, }, @@ -163,9 +163,13 @@ impl DeriveMachine { Batcher::new( op_chain_config, - BlockInfo { + L2BlockInfo { hash: op_head_block_hash, timestamp: op_head.block_header.timestamp.try_into().unwrap(), + l1_origin: BlockId { + number: set_l1_block_values.number, + hash: set_l1_block_values.hash, + }, }, ð_head, )? @@ -218,33 +222,34 @@ impl DeriveMachine { info!( "Read batch for Op block {}: timestamp={}, epoch={}, tx count={}, parent hash={:?}", self.op_block_no, - op_batch.essence.timestamp, - op_batch.essence.epoch_num, - op_batch.essence.transactions.len(), - op_batch.essence.parent_hash, + op_batch.0.timestamp, + op_batch.0.epoch_num, + op_batch.0.transactions.len(), + op_batch.0.parent_hash, ); // Update sequence number (and fetch deposits if start of new epoch) - let deposits = - if op_batch.essence.epoch_num == self.op_batcher.state.epoch.number + 1 { - self.op_block_seq_no = 0; - self.op_batcher.state.do_next_epoch()?; - - self.op_batcher - .state - .epoch - .deposits - .iter() - .map(|tx| tx.to_rlp()) - .collect() - } else { - self.op_block_seq_no += 1; - - Vec::new() - }; + let l2_safe_head = &self.op_batcher.state.safe_head; + let deposits = if l2_safe_head.l1_origin.number != op_batch.0.epoch_num { + self.op_block_seq_no = 0; + self.op_batcher.state.do_next_epoch()?; + + self.op_batcher + .state + .epoch + .deposits + .iter() + .map(|tx| tx.to_rlp()) + .collect() + } else { + self.op_block_seq_no += 1; + + Vec::new() + }; // Obtain new Op head let new_op_head = { + // load the next op block header let new_op_head = self .derive_input .db @@ -252,9 +257,9 @@ impl DeriveMachine { .context("block not found")?; // Verify new op head has the expected parent - assert_eq!( - new_op_head.parent_hash, - self.op_batcher.state.safe_head.hash + ensure!( + new_op_head.parent_hash == self.op_batcher.state.safe_head.hash, + "Invalid op block parent hash" ); // Verify that the new op head transactions are consistent with the batch @@ -266,7 +271,7 @@ impl DeriveMachine { let l1_attributes_tx = self.derive_l1_attributes_deposited_tx(&op_batch); let derived_transactions = once(l1_attributes_tx.to_rlp()) .chain(deposits) - .chain(op_batch.essence.transactions.iter().map(|tx| tx.to_vec())) + .chain(op_batch.0.transactions.iter().map(|tx| tx.to_vec())) .enumerate(); let mut tx_trie = MptNode::default(); @@ -274,9 +279,10 @@ impl DeriveMachine { let trie_key = tx_no.to_rlp(); tx_trie.insert(&trie_key, tx)?; } - if tx_trie.hash() != new_op_head.transactions_root { - bail!("Invalid op block transaction data! Transaction trie root does not match") - } + ensure!( + tx_trie.hash() == new_op_head.transactions_root, + "Invalid op block transaction data! Transaction trie root does not match" + ); } new_op_head @@ -290,9 +296,13 @@ impl DeriveMachine { new_op_head.number, new_op_head_hash ); - self.op_batcher.state.safe_head = BlockInfo { + self.op_batcher.state.safe_head = L2BlockInfo { hash: new_op_head_hash, timestamp: new_op_head.timestamp.try_into().unwrap(), + l1_origin: BlockId { + number: self.op_batcher.state.epoch.number, + hash: self.op_batcher.state.epoch.hash, + }, }; derived_op_blocks.push((new_op_head.number, new_op_head_hash)); @@ -335,7 +345,7 @@ impl DeriveMachine { }); let source_hash: B256 = { - let l1_block_hash = op_batch.essence.epoch_hash.0; + let l1_block_hash = op_batch.0.epoch_hash.0; let seq_number = U256::from(self.op_block_seq_no).to_be_bytes::<32>(); let source_hash_sequencing = keccak([l1_block_hash, seq_number].concat()); keccak([ONE.to_be_bytes::<32>(), source_hash_sequencing].concat()).into() diff --git a/primitives/src/batch.rs b/primitives/src/batch.rs index 1028ba4b4..a2c4fa021 100644 --- a/primitives/src/batch.rs +++ b/primitives/src/batch.rs @@ -1,4 +1,4 @@ -// Copyright 2023 RISC Zero, Inc. +// Copyright 2024 RISC Zero, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,63 +12,43 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::cmp::Ordering; - -use alloy_primitives::{BlockNumber, Bytes, B256}; +use alloy_primitives::{Bytes, B256}; use alloy_rlp::{Decodable, Encodable}; use alloy_rlp_derive::{RlpDecodable, RlpEncodable}; use serde::{Deserialize, Serialize}; +/// Bytes for RLP-encoded transactions. pub type RawTransaction = Bytes; -/// A batch contains information to build one Optimism block. +/// A batch represents the inputs needed to build Optimism block. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub struct Batch { - pub inclusion_block_number: BlockNumber, - pub essence: BatchEssence, -} +pub struct Batch(pub BatchEssence); /// Represents the core details of a [Batch], specifically the portion that is derived /// from the batcher transactions. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, RlpEncodable, RlpDecodable)] pub struct BatchEssence { + /// The block hash of the previous L2 block. pub parent_hash: B256, + /// The number of the L1 block corresponding to the sequencing epoch of the L2 block. pub epoch_num: u64, + /// The hash of the L1 block corresponding to the sequencing epoch of the L2 block. pub epoch_hash: B256, + /// The timestamp of the L2 block. pub timestamp: u64, + /// An RLP-encoded list of EIP-2718 encoded transactions. pub transactions: Vec, } -impl PartialOrd for Batch { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for Batch { - fn cmp(&self, other: &Self) -> Ordering { - self.essence.timestamp.cmp(&other.essence.timestamp) - } -} - impl Batch { - pub fn new( - inclusion_block_number: BlockNumber, - parent_hash: B256, - epoch_num: u64, - epoch_hash: B256, - timestamp: u64, - ) -> Self { - Self { - inclusion_block_number, - essence: BatchEssence { - parent_hash, - epoch_num, - epoch_hash, - timestamp, - transactions: Vec::new(), - }, - } + pub fn new(parent_hash: B256, epoch_num: u64, epoch_hash: B256, timestamp: u64) -> Self { + Batch(BatchEssence { + parent_hash, + epoch_num, + epoch_hash, + timestamp, + transactions: Vec::new(), + }) } } @@ -78,28 +58,26 @@ impl Encodable for Batch { // wrap the RLP-essence inside a bytes payload alloy_rlp::Header { list: false, - payload_length: self.essence.length() + 1, + payload_length: self.0.length() + 1, } .encode(out); out.put_u8(0x00); - self.essence.encode(out); + self.0.encode(out); } #[inline] fn length(&self) -> usize { - let bytes_length = self.essence.length() + 1; + let bytes_length = self.0.length() + 1; alloy_rlp::length_of_length(bytes_length) + bytes_length } } impl Decodable for Batch { + #[inline] fn decode(buf: &mut &[u8]) -> alloy_rlp::Result { let bytes = alloy_rlp::Header::decode_bytes(buf, false)?; match bytes.split_first() { - Some((0, mut payload)) => Ok(Self { - inclusion_block_number: 0, - essence: BatchEssence::decode(&mut payload)?, - }), + Some((0, mut payload)) => Ok(Self(BatchEssence::decode(&mut payload)?)), Some(_) => Err(alloy_rlp::Error::Custom("invalid version")), None => Err(alloy_rlp::Error::InputTooShort), } @@ -117,14 +95,11 @@ mod tests { fn rlp_roundtrip() { let expected = hex!("b85000f84da0dbf6a80fef073de06add9b0d14026d6e5a86c85f6d102c36d3d8e9cf89c2afd3840109d8fea0438335a20d98863a4c0c97999eb2481921ccd28553eac6f913af7c12aec0410884647f5ea9c0"); let batch: Batch = serde_json::from_value(json!({ - "inclusion_block_number": 0, - "essence": { - "parent_hash": "0xdbf6a80fef073de06add9b0d14026d6e5a86c85f6d102c36d3d8e9cf89c2afd3", - "epoch_num": 17422590, - "epoch_hash": "0x438335a20d98863a4c0c97999eb2481921ccd28553eac6f913af7c12aec04108", - "timestamp": 1686068905, - "transactions": [] - } + "parent_hash": "0xdbf6a80fef073de06add9b0d14026d6e5a86c85f6d102c36d3d8e9cf89c2afd3", + "epoch_num": 17422590, + "epoch_hash": "0x438335a20d98863a4c0c97999eb2481921ccd28553eac6f913af7c12aec04108", + "timestamp": 1686068905, + "transactions": [] })) .unwrap();