Skip to content

Commit

Permalink
refactor ClientActor::run() so the socket can be dropped before tryin…
Browse files Browse the repository at this point in the history
…g to reconnect
  • Loading branch information
UkoeHB committed Nov 14, 2023
1 parent ccad632 commit 4b82ef0
Showing 1 changed file with 111 additions and 81 deletions.
192 changes: 111 additions & 81 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,11 +465,10 @@ pub fn connect_with<E: ClientExt + 'static>(
client,
to_socket_receiver,
client_call_receiver,
socket,
config,
client_connector,
};
actor.run().await?;
actor.run(Some(socket)).await?;
Ok(())
});
(handle, future)
Expand All @@ -479,109 +478,140 @@ struct ClientActor<E: ClientExt, C: ClientConnector> {
client: E,
to_socket_receiver: async_channel::Receiver<InMessage>,
client_call_receiver: async_channel::Receiver<E::Call>,
socket: Socket,
config: ClientConfig,
client_connector: C,
}

impl<E: ClientExt, C: ClientConnector> ClientActor<E, C> {
async fn run(&mut self) -> Result<(), Error> {
async fn run(&mut self, mut socket_shuttle: Option<Socket>) -> Result<(), Error> {
loop {
let Some(mut socket) = socket_shuttle.take() else {
return Ok(());
};
futures::select! {
res = self.to_socket_receiver.recv().fuse() => {
let Ok(inmessage) = res else {
break;
};
let closed_self = matches!(inmessage.message, Some(Message::Close(_)));
if self.socket.send(inmessage).await.is_err() {
let result = self.socket.await_sink_close().await;
if let Err(err) = &result {
tracing::warn!(?err, "encountered sink closing error when trying to send a message");
}
match result {
Err(WSError::ConnectionClosed) |
Err(WSError::AlreadyClosed) |
Err(WSError::Io(_)) |
Err(WSError::Tls(_)) => {
// either:
// A) The connection was closed via the close protocol, so we will allow the stream to
// handle it.
// B) We already tried and failed to submit another message, so now we are
// waiting for other parts of the select! to shut us down.
// C) An IO error means the connection closed unexpectedly, so we can try to reconnect when
// the stream fails.
}
Err(_) if !closed_self => return Err(Error::from("unexpected sink error, aborting client actor")),
_ => (),
}
}
if closed_self {
tracing::debug!("client closed itself");
return Ok(())
}
socket_shuttle = Self::handle_outgoing_msg(socket, inmessage).await?;
}
res = self.client_call_receiver.recv().fuse() => {
let Ok(call) = res else {
break;
};
self.client.on_call(call).await?;
socket_shuttle = Some(socket);
}
result = self.socket.stream.recv().fuse() => {
match result {
Some(Ok(message)) => {
match message.to_owned() {
Message::Text(text) => self.client.on_text(text).await?,
Message::Binary(bytes) => self.client.on_binary(bytes).await?,
Message::Close(frame) => {
tracing::debug!("client closed by server");
match self.client.on_close(frame).await?
{
ClientCloseMode::Reconnect => {
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
return Ok(());
};
self.socket = socket;
},
ClientCloseMode::Close => return Ok(())
}
}
};
}
Some(Err(error)) => {
let error = Error::from(error);
tracing::warn!("connection error: {error}");
}
None => {
tracing::debug!("client socket died");
match self.client.on_disconnect().await?
{
ClientCloseMode::Reconnect => {
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
).await? else {
return Ok(());
};
self.socket = socket;
},
ClientCloseMode::Close => return Ok(())
result = socket.stream.recv().fuse() => {
socket_shuttle = self.handle_incoming_msg(socket, result).await?;
}
}
}

Ok(())
}

async fn handle_outgoing_msg(
mut socket: Socket,
inmessage: InMessage,
) -> Result<Option<Socket>, Error> {
let closed_self = matches!(inmessage.message, Some(Message::Close(_)));
if socket.send(inmessage).await.is_err() {
let result = socket.await_sink_close().await;
if let Err(err) = &result {
tracing::warn!(
?err,
"encountered sink closing error when trying to send a message"
);
}
match result {
Err(WSError::ConnectionClosed)
| Err(WSError::AlreadyClosed)
| Err(WSError::Io(_))
| Err(WSError::Tls(_)) => {
// either:
// A) The connection was closed via the close protocol, so we will allow the stream to
// handle it.
// B) We already tried and failed to submit another message, so now we are
// waiting for other parts of the select! to shut us down.
// C) An IO error means the connection closed unexpectedly, so we can try to reconnect when
// the stream fails.
}
Err(_) if !closed_self => {
return Err(Error::from("unexpected sink error, aborting client actor"))
}
_ => (),
}
}
if closed_self {
tracing::debug!("client closed itself");
return Ok(None);
}

Ok(Some(socket))
}

async fn handle_incoming_msg(
&mut self,
socket: Socket,
result: Option<Result<Message, WSError>>,
) -> Result<Option<Socket>, Error> {
match result {
Some(Ok(message)) => {
match message.to_owned() {
Message::Text(text) => self.client.on_text(text).await?,
Message::Binary(bytes) => self.client.on_binary(bytes).await?,
Message::Close(frame) => {
tracing::debug!("client closed by server");
match self.client.on_close(frame).await? {
ClientCloseMode::Reconnect => {
std::mem::drop(socket);
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
)
.await?
else {
return Ok(None);
};
return Ok(Some(socket));
}
ClientCloseMode::Close => return Ok(None),
}
};
}
};
}
Some(Err(error)) => {
let error = Error::from(error);
tracing::warn!("connection error: {error}");
}
None => {
tracing::debug!("client socket died");
match self.client.on_disconnect().await? {
ClientCloseMode::Reconnect => {
std::mem::drop(socket);
let Some(socket) = client_connect(
self.config.max_reconnect_attempts,
&self.config,
&self.client_connector,
&mut self.to_socket_receiver,
&mut self.client,
)
.await?
else {
return Ok(None);
};
return Ok(Some(socket));
}
ClientCloseMode::Close => return Ok(None),
}
}
}

Ok(())
Ok(Some(socket))
}
}

Expand Down

0 comments on commit 4b82ef0

Please sign in to comment.