From 85af14467eb57fae6e593c29f9f092b592b61dd4 Mon Sep 17 00:00:00 2001
From: jurriaan
Date: Tue, 26 Nov 2024 21:20:24 +0100
Subject: [PATCH 1/3] fix(hubble): replace tendermint-rpc library

Replace the tendermint / tendermint-rpc dependencies with the in-repo
cometbft-rpc client (built on jsonrpsee), and add the blockchain and
block_results endpoints plus the response types that hubble needs.
---
 Cargo.lock                              |   3 +-
 hubble/Cargo.toml                       |   3 +-
 hubble/src/indexer/tm/block_handle.rs   |  64 ++++---
 hubble/src/indexer/tm/fetcher_client.rs | 232 +++++++++---------------
 hubble/src/indexer/tm/mod.rs            |   6 +-
 hubble/src/indexer/tm/provider.rs       |  87 +++++----
 lib/cometbft-rpc/src/lib.rs             |  34 +++-
 lib/cometbft-rpc/src/rpc_types.rs       |  33 +++-
 8 files changed, 231 insertions(+), 231 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index b8b862ddbd..eda05f9360 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6079,6 +6079,7 @@ dependencies = [
  "futures",
  "hex",
  "itertools 0.13.0",
+ "jsonrpsee",
  "lazy_static",
  "movement-light-client-types",
  "num-traits",
@@ -6093,9 +6094,7 @@ dependencies = [
  "serde-aux",
  "serde_json",
  "sqlx",
- "tendermint",
  "tendermint-light-client-types",
- "tendermint-rpc",
  "thiserror",
  "tikv-jemallocator",
  "time",
diff --git a/hubble/Cargo.toml b/hubble/Cargo.toml
index e51158dfbc..5caf6b1c84 100644
--- a/hubble/Cargo.toml
+++ b/hubble/Cargo.toml
@@ -41,6 +41,7 @@ ethereum-light-client-types = { workspace = true, features = ["serde"] }
 futures = { workspace = true, features = ["async-await"] }
 hex = { workspace = true }
 itertools = "0.13.0"
+jsonrpsee = { workspace = true, features = ["tracing", "ws-client", "http-client"] }
 lazy_static = { workspace = true }
 movement-light-client-types = { workspace = true, features = ["proto", "serde"] }
 num-traits = "0.2.19"
@@ -55,9 +56,7 @@ serde = { workspace = true, features = ["derive"] }
 serde-aux = "4.5.0"
 serde_json = { workspace = true }
 sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json", "bigdecimal"] }
-tendermint = { workspace = true, features = ["std"] }
 tendermint-light-client-types = { workspace = true, features = ["proto"] }
-tendermint-rpc = { workspace = true, features = ["http-client", "tokio"] }
 thiserror = { workspace = true }
 time = { workspace = true, features = ["serde"] }
 tokio = { workspace = true, features = ["full"] }
diff --git a/hubble/src/indexer/tm/block_handle.rs b/hubble/src/indexer/tm/block_handle.rs
index 3fd49b9c95..ab70ff63fc 100644
--- a/hubble/src/indexer/tm/block_handle.rs
+++ b/hubble/src/indexer/tm/block_handle.rs
@@ -1,8 +1,12 @@
 use axum::async_trait;
-use color_eyre::eyre::Report;
+use color_eyre::eyre::{eyre, Report};
+use cometbft_rpc::{
+    rpc_types::{BlockMeta, BlockResponse, CommitResponse},
+    types::types::{block_id::BlockId, header::Header},
+};
 use futures::Stream;
 use sqlx::Postgres;
-use tendermint::block::Meta;
+use time::OffsetDateTime;
 use tracing::debug;
 
 use crate::indexer::{
@@ -21,33 +25,33 @@ use crate::indexer::{
 #[derive(Clone)]
 pub struct BlockHeader {
-    pub block_id: tendermint::block::Id,
-    pub header: tendermint::block::Header,
+    pub block_id: BlockId,
+    pub header: Header,
 }
 
-impl From<tendermint_rpc::endpoint::block::Response> for BlockHeader {
-    fn from(response: tendermint_rpc::endpoint::block::Response) -> Self {
+impl From<BlockResponse> for BlockHeader {
+    fn from(block_response: BlockResponse) -> Self {
         BlockHeader {
-            block_id: response.block_id,
-            header: response.block.header,
+            block_id: block_response.block_id,
+            header: block_response.block.header,
         }
     }
 }
 
-impl From<tendermint::block::Meta> for BlockHeader {
-    fn from(meta: tendermint::block::Meta) -> Self {
+impl From<BlockMeta> for BlockHeader {
+    fn from(block_meta: BlockMeta) -> Self {
         BlockHeader {
-            block_id: meta.block_id,
-            header: meta.header,
+            block_id: block_meta.block_id,
+            header: block_meta.header,
         }
     }
 }
 
-impl From<tendermint_rpc::endpoint::commit::Response> for BlockHeader {
-    fn from(response: tendermint_rpc::endpoint::commit::Response) -> Self {
+impl From<CommitResponse> for BlockHeader {
+    fn from(commit_response: CommitResponse) -> Self {
         BlockHeader {
-            block_id: response.signed_header.commit.block_id,
-            header: response.signed_header.header,
+            block_id: commit_response.signed_header.commit.block_id,
+            header: commit_response.signed_header.header,
         }
     }
 }
@@ -55,19 +59,33 @@ impl From<tendermint_rpc::endpoint::commit::Response> for BlockHeader {
 impl BlockReferenceProvider for BlockHeader {
     fn block_reference(&self) -> Result<BlockReference, IndexerError> {
         Ok(BlockReference {
-            height: self.header.height.into(),
-            hash: self.block_id.hash.to_string(),
-            timestamp: self.header.time.into(),
+            height: self.header.height.inner() as u64,
+            hash: self
+                .block_id
+                .hash
+                .ok_or(IndexerError::ProviderError(eyre!("expected hash")))?
+                .to_string(),
+            timestamp: OffsetDateTime::from_unix_timestamp_nanos(
+                self.header.time.as_unix_nanos() as i128,
+            )
+            .map_err(|err| IndexerError::ProviderError(err.into()))?,
         })
     }
 }
 
-impl BlockReferenceProvider for Meta {
+impl BlockReferenceProvider for BlockMeta {
     fn block_reference(&self) -> Result<BlockReference, IndexerError> {
         Ok(BlockReference {
-            height: self.header.height.into(),
-            hash: self.block_id.hash.to_string(),
-            timestamp: self.header.time.into(),
+            height: self.header.height.inner() as u64,
+            hash: self
+                .block_id
+                .hash
+                .ok_or(IndexerError::ProviderError(eyre!("expected hash")))?
+                .to_string(),
+            timestamp: OffsetDateTime::from_unix_timestamp_nanos(
+                self.header.time.as_unix_nanos() as i128,
+            )
+            .map_err(|err| IndexerError::ProviderError(err.into()))?,
         })
     }
 }
diff --git a/hubble/src/indexer/tm/fetcher_client.rs b/hubble/src/indexer/tm/fetcher_client.rs
index 2745d4f5ca..5b364e9719 100644
--- a/hubble/src/indexer/tm/fetcher_client.rs
+++ b/hubble/src/indexer/tm/fetcher_client.rs
@@ -1,23 +1,22 @@
-use std::fmt::Display;
+use std::{error::Error, fmt::Display};
 
 use axum::async_trait;
 use color_eyre::{
     eyre::{eyre, Report},
     Result,
 };
+use cometbft_rpc::{
+    rpc_types::{BlockMeta, BlockResultsResponse, Order, TxResponse},
+    JsonRpcError,
+};
 use futures::{
     join,
     stream::{BoxStream, FuturesOrdered},
     FutureExt, Stream, StreamExt, TryFutureExt,
 };
 use itertools::Itertools;
+use jsonrpsee::types::{error::INTERNAL_ERROR_CODE, ErrorObject};
 use regex::Regex;
-use tendermint::block::Meta;
-use tendermint_rpc::{
-    error::ErrorDetail,
-    query::{Condition, Query},
-    Code, Error, Order,
-};
 use time::OffsetDateTime;
 use tokio::task::JoinSet;
 use tracing::{debug, info, info_span, trace, Instrument};
@@ -91,7 +90,7 @@ impl TmFetcherClient {
 
     pub fn handle_ok_fetching_metas(
         &self,
-        block_metas: Vec<Meta>,
+        block_metas: Vec<BlockMeta>,
         fetch_mode: FetchMode,
         provider_id: RpcProviderId,
     ) -> BoxStream<Result<TmBlockHandle, IndexerError>> {
@@ -140,13 +139,7 @@
         &self,
         height: BlockHeight,
         provider_id: RpcProviderId,
-    ) -> Result<
-        (
-            tendermint_rpc::endpoint::block_results::Response,
-            Vec<tendermint_rpc::endpoint::tx::Response>,
-        ),
-        IndexerError,
-    > {
+    ) -> Result<(BlockResultsResponse, Vec<TxResponse>), IndexerError> {
         debug!("{}: fetching block results", height);
         let block_results = self
             .provider
@@ -167,18 +160,24 @@
     pub fn check_consistency(
         &self,
         provider_id: RpcProviderId,
-        block_results: &tendermint_rpc::endpoint::block_results::Response,
-        transactions_response: &[tendermint_rpc::endpoint::tx::Response],
+        block_results: &BlockResultsResponse,
+        transactions_response: &[TxResponse],
     ) -> Result<(), IndexerError> {
         let txs_event_count: usize = transactions_response
             .iter()
             .map(|tx| tx.tx_result.events.len())
             .sum();
-        let block_tx_event_count = block_results
+        let block_tx_event_count: usize = block_results
             .txs_results
-            .as_ref()
-            .map_or(0, |r| r.iter().map(|result| result.events.len()).sum());
+            .iter()
+            .map(|tx_results| {
+                tx_results
+                    .iter()
+                    .map(|tx_result| tx_result.events.len())
+                    .sum::<usize>()
+            })
+            .sum();
 
         match txs_event_count == block_tx_event_count {
             true => Ok(()),
@@ -195,20 +194,24 @@
     pub fn convert_to_pg_data(
         &self,
         block_header: &BlockHeader,
-        block_results: tendermint_rpc::endpoint::block_results::Response,
-        transactions_response: Vec<tendermint_rpc::endpoint::tx::Response>,
+        block_results: BlockResultsResponse,
+        transactions_response: Vec<TxResponse>,
     ) -> Result<(PgBlock, Vec<PgTransaction>, Vec<PgEvent>), IndexerError> {
         let (block_id, header, block_reference) = (
-            block_header.block_id,
+            block_header.block_id.clone(),
             block_header.header.clone(),
             block_header.block_reference()?,
         );
 
         let pg_block = PgBlock {
             chain_id: self.chain_id,
-            hash: block_id.hash.to_string(),
-            height: header.height.value(),
-            time: header.time.into(),
+            hash: block_id
+                .hash
+                .ok_or(IndexerError::ProviderError(eyre!("expected hash")))?
+                .to_string(),
+            height: header.height.inner() as u64,
+            time: OffsetDateTime::from_unix_timestamp_nanos(header.time.as_unix_nanos() as i128)
+                .map_err(|err| IndexerError::ProviderError(err.into()))?,
             data: serde_json::to_value(&header)
                 .unwrap()
                 .replace_escape_chars(),
@@ -221,7 +224,7 @@
 
         let pg_transactions = transactions_response
             .into_iter()
-            .filter(|tx| tx.tx_result.code.is_ok())
+            .filter(|tx| tx.tx_result.code == 0) // 0 == OK
             .map(|tx| {
                 let transaction_hash = tx.hash.to_string();
                 let data = serde_json::to_value(&tx).unwrap().replace_escape_chars();
@@ -230,7 +233,7 @@
                     if self
                         .filter
                         .as_ref()
-                        .is_some_and(|filter| filter.is_match(event.kind.as_str()))
+                        .is_some_and(|filter| filter.is_match(event.ty.as_str()))
                     {
                         block_index += 1;
                         return None;
@@ -283,7 +286,7 @@
     }
 
     pub fn handle_err_fetching_metas(
-        error: Error,
+        error: JsonRpcError,
     ) -> BoxStream<'static, Result<TmBlockHandle, IndexerError>> {
         futures::stream::once(async move { Err(error.into()) }).boxed()
     }
@@ -296,42 +299,24 @@
     ) -> Result<Option<TmBlockHandle>, IndexerError> {
         debug!("{}: fetching", selection);
 
-        let block_header: Result<Option<(RpcProviderId, BlockHeader)>, Error> = match selection {
-            BlockSelection::LastFinalized => self
-                .provider
-                .latest_block(provider_id)
-                .inspect_err(|e| debug!(?e, "error fetching latest block"))
-                .await
-                .map(|response| Some((response.provider_id, response.response.into()))),
-            BlockSelection::Height(height) => match self
-                .provider
-                .block(height, provider_id)
-                .inspect_err(|e| debug!(?e, "error fetching block at {}", height))
-                .await
-            {
-                Ok(result) => Ok(Some((result.provider_id, result.response.into()))),
-                Err(err) => match err.detail() {
-                    // TODO: cleanup
-                    // The RPC will return an internal error on queries for blocks exceeding the current height.
-                    // `is_height_exceeded_error` untangles the error and checks for this case.
-                    ErrorDetail::Response(err_detail) => {
-                        let inner = &err_detail.source;
-                        let code = inner.code();
-                        let message = inner.data().unwrap_or_default();
-                        if matches!(code, Code::InternalError)
-                            && (message.contains("must be less than or equal to")
-                                || message.contains("could not find results for height"))
-                        {
-                            trace!("{}: no block: beyond tip error: {}", selection, message,);
-                            Ok(None)
-                        } else {
-                            Err(err)
-                        }
-                    }
-                    _ => Err(err),
-                },
-            },
-        };
+        let block_header: Result<Option<(RpcProviderId, BlockHeader)>, JsonRpcError> =
+            match selection {
+                BlockSelection::LastFinalized => self
+                    .provider
+                    .latest_block(provider_id)
+                    .inspect_err(|e| debug!(?e, "error fetching latest block"))
+                    .await
+                    .map(|response| Some((response.provider_id, response.response.into()))),
+                BlockSelection::Height(height) => match self
+                    .provider
+                    .block(height, provider_id)
+                    .inspect_err(|e| debug!(?e, "error fetching block at {}", height))
+                    .await
+                {
+                    Ok(result) => Ok(Some((result.provider_id, result.response.into()))),
+                    Err(err) => Self::detect_reading_beyond_tip(err, &selection),
+                },
+            };
 
         match block_header {
             Ok(Some((provider_id, header))) => {
@@ -368,25 +353,39 @@
         }
     }
 
+    fn detect_reading_beyond_tip(
+        error: JsonRpcError,
+        selection: &BlockSelection,
+    ) -> Result<Option<(RpcProviderId, BlockHeader)>, JsonRpcError> {
+        if let Some(source) = error.source() {
+            if let Some(error_object) = source.downcast_ref::<ErrorObject>() {
+                if let (INTERNAL_ERROR_CODE, Some(message)) = (
+                    error_object.code(),
+                    error_object.data().map(|data| data.to_string()),
+                ) {
+                    if message.contains("must be less than or equal to")
+                        || message.contains("could not find results for height")
+                    {
+                        trace!("{}: no block: beyond tip error: {}", selection, message);
+
+                        return Ok(None); // we're reading beyond the tip
+                    };
+                };
+            };
+        };
+
+        Err(error)
+    }
+
     async fn fetch_transactions_for_block(
         &self,
         height: BlockHeight,
         expected: impl Into<Option<usize>>,
         provider_id: RpcProviderId,
-    ) -> Result<Vec<tendermint_rpc::endpoint::tx::Response>, Report> {
+    ) -> Result<Vec<TxResponse>, Report> {
        let expected = expected.into();
 
         debug!("{}: fetching", height);
 
-        let query = Query {
-            event_type: None,
-            conditions: vec![Condition {
-                key: "tx.height".to_string(),
-                operation: tendermint_rpc::query::Operation::Eq(
-                    tendermint_rpc::query::Operand::Unsigned(height),
-                ),
-            }],
-        };
-
         let mut txs = if let Some(expected) = expected {
             Vec::with_capacity(expected)
         } else {
@@ -398,11 +397,11 @@
             let response = self
                 .provider
                 .tx_search(
-                    query.clone(),
+                    height,
                     false,
                     page,
                     self.tx_search_max_page_size,
-                    Order::Ascending,
+                    Order::Asc,
                     Some(provider_id),
                 )
                 .await?
@@ -449,73 +448,20 @@ pub trait BlockExt {
     fn events(self, chain_id: ChainId, block_hash: String, time: OffsetDateTime) -> Vec<PgEvent>;
 }
 
-impl BlockExt for tendermint_rpc::endpoint::block_results::Response {
+impl BlockExt for BlockResultsResponse {
     fn events(self, chain_id: ChainId, block_hash: String, time: OffsetDateTime) -> Vec<PgEvent> {
-        let block_height: i32 = self.height.value().try_into().unwrap();
-        let begin_block_events = self
-            .begin_block_events
-            .unwrap_or_default()
-            .into_iter()
-            .map(|e| PgEvent {
-                chain_id,
-                block_hash: block_hash.clone(),
-                block_height: block_height as u64,
-                time,
-                data: serde_json::to_value(e).unwrap().replace_escape_chars(),
-                transaction_hash: None,
-                transaction_index: None,
-                block_index: 0,
-            });
-        let end_block_events = self.end_block_events.into_iter().map(|e| PgEvent {
-            chain_id,
-            block_hash: block_hash.clone(),
-            block_height: block_height as u64,
-            time,
-            data: serde_json::to_value(e).unwrap().replace_escape_chars(),
-            transaction_hash: None,
-            transaction_index: None,
-            block_index: 0,
-        });
-        let finalize_block_events = self.finalize_block_events.into_iter().map(|e| PgEvent {
+        let finalize_block_events = self.finalize_block_events.iter().map(|e| PgEvent {
             chain_id,
             block_hash: block_hash.clone(),
-            block_height: block_height as u64,
+            block_height: self.height,
             time,
             data: serde_json::to_value(e).unwrap().replace_escape_chars(),
             transaction_hash: None,
             transaction_index: None,
             block_index: 0,
         });
-        let validator_updates = self.validator_updates.into_iter().map(|e| PgEvent {
-            chain_id,
-            block_hash: block_hash.clone(),
-            block_height: block_height as u64,
-            time,
-            data: serde_json::to_value(WithType::validator_update(e))
-                .unwrap()
-                .replace_escape_chars(),
-            transaction_hash: None,
-            transaction_index: None,
-            block_index: 0,
-        });
-        let consensus_param_updates = self.consensus_param_updates.into_iter().map(|e| PgEvent {
-            chain_id,
-            block_hash: block_hash.clone(),
-            block_height: block_height as u64,
-            time,
-            data: serde_json::to_value(WithType::consensus_param_update(e))
-                .unwrap()
-                .replace_escape_chars(),
-            transaction_hash: None,
-            transaction_index: None,
-            block_index: 0,
-        });
 
-        begin_block_events
-            .chain(end_block_events)
-            .chain(finalize_block_events)
-            .chain(validator_updates)
-            .chain(consensus_param_updates)
+        finalize_block_events
             .enumerate()
             .map(|(i, mut event)| {
                 event.block_index = i as i32;
                 event
             })
             .collect()
@@ -533,22 +479,6 @@ pub struct WithType<I> {
     inner: I,
 }
 
-impl<I> WithType<I> {
-    fn validator_update(inner: I) -> Self {
-        WithType {
-            kind: "validator_update",
-            inner,
-        }
-    }
-
-    fn consensus_param_update(inner: I) -> Self {
-        WithType {
-            kind: "consensus_param_update",
-            inner,
-        }
-    }
-}
-
 trait SerdeValueExt {
     fn replace_escape_chars(self) -> Self;
 }
@@ -599,7 +529,7 @@ impl FetcherClient for TmFetcherClient {
         join_set: &mut JoinSet<Result<(), IndexerError>>,
         context: TmContext,
     ) -> Result<Self, IndexerError> {
-        let provider = Provider::new(context.rpc_urls, context.grpc_urls);
+        let provider = Provider::new(context.rpc_urls, context.grpc_urls).await?;
 
         info!("fetching chain-id from node");
         let chain_id = provider
diff --git a/hubble/src/indexer/tm/mod.rs b/hubble/src/indexer/tm/mod.rs
index ce52f6e293..e7997fb471 100644
--- a/hubble/src/indexer/tm/mod.rs
+++ b/hubble/src/indexer/tm/mod.rs
@@ -1,5 +1,5 @@
 use color_eyre::eyre::Report;
-use tendermint_rpc::Error;
+use cometbft_rpc::JsonRpcError;
 
 use crate::indexer::api::IndexerError;
 
@@ -11,8 +11,8 @@ mod fetcher_client;
 mod postgres;
 mod provider;
 
-impl From<Error> for IndexerError {
-    fn from(error: Error) -> Self {
+impl From<JsonRpcError> for IndexerError {
+    fn from(error: JsonRpcError) -> Self {
         Self::ProviderError(Report::from(error))
     }
 }
diff --git a/hubble/src/indexer/tm/provider.rs b/hubble/src/indexer/tm/provider.rs
index 5ddf473e50..6abfb550ef 100644
--- a/hubble/src/indexer/tm/provider.rs
+++ b/hubble/src/indexer/tm/provider.rs
@@ -1,11 +1,21 @@
-use std::result::Result;
+use std::{
+    num::{NonZeroU32, NonZeroU64, NonZeroU8},
+    result::Result,
+};
 
 use color_eyre::eyre::Report;
+use cometbft_rpc::{
+    rpc_types::{
+        BlockResponse, BlockResultsResponse, BlockchainResponse, Order, StatusResponse,
+        TxSearchResponse,
+    },
+    Client, JsonRpcError,
+};
+use futures::future;
 use protos::ibc::{
     core::client::v1::{QueryClientStateRequest, QueryClientStateResponse},
     lightclients::wasm::v1::{QueryCodeRequest, QueryCodeResponse},
 };
-use tendermint_rpc::{query::Query, Client, HttpClient, Order};
 use tonic::Response;
 use unionlabs::aptos::block_info::BlockHeight;
 use url::Url;
@@ -17,7 +27,7 @@ use crate::{
 
 #[derive(Clone, Debug)]
 pub struct Provider {
-    pub rpc_client: RaceClient<HttpClient>,
+    pub rpc_client: RaceClient<Client>,
     pub grpc_client: RaceClient<GrpcClient>,
 }
 
@@ -86,26 +96,29 @@ impl<T> From<Response<T>> for GrpcResult<T> {
 }
 
 impl Provider {
-    pub fn new(rpc_urls: Vec<Url>, grpc_urls: Vec<Url>) -> Self {
-        Self {
-            rpc_client: RaceClient::new(
-                rpc_urls
+    pub async fn new(rpc_urls: Vec<Url>, grpc_urls: Vec<Url>) -> Result<Self, JsonRpcError> {
+        Ok(Self {
+            rpc_client: {
+                RaceClient::new(
+                    future::join_all(
+                        rpc_urls
+                            .into_iter()
+                            .map(|rpc_url| Client::new(rpc_url.as_str().to_owned())),
+                    )
+                    .await
                     .into_iter()
-                    .map(|rpc_url| {
-                        HttpClient::new(rpc_url.as_str()).expect("rpc-client can be created")
-                    })
-                    .collect(),
-            ),
+                    .collect::<Result<Vec<_>, _>>()?,
+                )
+            },
             grpc_client: RaceClient::new(grpc_urls.into_iter().map(GrpcClient::new).collect()),
-        }
+        })
     }
 
     // RPC
     pub async fn status(
         &self,
         provider_id: Option<RpcProviderId>,
-    ) -> Result<RpcResult<tendermint_rpc::endpoint::status::Response>, tendermint_rpc::error::Error>
-    {
+    ) -> Result<RpcResult<StatusResponse>, JsonRpcError> {
         self.rpc_client
             .race(provider_id.map(Into::into), |c| c.status())
             .await
@@ -117,13 +130,13 @@
         min_inclusive: BlockHeight,
         max_inclusive: BlockHeight,
         provider_id: Option<RpcProviderId>,
-    ) -> Result<
-        RpcResult<tendermint_rpc::endpoint::blockchain::Response>,
-        tendermint_rpc::error::Error,
-    > {
+    ) -> Result<RpcResult<BlockchainResponse>, JsonRpcError> {
         self.rpc_client
             .race(provider_id.map(Into::into), |c| {
-                c.blockchain(min_inclusive as u32, max_inclusive as u32)
+                c.blockchain(
+                    NonZeroU64::try_from(min_inclusive).expect("non-zero min"),
+                    NonZeroU64::try_from(max_inclusive).expect("non-zero max"),
+                )
             })
             .await
             .map(Into::into)
@@ -132,10 +145,9 @@
     pub async fn latest_block(
         &self,
         provider_id: Option<RpcProviderId>,
-    ) -> Result<RpcResult<tendermint_rpc::endpoint::block::Response>, tendermint_rpc::error::Error>
-    {
+    ) -> Result<RpcResult<BlockResponse>, JsonRpcError> {
         self.rpc_client
-            .race(provider_id.map(Into::into), |c| c.latest_block())
+            .race(provider_id.map(Into::into), |c| c.block(None))
             .await
             .map(Into::into)
     }
@@ -144,10 +156,11 @@
         &self,
         height: BlockHeight,
         provider_id: Option<RpcProviderId>,
-    ) -> Result<RpcResult<tendermint_rpc::endpoint::block::Response>, tendermint_rpc::error::Error>
-    {
+    ) -> Result<RpcResult<BlockResponse>, JsonRpcError> {
         self.rpc_client
-            .race(provider_id.map(Into::into), |c| c.block(height as u32))
+            .race(provider_id.map(Into::into), |c| {
+                c.block(Some(NonZeroU64::try_from(height).expect("non-zero height")))
+            })
             .await
             .map(Into::into)
     }
@@ -156,13 +169,10 @@
         &self,
         height: BlockHeight,
         provider_id: Option<RpcProviderId>,
-    ) -> Result<
-        RpcResult<tendermint_rpc::endpoint::block_results::Response>,
-        tendermint_rpc::error::Error,
-    > {
+    ) -> Result<RpcResult<BlockResultsResponse>, JsonRpcError> {
         self.rpc_client
             .race(provider_id.map(Into::into), |c| {
-                c.block_results(height as u32)
+                c.block_results(Some(NonZeroU64::try_from(height).expect("non-zero height")))
             })
             .await
             .map(Into::into)
@@ -170,19 +180,22 @@
 
     pub async fn tx_search(
         &self,
-        query: Query,
+        height: BlockHeight,
         prove: bool,
         page: u32,
         per_page: u8,
         order: Order,
         provider_id: Option<RpcProviderId>,
-    ) -> Result<
-        RpcResult<tendermint_rpc::endpoint::tx_search::Response>,
-        tendermint_rpc::error::Error,
-    > {
+    ) -> Result<RpcResult<TxSearchResponse>, JsonRpcError> {
         self.rpc_client
             .race(provider_id.map(Into::into), |c| {
-                c.tx_search(query.clone(), prove, page, per_page, order.clone())
+                c.tx_search(
+                    format!("tx.height={}", height),
+                    prove,
+                    NonZeroU32::try_from(page).expect("non-zero page"),
+                    NonZeroU8::try_from(per_page).expect("non-zero per-page"),
+                    order.clone(),
+                )
             })
             .await
             .map(Into::into)
diff --git a/lib/cometbft-rpc/src/lib.rs b/lib/cometbft-rpc/src/lib.rs
index 146e467e59..724f177645 100644
--- a/lib/cometbft-rpc/src/lib.rs
+++ b/lib/cometbft-rpc/src/lib.rs
@@ -25,8 +25,9 @@ use unionlabs::{
 };
 
 use crate::rpc_types::{
-    AbciQueryResponse, AllValidatorsResponse, BlockResponse, BroadcastTxSyncResponse,
-    CommitResponse, Order, StatusResponse, TxResponse, TxSearchResponse, ValidatorsResponse,
+    AbciQueryResponse, AllValidatorsResponse, BlockResponse, BlockResultsResponse,
+    BlockchainResponse, BroadcastTxSyncResponse, CommitResponse, Order, StatusResponse, TxResponse,
+    TxSearchResponse, ValidatorsResponse,
 };
 
 #[cfg(test)]
@@ -200,6 +201,19 @@ impl Client {
             .await
     }
 
+    pub async fn blockchain(
+        &self,
+        min_height: NonZeroU64,
+        max_height: NonZeroU64,
+    ) -> Result<BlockchainResponse, JsonRpcError> {
+        self.inner
+            .request(
+                "blockchain",
+                (min_height.to_string(), max_height.to_string()),
+            )
+            .await
+    }
+
     pub async fn tx_search(
         &self,
         query: impl AsRef<str>,
@@ -244,14 +258,14 @@ impl Client {
             .await
     }
 
-    // pub async fn block_results(
-    //     &self,
-    //     height: Option<BoundedI64<1>>,
-    // ) -> Result<BlockResultsResponse, JsonRpcError> {
-    //     self.client
-    //         .request("block_results", rpc_params![height.map(|x| x.to_string())])
-    //         .await
-    // }
+    pub async fn block_results(
+        &self,
+        height: Option<NonZeroU64>,
+    ) -> Result<BlockResultsResponse, JsonRpcError> {
+        self.inner
+            .request("block_results", rpc_params![height.map(|x| x.to_string())])
+            .await
+    }
 }
 
 #[derive(Debug, Clone)]
diff --git a/lib/cometbft-rpc/src/rpc_types.rs b/lib/cometbft-rpc/src/rpc_types.rs
index 1588f6e9b5..5a974ffc68 100644
--- a/lib/cometbft-rpc/src/rpc_types.rs
+++ b/lib/cometbft-rpc/src/rpc_types.rs
@@ -1,12 +1,12 @@
 use std::num::NonZeroU64;
 
 use cometbft_types::{
-    abci::{exec_tx_result::ExecTxResult, response_query::QueryResponse},
+    abci::{event::Event, exec_tx_result::ExecTxResult, response_query::QueryResponse},
     crypto::public_key::PublicKey,
     p2p::default_node_info::DefaultNodeInfo,
     types::{
-        block::Block, block_id::BlockId, signed_header::SignedHeader, tx_proof::TxProof,
-        validator::Validator,
+        block::Block, block_id::BlockId, header::Header, signed_header::SignedHeader,
+        tx_proof::TxProof, validator::Validator,
     },
 };
 use serde::{Deserialize, Serialize};
@@ -36,6 +36,25 @@ pub struct BlockResponse {
     pub block: Block,
 }
 
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct BlockchainResponse {
+    #[serde(with = "::serde_utils::string")]
+    pub last_height: u64,
+    pub block_metas: Vec<BlockMeta>,
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+#[serde(deny_unknown_fields)]
+pub struct BlockMeta {
+    pub block_id: BlockId,
+    #[serde(with = "::serde_utils::string")]
+    pub block_size: u64,
+    pub header: Header,
+    #[serde(with = "::serde_utils::string")]
+    pub num_txs: u64,
+}
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct StatusResponse {
@@ -137,6 +156,14 @@ pub struct TxSearchResponse {
     pub total_count: u32,
 }
 
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub struct BlockResultsResponse {
+    #[serde(with = "::serde_utils::string")]
+    pub height: u64,
+    pub txs_results: Option<Vec<ExecTxResult>>,
+    pub finalize_block_events: Option<Vec<Event>>,
+}
+
 #[derive(macros::Debug, Clone, PartialEq, Serialize, Deserialize)]
 #[serde(deny_unknown_fields)]
 pub struct BroadcastTxSyncResponse {

From 9f75c28936c3fd81d3cb3eafaab2f7d9551c4b58 Mon Sep 17 00:00:00 2001
From: jurriaan
Date: Fri, 29 Nov 2024 10:37:21 +0100
Subject: [PATCH 2/3] fix(hubble): replace as with try_into

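Numeric `as` casts silently truncate or wrap when a value does not fit the
target type, which can corrupt heights, versions, and indices; `try_into()`
makes the conversion fallible and fails loudly via `unwrap()`/`expect()`
instead. A minimal sketch of the pattern applied throughout this patch
(illustrative values, not taken from the diff):

    let height: u64 = u64::MAX;
    let wrapped = height as i64; // silently becomes -1
    let checked: i64 = height
        .try_into()
        .expect("height fits in i64"); // panics, surfacing the bug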
---
 hubble/src/arb.rs                          |  8 ++--
 hubble/src/bera.rs                         | 14 +++----
 hubble/src/indexer/api.rs                  |  3 +-
 hubble/src/indexer/aptos/block_handle.rs   | 42 ++++++++++++-------
 hubble/src/indexer/aptos/fetcher_client.rs | 20 +++++----
 hubble/src/indexer/aptos/postgres.rs       | 18 ++++----
 hubble/src/indexer/eth/fetcher_client.rs   | 14 ++++---
 hubble/src/indexer/eth/postgres.rs         | 17 ++++----
 hubble/src/indexer/fetcher.rs              |  9 ++--
 hubble/src/indexer/finalizer.rs            |  8 ++--
 hubble/src/indexer/fixer.rs                |  6 ++-
 hubble/src/indexer/postgres.rs             | 49 +++++++++++++++-------
 hubble/src/indexer/tm/block_handle.rs      |  8 ++--
 hubble/src/indexer/tm/fetcher_client.rs    | 19 +++++----
 hubble/src/indexer/tm/postgres.rs          | 23 ++++++----
 hubble/src/scroll.rs                       |  4 +-
 hubble/src/token_list.rs                   |  4 +-
 17 files changed, 161 insertions(+), 105 deletions(-)

diff --git a/hubble/src/arb.rs b/hubble/src/arb.rs
index d73a1d0954..950fee9b8a 100644
--- a/hubble/src/arb.rs
+++ b/hubble/src/arb.rs
@@ -154,7 +154,9 @@ impl Arb {
         let slot_offset_bytes = self
             .rollup_finalization_config
             .l1_next_node_num_slot_offset_bytes
-            .inner() as usize;
+            .inner()
+            .try_into()
+            .unwrap();
         let raw_slot = self
             .l1_client
             .get_storage_at(
@@ -183,13 +185,13 @@ impl Arb {
 
 impl Querier for Arb {
     async fn get_execution_height(&self, slot: i64) -> Result<(i64, i64)> {
-        let height = (|| self.execution_height_of_beacon_slot(slot as u64))
+        let height = (|| self.execution_height_of_beacon_slot(slot.try_into().unwrap()))
             .retry(
                 &ConstantBuilder::default()
                     .with_delay(Duration::from_millis(500))
                     .with_max_times(60),
             )
             .await?;
-        Ok((slot, height as i64))
+        Ok((slot, height.try_into().unwrap()))
     }
 }
diff --git a/hubble/src/bera.rs b/hubble/src/bera.rs
index a5eda695c2..e3b0865a05 100644
--- a/hubble/src/bera.rs
+++ b/hubble/src/bera.rs
@@ -10,7 +10,7 @@ use color_eyre::{
 };
 use cometbft_rpc::{rpc_types::AbciQueryResponse, Client};
 use tracing::info;
-use unionlabs::encoding::DecodeAs;
+use unionlabs::{bounded::BoundedI64, encoding::DecodeAs};
 
 use crate::consensus::{Indexer, Querier};
 
@@ -64,11 +64,7 @@ impl Bera {
         .abci_query(
             "store/beacon/key",
             data,
-            Some(
-                (slot as i64 - 1)
-                    .try_into()
-                    .expect("converting slot to abci_query slot"),
-            ),
+            Some(BoundedI64::<1>::new(slot - 1).expect("converting slot to abci_query slot")),
             prove,
         )
         .await?;
@@ -108,14 +104,16 @@ impl Bera {
 
 impl Querier for Bera {
     async fn get_execution_height(&self, slot: i64) -> Result<(i64, i64)> {
-        let height = (|| self.execution_header_at_beacon_slot(slot as u64))
+        let height = (|| self.execution_header_at_beacon_slot(slot.try_into().unwrap()))
             .retry(
                 &ConstantBuilder::default()
                     .with_delay(Duration::from_millis(500))
                     .with_max_times(60),
             )
             .await?
-            .block_number as i64;
+            .block_number
+            .try_into()
+            .unwrap();
         Ok((slot, height))
     }
 }
diff --git a/hubble/src/indexer/api.rs b/hubble/src/indexer/api.rs
index 46e18679e6..a3e7fc685c 100644
--- a/hubble/src/indexer/api.rs
+++ b/hubble/src/indexer/api.rs
@@ -60,7 +60,8 @@ impl BlockRange {
         let range: Range<BlockHeight> = self.clone().into();
         range.step_by(chunk_size).map(move |start_inclusive| {
-            let end_exclusive = (start_inclusive + chunk_size as u64).min(self.end_exclusive);
+            let chunk_size: u64 = chunk_size.try_into().unwrap();
+            let end_exclusive = (start_inclusive + chunk_size).min(self.end_exclusive);
             (start_inclusive..end_exclusive).into()
         })
     }
diff --git a/hubble/src/indexer/aptos/block_handle.rs b/hubble/src/indexer/aptos/block_handle.rs
index 7e85107dd1..91c01d0aa5 100644
--- a/hubble/src/indexer/aptos/block_handle.rs
+++ b/hubble/src/indexer/aptos/block_handle.rs
@@ -29,13 +29,12 @@ use crate::indexer::{
 
 impl BlockReferenceProvider for Block {
     fn block_reference(&self) -> Result<BlockReference, IndexerError> {
+        let block_timestamp: i128 = self.block_timestamp.0.into();
         Ok(BlockReference {
             height: self.block_height.into(),
             hash: self.block_hash.to_string(),
-            timestamp: OffsetDateTime::from_unix_timestamp_nanos(
-                (self.block_timestamp.0 as i128) * 1000,
-            )
-            .map_err(Report::from)?,
+            timestamp: OffsetDateTime::from_unix_timestamp_nanos(block_timestamp * 1000)
+                .map_err(Report::from)?,
         })
     }
 }
@@ -114,22 +113,33 @@ impl BlockHandle for AptosBlockHandle {
                 if active_contracts.contains(&account_address.to_standard_string()) {
                     Some(PgTransaction {
                         internal_chain_id: self.internal_chain_id,
-                        height: self.reference.height as i64,
-                        version: transaction.info.version.0 as i64,
+                        height: self.reference.height.try_into().unwrap(),
+                        version: transaction.info.version.0.try_into().unwrap(),
                         transaction_hash: transaction.info.hash.to_string(),
-                        transaction_index: transaction_index as i64,
+                        transaction_index: transaction_index.try_into().unwrap(),
                         events: transaction
                             .events
                             .into_iter()
                             .enumerate()
                             .map(|(transaction_event_index, event)| PgEvent {
                                 internal_chain_id: self.internal_chain_id,
-                                height: self.reference.height as i64,
-                                version: transaction.info.version.0 as i64,
-                                index: event_index_iter.next().unwrap() as i64,
-                                transaction_event_index: transaction_event_index as i64,
-                                sequence_number: event.sequence_number.0 as i64,
-                                creation_number: event.guid.creation_number.0 as i64,
+                                height: self.reference.height.try_into().unwrap(),
+                                version: transaction.info.version.0.try_into().unwrap(),
+                                index: event_index_iter.next().unwrap(),
+                                transaction_event_index: transaction_event_index
+                                    .try_into()
+                                    .unwrap(),
+                                sequence_number: event
+                                    .sequence_number
+                                    .0
+                                    .try_into()
+                                    .unwrap(),
+                                creation_number: event
+                                    .guid
+                                    .creation_number
+                                    .0
+                                    .try_into()
+                                    .unwrap(),
                                 account_address: event
                                     .guid
                                     .account_address
@@ -165,11 +175,11 @@ impl BlockHandle for AptosBlockHandle {
             tx,
             PgBlock {
                 internal_chain_id: self.internal_chain_id,
-                height: self.reference.height as i64,
+                height: self.reference.height.try_into().unwrap(),
                 block_hash: self.reference.hash.clone(),
                 timestamp: self.reference.timestamp,
-                first_version: block.first_version.0 as i64, // TODO: check if .0 is ok
-                last_version: block.last_version.0 as i64,
+                first_version: block.first_version.0.try_into().unwrap(),
+                last_version: block.last_version.0.try_into().unwrap(),
                 transactions,
             },
         )
diff --git a/hubble/src/indexer/aptos/fetcher_client.rs b/hubble/src/indexer/aptos/fetcher_client.rs
index 159742078f..78e401b2a7 100644
--- a/hubble/src/indexer/aptos/fetcher_client.rs
+++ b/hubble/src/indexer/aptos/fetcher_client.rs
@@ -137,17 +137,23 @@ impl AptosFetcherClient {
         let complete_start_inclusive: BlockHeight = block.first_version.into();
         let complete_end_inclusive: BlockHeight = block.last_version.into();
+        let tx_search_max_page_size: u64 = self.tx_search_max_page_size.into();
 
-        let mut result =
-            Vec::with_capacity((complete_end_inclusive + 1 - complete_start_inclusive) as usize);
+        let mut result = Vec::with_capacity(
+            (complete_end_inclusive + 1 - complete_start_inclusive)
+                .try_into()
+                .unwrap(),
+        );
 
         for chunk_start_inclusive in (complete_start_inclusive..=complete_end_inclusive)
-            .step_by(self.tx_search_max_page_size as usize)
+            .step_by(self.tx_search_max_page_size.into())
         {
-            let chunk_end_exclusive = (chunk_start_inclusive + self.tx_search_max_page_size as u64)
-                .min(complete_end_inclusive + 1); // +1, because end is inclusive
+            let chunk_end_exclusive =
+                (chunk_start_inclusive + tx_search_max_page_size).min(complete_end_inclusive + 1); // +1, because end is inclusive
 
-            let chunk_limit = (chunk_end_exclusive - chunk_start_inclusive) as u16;
+            let chunk_limit: u16 = (chunk_end_exclusive - chunk_start_inclusive)
+                .try_into()
+                .unwrap();
 
             trace!(
                 "fetching chunk for block {} - versions: [{},{}]",
@@ -202,7 +208,7 @@ impl AptosFetcherClient {
                 chunk_end_exclusive - 1
             );
 
-            let mut result = Vec::with_capacity(self.tx_search_max_page_size as usize);
+            let mut result = Vec::with_capacity(self.tx_search_max_page_size.into());
 
             for transaction_index in chunk_start_inclusive..chunk_end_exclusive {
                 trace!(
diff --git a/hubble/src/indexer/aptos/postgres.rs b/hubble/src/indexer/aptos/postgres.rs
index 328308a000..79f3644c02 100644
--- a/hubble/src/indexer/aptos/postgres.rs
+++ b/hubble/src/indexer/aptos/postgres.rs
@@ -136,12 +136,13 @@ pub async fn delete_aptos_block_transactions_events(
     internal_chain_id: i32,
     height: BlockHeight,
 ) -> sqlx::Result<()> {
+    let height: i64 = height.try_into().unwrap();
     sqlx::query!(
         "
         DELETE FROM v1_aptos.events WHERE internal_chain_id = $1 AND height = $2
         ",
         internal_chain_id,
-        height as i64
+        height,
     )
     .execute(tx.as_mut())
     .await?;
@@ -151,7 +152,7 @@ pub async fn delete_aptos_block_transactions_events(
         DELETE FROM v1_aptos.transactions WHERE internal_chain_id = $1 AND height = $2
         ",
         internal_chain_id,
-        height as i64
+        height,
     )
     .execute(tx.as_mut())
     .await?;
@@ -161,13 +162,12 @@ pub async fn delete_aptos_block_transactions_events(
         DELETE FROM v1_aptos.blocks WHERE internal_chain_id = $1 AND height = $2
         ",
         internal_chain_id,
-        height as i64,
+        height,
     )
     .execute(tx.as_mut())
     .await?;
 
-    schedule_replication_reset(tx, internal_chain_id, height as i64, "block reorg (delete)")
-        .await?;
+    schedule_replication_reset(tx, internal_chain_id, height, "block reorg (delete)").await?;
 
     Ok(())
 }
@@ -177,6 +177,8 @@ pub async fn active_contracts(
     internal_chain_id: i32,
     height: BlockHeight,
 ) -> sqlx::Result<Vec<String>> {
+    let height: i64 = height.try_into().unwrap();
+
     let result = sqlx::query!(
         r#"
         SELECT address
@@ -185,7 +187,7 @@ pub async fn active_contracts(
         AND    $2 between start_height and end_height
         "#,
         internal_chain_id,
-        height as i64,
+        height,
     )
     .fetch_all(tx.as_mut())
     .await?
@@ -219,8 +221,8 @@ pub async fn unmapped_clients(
     .await?
     .into_iter()
     .map(|record| UnmappedClient {
-        version: record.transaction_version.expect("client-created-event to have transaction version") as u64,
-        height: record.height.expect("client-created-event to have a height") as u64,
+        version: record.transaction_version.expect("client-created-event to have transaction version").try_into().unwrap(),
+        height: record.height.expect("client-created-event to have a height").try_into().unwrap(),
         client_id: record.client_id,
     })
     .collect_vec();
diff --git a/hubble/src/indexer/eth/fetcher_client.rs b/hubble/src/indexer/eth/fetcher_client.rs
index 490e21b4c7..6d120ec484 100644
--- a/hubble/src/indexer/eth/fetcher_client.rs
+++ b/hubble/src/indexer/eth/fetcher_client.rs
@@ -50,8 +50,10 @@ impl BlockReferenceProvider for Block {
         Ok(BlockReference {
             height: self.header.number,
             hash: self.header.hash.to_lower_hex(),
-            timestamp: OffsetDateTime::from_unix_timestamp(self.header.timestamp as i64)
-                .map_err(|err| IndexerError::ProviderError(err.into()))?,
+            timestamp: OffsetDateTime::from_unix_timestamp(
+                self.header.timestamp.try_into().unwrap(),
+            )
+            .map_err(|err| IndexerError::ProviderError(err.into()))?,
         })
     }
 }
@@ -207,7 +209,7 @@ impl EthFetcherClient {
                 .into_iter()
                 .map(|((transaction_hash, transaction_index), logs)| {
                     let transaction_hash = transaction_hash.to_lower_hex();
-                    let transaction_index = transaction_index as i32;
+                    let transaction_index: i32 = transaction_index.try_into().unwrap();
 
                     let events: Vec<EventInsert> = logs
                         .into_iter()
@@ -216,8 +218,8 @@ impl EthFetcherClient {
                             let data = serde_json::to_value(&log).unwrap();
                             EventInsert {
                                 data,
-                                log_index: log.log_index.unwrap() as usize,
-                                transaction_log_index: transaction_log_index as i32,
+                                log_index: log.log_index.expect("log_index").try_into().unwrap(),
+                                transaction_log_index: transaction_log_index.try_into().unwrap(),
                             }
                         })
                         .collect();
@@ -248,7 +250,7 @@ impl EthFetcherClient {
                     chain_id: self.chain_id,
                     hash: block_reference.hash,
                     header: block.clone(),
-                    height: block_reference.height as i32,
+                    height: block_reference.height.try_into().unwrap(),
                     time: block_reference.timestamp,
                     transactions,
                 }))
diff --git a/hubble/src/indexer/eth/postgres.rs b/hubble/src/indexer/eth/postgres.rs
index e1356aad32..a9758bd846 100644
--- a/hubble/src/indexer/eth/postgres.rs
+++ b/hubble/src/indexer/eth/postgres.rs
@@ -34,7 +34,7 @@ impl From<BlockInsert> for PgLog {
         PgLog {
             chain_id: block.chain_id,
             block_hash: block.hash.clone(),
-            height: block.height as u64,
+            height: block.height.try_into().unwrap(),
             time: block.time,
             data: PgLogData {
                 header: block.header,
@@ -58,11 +58,13 @@ pub async fn insert_batch_logs(
     ) = logs
         .into_iter()
         .map(|l| {
+            let height: i64 = l.height.try_into().unwrap();
+
             (
                 l.chain_id.db,
                 l.block_hash,
                 serde_json::to_value(&l.data).expect("data should be json serializable"),
-                l.height as i64,
+                height,
                 l.time,
             )
         })
@@ -109,17 +111,18 @@ pub async fn delete_eth_log(
     chain_id: i32,
     height: BlockHeight,
 ) -> sqlx::Result<()> {
+    let height: i64 = height.try_into().unwrap();
     sqlx::query!(
         "
         DELETE FROM v1_evm.logs WHERE chain_id = $1 AND height = $2
         ",
         chain_id,
-        height as i32
+        height,
     )
     .execute(tx.as_mut())
     .await?;
 
-    schedule_replication_reset(tx, chain_id, height as i64, "block reorg (delete)").await?;
+    schedule_replication_reset(tx, chain_id, height, "block reorg (delete)").await?;
 
     Ok(())
 }
@@ -149,7 +152,7 @@ pub async fn unmapped_clients(
     .into_iter()
     .map(|record| UnmappedClient {
         transaction_hash: record.transaction_hash.expect("client-created event to have transaction hash"),
-        height: record.height.expect("client-created event to have a height") as u64,
+        height: record.height.expect("client-created event to have a height").try_into().unwrap(),
         client_id: record.client_id,
     })
     .collect_vec();
@@ -174,8 +177,8 @@ pub async fn transaction_filter(
     .into_iter()
     .map(|record| AddressFilter {
         block_range: BlockRange {
-            start_inclusive: record.start_height as u64,
-            end_exclusive: record.end_height as u64,
+            start_inclusive: record.start_height.try_into().unwrap(),
+            end_exclusive: record.end_height.try_into().unwrap(),
         },
         address: record.address.parse().expect("address to be valid"),
     })
diff --git a/hubble/src/indexer/fetcher.rs b/hubble/src/indexer/fetcher.rs
index f53f88ff90..0dd5e58ea3 100644
--- a/hubble/src/indexer/fetcher.rs
+++ b/hubble/src/indexer/fetcher.rs
@@ -25,6 +25,9 @@ impl<T: FetcherClient> Indexer<T> {
     }
 
     async fn run_to_finalized(&self, fetcher_client: &T) -> Result<(), IndexerError> {
+        let chunk_size: u64 = self.chunk_size.try_into().unwrap();
+        let delay_blocks: u64 = self.finalizer_config.delay_blocks.try_into().unwrap();
+
         loop {
             debug!("fetching last finalized block");
             match fetcher_client
@@ -33,11 +36,7 @@ impl<T: FetcherClient> Indexer<T> {
             {
                 Ok(last_finalized) => {
                     let next_height = self.next_height().await?;
-                    if next_height
-                        + self.chunk_size as u64
-                        + self.finalizer_config.delay_blocks as u64
-                        > last_finalized.reference().height
-                    {
+                    if next_height + chunk_size + delay_blocks > last_finalized.reference().height {
                         info!("near finalized height (current: {} + chunk: {} + delay: {} > finalized: {}) => start 'run to tip'", next_height, self.chunk_size, self.finalizer_config.delay_blocks, last_finalized.reference());
                         return Ok(());
                     }
diff --git a/hubble/src/indexer/finalizer.rs b/hubble/src/indexer/finalizer.rs
index 3be5542b69..03ed9c0641 100644
--- a/hubble/src/indexer/finalizer.rs
+++ b/hubble/src/indexer/finalizer.rs
@@ -20,6 +20,7 @@ use crate::indexer::{
 
 impl<T: FetcherClient> Indexer<T> {
     pub async fn run_finalizer(&self, fetcher_client: T) -> Result<(), IndexerError> {
+        let chunk_size: u64 = self.chunk_size.try_into().unwrap();
         loop {
             if let Some(block_range_to_finalize) = self.block_range_to_finalize().await? {
                 info!("{}: begin", block_range_to_finalize);
@@ -39,15 +40,14 @@ impl<T: FetcherClient> Indexer<T> {
                         // consider the block to be finalized if it's >= than the consensus height, considering the finalization delay blocks.
                        let consensus_height_with_safety_margin = reference
                            .height
-                            .saturating_sub(self.finalizer_config.delay_blocks as u64);
+                            .saturating_sub(self.finalizer_config.delay_blocks.try_into().unwrap());
 
                         let some_blocks_needs_to_be_finalized = block_range_to_finalize
                             .start_inclusive
                             <= consensus_height_with_safety_margin;
 
                         if some_blocks_needs_to_be_finalized {
                             // find the end of the range to finalize
-                            let end_of_chunk = block_range_to_finalize.start_inclusive
-                                + self.chunk_size as BlockHeight;
+                            let end_of_chunk = block_range_to_finalize.start_inclusive + chunk_size;
                             let end_until_finalized = consensus_height_with_safety_margin + 1;
                             let end_until_last_tracked_block = block_range_to_finalize.end_exclusive;
 
@@ -86,7 +86,7 @@ impl<T: FetcherClient> Indexer<T> {
                         {
                             let range_to_monitor = (height
                                 ..(min(
-                                    height + self.chunk_size as BlockHeight,
+                                    height + chunk_size,
                                     block_range_to_finalize.end_exclusive,
                                 )))
                                 .into();
diff --git a/hubble/src/indexer/fixer.rs b/hubble/src/indexer/fixer.rs
index a50efafc8f..fe9b490da8 100644
--- a/hubble/src/indexer/fixer.rs
+++ b/hubble/src/indexer/fixer.rs
@@ -10,12 +10,14 @@ use super::{
     Indexer,
 };
 use crate::indexer::{
-    api::{BlockHandle, BlockHeight, BlockSelection, FetchMode},
+    api::{BlockHandle, BlockSelection, FetchMode},
     HappyRangeFetcher,
 };
 
 impl<T: FetcherClient> Indexer<T> {
     pub async fn run_fixer(&self, fetcher_client: T) -> Result<(), IndexerError> {
+        let chunk_size: u64 = self.chunk_size.try_into().unwrap();
+
         loop {
             if let Some(block_range_to_fix) = self.block_range_to_fix().await? {
                 info!("{}: begin", block_range_to_fix);
@@ -36,7 +38,7 @@ impl<T: FetcherClient> Indexer<T> {
                     if block_range_to_fix.start_inclusive <= last_finalized_reference.height {
                         // find the end of the range to fix
                         let end_of_chunk_exclusive =
-                            block_range_to_fix.start_inclusive + self.chunk_size as BlockHeight;
+                            block_range_to_fix.start_inclusive + chunk_size;
                         let end_until_finalized = last_finalized_reference.height + 1;
                         let end_until_last_block_to_fix = block_range_to_fix.end_exclusive;
diff --git a/hubble/src/indexer/postgres.rs b/hubble/src/indexer/postgres.rs
index c54e5576a8..f7368cfa26 100644
--- a/hubble/src/indexer/postgres.rs
+++ b/hubble/src/indexer/postgres.rs
@@ -21,7 +21,7 @@ pub async fn get_current_height(
     .fetch_optional(tx.as_mut())
     .await?;
 
-    Ok(record.map(|h| h.height as BlockHeight))
+    Ok(record.map(|h| h.height.try_into().unwrap()))
 }
 
 pub async fn update_current_height(
@@ -30,6 +30,8 @@ pub async fn update_current_height(
     height: BlockHeight,
     timestamp: OffsetDateTime,
 ) -> sqlx::Result<()> {
+    let height: i64 = height.try_into().unwrap();
+
     sqlx::query!(
         "
         INSERT INTO hubble.indexer_status (indexer_id, height, timestamp)
@@ -40,7 +42,7 @@ pub async fn update_current_height(
             timestamp = excluded.timestamp
         ",
         indexer_id,
-        height as i64,
+        height,
         timestamp,
     )
     .execute(tx.as_mut())
@@ -65,7 +67,12 @@ pub async fn get_block_range_to_finalize(
     .await?;
 
     Ok(match (record.min_height, record.max_height) {
-        (Some(min), Some(max)) => Some((min as BlockHeight..max as BlockHeight + 1).into()),
+        (Some(min), Some(max)) => {
+            let min_inclusive: BlockHeight = min.try_into().unwrap();
+            let max_inclusive: BlockHeight = max.try_into().unwrap();
+            let max_exclusive = max_inclusive + 1;
+            Some((min_inclusive..max_exclusive).into())
+        }
         (None, None) => None,
         _ => unreachable!("expecting min_height and max_height to be either null or available"),
     })
@@ -77,6 +84,7 @@ pub async fn get_next_block_to_monitor(
     consensus_height: BlockHeight,
     min_duration_between_monitor_checks: Duration,
 ) -> sqlx::Result<Option<BlockHeight>> {
+    let consensus_height: i64 = consensus_height.try_into().unwrap();
     let record = sqlx::query!(
         "
         SELECT height height
@@ -86,13 +94,13 @@ pub async fn get_next_block_to_monitor(
         ORDER BY updated_at
         ",
         indexer_id,
-        consensus_height as i64,
+        consensus_height,
        OffsetDateTime::now_utc() - min_duration_between_monitor_checks,
     )
     .fetch_optional(tx.as_mut())
     .await?;
 
-    Ok(record.map(|r| r.height as BlockHeight))
+    Ok(record.map(|r| r.height.try_into().unwrap()))
 }
 
 pub async fn get_block_range_to_fix(
@@ -116,9 +124,13 @@ pub async fn get_block_range_to_fix(
     .await?;
 
     Ok(record.map(|r| {
-        (r.start_height as BlockHeight
-            ..r.end_height.expect("end_height column value") as BlockHeight)
-            .into()
+        let start_inclusive: BlockHeight = r.start_height.try_into().unwrap();
+        let end_exclusive: BlockHeight = r
+            .end_height
+            .expect("end_height column value")
+            .try_into()
+            .unwrap();
+        (start_inclusive..end_exclusive).into()
     }))
 }
 
@@ -127,6 +139,8 @@ pub async fn update_block_range_to_fix(
     indexer_id: IndexerId,
     range: BlockRange,
 ) -> sqlx::Result<()> {
+    let start_inclusive: i64 = range.start_inclusive.try_into().unwrap();
+    let end_exclusive: i64 = range.end_exclusive.try_into().unwrap();
     // update start of ranges
     sqlx::query!(
         "
@@ -136,8 +150,8 @@ pub async fn update_block_range_to_fix(
         AND   start_height = $2
         ",
         indexer_id,
-        range.start_inclusive as i64,
-        range.end_exclusive as i64,
+        start_inclusive,
+        end_exclusive,
     )
     .execute(tx.as_mut())
     .await?;
@@ -151,7 +165,7 @@ pub async fn update_block_range_to_fix(
         AND   end_height <= $2
         ",
         indexer_id,
-        range.end_exclusive as i64,
+        end_exclusive,
     )
     .execute(tx.as_mut())
     .await?;
@@ -164,6 +178,7 @@ pub async fn delete_block_status(
     indexer_id: IndexerId,
     height: BlockHeight,
 ) -> sqlx::Result<Option<BlockHash>> {
+    let height: i64 = height.try_into().unwrap();
     let record = sqlx::query!(
         "
         DELETE FROM hubble.block_status
@@ -171,12 +186,12 @@ pub async fn delete_block_status(
         RETURNING hash
         ",
         indexer_id,
-        height as i64,
+        height,
     )
     .fetch_optional(tx.as_mut())
     .await?;
 
-    Ok(record.map(|r| r.hash as BlockHash))
+    Ok(record.map(|r| r.hash))
 }
 
 pub async fn get_block_status_hash(
@@ -184,18 +199,19 @@ pub async fn get_block_status_hash(
     indexer_id: IndexerId,
     height: BlockHeight,
 ) -> sqlx::Result<Option<BlockHash>> {
+    let height: i64 = height.try_into().unwrap();
     let record = sqlx::query!(
         "
         SELECT hash FROM hubble.block_status
         WHERE indexer_id = $1 AND height = $2
         ",
         indexer_id,
-        height as i64,
+        height,
     )
     .fetch_optional(tx.as_mut())
     .await?;
 
-    Ok(record.map(|r| r.hash as BlockHash))
+    Ok(record.map(|r| r.hash))
 }
 
 pub async fn update_block_status(
@@ -205,6 +221,7 @@ pub async fn update_block_status(
     hash: BlockHash,
     timestamp: OffsetDateTime,
 ) -> sqlx::Result<()> {
+    let height: i64 = height.try_into().unwrap();
     sqlx::query!(
         "
         INSERT INTO hubble.block_status (indexer_id, height, hash, timestamp)
@@ -215,7 +232,7 @@ pub async fn update_block_status(
             timestamp = excluded.timestamp
         ",
         indexer_id,
-        height as i64,
+        height,
         hash,
         timestamp,
     )
diff --git a/hubble/src/indexer/tm/block_handle.rs b/hubble/src/indexer/tm/block_handle.rs
index ab70ff63fc..01b119bad5 100644
--- a/hubble/src/indexer/tm/block_handle.rs
+++ b/hubble/src/indexer/tm/block_handle.rs
@@ -59,14 +59,14 @@ impl From<CommitResponse> for BlockHeader {
 impl BlockReferenceProvider for BlockHeader {
     fn block_reference(&self) -> Result<BlockReference, IndexerError> {
         Ok(BlockReference {
-            height: self.header.height.inner() as u64,
+            height: self.header.height.inner().try_into().unwrap(),
             hash: self
                 .block_id
                 .hash
                 .ok_or(IndexerError::ProviderError(eyre!("expected hash")))?
                 .to_string(),
             timestamp: OffsetDateTime::from_unix_timestamp_nanos(
-                self.header.time.as_unix_nanos() as i128
+                self.header.time.as_unix_nanos().into(),
             )
             .map_err(|err| IndexerError::ProviderError(err.into()))?,
         })
@@ -76,14 +76,14 @@ impl BlockReferenceProvider for BlockHeader {
 impl BlockReferenceProvider for BlockMeta {
     fn block_reference(&self) -> Result<BlockReference, IndexerError> {
         Ok(BlockReference {
-            height: self.header.height.inner() as u64,
+            height: self.header.height.inner().try_into().unwrap(),
             hash: self
                 .block_id
                 .hash
                 .ok_or(IndexerError::ProviderError(eyre!("expected hash")))?
                 .to_string(),
             timestamp: OffsetDateTime::from_unix_timestamp_nanos(
-                self.header.time.as_unix_nanos() as i128
+                self.header.time.as_unix_nanos().into(),
             )
             .map_err(|err| IndexerError::ProviderError(err.into()))?,
         })
diff --git a/hubble/src/indexer/tm/fetcher_client.rs b/hubble/src/indexer/tm/fetcher_client.rs
index 5b364e9719..0b75dc3aa6 100644
--- a/hubble/src/indexer/tm/fetcher_client.rs
+++ b/hubble/src/indexer/tm/fetcher_client.rs
@@ -209,8 +209,8 @@ impl TmFetcherClient {
                 .hash
                 .ok_or(IndexerError::ProviderError(eyre!("expected hash")))?
                 .to_string(),
-            height: header.height.inner() as u64,
-            time: OffsetDateTime::from_unix_timestamp_nanos(header.time.as_unix_nanos() as i128)
+            height: header.height.inner().try_into().unwrap(),
+            time: OffsetDateTime::from_unix_timestamp_nanos(header.time.as_unix_nanos().into())
                 .map_err(|err| IndexerError::ProviderError(err.into()))?,
             data: serde_json::to_value(&header)
                 .unwrap()
                 .replace_escape_chars(),
@@ -276,9 +276,12 @@ impl TmFetcherClient {
                 )
                 .into_iter()
                 .enumerate()
-                .map(|(i, e)| PgEvent {
-                    block_index: i as i32 + block_index,
-                    ..e
+                .map(|(i, e)| {
+                    let index: i32 = i.try_into().unwrap();
+                    PgEvent {
+                        block_index: index + block_index,
+                        ..e
+                    }
                 }),
         );
 
@@ -410,7 +413,9 @@ impl TmFetcherClient {
             txs.extend(response.txs);
 
             // We always query for the maximum page size. If we get less items, we know pagination is done
-            let current_count = (page - 1) * self.tx_search_max_page_size as u32 + len as u32;
+            let tx_search_max_page_size: u32 = self.tx_search_max_page_size.into();
+            let len: u32 = len.try_into().unwrap();
+            let current_count = (page - 1) * tx_search_max_page_size + len;
             let total_count = response.total_count;
 
             debug!("{height}: fetched transactions page {page} ({current_count}/{total_count})");
@@ -464,7 +469,7 @@ impl BlockExt for BlockResultsResponse {
         finalize_block_events
             .enumerate()
             .map(|(i, mut event)| {
-                event.block_index = i as i32;
+                event.block_index = i.try_into().unwrap();
                 event
             })
             .collect()
diff --git a/hubble/src/indexer/tm/postgres.rs b/hubble/src/indexer/tm/postgres.rs
index eb0b2ec0cb..81663e2fa0 100644
--- a/hubble/src/indexer/tm/postgres.rs
+++ b/hubble/src/indexer/tm/postgres.rs
@@ -57,7 +57,10 @@ pub async fn insert_batch_blocks(
         Vec<OffsetDateTime>,
     ) = blocks
         .into_iter()
-        .map(|b| (b.chain_id.db, b.hash, b.data, b.height as i64, b.time))
+        .map(|b| {
+            let height: i64 = b.height.try_into().unwrap();
+            (b.chain_id.db, b.hash, b.data, height, b.time)
+        })
         .multiunzip();
 
     sqlx::query!("
@@ -84,10 +87,12 @@ pub async fn insert_batch_transactions(
     ) = transactions
         .into_iter()
         .map(|t| {
+            let block_height: i64 = t.block_height.try_into().unwrap();
+
             (
                 t.chain_id.db,
                 t.block_hash,
-                t.block_height as i64,
+                block_height,
                 t.hash,
                 t.data,
                 t.index,
@@ -131,10 +136,12 @@ pub async fn insert_batch_events(
     ) = events
         .into_iter()
         .map(|e| {
+            let block_height: i64 = e.block_height.try_into().unwrap();
+
             (
                 e.chain_id.db,
                 e.block_hash,
-                e.block_height as i64,
+                block_height,
                 e.transaction_hash.map(Into::into),
                 e.block_index,
                 e.transaction_index,
@@ -159,12 +166,14 @@ pub async fn delete_tm_block_transactions_events(
     chain_id: i32,
     height: BlockHeight,
 ) -> sqlx::Result<()> {
+    let height: i64 = height.try_into().unwrap();
+
     sqlx::query!(
         "
         DELETE FROM v1_cosmos.events WHERE chain_id = $1 AND height = $2
         ",
         chain_id,
-        height as i32
+        height,
     )
     .execute(tx.as_mut())
     .await?;
@@ -174,7 +183,7 @@ pub async fn delete_tm_block_transactions_events(
         DELETE FROM v1_cosmos.transactions WHERE chain_id = $1 AND height = $2
         ",
         chain_id,
-        height as i32
+        height,
     )
     .execute(tx.as_mut())
     .await?;
@@ -184,12 +193,12 @@ pub async fn delete_tm_block_transactions_events(
         DELETE FROM v1_cosmos.blocks WHERE chain_id = $1 AND height = $2
         ",
         chain_id,
-        height as i32
+        height,
     )
     .execute(tx.as_mut())
     .await?;
 
-    schedule_replication_reset(tx, chain_id, height as i64, "block reorg (delete)").await?;
+    schedule_replication_reset(tx, chain_id, height, "block reorg (delete)").await?;
 
     Ok(())
 }
diff --git a/hubble/src/scroll.rs b/hubble/src/scroll.rs
index 3f9664fe5f..129d39973c 100644
--- a/hubble/src/scroll.rs
+++ b/hubble/src/scroll.rs
@@ -148,13 +148,13 @@ impl Scroll {
 
 impl Querier for Scroll {
     async fn get_execution_height(&self, slot: i64) -> Result<(i64, i64)> {
-        let height = (|| self.execution_height_of_beacon_slot(slot as u64))
+        let height = (|| self.execution_height_of_beacon_slot(slot.try_into().unwrap()))
             .retry(
                 &ConstantBuilder::default()
                     .with_delay(Duration::from_millis(500))
                     .with_max_times(60),
             )
             .await?;
-        Ok((slot, height as i64))
+        Ok((slot, height.try_into().unwrap()))
     }
 }
diff --git a/hubble/src/token_list.rs b/hubble/src/token_list.rs
index b65627af16..c765641f12 100644
--- a/hubble/src/token_list.rs
+++ b/hubble/src/token_list.rs
@@ -41,7 +41,7 @@ pub async fn update_tokens(db: sqlx::PgPool, urls: TokensUrls) -> Result<()> {
         .filter_map(|token| {
             chain_ids_and_ids.get(&token.0.to_string()).map(|id| {
                 (
-                    *id as i64,
+                    (*id).into(),
                     token.1.clone(),
                     token.2.clone(),
                     token.3,
@@ -76,7 +76,7 @@ pub async fn get_tokens(urls: TokensUrls) -> Result<Vec<Option<Token>>> {
             if val.get("statusCode").is_none() {
                 debug!("Token list successfully retrieved from: {}", url);
-                Ok(Some(serde_json::from_value(val).unwrap())) as Result<Option<Token>>
+                Ok(Some(serde_json::from_value(val).unwrap()))
             } else {
                 debug!("No valid token list found at: {}", url);
                 Ok(None)

From f40e13a0c013fb1bd8061e281ae6cd5dda72f610 Mon Sep 17 00:00:00 2001
From: jurriaan
Date: Fri, 29 Nov 2024 10:40:44 +0100
Subject: [PATCH 3/3] fix(hubble): rework tx result code check

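CometBFT reports transaction execution results with a numeric code: 0 means
the transaction executed successfully, and any non-zero value is an
application-specific error. Naming the constant removes the magic number and
the trailing `// 0 == OK` comment. A minimal sketch of the invariant, using
the `TxResponse` shape from patch 1:

    const TX_RESULT_CODE_OK: u32 = 0;

    // keep only transactions that executed successfully
    let successful = transactions_response
        .into_iter()
        .filter(|tx| tx.tx_result.code == TX_RESULT_CODE_OK);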
---
 hubble/src/indexer/tm/fetcher_client.rs | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/hubble/src/indexer/tm/fetcher_client.rs b/hubble/src/indexer/tm/fetcher_client.rs
index 0b75dc3aa6..a4b78f07eb 100644
--- a/hubble/src/indexer/tm/fetcher_client.rs
+++ b/hubble/src/indexer/tm/fetcher_client.rs
@@ -21,6 +21,8 @@ use time::OffsetDateTime;
 use tokio::task::JoinSet;
 use tracing::{debug, info, info_span, trace, Instrument};
 
+const TX_RESULT_CODE_OK: u32 = 0;
+
 use crate::{
     indexer::{
         api::{
@@ -224,7 +226,7 @@ impl TmFetcherClient {
 
         let pg_transactions = transactions_response
             .into_iter()
-            .filter(|tx| tx.tx_result.code == 0) // 0 == OK
+            .filter(|tx| tx.tx_result.code == TX_RESULT_CODE_OK)
             .map(|tx| {
                 let transaction_hash = tx.hash.to_string();
                 let data = serde_json::to_value(&tx).unwrap().replace_escape_chars();
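
Note on the new RPC surface: the endpoints patch 1 adds to cometbft-rpc and
wraps in hubble's Provider can also be exercised directly. A minimal usage
sketch, assuming only the signatures visible in these patches; the endpoint
URL is a placeholder and error handling is elided:

    use std::num::{NonZeroU32, NonZeroU64, NonZeroU8};

    use cometbft_rpc::{rpc_types::Order, Client, JsonRpcError};

    async fn sketch() -> Result<(), JsonRpcError> {
        // placeholder endpoint; hubble passes its configured rpc_urls here
        let client = Client::new("http://localhost:26657".to_owned()).await?;

        // None fetches the latest block; explicit heights must be non-zero
        let _latest = client.block(None).await?;

        let height = NonZeroU64::new(100).expect("non-zero height");
        let _results = client.block_results(Some(height)).await?;

        // paged transaction search, as wrapped by Provider::tx_search
        let _txs = client
            .tx_search(
                format!("tx.height={height}"),
                false,
                NonZeroU32::new(1).expect("non-zero page"),
                NonZeroU8::new(100).expect("non-zero per-page"),
                Order::Asc,
            )
            .await?;

        Ok(())
    }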