Skip to content

Commit

Permalink
add on_connect_fail() to ClientExt
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Oct 16, 2023
1 parent 039cd67 commit 1deef89
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Add `ClientConnectorWasm` and `wasm_client` feature. Added `chat-client-wasm` example to show a WASM client that compiles. It currently only listens to the chat and can't input anything since browser does not have a terminal.
- Refactor `Socket` and `Client` to not depend on `tokio` when compiling to WASM. This is a breaking change as the `Client` API now exposes `async_channel` error types instead of `tokio` error types, and `Client::call_with()` now takes an `async_channel::Sender` instead of a `tokio` oneshot.
- Add unimplemented socket close codes.
- Add `ClientExt::on_connect_fail()` for custom handling of connection attempt failures. By default the client will continue trying to connect.


Migration guide:
Expand Down
69 changes: 45 additions & 24 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,20 +235,41 @@ pub trait ClientExt: Send {
/// Type the custom call - parameters passed to `on_call`.
type Call: Send;

/// Handler for text messages from the server. Returning an error will force-close the client.
/// Handler for text messages from the server.
///
/// Returning an error will force-close the client.
async fn on_text(&mut self, text: String) -> Result<(), Error>;
/// Handler for binary messages from the server. Returning an error will force-close the client.
/// Handler for binary messages from the server.
///
/// Returning an error will force-close the client.
async fn on_binary(&mut self, bytes: Vec<u8>) -> Result<(), Error>;
/// Handler for custom calls from other parts from your program. Returning an error will force-close the client.
/// Handler for custom calls from other parts from your program.
///
/// Returning an error will force-close the client.
///
/// This is useful for concurrency and polymorphism.
async fn on_call(&mut self, call: Self::Call) -> Result<(), Error>;

/// Called when the client successfully connected(or reconnected). Returned errors will be ignored.
/// Called when the client successfully connected (or reconnected).
///
/// Returning an error will force-close the client.
async fn on_connect(&mut self) -> Result<(), Error> {
Ok(())
}

/// Called when the connection is closed by the server. Returning an error will force-close the client.
/// Called when the client fails a connection/reconnection attempt.
///
/// Returning an error will force-close the client.
///
/// By default, the client will continue trying to connect.
/// Return [`ClientCloseMode::Close`] here to fully close instead.
async fn on_connect_fail(&mut self, _error: WSError) -> Result<ClientCloseMode, Error> {
Ok(ClientCloseMode::Reconnect)
}

/// Called when the connection is closed by the server.
///
/// Returning an error will force-close the client.
///
/// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead.
///
Expand All @@ -259,6 +280,8 @@ pub trait ClientExt: Send {

/// Called when the connection is closed by the socket dying.
///
/// Returning an error will force-close the client.
///
/// By default, the client will try to reconnect. Return [`ClientCloseMode::Close`] here to fully close instead.
///
/// For reconnections, use `ClientConfig::reconnect_interval`.
Expand All @@ -276,24 +299,20 @@ pub trait ClientExt: Send {
pub trait ClientConnector {
type Handle: enfync::Handle;
type Message: Into<RawMessage> + From<RawMessage> + std::fmt::Debug + Send + 'static;
type WSError: std::error::Error + Into<WSError>;
type WSError: std::error::Error + Into<WSError> + Send;
type Socket: SinkExt<Self::Message, Error = Self::WSError>
+ StreamExt<Item = Result<Self::Message, Self::WSError>>
+ Unpin
+ Send
+ 'static;
type ConnectError: std::error::Error + Send;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle;

/// Connect to a websocket server.
///
/// Returns `Err` if the request is invalid.
async fn connect(
&self,
client_config: &ClientConfig,
) -> Result<Self::Socket, Self::ConnectError>;
async fn connect(&self, client_config: &ClientConfig) -> Result<Self::Socket, Self::WSError>;
}

/// An `ezsockets` client.
Expand Down Expand Up @@ -497,7 +516,7 @@ impl<E: ClientExt, C: ClientConnector> ClientActor<E, C> {
}
}
if closed_self {
tracing::trace!("client closed itself");
tracing::debug!("client closed itself");
return Ok(())
}
}
Expand All @@ -514,7 +533,7 @@ impl<E: ClientExt, C: ClientConnector> ClientActor<E, C> {
Message::Text(text) => self.client.on_text(text).await?,
Message::Binary(bytes) => self.client.on_binary(bytes).await?,
Message::Close(frame) => {
tracing::trace!("client closed by server");
tracing::debug!("client closed by server");
match self.client.on_close(frame).await?
{
ClientCloseMode::Reconnect => {
Expand All @@ -539,7 +558,7 @@ impl<E: ClientExt, C: ClientConnector> ClientActor<E, C> {
tracing::warn!("connection error: {error}");
}
None => {
tracing::trace!("client socket died");
tracing::debug!("client socket died");
match self.client.on_disconnect().await?
{
ClientCloseMode::Reconnect => {
Expand Down Expand Up @@ -581,10 +600,7 @@ async fn client_connect<E: ClientExt, Connector: ClientConnector>(
match result {
Ok(socket_impl) => {
tracing::info!("successfully connected");
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}, closing client", err);
return Err(err);
}
client.on_connect().await?;
let socket = Socket::new(
socket_impl,
config.socket_config.clone().unwrap_or_default(),
Expand All @@ -593,11 +609,16 @@ async fn client_connect<E: ClientExt, Connector: ClientConnector>(
return Ok(Some(socket));
}
Err(err) => {
tracing::warn!(
"connecting failed due to {}, will retry in {}s",
err,
config.reconnect_interval.as_secs()
);
tracing::warn!("connecting failed due to {}", err);
match client.on_connect_fail(err.into()).await? {
ClientCloseMode::Reconnect => {
tracing::debug!("will retry in {}s", config.reconnect_interval.as_secs());
}
ClientCloseMode::Close => {
tracing::debug!("client closed itself after a connection failure");
return Ok(None);
}
}
}
};

Expand All @@ -624,7 +645,7 @@ async fn client_connect<E: ClientExt, Connector: ClientConnector>(
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::trace!(?frame, "client closed itself while connecting");
tracing::debug!(?frame, "client closed itself while connecting");
return Ok(None);
}
_ => {
Expand Down
3 changes: 1 addition & 2 deletions src/client_connectors/client_connector_tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl ClientConnector for ClientConnectorTokio {
type Socket = tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>;
type ConnectError = tungstenite::error::Error;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle {
Expand All @@ -50,7 +49,7 @@ impl ClientConnector for ClientConnectorTokio {
/// Connect to a websocket server.
///
/// Returns `Err` if the request is invalid.
async fn connect(&self, config: &ClientConfig) -> Result<Self::Socket, Self::ConnectError> {
async fn connect(&self, config: &ClientConfig) -> Result<Self::Socket, Self::WSError> {
let request = config.connect_http_request();
let (socket, _) = tokio_tungstenite::connect_async(request).await?;
Ok(socket)
Expand Down
3 changes: 1 addition & 2 deletions src/client_connectors/client_connector_wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl ClientConnector for ClientConnectorWasm {
type Message = tokio_tungstenite_wasm::Message;
type WSError = tokio_tungstenite_wasm::Error;
type Socket = WebSocketStreamProxy;
type ConnectError = tokio_tungstenite_wasm::Error;

/// Get the connector's runtime handle.
fn handle(&self) -> Self::Handle {
Expand All @@ -97,7 +96,7 @@ impl ClientConnector for ClientConnectorWasm {
///
/// Panics if any headers were added to the client config. Websockets on browser does not support
/// additional headers (use [`ClientConfig::query_parameter()`] instead).
async fn connect(&self, config: &ClientConfig) -> Result<Self::Socket, Self::ConnectError> {
async fn connect(&self, config: &ClientConfig) -> Result<Self::Socket, Self::WSError> {
if config.headers().len() > 0 {
panic!("client may not submit HTTP headers in WASM connection requests");
}
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ cfg_if::cfg_if! {
}
}

pub use tokio_tungstenite_wasm::Error as WSError;

pub type Error = Box<dyn std::error::Error + Send + Sync>;
pub type Request = http::Request<()>;

0 comments on commit 1deef89

Please sign in to comment.