From 8aaa5c4f902b80045f949acb12bdfb1bf44d4ac6 Mon Sep 17 00:00:00 2001 From: pwbh <127856937+pwbh@users.noreply.github.com> Date: Thu, 31 Aug 2023 21:06:11 +0300 Subject: [PATCH] benchmarking --- broker/src/lib.rs | 4 +- broker/src/main.rs | 7 +-- broker/src/partition/db.rs | 13 ++++- producer/src/main.rs | 78 +++++++++++++++++++++++--- shared_structures/src/broadcast/mod.rs | 2 +- 5 files changed, 87 insertions(+), 17 deletions(-) diff --git a/broker/src/lib.rs b/broker/src/lib.rs index a9a822d..28de25e 100644 --- a/broker/src/lib.rs +++ b/broker/src/lib.rs @@ -161,8 +161,8 @@ impl Broker { replica_id, payload, } => { - println!("Received a message for partition replica {}!!!", replica_id); - println!("Message: {:#?}", payload); + // println!("Received a message for partition replica {}!!!", replica_id); + // println!("Message: {:#?}", payload); if let Some(partition) = self .local_metadata diff --git a/broker/src/main.rs b/broker/src/main.rs index 1a33091..0b1b474 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -106,10 +106,9 @@ fn main() -> Result<(), Box> { println!("Initial data on the broker:"); for partition in broker_lock.local_metadata.partitions.iter() { - println!( - "Partition {}: {:#?}", - partition.details.replica_id, partition.database - ); + if let Some(db) = &partition.database { + println!("Partition {}: {}", partition.details.replica_id, db); + } } drop(broker_lock); diff --git a/broker/src/partition/db.rs b/broker/src/partition/db.rs index b940b9a..20ffbf2 100644 --- a/broker/src/partition/db.rs +++ b/broker/src/partition/db.rs @@ -1,4 +1,7 @@ -use std::{fmt::Debug, path::PathBuf}; +use std::{ + fmt::{Debug, Display}, + path::PathBuf, +}; use heed::{ types::{OwnedType, SerdeJson}, @@ -28,8 +31,10 @@ impl DB { .create(&db_file_name) .map_err(|e| format!("PartitionDB: {}", e))?; let env = EnvOpenOptions::new() + .map_size(60000 * 1024 * 1024) .open(db_file_path) .map_err(|e| format!("PartitionDB: {}", e))?; + let db: Database, SerdeJson> = env .create_database(None) .map_err(|e| format!("PartitionDB: {}", e))?; @@ -42,6 +47,12 @@ impl DB { } } +impl Display for DB { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Total records in partition: {}", self.length) + } +} + impl Debug for DB { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let txn = self.env.read_txn().unwrap(); diff --git a/producer/src/main.rs b/producer/src/main.rs index f9767a0..7526de7 100644 --- a/producer/src/main.rs +++ b/producer/src/main.rs @@ -1,4 +1,4 @@ -use std::io::stdin; +use std::{io::stdin, time::Instant}; use clap::{arg, command}; use producer::Producer; @@ -20,15 +20,75 @@ fn main() -> Result<(), String> { println!("Broker details: {:#?}", producer.broker_details); - println!("Broadcasting a test message to the partition"); + println!("Broadcasting messages..."); - Broadcast::to( - &mut producer.stream, - &shared_structures::Message::ProducerMessage { - replica_id: producer.destination_replica_id, - payload: json!({"message": "test"}), - }, - )?; + let total_itteration = 50_000; + + let mut message_count = 0; + + let start_time = Instant::now(); + + // ---------------------- Just for a test ------------------------- + loop { + if message_count == total_itteration { + break; + } + + Broadcast::to( + &mut producer.stream, + &shared_structures::Message::ProducerMessage { + replica_id: producer.destination_replica_id.clone(), + payload: json!({ + "_meta": { + "template_version": 0 + }, + "fixtures": [ + { + "name": "cus_jenny_rosen", + "path": "/v1/customers", + "method": "post", + "params": { + "name": "Jenny Rosen", + "email": "jenny@rosen.com", + "source": "tok_visa", + "address": { + "line1": "1 Main Street", + "city": "New York" + } + } + }, + { + "name": "ch_jenny_charge", + "path": "/v1/charges", + "method": "post", + "params": { + "customer": "${cus_jenny_rosen:id}", + "amount": 100, + "currency": "usd", + "capture": false + } + }, + { + "name": "capt_bender", + "path": "/v1/charges/${ch_jenny_charge:id}/capture", + "method": "post" + } + ] + }), + }, + )?; + message_count += 1; + } + // ---------------------------------------------------------------- + + let end_time = Instant::now(); + let elapsed_time = end_time - start_time; + + let executions_per_second = (total_itteration as f64) / (elapsed_time.as_secs_f64()); + + println!("Total message: {:?}", total_itteration); + println!("Total time: {:?}", elapsed_time); + println!("Executions per second: {:.2}", executions_per_second); let mut buf = String::with_capacity(1024); diff --git a/shared_structures/src/broadcast/mod.rs b/shared_structures/src/broadcast/mod.rs index 5b92bfa..1ea8d8a 100644 --- a/shared_structures/src/broadcast/mod.rs +++ b/shared_structures/src/broadcast/mod.rs @@ -35,7 +35,7 @@ impl Broadcast { .write(payload.as_bytes()) .map_err(|e| e.to_string())?; - println!("Message broadcasted with {} bytes", bytes_written); + // println!("Message broadcasted with {} bytes", bytes_written); if bytes_written == 0 { return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string());