Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
pwbh committed Aug 30, 2023
1 parent 68977d6 commit 1048dc3
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 40 deletions.
35 changes: 9 additions & 26 deletions observer/src/distribution_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub use broker::Broker;
pub use partition::Partition;
use shared_structures::{
metadata::{BrokerDetails, PartitionDetails},
Broadcast, DirManager, Message, Metadata, Status, Topic,
Broadcast, DirManager, Message, MessageDecoder, Metadata, Reader, Status, Topic,
};

use crate::{config::Config, CLUSTER_FILE};
Expand Down Expand Up @@ -303,32 +303,11 @@ impl DistributionManager {

fn get_broker_metadata(
&self,
stream: TcpStream,
mut stream: TcpStream,
) -> Result<(String, String, TcpStream), String> {
let mut buf = String::with_capacity(1024);

let mut reader = BufReader::new(&stream);

println!("WAITING FOR LINE FROM BROKER");

let bytes_read = reader.read_line(&mut buf).map_err(|e| e.to_string())?;

println!("LINE RECEIEVED");

if bytes_read == 0 {
return Err("Client exited unexpectadly during handhsake.".to_string());
}

let message = match serde_json::from_str::<Message>(&buf) {
Ok(m) => m,
Err(_) => {
return Err(
"Handshake with client failed, unrecognized payload received".to_string(),
)
}
};

if let Message::BrokerConnectionDetails { id, addr } = message {
if let Message::BrokerConnectionDetails { id, addr } =
Reader::read_one_message(&mut stream)?
{
Ok((id, addr, stream))
} else {
Err("Handshake with client failed, wrong message received from client.".to_string())
Expand Down Expand Up @@ -397,6 +376,8 @@ impl DistributionManager {
}
break;
}

buf.clear();
}
});
Ok(())
Expand Down Expand Up @@ -595,6 +576,8 @@ mod tests {
if size == 0 {
break;
}

buf.clear();
}
});

Expand Down
18 changes: 4 additions & 14 deletions producer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
net::TcpStream,
};

use shared_structures::{metadata::BrokerDetails, Broadcast, Message};
use shared_structures::{metadata::BrokerDetails, Broadcast, Reader};

pub struct Producer {
pub mode: String,
Expand Down Expand Up @@ -34,19 +34,7 @@ impl Producer {
&shared_structures::Message::RequestClusterMetadata,
)?;

let mut reader = BufReader::new(&stream);
let mut buf = String::with_capacity(1024);

let bytes_read = reader
.read_line(&mut buf)
.map_err(|e| format!("Producer: {}", e))?;

if bytes_read == 0 {
return Err("Got nothing from broker, something went wrong".to_string());
}

let message =
serde_json::from_str::<Message>(&buf).map_err(|e| format!("Producer: {}", e))?;
let message = Reader::read_one_message(&mut stream)?;

match message {
shared_structures::Message::ClusterMetadata {
Expand Down Expand Up @@ -111,6 +99,8 @@ impl Producer {
}

println!("Recieved message from broker: {:#?}", buf);

buf.clear();
}
});

Expand Down

0 comments on commit 1048dc3

Please sign in to comment.