From cef3f1247d89265a18d3f39ce3315ccb4eb9b458 Mon Sep 17 00:00:00 2001 From: Fedor Sakharov Date: Wed, 27 Mar 2024 09:57:39 +0100 Subject: [PATCH] feat: add filtering by l1 recepient (#423) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # What ❔ ## Why ❔ ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `cargo fmt`. --- .github/spellcheck/finalizer.dic | 1 + Cargo.lock | 2 - README.md | 15 - bin/withdrawal-finalizer/src/config.rs | 8 +- bin/withdrawal-finalizer/src/main.rs | 2 +- chain-events/src/l2_events.rs | 25 +- client/src/zksync_types.rs | 3 + finalizer/Cargo.toml | 4 - finalizer/src/lib.rs | 142 +------- ...e59ab42e1511a38989dfc83f0a240d85a511a.json | 41 --- ...82b4099dad6c4b65d951e3534e2168e7ad79e.json | 77 ---- ...f736dacfb4b7e815127c0b95ef291e05df3e6.json | 77 ++++ ...68fb4dc41f5e0fb9d4b55cc13a7c5210963de.json | 78 ----- ...bb5f6b7a8e177692896d4d9fe9db129defb28.json | 78 ----- ...d6d0ba5525e99e237584b7db0777b1a5a43fa.json | 18 - ...987c10cec7252cadb72b4dbc2831b81f1dd50.json | 19 + ...e273f30a3e3ec884b0c1310aafbbf06a8f165.json | 8 +- ...28ef898f7e7a29fd93a97229dd974a86f9d90.json | 78 +++++ ...325203324_add-l1-receiver-address.down.sql | 2 + ...40325203324_add-l1-receiver-address.up.sql | 2 + storage/src/lib.rs | 329 ++++++------------ storage/src/macro_utils.rs | 148 ++++++++ 22 files changed, 484 insertions(+), 673 deletions(-) delete mode 100644 storage/.sqlx/query-1439a51cb9304493287a9c54c13e59ab42e1511a38989dfc83f0a240d85a511a.json delete mode 100644 storage/.sqlx/query-21ed1c0b070f4929f990e1810e682b4099dad6c4b65d951e3534e2168e7ad79e.json create mode 100644 storage/.sqlx/query-4a3f009964fdf43b3bd09d98b4ef736dacfb4b7e815127c0b95ef291e05df3e6.json delete mode 100644 storage/.sqlx/query-78b20c983b951e668b255f46c1f68fb4dc41f5e0fb9d4b55cc13a7c5210963de.json delete mode 100644 storage/.sqlx/query-8c331d6438925f2ebe7749d5cc5bb5f6b7a8e177692896d4d9fe9db129defb28.json delete mode 100644 storage/.sqlx/query-93fe2d0516a63ed0e27ef4fe4b2d6d0ba5525e99e237584b7db0777b1a5a43fa.json create mode 100644 storage/.sqlx/query-aadc16b7622f1b2a49630abc878987c10cec7252cadb72b4dbc2831b81f1dd50.json create mode 100644 storage/.sqlx/query-d8b97db2eb9166b68386244231828ef898f7e7a29fd93a97229dd974a86f9d90.json create mode 100644 storage/migrations/20240325203324_add-l1-receiver-address.down.sql create mode 100644 storage/migrations/20240325203324_add-l1-receiver-address.up.sql create mode 100644 storage/src/macro_utils.rs diff --git a/.github/spellcheck/finalizer.dic b/.github/spellcheck/finalizer.dic index f79502dd..ba797b36 100644 --- a/.github/spellcheck/finalizer.dic +++ b/.github/spellcheck/finalizer.dic @@ -1,4 +1,5 @@ 42 +args ABI vlog L2 diff --git a/Cargo.lock b/Cargo.lock index 6a5fbc08..857c89f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1439,8 +1439,6 @@ dependencies = [ "client", "ethers", "futures", - "pretty_assertions", - "serde", "serde_json", "sqlx", "storage", diff --git a/README.md b/README.md index e9c98cad..574687a9 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,6 @@ Deployment is done by deploying a dockerized image of the service. | `BATCH_FINALIZATION_GAS_LIMIT` | The gas limit of the finalization of the whole batch in a call to `finalizeWithdrawals` in Withdrawal Finalizer Contract | | `WITHDRAWAL_FINALIZER_ACCOUNT_PRIVATE_KEY` | The private key of the account that is going to be submit finalization transactions | | `TX_RETRY_TIMEOUT_SECS` | Number of seconds to wait for a potentially stuck finalization transaction before readjusting its fees | -| `TOKENS_TO_FINALIZE` | Configures the sets of tokens this instance of finalizer will finalize. It may be configured as a whitelist, a blacklist, a wildcard or completely disable any finalization. For more info see below. | | `FINALIZE_ETH_TOKEN` | (Optional) Configure, whether the Ethereum withdrawal events should be monitored. Useful to turn off for custom bridges that are only interested in a particular ERC20 token and have nothing to do with main Ethereum withdrawals | | `CUSTOM_TOKEN_DEPLOYER_ADDRESSES` | (Optional) Normally ERC20 tokens are deployed by the bridge contract. However, in custom cases it may be necessary to override that behavior with a custom set of addresses that have deployed tokens | | `CUSTOM_TOKEN_ADDRESSES` | (Optional) Adds a predefined list of tokens to finalize. May be useful in case of custom bridge setups when the regular technique of finding token deployments does not work. | @@ -62,20 +61,6 @@ The configuration structure describing the service config can be found in [`conf ** more about zkSync contracts can be found [here](https://github.com/matter-labs/era-contracts/blob/main/docs/Overview.md) -## Configuring Tokens to finalize. - -It may be handy to limit a set of tokens the Finalizer is finalizing. This -configuration may be specified by setting a rule in the `TOKENS_TO_FINALIZE` value. -If this environment variable is not set then by default Finalizer will only finalize -ETH token (`0x000...0800a`). - -You may specify `All`, `None`, `BlackList` or `WhiteList` as json documents: - -1. `TOKENS_TO_FINALIZE = '"All"'` - Finalize everything -1. `TOKENS_TO_FINALIZE = '"None"'` - Finalize nothing -1. `TOKENS_TO_FINALIZE = '{ "WhiteList":[ "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4" ] }'` - Finalize only these tokens -1. `TOKENS_TO_FINALIZE = '{ "BlackList":[ "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4" ] }'` - Finalize all tokens but these - ## Deploying the finalizer smart contract The finalizer smart contract needs to reference the addresses of the diamond proxy contract and l1 erc20 proxy contract. diff --git a/bin/withdrawal-finalizer/src/config.rs b/bin/withdrawal-finalizer/src/config.rs index 5b7912c0..6cf88923 100644 --- a/bin/withdrawal-finalizer/src/config.rs +++ b/bin/withdrawal-finalizer/src/config.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use envconfig::Envconfig; use ethers::types::Address; -use finalizer::{AddrList, TokenList}; +use finalizer::AddrList; use serde::{Deserialize, Serialize}; use url::Url; @@ -66,9 +66,6 @@ pub struct Config { #[envconfig(from = "TX_RETRY_TIMEOUT_SECS")] pub tx_retry_timeout: usize, - #[envconfig(from = "TOKENS_TO_FINALIZE")] - pub tokens_to_finalize: Option, - #[envconfig(from = "FINALIZE_ETH_TOKEN")] pub finalize_eth_token: Option, @@ -86,6 +83,9 @@ pub struct Config { #[envconfig(from = "ETH_FINALIZATION_THRESHOLD")] pub eth_finalization_threshold: Option, + + #[envconfig(from = "ONLY_L1_RECIPIENTS")] + pub only_l1_recipients: Option, } #[derive(Deserialize, Serialize, Debug, Eq, PartialEq)] diff --git a/bin/withdrawal-finalizer/src/main.rs b/bin/withdrawal-finalizer/src/main.rs index fcaed761..f39ff0b7 100644 --- a/bin/withdrawal-finalizer/src/main.rs +++ b/bin/withdrawal-finalizer/src/main.rs @@ -304,9 +304,9 @@ async fn main() -> Result<()> { l1_bridge, config.tx_retry_timeout, finalizer_account_address, - config.tokens_to_finalize.unwrap_or_default(), meter_withdrawals, eth_finalization_threshold, + config.only_l1_recipients.map(|v| v.0.into_iter().collect()), ); let finalizer_handle = tokio::spawn(finalizer.run(client_l2)); diff --git a/chain-events/src/l2_events.rs b/chain-events/src/l2_events.rs index 20ff1a1f..132771c6 100644 --- a/chain-events/src/l2_events.rs +++ b/chain-events/src/l2_events.rs @@ -463,8 +463,7 @@ impl L2EventsListener { { if let (Some(tx_hash), Some(block_number)) = (log.transaction_hash, log.block_number) { match l2_event { - L2Events::BridgeBurn(BridgeBurnFilter { amount, .. }) - | L2Events::Withdrawal(WithdrawalFilter { amount, .. }) => { + L2Events::BridgeBurn(BridgeBurnFilter { amount, .. }) => { CHAIN_EVENTS_METRICS.withdrawal_events.inc(); let we = WithdrawalEvent { @@ -472,6 +471,28 @@ impl L2EventsListener { block_number: block_number.as_u64(), token: log.address, amount: *amount, + l1_receiver: None, + }; + let event = we.into(); + tracing::info!("sending withdrawal event {event:?}"); + sender + .send(event) + .await + .map_err(|_| Error::ChannelClosing)?; + } + L2Events::Withdrawal(WithdrawalFilter { + amount, + l_1_receiver, + .. + }) => { + CHAIN_EVENTS_METRICS.withdrawal_events.inc(); + + let we = WithdrawalEvent { + tx_hash, + block_number: block_number.as_u64(), + token: log.address, + amount: *amount, + l1_receiver: Some(*l_1_receiver), }; let event = we.into(); tracing::info!("sending withdrawal event {event:?}"); diff --git a/client/src/zksync_types.rs b/client/src/zksync_types.rs index 8cf8043d..d653d9b1 100644 --- a/client/src/zksync_types.rs +++ b/client/src/zksync_types.rs @@ -188,6 +188,9 @@ pub struct WithdrawalEvent { /// The amount transferred. pub amount: U256, + + /// Address on L1 that will receive this withdrawal. + pub l1_receiver: Option
, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/finalizer/Cargo.toml b/finalizer/Cargo.toml index f174b219..c6f1afb2 100644 --- a/finalizer/Cargo.toml +++ b/finalizer/Cargo.toml @@ -14,13 +14,9 @@ sqlx = { workspace = true, features = ["postgres", "runtime-tokio-rustls"] } tokio = { workspace = true, features = ["macros"] } tracing = { workspace = true } vise = { workspace = true } -serde = { workspace = true } serde_json = { workspace = true } client = { workspace = true } storage = { workspace = true } tx-sender = { workspace = true } withdrawals-meterer = { workspace = true } - -[dev-dependencies] -pretty_assertions = { workspace = true } diff --git a/finalizer/src/lib.rs b/finalizer/src/lib.rs index 9445dd42..a5264821 100644 --- a/finalizer/src/lib.rs +++ b/finalizer/src/lib.rs @@ -14,7 +14,6 @@ use ethers::{ types::{H256, U256}, }; use futures::TryFutureExt; -use serde::Deserialize; use sqlx::PgPool; use client::{ @@ -45,34 +44,6 @@ const OUT_OF_FUNDS_BACKOFF: Duration = Duration::from_secs(10); /// Backoff period if one of the loop iterations has failed. const LOOP_ITERATION_ERROR_BACKOFF: Duration = Duration::from_secs(5); -/// An `enum` that defines a set of tokens that Finalizer finalizes. -#[derive(Deserialize, Debug, Eq, PartialEq)] -pub enum TokenList { - /// Finalize all known tokens - All, - /// Finalize nothing - None, - /// Finalize everything but these tokens, this is a blacklist. - BlackList(Vec
), - /// Finalize nothing but these tokens, this is a whitelist. - WhiteList(Vec
), -} - -impl Default for TokenList { - fn default() -> Self { - Self::All - } -} - -impl FromStr for TokenList { - type Err = serde_json::Error; - - fn from_str(s: &str) -> std::result::Result { - let res = serde_json::from_str(s)?; - Ok(res) - } -} - /// A newtype that represents a set of addresses in JSON format. #[derive(Debug, Eq, PartialEq)] pub struct AddrList(pub Vec
); @@ -102,8 +73,8 @@ pub struct Finalizer { tx_retry_timeout: Duration, account_address: Address, withdrawals_meterer: Option, - token_list: TokenList, eth_threshold: Option, + only_l1_recipients: Option>, } const NO_NEW_WITHDRAWALS_BACKOFF: Duration = Duration::from_secs(5); @@ -131,9 +102,9 @@ where l1_bridge: IL1Bridge, tx_retry_timeout: usize, account_address: Address, - token_list: TokenList, meter_withdrawals: bool, eth_threshold: Option, + only_l1_recipients: Option>, ) -> Self { let withdrawals_meterer = meter_withdrawals.then_some(WithdrawalsMeter::new( pgpool.clone(), @@ -142,8 +113,6 @@ where let tx_fee_limit = ethers::utils::parse_ether(TX_FEE_LIMIT) .expect("{TX_FEE_LIMIT} ether is a parsable amount; qed"); - tracing::info!("finalizing tokens {token_list:?}"); - Self { pgpool, one_withdrawal_gas_limit, @@ -158,8 +127,8 @@ where tx_retry_timeout: Duration::from_secs(tx_retry_timeout as u64), account_address, withdrawals_meterer, - token_list, eth_threshold, + only_l1_recipients, } } @@ -352,35 +321,13 @@ where async fn loop_iteration(&mut self) -> Result<()> { tracing::debug!("begin iteration of the finalizer loop"); - let try_finalize_these = match &self.token_list { - TokenList::All => { - storage::withdrawals_to_finalize( - &self.pgpool, - self.query_db_pagination_limit, - self.eth_threshold, - ) - .await? - } - TokenList::WhiteList(w) => { - storage::withdrawals_to_finalize_with_whitelist( - &self.pgpool, - self.query_db_pagination_limit, - w, - self.eth_threshold, - ) - .await? - } - TokenList::BlackList(b) => { - storage::withdrawals_to_finalize_with_blacklist( - &self.pgpool, - self.query_db_pagination_limit, - b, - self.eth_threshold, - ) - .await? - } - TokenList::None => return Ok(()), - }; + let try_finalize_these = storage::withdrawals_to_finalize( + &self.pgpool, + self.query_db_pagination_limit, + self.eth_threshold, + self.only_l1_recipients.as_deref(), + ) + .await?; tracing::debug!("trying to finalize these {try_finalize_these:?}"); @@ -649,72 +596,3 @@ where Ok(()) } - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use crate::AddrList; - - use super::TokenList; - use ethers::abi::Address; - use pretty_assertions::assert_eq; - - #[test] - fn tokens_list_de() { - let all = "\"All\""; - - let none = "\"None\""; - - let all: TokenList = serde_json::from_str(all).unwrap(); - assert_eq!(all, TokenList::All); - - let none: TokenList = serde_json::from_str(none).unwrap(); - assert_eq!(none, TokenList::None); - - let black = r#" - { - "BlackList":[ - "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4" - ] - } - "#; - - let usdc_addr: Address = "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4" - .parse() - .unwrap(); - - let blocked_usdc: TokenList = serde_json::from_str(black).unwrap(); - assert_eq!(blocked_usdc, TokenList::BlackList(vec![usdc_addr])); - - let white = r#" - { - "WhiteList":[ - "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4" - ] - } - "#; - - let allowed_usdc: TokenList = serde_json::from_str(white).unwrap(); - assert_eq!(allowed_usdc, TokenList::WhiteList(vec![usdc_addr])); - } - - #[test] - fn addr_list_de() { - let addr_1: Address = "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4" - .parse() - .unwrap(); - let addr_2: Address = "0x1820495E7d1B8BA82B706FB972d2A2B8282023d0" - .parse() - .unwrap(); - - let addr_list = r#"[ - "0x3355df6D4c9C3035724Fd0e3914dE96A5a83aaf4", - "0x1820495E7d1B8BA82B706FB972d2A2B8282023d0" - ]"#; - - let list: AddrList = AddrList::from_str(addr_list).unwrap(); - - assert_eq!(list, AddrList(vec![addr_1, addr_2])); - } -} diff --git a/storage/.sqlx/query-1439a51cb9304493287a9c54c13e59ab42e1511a38989dfc83f0a240d85a511a.json b/storage/.sqlx/query-1439a51cb9304493287a9c54c13e59ab42e1511a38989dfc83f0a240d85a511a.json deleted file mode 100644 index 3f809610..00000000 --- a/storage/.sqlx/query-1439a51cb9304493287a9c54c13e59ab42e1511a38989dfc83f0a240d85a511a.json +++ /dev/null @@ -1,41 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n l2_to_l1_events.l1_token_addr,\n l2_to_l1_events.amount,\n withdrawals.tx_hash,\n finalization_data.finalization_tx\n FROM\n l2_to_l1_events\n JOIN finalization_data ON\n finalization_data.l1_batch_number = l2_to_l1_events.l2_block_number\n AND finalization_data.l2_tx_number_in_block = l2_to_l1_events.tx_number_in_block\n JOIN withdrawals ON\n withdrawals.id = finalization_data.withdrawal_id\n WHERE l2_to_l1_events.to_address = $1\n ORDER BY l2_to_l1_events.l2_block_number DESC\n LIMIT $2\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "l1_token_addr", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "amount", - "type_info": "Numeric" - }, - { - "ordinal": 2, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 3, - "name": "finalization_tx", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Bytea", - "Int8" - ] - }, - "nullable": [ - false, - false, - false, - true - ] - }, - "hash": "1439a51cb9304493287a9c54c13e59ab42e1511a38989dfc83f0a240d85a511a" -} diff --git a/storage/.sqlx/query-21ed1c0b070f4929f990e1810e682b4099dad6c4b65d951e3534e2168e7ad79e.json b/storage/.sqlx/query-21ed1c0b070f4929f990e1810e682b4099dad6c4b65d951e3534e2168e7ad79e.json deleted file mode 100644 index 6a167800..00000000 --- a/storage/.sqlx/query-21ed1c0b070f4929f990e1810e682b4099dad6c4b65d951e3534e2168e7ad79e.json +++ /dev/null @@ -1,77 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n w.tx_hash,\n w.event_index_in_tx,\n withdrawal_id,\n finalization_data.l2_block_number,\n l1_batch_number,\n l2_message_index,\n l2_tx_number_in_block,\n message,\n sender,\n proof\n FROM\n finalization_data\n JOIN withdrawals w ON finalization_data.withdrawal_id = w.id\n WHERE\n finalization_tx IS NULL\n AND failed_finalization_attempts < 3\n AND finalization_data.l2_block_number <= COALESCE(\n (\n SELECT\n MAX(l2_block_number)\n FROM\n l2_blocks\n WHERE\n execute_l1_block_number IS NOT NULL\n ),\n 1\n )\n AND (\n last_finalization_attempt IS NULL\n OR\n last_finalization_attempt < NOW() - INTERVAL '1 minutes'\n )\n AND (\n CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $2\n ELSE TRUE\n END\n )\n LIMIT\n $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "event_index_in_tx", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "withdrawal_id", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "l2_block_number", - "type_info": "Int8" - }, - { - "ordinal": 4, - "name": "l1_batch_number", - "type_info": "Int8" - }, - { - "ordinal": 5, - "name": "l2_message_index", - "type_info": "Int4" - }, - { - "ordinal": 6, - "name": "l2_tx_number_in_block", - "type_info": "Int2" - }, - { - "ordinal": 7, - "name": "message", - "type_info": "Bytea" - }, - { - "ordinal": 8, - "name": "sender", - "type_info": "Bytea" - }, - { - "ordinal": 9, - "name": "proof", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8", - "Numeric" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "21ed1c0b070f4929f990e1810e682b4099dad6c4b65d951e3534e2168e7ad79e" -} diff --git a/storage/.sqlx/query-4a3f009964fdf43b3bd09d98b4ef736dacfb4b7e815127c0b95ef291e05df3e6.json b/storage/.sqlx/query-4a3f009964fdf43b3bd09d98b4ef736dacfb4b7e815127c0b95ef291e05df3e6.json new file mode 100644 index 00000000..7be68470 --- /dev/null +++ b/storage/.sqlx/query-4a3f009964fdf43b3bd09d98b4ef736dacfb4b7e815127c0b95ef291e05df3e6.json @@ -0,0 +1,77 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n w.tx_hash,\n w.event_index_in_tx,\n withdrawal_id,\n finalization_data.l2_block_number,\n l1_batch_number,\n l2_message_index,\n l2_tx_number_in_block,\n message,\n sender,\n proof\n FROM\n finalization_data\n JOIN withdrawals w ON finalization_data.withdrawal_id = w.id\n WHERE\n finalization_tx IS NULL\n AND\n failed_finalization_attempts < 3\n AND\n finalization_data.l2_block_number <= COALESCE(\n (\n SELECT\n MAX(l2_block_number)\n FROM\n l2_blocks\n WHERE\n execute_l1_block_number IS NOT NULL\n ),\n 1\n )\n AND\n (\n last_finalization_attempt IS NULL\n OR\n last_finalization_attempt < NOW() - INTERVAL '1 minutes'\n )\n AND\n (\n CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $2\n ELSE TRUE\n END\n )\n limit $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "event_index_in_tx", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "withdrawal_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "l2_block_number", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 5, + "name": "l2_message_index", + "type_info": "Int4" + }, + { + "ordinal": 6, + "name": "l2_tx_number_in_block", + "type_info": "Int2" + }, + { + "ordinal": 7, + "name": "message", + "type_info": "Bytea" + }, + { + "ordinal": 8, + "name": "sender", + "type_info": "Bytea" + }, + { + "ordinal": 9, + "name": "proof", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int8", + "Numeric" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "4a3f009964fdf43b3bd09d98b4ef736dacfb4b7e815127c0b95ef291e05df3e6" +} diff --git a/storage/.sqlx/query-78b20c983b951e668b255f46c1f68fb4dc41f5e0fb9d4b55cc13a7c5210963de.json b/storage/.sqlx/query-78b20c983b951e668b255f46c1f68fb4dc41f5e0fb9d4b55cc13a7c5210963de.json deleted file mode 100644 index 460f757e..00000000 --- a/storage/.sqlx/query-78b20c983b951e668b255f46c1f68fb4dc41f5e0fb9d4b55cc13a7c5210963de.json +++ /dev/null @@ -1,78 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n w.tx_hash,\n w.event_index_in_tx,\n withdrawal_id,\n finalization_data.l2_block_number,\n l1_batch_number,\n l2_message_index,\n l2_tx_number_in_block,\n message,\n sender,\n proof\n FROM\n finalization_data\n JOIN withdrawals w ON finalization_data.withdrawal_id = w.id\n WHERE\n finalization_tx IS NULL\n AND failed_finalization_attempts < 3\n AND finalization_data.l2_block_number <= COALESCE(\n (\n SELECT\n MAX(l2_block_number)\n FROM\n l2_blocks\n WHERE\n execute_l1_block_number IS NOT NULL\n ),\n 1\n )\n AND w.token NOT IN (SELECT * FROM UNNEST (\n $2 :: BYTEA []\n ))\n AND (\n CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $3\n ELSE TRUE\n END\n )\n LIMIT\n $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "event_index_in_tx", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "withdrawal_id", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "l2_block_number", - "type_info": "Int8" - }, - { - "ordinal": 4, - "name": "l1_batch_number", - "type_info": "Int8" - }, - { - "ordinal": 5, - "name": "l2_message_index", - "type_info": "Int4" - }, - { - "ordinal": 6, - "name": "l2_tx_number_in_block", - "type_info": "Int2" - }, - { - "ordinal": 7, - "name": "message", - "type_info": "Bytea" - }, - { - "ordinal": 8, - "name": "sender", - "type_info": "Bytea" - }, - { - "ordinal": 9, - "name": "proof", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8", - "ByteaArray", - "Numeric" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "78b20c983b951e668b255f46c1f68fb4dc41f5e0fb9d4b55cc13a7c5210963de" -} diff --git a/storage/.sqlx/query-8c331d6438925f2ebe7749d5cc5bb5f6b7a8e177692896d4d9fe9db129defb28.json b/storage/.sqlx/query-8c331d6438925f2ebe7749d5cc5bb5f6b7a8e177692896d4d9fe9db129defb28.json deleted file mode 100644 index 794981bd..00000000 --- a/storage/.sqlx/query-8c331d6438925f2ebe7749d5cc5bb5f6b7a8e177692896d4d9fe9db129defb28.json +++ /dev/null @@ -1,78 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n w.tx_hash,\n w.event_index_in_tx,\n withdrawal_id,\n finalization_data.l2_block_number,\n l1_batch_number,\n l2_message_index,\n l2_tx_number_in_block,\n message,\n sender,\n proof\n FROM\n finalization_data\n JOIN withdrawals w ON finalization_data.withdrawal_id = w.id\n WHERE\n finalization_tx IS NULL\n AND failed_finalization_attempts < 3\n AND finalization_data.l2_block_number <= COALESCE(\n (\n SELECT\n MAX(l2_block_number)\n FROM\n l2_blocks\n WHERE\n execute_l1_block_number IS NOT NULL\n ),\n 1\n )\n AND w.token IN (SELECT * FROM UNNEST (\n $2 :: BYTEA []\n ))\n AND (\n CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $3\n ELSE TRUE\n END\n )\n LIMIT\n $1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "event_index_in_tx", - "type_info": "Int4" - }, - { - "ordinal": 2, - "name": "withdrawal_id", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": "l2_block_number", - "type_info": "Int8" - }, - { - "ordinal": 4, - "name": "l1_batch_number", - "type_info": "Int8" - }, - { - "ordinal": 5, - "name": "l2_message_index", - "type_info": "Int4" - }, - { - "ordinal": 6, - "name": "l2_tx_number_in_block", - "type_info": "Int2" - }, - { - "ordinal": 7, - "name": "message", - "type_info": "Bytea" - }, - { - "ordinal": 8, - "name": "sender", - "type_info": "Bytea" - }, - { - "ordinal": 9, - "name": "proof", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Int8", - "ByteaArray", - "Numeric" - ] - }, - "nullable": [ - false, - false, - false, - false, - false, - false, - false, - false, - false, - false - ] - }, - "hash": "8c331d6438925f2ebe7749d5cc5bb5f6b7a8e177692896d4d9fe9db129defb28" -} diff --git a/storage/.sqlx/query-93fe2d0516a63ed0e27ef4fe4b2d6d0ba5525e99e237584b7db0777b1a5a43fa.json b/storage/.sqlx/query-93fe2d0516a63ed0e27ef4fe4b2d6d0ba5525e99e237584b7db0777b1a5a43fa.json deleted file mode 100644 index 5125d6aa..00000000 --- a/storage/.sqlx/query-93fe2d0516a63ed0e27ef4fe4b2d6d0ba5525e99e237584b7db0777b1a5a43fa.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO\n withdrawals (\n tx_hash,\n l2_block_number,\n token,\n amount,\n event_index_in_tx\n )\n SELECT\n u.tx_hash,\n u.l2_block_number,\n u.token,\n u.amount,\n u.index_in_tx\n FROM\n unnest(\n $1 :: BYTEA [],\n $2 :: bigint [],\n $3 :: BYTEA [],\n $4 :: numeric [],\n $5 :: integer []\n ) AS u(\n tx_hash,\n l2_block_number,\n token,\n amount,\n index_in_tx\n ) ON CONFLICT (\n tx_hash,\n event_index_in_tx\n ) DO NOTHING\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "ByteaArray", - "Int8Array", - "ByteaArray", - "NumericArray", - "Int4Array" - ] - }, - "nullable": [] - }, - "hash": "93fe2d0516a63ed0e27ef4fe4b2d6d0ba5525e99e237584b7db0777b1a5a43fa" -} diff --git a/storage/.sqlx/query-aadc16b7622f1b2a49630abc878987c10cec7252cadb72b4dbc2831b81f1dd50.json b/storage/.sqlx/query-aadc16b7622f1b2a49630abc878987c10cec7252cadb72b4dbc2831b81f1dd50.json new file mode 100644 index 00000000..edb4693e --- /dev/null +++ b/storage/.sqlx/query-aadc16b7622f1b2a49630abc878987c10cec7252cadb72b4dbc2831b81f1dd50.json @@ -0,0 +1,19 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n withdrawals (\n tx_hash,\n l2_block_number,\n token,\n amount,\n event_index_in_tx,\n l1_receiver\n )\n SELECT\n u.tx_hash,\n u.l2_block_number,\n u.token,\n u.amount,\n u.index_in_tx,\n u.l1_receiver\n FROM\n unnest(\n $1 :: BYTEA [],\n $2 :: bigint [],\n $3 :: BYTEA [],\n $4 :: numeric [],\n $5 :: integer [],\n $6 :: BYTEA []\n ) AS u(\n tx_hash,\n l2_block_number,\n token,\n amount,\n index_in_tx,\n l1_receiver\n ) ON CONFLICT (\n tx_hash,\n event_index_in_tx\n ) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray", + "Int8Array", + "ByteaArray", + "NumericArray", + "Int4Array", + "ByteaArray" + ] + }, + "nullable": [] + }, + "hash": "aadc16b7622f1b2a49630abc878987c10cec7252cadb72b4dbc2831b81f1dd50" +} diff --git a/storage/.sqlx/query-bec6b9ad565f05d3587ce1db853e273f30a3e3ec884b0c1310aafbbf06a8f165.json b/storage/.sqlx/query-bec6b9ad565f05d3587ce1db853e273f30a3e3ec884b0c1310aafbbf06a8f165.json index 7278b7a0..cb3de789 100644 --- a/storage/.sqlx/query-bec6b9ad565f05d3587ce1db853e273f30a3e3ec884b0c1310aafbbf06a8f165.json +++ b/storage/.sqlx/query-bec6b9ad565f05d3587ce1db853e273f30a3e3ec884b0c1310aafbbf06a8f165.json @@ -37,6 +37,11 @@ "ordinal": 6, "name": "finalizable", "type_info": "Bool" + }, + { + "ordinal": 7, + "name": "l1_receiver", + "type_info": "Bytea" } ], "parameters": { @@ -51,7 +56,8 @@ false, false, false, - false + false, + true ] }, "hash": "bec6b9ad565f05d3587ce1db853e273f30a3e3ec884b0c1310aafbbf06a8f165" diff --git a/storage/.sqlx/query-d8b97db2eb9166b68386244231828ef898f7e7a29fd93a97229dd974a86f9d90.json b/storage/.sqlx/query-d8b97db2eb9166b68386244231828ef898f7e7a29fd93a97229dd974a86f9d90.json new file mode 100644 index 00000000..ae43d108 --- /dev/null +++ b/storage/.sqlx/query-d8b97db2eb9166b68386244231828ef898f7e7a29fd93a97229dd974a86f9d90.json @@ -0,0 +1,78 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n w.tx_hash,\n w.event_index_in_tx,\n withdrawal_id,\n finalization_data.l2_block_number,\n l1_batch_number,\n l2_message_index,\n l2_tx_number_in_block,\n message,\n sender,\n proof\n FROM\n finalization_data\n JOIN withdrawals w ON finalization_data.withdrawal_id = w.id\n WHERE\n finalization_tx IS NULL\n AND\n failed_finalization_attempts < 3\n AND\n finalization_data.l2_block_number <= COALESCE(\n (\n SELECT\n MAX(l2_block_number)\n FROM\n l2_blocks\n WHERE\n execute_l1_block_number IS NOT NULL\n ),\n 1\n )\n AND\n (\n last_finalization_attempt IS NULL\n OR\n last_finalization_attempt < NOW() - INTERVAL '1 minutes'\n )\n AND\n (\n CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $2\n ELSE TRUE\n END\n )\n AND l1_receiver = ANY($3) limit $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "event_index_in_tx", + "type_info": "Int4" + }, + { + "ordinal": 2, + "name": "withdrawal_id", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "l2_block_number", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "l1_batch_number", + "type_info": "Int8" + }, + { + "ordinal": 5, + "name": "l2_message_index", + "type_info": "Int4" + }, + { + "ordinal": 6, + "name": "l2_tx_number_in_block", + "type_info": "Int2" + }, + { + "ordinal": 7, + "name": "message", + "type_info": "Bytea" + }, + { + "ordinal": 8, + "name": "sender", + "type_info": "Bytea" + }, + { + "ordinal": 9, + "name": "proof", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Int8", + "Numeric", + "ByteaArray" + ] + }, + "nullable": [ + false, + false, + false, + false, + false, + false, + false, + false, + false, + false + ] + }, + "hash": "d8b97db2eb9166b68386244231828ef898f7e7a29fd93a97229dd974a86f9d90" +} diff --git a/storage/migrations/20240325203324_add-l1-receiver-address.down.sql b/storage/migrations/20240325203324_add-l1-receiver-address.down.sql new file mode 100644 index 00000000..79f03935 --- /dev/null +++ b/storage/migrations/20240325203324_add-l1-receiver-address.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS ix_withdrawals_l1_receiver; +ALTER TABLE withdrawals DROP COLUMN l1_receiver; diff --git a/storage/migrations/20240325203324_add-l1-receiver-address.up.sql b/storage/migrations/20240325203324_add-l1-receiver-address.up.sql new file mode 100644 index 00000000..4796f8a0 --- /dev/null +++ b/storage/migrations/20240325203324_add-l1-receiver-address.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE withdrawals ADD COLUMN l1_receiver BYTEA; +CREATE INDEX IF NOT EXISTS ix_withdrawals_l1_receiver ON withdrawals (l1_receiver); diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 927a4619..c572a21c 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -15,6 +15,7 @@ use client::{ }; mod error; +mod macro_utils; mod metrics; mod utils; @@ -263,6 +264,7 @@ pub async fn get_withdrawals(pool: &PgPool, ids: &[i64]) -> Result Resu let mut tokens = Vec::with_capacity(events.len()); let mut amounts = Vec::with_capacity(events.len()); let mut indices_in_tx = Vec::with_capacity(events.len()); + let mut l1_receivers = Vec::with_capacity(events.len()); events.iter().for_each(|sw| { tx_hashes.push(sw.event.tx_hash.0.to_vec()); @@ -292,6 +295,7 @@ pub async fn add_withdrawals(pool: &PgPool, events: &[StoredWithdrawal]) -> Resu tokens.push(sw.event.token.0.to_vec()); amounts.push(u256_to_big_decimal(sw.event.amount)); indices_in_tx.push(sw.index_in_tx as i32); + l1_receivers.push(sw.event.l1_receiver.map(|a| a.0.to_vec())); }); let latency = STORAGE_METRICS.call[&"add_withdrawals"].start(); @@ -304,27 +308,31 @@ pub async fn add_withdrawals(pool: &PgPool, events: &[StoredWithdrawal]) -> Resu l2_block_number, token, amount, - event_index_in_tx + event_index_in_tx, + l1_receiver ) SELECT u.tx_hash, u.l2_block_number, u.token, u.amount, - u.index_in_tx + u.index_in_tx, + u.l1_receiver FROM unnest( $1 :: BYTEA [], $2 :: bigint [], $3 :: BYTEA [], $4 :: numeric [], - $5 :: integer [] + $5 :: integer [], + $6 :: BYTEA [] ) AS u( tx_hash, l2_block_number, token, amount, - index_in_tx + index_in_tx, + l1_receiver ) ON CONFLICT ( tx_hash, event_index_in_tx @@ -335,6 +343,7 @@ pub async fn add_withdrawals(pool: &PgPool, events: &[StoredWithdrawal]) -> Resu &tokens, amounts.as_slice(), &indices_in_tx, + &l1_receivers as &[Option>], ) .execute(pool) .await?; @@ -741,233 +750,113 @@ pub async fn set_withdrawal_unfinalizable( Ok(()) } -/// Get the earliest withdrawals never attempted to be finalized before -pub async fn withdrawals_to_finalize_with_blacklist( - pool: &PgPool, - limit_by: u64, - token_blacklist: &[Address], - eth_threshold: Option, -) -> Result> { - let blacklist: Vec<_> = token_blacklist.iter().map(|a| a.0.to_vec()).collect(); - // if no threshold, query _all_ ethereum withdrawals since all of them are >= 0. - let eth_threshold = eth_threshold.unwrap_or(U256::zero()); - - let data = sqlx::query!( - " - SELECT - w.tx_hash, - w.event_index_in_tx, - withdrawal_id, - finalization_data.l2_block_number, - l1_batch_number, - l2_message_index, - l2_tx_number_in_block, - message, - sender, - proof - FROM - finalization_data - JOIN withdrawals w ON finalization_data.withdrawal_id = w.id - WHERE - finalization_tx IS NULL - AND failed_finalization_attempts < 3 - AND finalization_data.l2_block_number <= COALESCE( - ( - SELECT - MAX(l2_block_number) - FROM - l2_blocks - WHERE - execute_l1_block_number IS NOT NULL - ), - 1 - ) - AND w.token NOT IN (SELECT * FROM UNNEST ( - $2 :: BYTEA [] - )) - AND ( - CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $3 - ELSE TRUE - END - ) - LIMIT - $1 - ", - limit_by as i64, - &blacklist, - u256_to_big_decimal(eth_threshold), - ) - .fetch_all(pool) - .await? - .into_iter() - .map(|record| WithdrawalParams { - tx_hash: H256::from_slice(&record.tx_hash), - event_index_in_tx: record.event_index_in_tx as u32, - id: record.withdrawal_id as u64, - l2_block_number: record.l2_block_number as u64, - l1_batch_number: record.l1_batch_number.into(), - l2_message_index: record.l2_message_index as u32, - l2_tx_number_in_block: record.l2_tx_number_in_block as u16, - message: record.message.into(), - sender: Address::from_slice(&record.sender), - proof: bincode::deserialize(&record.proof) - .expect("storage contains data correctly serialized by bincode; qed"), - }) - .collect(); - - Ok(data) -} - -/// Get the earliest withdrawals never attempted to be finalized before -pub async fn withdrawals_to_finalize_with_whitelist( - pool: &PgPool, - limit_by: u64, - token_whitelist: &[Address], - eth_threshold: Option, -) -> Result> { - let whitelist: Vec<_> = token_whitelist.iter().map(|a| a.0.to_vec()).collect(); - // if no threshold, query _all_ ethereum withdrawals since all of them are >= 0. - let eth_threshold = eth_threshold.unwrap_or(U256::zero()); - - let data = sqlx::query!( - " - SELECT - w.tx_hash, - w.event_index_in_tx, - withdrawal_id, - finalization_data.l2_block_number, - l1_batch_number, - l2_message_index, - l2_tx_number_in_block, - message, - sender, - proof - FROM - finalization_data - JOIN withdrawals w ON finalization_data.withdrawal_id = w.id - WHERE - finalization_tx IS NULL - AND failed_finalization_attempts < 3 - AND finalization_data.l2_block_number <= COALESCE( - ( - SELECT - MAX(l2_block_number) - FROM - l2_blocks - WHERE - execute_l1_block_number IS NOT NULL - ), - 1 - ) - AND w.token IN (SELECT * FROM UNNEST ( - $2 :: BYTEA [] - )) - AND ( - CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $3 - ELSE TRUE - END - ) - LIMIT - $1 - ", - limit_by as i64, - &whitelist, - u256_to_big_decimal(eth_threshold), - ) - .fetch_all(pool) - .await? - .into_iter() - .map(|record| WithdrawalParams { - tx_hash: H256::from_slice(&record.tx_hash), - event_index_in_tx: record.event_index_in_tx as u32, - id: record.withdrawal_id as u64, - l2_block_number: record.l2_block_number as u64, - l1_batch_number: record.l1_batch_number.into(), - l2_message_index: record.l2_message_index as u32, - l2_tx_number_in_block: record.l2_tx_number_in_block as u16, - message: record.message.into(), - sender: Address::from_slice(&record.sender), - proof: bincode::deserialize(&record.proof) - .expect("storage contains data correctly serialized by bincode; qed"), - }) - .collect(); - - Ok(data) -} - /// Get the earliest withdrawals never attempted to be finalized before pub async fn withdrawals_to_finalize( pool: &PgPool, limit_by: u64, eth_threshold: Option, + only_l1_recipients: Option<&[Address]>, ) -> Result> { let latency = STORAGE_METRICS.call[&"withdrawals_to_finalize"].start(); // if no threshold, query _all_ ethereum withdrawals since all of them are >= 0. let eth_threshold = eth_threshold.unwrap_or(U256::zero()); + struct WithdrawalParamsInner { + tx_hash: Vec, + event_index_in_tx: i32, + withdrawal_id: i64, + l2_block_number: i64, + l1_batch_number: i64, + l2_message_index: i32, + l2_tx_number_in_block: i16, + message: Vec, + sender: Vec, + proof: Vec, + } - let data = sqlx::query!( - " - SELECT - w.tx_hash, - w.event_index_in_tx, - withdrawal_id, - finalization_data.l2_block_number, - l1_batch_number, - l2_message_index, - l2_tx_number_in_block, - message, - sender, - proof - FROM - finalization_data - JOIN withdrawals w ON finalization_data.withdrawal_id = w.id - WHERE - finalization_tx IS NULL - AND failed_finalization_attempts < 3 - AND finalization_data.l2_block_number <= COALESCE( - ( - SELECT - MAX(l2_block_number) - FROM - l2_blocks - WHERE - execute_l1_block_number IS NOT NULL + let query = match_query_as!( + WithdrawalParamsInner, + [ + r#" + SELECT + w.tx_hash, + w.event_index_in_tx, + withdrawal_id, + finalization_data.l2_block_number, + l1_batch_number, + l2_message_index, + l2_tx_number_in_block, + message, + sender, + proof + FROM + finalization_data + JOIN withdrawals w ON finalization_data.withdrawal_id = w.id + WHERE + finalization_tx IS NULL + AND + failed_finalization_attempts < 3 + AND + finalization_data.l2_block_number <= COALESCE( + ( + SELECT + MAX(l2_block_number) + FROM + l2_blocks + WHERE + execute_l1_block_number IS NOT NULL + ), + 1 + ) + AND + ( + last_finalization_attempt IS NULL + OR + last_finalization_attempt < NOW() - INTERVAL '1 minutes' + ) + AND + ( + CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $2 + ELSE TRUE + END + ) + "#, + _ // Maybe filter by l1 receiver + ], + match (only_l1_recipients) { + Some(receivers) => ( + "AND l1_receiver = ANY($3) limit $1"; + limit_by as i64, + u256_to_big_decimal(eth_threshold), + &receivers.iter() + .map(Address::as_bytes) + .collect::>() as &[&[u8]] ), - 1 - ) - AND ( - last_finalization_attempt IS NULL - OR - last_finalization_attempt < NOW() - INTERVAL '1 minutes' - ) - AND ( - CASE WHEN token = decode('000000000000000000000000000000000000800A', 'hex') THEN amount >= $2 - ELSE TRUE - END - ) - LIMIT - $1 - ", - limit_by as i64, - u256_to_big_decimal(eth_threshold), - ) - .fetch_all(pool) - .await? - .into_iter() - .map(|record| WithdrawalParams { - tx_hash: H256::from_slice(&record.tx_hash), - event_index_in_tx: record.event_index_in_tx as u32, - id: record.withdrawal_id as u64, - l2_block_number: record.l2_block_number as u64, - l1_batch_number: record.l1_batch_number.into(), - l2_message_index: record.l2_message_index as u32, - l2_tx_number_in_block: record.l2_tx_number_in_block as u16, - message: record.message.into(), - sender: Address::from_slice(&record.sender), - proof: bincode::deserialize(&record.proof) - .expect("storage contains data correctly serialized by bincode; qed"), - }) - .collect(); + None => ( + "limit $1"; + limit_by as i64, + u256_to_big_decimal(eth_threshold) + ), + } + ); + + let data = query + .fetch_all(pool) + .await? + .into_iter() + .map(|record| WithdrawalParams { + tx_hash: H256::from_slice(&record.tx_hash), + event_index_in_tx: record.event_index_in_tx as u32, + id: record.withdrawal_id as u64, + l2_block_number: record.l2_block_number as u64, + l1_batch_number: record.l1_batch_number.into(), + l2_message_index: record.l2_message_index as u32, + l2_tx_number_in_block: record.l2_tx_number_in_block as u16, + message: record.message.into(), + sender: Address::from_slice(&record.sender), + proof: bincode::deserialize(&record.proof) + .expect("storage contains data correctly serialized by bincode; qed"), + }) + .collect(); latency.observe(); diff --git a/storage/src/macro_utils.rs b/storage/src/macro_utils.rs new file mode 100644 index 00000000..b5ad03bc --- /dev/null +++ b/storage/src/macro_utils.rs @@ -0,0 +1,148 @@ +/// Interpolates the provided DB query consisting of several or comma-separated parts, each of parts being a string literal +/// or `_`. `_` parts are substituted with the provided variables in the order of appearance. +/// +/// We use tail recursion and accumulate (possibly substituted) parts in an accumulator. This is because `query_as!` would not +/// work otherwise; its input must be fully expanded. +#[macro_export] +macro_rules! interpolate_query { + // Terminal clause: we have a final substitution. + (query_type: $query_type:ty; acc: $acc:expr; args: $($args:expr,)*; (_,) => $var:literal,) => { + sqlx::query_as!($query_type, $acc + $var, $($args,)*) + }; + // Terminal clause: we have a final query part. + (query_type: $query_type:ty; acc: $acc:expr; args: $($args:expr,)*; ($part:literal,) =>) => { + sqlx::query_as!($query_type, $acc + $part, $($args,)*) + }; + + // We have a non-terminal substitution. Substitute it with a `var`, add to the accumulator and recurse. + ( + query_type: $query_type:ty; + acc: $acc:expr; + args: $($args:expr,)*; + (_, $($other_parts:tt,)+) => $var:literal, $($other_vars:literal,)* + ) => { + interpolate_query!( + query_type: $query_type; + acc: $acc + $var; + args: $($args,)*; + ($($other_parts,)+) => $($other_vars,)* + ) + }; + // We have a non-terminal query part. Add it to the accumulator and recurse. + ( + query_type: $query_type:ty; + acc: $acc:expr; + args: $($args:expr,)*; + ($part:tt, $($other_parts:tt,)+) => $($vars:literal,)* + ) => { + interpolate_query!( + query_type: $query_type; + acc: $acc + $part; + args: $($args,)*; + ($($other_parts,)+) => $($vars,)* + ) + }; +} + +/// +/// Builds a set of statically compiled DB queries based on the provided condition. This allows to avoid copying similar queries +/// or making them dynamic (i.e., using `sqlx::query()` function etc.). +/// +/// The macro accepts 3 arguments: +/// +/// - Output type. Has same semantics as the type in `sqlx::query_as!` macro. +/// - Query parts, enclosed in `[]` brackets. Each part must be either a string literal, or a `_` placeholder. +/// - `match` expression. Each variant hand must return a `()`-enclosed comma-separated list of substitutions for the placeholder +/// query parts (in the order of their appearance in the query parts), then a semicolon `;`, then a list of arguments +/// for the query (may be empty; has same semantics as arguments for `sqlx::query!`). Each substitution must be a string literal. +/// The number of arguments may differ across variants (e.g., one of variants may introduce one or more additional args). +/// +/// See the crate code for examples of usage. +#[macro_export] +macro_rules! match_query_as { + ( + $query_type:ty, + [$($parts:tt),+], + match ($input:expr) { + $($variants:tt)* + } + ) => { + // We parse `variants` recursively and add parsed parts to an accumulator. + match_query_as!( + @inner + query_type: $query_type, + input: $input, + query_parts: ($($parts),+), + acc: (); + $($variants)* + ) + }; + + // Terminal clause: we've parsed all match variants. Now we need to expand into a `match` expression. + ( + @inner + query_type: $query_type:ty, + input: $input:expr, + // We surround token trees (`:tt` [designator]) by `()` so that they are delimited. We need token trees in the first place + // because it's one of the few forms of designators than can be matched against specific tokens or parsed further (e.g., + // as `expr`essions, `pat`terns etc.) during query expansion. + // + // [designator]: https://doc.rust-lang.org/rust-by-example/macros/designators.html + query_parts: ($($_parts:tt),+), // not used; parts are copied into each variant expansion + acc: ($($acc:tt)*); + ) => { + match_query_as!( + @expand + query_type: $query_type, + input: $input, + acc: ($($acc)*) + ) + }; + // Non-terminal clause: we have at least one variant left. We add the variant to the accumulator copying parts + // into it. Copying is necessary because Rust macros are not able to nest expansions of independently repeated vars + // (i.e., query parts and variants in this case). + ( + @inner + query_type: $query_type:ty, + input: $input:expr, + query_parts: ($($parts:tt),+), + acc: ($($acc:tt)*); + $p:pat => ($($clause:tt)*), + $($rest:tt)* + ) => { + match_query_as!( + @inner + query_type: $query_type, + input: $input, + query_parts: ($($parts),+), + acc: ($($acc)* $p => (parts: $($parts,)+) ($($clause)*)); + $($rest)* + ) + }; + // Expansion routine: match all variants, each with copied query parts, and expand them as a `match` expression + // with the corresponding variants. + ( + @expand + query_type: $query_type:ty, + input: $input:expr, + acc: ($( + $p:pat => (parts: $($parts:tt,)+) ($($substitutions:literal),+ ; $($args:expr),*) + )*) + ) => { + match ($input) { + $( + $p => { + // We need to specify `query` type (specifically, the 2nd type param in `Map`) so that all `match` variants + // return the same type. + let query: sqlx::query::Map<_, fn(_) -> _, _> = interpolate_query!( + query_type: $query_type; + acc: ""; + args: $($args,)*; + ($($parts,)+) => $($substitutions,)+ + ); + query + } + )* + } + }; +}