Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up client<>server synchronization in stream test #485

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions src/qos_core/src/io/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ mod test {
os::{fd::AsRawFd, unix::net::UnixListener},
path::Path,
str::from_utf8,
sync::{Arc, Barrier},
thread,
};

Expand All @@ -391,9 +392,14 @@ mod test {
pub fn new(path: String) -> Self {
Self { path }
}
pub fn start(&mut self) {
pub fn start(&mut self) -> Arc<Barrier> {
let listener = UnixListener::bind(&self.path).unwrap();
let path = self.path.clone();

// Create a barrier to synchronize between server and client
let barrier = Arc::new(Barrier::new(2));
let server_barrier = Arc::clone(&barrier);

Comment on lines +398 to +402
Copy link
Contributor

@r-n-o r-n-o Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, indentation is off (tabs vs spaces).

This tabs v space issue is there for all new lines you inserted so it's probably an editor configuration issue.

thread::spawn(move || {
let (mut stream, _peer_addr) = listener.accept().unwrap();

Expand All @@ -406,15 +412,18 @@ mod test {
let _ = stream.write(b"PONG").unwrap();
}

// Then shutdown the server
let _ = shutdown(listener.as_raw_fd(), Shutdown::Both);
let _ = close(listener.as_raw_fd());
// Wait for the client to signal that it's done
server_barrier.wait();

let server_socket = Path::new(&path);
if server_socket.exists() {
drop(std::fs::remove_file(server_socket));
}
// Remove the socket file after shutting down
let server_socket = Path::new(&path);
if server_socket.exists() {
let _ = std::fs::remove_file(server_socket);
}
});

// Return the barrier to the client to allow for synchronization
barrier
}
}

Expand Down Expand Up @@ -446,9 +455,7 @@ mod test {
// request
let mut server =
HarakiriPongServer::new(socket_server_path.to_string());
thread::spawn(move || {
server.start();
});
let barrier = server.start();

// Now create a stream connecting to this mini-server
let unix_addr =
Expand All @@ -459,12 +466,15 @@ mod test {
// Write "PING"
let written = pong_stream.write(b"PING").unwrap();
assert_eq!(written, 4);

// Read, and expect "PONG"
let mut resp = [0u8; 4];
let res = pong_stream.read(&mut resp).unwrap();
assert_eq!(res, 4);
assert_eq!(from_utf8(&resp).unwrap(), "PONG");

// Signal to the server that the client is done
barrier.wait();
}

#[test]
Expand Down