From 927293325ce4f69c02c160c407b7595569718bbf Mon Sep 17 00:00:00 2001 From: pwbh <127856937+pwbh@users.noreply.github.com> Date: Wed, 30 Aug 2023 00:55:19 +0300 Subject: [PATCH] Fixed a bug where read_one_message would break the flow of reading message past it --- broker/src/lib.rs | 22 ++++++------ observer/src/distribution_manager/mod.rs | 38 +++++++++++++++------ observer/src/main.rs | 43 ++++++++++-------------- shared_structures/src/broadcast/mod.rs | 28 +++++++++++++++ shared_structures/src/reader.rs | 38 ++++++++++++++------- 5 files changed, 108 insertions(+), 61 deletions(-) diff --git a/broker/src/lib.rs b/broker/src/lib.rs index cb0565c..a9a822d 100644 --- a/broker/src/lib.rs +++ b/broker/src/lib.rs @@ -93,19 +93,17 @@ impl Broker { } fn handshake(&mut self) -> Result<(), String> { - Broadcast::to( + Broadcast::to_many( &mut self.stream, - &Message::EntityWantsToConnect { - entity_type: EntityType::Broker, - }, - )?; - - Broadcast::to( - &mut self.stream, - &Message::BrokerConnectionDetails { - id: self.local_metadata.id.clone(), - addr: self.addr.clone(), - }, + &[ + Message::EntityWantsToConnect { + entity_type: EntityType::Broker, + }, + Message::BrokerConnectionDetails { + id: self.local_metadata.id.clone(), + addr: self.addr.clone(), + }, + ], ) } diff --git a/observer/src/distribution_manager/mod.rs b/observer/src/distribution_manager/mod.rs index 111decc..9e1d154 100644 --- a/observer/src/distribution_manager/mod.rs +++ b/observer/src/distribution_manager/mod.rs @@ -22,7 +22,7 @@ use crate::{config::Config, CLUSTER_FILE}; pub struct DistributionManager { pub brokers: Arc>>, pub topics: Vec>>, - pub dir_manager: DirManager, + pub cluster_dir: DirManager, pub followers: Vec, config: Config, pending_replication_partitions: Vec<(usize, Partition)>, @@ -30,11 +30,18 @@ pub struct DistributionManager { impl DistributionManager { pub fn from(config: Config, name: Option<&String>) -> Result>, String> { - let custom_dir: Option = name.map(|f| format!("/observer/{}", f).into()); + let custom_dir = if let Some(name) = name { + let custom_path = format!("/observer/{}/", name); + Some(PathBuf::from(custom_path)) + } else { + Some(PathBuf::from("/observer")) + }; + + println!("Custom dir: {:?}", custom_dir); - let dir_manager = DirManager::with_dir(custom_dir.as_ref()); + let cluster_dir = DirManager::with_dir(custom_dir.as_ref()); - let cluster_metadata = match dir_manager.open::(CLUSTER_FILE) { + let cluster_metadata = match cluster_dir.open::(CLUSTER_FILE) { Ok(m) => m, Err(_) => Metadata::default(), }; @@ -44,7 +51,7 @@ impl DistributionManager { topics: vec![], config, pending_replication_partitions: vec![], - dir_manager, + cluster_dir, followers: vec![], }; @@ -53,7 +60,7 @@ impl DistributionManager { Ok(Arc::new(Mutex::new(distribution_manager))) } - fn load_cluster_state(&mut self, cluster_metadata: &Metadata) -> Result<(), String> { + pub fn load_cluster_state(&mut self, cluster_metadata: &Metadata) -> Result<(), String> { self.topics = cluster_metadata .topics .iter() @@ -109,7 +116,7 @@ impl DistributionManager { pub fn save_cluster_state(&self) -> Result<(), String> { let metadata = self.get_cluster_metadata()?; - self.dir_manager.save(CLUSTER_FILE, &metadata) + self.cluster_dir.save(CLUSTER_FILE, &metadata) } // Will return the broker id that has been added or restored to the Observer @@ -117,9 +124,12 @@ impl DistributionManager { // meaning that this broker has disconnected in one of many possible ways, including user interference, unexpected system crush // or any other reason. Observer should try and sync with the brokers via the brokers provided id. pub fn connect_broker(&mut self, stream: TcpStream) -> Result { + println!("NEW BROKER: {:?}", stream); // Handshake process between the Broker and Observer happening in get_broker_metadata let (id, addr, stream) = self.get_broker_metadata(stream)?; + println!("BROKER METADATA: {} {} {:?}", id, addr, stream); let mut brokers_lock = self.brokers.lock().unwrap(); + println!("AQUIRED BROKER LOCK"); let broker_id = if let Some(disconnected_broker) = brokers_lock.iter_mut().find(|b| b.id == id) { disconnected_broker.restore(stream, addr)?; @@ -128,12 +138,12 @@ impl DistributionManager { } else { let mut broker = Broker::from(id, Some(stream), addr)?; self.spawn_broker_reader(&broker)?; - let broker_id = broker.id.clone(); // Need to replicate the pending partitions if there is any replicate_pending_partitions_once( &mut self.pending_replication_partitions, &mut broker, )?; + let broker_id = broker.id.clone(); brokers_lock.push(broker); broker_id }; @@ -199,8 +209,12 @@ impl DistributionManager { let mut followers_streams: Vec<_> = self.followers.iter_mut().collect(); + println!("Followers: {:?}", followers_streams); + let message = shared_structures::Message::ClusterMetadata { metadata }; + println!("BROADCASTING!!!"); + Broadcast::all(&mut followers_streams[..], &message)?; Broadcast::all(&mut broker_streams[..], &message) } @@ -295,8 +309,12 @@ impl DistributionManager { let mut reader = BufReader::new(&stream); + println!("WAITING FOR LINE FROM BROKER"); + let bytes_read = reader.read_line(&mut buf).map_err(|e| e.to_string())?; + println!("LINE RECEIEVED"); + if bytes_read == 0 { return Err("Client exited unexpectadly during handhsake.".to_string()); } @@ -393,9 +411,9 @@ pub fn broadcast_replicate_partition( broker: &mut Broker, replica: &mut Partition, ) -> Result<(), String> { - if let Some(broker_tream) = &mut broker.stream { + if let Some(broker_stream) = &mut broker.stream { Broadcast::to( - broker_tream, + broker_stream, &Message::CreatePartition { id: replica.id.clone(), replica_id: replica.replica_id.clone(), diff --git a/observer/src/main.rs b/observer/src/main.rs index 2412c3d..eb4fd8b 100644 --- a/observer/src/main.rs +++ b/observer/src/main.rs @@ -1,8 +1,5 @@ use clap::{arg, command}; -use observer::{ - distribution_manager::{Broker, DistributionManager}, - Observer, DEV_CONFIG, PROD_CONFIG, -}; +use observer::{distribution_manager::DistributionManager, Observer, DEV_CONFIG, PROD_CONFIG}; use shared_structures::{println_c, Broadcast, EntityType, Message, MessageDecoder, Reader, Role}; use std::{ io::{BufRead, BufReader}, @@ -51,7 +48,7 @@ fn main() -> Result<(), String> { println_c("Started following leader", 50) } - let mut streams_distribution_manager = observer.distribution_manager.clone(); + let mut connections_distribution_manager = observer.distribution_manager.clone(); // Connections listener std::thread::spawn(move || loop { @@ -59,16 +56,15 @@ fn main() -> Result<(), String> { if let Some(stream) = connection { match stream { - Ok(stream) => { + Ok(mut stream) => { println!("stream: {:#?}", stream); - if let Ok(message) = Reader::read_message(&stream) { - println!("{:#?}", message); + if let Ok(message) = Reader::read_one_message(&mut stream) { match message { Message::EntityWantsToConnect { entity_type: EntityType::Observer, } => { match handle_connect_observer_follower( - &mut streams_distribution_manager, + &mut connections_distribution_manager, stream, ) { Ok(observer_follower_id) => { @@ -85,7 +81,7 @@ fn main() -> Result<(), String> { Message::EntityWantsToConnect { entity_type: EntityType::Broker, } => match handle_connect_broker( - &mut streams_distribution_manager, + &mut connections_distribution_manager, stream, ) { Ok(broker_id) => println!("Broker {} connected", broker_id), @@ -108,6 +104,8 @@ fn main() -> Result<(), String> { let mut followers_distribution_manager = observer.distribution_manager.clone(); + println!("{:?}", leader); + // Leader obsrver exists, enabling the follower functionality, instead of // the leader functionality which is able to create partitions, create topics etc if let Some(leader) = leader.cloned() { @@ -131,7 +129,9 @@ fn main() -> Result<(), String> { loop { // TODO: constantly read delegated messages from leader - let bytes_read = reader.read_line(&mut buf).unwrap(); + let bytes_read = reader + .read_line(&mut buf) + .map_err(|e| format!("Leader follower error: {}", e))?; if bytes_read == 0 { println!("Leader has closed connection. Exiting."); @@ -189,23 +189,12 @@ fn handle_delegated_message( ) -> Result<(), String> { let delegated_message = MessageDecoder::decode(raw_message)?; + println!("Delegated messaeg: {:?}", delegated_message); + match delegated_message { Message::ClusterMetadata { metadata } => { let mut distribution_manager_lock = distribution_manager.lock().unwrap(); - - distribution_manager_lock.topics = metadata - .topics - .iter() - .map(|t| Arc::new(Mutex::new(t.clone()))) - .collect(); - - let mut brokers_lock = distribution_manager_lock.brokers.lock().unwrap(); - - for broker in metadata.brokers { - let broker = Broker::from(broker.id, None, broker.addr)?; - brokers_lock.push(broker); - } - + distribution_manager_lock.load_cluster_state(&metadata)?; distribution_manager_lock.save_cluster_state() } _ => Err("Could not read delegated cluster metadata".to_string()), @@ -216,7 +205,9 @@ fn handle_list_command( distribution_manager: &mut Arc>, command: &observer::command_processor::Command, ) -> Result<(), String> { + println!("TRYING TO AQUIRE LOCK!"); let distribution_manager_lock = distribution_manager.lock().unwrap(); + println!("LOCK AQUIRED!!!"); let level = command.arguments.first().unwrap(); if level == "ALL" { @@ -266,7 +257,7 @@ fn handle_connect_observer_follower( let mut distribution_manager_lock = distribution_manager.lock().unwrap(); let stream_addr = stream.peer_addr().map_err(|e| e.to_string())?; distribution_manager_lock.followers.push(stream); - Ok(stream_addr.to_string()) + Ok(stream_addr.ip().to_string()) } fn handle_connect_broker( diff --git a/shared_structures/src/broadcast/mod.rs b/shared_structures/src/broadcast/mod.rs index 6bcda68..5b92bfa 100644 --- a/shared_structures/src/broadcast/mod.rs +++ b/shared_structures/src/broadcast/mod.rs @@ -43,6 +43,34 @@ impl Broadcast { Ok(()) } + + pub fn to_many(stream: &mut TcpStream, messages: &[Message]) -> Result<(), String> { + let mut payloads: Vec = vec![]; + + for message in messages { + let mut payload = serde_json::to_string(message) + .map_err(|_| "Couldn't serialize the data structure to send.".to_string())?; + payload.push('\n'); + payloads.push(payload); + } + + println!("{:#?}", payloads); + + let payload_bytes: Vec<_> = payloads + .iter() + .flat_map(|p| p.as_bytes().to_owned()) + .collect(); + + let bytes = &payload_bytes[..]; + + let bytes_written = stream.write(bytes).map_err(|e| e.to_string())?; + + if bytes_written == 0 { + return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string()); + } + + Ok(()) + } } #[cfg(test)] diff --git a/shared_structures/src/reader.rs b/shared_structures/src/reader.rs index a050183..8b72bd1 100644 --- a/shared_structures/src/reader.rs +++ b/shared_structures/src/reader.rs @@ -1,21 +1,33 @@ -use std::io::{BufRead, BufReader}; +use std::{io::Read, net::TcpStream}; + +use crate::Message; pub struct Reader; impl Reader { - pub fn read_message( - inner: R, - ) -> Result { - let mut reader = BufReader::new(inner); - let mut buf = String::with_capacity(1024); - let message = reader - .read_line(&mut buf) - .map_err(|e| format!("Reader error: {}", e))?; - - if message == 0 { - return Err("Could not read message from given value".to_string()); + pub fn read_one_message(stream: &mut TcpStream) -> Result { + let message = Self::read_until_char(stream, '\n')?; + + serde_json::from_str::(&message) + .map_err(|e| format!("Error while deserialziing: {}", e)) + } + + fn read_until_char(stream: &mut TcpStream, target_char: char) -> Result { + let mut buffer = [0u8; 1]; // Read one byte at a time + let mut result = String::new(); + + loop { + stream.read_exact(&mut buffer).map_err(|e| e.to_string())?; // Read one byte into the buffer + + let byte_read = buffer[0]; + let character = byte_read as char; + result.push(character); + + if character == target_char { + break; + } } - serde_json::from_str::(&buf).map_err(|e| format!("Error while deserialziing: {}", e)) + Ok(result) } }