From ae604d6643598220b266e992b7f7024944342c20 Mon Sep 17 00:00:00 2001 From: koe Date: Fri, 6 Oct 2023 11:28:50 -0500 Subject: [PATCH] update changelog --- CHANGELOG.md | 4 ++++ Cargo.toml | 8 ++++---- src/client.rs | 20 +++++++++++-------- .../client_connector_tokio.rs | 14 +++++++++---- src/client_connectors/mod.rs | 1 - src/socket.rs | 18 +++++++++++------ 6 files changed, 42 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a73fcf..c33ef1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,10 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Move `axum` and `tungstenite` server runners into new submodule `src/server_runners`. - Update to `tokio-tungstenite` v0.20.0. - Fork [axum-tungstenite](https://crates.io/crates/axum-tungstenite) crate into `src/server_runners` and refactor the `axum` runner to use that instead of `axum::extract::ws`. +- Bug fix: remove race condition between sending a message and a socket connection closing that would cause a client to shut down instead of calling `on_disconnect/on_close`. +- Use [`tokio-tungstenite-wasm`](https://github.com/TannerRogalsky/tokio-tungstenite-wasm) errors internally to better support cross-platform clients. +- Use [`enfync`](https://github.com/UkoeHB/enfync) runtime handles internally to better support cross-platform clients. Default clients continue to use tokio. +- Add `ClientConnector` abstraction for connecting clients and add `ezsockets::client::connect_with`. Migration guide: diff --git a/Cargo.toml b/Cargo.toml index df56c46..7f26598 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,7 @@ categories = ["asynchronous", "network-programming", "web-programming::websocket async-trait = "0.1.52" atomic_enum = "0.2.0" base64 = "0.21.0" -enfync = { git = "https://github.com/UkoeHB/enfync", rev = "010e30f" } +enfync = "0.1.0" futures = "0.3.21" http = "0.2.8" tokio = { version = "1.17.0", features = ["sync", "rt", "macros", "time"] } @@ -51,9 +51,9 @@ server = ["tungstenite_common", "tokio-tungstenite-wasm"] tungstenite = ["server"] axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"] -tls = ["server"] -native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls" ] -rustls = ["tls", "tokio-rustls", "tokio-tungstenite/rustls-tls-webpki-roots" ] +tls = [] +native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls"] +rustls = ["tls", "tokio-rustls", "tokio-tungstenite/rustls-tls-webpki-roots"] [dev-dependencies] tokio = { version = "1.17.0", features = ["full"] } diff --git a/src/client.rs b/src/client.rs index 0756dc1..968161d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -58,9 +58,9 @@ use crate::Socket; use async_trait::async_trait; use base64::Engine; use enfync::Handle; +use futures::{SinkExt, StreamExt}; use http::header::HeaderName; use http::HeaderValue; -use futures::{SinkExt, StreamExt}; use std::fmt; use std::future::Future; use std::time::Duration; @@ -262,12 +262,11 @@ pub trait ClientConnector { type Handle: enfync::Handle; type Message: Into + From + std::fmt::Debug + Send + 'static; type WSError: std::error::Error + Into; - type Socket: - SinkExt + - StreamExt> + - Unpin + - Send + - 'static; + type Socket: SinkExt + + StreamExt> + + Unpin + + Send + + 'static; type ConnectError: std::error::Error + Send; /// Get the connector's runtime handle. @@ -384,7 +383,12 @@ pub async fn connect( ) -> (Client, impl Future>) { let client_connector = crate::client_connector_tokio::ClientConnectorTokio::default(); let (handle, mut future) = connect_with(client_fn, config, client_connector); - let future = async move { future.extract().await.unwrap_or(Err("client actor crashed".into())) }; + let future = async move { + future + .extract() + .await + .unwrap_or(Err("client actor crashed".into())) + }; (handle, future) } diff --git a/src/client_connectors/client_connector_tokio.rs b/src/client_connectors/client_connector_tokio.rs index 9e7de44..9979fd3 100644 --- a/src/client_connectors/client_connector_tokio.rs +++ b/src/client_connectors/client_connector_tokio.rs @@ -11,24 +11,30 @@ pub struct ClientConnectorTokio { impl ClientConnectorTokio { pub fn new(handle: tokio::runtime::Handle) -> Self { - Self { handle: handle.into() } + Self { + handle: handle.into(), + } } } impl Default for ClientConnectorTokio { fn default() -> Self { let handle = enfync::builtin::native::TokioHandle::try_adopt() - .expect("ClientConnectorTokio::default() only works inside a tokio runtime; use ClientConnectorTokionew() instead"); + .expect( + "ClientConnectorTokio::default() only works inside a tokio runtime; use ClientConnectorTokio::new() instead" + ); Self { handle } } } #[async_trait::async_trait] impl ClientConnector for ClientConnectorTokio { - type Handle = enfync::builtin::native::TokioHandle; + type Handle = enfync::builtin::native::TokioHandle; type Message = tungstenite::Message; type WSError = tungstenite::error::Error; - type Socket = tokio_tungstenite::WebSocketStream>; + type Socket = tokio_tungstenite::WebSocketStream< + tokio_tungstenite::MaybeTlsStream, + >; type ConnectError = tungstenite::error::Error; /// Get the connector's runtime handle. diff --git a/src/client_connectors/mod.rs b/src/client_connectors/mod.rs index b7914f2..1c183c8 100644 --- a/src/client_connectors/mod.rs +++ b/src/client_connectors/mod.rs @@ -1,4 +1,3 @@ - #[cfg(feature = "native_client")] pub mod client_connector_tokio; diff --git a/src/socket.rs b/src/socket.rs index dccef3f..000208b 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -477,7 +477,7 @@ impl Stream { fn new( stream: S, last_alive: Arc>, - handle: impl enfync::Handle + handle: impl enfync::Handle, ) -> (enfync::PendingResult<()>, Self) where M: Into + std::fmt::Debug + Send + 'static, @@ -507,7 +507,11 @@ pub struct Socket { } impl Socket { - pub fn new(socket: S, config: SocketConfig, handle: impl enfync::Handle) -> Self + pub fn new( + socket: S, + config: SocketConfig, + handle: impl enfync::Handle, + ) -> Self where M: Into + From + std::fmt::Debug + Send + 'static, E: Into, @@ -523,7 +527,9 @@ impl Socket { ); let (hearbeat_abort_sender, hearbeat_abort_receiver) = oneshot::channel(); let sink_clone = sink.clone(); - handle.spawn(async move { socket_heartbeat(sink_clone, config, hearbeat_abort_receiver, last_alive).await }); + handle.spawn(async move { + socket_heartbeat(sink_clone, config, hearbeat_abort_receiver, last_alive).await + }); let (sink_result_sender, sink_result_receiver) = oneshot::channel(); handle.spawn(async move { @@ -531,7 +537,7 @@ impl Socket { let _ = sink_abort_sender.send(()); let _ = hearbeat_abort_sender.send(()); let _ = - sink_result_sender.send(sink_future.await.unwrap_or(Err(WSError::AlreadyClosed))); + sink_result_sender.send(sink_future.extract().await.unwrap_or(Err(WSError::AlreadyClosed))); }); Self { @@ -574,7 +580,7 @@ async fn socket_heartbeat( config: SocketConfig, mut abort_receiver: oneshot::Receiver<()>, last_alive: Arc>, -){ +) { let mut interval = tokio::time::interval(config.heartbeat); loop { @@ -592,7 +598,7 @@ async fn socket_heartbeat( } let timestamp = SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap_or(Duration::default()); + .unwrap_or_default(); if sink .send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp))) .await