ZIL-5446: PDT add listen (#249)
* ZIL-5446: Fix manual listen

* update loop

* adding comments

* Add listening of the newest block

* add comments
WuBruno authored Oct 26, 2023
1 parent 94daef2 commit 3ce0b52
Showing 9 changed files with 153 additions and 29 deletions.
2 changes: 2 additions & 0 deletions products/pdt/Cargo.lock

Some generated files are not rendered by default.

1 change: 0 additions & 1 deletion products/pdt/pdt/src/bqimport.rs
@@ -20,7 +20,6 @@ pub async fn bq_multi_import(
start_blk: Option<i64>,
project_id: &str,
dataset_id: &str,
duplicate_persistence: bool,
) -> Result<()> {
// OK. Just go ..
let mut jobs = JoinSet::new();
1 change: 0 additions & 1 deletion products/pdt/pdt/src/main.rs
@@ -226,7 +226,6 @@ async fn bigquery_import_multi(unpack_dir: &str, opts: &MultiOptions) -> Result<
opts.start_block,
&opts.project_id,
&opts.dataset_id,
!opts.no_dup,
)
.await
}
4 changes: 4 additions & 0 deletions products/pdt/pdtbq/src/bq.rs
@@ -260,6 +260,10 @@ impl ZilliqaBQProject {
pub fn get_max_query_rows(&self) -> usize {
MAX_QUERY_ROWS
}

pub async fn get_latest_block(&self) -> Result<Option<i64>> {
return self.microblocks.get_max_meta_block(&self.bq_client).await;
}
}

#[async_trait]
20 changes: 20 additions & 0 deletions products/pdt/pdtbq/src/meta.rs
@@ -86,6 +86,26 @@ impl Meta {
}
}
}

pub async fn find_max_block(&self, client: &Client) -> Result<Option<i64>> {
let query = format!(
"SELECT start_blk,nr_blks FROM `{}` ORDER BY start_blk DESC LIMIT 1",
self.table.get_table_desc()
);
let mut result = client
.job()
.query(&self.table.dataset.project_id, QueryRequest::new(&query))
.await?;
if result.next_row() {
let block = result
.get_i64(0)?
.ok_or(anyhow!("No start_blk in record"))?;
let number = result.get_i64(1)?.ok_or(anyhow!("No nr_blks in record"))?;
Ok(Some(block + number))
} else {
Ok(None)
}
}
}

#[async_trait]
4 changes: 4 additions & 0 deletions products/pdt/pdtbq/src/tracked.rs
@@ -43,6 +43,10 @@ impl TrackedTable {
meta,
})
}

pub async fn get_max_meta_block(&self, client: &Client) -> Result<Option<i64>> {
return self.meta.find_max_block(client).await;
}
}

#[async_trait]
2 changes: 2 additions & 0 deletions products/pdt/pdtlisten/Cargo.toml
@@ -23,3 +23,5 @@ tokio = { version = "1.28.1", features = [
"fs",
"process",
] }
async-stream = "0.3.5"
tokio-stream = "0.1.14"
61 changes: 34 additions & 27 deletions products/pdt/pdtlisten/src/lib.rs
@@ -1,7 +1,8 @@
mod importer;
mod listener;
mod types;
use anyhow::{anyhow, bail, Context, Result};
use ethers::prelude::*;
use ethers::{prelude::*, providers::StreamExt};
use jsonrpsee::{core::client::ClientT, http_client::HttpClient, rpc_params};
use pdtbq::{bq::ZilliqaBQProject, bq_utils::BigQueryDatasetLocation};
use pdtdb::{
@@ -14,7 +15,11 @@ use serde::Serialize;
use serde_json::{from_value, to_value, Value};
use sqlx::postgres::PgPoolOptions;
use std::{marker::PhantomData, time::Duration};
use tokio::pin;
use tokio::task::JoinSet;
use tokio_stream::StreamExt as TokioStreamExt;

use crate::listener::listen_blocks;

const MAX_TASKS: usize = 50;

@@ -171,13 +176,12 @@ pub async fn listen_psql(postgres_url: &str, api_url: &str) -> Result<()> {

let provider = Provider::<Http>::try_from(api_url)?;

let mut stream = provider
.watch_blocks()
.await?
.map(|hash: H256| get_block_by_hash(hash, &provider))
.buffered(MAX_TASKS);
let mut stream = StreamExt::map(provider.watch_blocks().await?, |hash: H256| {
get_block_by_hash(hash, &provider)
})
.buffered(MAX_TASKS);

while let Some(block) = stream.next().await {
while let Some(block) = StreamExt::next(&mut stream).await {
match block {
Ok(block) => {
// let postgres go off and do its thing
@@ -212,6 +216,10 @@ pub async fn listen_psql(postgres_url: &str, api_url: &str) -> Result<()> {
Ok(())
}

/// Implements a listening system that queries the meta table for the latest recorded block.
/// This lets listening continue from wherever the last listen or import left off.
/// The listener also tracks blocks it has already encountered, discarding any it has seen before.
/// If there is a gap between a newly received block and the last one seen, it fetches the missing blocks to close it.
pub async fn listen_bq(bq_project_id: &str, bq_dataset_id: &str, api_url: &str) -> Result<()> {
// let mut jobs = JoinSet::new();
let coords = ProcessCoordinates {
@@ -241,29 +249,28 @@ pub async fn listen_bq(bq_project_id: &str, bq_dataset_id: &str, api_url: &str)

let provider = Provider::<Http>::try_from(api_url)?;

let mut stream = provider
.watch_blocks()
.await?
.map(|hash: H256| get_block_by_hash(hash, &provider))
.buffered(MAX_TASKS);
let stream = listen_blocks(&provider, zilliqa_bq_proj.get_latest_block().await?);
pin!(stream);

while let Some(block) = stream.next().await {
match block {
Ok(block) => {
// OK, time to deal with bigquery
if bq_importer.buffers.is_none() {
bq_importer.reset_buffer(&zilliqa_bq_proj).await?;
}
// convert our blocks and insert it into our buffer
match {
let (bq_block, bq_txns) = convert_block_and_txns(&block)?;
bq_importer.insert_into_buffer(bq_block, bq_txns)
} {
Ok(_) => (),
Err(err) => {
eprintln!("conversion to bq failed due to {:?}", err);
while let Some(blocks) = TokioStreamExt::next(&mut stream).await {
match blocks {
Ok(blocks) => {
for block in blocks {
if bq_importer.buffers.is_none() {
bq_importer.reset_buffer(&zilliqa_bq_proj).await?;
}
// convert our blocks and insert it into our buffer
match {
let (bq_block, bq_txns) = convert_block_and_txns(&block)?;
bq_importer.insert_into_buffer(bq_block, bq_txns)
} {
Ok(_) => (),
Err(err) => {
eprintln!("conversion to bq failed due to {:?}", err);
}
}
}

// if we've got enough blocks in hand
if bq_importer.n_blocks() >= MAX_TASKS {
let my_bq_proj = zilliqa_bq_proj.clone();
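The doc comment on `listen_bq` above describes resuming from the meta table and patching any gap between the last seen block and the newest one. A rough, hypothetical sketch of that gap-patching step is below (the real logic lives in `get_block` in `listener.rs`, added next); the function name and values are illustrative only.

```rust
// Hypothetical sketch of the gap-patching step: given the last block number
// we have seen and the newest block number the node reports, list every
// block number that still needs to be fetched.
fn blocks_to_fetch(last_seen: Option<u64>, newest: u64) -> Vec<u64> {
    // With no prior state, assume we had just seen the block before the
    // newest one, so only the newest block gets fetched.
    let last_seen = last_seen.unwrap_or_else(|| newest.saturating_sub(1));
    if newest <= last_seen {
        return Vec::new(); // nothing new since the last poll
    }
    (last_seen + 1..=newest).collect()
}

fn main() {
    // Last saw block 100, the node is now at 103: fetch 101, 102 and 103.
    assert_eq!(blocks_to_fetch(Some(100), 103), vec![101, 102, 103]);
    // No gap: nothing to fetch.
    assert!(blocks_to_fetch(Some(103), 103).is_empty());
}
```
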
87 changes: 87 additions & 0 deletions products/pdt/pdtlisten/src/listener.rs
@@ -0,0 +1,87 @@
use ethers::{
providers::{Http, Middleware, Provider},
types::{Block, Transaction, U64},
};

use anyhow::Result;
use async_stream::try_stream;
use serde_json::{to_value, Value};
use tokio::time::{interval, Duration};
use tokio_stream::Stream;

async fn get_block_by_number(x: U64, provider: &Provider<Http>) -> Result<Block<Transaction>> {
println!("found block with number {:?}, getting block info", x);
fn serialize(v: impl serde::Serialize) -> Value {
to_value(v).unwrap()
}
let mut raw_block: Value = provider
.request("eth_getBlockByNumber", [serialize(x), serialize(true)])
.await?;
// ZIL-5328 means our nonce is only one byte instead of 8, which ethers
// is not happy about.
raw_block["nonce"] = serde_json::to_value("0x0000000000000000")?;

let mut block: Block<Transaction> = serde_json::from_value(raw_block)?;
while block.number.is_none() {
println!("{:?} is pending, looping", x);

// loop until the block is no longer pending
// the sleep duration is set arbitrarily.
tokio::time::sleep(Duration::from_millis(1000)).await;
raw_block = provider
.request("eth_getBlockByNumber", [serialize(x), serialize(true)])
.await?;
raw_block["nonce"] = serde_json::to_value("0x0000000000000000")?;

block = serde_json::from_value(raw_block)?;
}
println!("found block number {:?}", block.number);
// println!("{:#?}", block);
Ok(block)
}

/// Fetches the most recent block number, compares it against `last_seen_block_number`, and retrieves all blocks in between
async fn get_block(
provider: &Provider<Http>,
last_seen_block_number: &mut Option<U64>,
) -> Result<Vec<Block<Transaction>>> {
let block_number = provider.get_block_number().await?;

let last_seen_block_number_unwrap = if let Some(_block_number) = last_seen_block_number {
_block_number.clone()
} else {
// if last_seen_block is unknown, assume it was the block immediately before the current one
block_number - 1
};

if block_number <= last_seen_block_number_unwrap {
// Already seen this block
return Ok(Vec::new());
}

let mut blocks: Vec<Block<Transaction>> = Vec::new();
for _block_number in last_seen_block_number_unwrap.as_u64() + 1..=block_number.as_u64() {
let block = get_block_by_number(_block_number.into(), provider).await?;
blocks.push(block);
}

*last_seen_block_number = Some(block_number);

Ok(blocks)
}

/// Polls at a fixed interval for new blocks, tracking progress with `last_seen_block_number`
pub fn listen_blocks(
provider: &Provider<Http>,
from_block: Option<i64>,
) -> impl Stream<Item = Result<Vec<Block<Transaction>>>> + '_ {
try_stream! {
let mut interval = interval(Duration::from_secs(15));
let mut last_seen_block_number: Option<U64> = from_block.map(U64::from);
loop {
interval.tick().await;
let blocks = get_block(provider, &mut last_seen_block_number).await?;
yield blocks
}
}
}
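
A minimal sketch of how this stream might be driven, assuming the code sits inside the `pdtlisten` crate (so `listen_blocks` is in scope) and that tokio's `macros` and runtime features are enabled; the endpoint URL and starting block number are placeholders.

```rust
use anyhow::Result;
use ethers::providers::{Http, Provider};
use tokio::pin;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() -> Result<()> {
    // Placeholder endpoint; point this at a real JSON-RPC node.
    let provider = Provider::<Http>::try_from("http://localhost:4201")?;

    // Resume from block 1050 (e.g. a value previously returned by
    // get_latest_block); pass None to start just behind the current tip.
    let stream = listen_blocks(&provider, Some(1050));
    pin!(stream);

    // Each item is a batch of consecutive blocks found since the last poll.
    while let Some(batch) = stream.next().await {
        for block in batch? {
            println!("block {:?} with {} txns", block.number, block.transactions.len());
        }
    }
    Ok(())
}
```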
