Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

US-317: PDT - Update listen to import all fields #261

Merged
merged 6 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions products/pdt/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions products/pdt/pdtdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ sqlx = { version = "0.7.1", features = [
"tls-native-tls",
] }
ethers = "2.0.10"
serde_with = "3.4.0"
1 change: 1 addition & 0 deletions products/pdt/pdtdb/src/values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ mod transaction;

pub type BQTransaction = transaction::BQTransaction;
pub type PSQLTransaction = transaction::PSQLTransaction;
pub type ZILTransactionBody = transaction::ZILTransactionBody;
pub type BQMicroblock = microblock::BQMicroblock;
pub type PSQLMicroblock = microblock::PSQLMicroblock;
49 changes: 49 additions & 0 deletions products/pdt/pdtdb/src/values/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use pdtlib::proto::ProtoTransactionWithReceipt;
use primitive_types::{H160, H256};
use psql_derive::PSQLInsertable;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use sqlx::FromRow;
#[derive(Serialize, Deserialize, Clone, FromRow, PSQLInsertable, Debug, PartialEq)]
pub struct BQTransaction {
Expand Down Expand Up @@ -274,6 +275,31 @@ impl BQTransaction {
eth_transaction_type,
})
}

pub fn from_eth_with_zil_txn_bodies(
WuBruno marked this conversation as resolved.
Show resolved Hide resolved
in_val: &Transaction,
zil_txn_body: &ZILTransactionBody,
zqversion: i64,
) -> Result<Self> {
let txn_body =
Self::from_eth(in_val, zqversion).expect("should be compatible with eth transactions");

let from_addr_zil = utils::maybe_hex_address_from_public_key(
zil_txn_body.sender_pub_key.as_bytes(),
utils::API::Zilliqa,
);
let raw_receipt = encode_u8(zil_txn_body.receipt.as_bytes());
Ok(BQTransaction {
code: zil_txn_body.code.clone(),
receipt: Some(zil_txn_body.receipt.clone()),
raw_receipt: Some(raw_receipt),
sender_public_key: Some(zil_txn_body.sender_pub_key.clone()),
from_addr_zil,
signature: Some(zil_txn_body.signature.clone()),
..txn_body
})
}

pub fn to_json(&self) -> Result<String> {
Ok(serde_json::to_string(self)?)
}
Expand Down Expand Up @@ -448,6 +474,29 @@ fn decode_u8(x: String) -> Vec<u8> {
.expect("base64-encoding should be decodeable")
}

#[serde_as]
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ZILTransactionBody {
#[serde(rename = "ID")]
pub id: String,
#[serde_as(as = "DisplayFromStr")]
pub amount: String,
pub code: Option<String>, // sometimes has
pub data: Option<String>,
#[serde_as(as = "DisplayFromStr")]
pub gas_limit: i64,
pub gas_price: String,
#[serde_as(as = "DisplayFromStr")]
pub nonce: i64,
pub receipt: String,
pub sender_pub_key: String,
pub signature: String,
pub to_addr: String,
#[serde_as(as = "DisplayFromStr")]
pub version: i64,
}

#[test]
fn check_involution() {
let bq_txn = BQTransaction {
Expand Down
26 changes: 20 additions & 6 deletions products/pdt/pdtlisten/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ mod importer;
mod listener;
mod types;
use anyhow::{anyhow, bail, Context, Result};
use ethers::{prelude::*, providers::StreamExt};
use ethers::{prelude::*, providers::StreamExt, utils::hex};
use jsonrpsee::{core::client::ClientT, http_client::HttpClient, rpc_params};
use pdtbq::{bq::ZilliqaBQProject, bq_utils::BigQueryDatasetLocation};
use pdtdb::{
utils::ProcessCoordinates,
values::{BQMicroblock, BQTransaction, PSQLMicroblock, PSQLTransaction},
values::{BQMicroblock, BQTransaction, PSQLMicroblock, PSQLTransaction, ZILTransactionBody},
zqproj::{Inserter, ZilliqaDBProject},
};
use pdtpsql::psql::ZilliqaPSQLProject;
use serde::Serialize;
use serde_json::{from_value, to_value, Value};
use sqlx::postgres::PgPoolOptions;
use std::{marker::PhantomData, time::Duration};
use std::{collections::HashMap, marker::PhantomData, time::Duration};
use tokio::pin;
use tokio::task::JoinSet;
use tokio_stream::StreamExt as TokioStreamExt;
Expand All @@ -33,15 +33,29 @@ async fn get_block_info(number: U64, client: &HttpClient) -> Result<types::GetTx

fn convert_block_and_txns(
block: &Block<Transaction>,
zil_txn_bodies: &Vec<ZILTransactionBody>,
) -> Result<(BQMicroblock, Vec<BQTransaction>)> {
let my_block = block.clone();
let bq_block = BQMicroblock::from_eth(&my_block)?;
let version = bq_block.header_version;
let zil_transactions: HashMap<&str, ZILTransactionBody> = zil_txn_bodies
.into_iter()
.map(|x| (x.id.as_str(), x.clone()))
.collect();

WuBruno marked this conversation as resolved.
Show resolved Hide resolved
let mut txn_errs = vec![];
let bq_transactions: Vec<BQTransaction> = my_block
.transactions
.into_iter()
.map(|txn| BQTransaction::from_eth(&txn, version))
.map(|txn| {
if let Some(zil_txn_body) =
zil_transactions.get(hex::encode(&txn.hash.as_bytes()).as_str())
{
BQTransaction::from_eth_with_zil_txn_bodies(&txn, zil_txn_body, version)
} else {
Err(anyhow!("some transaction zil body not found"))
}
})
.filter_map(|r| r.map_err(|e| txn_errs.push(e)).ok())
.collect();
if !txn_errs.is_empty() {
Expand Down Expand Up @@ -264,9 +278,9 @@ pub async fn listen_bq(
bq_importer.reset_buffer(&zilliqa_bq_proj).await?;
}

for block in blocks {
for (block, zil_txn_bodies) in blocks {
// convert our blocks and insert it into our buffer
convert_block_and_txns(&block)
convert_block_and_txns(&block, &zil_txn_bodies)
.and_then(|(bq_block, bq_txns)| {
bq_importer.insert_into_buffer(bq_block, bq_txns)
})
Expand Down
Loading
Loading