diff --git a/broker/src/lib.rs b/broker/src/lib.rs index 3cec836..a9a822d 100644 --- a/broker/src/lib.rs +++ b/broker/src/lib.rs @@ -5,7 +5,7 @@ use std::{ }; use partition::PartitionDetails; -use shared_structures::{Broadcast, DirManager, Message, Metadata, Status, Topic}; +use shared_structures::{Broadcast, DirManager, EntityType, Message, Metadata, Status, Topic}; use uuid::Uuid; mod partition; @@ -38,9 +38,9 @@ impl Broker { addr: String, name: Option<&String>, ) -> Result>, String> { - let custom_dir: Option = name.map(|f| f.into()); + let custom_dir: Option = name.map(|f| format!("/broker/{}", f).into()); - let cluster_metadata = Metadata { brokers: vec![] }; + let cluster_metadata: Metadata = Metadata::default(); let connected_producers = Arc::new(Mutex::new(vec![])); @@ -93,12 +93,17 @@ impl Broker { } fn handshake(&mut self) -> Result<(), String> { - Broadcast::to( + Broadcast::to_many( &mut self.stream, - &Message::BrokerWantsToConnect { - id: self.local_metadata.id.clone(), - addr: self.addr.clone(), - }, + &[ + Message::EntityWantsToConnect { + entity_type: EntityType::Broker, + }, + Message::BrokerConnectionDetails { + id: self.local_metadata.id.clone(), + addr: self.addr.clone(), + }, + ], ) } @@ -108,12 +113,12 @@ impl Broker { remote: Option<&mut TcpStream>, ) -> Result<(), String> { let message = serde_json::from_str::(raw_data).map_err(|e| e.to_string())?; - self.handle_by_message(&message, remote) + self.handle_message(&message, remote) } // Messages from Producers and Observers are all processed here // maybe better to split it into two functions for clarity. - fn handle_by_message( + fn handle_message( &mut self, message: &Message, remote: Option<&mut TcpStream>, @@ -171,7 +176,7 @@ impl Broker { } } _ => Err(format!( - "Message {:?} is not handled in `handle_by_message`.", + "Message {:?} is not handled in `handle_message`.", message )), } diff --git a/broker/src/main.rs b/broker/src/main.rs index a09a2cc..1a33091 100644 --- a/broker/src/main.rs +++ b/broker/src/main.rs @@ -33,7 +33,7 @@ fn main() -> Result<(), Box> { let tcp_stream: Option; let mut sleep_interval = 1000; - // Should start trying to connect to the observer in intervals until success + // Should trying to connect to the observer in intervals until success loop { match TcpStream::connect(addr) { Ok(stream) => { diff --git a/observer/src/distribution_manager/broker.rs b/observer/src/distribution_manager/broker.rs index 9bd76e6..6eb0ed0 100644 --- a/observer/src/distribution_manager/broker.rs +++ b/observer/src/distribution_manager/broker.rs @@ -7,26 +7,37 @@ use super::partition::Partition; #[derive(Debug)] pub struct Broker { pub id: String, - pub stream: TcpStream, + pub stream: Option, pub partitions: Vec, - pub reader: BufReader, + pub reader: Option>, pub status: Status, pub addr: String, } impl Broker { - pub fn from(id: String, stream: TcpStream, addr: String) -> Result { - let read_stream = stream.try_clone().map_err(|e| e.to_string())?; - let reader = BufReader::new(read_stream); + pub fn from(id: String, stream: Option, addr: String) -> Result { + if let Some(stream) = stream { + let read_stream = stream.try_clone().map_err(|e| e.to_string())?; + let reader = BufReader::new(read_stream); - Ok(Self { - id, - partitions: vec![], - stream, - reader, - status: Status::Up, - addr, - }) + Ok(Self { + id, + partitions: vec![], + stream: Some(stream), + reader: Some(reader), + status: Status::Up, + addr, + }) + } else { + Ok(Self { + id, + partitions: vec![], + stream: None, + reader: None, + status: 
Status::Up,
+                addr,
+            })
+        }
     }
 
     pub fn restore(&mut self, stream: TcpStream, addr: String) -> Result<(), String> {
@@ -34,8 +45,8 @@ impl Broker {
         let reader = BufReader::new(read_stream);
 
         self.status = Status::Up;
-        self.stream = stream;
-        self.reader = reader;
+        self.stream = Some(stream);
+        self.reader = Some(reader);
         self.addr = addr;
 
         for partition in self.partitions.iter_mut() {
diff --git a/observer/src/distribution_manager/mod.rs b/observer/src/distribution_manager/mod.rs
index 9b9b638..639b6cd 100644
--- a/observer/src/distribution_manager/mod.rs
+++ b/observer/src/distribution_manager/mod.rs
@@ -1,6 +1,7 @@
 use std::{
     io::{BufRead, BufReader},
     net::TcpStream,
+    path::PathBuf,
     sync::{Arc, Mutex, MutexGuard},
     time::Duration,
 };
@@ -12,27 +13,110 @@ pub use broker::Broker;
 pub use partition::Partition;
 use shared_structures::{
     metadata::{BrokerDetails, PartitionDetails},
-    Broadcast, Message, Metadata, Status, Topic,
+    Broadcast, DirManager, Message, Metadata, Reader, Status, Topic,
 };
 
-use crate::config::Config;
+use crate::{config::Config, CLUSTER_FILE};
 
 #[derive(Debug)]
 pub struct DistributionManager {
     pub brokers: Arc<Mutex<Vec<Broker>>>,
-    topics: Vec<Arc<Mutex<Topic>>>,
+    pub topics: Vec<Arc<Mutex<Topic>>>,
+    pub cluster_dir: DirManager,
+    pub followers: Vec<TcpStream>,
     config: Config,
     pending_replication_partitions: Vec<(usize, Partition)>,
 }
 
 impl DistributionManager {
-    pub fn new(config: Config) -> Arc<Mutex<Self>> {
-        Arc::new(Mutex::new(Self {
+    pub fn from(config: Config, name: Option<&String>) -> Result<Arc<Mutex<Self>>, String> {
+        let custom_dir = if let Some(name) = name {
+            let custom_path = format!("/observer/{}/", name);
+            Some(PathBuf::from(custom_path))
+        } else {
+            Some(PathBuf::from("/observer"))
+        };
+
+        println!("Custom dir: {:?}", custom_dir);
+
+        let cluster_dir = DirManager::with_dir(custom_dir.as_ref());
+
+        let cluster_metadata = match cluster_dir.open::<Metadata>(CLUSTER_FILE) {
+            Ok(m) => m,
+            Err(_) => Metadata::default(),
+        };
+
+        let mut distribution_manager = Self {
             brokers: Arc::new(Mutex::new(vec![])),
             topics: vec![],
             config,
             pending_replication_partitions: vec![],
-        }))
+            cluster_dir,
+            followers: vec![],
+        };
+
+        distribution_manager.load_cluster_state(&cluster_metadata)?;
+
+        Ok(Arc::new(Mutex::new(distribution_manager)))
+    }
+
+    pub fn load_cluster_state(&mut self, cluster_metadata: &Metadata) -> Result<(), String> {
+        self.topics = cluster_metadata
+            .topics
+            .iter()
+            .map(|t| Arc::new(Mutex::new(t.clone())))
+            .collect();
+
+        let mut brokers_lock = self.brokers.lock().unwrap();
+
+        for b in cluster_metadata.brokers.iter() {
+            let partitions: Vec<_> = b
+                .partitions
+                .iter()
+                .map(|p| {
+                    // Theoretically we should never have a case where we don't find the topic of the
+                    // partition in the system, which is why I allow myself to unwrap here and crash the
+                    // system if such a case occurs (it indicates a serious bug in the system).
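A minimal sketch, not part of this patch, of how the topic lookup described in the comment above could be made fallible so that corrupt on-disk metadata surfaces as an `Err` rather than a panic. The `Topic` type here is a stand-in that assumes only the `name: String` field used elsewhere in this diff.

use std::sync::{Arc, Mutex};

#[derive(Clone, Debug)]
struct Topic {
    name: String,
}

// Find the shared handle for a topic referenced by a persisted partition,
// returning an error instead of unwrapping when the topic is missing.
fn find_topic(
    topics: &[Arc<Mutex<Topic>>],
    topic_name: &str,
) -> Result<Arc<Mutex<Topic>>, String> {
    topics
        .iter()
        .find(|t| t.lock().unwrap().name == topic_name)
        .cloned()
        .ok_or_else(|| {
            format!(
                "Topic `{}` referenced by a partition is missing from cluster metadata.",
                topic_name
            )
        })
}

fn main() {
    let topics = vec![Arc::new(Mutex::new(Topic {
        name: "notifications".to_string(),
    }))];
    assert!(find_topic(&topics, "notifications").is_ok());
    assert!(find_topic(&topics, "missing").is_err());
}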
+ let topic = self + .topics + .iter() + .find(|t| { + let t_lock = t.lock().unwrap(); + + t_lock.name == p.topic.name + }) + .unwrap(); + + Partition { + id: p.id.clone(), + replica_id: p.replica_id.clone(), + partition_number: p.partition_number, + replica_count: p.replica_count, + role: p.role, + status: Status::Down, + topic: topic.clone(), + } + }) + .collect(); + + let offline_broker = Broker { + id: b.id.clone(), + partitions, + stream: None, + reader: None, + status: Status::Down, + addr: b.addr.clone(), + }; + + brokers_lock.push(offline_broker); + } + + Ok(()) + } + + pub fn save_cluster_state(&self) -> Result<(), String> { + let metadata = self.get_cluster_metadata()?; + self.cluster_dir.save(CLUSTER_FILE, &metadata) } // Will return the broker id that has been added or restored to the Observer @@ -40,36 +124,45 @@ impl DistributionManager { // meaning that this broker has disconnected in one of many possible ways, including user interference, unexpected system crush // or any other reason. Observer should try and sync with the brokers via the brokers provided id. pub fn connect_broker(&mut self, stream: TcpStream) -> Result { + println!("NEW BROKER: {:?}", stream); // Handshake process between the Broker and Observer happening in get_broker_metadata let (id, addr, stream) = self.get_broker_metadata(stream)?; + println!("BROKER METADATA: {} {} {:?}", id, addr, stream); let mut brokers_lock = self.brokers.lock().unwrap(); + println!("AQUIRED BROKER LOCK"); let broker_id = if let Some(disconnected_broker) = brokers_lock.iter_mut().find(|b| b.id == id) { disconnected_broker.restore(stream, addr)?; self.spawn_broker_reader(disconnected_broker)?; disconnected_broker.id.clone() } else { - let mut broker = Broker::from(id, stream, addr)?; + let mut broker = Broker::from(id, Some(stream), addr)?; self.spawn_broker_reader(&broker)?; - let broker_id = broker.id.clone(); // Need to replicate the pending partitions if there is any replicate_pending_partitions_once( &mut self.pending_replication_partitions, &mut broker, )?; + let broker_id = broker.id.clone(); brokers_lock.push(broker); broker_id }; - self.send_cluster_metadata(&mut brokers_lock)?; + // Releaseing lock for broadcast_cluster_metadata + drop(brokers_lock); + + self.broadcast_cluster_metadata()?; Ok(broker_id) } - fn send_cluster_metadata(&self, brokers: &mut [Broker]) -> Result<(), String> { + pub fn get_cluster_metadata(&self) -> Result { + let brokers = self.brokers.lock().unwrap(); + let metadata_brokers: Vec = brokers .iter() .map(|b| BrokerDetails { + id: b.id.clone(), addr: b.addr.clone(), status: b.status, partitions: b @@ -80,30 +173,62 @@ impl DistributionManager { replica_id: p.replica_id.to_string(), role: p.role, topic: p.topic.lock().unwrap().clone(), + partition_number: p.partition_number, + replica_count: p.replica_count, }) .collect(), }) .collect(); - let mut streams: Vec<_> = brokers.iter_mut().map(|b| &mut b.stream).collect(); + let topics: Vec<_> = self + .topics + .iter() + .map(|t| { + let t_lock = t.lock().unwrap(); + t_lock.clone() + }) + .collect(); - Broadcast::all( - &mut streams[..], - &shared_structures::Message::ClusterMetadata { - metadata: Metadata { - brokers: metadata_brokers, - }, - }, - )?; + Ok(Metadata { + brokers: metadata_brokers, + topics, + }) + } - Ok(()) + fn broadcast_cluster_metadata(&mut self) -> Result<(), String> { + let metadata = self.get_cluster_metadata()?; + + self.save_cluster_state()?; + + let mut brokers = self.brokers.lock().unwrap(); + + let mut broker_streams: Vec<_> 
= brokers + .iter_mut() + .filter_map(|b| b.stream.as_mut()) + .collect(); + + let mut followers_streams: Vec<_> = self.followers.iter_mut().collect(); + + println!("Followers: {:?}", followers_streams); + + let message = shared_structures::Message::ClusterMetadata { metadata }; + + println!("BROADCASTING!!!"); + + Broadcast::all(&mut followers_streams[..], &message)?; + Broadcast::all(&mut broker_streams[..], &message) } // Will return the name of created topic on success pub fn create_topic(&mut self, topic_name: &str) -> Result { let brokers_lock = self.brokers.lock().unwrap(); - if brokers_lock.len() == 0 { + let available_brokers = brokers_lock + .iter() + .filter(|b| b.status == Status::Up) + .count(); + + if available_brokers == 0 { return Err( "No brokers have been found, please make sure at least one broker is connected." .to_string(), @@ -163,7 +288,10 @@ impl DistributionManager { &partition, )?; - self.send_cluster_metadata(&mut brokers_lock)?; + // Releaseing lock for broadcast_cluster_metadata + drop(brokers_lock); + + self.broadcast_cluster_metadata()?; Ok(partition.id.clone()) @@ -175,28 +303,11 @@ impl DistributionManager { fn get_broker_metadata( &self, - stream: TcpStream, + mut stream: TcpStream, ) -> Result<(String, String, TcpStream), String> { - let mut buf = String::with_capacity(1024); - - let mut reader = BufReader::new(&stream); - - let bytes_read = reader.read_line(&mut buf).map_err(|e| e.to_string())?; - - if bytes_read == 0 { - return Err("Client exited unexpectadly during handhsake.".to_string()); - } - - let message = match serde_json::from_str::(&buf) { - Ok(m) => m, - Err(_) => { - return Err( - "Handshake with client failed, unrecognized payload received".to_string(), - ) - } - }; - - if let Message::BrokerWantsToConnect { id, addr } = message { + if let Message::BrokerConnectionDetails { id, addr } = + Reader::read_one_message(&mut stream)? + { Ok((id, addr, stream)) } else { Err("Handshake with client failed, wrong message received from client.".to_string()) @@ -209,64 +320,71 @@ impl DistributionManager { // TODO: What happens when a broker has lost connection? We need to find a new leader for all partition leaders. fn spawn_broker_reader(&self, broker: &Broker) -> Result<(), String> { - let watch_stream = broker.stream.try_clone().map_err(|e| e.to_string())?; - - let brokers = Arc::clone(&self.brokers); - let broker_id = broker.id.clone(); - - let throttle = self - .config - .get_number("throttle") - .ok_or("Throttle is missing form the configuration file.")?; - - std::thread::spawn(move || { - let mut reader = BufReader::new(watch_stream); - let mut buf = String::with_capacity(1024); - - loop { - let size = match reader.read_line(&mut buf) { - Ok(s) => s, - Err(e) => { - println!("Error in broker read thread: {}", e); - println!("Retrying to read with throttling at {}ms", throttle); - std::thread::sleep(Duration::from_millis(throttle as u64)); - continue; - } - }; - - // TODO: Think what should happen to the metadata of the broker that has been disconnected. 
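A self-contained illustration, not from the patch, of the convention the broker reader thread relies on: `BufRead::read_line` returns `Ok(0)` only when the peer has closed the connection, which is why `size == 0` below is treated as the broker-disconnect signal.

use std::io::{BufRead, BufReader};
use std::net::{TcpListener, TcpStream};

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    // "Broker" side: connect and immediately drop the stream, closing the connection.
    let client = std::thread::spawn(move || {
        let _stream = TcpStream::connect(addr).unwrap();
        // _stream dropped here -> connection closed
    });

    // "Observer" side: the read observes EOF and returns 0 bytes read.
    let (stream, _) = listener.accept()?;
    let mut reader = BufReader::new(stream);
    let mut buf = String::new();

    let bytes_read = reader.read_line(&mut buf)?;
    assert_eq!(bytes_read, 0, "0 bytes read means the peer disconnected");

    client.join().unwrap();
    Ok(())
}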
- if size == 0 { - println!("Broker {} has disconnected.", broker_id); - - let mut brokers_lock = brokers.lock().unwrap(); - - if let Some(broker) = brokers_lock.iter_mut().find(|b| b.id == broker_id) { - broker.disconnect(); - - let offline_partitions: Vec<_> = broker - .get_offline_partitions() - .iter() - .map(|p| (&p.id, &p.replica_id, p.replica_count)) - .collect(); - - for offline_partition in offline_partitions.iter() { - println!( - "Broker {}:\t{}\t{}\t{}", - broker.id, - offline_partition.0, - offline_partition.1, - offline_partition.2 - ); + if let Some(broker_stream) = &broker.stream { + let watch_stream = broker_stream.try_clone().map_err(|e| e.to_string())?; + + let brokers = Arc::clone(&self.brokers); + let broker_id = broker.id.clone(); + + let throttle = self + .config + .get_number("throttle") + .ok_or("Throttle is missing form the configuration file.")?; + + std::thread::spawn(move || { + let mut reader = BufReader::new(watch_stream); + let mut buf = String::with_capacity(1024); + + loop { + let size = match reader.read_line(&mut buf) { + Ok(s) => s, + Err(e) => { + println!("Error in broker read thread: {}", e); + println!("Retrying to read with throttling at {}ms", throttle); + std::thread::sleep(Duration::from_millis(throttle as u64)); + continue; } - } else { - println!("Failed to find the Broker in the system, this can lead to major data loses."); - println!("Please let us know about this message by creating an issue on our GitHub repository https://github.com/pwbh/nyx/issues/new"); + }; + + // TODO: Think what should happen to the metadata of the broker that has been disconnected. + if size == 0 { + println!("Broker {} has disconnected.", broker_id); + + let mut brokers_lock = brokers.lock().unwrap(); + + if let Some(broker) = brokers_lock.iter_mut().find(|b| b.id == broker_id) { + broker.disconnect(); + + let offline_partitions: Vec<_> = broker + .get_offline_partitions() + .iter() + .map(|p| (&p.id, &p.replica_id, p.replica_count)) + .collect(); + + for offline_partition in offline_partitions.iter() { + println!( + "Broker {}:\t{}\t{}\t{}", + broker.id, + offline_partition.0, + offline_partition.1, + offline_partition.2 + ); + } + } else { + println!("Failed to find the Broker in the system, this can lead to major data loses."); + println!("Please let us know about this message by creating an issue on our GitHub repository https://github.com/pwbh/nyx/issues/new"); + } + break; } - break; + + buf.clear(); } - } - }); - Ok(()) + }); + Ok(()) + } else { + println!("Ignoring spawning broker reader as Observer follower"); + Ok(()) + } } } @@ -274,16 +392,20 @@ pub fn broadcast_replicate_partition( broker: &mut Broker, replica: &mut Partition, ) -> Result<(), String> { - Broadcast::to( - &mut broker.stream, - &Message::CreatePartition { - id: replica.id.clone(), - replica_id: replica.replica_id.clone(), - topic: replica.topic.lock().unwrap().clone(), - partition_number: replica.partition_number, - replica_count: replica.replica_count, - }, - )?; + if let Some(broker_stream) = &mut broker.stream { + Broadcast::to( + broker_stream, + &Message::CreatePartition { + id: replica.id.clone(), + replica_id: replica.replica_id.clone(), + topic: replica.topic.lock().unwrap().clone(), + partition_number: replica.partition_number, + replica_count: replica.replica_count, + }, + )?; + } else { + println!("Ignoring broadcasting message as Observer follower") + } // After successful creation of the partition on the broker, // we can set its status on the observer to Active. 
replica.status = Status::Up; @@ -384,25 +506,62 @@ fn get_least_distributed_broker<'a>( #[cfg(test)] mod tests { - use std::{io::Write, net::TcpListener}; + use std::{fs, io::Write, net::TcpListener}; + + use uuid::Uuid; use super::*; + fn cleanup_after_test(custom_test_name: &str) { + let mut custom_path = PathBuf::new(); + custom_path.push("/observer/"); + custom_path.push(custom_test_name); + + let test_files_path = DirManager::get_base_dir(Some(&custom_path)).unwrap(); + match fs::remove_dir_all(&test_files_path) { + Ok(_) => { + println!("Deleted {:?}", test_files_path) + } + + Err(e) => println!("Could not delete {:?} | {}", test_files_path, e), + } + } + fn config_mock() -> Config { Config::from("../config/dev.properties".into()).unwrap() } + fn get_custom_test_name() -> String { + format!("test_{}", Uuid::new_v4().to_string()) + } + + fn bootstrap_distribution_manager( + config: Option, + custom_test_name: &str, + ) -> Arc> { + let config = if let Some(config) = config { + config + } else { + config_mock() + }; + + let distribution_manager: Arc> = + DistributionManager::from(config, Some(&custom_test_name.to_string())).unwrap(); + + distribution_manager + } + fn mock_connecting_broker(addr: &str) -> TcpStream { let mut mock_stream = TcpStream::connect(addr).unwrap(); - let mut payload = serde_json::to_string(&Message::BrokerWantsToConnect { + + let mut payload = serde_json::to_string(&Message::BrokerConnectionDetails { id: uuid::Uuid::new_v4().to_string(), addr: "localhost:123123".to_string(), }) .unwrap(); - payload.push('\n'); - mock_stream.write(payload.as_bytes()).unwrap(); + let read_stream = mock_stream.try_clone().unwrap(); std::thread::spawn(|| { @@ -417,15 +576,20 @@ mod tests { if size == 0 { break; } + + buf.clear(); } }); mock_stream } - fn setup_distribution_for_tests(config: Config, port: &str) -> Arc> { - let distribution_manager: Arc> = - DistributionManager::new(config); + fn setup_distribution_for_tests( + config: Config, + port: &str, + custom_test_name: &str, + ) -> Arc> { + let distribution_manager = bootstrap_distribution_manager(Some(config), &custom_test_name); let mut distribution_manager_lock = distribution_manager.lock().unwrap(); let addr = format!("localhost:{}", port); @@ -450,9 +614,8 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] fn create_brokers_works_as_expected() { - let config = config_mock(); - let distribution_manager: Arc> = - DistributionManager::new(config); + let custom_test_name = get_custom_test_name(); + let distribution_manager = bootstrap_distribution_manager(None, &custom_test_name); let mut distribution_manager_lock = distribution_manager.lock().unwrap(); let addr = "localhost:0"; @@ -473,15 +636,16 @@ mod tests { println!("{:?}", result); - assert!(result.is_ok()) + assert!(result.is_ok()); + + cleanup_after_test(&custom_test_name); } #[test] #[cfg_attr(miri, ignore)] fn create_topic_fails_when_no_brokers() { - let config = config_mock(); - let distribution_manager: Arc> = - DistributionManager::new(config); + let custom_test_name = get_custom_test_name(); + let distribution_manager = bootstrap_distribution_manager(None, &custom_test_name); let mut distribution_manager_lock = distribution_manager.lock().unwrap(); let topic_name = "new_user_registered"; @@ -492,22 +656,30 @@ mod tests { .unwrap_err(); assert!(result.contains("No brokers have been found")); + + cleanup_after_test(&custom_test_name); } #[test] #[cfg_attr(miri, ignore)] fn create_topic_works_as_expected_when_brokers_exist() { + let custom_test_name = 
get_custom_test_name(); + let config = config_mock(); // After brokers have connnected to the Observer - let distribution_manager = setup_distribution_for_tests(config, "5001"); + let distribution_manager = setup_distribution_for_tests(config, "5001", &custom_test_name); let mut distribution_manager_lock = distribution_manager.lock().unwrap(); let topic_name = "new_user_registered"; + let topics_count_before_add = distribution_manager_lock.topics.iter().count(); + distribution_manager_lock.create_topic(topic_name).unwrap(); - assert_eq!(distribution_manager_lock.topics.len(), 1); + let topic_count_after_add = distribution_manager_lock.topics.iter().count(); + + assert_eq!(topic_count_after_add, topics_count_before_add + 1); // We cant add the same topic name twice - Should error let result = distribution_manager_lock @@ -516,21 +688,28 @@ mod tests { assert!(result.contains("already exist.")); + let topics_count_before_add = distribution_manager_lock.topics.iter().count(); + + let another_topic_name = "notification_resent"; + distribution_manager_lock - .create_topic("notification_resent") + .create_topic(another_topic_name) .unwrap(); - assert_eq!(distribution_manager_lock.topics.len(), 2); - assert_eq!( - distribution_manager_lock - .topics - .last() - .unwrap() - .lock() - .unwrap() - .name, - "notification_resent" - ); + let topic_count_after_add = distribution_manager_lock.topics.iter().count(); + + assert_eq!(topic_count_after_add, topics_count_before_add + 1); + + let last_element_in_topics = distribution_manager_lock + .topics + .last() + .unwrap() + .lock() + .unwrap(); + + assert_eq!(last_element_in_topics.name, another_topic_name); + + cleanup_after_test(&custom_test_name); } fn get_brokers_with_replicas( @@ -546,11 +725,12 @@ mod tests { #[test] #[cfg_attr(miri, ignore)] fn create_partition_distributes_replicas() { + let custom_test_name = get_custom_test_name(); let config = config_mock(); let replica_factor = config.get_number("replica_factor").unwrap(); - let distribution_manager = setup_distribution_for_tests(config, "5002"); + let distribution_manager = setup_distribution_for_tests(config, "5002", &custom_test_name); let mut distribution_manager_lock = distribution_manager.lock().unwrap(); let notifications_topic = "notifications"; @@ -658,5 +838,7 @@ mod tests { drop(brokers_lock); println!("{:#?}", distribution_manager_lock.brokers); + + cleanup_after_test(&custom_test_name); } } diff --git a/observer/src/lib.rs b/observer/src/lib.rs index a56c235..06094e5 100644 --- a/observer/src/lib.rs +++ b/observer/src/lib.rs @@ -19,6 +19,8 @@ pub const PROD_CONFIG: &str = "prod.properties"; const DEFAULT_PORT: u16 = 2828; +pub const CLUSTER_FILE: &str = "cluster.json"; + pub struct Observer { pub id: String, pub role: Role, @@ -29,7 +31,17 @@ pub struct Observer { } impl Observer { - pub fn new(config_path: &str, role: Role) -> Result { + pub fn from( + config_path: &str, + leader: Option<&String>, + name: Option<&String>, + ) -> Result { + let role = if leader.is_none() { + Role::Leader + } else { + Role::Follower + }; + let mut system = System::new_all(); let port: u16 = if let Ok(port) = std::env::var("PORT") { @@ -40,7 +52,7 @@ impl Observer { let config = Config::from(config_path.into())?; - let distribution_manager = DistributionManager::new(config); + let distribution_manager = DistributionManager::from(config, name)?; let command_processor = CommandProcessor::new(); @@ -60,7 +72,7 @@ impl Observer { total_disk_utilization = (total_disk_utilization / system.disks().len() as 
f64) * 1.0 * 10f64.powf(-9.0); - println!("Total disk space: {:.2} GiB", total_disk_utilization); + println!("Disk space: {:.2} GiB", total_disk_utilization); let mut total_cpu_utilization = 0f32; for cpu in system.cpus() { @@ -69,15 +81,17 @@ impl Observer { total_cpu_utilization /= system.cpus().len() as f32; - println!("Total CPU utilization: {:.1}%", total_cpu_utilization); + println!("CPU utilization: {:.1}%", total_cpu_utilization); - Ok(Self { + let observer = Self { id: Uuid::new_v4().to_string(), role, distribution_manager, command_processor, listener, system, - }) + }; + + Ok(observer) } } diff --git a/observer/src/main.rs b/observer/src/main.rs index 869b349..eb15164 100644 --- a/observer/src/main.rs +++ b/observer/src/main.rs @@ -1,11 +1,13 @@ use clap::{arg, command}; use observer::{distribution_manager::DistributionManager, Observer, DEV_CONFIG, PROD_CONFIG}; -use shared_structures::{println_c, Role}; +use shared_structures::{println_c, Broadcast, EntityType, Message, MessageDecoder, Reader, Role}; use std::{ + io::{BufRead, BufReader}, net::TcpStream, sync::{Arc, Mutex, MutexGuard}, }; +// TODO: Leader should delegate all messages to followers, for example it should delegate create broker commands to followers, etc. fn main() -> Result<(), String> { let default_config_path_by_env = get_config_path_by_env(); let matches = command!().arg( @@ -14,71 +16,163 @@ fn main() -> Result<(), String> { ).arg( arg!(-f --follow "Runs the Observer as a follower for leader located at , Host MUST by booted without -f flag.") .required(false) + ).arg( + arg!(-n --name "Assigns a name to the broker, names are useful if you want to run two brokers on the same machine. Useful for nyx maintainers testing multi-node features.") + .required(false) ).get_matches(); let leader = matches.get_one::("follow"); + let name = matches.get_one::("name"); + + if leader.is_some() && name.is_none() { + return Err("Name should be provided if following functionality enabled.".to_string()); + } let config_path = matches .get_one::("config") .unwrap_or(&default_config_path_by_env); - let role = if leader.is_none() { - Role::Leader - } else { - Role::Follower - }; + let mut observer = Observer::from(config_path, leader, name)?; - let mut observer = Observer::new(config_path, role)?; - - println_c( - &format!( - "Observer is ready to accept brokers on port {}", - observer.listener.local_addr().unwrap().port() - ), - 35, - ); + if observer.role == Role::Leader { + println_c( + &format!( + "Observer is ready to accept brokers on port {}", + observer.listener.local_addr().unwrap().port() + ), + 35, + ); + } else if let Some(name) = name { + println_c(&format!("Started following leader as {}", name), 50) + } else { + println_c("Started following leader", 50) + } - let mut streams_distribution_manager = observer.distribution_manager.clone(); + let mut connections_distribution_manager = observer.distribution_manager.clone(); - // Brokers listener + // Connections listener std::thread::spawn(move || loop { let connection = observer.listener.incoming().next(); if let Some(stream) = connection { match stream { - Ok(stream) => { - match handle_connect_broker(&mut streams_distribution_manager, stream) { - Ok(broker_id) => println!("Broker {} has connected.", broker_id), - Err(e) => println!("Error: {}", e), + Ok(mut stream) => { + println!("stream: {:#?}", stream); + if let Ok(message) = Reader::read_one_message(&mut stream) { + match message { + Message::EntityWantsToConnect { + entity_type: EntityType::Observer, + } => { + 
match handle_connect_observer_follower( + &mut connections_distribution_manager, + stream, + ) { + Ok(observer_follower_id) => { + println!( + "Observer follower connected {}", + observer_follower_id + ) + } + Err(e) => { + println!("Error while establishing connection: {}", e) + } + } + } + Message::EntityWantsToConnect { + entity_type: EntityType::Broker, + } => match handle_connect_broker( + &mut connections_distribution_manager, + stream, + ) { + Ok(broker_id) => println!("Broker {} connected", broker_id), + Err(e) => { + println!("Error while establishing connection: {}", e) + } + }, + _ => { + println!("Handhsake failed, message could not be verified from connecting entity.") + } + } + } else { + println!("Could not decode the provided message, skipping connection.") } } - Err(e) => println!("Failed to establish connection: {}", e), + Err(e) => println!("Failed to establish basic TCP connection: {}", e), } } }); - // This will make sure our main thread will never exit until the user will issue an EXIT command by himself - loop { - match observer.command_processor.process_raw_command() { - Ok(command) => match command { - observer::command_processor::Command { - name: observer::command_processor::CommandName::Create, - .. - } => match handle_create_command(&mut observer.distribution_manager, &command) { - Ok(()) => println!("\x1b[38;5;2mOK\x1b[0m"), - Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), - }, - observer::command_processor::Command { - name: observer::command_processor::CommandName::List, - .. - } => match handle_list_command(&mut observer.distribution_manager, &command) { - Ok(()) => println!("\x1b[38;5;2mOK\x1b[0m"), - Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), - }, + let mut followers_distribution_manager = observer.distribution_manager.clone(); + + println!("{:?}", leader); + + // Leader obsrver exists, enabling the follower functionality, instead of + // the leader functionality which is able to create partitions, create topics etc + if let Some(leader) = leader.cloned() { + // TODO: connect to leader + let mut leader_stream = TcpStream::connect(leader).unwrap(); + + match Broadcast::to( + &mut leader_stream, + &shared_structures::Message::EntityWantsToConnect { + entity_type: EntityType::Observer, }, - Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), + ) { + Ok(_) => println!("Sent connection request to leader."), + Err(e) => { + return Err(format!("Failed connecting to leader: {}", e)); + } }; + + let mut reader: BufReader<&mut TcpStream> = BufReader::new(&mut leader_stream); + let mut buf = String::with_capacity(1024); + + loop { + // TODO: constantly read delegated messages from leader + let bytes_read = reader + .read_line(&mut buf) + .map_err(|e| format!("Leader follower error: {}", e))?; + + if bytes_read == 0 { + println!("Leader has closed connection. Exiting."); + break; + } + + match handle_delegated_message(&buf, &mut followers_distribution_manager) { + Ok(_) => println!("Received delgated cluster metadata successfully"), + Err(e) => println!("Cluster metadata delegation error: {}", e), + }; + + buf.clear(); + } + } else { + // This will make sure our main thread will never exit until the user will issue an EXIT command by himself + loop { + match observer.command_processor.process_raw_command() { + Ok(command) => match command { + observer::command_processor::Command { + name: observer::command_processor::CommandName::Create, + .. 
+ } => { + match handle_create_command(&mut observer.distribution_manager, &command) { + Ok(()) => println!("\x1b[38;5;2mOK\x1b[0m"), + Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), + } + } + observer::command_processor::Command { + name: observer::command_processor::CommandName::List, + .. + } => match handle_list_command(&mut observer.distribution_manager, &command) { + Ok(()) => println!("\x1b[38;5;2mOK\x1b[0m"), + Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), + }, + }, + Err(e) => println!("\x1b[38;5;1mERROR:\x1b[0m {}", e), + }; + } } + + Ok(()) } fn get_config_path_by_env() -> String { @@ -91,11 +185,31 @@ fn get_config_path_by_env() -> String { format!("./config/{}", file_name) } +fn handle_delegated_message( + raw_message: &str, + distribution_manager: &mut Arc>, +) -> Result<(), String> { + let delegated_message = MessageDecoder::decode(raw_message)?; + + println!("Delegated messaeg: {:?}", delegated_message); + + match delegated_message { + Message::ClusterMetadata { metadata } => { + let mut distribution_manager_lock = distribution_manager.lock().unwrap(); + distribution_manager_lock.load_cluster_state(&metadata)?; + distribution_manager_lock.save_cluster_state() + } + _ => Err("Could not read delegated cluster metadata".to_string()), + } +} + fn handle_list_command( distribution_manager: &mut Arc>, command: &observer::command_processor::Command, ) -> Result<(), String> { + println!("TRYING TO AQUIRE LOCK!"); let distribution_manager_lock = distribution_manager.lock().unwrap(); + println!("LOCK AQUIRED!!!"); let level = command.arguments.first().unwrap(); if level == "ALL" { @@ -138,6 +252,16 @@ fn handle_create_command( } } +fn handle_connect_observer_follower( + distribution_manager: &mut Arc>, + stream: TcpStream, +) -> Result { + let mut distribution_manager_lock = distribution_manager.lock().unwrap(); + let stream_addr = stream.peer_addr().map_err(|e| e.to_string())?; + distribution_manager_lock.followers.push(stream); + Ok(stream_addr.ip().to_string()) +} + fn handle_connect_broker( distribution_manager: &mut Arc>, stream: TcpStream, diff --git a/producer/src/lib.rs b/producer/src/lib.rs index 4df604d..e0a01e9 100644 --- a/producer/src/lib.rs +++ b/producer/src/lib.rs @@ -3,7 +3,7 @@ use std::{ net::TcpStream, }; -use shared_structures::{metadata::BrokerDetails, Broadcast, Message}; +use shared_structures::{metadata::BrokerDetails, Broadcast, Reader}; pub struct Producer { pub mode: String, @@ -34,19 +34,7 @@ impl Producer { &shared_structures::Message::RequestClusterMetadata, )?; - let mut reader = BufReader::new(&stream); - let mut buf = String::with_capacity(1024); - - let bytes_read = reader - .read_line(&mut buf) - .map_err(|e| format!("Producer: {}", e))?; - - if bytes_read == 0 { - return Err("Got nothing from broker, something went wrong".to_string()); - } - - let message = - serde_json::from_str::(&buf).map_err(|e| format!("Producer: {}", e))?; + let message = Reader::read_one_message(&mut stream)?; match message { shared_structures::Message::ClusterMetadata { @@ -111,6 +99,8 @@ impl Producer { } println!("Recieved message from broker: {:#?}", buf); + + buf.clear(); } }); diff --git a/shared_structures/src/broadcast/mod.rs b/shared_structures/src/broadcast/mod.rs index 6bcda68..5b92bfa 100644 --- a/shared_structures/src/broadcast/mod.rs +++ b/shared_structures/src/broadcast/mod.rs @@ -43,6 +43,34 @@ impl Broadcast { Ok(()) } + + pub fn to_many(stream: &mut TcpStream, messages: &[Message]) -> Result<(), String> { + let mut payloads: Vec = vec![]; 
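As an aside, not part of this patch: `Write::write` is allowed to perform a partial write, so checking only for `bytes_written == 0` can silently drop the tail of a batched payload. A sketch of the same send path using `write_all`, which loops internally until the whole buffer is written or an error occurs; `send_all` is a hypothetical helper, not an API from this repository.

use std::io::Write;
use std::net::TcpStream;

// Send several already-serialized, newline-terminated JSON payloads and
// guarantee the entire batch reaches the socket buffer.
fn send_all(stream: &mut TcpStream, payloads: &[String]) -> Result<(), String> {
    for payload in payloads {
        // write_all retries until every byte is written, so no manual
        // `bytes_written == 0` check is needed.
        stream
            .write_all(payload.as_bytes())
            .map_err(|e| e.to_string())?;
    }
    stream.flush().map_err(|e| e.to_string())
}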
+ + for message in messages { + let mut payload = serde_json::to_string(message) + .map_err(|_| "Couldn't serialize the data structure to send.".to_string())?; + payload.push('\n'); + payloads.push(payload); + } + + println!("{:#?}", payloads); + + let payload_bytes: Vec<_> = payloads + .iter() + .flat_map(|p| p.as_bytes().to_owned()) + .collect(); + + let bytes = &payload_bytes[..]; + + let bytes_written = stream.write(bytes).map_err(|e| e.to_string())?; + + if bytes_written == 0 { + return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string()); + } + + Ok(()) + } } #[cfg(test)] diff --git a/shared_structures/src/dir_manager/mod.rs b/shared_structures/src/dir_manager/mod.rs index 876d4a0..e1f7259 100644 --- a/shared_structures/src/dir_manager/mod.rs +++ b/shared_structures/src/dir_manager/mod.rs @@ -79,7 +79,7 @@ impl DirManager { Ok(filepath.into()) } - fn get_base_dir(custom_path: Option<&PathBuf>) -> Result { + pub fn get_base_dir(custom_path: Option<&PathBuf>) -> Result { let final_path = if let Some(custom_path) = custom_path { let dist = custom_path .clone() diff --git a/shared_structures/src/lib.rs b/shared_structures/src/lib.rs index 0ce94c0..b9d7b99 100644 --- a/shared_structures/src/lib.rs +++ b/shared_structures/src/lib.rs @@ -1,11 +1,16 @@ mod broadcast; mod dir_manager; -pub mod metadata; +mod message_decoder; +mod reader; mod topic; +pub mod metadata; + pub use broadcast::Broadcast; pub use dir_manager::DirManager; +pub use message_decoder::MessageDecoder; pub use metadata::Metadata; +pub use reader::Reader; pub use topic::Topic; #[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)] @@ -18,13 +23,19 @@ pub enum Status { Booting, } -#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] +#[derive(Clone, Copy, Debug, PartialEq, serde::Serialize, serde::Deserialize)] pub enum Role { Follower, Leader, } -// TODO: Think of a way to better organize this enum +#[derive(Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] +pub enum EntityType { + Broker, + Observer, +} + +// TODO: Think of a way to better organize this enum or split it into more enums #[derive(Debug, serde::Serialize, serde::Deserialize)] pub enum Message { CreatePartition { @@ -40,14 +51,22 @@ pub enum Message { replica_id: String, }, // Should deny leadership request with the addr of broker where leader resides. 
-    DenyLeadership,
-    BrokerWantsToConnect {
+    DenyLeadership {
+        leader_addr: String,
+    },
+    BrokerConnectionDetails {
         id: String,
         addr: String,
     },
     ProducerWantsToConnect {
         topic: String,
     },
+    FollowerWantsToConnect {
+        entity_type: EntityType,
+    },
+    EntityWantsToConnect {
+        entity_type: EntityType,
+    },
     RequestClusterMetadata,
     ClusterMetadata {
         metadata: Metadata,
diff --git a/shared_structures/src/message_decoder.rs b/shared_structures/src/message_decoder.rs
new file mode 100644
index 0000000..dcbd0db
--- /dev/null
+++ b/shared_structures/src/message_decoder.rs
@@ -0,0 +1,10 @@
+use crate::Message;
+
+pub struct MessageDecoder;
+
+impl MessageDecoder {
+    pub fn decode(raw_message: &str) -> Result<Message, String> {
+        serde_json::from_str::<Message>(raw_message)
+            .map_err(|e| format!("Error while deserializing: {}", e))
+    }
+}
diff --git a/shared_structures/src/metadata.rs b/shared_structures/src/metadata.rs
index 4c05e2b..1028b4b 100644
--- a/shared_structures/src/metadata.rs
+++ b/shared_structures/src/metadata.rs
@@ -6,15 +6,19 @@ pub struct PartitionDetails {
     pub replica_id: String,
     pub role: Role,
     pub topic: Topic,
+    pub partition_number: usize,
+    pub replica_count: usize,
 }
 
 #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct BrokerDetails {
+    pub id: String,
     pub addr: String,
     pub status: Status,
     pub partitions: Vec<PartitionDetails>,
 }
-#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
+#[derive(Default, Clone, Debug, serde::Serialize, serde::Deserialize)]
 pub struct Metadata {
     pub brokers: Vec<BrokerDetails>,
+    pub topics: Vec<Topic>,
 }
diff --git a/shared_structures/src/reader.rs b/shared_structures/src/reader.rs
new file mode 100644
index 0000000..8b72bd1
--- /dev/null
+++ b/shared_structures/src/reader.rs
@@ -0,0 +1,33 @@
+use std::{io::Read, net::TcpStream};
+
+use crate::Message;
+
+pub struct Reader;
+
+impl Reader {
+    pub fn read_one_message(stream: &mut TcpStream) -> Result<Message, String> {
+        let message = Self::read_until_char(stream, '\n')?;
+
+        serde_json::from_str::<Message>(&message)
+            .map_err(|e| format!("Error while deserializing: {}", e))
+    }
+
+    fn read_until_char(stream: &mut TcpStream, target_char: char) -> Result<String, String> {
+        let mut buffer = [0u8; 1]; // Read one byte at a time
+        let mut result = String::new();
+
+        loop {
+            stream.read_exact(&mut buffer).map_err(|e| e.to_string())?; // Read one byte into the buffer
+
+            let byte_read = buffer[0];
+            let character = byte_read as char;
+            result.push(character);
+
+            if character == target_char {
+                break;
+            }
+        }
+
+        Ok(result)
+    }
+}
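The wire format this patch settles on is one JSON-encoded `Message` per line: the sender serializes and appends `'\n'`, and the receiver reads back exactly one line before deserializing. Below is a minimal round-trip of the new broker handshake order (`EntityWantsToConnect` followed by `BrokerConnectionDetails`) over a local socket, not taken from the repository. The `Message` enum here is a reduced stand-in, the reading side uses `BufRead::read_line` where the patch's `Reader::read_until_char` reads byte-by-byte, and `serde`/`serde_json` are assumed as dependencies.

use std::io::{BufRead, BufReader, Write};
use std::net::{TcpListener, TcpStream};

// Stand-in for shared_structures::Message, reduced to the two handshake
// variants introduced by this patch; names mirror the diff above.
#[derive(Debug, serde::Serialize, serde::Deserialize)]
enum Message {
    EntityWantsToConnect { entity_type: String },
    BrokerConnectionDetails { id: String, addr: String },
}

// One frame == one newline-terminated JSON object.
fn send(stream: &mut TcpStream, message: &Message) -> Result<(), String> {
    let mut payload = serde_json::to_string(message).map_err(|e| e.to_string())?;
    payload.push('\n');
    stream.write_all(payload.as_bytes()).map_err(|e| e.to_string())
}

fn read_one(reader: &mut BufReader<TcpStream>) -> Result<Message, String> {
    let mut line = String::new();
    reader.read_line(&mut line).map_err(|e| e.to_string())?;
    serde_json::from_str(&line).map_err(|e| e.to_string())
}

fn main() -> Result<(), String> {
    let listener = TcpListener::bind("127.0.0.1:0").map_err(|e| e.to_string())?;
    let addr = listener.local_addr().map_err(|e| e.to_string())?;

    // "Broker" side: the two-message order used by Broker::handshake.
    let broker = std::thread::spawn(move || -> Result<(), String> {
        let mut stream = TcpStream::connect(addr).map_err(|e| e.to_string())?;
        send(
            &mut stream,
            &Message::EntityWantsToConnect { entity_type: "Broker".into() },
        )?;
        send(
            &mut stream,
            &Message::BrokerConnectionDetails {
                id: "broker-1".into(),
                addr: "localhost:3000".into(),
            },
        )
    });

    // "Observer" side: read the two frames back in the same order.
    let (stream, _) = listener.accept().map_err(|e| e.to_string())?;
    let mut reader = BufReader::new(stream);
    let first = read_one(&mut reader)?; // EntityWantsToConnect -> decide how to register the peer
    let second = read_one(&mut reader)?; // BrokerConnectionDetails -> id + addr
    println!("{:?}\n{:?}", first, second);

    broker.join().unwrap()
}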