Skip to content

Commit

Permalink
Merge pull request #81 from UkoeHB/fix_reconnect
Browse files Browse the repository at this point in the history
Adjust client reconnect
  • Loading branch information
UkoeHB authored Sep 27, 2023
2 parents 61b2e51 + 5651e93 commit 97c81c8
Showing 1 changed file with 22 additions and 12 deletions.
34 changes: 22 additions & 12 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0);
#[derive(Debug)]
pub struct ClientConfig {
url: Url,
reconnect_interval: Option<Duration>,
reconnect_interval: Duration,
headers: http::HeaderMap,
}

Expand All @@ -87,7 +87,7 @@ impl ClientConfig {
let url = url.try_into().expect("invalid URL");
Self {
url,
reconnect_interval: Some(DEFAULT_RECONNECT_INTERVAL),
reconnect_interval: DEFAULT_RECONNECT_INTERVAL,
headers: http::HeaderMap::new(),
}
}
Expand Down Expand Up @@ -152,7 +152,7 @@ impl ClientConfig {
}

pub fn reconnect_interval(mut self, reconnect_interval: Duration) -> Self {
self.reconnect_interval = Some(reconnect_interval);
self.reconnect_interval = reconnect_interval;
self
}

Expand Down Expand Up @@ -205,7 +205,7 @@ pub trait ClientExt: Send {
///
/// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead.
///
/// For reconnections, use `ClientConfig::reconnect_interval`(enabled by default).
/// For reconnections, use `ClientConfig::reconnect_interval`.
async fn on_close(&mut self, _frame: Option<CloseFrame>) -> Result<ClientCloseMode, Error> {
Ok(ClientCloseMode::Reconnect)
}
Expand All @@ -214,7 +214,7 @@ pub trait ClientExt: Send {
///
/// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead.
///
/// For reconnections, use `ClientConfig::reconnect_interval`(enabled by default).
/// For reconnections, use `ClientConfig::reconnect_interval`.
async fn on_disconnect(&mut self) -> Result<ClientCloseMode, Error> {
Ok(ClientCloseMode::Reconnect)
}
Expand Down Expand Up @@ -413,13 +413,7 @@ impl<E: ClientExt> ClientActor<E> {
}

async fn try_reconnect(&mut self) -> bool {
let Some(reconnect_interval) = self.config.reconnect_interval else {
tracing::warn!("no reconnect interval set, aborting reconnect attempt");
return false;
};
tracing::info!("reconnecting in {}s", reconnect_interval.as_secs());
for i in 1.. {
tokio::time::sleep(reconnect_interval).await;
tracing::info!("reconnecting attempt no: {}...", i);
let connect_http_request = self.config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await;
Expand All @@ -438,10 +432,26 @@ impl<E: ClientExt> ClientActor<E> {
tracing::warn!(
"reconnecting failed due to {}. will retry in {}s",
err,
reconnect_interval.as_secs()
self.config.reconnect_interval.as_secs()
);
}
};
// discard messages until either the reconnect interval passes or the socket receiver disconnects
let sleep = tokio::time::sleep(self.config.reconnect_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => break,
Some(_) = self.to_socket_receiver.recv() => {
tracing::warn!("client is reconnecting, discarding message from user");
continue
},
else => {
tracing::warn!("client is dead, aborting reconnect");
return false;
},
}
}
}

false
Expand Down

0 comments on commit 97c81c8

Please sign in to comment.