Skip to content

Commit

Permalink
Merge pull request #88 from UkoeHB/share_connect
Browse files Browse the repository at this point in the history
Refactor reconnect loop for use during initial connection attempt
  • Loading branch information
UkoeHB authored Sep 30, 2023
2 parents 99d07bd + 6b9443b commit 0cc591e
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 68 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Return `Ok(MessageSignal)` from `Client` and `Session` `.binary()/.text()/.close()` endpoints instead of `Ok(())`. The `MessageSignal::state()` method will indicate the current state of the message (sending/sent/failed).
- Clients attempt to reconnect immediately instead of after one full reconnect interval.
- Incoming user messages are discarded while a client is reconnecting, to better match the usual behavior of a websocket connection. If you want messages to be buffered while reconnecting, you should implement your own buffer.
- rename `socket::Config` -> `socket::SocketConfig` and add a `heartbeat_ping_msg_fn` member variable in order to support custom Ping/Pong protocols
- add `ClientConfig::socket_config()` setter so clients can define their socket's config
- add `ezsockets::axum::Upgrade::on_upgrade_with_config()` that accepts a `SocketConfig`
- Rename `socket::Config` -> `socket::SocketConfig` and add a `heartbeat_ping_msg_fn` member variable in order to support custom Ping/Pong protocols.
- Add `ClientConfig::socket_config()` setter so clients can define their socket's config.
- Add `ezsockets::axum::Upgrade::on_upgrade_with_config()` that accepts a `SocketConfig`.
- Refactor `ezeockets::client::connect()` to use a retry loop for the initial connection. Add `max_initial_connect_attempts` and `max_reconnect_attempts` options to the `ClientConfig` (they default to 'infinite').


