From e7d23699d1a8a84581428856e078f5c85ea8079f Mon Sep 17 00:00:00 2001 From: pwbh <127856937+pwbh@users.noreply.github.com> Date: Thu, 14 Sep 2023 22:06:13 +0300 Subject: [PATCH] Throw away producer + consumer --- consumer/Cargo.toml | 8 ---- consumer/src/main.rs | 3 -- producer/Cargo.toml | 14 ------ producer/src/lib.rs | 109 ------------------------------------------- producer/src/main.rs | 46 ------------------ 5 files changed, 180 deletions(-) delete mode 100644 consumer/Cargo.toml delete mode 100644 consumer/src/main.rs delete mode 100644 producer/Cargo.toml delete mode 100644 producer/src/lib.rs delete mode 100644 producer/src/main.rs diff --git a/consumer/Cargo.toml b/consumer/Cargo.toml deleted file mode 100644 index d482a53..0000000 --- a/consumer/Cargo.toml +++ /dev/null @@ -1,8 +0,0 @@ -[package] -name = "consumer" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] diff --git a/consumer/src/main.rs b/consumer/src/main.rs deleted file mode 100644 index e7a11a9..0000000 --- a/consumer/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/producer/Cargo.toml b/producer/Cargo.toml deleted file mode 100644 index f226d40..0000000 --- a/producer/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "producer" -edition.workspace = true -version.workspace = true -description = "Producer is the entity which pushes messages onto the brokers" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -shared_structures = { path = "../shared_structures" } - -clap.workspace = true -serde.workspace = true -serde_json.workspace = true diff --git a/producer/src/lib.rs b/producer/src/lib.rs deleted file mode 100644 index e0a01e9..0000000 --- a/producer/src/lib.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::{ - io::{BufRead, BufReader}, - net::TcpStream, -}; - -use shared_structures::{metadata::BrokerDetails, Broadcast, Reader}; - -pub struct Producer { - pub mode: String, - pub broker_details: BrokerDetails, - pub stream: TcpStream, - pub topic: String, - pub destination_replica_id: String, -} - -impl Producer { - pub fn from(brokers: &str, mode: &str, topic: &str) -> Result { - let brokers: Vec<_> = brokers - .split_terminator(',') - .map(|b| b.to_string()) - .collect(); - - if brokers.is_empty() { - return Err("No brokers were provided".to_string()); - } - - // Get metadata from the first broker we are connecting to (Doesn't really matter from which one) - // We are just looking for the broker that holds the leader to the topic we want to push to - let mut stream = TcpStream::connect(&brokers[0]).map_err(|e| e.to_string())?; - - // Request cluster metadata from the first random broker we are conected to in the provided list - Broadcast::to( - &mut stream, - &shared_structures::Message::RequestClusterMetadata, - )?; - - let message = Reader::read_one_message(&mut stream)?; - - match message { - shared_structures::Message::ClusterMetadata { - metadata: cluster_metadata, - } => { - let broker_details = cluster_metadata - .brokers - .iter() - .find(|b| b.partitions.iter().any(|p| p.topic.name == topic)) - .ok_or("Broker with desired partition has not been found.")?; - - let partition_details = broker_details - .partitions - .iter() - .find(|p| p.topic.name == topic) - .ok_or("Couldn't find the desited partition on selected broker")?; - - // If the random broker we connected to happen to be the correct one, - // no need to reconnect already connected. - - let peer_addr = stream.peer_addr().map_err(|e| format!("Producer: {}", e))?; - - let stream = if peer_addr.to_string() == broker_details.addr { - stream - } else { - TcpStream::connect(&broker_details.addr).map_err(|e| e.to_string())? - }; - - println!("Stream: {:#?}", stream); - - let producer = Self { - mode: mode.to_string(), - broker_details: broker_details.clone(), - stream, - topic: topic.to_string(), - destination_replica_id: partition_details.replica_id.clone(), - }; - - producer.open_broker_reader()?; - - Ok(producer) - } - _ => Err("Wrong message received on handshake".to_string()), - } - } - - fn open_broker_reader(&self) -> Result<(), String> { - let reader_stream = self - .stream - .try_clone() - .map_err(|e| format!("Producer: {}", e))?; - - std::thread::spawn(|| { - let mut buf = String::with_capacity(1024); - let mut reader = BufReader::new(reader_stream); - - loop { - let bytes_read = reader.read_line(&mut buf).unwrap(); - - if bytes_read == 0 { - break; - } - - println!("Recieved message from broker: {:#?}", buf); - - buf.clear(); - } - }); - - Ok(()) - } -} diff --git a/producer/src/main.rs b/producer/src/main.rs deleted file mode 100644 index f9767a0..0000000 --- a/producer/src/main.rs +++ /dev/null @@ -1,46 +0,0 @@ -use std::io::stdin; - -use clap::{arg, command}; -use producer::Producer; -use serde_json::json; -use shared_structures::Broadcast; - -fn main() -> Result<(), String> { - let matches = command!() - .arg(arg!(-b --brokers "List of brokers to connect to seperated by comma e.g. localhost:3000,localhost:4000,...").required(true)) - .arg(arg!(-t --topic "The name of the topic onto which producer is going to push messages").required(true)) - .arg(arg!(-m --mode "In which mode you want to run the producer 'test' or 'production', defaults to 'production'").required(false).default_value("production")) - .get_matches(); - - let brokers = matches.get_one::("brokers").unwrap(); - let mode = matches.get_one::("mode").unwrap(); - let topic = matches.get_one::("topic").unwrap(); - - let mut producer = Producer::from(brokers, mode, topic)?; - - println!("Broker details: {:#?}", producer.broker_details); - - println!("Broadcasting a test message to the partition"); - - Broadcast::to( - &mut producer.stream, - &shared_structures::Message::ProducerMessage { - replica_id: producer.destination_replica_id, - payload: json!({"message": "test"}), - }, - )?; - - let mut buf = String::with_capacity(1024); - - loop { - stdin().read_line(&mut buf).unwrap(); - - if buf == "EXIT" { - break; - } - - buf.clear() - } - - Ok(()) -}