Skip to content

Commit

Permalink
allow client to close itself while in the reconnect loop
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Sep 30, 2023
1 parent 7393f98 commit b3cb137
Showing 1 changed file with 27 additions and 8 deletions.
35 changes: 27 additions & 8 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,11 @@ impl<E: ClientExt> ClientActor<E> {
tracing::trace!("client closed by server");
match self.client.on_close(frame).await?
{
ClientCloseMode::Reconnect => self.reconnect().await?,
ClientCloseMode::Reconnect => {
if !self.reconnect().await? {
return Ok(())
}
},
ClientCloseMode::Close => return Ok(())
}
}
Expand All @@ -405,7 +409,11 @@ impl<E: ClientExt> ClientActor<E> {
tracing::trace!("client socket died");
match self.client.on_disconnect().await?
{
ClientCloseMode::Reconnect => self.reconnect().await?,
ClientCloseMode::Reconnect => {
if !self.reconnect().await? {
return Ok(())
}
},
ClientCloseMode::Close => return Ok(())
}
}
Expand All @@ -418,7 +426,8 @@ impl<E: ClientExt> ClientActor<E> {
Ok(())
}

async fn reconnect(&mut self) -> Result<(), Error> {
/// Returns Ok(true) if reconnecting succeeded, Ok(false) if the client closed itself, and `Err` if an error occurred.
async fn reconnect(&mut self) -> Result<bool, Error> {
for i in 1.. {
tracing::info!("reconnecting attempt no: {}...", i);
let connect_http_request = self.config.connect_http_request();
Expand All @@ -433,7 +442,7 @@ impl<E: ClientExt> ClientActor<E> {
socket,
self.config.socket_config.clone().unwrap_or_default(),
);
return Ok(());
return Ok(true);
}
Err(err) => {
tracing::warn!(
Expand All @@ -443,15 +452,25 @@ impl<E: ClientExt> ClientActor<E> {
);
}
};
// discard messages until either the reconnect interval passes or the socket receiver disconnects
// Discard messages until either the reconnect interval passes, the socket receiver disconnects, or
// the user sends a close message.
let sleep = tokio::time::sleep(self.config.reconnect_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => break,
Some(_) = self.to_socket_receiver.recv() => {
tracing::warn!("client is reconnecting, discarding message from user");
continue
Some(inmessage) = self.to_socket_receiver.recv() => {
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::trace!(?frame, "client closed itself while reconnecting");
return Ok(false);
}
_ => {
tracing::warn!("client is reconnecting, discarding message from user");
continue;
}
}
},
else => {
tracing::warn!("client is dead, aborting reconnect");
Expand Down

0 comments on commit b3cb137

Please sign in to comment.