From 1deef89d39934af2d49a238cf63fe78ba77d932b Mon Sep 17 00:00:00 2001 From: koe Date: Mon, 16 Oct 2023 12:38:06 -0500 Subject: [PATCH] add on_connect_fail() to ClientExt --- CHANGELOG.md | 1 + src/client.rs | 69 ++++++++++++------- .../client_connector_tokio.rs | 3 +- .../client_connector_wasm.rs | 3 +- src/lib.rs | 2 + 5 files changed, 50 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 46c1ebb..d6617a0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,6 +36,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Add `ClientConnectorWasm` and `wasm_client` feature. Added `chat-client-wasm` example to show a WASM client that compiles. It currently only listens to the chat and can't input anything since browser does not have a terminal. - Refactor `Socket` and `Client` to not depend on `tokio` when compiling to WASM. This is a breaking change as the `Client` API now exposes `async_channel` error types instead of `tokio` error types, and `Client::call_with()` now takes an `async_channel::Sender` instead of a `tokio` oneshot. - Add unimplemented socket close codes. +- Add `ClientExt::on_connect_fail()` for custom handling of connection attempt failures. By default the client will continue trying to connect. Migration guide: diff --git a/src/client.rs b/src/client.rs index 7467280..5a3825d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -235,20 +235,41 @@ pub trait ClientExt: Send { /// Type the custom call - parameters passed to `on_call`. type Call: Send; - /// Handler for text messages from the server. Returning an error will force-close the client. + /// Handler for text messages from the server. + /// + /// Returning an error will force-close the client. async fn on_text(&mut self, text: String) -> Result<(), Error>; - /// Handler for binary messages from the server. Returning an error will force-close the client. + /// Handler for binary messages from the server. + /// + /// Returning an error will force-close the client. async fn on_binary(&mut self, bytes: Vec) -> Result<(), Error>; - /// Handler for custom calls from other parts from your program. Returning an error will force-close the client. + /// Handler for custom calls from other parts from your program. + /// + /// Returning an error will force-close the client. + /// /// This is useful for concurrency and polymorphism. async fn on_call(&mut self, call: Self::Call) -> Result<(), Error>; - /// Called when the client successfully connected(or reconnected). Returned errors will be ignored. + /// Called when the client successfully connected (or reconnected). + /// + /// Returning an error will force-close the client. async fn on_connect(&mut self) -> Result<(), Error> { Ok(()) } - /// Called when the connection is closed by the server. Returning an error will force-close the client. + /// Called when the client fails a connection/reconnection attempt. + /// + /// Returning an error will force-close the client. + /// + /// By default, the client will continue trying to connect. + /// Return [`ClientCloseMode::Close`] here to fully close instead. + async fn on_connect_fail(&mut self, _error: WSError) -> Result { + Ok(ClientCloseMode::Reconnect) + } + + /// Called when the connection is closed by the server. + /// + /// Returning an error will force-close the client. /// /// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead. /// @@ -259,6 +280,8 @@ pub trait ClientExt: Send { /// Called when the connection is closed by the socket dying. /// + /// Returning an error will force-close the client. + /// /// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead. /// /// For reconnections, use `ClientConfig::reconnect_interval`. @@ -276,13 +299,12 @@ pub trait ClientExt: Send { pub trait ClientConnector { type Handle: enfync::Handle; type Message: Into + From + std::fmt::Debug + Send + 'static; - type WSError: std::error::Error + Into; + type WSError: std::error::Error + Into + Send; type Socket: SinkExt + StreamExt> + Unpin + Send + 'static; - type ConnectError: std::error::Error + Send; /// Get the connector's runtime handle. fn handle(&self) -> Self::Handle; @@ -290,10 +312,7 @@ pub trait ClientConnector { /// Connect to a websocket server. /// /// Returns `Err` if the request is invalid. - async fn connect( - &self, - client_config: &ClientConfig, - ) -> Result; + async fn connect(&self, client_config: &ClientConfig) -> Result; } /// An `ezsockets` client. @@ -497,7 +516,7 @@ impl ClientActor { } } if closed_self { - tracing::trace!("client closed itself"); + tracing::debug!("client closed itself"); return Ok(()) } } @@ -514,7 +533,7 @@ impl ClientActor { Message::Text(text) => self.client.on_text(text).await?, Message::Binary(bytes) => self.client.on_binary(bytes).await?, Message::Close(frame) => { - tracing::trace!("client closed by server"); + tracing::debug!("client closed by server"); match self.client.on_close(frame).await? { ClientCloseMode::Reconnect => { @@ -539,7 +558,7 @@ impl ClientActor { tracing::warn!("connection error: {error}"); } None => { - tracing::trace!("client socket died"); + tracing::debug!("client socket died"); match self.client.on_disconnect().await? { ClientCloseMode::Reconnect => { @@ -581,10 +600,7 @@ async fn client_connect( match result { Ok(socket_impl) => { tracing::info!("successfully connected"); - if let Err(err) = client.on_connect().await { - tracing::error!("calling on_connect() failed due to {}, closing client", err); - return Err(err); - } + client.on_connect().await?; let socket = Socket::new( socket_impl, config.socket_config.clone().unwrap_or_default(), @@ -593,11 +609,16 @@ async fn client_connect( return Ok(Some(socket)); } Err(err) => { - tracing::warn!( - "connecting failed due to {}, will retry in {}s", - err, - config.reconnect_interval.as_secs() - ); + tracing::warn!("connecting failed due to {}", err); + match client.on_connect_fail(err.into()).await? { + ClientCloseMode::Reconnect => { + tracing::debug!("will retry in {}s", config.reconnect_interval.as_secs()); + } + ClientCloseMode::Close => { + tracing::debug!("client closed itself after a connection failure"); + return Ok(None); + } + } } }; @@ -624,7 +645,7 @@ async fn client_connect( match &inmessage.message { Some(Message::Close(frame)) => { - tracing::trace!(?frame, "client closed itself while connecting"); + tracing::debug!(?frame, "client closed itself while connecting"); return Ok(None); } _ => { diff --git a/src/client_connectors/client_connector_tokio.rs b/src/client_connectors/client_connector_tokio.rs index d064008..7577135 100644 --- a/src/client_connectors/client_connector_tokio.rs +++ b/src/client_connectors/client_connector_tokio.rs @@ -40,7 +40,6 @@ impl ClientConnector for ClientConnectorTokio { type Socket = tokio_tungstenite::WebSocketStream< tokio_tungstenite::MaybeTlsStream, >; - type ConnectError = tungstenite::error::Error; /// Get the connector's runtime handle. fn handle(&self) -> Self::Handle { @@ -50,7 +49,7 @@ impl ClientConnector for ClientConnectorTokio { /// Connect to a websocket server. /// /// Returns `Err` if the request is invalid. - async fn connect(&self, config: &ClientConfig) -> Result { + async fn connect(&self, config: &ClientConfig) -> Result { let request = config.connect_http_request(); let (socket, _) = tokio_tungstenite::connect_async(request).await?; Ok(socket) diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index a40c545..6cc5896 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -84,7 +84,6 @@ impl ClientConnector for ClientConnectorWasm { type Message = tokio_tungstenite_wasm::Message; type WSError = tokio_tungstenite_wasm::Error; type Socket = WebSocketStreamProxy; - type ConnectError = tokio_tungstenite_wasm::Error; /// Get the connector's runtime handle. fn handle(&self) -> Self::Handle { @@ -97,7 +96,7 @@ impl ClientConnector for ClientConnectorWasm { /// /// Panics if any headers were added to the client config. Websockets on browser does not support /// additional headers (use [`ClientConfig::query_parameter()`] instead). - async fn connect(&self, config: &ClientConfig) -> Result { + async fn connect(&self, config: &ClientConfig) -> Result { if config.headers().len() > 0 { panic!("client may not submit HTTP headers in WASM connection requests"); } diff --git a/src/lib.rs b/src/lib.rs index 96bfec9..b3715c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -52,5 +52,7 @@ cfg_if::cfg_if! { } } +pub use tokio_tungstenite_wasm::Error as WSError; + pub type Error = Box; pub type Request = http::Request<()>;