Skip to content

Commit

Permalink
fix(hubble): replace tendermint-rpc library (#3309)
Browse files Browse the repository at this point in the history
  • Loading branch information
qlp authored Nov 29, 2024
2 parents 30e2827 + f40e13a commit 2b371ad
Show file tree
Hide file tree
Showing 23 changed files with 388 additions and 330 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions hubble/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ ethereum-light-client-types = { workspace = true, features = ["serde"] }
futures = { workspace = true, features = ["async-await"] }
hex = { workspace = true }
itertools = "0.13.0"
jsonrpsee = { workspace = true, features = ["tracing", "ws-client", "http-client"] }
lazy_static = { workspace = true }
movement-light-client-types = { workspace = true, features = ["proto", "serde"] }
num-traits = "0.2.19"
Expand All @@ -55,9 +56,7 @@ serde = { workspace = true, features = ["derive"] }
serde-aux = "4.5.0"
serde_json = { workspace = true }
sqlx = { workspace = true, features = ["postgres", "runtime-tokio", "tls-rustls", "time", "macros", "json", "bigdecimal"] }
tendermint = { workspace = true, features = ["std"] }
tendermint-light-client-types = { workspace = true, features = ["proto"] }
tendermint-rpc = { workspace = true, features = ["http-client", "tokio"] }
thiserror = { workspace = true }
time = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = ["full"] }
Expand Down
8 changes: 5 additions & 3 deletions hubble/src/arb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,9 @@ impl Arb {
let slot_offset_bytes = self
.rollup_finalization_config
.l1_next_node_num_slot_offset_bytes
.inner() as usize;
.inner()
.try_into()
.unwrap();
let raw_slot = self
.l1_client
.get_storage_at(
Expand Down Expand Up @@ -183,13 +185,13 @@ impl Arb {

impl Querier for Arb {
async fn get_execution_height(&self, slot: i64) -> Result<(i64, i64)> {
let height = (|| self.execution_height_of_beacon_slot(slot as u64))
let height = (|| self.execution_height_of_beacon_slot(slot.try_into().unwrap()))
.retry(
&ConstantBuilder::default()
.with_delay(Duration::from_millis(500))
.with_max_times(60),
)
.await?;
Ok((slot, height as i64))
Ok((slot, height.try_into().unwrap()))
}
}
14 changes: 6 additions & 8 deletions hubble/src/bera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use color_eyre::{
};
use cometbft_rpc::{rpc_types::AbciQueryResponse, Client};
use tracing::info;
use unionlabs::encoding::DecodeAs;
use unionlabs::{bounded::BoundedI64, encoding::DecodeAs};

use crate::consensus::{Indexer, Querier};

Expand Down Expand Up @@ -64,11 +64,7 @@ impl Bera {
.abci_query(
"store/beacon/key",
data,
Some(
(slot as i64 - 1)
.try_into()
.expect("converting slot to abci_query slot"),
),
Some(BoundedI64::<1>::new(slot - 1).expect("converting slot to abci_query slot")),
prove,
)
.await?;
Expand Down Expand Up @@ -108,14 +104,16 @@ impl Bera {

impl Querier for Bera {
async fn get_execution_height(&self, slot: i64) -> Result<(i64, i64)> {
let height = (|| self.execution_header_at_beacon_slot(slot as u64))
let height = (|| self.execution_header_at_beacon_slot(slot.try_into().unwrap()))
.retry(
&ConstantBuilder::default()
.with_delay(Duration::from_millis(500))
.with_max_times(60),
)
.await?
.block_number as i64;
.block_number
.try_into()
.unwrap();
Ok((slot, height))
}
}
3 changes: 2 additions & 1 deletion hubble/src/indexer/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ impl BlockRange {
let range: Range<BlockHeight> = self.clone().into();

range.step_by(chunk_size).map(move |start_inclusive| {
let end_exclusive = (start_inclusive + chunk_size as u64).min(self.end_exclusive);
let chunk_size: u64 = chunk_size.try_into().unwrap();
let end_exclusive = (start_inclusive + chunk_size).min(self.end_exclusive);
(start_inclusive..end_exclusive).into()
})
}
Expand Down
42 changes: 26 additions & 16 deletions hubble/src/indexer/aptos/block_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ use crate::indexer::{

impl BlockReferenceProvider for Block {
fn block_reference(&self) -> Result<BlockReference, Report> {
let block_timestamp: i128 = self.block_timestamp.0.into();
Ok(BlockReference {
height: self.block_height.into(),
hash: self.block_hash.to_string(),
timestamp: OffsetDateTime::from_unix_timestamp_nanos(
(self.block_timestamp.0 as i128) * 1000,
)
.map_err(Report::from)?,
timestamp: OffsetDateTime::from_unix_timestamp_nanos(block_timestamp * 1000)
.map_err(Report::from)?,
})
}
}
Expand Down Expand Up @@ -114,22 +113,33 @@ impl BlockHandle for AptosBlockHandle {
if active_contracts.contains(&account_address.to_standard_string()) {
Some(PgTransaction {
internal_chain_id: self.internal_chain_id,
height: self.reference.height as i64,
version: transaction.info.version.0 as i64,
height: self.reference.height.try_into().unwrap(),
version: transaction.info.version.0.try_into().unwrap(),
transaction_hash: transaction.info.hash.to_string(),
transaction_index: transaction_index as i64,
transaction_index: transaction_index.try_into().unwrap(),
events: transaction
.events
.into_iter()
.enumerate()
.map(|(transaction_event_index, event)| PgEvent {
internal_chain_id: self.internal_chain_id,
height: self.reference.height as i64,
version: transaction.info.version.0 as i64,
index: event_index_iter.next().unwrap() as i64,
transaction_event_index: transaction_event_index as i64,
sequence_number: event.sequence_number.0 as i64,
creation_number: event.guid.creation_number.0 as i64,
height: self.reference.height.try_into().unwrap(),
version: transaction.info.version.0.try_into().unwrap(),
index: event_index_iter.next().unwrap(),
transaction_event_index: transaction_event_index
.try_into()
.unwrap(),
sequence_number: event
.sequence_number
.0
.try_into()
.unwrap(),
creation_number: event
.guid
.creation_number
.0
.try_into()
.unwrap(),
account_address: event
.guid
.account_address
Expand Down Expand Up @@ -165,11 +175,11 @@ impl BlockHandle for AptosBlockHandle {
tx,
PgBlock {
internal_chain_id: self.internal_chain_id,
height: self.reference.height as i64,
height: self.reference.height.try_into().unwrap(),
block_hash: self.reference.hash.clone(),
timestamp: self.reference.timestamp,
first_version: block.first_version.0 as i64, // TODO: check if .0 is ok
last_version: block.last_version.0 as i64,
first_version: block.first_version.0.try_into().unwrap(),
last_version: block.last_version.0.try_into().unwrap(),
transactions,
},
)
Expand Down
20 changes: 13 additions & 7 deletions hubble/src/indexer/aptos/fetcher_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,23 @@ impl AptosFetcherClient {

let complete_start_inclusive: BlockHeight = block.first_version.into();
let complete_end_inclusive: BlockHeight = block.last_version.into();
let tx_search_max_page_size: u64 = self.tx_search_max_page_size.into();

let mut result =
Vec::with_capacity((complete_end_inclusive + 1 - complete_start_inclusive) as usize);
let mut result = Vec::with_capacity(
(complete_end_inclusive + 1 - complete_start_inclusive)
.try_into()
.unwrap(),
);

for chunk_start_inclusive in (complete_start_inclusive..=complete_end_inclusive)
.step_by(self.tx_search_max_page_size as usize)
.step_by(self.tx_search_max_page_size.into())
{
let chunk_end_exclusive = (chunk_start_inclusive + self.tx_search_max_page_size as u64)
.min(complete_end_inclusive + 1); // +1, because end is inclusive
let chunk_end_exclusive =
(chunk_start_inclusive + tx_search_max_page_size).min(complete_end_inclusive + 1); // +1, because end is inclusive

let chunk_limit = (chunk_end_exclusive - chunk_start_inclusive) as u16;
let chunk_limit: u16 = (chunk_end_exclusive - chunk_start_inclusive)
.try_into()
.unwrap();

trace!(
"fetching chunk for block {} - versions: [{},{}]",
Expand Down Expand Up @@ -202,7 +208,7 @@ impl AptosFetcherClient {
chunk_end_exclusive - 1
);

let mut result = Vec::with_capacity(self.tx_search_max_page_size as usize);
let mut result = Vec::with_capacity(self.tx_search_max_page_size.into());

for transaction_index in chunk_start_inclusive..chunk_end_exclusive {
trace!(
Expand Down
18 changes: 10 additions & 8 deletions hubble/src/indexer/aptos/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,13 @@ pub async fn delete_aptos_block_transactions_events(
internal_chain_id: i32,
height: BlockHeight,
) -> sqlx::Result<()> {
let height: i64 = height.try_into().unwrap();
sqlx::query!(
"
DELETE FROM v1_aptos.events WHERE internal_chain_id = $1 AND height = $2
",
internal_chain_id,
height as i64
height,
)
.execute(tx.as_mut())
.await?;
Expand All @@ -151,7 +152,7 @@ pub async fn delete_aptos_block_transactions_events(
DELETE FROM v1_aptos.transactions WHERE internal_chain_id = $1 AND height = $2
",
internal_chain_id,
height as i64
height,
)
.execute(tx.as_mut())
.await?;
Expand All @@ -161,13 +162,12 @@ pub async fn delete_aptos_block_transactions_events(
DELETE FROM v1_aptos.blocks WHERE internal_chain_id = $1 AND height = $2
",
internal_chain_id,
height as i64,
height,
)
.execute(tx.as_mut())
.await?;

schedule_replication_reset(tx, internal_chain_id, height as i64, "block reorg (delete)")
.await?;
schedule_replication_reset(tx, internal_chain_id, height, "block reorg (delete)").await?;

Ok(())
}
Expand All @@ -177,6 +177,8 @@ pub async fn active_contracts(
internal_chain_id: i32,
height: BlockHeight,
) -> sqlx::Result<HashSet<String>> {
let height: i64 = height.try_into().unwrap();

let result = sqlx::query!(
r#"
SELECT address
Expand All @@ -185,7 +187,7 @@ pub async fn active_contracts(
AND $2 between start_height and end_height
"#,
internal_chain_id,
height as i64,
height,
)
.fetch_all(tx.as_mut())
.await?
Expand Down Expand Up @@ -219,8 +221,8 @@ pub async fn unmapped_clients(
.await?
.into_iter()
.map(|record| UnmappedClient {
version: record.transaction_version.expect("client-created-event to have transaction version") as u64,
height: record.height.expect("client-created-event to have a height") as u64,
version: record.transaction_version.expect("client-created-event to have transaction version").try_into().unwrap(),
height: record.height.expect("client-created-event to have a height").try_into().unwrap(),
client_id: record.client_id,
})
.collect_vec();
Expand Down
14 changes: 8 additions & 6 deletions hubble/src/indexer/eth/fetcher_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ impl BlockReferenceProvider for Block {
Ok(BlockReference {
height: self.header.number,
hash: self.header.hash.to_lower_hex(),
timestamp: OffsetDateTime::from_unix_timestamp(self.header.timestamp as i64)
.map_err(|err| IndexerError::ProviderError(err.into()))?,
timestamp: OffsetDateTime::from_unix_timestamp(
self.header.timestamp.try_into().unwrap(),
)
.map_err(|err| IndexerError::ProviderError(err.into()))?,
})
}
}
Expand Down Expand Up @@ -207,7 +209,7 @@ impl EthFetcherClient {
.into_iter()
.map(|((transaction_hash, transaction_index), logs)| {
let transaction_hash = transaction_hash.to_lower_hex();
let transaction_index = transaction_index as i32;
let transaction_index: i32 = transaction_index.try_into().unwrap();

let events: Vec<EventInsert> = logs
.into_iter()
Expand All @@ -216,8 +218,8 @@ impl EthFetcherClient {
let data = serde_json::to_value(&log).unwrap();
EventInsert {
data,
log_index: log.log_index.unwrap() as usize,
transaction_log_index: transaction_log_index as i32,
log_index: log.log_index.expect("log_index").try_into().unwrap(),
transaction_log_index: transaction_log_index.try_into().unwrap(),
}
})
.collect();
Expand Down Expand Up @@ -248,7 +250,7 @@ impl EthFetcherClient {
chain_id: self.chain_id,
hash: block_reference.hash,
header: block.clone(),
height: block_reference.height as i32,
height: block_reference.height.try_into().unwrap(),
time: block_reference.timestamp,
transactions,
}))
Expand Down
Loading

0 comments on commit 2b371ad

Please sign in to comment.