diff --git a/src/client.rs b/src/client.rs index ece24c8..985a2fe 100644 --- a/src/client.rs +++ b/src/client.rs @@ -72,7 +72,7 @@ pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0); #[derive(Debug)] pub struct ClientConfig { url: Url, - reconnect_interval: Option, + reconnect_interval: Duration, headers: http::HeaderMap, } @@ -87,7 +87,7 @@ impl ClientConfig { let url = url.try_into().expect("invalid URL"); Self { url, - reconnect_interval: Some(DEFAULT_RECONNECT_INTERVAL), + reconnect_interval: DEFAULT_RECONNECT_INTERVAL, headers: http::HeaderMap::new(), } } @@ -152,7 +152,7 @@ impl ClientConfig { } pub fn reconnect_interval(mut self, reconnect_interval: Duration) -> Self { - self.reconnect_interval = Some(reconnect_interval); + self.reconnect_interval = reconnect_interval; self } @@ -205,7 +205,7 @@ pub trait ClientExt: Send { /// /// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead. /// - /// For reconnections, use `ClientConfig::reconnect_interval`(enabled by default). + /// For reconnections, use `ClientConfig::reconnect_interval`. async fn on_close(&mut self, _frame: Option) -> Result { Ok(ClientCloseMode::Reconnect) } @@ -214,7 +214,7 @@ pub trait ClientExt: Send { /// /// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead. /// - /// For reconnections, use `ClientConfig::reconnect_interval`(enabled by default). + /// For reconnections, use `ClientConfig::reconnect_interval`. async fn on_disconnect(&mut self) -> Result { Ok(ClientCloseMode::Reconnect) } @@ -382,13 +382,7 @@ impl ClientActor { } async fn try_reconnect(&mut self) -> bool { - let Some(reconnect_interval) = self.config.reconnect_interval else { - tracing::warn!("no reconnect interval set, aborting reconnect attempt"); - return false; - }; - tracing::info!("reconnecting in {}s", reconnect_interval.as_secs()); for i in 1.. { - tokio::time::sleep(reconnect_interval).await; tracing::info!("reconnecting attempt no: {}...", i); let connect_http_request = self.config.connect_http_request(); let result = tokio_tungstenite::connect_async(connect_http_request).await; @@ -407,10 +401,26 @@ impl ClientActor { tracing::warn!( "reconnecting failed due to {}. will retry in {}s", err, - reconnect_interval.as_secs() + self.config.reconnect_interval.as_secs() ); } }; + // discard messages until either the reconnect interval passes or the socket receiver disconnects + let sleep = tokio::time::sleep(self.config.reconnect_interval); + tokio::pin!(sleep); + loop { + tokio::select! { + _ = &mut sleep => break, + Some(_) = self.to_socket_receiver.recv() => { + tracing::warn!("client is reconnecting, discarding message from user"); + continue + }, + else => { + tracing::warn!("client is dead, aborting reconnect"); + return false; + }, + } + } } false