Skip to content

Commit

Permalink
Improve reconnect cleanup guarantees (#96)
Browse files Browse the repository at this point in the history
* update client connection loop to always drain incoming messages before making a connection attempt
  • Loading branch information
UkoeHB authored Nov 14, 2023
1 parent ccad632 commit 47ad265
Showing 1 changed file with 27 additions and 26 deletions.
53 changes: 27 additions & 26 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,31 @@ async fn client_connect<E: ClientExt, Connector: ClientConnector>(
client: &mut E,
) -> Result<Option<Socket>, Error> {
for i in 1.. {
// handle incoming user messages
// - It is important to do this at least once after a disconnect so users can guarantee that messages are not
// sent 'across' a reconnect cycle. They can achieve that, in combination with this step, by manually
// preventing messages from being sent to to_socket_receiver between on_disconnect/on_close and on_connect.
loop {
let in_message = to_socket_receiver.try_recv();
match in_message {
Ok(inmessage) => match &inmessage.message {
Some(Message::Close(frame)) => {
tracing::debug!(?frame, "client closed itself while connecting");
return Ok(None);
}
_ => {
tracing::warn!("client is connecting, discarding message from user");
continue;
}
},
Err(async_channel::TryRecvError::Empty) => break,
Err(async_channel::TryRecvError::Closed) => {
tracing::warn!("client is dead, aborting connection attempts");
return Err(Error::from("client died while trying to connect"));
}
}
}

// connection attempt
tracing::info!("connecting attempt no: {}...", i);
let result = client_connector.connect(config).await;
Expand Down Expand Up @@ -630,32 +655,8 @@ async fn client_connect<E: ClientExt, Connector: ClientConnector>(
)));
}

// Discard messages until either the connect interval passes, the socket receiver disconnects, or
// the user sends a close message.
let sleep = sleep(config.reconnect_interval).fuse();
futures::pin_mut!(sleep);
loop {
futures::select! {
_ = sleep => break,
res = to_socket_receiver.recv().fuse() => {
let Ok(inmessage) = res else {
tracing::warn!("client is dead, aborting connection attempts");
return Err(Error::from("client died while trying to connect"));
};
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::debug!(?frame, "client closed itself while connecting");
return Ok(None);
}
_ => {
tracing::warn!("client is connecting, discarding message from user");
continue;
}
}
},
}
}
// wait for the connect interval
sleep(config.reconnect_interval).await;
}

Err(Error::from("client failed to connect"))
Expand Down

0 comments on commit 47ad265

Please sign in to comment.