Skip to content

Commit

Permalink
l1-filter: Replace TxFilterRules with TxFilterConfig (#458)
Browse files Browse the repository at this point in the history
* l1-filter: Add TxFilterConfig

* filter: Remove TxFilterRule

* filter: Renaming and some cleanups

* primitives: Add sorted vec

* btcio/query: Remove Chainstate and Db dependency

* btcio/reader: Fix start..end calculation

* Fix nits

---------

Co-authored-by: Bibek Pandey <[email protected]>
  • Loading branch information
bewakes and Bibek Pandey authored Nov 29, 2024
1 parent 93d1294 commit da56be9
Show file tree
Hide file tree
Showing 28 changed files with 515 additions and 270 deletions.
11 changes: 4 additions & 7 deletions bin/strata-client/src/l1_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,20 @@ where

// TODO switch to checking the L1 tip in the consensus/client state
let l1_db = db.l1_db().clone();
let target_next_block = l1_db
.get_chain_tip()?
.map(|i| i + 1)
.unwrap_or(params.rollup().horizon_l1_height);
let horz_height = params.rollup().horizon_l1_height;
let target_next_block = l1_db.get_chain_tip()?.map(|i| i + 1).unwrap_or(horz_height);
assert!(target_next_block >= horz_height);

let reader_config = Arc::new(config.get_reader_config(params.clone()));
let chs_db = db.chain_state_db().clone();

executor.spawn_critical_async(
"bitcoin_data_reader_task",
bitcoin_data_reader_task::<D>(
bitcoin_data_reader_task(
rpc_client,
ev_tx,
target_next_block,
reader_config,
status_rx.clone(),
chs_db,
),
);

Expand Down
142 changes: 66 additions & 76 deletions crates/btcio/src/reader/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::{

use anyhow::bail;
use bitcoin::{hashes::Hash, BlockHash};
use strata_db::traits::Database;
use strata_primitives::{buf::Buf32, params::Params};
use strata_primitives::buf::Buf32;
use strata_state::l1::{
get_btc_params, get_difficulty_adjustment_height, BtcParams, HeaderVerificationState,
L1BlockId, TimestampStore,
};
use strata_status::StatusTx;
use strata_tx_parser::{
filter::{derive_tx_filter_rules, filter_relevant_txs, TxFilterRule},
filter::filter_protocol_op_tx_refs,
filter_types::TxFilterConfig,
messages::{BlockData, L1Event},
};
use tokio::sync::mpsc;
Expand All @@ -26,42 +26,45 @@ use crate::{
status::{apply_status_updates, L1StatusUpdate},
};

// TODO: remove this
pub async fn bitcoin_data_reader_task<D: Database + 'static>(
/// Context that encapsulates common items needed for L1 reader.
struct ReaderContext<R: Reader> {
/// Bitcoin reader client
client: Arc<R>,
/// L1Event sender
event_tx: mpsc::Sender<L1Event>,
/// Config
config: Arc<ReaderConfig>,
/// Status transmitter
status_tx: Arc<StatusTx>,
}

/// The main task that initializes the reader state and starts reading from bitcoin.
pub async fn bitcoin_data_reader_task(
client: Arc<impl Reader>,
event_tx: mpsc::Sender<L1Event>,
target_next_block: u64,
config: Arc<ReaderConfig>,
status_rx: Arc<StatusTx>,
_chstate_db: Arc<D::ChainstateDB>,
status_tx: Arc<StatusTx>,
) -> anyhow::Result<()> {
do_reader_task(
client.as_ref(),
&event_tx,
target_next_block,
let ctx = ReaderContext {
client,
event_tx,
config,
status_rx.clone(),
)
.await
status_tx,
};
do_reader_task(ctx, target_next_block).await
}

async fn do_reader_task(
client: &impl Reader,
event_tx: &mpsc::Sender<L1Event>,
/// Inner function that actually does the reading task.
async fn do_reader_task<R: Reader>(
ctx: ReaderContext<R>,
target_next_block: u64,
config: Arc<ReaderConfig>,
status_rx: Arc<StatusTx>,
) -> anyhow::Result<()> {
info!(%target_next_block, "started L1 reader task!");

let poll_dur = Duration::from_millis(config.client_poll_dur_ms as u64);
let poll_dur = Duration::from_millis(ctx.config.client_poll_dur_ms as u64);

let mut state = init_reader_state(
target_next_block,
config.max_reorg_depth as usize * 2,
client,
)
.await?;
let mut state = init_reader_state(&ctx, target_next_block).await?;
let best_blkid = state.best_block();
info!(%best_blkid, "initialized L1 reader state");

Expand All @@ -70,19 +73,9 @@ async fn do_reader_task(
let cur_best_height = state.best_block_idx();
let poll_span = debug_span!("l1poll", %cur_best_height);

// Maybe this should be called outside loop?
let filters = derive_tx_filter_rules(config.params.rollup())?;

if let Err(err) = poll_for_new_blocks(
client,
event_tx,
&filters,
&mut state,
&mut status_updates,
config.params.as_ref(),
)
.instrument(poll_span)
.await
if let Err(err) = poll_for_new_blocks(&ctx, &mut state, &mut status_updates)
.instrument(poll_span)
.await
{
warn!(%cur_best_height, err = %err, "failed to poll Bitcoin client");
status_updates.push(L1StatusUpdate::RpcError(err.to_string()));
Expand All @@ -108,25 +101,33 @@ async fn do_reader_task(
.as_millis() as u64,
));

apply_status_updates(&status_updates, status_rx.clone()).await;
apply_status_updates(&status_updates, ctx.status_tx.clone()).await;
}
}

/// Inits the reader state by trying to backfill blocks up to a target height.
async fn init_reader_state(
async fn init_reader_state<R: Reader>(
ctx: &ReaderContext<R>,
target_next_block: u64,
lookback: usize,
client: &impl Reader,
) -> anyhow::Result<ReaderState> {
// Init the reader state using the blockid we were given, fill in a few blocks back.
debug!(%target_next_block, "initializing reader state");
let mut init_queue = VecDeque::new();

let lookback = ctx.config.max_reorg_depth as usize * 2;
let client = ctx.client.as_ref();
let hor_height = ctx.config.params.rollup().horizon_l1_height;
let pre_hor = hor_height.saturating_sub(1);
let target = target_next_block as i64;

// Do some math to figure out where our start and end are.
// TODO something screwed up with bookkeeping here
let chain_info = client.get_blockchain_info().await?;
let start_height = i64::max(target_next_block as i64 - lookback as i64, 0) as u64;
let end_height = u64::min(target_next_block - 1, chain_info.blocks);
let start_height = (target - lookback as i64)
.max(pre_hor as i64)
.min(chain_info.blocks as i64) as u64;
let end_height = chain_info
.blocks
.min(pre_hor.max(target_next_block.saturating_sub(1)));
debug!(%start_height, %end_height, "queried L1 client, have init range");

// Loop through the range we've determined to be okay and pull the blocks we want to look back
Expand All @@ -145,15 +146,12 @@ async fn init_reader_state(

/// Polls the chain to see if there's new blocks to look at, possibly reorging
/// if there's a mixup and we have to go back.
async fn poll_for_new_blocks(
client: &impl Reader,
event_tx: &mpsc::Sender<L1Event>,
filters: &[TxFilterRule],
async fn poll_for_new_blocks<R: Reader>(
ctx: &ReaderContext<R>,
state: &mut ReaderState,
status_updates: &mut Vec<L1StatusUpdate>,
params: &Params,
) -> anyhow::Result<()> {
let chain_info = client.get_blockchain_info().await?;
let chain_info = ctx.client.get_blockchain_info().await?;
status_updates.push(L1StatusUpdate::RpcConnected(true));
let client_height = chain_info.blocks;
let fresh_best_block = chain_info.best_block_hash.parse::<BlockHash>()?;
Expand All @@ -167,12 +165,12 @@ async fn poll_for_new_blocks(
}

// First, check for a reorg if there is one.
if let Some((pivot_height, pivot_blkid)) = find_pivot_block(client, state).await? {
if let Some((pivot_height, pivot_blkid)) = find_pivot_block(ctx.client.as_ref(), state).await? {
if pivot_height < state.best_block_idx() {
info!(%pivot_height, %pivot_blkid, "found apparent reorg");
state.rollback_to_height(pivot_height);
let revert_ev = L1Event::RevertTo(pivot_height);
if event_tx.send(revert_ev).await.is_err() {
if ctx.event_tx.send(revert_ev).await.is_err() {
warn!("unable to submit L1 reorg event, did persistence task exit?");
}
}
Expand All @@ -187,16 +185,7 @@ async fn poll_for_new_blocks(
// Now process each block we missed.
let scan_start_height = state.next_height();
for fetch_height in scan_start_height..=client_height {
let l1blkid = match fetch_and_process_block(
fetch_height,
client,
event_tx,
state,
status_updates,
filters,
params,
)
.await
let l1blkid = match fetch_and_process_block(ctx, fetch_height, state, status_updates).await
{
Ok(b) => b,
Err(e) => {
Expand Down Expand Up @@ -232,37 +221,38 @@ async fn find_pivot_block(
Ok(None)
}

async fn fetch_and_process_block(
/// Fetches a block at given height, extracts relevant transactions and emits an `L1Event`.
async fn fetch_and_process_block<R: Reader>(
ctx: &ReaderContext<R>,
height: u64,
client: &impl Reader,
event_tx: &mpsc::Sender<L1Event>,
state: &mut ReaderState,
status_updates: &mut Vec<L1StatusUpdate>,
filters: &[TxFilterRule],
params: &Params,
) -> anyhow::Result<BlockHash> {
let block = client.get_block_at(height).await?;
let block = ctx.client.get_block_at(height).await?;
let txs = block.txdata.len();

let filtered_txs = filter_relevant_txs(&block, filters);
let params = ctx.config.params.clone();
let filter_config = TxFilterConfig::derive_from(params.rollup())?;
let filtered_txs = filter_protocol_op_tx_refs(&block, filter_config);
let block_data = BlockData::new(height, block, filtered_txs);
let l1blkid = block_data.block().block_hash();
trace!(%height, %l1blkid, %txs, "fetched block from client");

status_updates.push(L1StatusUpdate::CurHeight(height));
status_updates.push(L1StatusUpdate::CurTip(l1blkid.to_string()));

let threshold = params.rollup.l1_reorg_safe_depth;
let genesis_ht = params.rollup.genesis_l1_height;
let threshold = params.rollup().l1_reorg_safe_depth;
let genesis_ht = params.rollup().genesis_l1_height;
let genesis_threshold = genesis_ht + threshold as u64;

trace!(%genesis_ht, %threshold, %genesis_threshold, "should genesis?");

if height == genesis_threshold {
info!(%height, %genesis_ht, "time for genesis");
let l1_verification_state =
get_verification_state(client, genesis_ht + 1, &get_btc_params()).await?;
if let Err(e) = event_tx
get_verification_state(ctx.client.as_ref(), genesis_ht + 1, &get_btc_params()).await?;
if let Err(e) = ctx
.event_tx
.send(L1Event::GenesisVerificationState(
height,
l1_verification_state,
Expand All @@ -274,7 +264,7 @@ async fn fetch_and_process_block(
}
}

if let Err(e) = event_tx.send(L1Event::BlockData(block_data)).await {
if let Err(e) = ctx.event_tx.send(L1Event::BlockData(block_data)).await {
error!("failed to submit L1 block event, did the persistence task crash?");
return Err(e.into());
}
Expand Down
4 changes: 1 addition & 3 deletions crates/chaintsn/src/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ fn process_execution_update<'u>(
state: &mut StateCache,
update: &'u exec_update::ExecUpdate,
) -> Result<&'u [WithdrawalIntent], TsnError> {
// for all the ops, corresponding to DepositIntent , remove those DepositIntent the ExecEnvState
let deposits = state.state().exec_env_state().pending_deposits();

// for all the ops, corresponding to DepositIntent, remove those DepositIntent the ExecEnvState
let applied_ops = update.input().applied_ops();

let applied_deposit_intent_idx = applied_ops
Expand Down
2 changes: 1 addition & 1 deletion crates/consensus-logic/src/l1_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ fn check_for_da_batch(
let signed_checkpts = protocol_ops_txs
.iter()
.filter_map(|ops_txs| match ops_txs.proto_op() {
strata_state::tx::ProtocolOperation::RollupInscription(inscription) => Some((
strata_state::tx::ProtocolOperation::Checkpoint(inscription) => Some((
inscription,
&blockdata.block().txdata[ops_txs.index() as usize],
)),
Expand Down
9 changes: 8 additions & 1 deletion crates/primitives/src/l1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ pub struct L1Status {
/// some useful traits on it such as [`serde::Deserialize`], [`borsh::BorshSerialize`] and
/// [`borsh::BorshDeserialize`].
// TODO: implement [`arbitrary::Arbitrary`]?
#[derive(Debug, Clone, Serialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, PartialEq, Eq, PartialOrd, Ord)]
pub struct BitcoinAddress {
/// The [`bitcoin::Network`] that this address is valid in.
network: Network,
Expand Down Expand Up @@ -352,6 +352,13 @@ impl BorshDeserialize for BitcoinAddress {
}
}

/// Outpoint of a bitcoin tx
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, BorshSerialize, BorshDeserialize)]
pub struct Outpoint {
pub txid: Buf32,
pub vout: u32,
}

/// A wrapper for bitcoin amount in sats similar to the implementation in [`bitcoin::Amount`].
///
/// NOTE: This wrapper has been created so that we can implement `Borsh*` traits on it.
Expand Down
1 change: 1 addition & 0 deletions crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ pub mod operator;
pub mod params;
pub mod prelude;
pub mod relay;
pub mod sorted_vec;
pub mod utils;
pub mod vk;
1 change: 1 addition & 0 deletions crates/primitives/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ pub struct DepositTxParams {
pub address_length: u8,

/// Exact bitcoin amount in the at-rest deposit.
// TODO: rename this to deposit_denominations and set the type to be a vec(possibly sorted)
pub deposit_amount: u64,

/// federation address derived from operator entries
Expand Down
Loading

0 comments on commit da56be9

Please sign in to comment.