Skip to content

Commit

Permalink
cleanup leftover panics, downgrade errors to warnings where appropriate
Browse files Browse the repository at this point in the history
  • Loading branch information
UkoeHB committed Sep 26, 2023
1 parent b2baddd commit 0d861ff
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- add reason to `ServerExt::on_disconnect()`
- improved tracing emitted during close sequences
- add `ClientConfig::query_parameter()` so connection requests can pass data via the URI (since additional connection headers are not supported by the websockets spec, this method should be more compatible with other implementations)
- removed panics from the internals
- downgraded tracing errors to warnings


Migration guide:
Expand Down
6 changes: 4 additions & 2 deletions src/axum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ where
for (k, v) in req.headers() {
pure_req = pure_req.header(k, v);
}
let pure_req = pure_req.body(()).unwrap();
let Ok(pure_req) = pure_req.body(()) else {
return Err(ConnectionNotUpgradable::default().into());
};

Ok(Self {
ws: ws::WebSocketUpgrade::from_request(req, state).await?,
Expand All @@ -129,7 +131,7 @@ impl From<ws::Message> for RawMessage {
ws::Message::Ping(ping) => RawMessage::Ping(ping),
ws::Message::Pong(pong) => RawMessage::Pong(pong),
ws::Message::Close(Some(close)) => RawMessage::Close(Some(CloseFrame {
code: CloseCode::try_from(close.code).unwrap(),
code: CloseCode::try_from(close.code).unwrap_or(CloseCode::Abnormal),
reason: close.reason.into(),
})),
ws::Message::Close(None) => RawMessage::Close(None),
Expand Down
25 changes: 14 additions & 11 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ pub async fn connect<E: ClientExt + 'static>(
let (stream, _) = tokio_tungstenite::connect_async(http_request).await?;
if let Err(err) = client.on_connect().await {
tracing::error!("calling on_connect() failed due to {}", err);
return Err(err);
}
let socket = Socket::new(stream, Config::default());
tracing::info!("connected to {}", config.url);
Expand All @@ -314,7 +315,7 @@ pub async fn connect<E: ClientExt + 'static>(
actor.run().await?;
Ok(())
});
let future = async move { future.await.unwrap() };
let future = async move { future.await.unwrap_or(Err("client actor crashed".into())) };
(handle, future)
}

Expand Down Expand Up @@ -354,20 +355,20 @@ impl<E: ClientExt> ClientActor<E> {
tracing::trace!("client closed by server");
match self.client.on_close(frame).await?
{
ClientCloseMode::Reconnect => self.reconnect().await,
ClientCloseMode::Reconnect => { if !self.try_reconnect().await { return Ok(()) } }
ClientCloseMode::Close => return Ok(())
}
}
};
}
Some(Err(error)) => {
tracing::error!("connection error: {error}");
tracing::warn!("connection error: {error}");
}
None => {
tracing::trace!("client socket died");
match self.client.on_disconnect().await?
{
ClientCloseMode::Reconnect => self.reconnect().await,
ClientCloseMode::Reconnect => { if !self.try_reconnect().await { return Ok(()) } }
ClientCloseMode::Close => return Ok(())
}
}
Expand All @@ -380,11 +381,11 @@ impl<E: ClientExt> ClientActor<E> {
Ok(())
}

async fn reconnect(&mut self) {
let reconnect_interval = self
.config
.reconnect_interval
.expect("reconnect interval should be set for reconnecting");
async fn try_reconnect(&mut self) -> bool {
let Some(reconnect_interval) = self.config.reconnect_interval else {
tracing::warn!("no reconnect interval set, aborting reconnect attempt");
return false;
};
tracing::info!("reconnecting in {}s", reconnect_interval.as_secs());
for i in 1.. {
tokio::time::sleep(reconnect_interval).await;
Expand All @@ -400,16 +401,18 @@ impl<E: ClientExt> ClientActor<E> {
let socket = Socket::new(socket, Config::default());
self.socket = socket;
self.heartbeat = Instant::now();
return;
return true;
}
Err(err) => {
tracing::error!(
tracing::warn!(
"reconnecting failed due to {}. will retry in {}s",
err,
reconnect_interval.as_secs()
);
}
};
}

false
}
}
2 changes: 1 addition & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ where
Ok::<_, Error>(())
}
.await {
tracing::error!("error when processing: {err:?}");
tracing::warn!("error when processing: {err:?}");
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<I: std::fmt::Display + Clone + Send, C: Send> Session<I, C> {

tokio::spawn(async move {
let result = actor.run().await;
closed_sender.send(result).unwrap();
closed_sender.send(result).unwrap_or_default();
});

handle
Expand All @@ -99,7 +99,9 @@ impl<I: std::fmt::Display + Clone, C> Session<I, C> {
let closed_indicator = closed_indicator
.take()
.expect("someone already called .await_close() before");
closed_indicator.await.unwrap()
closed_indicator
.await
.unwrap_or(Err("session actor crashed".into()))
}

/// Checks if the Session is still alive, if so you can proceed sending calls or messages.
Expand Down Expand Up @@ -201,7 +203,7 @@ impl<E: SessionExt> SessionActor<E> {
},
}
Some(Err(error)) => {
tracing::error!(id = %self.id, "connection error: {error}");
tracing::warn!(id = %self.id, "connection error: {error}");
}
None => break
};
Expand Down
6 changes: 3 additions & 3 deletions src/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ where
let timestamp = Duration::from_millis(timestamp as u64); // TODO: handle overflow
let latency = SystemTime::now()
.duration_since(UNIX_EPOCH + timestamp)
.unwrap();
.unwrap_or(Duration::default());
// TODO: handle time zone
tracing::trace!("latency: {}ms", latency.as_millis());
}
Expand Down Expand Up @@ -358,7 +358,7 @@ impl Socket {
}
let timestamp = SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap();
.unwrap_or(Duration::default());
let timestamp = timestamp.as_millis();
let bytes = timestamp.to_be_bytes();
if sink
Expand All @@ -373,7 +373,7 @@ impl Socket {
});

tokio::spawn(async move {
stream_future.await.unwrap();
stream_future.await.unwrap_or_default();
sink_future.abort();
heartbeat_future.abort();
});
Expand Down
10 changes: 6 additions & 4 deletions src/tungstenite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ cfg_if::cfg_if! {
for (k, v) in req.headers() {
req1 = req1.header(k, v);
}
req0 = Some(req1.body(()).unwrap());
let Ok(body) = req1.body(()) else { return Err(ErrorResponse::default()); };
req0 = Some(body);

Ok(resp)
};
Expand All @@ -187,7 +188,8 @@ cfg_if::cfg_if! {
Socket::new(socket, socket::Config::default())
}
};
Ok((socket, req0.unwrap()))
let Some(req_body) = req0 else { return Err("invalid request body".into()); };
Ok((socket, req_body))
}
}

Expand All @@ -204,14 +206,14 @@ cfg_if::cfg_if! {
let (stream, address) = match listener.accept().await {
Ok(stream) => stream,
Err(err) => {
tracing::error!("failed to accept tcp connection: {:?}", err);
tracing::warn!("failed to accept tcp connection: {:?}", err);
continue;
},
};
let (socket, request) = match acceptor.accept(stream).await {
Ok(socket) => socket,
Err(err) => {
tracing::error!(%address, "failed to accept websocket connection: {:?}", err);
tracing::warn!(%address, "failed to accept websocket connection: {:?}", err);
continue;
}
};
Expand Down

0 comments on commit 0d861ff

Please sign in to comment.