Migration guide:
Expand Down
179 changes: 114 additions & 65 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0);
#[derive(Debug)]
pub struct ClientConfig {
url: Url,
max_initial_connect_attempts: usize,
max_reconnect_attempts: usize,
reconnect_interval: Duration,
headers: http::HeaderMap,
socket_config: Option<SocketConfig>,
Expand All @@ -87,6 +89,8 @@ impl ClientConfig {
let url = url.try_into().expect("invalid URL");
Self {
url,
max_initial_connect_attempts: usize::MAX,
max_reconnect_attempts: usize::MAX,
reconnect_interval: DEFAULT_RECONNECT_INTERVAL,
headers: http::HeaderMap::new(),
socket_config: None,
Expand Down Expand Up @@ -152,6 +156,22 @@ impl ClientConfig {
self
}

/// Set the maximum number of connection attempts when starting a client.
///
/// Defaults to infinite.
pub fn max_initial_connect_attempts(mut self, max_initial_connect_attempts: usize) -> Self {
self.max_initial_connect_attempts = max_initial_connect_attempts;
self
}

/// Set the maximum number of attempts when reconnecting.
///
/// Defaults to infinite.
pub fn max_reconnect_attempts(mut self, max_reconnect_attempts: usize) -> Self {
self.max_reconnect_attempts = max_reconnect_attempts;
self
}

/// Set the reconnect interval.
pub fn reconnect_interval(mut self, reconnect_interval: Duration) -> Self {
self.reconnect_interval = reconnect_interval;
Expand Down Expand Up @@ -326,23 +346,27 @@ pub async fn connect<E: ClientExt + 'static>(
client_fn: impl FnOnce(Client<E>) -> E,
config: ClientConfig,
) -> (Client<E>, impl Future<Output = Result<(), Error>>) {
let (to_socket_sender, to_socket_receiver) = mpsc::unbounded_channel();
let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel();
let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel();
let handle = Client {
to_socket_sender,
client_call_sender,
};
let mut client = client_fn(handle.clone());
let future = tokio::spawn(async move {
let http_request = config.connect_http_request();
tracing::info!("connecting to {}...", config.url);
let (stream, _) = tokio_tungstenite::connect_async(http_request).await?;
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}", err);
return Err(err);
}
let socket = Socket::new(stream, config.socket_config.clone().unwrap_or_default());
let Some(socket) = client_connect(
config.max_initial_connect_attempts,
&config,
&mut to_socket_receiver,
&mut client,
)
.await?
else {
return Ok(());
};
tracing::info!("connected to {}", config.url);

let mut actor = ClientActor {
client,
to_socket_receiver,
Expand Down Expand Up @@ -393,9 +417,15 @@ impl<E: ClientExt> ClientActor<E> {
match self.client.on_close(frame).await?
{
ClientCloseMode::Reconnect => {
if !self.reconnect().await? {
return Ok(())
}
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
return Ok(());
};
self.socket = socket;
},
ClientCloseMode::Close => return Ok(())
}
Expand All @@ -410,9 +440,15 @@ impl<E: ClientExt> ClientActor<E> {
match self.client.on_disconnect().await?
{
ClientCloseMode::Reconnect => {
if !self.reconnect().await? {
return Ok(())
}
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
return Ok(());
};
self.socket = socket;
},
ClientCloseMode::Close => return Ok(())
}
Expand All @@ -425,61 +461,74 @@ impl<E: ClientExt> ClientActor<E> {

Ok(())
}
}

/// Returns Ok(true) if reconnecting succeeded, Ok(false) if the client closed itself, and `Err` if an error occurred.
async fn reconnect(&mut self) -> Result<bool, Error> {
for i in 1.. {
tracing::info!("reconnecting attempt no: {}...", i);
let connect_http_request = self.config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await;
match result {
Ok((socket, _)) => {
tracing::info!("successfully reconnected");
if let Err(err) = self.client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}", err);
}
self.socket = Socket::new(
socket,
self.config.socket_config.clone().unwrap_or_default(),
);
return Ok(true);
}
Err(err) => {
tracing::warn!(
"reconnecting failed due to {}. will retry in {}s",
err,
self.config.reconnect_interval.as_secs()
);
}
};
// Discard messages until either the reconnect interval passes, the socket receiver disconnects, or
// the user sends a close message.
let sleep = tokio::time::sleep(self.config.reconnect_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => break,
Some(inmessage) = self.to_socket_receiver.recv() => {
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::trace!(?frame, "client closed itself while reconnecting");
return Ok(false);
}
_ => {
tracing::warn!("client is reconnecting, discarding message from user");
continue;
}
}
},
else => {
tracing::warn!("client is dead, aborting reconnect");
return Err(Error::from("client died while trying to reconnect"));
},
/// Returns Ok(Some(socket)) if connecting succeeded, Ok(None) if the client closed itself, and `Err` if an error occurred.
async fn client_connect<E: ClientExt>(
max_attempts: usize,
config: &ClientConfig,
to_socket_receiver: &mut mpsc::UnboundedReceiver<InMessage>,
client: &mut E,
) -> Result<Option<Socket>, Error> {
for i in 1.. {
// connection attempt
tracing::info!("connecting attempt no: {}...", i);
let connect_http_request = config.connect_http_request();
let result = tokio_tungstenite::connect_async(connect_http_request).await;
match result {
Ok((socket, _)) => {
tracing::info!("successfully connected");
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}, closing client", err);
return Err(err);
}
let socket = Socket::new(socket, config.socket_config.clone().unwrap_or_default());
return Ok(Some(socket));
}
Err(err) => {
tracing::warn!(
"connecting failed due to {}, will retry in {}s",
err,
config.reconnect_interval.as_secs()
);
}
};

// abort if we have reached the max attempts
if i >= max_attempts {
return Err(Error::from(format!(
"failed to connect after {} attempt(s), aborting...",
i
)));
}

Err(Error::from("client failed to reconnect"))
// Discard messages until either the connect interval passes, the socket receiver disconnects, or
// the user sends a close message.
let sleep = tokio::time::sleep(config.reconnect_interval);
tokio::pin!(sleep);
loop {
tokio::select! {
_ = &mut sleep => break,
Some(inmessage) = to_socket_receiver.recv() => {
match &inmessage.message
{
Some(Message::Close(frame)) => {
tracing::trace!(?frame, "client closed itself while connecting");
return Ok(None);
}
_ => {
tracing::warn!("client is connecting, discarding message from user");
continue;
}
}
},
else => {
tracing::warn!("client is dead, aborting connection attempts");
return Err(Error::from("client died while trying to connect"));
},
}
}
}

Err(Error::from("client failed to connect"))
}

0 comments on commit 0cc591e

Please sign in to comment.