Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
benchmarking
Browse files Browse the repository at this point in the history
  • Loading branch information
pwbh committed Aug 31, 2023
1 parent aaee418 commit 8aaa5c4
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 17 deletions.
4 changes: 2 additions & 2 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ impl Broker {
replica_id,
payload,
} => {
println!("Received a message for partition replica {}!!!", replica_id);
println!("Message: {:#?}", payload);
// println!("Received a message for partition replica {}!!!", replica_id);
// println!("Message: {:#?}", payload);

if let Some(partition) = self
.local_metadata
Expand Down
7 changes: 3 additions & 4 deletions broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,9 @@ fn main() -> Result<(), Box<dyn Error>> {
println!("Initial data on the broker:");

for partition in broker_lock.local_metadata.partitions.iter() {
println!(
"Partition {}: {:#?}",
partition.details.replica_id, partition.database
);
if let Some(db) = &partition.database {
println!("Partition {}: {}", partition.details.replica_id, db);
}
}

drop(broker_lock);
Expand Down
13 changes: 12 additions & 1 deletion broker/src/partition/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{fmt::Debug, path::PathBuf};
use std::{
fmt::{Debug, Display},
path::PathBuf,
};

use heed::{
types::{OwnedType, SerdeJson},
Expand Down Expand Up @@ -28,8 +31,10 @@ impl DB {
.create(&db_file_name)
.map_err(|e| format!("PartitionDB: {}", e))?;
let env = EnvOpenOptions::new()
.map_size(60000 * 1024 * 1024)
.open(db_file_path)
.map_err(|e| format!("PartitionDB: {}", e))?;

let db: Database<OwnedType<u128>, SerdeJson<String>> = env
.create_database(None)
.map_err(|e| format!("PartitionDB: {}", e))?;
Expand All @@ -42,6 +47,12 @@ impl DB {
}
}

impl Display for DB {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Total records in partition: {}", self.length)
}
}

impl Debug for DB {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let txn = self.env.read_txn().unwrap();
Expand Down
78 changes: 69 additions & 9 deletions producer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::io::stdin;
use std::{io::stdin, time::Instant};

use clap::{arg, command};
use producer::Producer;
Expand All @@ -20,15 +20,75 @@ fn main() -> Result<(), String> {

println!("Broker details: {:#?}", producer.broker_details);

println!("Broadcasting a test message to the partition");
println!("Broadcasting messages...");

Broadcast::to(
&mut producer.stream,
&shared_structures::Message::ProducerMessage {
replica_id: producer.destination_replica_id,
payload: json!({"message": "test"}),
},
)?;
let total_itteration = 50_000;

let mut message_count = 0;

let start_time = Instant::now();

// ---------------------- Just for a test -------------------------
loop {
if message_count == total_itteration {
break;
}

Broadcast::to(
&mut producer.stream,
&shared_structures::Message::ProducerMessage {
replica_id: producer.destination_replica_id.clone(),
payload: json!({
"_meta": {
"template_version": 0
},
"fixtures": [
{
"name": "cus_jenny_rosen",
"path": "/v1/customers",
"method": "post",
"params": {
"name": "Jenny Rosen",
"email": "[email protected]",
"source": "tok_visa",
"address": {
"line1": "1 Main Street",
"city": "New York"
}
}
},
{
"name": "ch_jenny_charge",
"path": "/v1/charges",
"method": "post",
"params": {
"customer": "${cus_jenny_rosen:id}",
"amount": 100,
"currency": "usd",
"capture": false
}
},
{
"name": "capt_bender",
"path": "/v1/charges/${ch_jenny_charge:id}/capture",
"method": "post"
}
]
}),
},
)?;
message_count += 1;
}
// ----------------------------------------------------------------

let end_time = Instant::now();
let elapsed_time = end_time - start_time;

let executions_per_second = (total_itteration as f64) / (elapsed_time.as_secs_f64());

println!("Total message: {:?}", total_itteration);
println!("Total time: {:?}", elapsed_time);
println!("Executions per second: {:.2}", executions_per_second);

let mut buf = String::with_capacity(1024);

Expand Down
2 changes: 1 addition & 1 deletion shared_structures/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Broadcast {
.write(payload.as_bytes())
.map_err(|e| e.to_string())?;

println!("Message broadcasted with {} bytes", bytes_written);
// println!("Message broadcasted with {} bytes", bytes_written);

if bytes_written == 0 {
return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string());
Expand Down

0 comments on commit 8aaa5c4

Please sign in to comment.