Skip to content

Commit

Permalink
simplify server accept()
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Aug 18, 2023
1 parent 219c895 commit b816f25
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ impl Upgrade {
pub fn on_upgrade<E: ServerExt + 'static>(self, server: Server<E>) -> Response {
self.ws.on_upgrade(move |socket| async move {
let socket = Socket::new(socket, Default::default()); // TODO: Make it really configurable via Extensions
server.accept(socket, self.request, self.address).await;
server.accept(socket, self.request, self.address);
})
}
}
19 changes: 9 additions & 10 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ struct NewConnection {
socket: Socket,
address: SocketAddr,
request: Request,
respond_to: oneshot::Sender<()>,
}

struct Disconnected<E: ServerExt> {
Expand All @@ -129,7 +128,7 @@ where
loop {
if let Err(err) = async {
tokio::select! {
Some(NewConnection{socket, address, respond_to, request}) = self.connection_receiver.recv() => {
Some(NewConnection{socket, address, request}) = self.connection_receiver.recv() => {
let socket_sink = socket.sink.clone();
match self.extension.on_connect(socket, request, address).await {
Ok(session) => {
Expand All @@ -152,7 +151,6 @@ where
}
}
}
respond_to.send(()).unwrap_or_default();
}
Some(Disconnected{id, result}) = self.disconnection_receiver.recv() => {
match &result {
Expand Down Expand Up @@ -245,19 +243,20 @@ impl<E: ServerExt + 'static> Server<E> {
}

impl<E: ServerExt> Server<E> {
pub async fn accept(&self, socket: Socket, request: Request, address: SocketAddr) {
/// Accept a connection. Logs an error if the server actor is dead.
pub fn accept(&self, socket: Socket, request: Request, address: SocketAddr) {
// TODO: can we refuse the connection here?
let (connection_indicator_sender, connection_indicator_receiver) = oneshot::channel();
self.connection_sender
if self
.connection_sender
.send(NewConnection {
socket,
request,
address,
respond_to: connection_indicator_sender,
})
.map_err(|_| "connections is down")
.unwrap_or_default();
connection_indicator_receiver.await.unwrap_or_default()
.is_err()
{
tracing::error!("accepted a connection but the server actor is dead");
}
}

pub(crate) fn disconnected(
Expand Down
2 changes: 1 addition & 1 deletion src/tungstenite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ cfg_if::cfg_if! {
continue;
}
};
server.accept(socket, request, address).await;
server.accept(socket, request, address);
}
}

Expand Down

0 comments on commit b816f25

Please sign in to comment.