Skip to content

Commit

Permalink
ZIL-5451: PDT - add block buffer size config (#251)
Browse files Browse the repository at this point in the history
* small refactor

* add block buffer size
  • Loading branch information
WuBruno authored Oct 30, 2023
1 parent 3ce0b52 commit 6a652db
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 16 deletions.
1 change: 1 addition & 0 deletions products/pdt/cd/base/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ data:
UNPACK_DIR: "/data/download"
NR_THREADS: "1"
BATCH_BLOCKS: "1000"
BUFFER_SIZE: "5"
9 changes: 7 additions & 2 deletions products/pdt/cd/base/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,14 @@ spec:
configMapKeyRef:
name: pdt
key: NETWORK_TYPE_TESTNET
- name: BUFFER_SIZE
valueFrom:
configMapKeyRef:
name: pdt
key: BUFFER_SIZE
command: ["/bin/bash", "-c"]
args:
- ./pdt --network-type $NETWORK_TYPE bqlisten --project-id $PROJECT_ID --dataset-id $DATASET_ID
- ./pdt --network-type $NETWORK_TYPE bqlisten --project-id $PROJECT_ID --dataset-id $DATASET_ID --buffer-size $BUFFER_SIZE
---
apiVersion: apps/v1
kind: Deployment
Expand Down Expand Up @@ -82,4 +87,4 @@ spec:
key: NETWORK_TYPE_MAINNET
command: ["/bin/bash", "-c"]
args:
- ./pdt --network-type $NETWORK_TYPE bqlisten --project-id $PROJECT_ID --dataset-id $DATASET_ID
- ./pdt --network-type $NETWORK_TYPE bqlisten --project-id $PROJECT_ID --dataset-id $DATASET_ID --buffer-size $BUFFER_SIZE
1 change: 1 addition & 0 deletions products/pdt/cd/overlays/staging/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ data:
UNPACK_DIR: "/data/download"
NR_THREADS: "1"
BATCH_BLOCKS: "10000"
BUFFER_SIZE: "5"
7 changes: 6 additions & 1 deletion products/pdt/pdt/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ struct ListenOptions {

#[arg(long, default_value = "prj-c-data-analytics-3xs14wez")]
project_id: String,

#[arg(long, default_value = "5")]
buffer_size: usize,
}

const TESTNET_BUCKET: &str = "301978b4-0c0a-4b6b-ad7b-3a2f63c5182c";
Expand Down Expand Up @@ -281,12 +284,13 @@ async fn bigquery_listen_outer(
bq_project_id: &str,
bq_dataset_id: &str,
network_type: &NetworkType,
block_buffer_size: usize,
) -> Result<()> {
let api_url = match network_type {
NetworkType::Testnet => DEV_API_URL,
NetworkType::Mainnet => MAINNET_API_URL,
};
listen_bq(bq_project_id, bq_dataset_id, api_url).await
listen_bq(bq_project_id, bq_dataset_id, api_url, block_buffer_size).await
}

#[tokio::main]
Expand Down Expand Up @@ -325,6 +329,7 @@ async fn main() -> Result<()> {
&opts.dataset_id,
&cli.network_type
.expect("no network type -- did forget to set --network-type?"),
opts.buffer_size,
)
.await
}
Expand Down
30 changes: 17 additions & 13 deletions products/pdt/pdtlisten/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,12 @@ pub async fn listen_psql(postgres_url: &str, api_url: &str) -> Result<()> {
/// This allows continuity from last listen or import was carried out
/// The listen also keeps track blocks that it has encountered before, discarding any seen blocks
/// If encounters a gap of block received with last seen, tries to patch it
pub async fn listen_bq(bq_project_id: &str, bq_dataset_id: &str, api_url: &str) -> Result<()> {
pub async fn listen_bq(
bq_project_id: &str,
bq_dataset_id: &str,
api_url: &str,
block_buffer_size: usize,
) -> Result<()> {
// let mut jobs = JoinSet::new();
let coords = ProcessCoordinates {
nr_machines: 1,
Expand Down Expand Up @@ -255,24 +260,23 @@ pub async fn listen_bq(bq_project_id: &str, bq_dataset_id: &str, api_url: &str)
while let Some(blocks) = TokioStreamExt::next(&mut stream).await {
match blocks {
Ok(blocks) => {
if bq_importer.buffers.is_none() {
bq_importer.reset_buffer(&zilliqa_bq_proj).await?;
}

for block in blocks {
if bq_importer.buffers.is_none() {
bq_importer.reset_buffer(&zilliqa_bq_proj).await?;
}
// convert our blocks and insert it into our buffer
match {
let (bq_block, bq_txns) = convert_block_and_txns(&block)?;
bq_importer.insert_into_buffer(bq_block, bq_txns)
} {
Ok(_) => (),
Err(err) => {
convert_block_and_txns(&block)
.and_then(|(bq_block, bq_txns)| {
bq_importer.insert_into_buffer(bq_block, bq_txns)
})
.unwrap_or_else(|err| {
eprintln!("conversion to bq failed due to {:?}", err);
}
}
})
}

// if we've got enough blocks in hand
if bq_importer.n_blocks() >= MAX_TASKS {
if bq_importer.n_blocks() >= block_buffer_size {
let my_bq_proj = zilliqa_bq_proj.clone();
let buffers = bq_importer.take_buffers()?;
let range = bq_importer
Expand Down

0 comments on commit 6a652db

Please sign in to comment.