Skip to content

Commit

Permalink
Add listening of the newest block
Browse files Browse the repository at this point in the history
  • Loading branch information
WuBruno committed Oct 26, 2023
1 parent ebf9ed9 commit a92040c
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 2 deletions.
4 changes: 4 additions & 0 deletions products/pdt/pdtbq/src/bq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ impl ZilliqaBQProject {
pub fn get_max_query_rows(&self) -> usize {
MAX_QUERY_ROWS
}

pub async fn get_latest_block(&self) -> Result<Option<i64>> {
return self.microblocks.get_max_meta_block(&self.bq_client).await;
}
}

#[async_trait]
Expand Down
20 changes: 20 additions & 0 deletions products/pdt/pdtbq/src/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,26 @@ impl Meta {
}
}
}

pub async fn find_max_block(&self, client: &Client) -> Result<Option<i64>> {
let query = format!(
"SELECT start_blk,nr_blks FROM `{}` ORDER BY start_blk DESC LIMIT 1",
self.table.get_table_desc()
);
let mut result = client
.job()
.query(&self.table.dataset.project_id, QueryRequest::new(&query))
.await?;
if result.next_row() {
let block = result
.get_i64(0)?
.ok_or(anyhow!("No start_blk in record"))?;
let number = result.get_i64(1)?.ok_or(anyhow!("No nr_blks in record"))?;
Ok(Some(block + number))
} else {
Ok(None)
}
}
}

#[async_trait]
Expand Down
4 changes: 4 additions & 0 deletions products/pdt/pdtbq/src/tracked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl TrackedTable {
meta,
})
}

pub async fn get_max_meta_block(&self, client: &Client) -> Result<Option<i64>> {
return self.meta.find_max_block(client).await;
}
}

#[async_trait]
Expand Down
2 changes: 1 addition & 1 deletion products/pdt/pdtlisten/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ pub async fn listen_bq(bq_project_id: &str, bq_dataset_id: &str, api_url: &str)

let provider = Provider::<Http>::try_from(api_url)?;

let stream = listen_blocks(&provider);
let stream = listen_blocks(&provider, zilliqa_bq_proj.get_latest_block().await?);
pin!(stream);

while let Some(blocks) = TokioStreamExt::next(&mut stream).await {
Expand Down
4 changes: 3 additions & 1 deletion products/pdt/pdtlisten/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn get_block(
let last_seen_block_number_unwrap = if let Some(_block_number) = last_seen_block_number {
_block_number.clone()
} else {
// if does not know last_seen_block, assumes it was just the one before
block_number - 1
};

Expand All @@ -72,10 +73,11 @@ async fn get_block(
/// Polls in an interval for new blocks, tracking using `last_seen_block_number`
pub fn listen_blocks(
provider: &Provider<Http>,
from_block: Option<i64>,
) -> impl Stream<Item = Result<Vec<Block<Transaction>>>> + '_ {
try_stream! {
let mut interval = interval(Duration::from_secs(15));
let mut last_seen_block_number: Option<U64> = None;
let mut last_seen_block_number: Option<U64> = from_block.map(U64::from);
loop {
interval.tick().await;
let blocks = get_block(provider, &mut last_seen_block_number).await?;
Expand Down

0 comments on commit a92040c

Please sign in to comment.