Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Fixed a bug where read_one_message would break the flow of reading me…
Browse files Browse the repository at this point in the history
…ssage past it
  • Loading branch information
pwbh committed Aug 29, 2023
1 parent 210986a commit 9272933
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 61 deletions.
22 changes: 10 additions & 12 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,19 +93,17 @@ impl Broker {
}

fn handshake(&mut self) -> Result<(), String> {
Broadcast::to(
Broadcast::to_many(
&mut self.stream,
&Message::EntityWantsToConnect {
entity_type: EntityType::Broker,
},
)?;

Broadcast::to(
&mut self.stream,
&Message::BrokerConnectionDetails {
id: self.local_metadata.id.clone(),
addr: self.addr.clone(),
},
&[
Message::EntityWantsToConnect {
entity_type: EntityType::Broker,
},
Message::BrokerConnectionDetails {
id: self.local_metadata.id.clone(),
addr: self.addr.clone(),
},
],
)
}

Expand Down
38 changes: 28 additions & 10 deletions observer/src/distribution_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,26 @@ use crate::{config::Config, CLUSTER_FILE};
pub struct DistributionManager {
pub brokers: Arc<Mutex<Vec<Broker>>>,
pub topics: Vec<Arc<Mutex<Topic>>>,
pub dir_manager: DirManager,
pub cluster_dir: DirManager,
pub followers: Vec<TcpStream>,
config: Config,
pending_replication_partitions: Vec<(usize, Partition)>,
}

impl DistributionManager {
pub fn from(config: Config, name: Option<&String>) -> Result<Arc<Mutex<Self>>, String> {
let custom_dir: Option<PathBuf> = name.map(|f| format!("/observer/{}", f).into());
let custom_dir = if let Some(name) = name {
let custom_path = format!("/observer/{}/", name);
Some(PathBuf::from(custom_path))
} else {
Some(PathBuf::from("/observer"))
};

println!("Custom dir: {:?}", custom_dir);

let dir_manager = DirManager::with_dir(custom_dir.as_ref());
let cluster_dir = DirManager::with_dir(custom_dir.as_ref());

let cluster_metadata = match dir_manager.open::<Metadata>(CLUSTER_FILE) {
let cluster_metadata = match cluster_dir.open::<Metadata>(CLUSTER_FILE) {
Ok(m) => m,
Err(_) => Metadata::default(),
};
Expand All @@ -44,7 +51,7 @@ impl DistributionManager {
topics: vec![],
config,
pending_replication_partitions: vec![],
dir_manager,
cluster_dir,
followers: vec![],
};

Expand All @@ -53,7 +60,7 @@ impl DistributionManager {
Ok(Arc::new(Mutex::new(distribution_manager)))
}

fn load_cluster_state(&mut self, cluster_metadata: &Metadata) -> Result<(), String> {
pub fn load_cluster_state(&mut self, cluster_metadata: &Metadata) -> Result<(), String> {
self.topics = cluster_metadata
.topics
.iter()
Expand Down Expand Up @@ -109,17 +116,20 @@ impl DistributionManager {

pub fn save_cluster_state(&self) -> Result<(), String> {
let metadata = self.get_cluster_metadata()?;
self.dir_manager.save(CLUSTER_FILE, &metadata)
self.cluster_dir.save(CLUSTER_FILE, &metadata)
}

// Will return the broker id that has been added or restored to the Observer
// TODO: Should check whether a broker that is being added already exists in the system + if it's Status is `Down`
// meaning that this broker has disconnected in one of many possible ways, including user interference, unexpected system crush
// or any other reason. Observer should try and sync with the brokers via the brokers provided id.
pub fn connect_broker(&mut self, stream: TcpStream) -> Result<String, String> {
println!("NEW BROKER: {:?}", stream);
// Handshake process between the Broker and Observer happening in get_broker_metadata
let (id, addr, stream) = self.get_broker_metadata(stream)?;
println!("BROKER METADATA: {} {} {:?}", id, addr, stream);
let mut brokers_lock = self.brokers.lock().unwrap();
println!("AQUIRED BROKER LOCK");
let broker_id =
if let Some(disconnected_broker) = brokers_lock.iter_mut().find(|b| b.id == id) {
disconnected_broker.restore(stream, addr)?;
Expand All @@ -128,12 +138,12 @@ impl DistributionManager {
} else {
let mut broker = Broker::from(id, Some(stream), addr)?;
self.spawn_broker_reader(&broker)?;
let broker_id = broker.id.clone();
// Need to replicate the pending partitions if there is any
replicate_pending_partitions_once(
&mut self.pending_replication_partitions,
&mut broker,
)?;
let broker_id = broker.id.clone();
brokers_lock.push(broker);
broker_id
};
Expand Down Expand Up @@ -199,8 +209,12 @@ impl DistributionManager {

let mut followers_streams: Vec<_> = self.followers.iter_mut().collect();

println!("Followers: {:?}", followers_streams);

let message = shared_structures::Message::ClusterMetadata { metadata };

println!("BROADCASTING!!!");

Broadcast::all(&mut followers_streams[..], &message)?;
Broadcast::all(&mut broker_streams[..], &message)
}
Expand Down Expand Up @@ -295,8 +309,12 @@ impl DistributionManager {

let mut reader = BufReader::new(&stream);

println!("WAITING FOR LINE FROM BROKER");

let bytes_read = reader.read_line(&mut buf).map_err(|e| e.to_string())?;

println!("LINE RECEIEVED");

if bytes_read == 0 {
return Err("Client exited unexpectadly during handhsake.".to_string());
}
Expand Down Expand Up @@ -393,9 +411,9 @@ pub fn broadcast_replicate_partition(
broker: &mut Broker,
replica: &mut Partition,
) -> Result<(), String> {
if let Some(broker_tream) = &mut broker.stream {
if let Some(broker_stream) = &mut broker.stream {
Broadcast::to(
broker_tream,
broker_stream,
&Message::CreatePartition {
id: replica.id.clone(),
replica_id: replica.replica_id.clone(),
Expand Down
43 changes: 17 additions & 26 deletions observer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use clap::{arg, command};
use observer::{
distribution_manager::{Broker, DistributionManager},
Observer, DEV_CONFIG, PROD_CONFIG,
};
use observer::{distribution_manager::DistributionManager, Observer, DEV_CONFIG, PROD_CONFIG};
use shared_structures::{println_c, Broadcast, EntityType, Message, MessageDecoder, Reader, Role};
use std::{
io::{BufRead, BufReader},
Expand Down Expand Up @@ -51,24 +48,23 @@ fn main() -> Result<(), String> {
println_c("Started following leader", 50)
}

let mut streams_distribution_manager = observer.distribution_manager.clone();
let mut connections_distribution_manager = observer.distribution_manager.clone();

// Connections listener
std::thread::spawn(move || loop {
let connection = observer.listener.incoming().next();

if let Some(stream) = connection {
match stream {
Ok(stream) => {
Ok(mut stream) => {
println!("stream: {:#?}", stream);
if let Ok(message) = Reader::read_message(&stream) {
println!("{:#?}", message);
if let Ok(message) = Reader::read_one_message(&mut stream) {
match message {
Message::EntityWantsToConnect {
entity_type: EntityType::Observer,
} => {
match handle_connect_observer_follower(
&mut streams_distribution_manager,
&mut connections_distribution_manager,
stream,
) {
Ok(observer_follower_id) => {
Expand All @@ -85,7 +81,7 @@ fn main() -> Result<(), String> {
Message::EntityWantsToConnect {
entity_type: EntityType::Broker,
} => match handle_connect_broker(
&mut streams_distribution_manager,
&mut connections_distribution_manager,
stream,
) {
Ok(broker_id) => println!("Broker {} connected", broker_id),
Expand All @@ -108,6 +104,8 @@ fn main() -> Result<(), String> {

let mut followers_distribution_manager = observer.distribution_manager.clone();

println!("{:?}", leader);

// Leader obsrver exists, enabling the follower functionality, instead of
// the leader functionality which is able to create partitions, create topics etc
if let Some(leader) = leader.cloned() {
Expand All @@ -131,7 +129,9 @@ fn main() -> Result<(), String> {

loop {
// TODO: constantly read delegated messages from leader
let bytes_read = reader.read_line(&mut buf).unwrap();
let bytes_read = reader
.read_line(&mut buf)
.map_err(|e| format!("Leader follower error: {}", e))?;

if bytes_read == 0 {
println!("Leader has closed connection. Exiting.");
Expand Down Expand Up @@ -189,23 +189,12 @@ fn handle_delegated_message(
) -> Result<(), String> {
let delegated_message = MessageDecoder::decode(raw_message)?;

println!("Delegated messaeg: {:?}", delegated_message);

match delegated_message {
Message::ClusterMetadata { metadata } => {
let mut distribution_manager_lock = distribution_manager.lock().unwrap();

distribution_manager_lock.topics = metadata
.topics
.iter()
.map(|t| Arc::new(Mutex::new(t.clone())))
.collect();

let mut brokers_lock = distribution_manager_lock.brokers.lock().unwrap();

for broker in metadata.brokers {
let broker = Broker::from(broker.id, None, broker.addr)?;
brokers_lock.push(broker);
}

distribution_manager_lock.load_cluster_state(&metadata)?;
distribution_manager_lock.save_cluster_state()
}
_ => Err("Could not read delegated cluster metadata".to_string()),
Expand All @@ -216,7 +205,9 @@ fn handle_list_command(
distribution_manager: &mut Arc<Mutex<DistributionManager>>,
command: &observer::command_processor::Command,
) -> Result<(), String> {
println!("TRYING TO AQUIRE LOCK!");
let distribution_manager_lock = distribution_manager.lock().unwrap();
println!("LOCK AQUIRED!!!");
let level = command.arguments.first().unwrap();

if level == "ALL" {
Expand Down Expand Up @@ -266,7 +257,7 @@ fn handle_connect_observer_follower(
let mut distribution_manager_lock = distribution_manager.lock().unwrap();
let stream_addr = stream.peer_addr().map_err(|e| e.to_string())?;
distribution_manager_lock.followers.push(stream);
Ok(stream_addr.to_string())
Ok(stream_addr.ip().to_string())
}

fn handle_connect_broker(
Expand Down
28 changes: 28 additions & 0 deletions shared_structures/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,34 @@ impl Broadcast {

Ok(())
}

pub fn to_many(stream: &mut TcpStream, messages: &[Message]) -> Result<(), String> {
let mut payloads: Vec<String> = vec![];

for message in messages {
let mut payload = serde_json::to_string(message)
.map_err(|_| "Couldn't serialize the data structure to send.".to_string())?;
payload.push('\n');
payloads.push(payload);
}

println!("{:#?}", payloads);

let payload_bytes: Vec<_> = payloads
.iter()
.flat_map(|p| p.as_bytes().to_owned())
.collect();

let bytes = &payload_bytes[..];

let bytes_written = stream.write(bytes).map_err(|e| e.to_string())?;

if bytes_written == 0 {
return Err("0 bytes have been written, might be an error, please create a new issue in nyx repository.".to_string());
}

Ok(())
}
}

#[cfg(test)]
Expand Down
38 changes: 25 additions & 13 deletions shared_structures/src/reader.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
use std::io::{BufRead, BufReader};
use std::{io::Read, net::TcpStream};

use crate::Message;

pub struct Reader;

impl Reader {
pub fn read_message<R: std::io::Read, T: serde::de::DeserializeOwned>(
inner: R,
) -> Result<T, String> {
let mut reader = BufReader::new(inner);
let mut buf = String::with_capacity(1024);
let message = reader
.read_line(&mut buf)
.map_err(|e| format!("Reader error: {}", e))?;

if message == 0 {
return Err("Could not read message from given value".to_string());
pub fn read_one_message(stream: &mut TcpStream) -> Result<Message, String> {
let message = Self::read_until_char(stream, '\n')?;

serde_json::from_str::<Message>(&message)
.map_err(|e| format!("Error while deserialziing: {}", e))
}

fn read_until_char(stream: &mut TcpStream, target_char: char) -> Result<String, String> {
let mut buffer = [0u8; 1]; // Read one byte at a time
let mut result = String::new();

loop {
stream.read_exact(&mut buffer).map_err(|e| e.to_string())?; // Read one byte into the buffer

let byte_read = buffer[0];
let character = byte_read as char;
result.push(character);

if character == target_char {
break;
}
}

serde_json::from_str::<T>(&buf).map_err(|e| format!("Error while deserialziing: {}", e))
Ok(result)
}
}

0 comments on commit 9272933

Please sign in to comment.