From 795223eab2b046b6cf361c85d0d29ce028bde66a Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 17 Aug 2023 21:15:40 -0500 Subject: [PATCH] cleanup leftover panics, downgrade errors to warnings where appropriate --- CHANGELOG.md | 2 ++ src/axum.rs | 4 ++-- src/client.rs | 25 ++++++++++++++----------- src/server.rs | 2 +- src/session.rs | 8 +++++--- src/socket.rs | 6 +++--- src/tungstenite.rs | 10 ++++++---- 7 files changed, 33 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f54952..454ecf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - add reason to `ServerExt::on_disconnect()` - improved tracing emitted during close sequences - add `ClientConfig::query_parameter()` so connection requests can pass data via the URI (since additional connection headers are not supported by the websockets spec, this method should be more compatible with other implementations) +- removed panics from the internals +- downgraded tracing errors to warnings Migration guide: diff --git a/src/axum.rs b/src/axum.rs index bd1d6c9..058c797 100644 --- a/src/axum.rs +++ b/src/axum.rs @@ -111,7 +111,7 @@ where for (k, v) in req.headers() { pure_req = pure_req.header(k, v); } - let pure_req = pure_req.body(()).unwrap(); + let Ok(pure_req) = pure_req.body(()) else { return Err(ConnectionNotUpgradable::default().into()); }; Ok(Self { ws: ws::WebSocketUpgrade::from_request(req, state).await?, @@ -129,7 +129,7 @@ impl From for RawMessage { ws::Message::Ping(ping) => RawMessage::Ping(ping), ws::Message::Pong(pong) => RawMessage::Pong(pong), ws::Message::Close(Some(close)) => RawMessage::Close(Some(CloseFrame { - code: CloseCode::try_from(close.code).unwrap(), + code: CloseCode::try_from(close.code).unwrap_or(CloseCode::Abnormal), reason: close.reason.into(), })), ws::Message::Close(None) => RawMessage::Close(None), diff --git a/src/client.rs b/src/client.rs index 16845bf..c571d9c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -296,6 +296,7 @@ pub async fn connect( let (stream, _) = tokio_tungstenite::connect_async(http_request).await?; if let Err(err) = client.on_connect().await { tracing::error!("calling on_connect() failed due to {}", err); + return Err(err); } let socket = Socket::new(stream, Config::default()); tracing::info!("connected to {}", config.url); @@ -310,7 +311,7 @@ pub async fn connect( actor.run().await?; Ok(()) }); - let future = async move { future.await.unwrap() }; + let future = async move { future.await.unwrap_or(Err("client actor crashed".into())) }; (handle, future) } @@ -350,20 +351,20 @@ impl ClientActor { tracing::trace!("client closed by server"); match self.client.on_close(frame).await? { - ClientCloseMode::Reconnect => self.reconnect().await, + ClientCloseMode::Reconnect => { if !self.try_reconnect().await { return Ok(()) } } ClientCloseMode::Close => return Ok(()) } } }; } Some(Err(error)) => { - tracing::error!("connection error: {error}"); + tracing::warn!("connection error: {error}"); } None => { tracing::trace!("client socket died"); match self.client.on_disconnect().await? { - ClientCloseMode::Reconnect => self.reconnect().await, + ClientCloseMode::Reconnect => { if !self.try_reconnect().await { return Ok(()) } } ClientCloseMode::Close => return Ok(()) } } @@ -376,11 +377,11 @@ impl ClientActor { Ok(()) } - async fn reconnect(&mut self) { - let reconnect_interval = self - .config - .reconnect_interval - .expect("reconnect interval should be set for reconnecting"); + async fn try_reconnect(&mut self) -> bool { + let Some(reconnect_interval) = self.config.reconnect_interval else { + tracing::warn!("no reconnect interval set, aborting reconnect attempt"); + return false; + }; tracing::info!("reconnecting in {}s", reconnect_interval.as_secs()); for i in 1.. { tokio::time::sleep(reconnect_interval).await; @@ -396,10 +397,10 @@ impl ClientActor { let socket = Socket::new(socket, Config::default()); self.socket = socket; self.heartbeat = Instant::now(); - return; + return true; } Err(err) => { - tracing::error!( + tracing::warn!( "reconnecting failed due to {}. will retry in {}s", err, reconnect_interval.as_secs() @@ -407,5 +408,7 @@ impl ClientActor { } }; } + + false } } diff --git a/src/server.rs b/src/server.rs index 657239c..a760f99 100644 --- a/src/server.rs +++ b/src/server.rs @@ -172,7 +172,7 @@ where Ok::<_, Error>(()) } .await { - tracing::error!("error when processing: {err:?}"); + tracing::warn!("error when processing: {err:?}"); } } } diff --git a/src/session.rs b/src/session.rs index a3e49de..f81989a 100644 --- a/src/session.rs +++ b/src/session.rs @@ -82,7 +82,7 @@ impl Session { tokio::spawn(async move { let result = actor.run().await; - closed_sender.send(result).unwrap(); + closed_sender.send(result).unwrap_or_default(); }); handle @@ -99,7 +99,9 @@ impl Session { let closed_indicator = closed_indicator .take() .expect("someone already called .await_close() before"); - closed_indicator.await.unwrap() + closed_indicator + .await + .unwrap_or(Err("session actor crashed".into())) } /// Checks if the Session is still alive, if so you can proceed sending calls or messages. @@ -197,7 +199,7 @@ impl SessionActor { }, } Some(Err(error)) => { - tracing::error!(id = %self.id, "connection error: {error}"); + tracing::warn!(id = %self.id, "connection error: {error}"); } None => break }; diff --git a/src/socket.rs b/src/socket.rs index 417450c..13e36f5 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -264,7 +264,7 @@ where let timestamp = Duration::from_millis(timestamp as u64); // TODO: handle overflow let latency = SystemTime::now() .duration_since(UNIX_EPOCH + timestamp) - .unwrap(); + .unwrap_or(Duration::default()); // TODO: handle time zone tracing::trace!("latency: {}ms", latency.as_millis()); } @@ -358,7 +358,7 @@ impl Socket { } let timestamp = SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap(); + .unwrap_or(Duration::default()); let timestamp = timestamp.as_millis(); let bytes = timestamp.to_be_bytes(); if sink @@ -373,7 +373,7 @@ impl Socket { }); tokio::spawn(async move { - stream_future.await.unwrap(); + stream_future.await.unwrap_or_default(); sink_future.abort(); heartbeat_future.abort(); }); diff --git a/src/tungstenite.rs b/src/tungstenite.rs index 96c672a..0b1dde6 100644 --- a/src/tungstenite.rs +++ b/src/tungstenite.rs @@ -165,7 +165,8 @@ cfg_if::cfg_if! { for (k, v) in req.headers() { req1 = req1.header(k, v); } - req0 = Some(req1.body(()).unwrap()); + let Ok(body) = req1.body(()) else { return Err(ErrorResponse::default()); }; + req0 = Some(body); Ok(resp) }; @@ -187,7 +188,8 @@ cfg_if::cfg_if! { Socket::new(socket, socket::Config::default()) } }; - Ok((socket, req0.unwrap())) + let Some(req_body) = req0 else { return Err("invalid request body".into()); }; + Ok((socket, req_body)) } } @@ -204,14 +206,14 @@ cfg_if::cfg_if! { let (stream, address) = match listener.accept().await { Ok(stream) => stream, Err(err) => { - tracing::error!("failed to accept tcp connection: {:?}", err); + tracing::warn!("failed to accept tcp connection: {:?}", err); continue; }, }; let (socket, request) = match acceptor.accept(stream).await { Ok(socket) => socket, Err(err) => { - tracing::error!(%address, "failed to accept websocket connection: {:?}", err); + tracing::warn!(%address, "failed to accept websocket connection: {:?}", err); continue; } };