Skip to content
This repository has been archived by the owner on Mar 23, 2024. It is now read-only.

Commit

Permalink
Leader/follower functionality - Observer (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwbh authored Aug 31, 2023
1 parent 4fbecf4 commit 65247a4
Show file tree
Hide file tree
Showing 13 changed files with 652 additions and 232 deletions.
27 changes: 16 additions & 11 deletions broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use partition::PartitionDetails;
use shared_structures::{Broadcast, DirManager, Message, Metadata, Status, Topic};
use shared_structures::{Broadcast, DirManager, EntityType, Message, Metadata, Status, Topic};
use uuid::Uuid;

mod partition;
Expand Down Expand Up @@ -38,9 +38,9 @@ impl Broker {
addr: String,
name: Option<&String>,
) -> Result<Arc<Mutex<Self>>, String> {
let custom_dir: Option<PathBuf> = name.map(|f| f.into());
let custom_dir: Option<PathBuf> = name.map(|f| format!("/broker/{}", f).into());

let cluster_metadata = Metadata { brokers: vec![] };
let cluster_metadata: Metadata = Metadata::default();

let connected_producers = Arc::new(Mutex::new(vec![]));

Expand Down Expand Up @@ -93,12 +93,17 @@ impl Broker {
}

fn handshake(&mut self) -> Result<(), String> {
Broadcast::to(
Broadcast::to_many(
&mut self.stream,
&Message::BrokerWantsToConnect {
id: self.local_metadata.id.clone(),
addr: self.addr.clone(),
},
&[
Message::EntityWantsToConnect {
entity_type: EntityType::Broker,
},
Message::BrokerConnectionDetails {
id: self.local_metadata.id.clone(),
addr: self.addr.clone(),
},
],
)
}

Expand All @@ -108,12 +113,12 @@ impl Broker {
remote: Option<&mut TcpStream>,
) -> Result<(), String> {
let message = serde_json::from_str::<Message>(raw_data).map_err(|e| e.to_string())?;
self.handle_by_message(&message, remote)
self.handle_message(&message, remote)
}

// Messages from Producers and Observers are all processed here
// maybe better to split it into two functions for clarity.
fn handle_by_message(
fn handle_message(
&mut self,
message: &Message,
remote: Option<&mut TcpStream>,
Expand Down Expand Up @@ -171,7 +176,7 @@ impl Broker {
}
}
_ => Err(format!(
"Message {:?} is not handled in `handle_by_message`.",
"Message {:?} is not handled in `handle_message`.",
message
)),
}
Expand Down
2 changes: 1 addition & 1 deletion broker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let tcp_stream: Option<TcpStream>;
let mut sleep_interval = 1000;

// Should start trying to connect to the observer in intervals until success
// Should trying to connect to the observer in intervals until success
loop {
match TcpStream::connect(addr) {
Ok(stream) => {
Expand Down
41 changes: 26 additions & 15 deletions observer/src/distribution_manager/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,46 @@ use super::partition::Partition;
#[derive(Debug)]
pub struct Broker {
pub id: String,
pub stream: TcpStream,
pub stream: Option<TcpStream>,
pub partitions: Vec<Partition>,
pub reader: BufReader<TcpStream>,
pub reader: Option<BufReader<TcpStream>>,
pub status: Status,
pub addr: String,
}

impl Broker {
pub fn from(id: String, stream: TcpStream, addr: String) -> Result<Self, String> {
let read_stream = stream.try_clone().map_err(|e| e.to_string())?;
let reader = BufReader::new(read_stream);
pub fn from(id: String, stream: Option<TcpStream>, addr: String) -> Result<Self, String> {
if let Some(stream) = stream {
let read_stream = stream.try_clone().map_err(|e| e.to_string())?;
let reader = BufReader::new(read_stream);

Ok(Self {
id,
partitions: vec![],
stream,
reader,
status: Status::Up,
addr,
})
Ok(Self {
id,
partitions: vec![],
stream: Some(stream),
reader: Some(reader),
status: Status::Up,
addr,
})
} else {
Ok(Self {
id,
partitions: vec![],
stream: None,
reader: None,
status: Status::Up,
addr,
})
}
}

pub fn restore(&mut self, stream: TcpStream, addr: String) -> Result<(), String> {
let read_stream = stream.try_clone().map_err(|e| e.to_string())?;
let reader = BufReader::new(read_stream);

self.status = Status::Up;
self.stream = stream;
self.reader = reader;
self.stream = Some(stream);
self.reader = Some(reader);
self.addr = addr;

for partition in self.partitions.iter_mut() {
Expand Down
Loading

0 comments on commit 65247a4

Please sign in to comment.