From b3cb137fa29567e3fb7fcd283c81aa1084a24f7a Mon Sep 17 00:00:00 2001 From: koe Date: Sat, 30 Sep 2023 15:14:52 -0500 Subject: [PATCH] allow client to close itself while in the reconnect loop --- src/client.rs | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/src/client.rs b/src/client.rs index 5fa934f..b80df31 100644 --- a/src/client.rs +++ b/src/client.rs @@ -392,7 +392,11 @@ impl ClientActor { tracing::trace!("client closed by server"); match self.client.on_close(frame).await? { - ClientCloseMode::Reconnect => self.reconnect().await?, + ClientCloseMode::Reconnect => { + if !self.reconnect().await? { + return Ok(()) + } + }, ClientCloseMode::Close => return Ok(()) } } @@ -405,7 +409,11 @@ impl ClientActor { tracing::trace!("client socket died"); match self.client.on_disconnect().await? { - ClientCloseMode::Reconnect => self.reconnect().await?, + ClientCloseMode::Reconnect => { + if !self.reconnect().await? { + return Ok(()) + } + }, ClientCloseMode::Close => return Ok(()) } } @@ -418,7 +426,8 @@ impl ClientActor { Ok(()) } - async fn reconnect(&mut self) -> Result<(), Error> { + /// Returns Ok(true) if reconnecting succeeded, Ok(false) if the client closed itself, and `Err` if an error occurred. + async fn reconnect(&mut self) -> Result { for i in 1.. { tracing::info!("reconnecting attempt no: {}...", i); let connect_http_request = self.config.connect_http_request(); @@ -433,7 +442,7 @@ impl ClientActor { socket, self.config.socket_config.clone().unwrap_or_default(), ); - return Ok(()); + return Ok(true); } Err(err) => { tracing::warn!( @@ -443,15 +452,25 @@ impl ClientActor { ); } }; - // discard messages until either the reconnect interval passes or the socket receiver disconnects + // Discard messages until either the reconnect interval passes, the socket receiver disconnects, or + // the user sends a close message. let sleep = tokio::time::sleep(self.config.reconnect_interval); tokio::pin!(sleep); loop { tokio::select! { _ = &mut sleep => break, - Some(_) = self.to_socket_receiver.recv() => { - tracing::warn!("client is reconnecting, discarding message from user"); - continue + Some(inmessage) = self.to_socket_receiver.recv() => { + match &inmessage.message + { + Some(Message::Close(frame)) => { + tracing::trace!(?frame, "client closed itself while reconnecting"); + return Ok(false); + } + _ => { + tracing::warn!("client is reconnecting, discarding message from user"); + continue; + } + } }, else => { tracing::warn!("client is dead, aborting reconnect");