Skip to content

Commit

Permalink
refactor reconnect loop for use during initial connection attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Sep 30, 2023
1 parent 99d07bd commit ec4387b
Showing 1 changed file with 91 additions and 65 deletions.
156 changes: 91 additions & 65 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,23 +326,25 @@ pub async fn connect<E: ClientExt + 'static>(
client_fn: impl FnOnce(Client<E>) -> E,
config: ClientConfig,
) -> (Client<E>, impl Future<Output = Result<(), Error>>) {
let (to_socket_sender, to_socket_receiver) = mpsc::unbounded_channel();
let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel();
let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel();
let handle = Client {
to_socket_sender,
client_call_sender,
};
let mut client = client_fn(handle.clone());
let future = tokio::spawn(async move {
let http_request = config.connect_http_request();
tracing::info!("connecting to {}...", config.url);
let (stream, _) = tokio_tungstenite::connect_async(http_request).await?;
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}", err);
return Err(err);
}
let socket = Socket::new(stream, config.socket_config.clone().unwrap_or_default());
let Some(socket) = client_connect(
1usize,
&config,
&mut to_socket_receiver,
&mut client,
).await? else {
return Ok(());
};
tracing::info!("connected to {}", config.url);

let mut actor = ClientActor {
client,
to_socket_receiver,
Expand Down Expand Up @@ -393,9 +395,15 @@ impl<E: ClientExt> ClientActor<E> {
match self.client.on_close(frame).await?
{
ClientCloseMode::Reconnect => {
if !self.reconnect().await? {
return Ok(())
}
let Some(socket) = client_connect(
usize::MAX,
&self.config,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
return Ok(());
};
self.socket = socket;
},
ClientCloseMode::Close => return Ok(())
}
Expand All @@ -410,9 +418,15 @@ impl<E: ClientExt> ClientActor<E> {
match self.client.on_disconnect().await?
{
ClientCloseMode::Reconnect => {
if !self.reconnect().await? {
return Ok(())
}
let Some(socket) = client_connect(
usize::MAX,
&self.config,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
return Ok(());
};
self.socket = socket;
},
ClientCloseMode::Close => return Ok(())
}
Expand All @@ -425,61 +439,73 @@ impl<E: ClientExt> ClientActor<E> {

Ok(())
}
}

/// Returns Ok(true) if reconnecting succeeded, Ok(false) if the client closed itself, and `Err` if an error occurred.
async fn reconnect(&mut self) -> Result<bool, Error> {
for i in 1.. {
tracing::info!("reconnecting attempt no: {}...", i);
let connect_http_request = self.config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await;
match result {
Ok((socket, _)) => {
tracing::info!("successfully reconnected");
if let Err(err) = self.client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}", err);
}
self.socket = Socket::new(
socket,
self.config.socket_config.clone().unwrap_or_default(),
);
return Ok(true);
}
Err(err) => {
tracing::warn!(
"reconnecting failed due to {}. will retry in {}s",
err,
self.config.reconnect_interval.as_secs()
);
}
};
// Discard messages until either the reconnect interval passes, the socket receiver disconnects, or
// the user sends a close message.
let sleep = tokio::time::sleep(self.config.reconnect_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => break,
Some(inmessage) = self.to_socket_receiver.recv() => {
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::trace!(?frame, "client closed itself while reconnecting");
return Ok(false);
}
_ => {
tracing::warn!("client is reconnecting, discarding message from user");
continue;
}
}
},
else => {
tracing::warn!("client is dead, aborting reconnect");
return Err(Error::from("client died while trying to reconnect"));
},
/// Returns Ok(Some(socket)) if connecting succeeded, Ok(None) if the client closed itself, and `Err` if an error occurred.
async fn client_connect<E: ClientExt>(
max_attempts: usize,
config: &ClientConfig,
to_socket_receiver: &mut mpsc::UnboundedReceiver<InMessage>,
client: &mut E,
) -> Result<Option<Socket>, Error> {
for i in 1.. {
// connection attempt
tracing::info!("connecting attempt no: {}...", i);
let connect_http_request = config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await;
match result {
Ok((socket, _)) => {
tracing::info!("successfully connected");
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}", err);
}
let socket = Socket::new(socket, config.socket_config.clone().unwrap_or_default());
return Ok(Some(socket));
}
Err(err) => {
tracing::warn!(
"connecting failed due to {}, will retry in {}s",
err,
config.reconnect_interval.as_secs()
);
}
};

// abort if we have exceeded the max attempts
if i >= max_attempts {
return Err(Error::from(format!(
"failed to connect after {} attempt(s), aborting...",
i
)));
}

Err(Error::from("client failed to reconnect"))
// Discard messages until either the connect interval passes, the socket receiver disconnects, or
// the user sends a close message.
let sleep = tokio::time::sleep(config.reconnect_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => break,
Some(inmessage) = to_socket_receiver.recv() => {
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::trace!(?frame, "client closed itself while connecting");
return Ok(None);
}
_ => {
tracing::warn!("client is connecting, discarding message from user");
continue;
}
}
},
else => {
tracing::warn!("client is dead, aborting connection attempts");
return Err(Error::from("client died while trying to connect"));
},
}
}
}

Err(Error::from("client failed to connect"))
}

0 comments on commit ec4387b

Please sign in to comment.