From f6e9646786c0fa963eb04c03fa28b7ab676bf3f1 Mon Sep 17 00:00:00 2001 From: koe Date: Sat, 30 Sep 2023 16:22:11 -0500 Subject: [PATCH 1/2] refactor reconnect loop for use during initial connection attempt --- src/client.rs | 153 +++++++++++++++++++++++++++++--------------------- 1 file changed, 88 insertions(+), 65 deletions(-) diff --git a/src/client.rs b/src/client.rs index b80df31..1965b53 100644 --- a/src/client.rs +++ b/src/client.rs @@ -326,7 +326,7 @@ pub async fn connect( client_fn: impl FnOnce(Client) -> E, config: ClientConfig, ) -> (Client, impl Future>) { - let (to_socket_sender, to_socket_receiver) = mpsc::unbounded_channel(); + let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel(); let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel(); let handle = Client { to_socket_sender, @@ -334,15 +334,14 @@ pub async fn connect( }; let mut client = client_fn(handle.clone()); let future = tokio::spawn(async move { - let http_request = config.connect_http_request(); tracing::info!("connecting to {}...", config.url); - let (stream, _) = tokio_tungstenite::connect_async(http_request).await?; - if let Err(err) = client.on_connect().await { - tracing::error!("calling on_connect() failed due to {}", err); - return Err(err); - } - let socket = Socket::new(stream, config.socket_config.clone().unwrap_or_default()); + let Some(socket) = + client_connect(1usize, &config, &mut to_socket_receiver, &mut client).await? + else { + return Ok(()); + }; tracing::info!("connected to {}", config.url); + let mut actor = ClientActor { client, to_socket_receiver, @@ -393,9 +392,15 @@ impl ClientActor { match self.client.on_close(frame).await? { ClientCloseMode::Reconnect => { - if !self.reconnect().await? { - return Ok(()) - } + let Some(socket) = client_connect( + usize::MAX, + &self.config, + &mut self.to_socket_receiver, + &mut self.client, + ).await? else { + return Ok(()); + }; + self.socket = socket; }, ClientCloseMode::Close => return Ok(()) } @@ -410,9 +415,15 @@ impl ClientActor { match self.client.on_disconnect().await? { ClientCloseMode::Reconnect => { - if !self.reconnect().await? { - return Ok(()) - } + let Some(socket) = client_connect( + usize::MAX, + &self.config, + &mut self.to_socket_receiver, + &mut self.client, + ).await? else { + return Ok(()); + }; + self.socket = socket; }, ClientCloseMode::Close => return Ok(()) } @@ -425,61 +436,73 @@ impl ClientActor { Ok(()) } +} - /// Returns Ok(true) if reconnecting succeeded, Ok(false) if the client closed itself, and `Err` if an error occurred. - async fn reconnect(&mut self) -> Result { - for i in 1.. { - tracing::info!("reconnecting attempt no: {}...", i); - let connect_http_request = self.config.connect_http_request(); - let result = tokio_tungstenite::connect_async(connect_http_request).await; - match result { - Ok((socket, _)) => { - tracing::info!("successfully reconnected"); - if let Err(err) = self.client.on_connect().await { - tracing::error!("calling on_connect() failed due to {}", err); - } - self.socket = Socket::new( - socket, - self.config.socket_config.clone().unwrap_or_default(), - ); - return Ok(true); - } - Err(err) => { - tracing::warn!( - "reconnecting failed due to {}. will retry in {}s", - err, - self.config.reconnect_interval.as_secs() - ); - } - }; - // Discard messages until either the reconnect interval passes, the socket receiver disconnects, or - // the user sends a close message. - let sleep = tokio::time::sleep(self.config.reconnect_interval); - tokio::pin!(sleep); - loop { - tokio::select! { - _ = &mut sleep => break, - Some(inmessage) = self.to_socket_receiver.recv() => { - match &inmessage.message - { - Some(Message::Close(frame)) => { - tracing::trace!(?frame, "client closed itself while reconnecting"); - return Ok(false); - } - _ => { - tracing::warn!("client is reconnecting, discarding message from user"); - continue; - } - } - }, - else => { - tracing::warn!("client is dead, aborting reconnect"); - return Err(Error::from("client died while trying to reconnect")); - }, +/// Returns Ok(Some(socket)) if connecting succeeded, Ok(None) if the client closed itself, and `Err` if an error occurred. +async fn client_connect( + max_attempts: usize, + config: &ClientConfig, + to_socket_receiver: &mut mpsc::UnboundedReceiver, + client: &mut E, +) -> Result, Error> { + for i in 1.. { + // connection attempt + tracing::info!("connecting attempt no: {}...", i); + let connect_http_request = config.connect_http_request(); + let result = tokio_tungstenite::connect_async(connect_http_request).await; + match result { + Ok((socket, _)) => { + tracing::info!("successfully connected"); + if let Err(err) = client.on_connect().await { + tracing::error!("calling on_connect() failed due to {}", err); } + let socket = Socket::new(socket, config.socket_config.clone().unwrap_or_default()); + return Ok(Some(socket)); } + Err(err) => { + tracing::warn!( + "connecting failed due to {}, will retry in {}s", + err, + config.reconnect_interval.as_secs() + ); + } + }; + + // abort if we have exceeded the max attempts + if i >= max_attempts { + return Err(Error::from(format!( + "failed to connect after {} attempt(s), aborting...", + i + ))); } - Err(Error::from("client failed to reconnect")) + // Discard messages until either the connect interval passes, the socket receiver disconnects, or + // the user sends a close message. + let sleep = tokio::time::sleep(config.reconnect_interval); + tokio::pin!(sleep); + loop { + tokio::select! { + _ = &mut sleep => break, + Some(inmessage) = to_socket_receiver.recv() => { + match &inmessage.message + { + Some(Message::Close(frame)) => { + tracing::trace!(?frame, "client closed itself while connecting"); + return Ok(None); + } + _ => { + tracing::warn!("client is connecting, discarding message from user"); + continue; + } + } + }, + else => { + tracing::warn!("client is dead, aborting connection attempts"); + return Err(Error::from("client died while trying to connect")); + }, + } + } } + + Err(Error::from("client failed to connect")) } From 6b9443bf2103fcca2174005e775e83d5a15513e5 Mon Sep 17 00:00:00 2001 From: koe Date: Sat, 30 Sep 2023 17:56:48 -0500 Subject: [PATCH 2/2] add client config for number of initial connect/reconnect attempts --- CHANGELOG.md | 7 ++++--- src/client.rs | 38 ++++++++++++++++++++++++++++++++------ 2 files changed, 36 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 22e855d..08c5cdf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,9 +22,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Return `Ok(MessageSignal)` from `Client` and `Session` `.binary()/.text()/.close()` endpoints instead of `Ok(())`. The `MessageSignal::state()` method will indicate the current state of the message (sending/sent/failed). - Clients attempt to reconnect immediately instead of after one full reconnect interval. - Incoming user messages are discarded while a client is reconnecting, to better match the usual behavior of a websocket connection. If you want messages to be buffered while reconnecting, you should implement your own buffer. -- rename `socket::Config` -> `socket::SocketConfig` and add a `heartbeat_ping_msg_fn` member variable in order to support custom Ping/Pong protocols - - add `ClientConfig::socket_config()` setter so clients can define their socket's config - - add `ezsockets::axum::Upgrade::on_upgrade_with_config()` that accepts a `SocketConfig` +- Rename `socket::Config` -> `socket::SocketConfig` and add a `heartbeat_ping_msg_fn` member variable in order to support custom Ping/Pong protocols. + - Add `ClientConfig::socket_config()` setter so clients can define their socket's config. + - Add `ezsockets::axum::Upgrade::on_upgrade_with_config()` that accepts a `SocketConfig`. +- Refactor `ezeockets::client::connect()` to use a retry loop for the initial connection. Add `max_initial_connect_attempts` and `max_reconnect_attempts` options to the `ClientConfig` (they default to 'infinite'). Migration guide: diff --git a/src/client.rs b/src/client.rs index 1965b53..506eed9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -71,6 +71,8 @@ pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0); #[derive(Debug)] pub struct ClientConfig { url: Url, + max_initial_connect_attempts: usize, + max_reconnect_attempts: usize, reconnect_interval: Duration, headers: http::HeaderMap, socket_config: Option, @@ -87,6 +89,8 @@ impl ClientConfig { let url = url.try_into().expect("invalid URL"); Self { url, + max_initial_connect_attempts: usize::MAX, + max_reconnect_attempts: usize::MAX, reconnect_interval: DEFAULT_RECONNECT_INTERVAL, headers: http::HeaderMap::new(), socket_config: None, @@ -152,6 +156,22 @@ impl ClientConfig { self } + /// Set the maximum number of connection attempts when starting a client. + /// + /// Defaults to infinite. + pub fn max_initial_connect_attempts(mut self, max_initial_connect_attempts: usize) -> Self { + self.max_initial_connect_attempts = max_initial_connect_attempts; + self + } + + /// Set the maximum number of attempts when reconnecting. + /// + /// Defaults to infinite. + pub fn max_reconnect_attempts(mut self, max_reconnect_attempts: usize) -> Self { + self.max_reconnect_attempts = max_reconnect_attempts; + self + } + /// Set the reconnect interval. pub fn reconnect_interval(mut self, reconnect_interval: Duration) -> Self { self.reconnect_interval = reconnect_interval; @@ -335,8 +355,13 @@ pub async fn connect( let mut client = client_fn(handle.clone()); let future = tokio::spawn(async move { tracing::info!("connecting to {}...", config.url); - let Some(socket) = - client_connect(1usize, &config, &mut to_socket_receiver, &mut client).await? + let Some(socket) = client_connect( + config.max_initial_connect_attempts, + &config, + &mut to_socket_receiver, + &mut client, + ) + .await? else { return Ok(()); }; @@ -393,7 +418,7 @@ impl ClientActor { { ClientCloseMode::Reconnect => { let Some(socket) = client_connect( - usize::MAX, + self.config.max_reconnect_attempts, &self.config, &mut self.to_socket_receiver, &mut self.client, @@ -416,7 +441,7 @@ impl ClientActor { { ClientCloseMode::Reconnect => { let Some(socket) = client_connect( - usize::MAX, + self.config.max_reconnect_attempts, &self.config, &mut self.to_socket_receiver, &mut self.client, @@ -454,7 +479,8 @@ async fn client_connect( Ok((socket, _)) => { tracing::info!("successfully connected"); if let Err(err) = client.on_connect().await { - tracing::error!("calling on_connect() failed due to {}", err); + tracing::error!("calling on_connect() failed due to {}, closing client", err); + return Err(err); } let socket = Socket::new(socket, config.socket_config.clone().unwrap_or_default()); return Ok(Some(socket)); @@ -468,7 +494,7 @@ async fn client_connect( } }; - // abort if we have exceeded the max attempts + // abort if we have reached the max attempts if i >= max_attempts { return Err(Error::from(format!( "failed to connect after {} attempt(s), aborting...",