From b816f254bdfc95c42ddbba413ad7883104106655 Mon Sep 17 00:00:00 2001 From: koe Date: Fri, 18 Aug 2023 13:19:43 -0500 Subject: [PATCH] simplify server accept() --- src/axum.rs | 2 +- src/server.rs | 19 +++++++++---------- src/tungstenite.rs | 2 +- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/src/axum.rs b/src/axum.rs index bd1d6c9..7e6f69a 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -163,7 +163,7 @@ impl Upgrade { pub fn on_upgrade(self, server: Server) -> Response { self.ws.on_upgrade(move |socket| async move { let socket = Socket::new(socket, Default::default()); // TODO: Make it really configurable via Extensions - server.accept(socket, self.request, self.address).await; + server.accept(socket, self.request, self.address); }) } } diff --git a/src/server.rs b/src/server.rs index 0667b99..4ff0dff 100644 --- a/src/server.rs +++ b/src/server.rs @@ -103,7 +103,6 @@ struct NewConnection { socket: Socket, address: SocketAddr, request: Request, - respond_to: oneshot::Sender<()>, } struct Disconnected { @@ -129,7 +128,7 @@ where loop { if let Err(err) = async { tokio::select! { - Some(NewConnection{socket, address, respond_to, request}) = self.connection_receiver.recv() => { + Some(NewConnection{socket, address, request}) = self.connection_receiver.recv() => { let socket_sink = socket.sink.clone(); match self.extension.on_connect(socket, request, address).await { Ok(session) => { @@ -152,7 +151,6 @@ where } } } - respond_to.send(()).unwrap_or_default(); } Some(Disconnected{id, result}) = self.disconnection_receiver.recv() => { match &result { @@ -245,19 +243,20 @@ impl Server { } impl Server { - pub async fn accept(&self, socket: Socket, request: Request, address: SocketAddr) { + /// Accept a connection. Logs an error if the server actor is dead. + pub fn accept(&self, socket: Socket, request: Request, address: SocketAddr) { // TODO: can we refuse the connection here? - let (connection_indicator_sender, connection_indicator_receiver) = oneshot::channel(); - self.connection_sender + if self + .connection_sender .send(NewConnection { socket, request, address, - respond_to: connection_indicator_sender, }) - .map_err(|_| "connections is down") - .unwrap_or_default(); - connection_indicator_receiver.await.unwrap_or_default() + .is_err() + { + tracing::error!("accepted a connection but the server actor is dead"); + } } pub(crate) fn disconnected( diff --git a/src/tungstenite.rs b/src/tungstenite.rs index 96c672a..024aabf 100644 --- a/src/tungstenite.rs +++ b/src/tungstenite.rs @@ -215,7 +215,7 @@ cfg_if::cfg_if! { continue; } }; - server.accept(socket, request, address).await; + server.accept(socket, request, address); } }