From d058d57049cf8f02b83fb1954e77d22209aae519 Mon Sep 17 00:00:00 2001 From: UkoeHB <37489173+UkoeHB@users.noreply.github.com> Date: Mon, 16 Oct 2023 10:52:21 -0500 Subject: [PATCH] Fix WASM clients (#93) * fix features * remove tokio-tungstenite dependency from clients * remove tokio dependency from clients for WASM builds * add demo WASM client; bug fixes --- CHANGELOG.md | 3 +- Cargo.toml | 34 ++-- README.md | 3 +- examples/chat-client-wasm/Cargo.toml | 16 ++ examples/chat-client-wasm/README.md | 7 + examples/chat-client-wasm/src/main.rs | 63 ++++++ examples/chat-client/Cargo.toml | 2 - examples/chat-server-axum/Cargo.toml | 2 - examples/chat-server/Cargo.toml | 2 - examples/chat-server/src/main.rs | 11 +- examples/counter-server/Cargo.toml | 2 - examples/echo-server-native-tls/Cargo.toml | 2 - examples/echo-server/Cargo.toml | 2 - examples/simple-client/Cargo.toml | 2 - src/client.rs | 92 +++++---- .../client_connector_wasm.rs | 140 +++++++++++++- src/lib.rs | 3 + src/server_runners/mod.rs | 5 +- src/socket.rs | 179 +++++++++++------- .../tungstenite_common.rs | 4 - tests/chat.rs | 4 +- 21 files changed, 423 insertions(+), 155 deletions(-) create mode 100644 examples/chat-client-wasm/Cargo.toml create mode 100644 examples/chat-client-wasm/README.md create mode 100644 examples/chat-client-wasm/src/main.rs rename src/{server_runners => }/tungstenite_common.rs (96%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e6dc2f..77b1317 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Use [`tokio-tungstenite-wasm`](https://github.com/TannerRogalsky/tokio-tungstenite-wasm) errors internally to better support cross-platform clients. - Use [`enfync`](https://github.com/UkoeHB/enfync) runtime handles internally to better support cross-platform clients. Default clients continue to use tokio. - Add `ClientConnector` abstraction for connecting clients and add `ezsockets::client::connect_with`. -- Add `ClientConnectorWasm` and `wasm_client` feature. +- Add `ClientConnectorWasm` and `wasm_client` feature. Added `chat-client-wasm` example to show a WASM client that compiles. It currently only listens to the chat and can't input anything since browser does not have a terminal. +- Refactor `Socket` and `Client` to not depend on `tokio` when compiling to WASM. This is a breaking change as the `Client` API now exposes `async_channel` error types instead of `tokio` error types, and `Client::call_with()` now takes an `async_channel::Sender` instead of a `tokio` oneshot. Migration guide: diff --git a/Cargo.toml b/Cargo.toml index 6c11ee2..bebd819 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,49 +7,58 @@ description = "WebSockets server & client made easy" readme = "README.md" repository = "https://github.com/gbaranski/ezsockets" license = "MIT" -keywords = ["websocket", "networking", "async"] +keywords = ["websocket", "networking", "async", "wasm"] categories = ["asynchronous", "network-programming", "web-programming::websocket"] -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] +async-channel = "1.9.0" async-trait = "0.1.52" atomic_enum = "0.2.0" base64 = "0.21.0" enfync = "0.1.0" futures = "0.3.21" +futures-util = { version = "0.3.25", default-features = false } http = "0.2.8" -tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } tracing = "0.1.31" +tungstenite = "0.20.0" url = "2.2.2" cfg-if = "1.0.0" axum = { version = "0.6.1", optional = true } axum-core = { version = "0.3.0", optional = true } bytes = { version = "1.3.0", optional = true } -futures-util = { version = "0.3.25", default-features = false, features = ["alloc"], optional = true } +fragile = { version = "2.0", optional = true } http-body = { version = "0.4.5", optional = true } hyper = { version = "0.14.23", optional = true } sha-1 = { version = "0.10.1", optional = true } +tokio-tungstenite = { version = "0.20.0", optional = true } -tokio-tungstenite-wasm = { version = "0.2.0", optional = true } +tokio-tungstenite-wasm = { version = "0.2.1", optional = true } -tokio-tungstenite = { version = "0.20.0", optional = true } tokio-rustls = { version = "0.24.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true } +[target.'cfg(not(target_family = "wasm"))'.dependencies] +tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } + +[target.'cfg(target_family = "wasm")'.dependencies] +getrandom = { version = "0.2", features = ["js"] } +wasm-bindgen-futures = "0.4" +wasmtimer = "0.2.0" + [features] default = ["native_client", "server"] client = ["tokio-tungstenite-wasm"] -native_client = ["client", "tokio-tungstenite", "tokio/rt"] -wasm_client = ["client", "tokio-tungstenite-wasm"] - -tungstenite_common = ["tokio-tungstenite"] +native_client = ["client", "tokio/rt", "tokio-tungstenite"] +wasm_client = ["client", "fragile"] -server = ["tungstenite_common", "tokio-tungstenite-wasm", "tokio/rt"] +server = ["tokio-tungstenite", "tokio-tungstenite-wasm", "tokio/rt"] tungstenite = ["server"] -axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"] +axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util/alloc", "http-body", "hyper", "sha-1"] tls = [] native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls"] @@ -69,6 +78,7 @@ rustdoc-args = ["--cfg", "docsrs"] [workspace] members = [ "examples/chat-client", + "examples/chat-client-wasm", "examples/chat-server", "examples/chat-server-axum", "examples/echo-server", diff --git a/README.md b/README.md index 9dcfe07..765a9ea 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,12 @@ Creating a WebSocket server or a client in Rust can be troublesome. This crate f View the full documentation at [docs.rs/ezsockets](http://docs.rs/ezsockets) ## Examples -- [`simple-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-client) - a simplest WebSocket client which uses stdin as input. +- [`simple-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/simple-client) - a simplest WebSocket client which uses stdin as input. - [`echo-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/echo-server) - server that echoes back every message it receives. - [`echo-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/echo-server-native-tls) - same as `echo-server`, but with `native-tls`. - [`counter-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/counter-server) - server that increments global value every second and shares it with client - [`chat-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-client) - chat client for `chat-server` and `chat-server-axum` examples +- [`wasm-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-client-wasm) - chat client for `chat-server` and `chat-server-axum` examples that runs in the browser (only listens to the chat) - [`chat-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-server) - chat server with support of rooms - [`chat-server-axum`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-server-axum) - same as above, but using `axum` as a back-end diff --git a/examples/chat-client-wasm/Cargo.toml b/examples/chat-client-wasm/Cargo.toml new file mode 100644 index 0000000..845184a --- /dev/null +++ b/examples/chat-client-wasm/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "ezsockets-chat-client-wasm" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1.52" +console_error_panic_hook = "0.1" +ezsockets = { path = "../../", default-features = false, features = ["wasm_client"] } +tracing = "0.1.32" +tracing-subscriber = "0.3.9" +tracing-wasm = "0.2" +url = "2.2.2" +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4" +wasmtimer = "0.2.0" diff --git a/examples/chat-client-wasm/README.md b/examples/chat-client-wasm/README.md new file mode 100644 index 0000000..bcb242d --- /dev/null +++ b/examples/chat-client-wasm/README.md @@ -0,0 +1,7 @@ +1. install [wasm-server-runner](https://github.com/jakobhellermann/wasm-server-runner) +2. `cd` to this directory +3. `cargo run --target wasm32-unknown-unknown` +4. paste the address provided into a new browser window +5. open the browser window console to view the chat logs + +Note: This demo does not currently have a way to pass messages into the chat. diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs new file mode 100644 index 0000000..5f0653a --- /dev/null +++ b/examples/chat-client-wasm/src/main.rs @@ -0,0 +1,63 @@ +#![allow(unused_imports)] + +use async_trait::async_trait; +use ezsockets::{ClientConfig, RawMessage, SocketConfig}; +use std::io::BufRead; +use std::sync::Arc; +use std::time::Duration; + +struct Client {} + +#[async_trait] +impl ezsockets::ClientExt for Client { + type Call = (); + + async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { + tracing::info!("received message: {text}"); + Ok(()) + } + + async fn on_binary(&mut self, bytes: Vec) -> Result<(), ezsockets::Error> { + tracing::info!("received bytes: {bytes:?}"); + Ok(()) + } + + async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { + let () = call; + Ok(()) + } +} + +#[cfg(target_family = "wasm")] +#[wasm_bindgen::prelude::wasm_bindgen(main)] +async fn main() -> Result<(), wasm_bindgen::JsValue> { + // setup tracing + console_error_panic_hook::set_once(); + tracing_wasm::set_as_global_default(); + + // make client + // - note: we use a hacky custom Ping/Pong protocol to keep the client alive (see the 'chat-server' example + // for the Ping side via SessionExt::on_binary()) + let config = ClientConfig::new("ws://localhost:8080/websocket").socket_config(SocketConfig { + heartbeat_ping_msg_fn: Arc::new(|_t: Duration| RawMessage::Binary("ping".into())), + ..Default::default() + }); + let (_client, mut handle) = ezsockets::connect_with( + |_client| Client {}, + config, + ezsockets::ClientConnectorWasm::default(), + ); + + // collect inputs: todo + + // keep main alive until it is manually terminated + handle.extract().await.unwrap().unwrap(); + + Ok(()) +} + +#[cfg(not(target_family = "wasm"))] +fn main() { + // need per-package targets https://github.com/rust-lang/cargo/issues/9406 + unreachable!() +} diff --git a/examples/chat-client/Cargo.toml b/examples/chat-client/Cargo.toml index de2dcce..812d8d6 100644 --- a/examples/chat-client/Cargo.toml +++ b/examples/chat-client/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-client" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["client"] } diff --git a/examples/chat-server-axum/Cargo.toml b/examples/chat-server-axum/Cargo.toml index 19aad1d..ac926dc 100644 --- a/examples/chat-server-axum/Cargo.toml +++ b/examples/chat-server-axum/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-server-axum" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" axum = "0.6.1" diff --git a/examples/chat-server/Cargo.toml b/examples/chat-server/Cargo.toml index c5d216d..7110f4f 100644 --- a/examples/chat-server/Cargo.toml +++ b/examples/chat-server/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-server" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite"] } diff --git a/examples/chat-server/src/main.rs b/examples/chat-server/src/main.rs index 47e0a85..5973b1a 100644 --- a/examples/chat-server/src/main.rs +++ b/examples/chat-server/src/main.rs @@ -42,9 +42,10 @@ impl ezsockets::ServerExt for ChatServer { ) -> Result> { let id = (0..).find(|i| !self.sessions.contains_key(i)).unwrap_or(0); let session = Session::create( - |_handle| SessionActor { + |session_handle| SessionActor { id, server: self.handle.clone(), + session: session_handle, room: DEFAULT_ROOM.to_string(), }, id, @@ -129,6 +130,7 @@ impl ezsockets::ServerExt for ChatServer { struct SessionActor { id: SessionID, server: Server, + session: Session, room: String, } @@ -168,8 +170,11 @@ impl ezsockets::SessionExt for SessionActor { Ok(()) } - async fn on_binary(&mut self, _bytes: Vec) -> Result<(), Error> { - unimplemented!() + async fn on_binary(&mut self, bytes: Vec) -> Result<(), Error> { + // echo bytes back (we use this for a hacky ping/pong protocol for the wasm client demo) + tracing::info!("echoing bytes: {bytes:?}"); + self.session.binary("pong".as_bytes())?; + Ok(()) } async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> { diff --git a/examples/counter-server/Cargo.toml b/examples/counter-server/Cargo.toml index 87a9d94..f735c6c 100644 --- a/examples/counter-server/Cargo.toml +++ b/examples/counter-server/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-counter-server" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite"] } diff --git a/examples/echo-server-native-tls/Cargo.toml b/examples/echo-server-native-tls/Cargo.toml index 34ba1af..1883235 100644 --- a/examples/echo-server-native-tls/Cargo.toml +++ b/examples/echo-server-native-tls/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsocket-echo-server-native-tls" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite", "native-tls"] } diff --git a/examples/echo-server/Cargo.toml b/examples/echo-server/Cargo.toml index e877510..5c5f461 100644 --- a/examples/echo-server/Cargo.toml +++ b/examples/echo-server/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsocket-echo-server" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite"] } diff --git a/examples/simple-client/Cargo.toml b/examples/simple-client/Cargo.toml index f98f00a..683c1c3 100644 --- a/examples/simple-client/Cargo.toml +++ b/examples/simple-client/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-simple-client" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["rustls"] } diff --git a/src/client.rs b/src/client.rs index 67ce786..7467280 100644 --- a/src/client.rs +++ b/src/client.rs @@ -58,18 +58,20 @@ use crate::Socket; use async_trait::async_trait; use base64::Engine; use enfync::Handle; -use futures::{SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use http::header::HeaderName; use http::HeaderValue; use std::fmt; -use std::future::Future; use std::time::Duration; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio_tungstenite::tungstenite; use tokio_tungstenite_wasm::Error as WSError; use url::Url; +#[cfg(not(target_family = "wasm"))] +use tokio::time::sleep; + +#[cfg(target_family = "wasm")] +use wasmtimer::tokio::sleep; + pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0); #[derive(Debug)] @@ -297,8 +299,8 @@ pub trait ClientConnector { /// An `ezsockets` client. #[derive(Debug)] pub struct Client { - to_socket_sender: mpsc::UnboundedSender, - client_call_sender: mpsc::UnboundedSender, + to_socket_sender: async_channel::Sender, + client_call_sender: async_channel::Sender, } impl Clone for Client { @@ -310,7 +312,7 @@ impl Clone for Client { } } -impl From> for mpsc::UnboundedSender { +impl From> for async_channel::Sender { fn from(client: Client) -> Self { client.client_call_sender } @@ -323,11 +325,11 @@ impl Client { pub fn text( &self, text: impl Into, - ) -> Result> { + ) -> Result> { let inmessage = InMessage::new(Message::Text(text.into())); let inmessage_signal = inmessage.clone_signal().unwrap(); //safety: always available on construction self.to_socket_sender - .send(inmessage) + .send_blocking(inmessage) .map(|_| inmessage_signal) } @@ -337,19 +339,19 @@ impl Client { pub fn binary( &self, bytes: impl Into>, - ) -> Result> { + ) -> Result> { let inmessage = InMessage::new(Message::Binary(bytes.into())); let inmessage_signal = inmessage.clone_signal().unwrap(); //safety: always available on construction self.to_socket_sender - .send(inmessage) + .send_blocking(inmessage) .map(|_| inmessage_signal) } /// Call a custom method on the Client. /// /// Refer to `ClientExt::on_call`. - pub fn call(&self, message: E::Call) -> Result<(), mpsc::error::SendError> { - self.client_call_sender.send(message) + pub fn call(&self, message: E::Call) -> Result<(), async_channel::SendError> { + self.client_call_sender.send_blocking(message) } /// Call a custom method on the Client, with a reply from the `ClientExt::on_call`. @@ -357,15 +359,15 @@ impl Client { /// This works just as syntactic sugar for `Client::call(sender)` pub async fn call_with( &self, - f: impl FnOnce(oneshot::Sender) -> E::Call, + f: impl FnOnce(async_channel::Sender) -> E::Call, ) -> Option { - let (sender, receiver) = oneshot::channel(); + let (sender, receiver) = async_channel::bounded(1usize); let call = f(sender); - let Ok(_) = self.client_call_sender.send(call) else { + let Ok(_) = self.client_call_sender.send(call).await else { return None; }; - let Ok(result) = receiver.await else { + let Ok(result) = receiver.recv().await else { return None; }; Some(result) @@ -380,11 +382,11 @@ impl Client { pub fn close( &self, frame: Option, - ) -> Result> { + ) -> Result> { let inmessage = InMessage::new(Message::Close(frame)); let inmessage_signal = inmessage.clone_signal().unwrap(); //safety: always available on construction self.to_socket_sender - .send(inmessage) + .send_blocking(inmessage) .map(|_| inmessage_signal) } } @@ -396,7 +398,10 @@ impl Client { pub async fn connect( client_fn: impl FnOnce(Client) -> E, config: ClientConfig, -) -> (Client, impl Future>) { +) -> ( + Client, + impl std::future::Future>, +) { let client_connector = crate::ClientConnectorTokio::default(); let (handle, mut future) = connect_with(client_fn, config, client_connector); let future = async move { @@ -414,8 +419,8 @@ pub fn connect_with( config: ClientConfig, client_connector: impl ClientConnector + Send + Sync + 'static, ) -> (Client, enfync::PendingResult>) { - let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel(); - let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel(); + let (to_socket_sender, mut to_socket_receiver) = async_channel::unbounded(); + let (client_call_sender, client_call_receiver) = async_channel::unbounded(); let handle = Client { to_socket_sender, client_call_sender, @@ -453,8 +458,8 @@ pub fn connect_with( struct ClientActor { client: E, - to_socket_receiver: mpsc::UnboundedReceiver, - client_call_receiver: mpsc::UnboundedReceiver, + to_socket_receiver: async_channel::Receiver, + client_call_receiver: async_channel::Receiver, socket: Socket, config: ClientConfig, client_connector: C, @@ -463,8 +468,11 @@ struct ClientActor { impl ClientActor { async fn run(&mut self) -> Result<(), Error> { loop { - tokio::select! { - Some(inmessage) = self.to_socket_receiver.recv() => { + futures::select! { + res = self.to_socket_receiver.recv().fuse() => { + let Ok(inmessage) = res else { + break; + }; let closed_self = matches!(inmessage.message, Some(Message::Close(_))); if self.socket.send(inmessage).await.is_err() { let result = self.socket.await_sink_close().await; @@ -480,7 +488,7 @@ impl ClientActor { // A) The connection was closed via the close protocol, so we will allow the stream to // handle it. // B) We already tried and failed to submit another message, so now we are - // waiting for other parts of the tokio::select to shut us down. + // waiting for other parts of the select! to shut us down. // C) An IO error means the connection closed unexpectedly, so we can try to reconnect when // the stream fails. } @@ -493,10 +501,13 @@ impl ClientActor { return Ok(()) } } - Some(call) = self.client_call_receiver.recv() => { + res = self.client_call_receiver.recv().fuse() => { + let Ok(call) = res else { + break; + }; self.client.on_call(call).await?; } - result = self.socket.stream.recv() => { + result = self.socket.stream.recv().fuse() => { match result { Some(Ok(message)) => { match message.to_owned() { @@ -548,7 +559,6 @@ impl ClientActor { } }; } - else => break, } } @@ -561,7 +571,7 @@ async fn client_connect( max_attempts: usize, config: &ClientConfig, client_connector: &Connector, - to_socket_receiver: &mut mpsc::UnboundedReceiver, + to_socket_receiver: &mut async_channel::Receiver, client: &mut E, ) -> Result, Error> { for i in 1.. { @@ -601,12 +611,16 @@ async fn client_connect( // Discard messages until either the connect interval passes, the socket receiver disconnects, or // the user sends a close message. - let sleep = tokio::time::sleep(config.reconnect_interval); - tokio::pin!(sleep); + let sleep = sleep(config.reconnect_interval).fuse(); + futures::pin_mut!(sleep); loop { - tokio::select! { - _ = &mut sleep => break, - Some(inmessage) = to_socket_receiver.recv() => { + futures::select! { + _ = sleep => break, + res = to_socket_receiver.recv().fuse() => { + let Ok(inmessage) = res else { + tracing::warn!("client is dead, aborting connection attempts"); + return Err(Error::from("client died while trying to connect")); + }; match &inmessage.message { Some(Message::Close(frame)) => { @@ -619,10 +633,6 @@ async fn client_connect( } } }, - else => { - tracing::warn!("client is dead, aborting connection attempts"); - return Err(Error::from("client died while trying to connect")); - }, } } } diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index 1d33dad..a40c545 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -1,6 +1,71 @@ use crate::client::{ClientConfig, ClientConnector}; +use crate::socket::{CloseFrame, Message, RawMessage}; -/// Implementation of [`ClientConnector`] for tokio runtimes. +use std::pin::Pin; +use std::task::{Context, Poll}; +use tungstenite::protocol::frame::coding::CloseCode as TungsteniteCloseCode; + +impl<'t> From> for CloseFrame { + fn from(frame: tokio_tungstenite_wasm::CloseFrame) -> Self { + Self { + code: Into::::into(Into::::into(frame.code)).into(), + reason: frame.reason.into(), + } + } +} + +impl<'t> From for tokio_tungstenite_wasm::CloseFrame<'t> { + fn from(frame: CloseFrame) -> Self { + Self { + code: Into::::into(Into::::into(frame.code)).into(), + reason: frame.reason.into(), + } + } +} + +impl From for tokio_tungstenite_wasm::Message { + fn from(message: RawMessage) -> Self { + match message { + RawMessage::Text(text) => Self::Text(text), + RawMessage::Binary(bytes) => Self::Binary(bytes), + RawMessage::Ping(_) => Self::Close(Some(tokio_tungstenite_wasm::CloseFrame { + code: tokio_tungstenite_wasm::CloseCode::Abnormal, + reason: "raw pings not supported".into(), + })), + RawMessage::Pong(_) => Self::Close(Some(tokio_tungstenite_wasm::CloseFrame { + code: tokio_tungstenite_wasm::CloseCode::Abnormal, + reason: "raw pongs not supported".into(), + })), + RawMessage::Close(frame) => Self::Close(frame.map(CloseFrame::into)), + } + } +} + +impl From for RawMessage { + fn from(message: tokio_tungstenite_wasm::Message) -> Self { + match message { + tokio_tungstenite_wasm::Message::Text(text) => Self::Text(text), + tokio_tungstenite_wasm::Message::Binary(bytes) => Self::Binary(bytes), + tokio_tungstenite_wasm::Message::Close(frame) => { + Self::Close(frame.map(CloseFrame::from)) + } + } + } +} + +impl From for tokio_tungstenite_wasm::Message { + fn from(message: Message) -> Self { + match message { + Message::Text(text) => tokio_tungstenite_wasm::Message::Text(text), + Message::Binary(bytes) => tokio_tungstenite_wasm::Message::Binary(bytes), + Message::Close(frame) => { + tokio_tungstenite_wasm::Message::Close(frame.map(CloseFrame::into)) + } + } + } +} + +/// Implementation of [`ClientConnector`] for WASM targets. #[derive(Clone)] pub struct ClientConnectorWasm { handle: enfync::builtin::wasm::WASMHandle, @@ -18,7 +83,7 @@ impl ClientConnector for ClientConnectorWasm { type Handle = enfync::builtin::wasm::WASMHandle; type Message = tokio_tungstenite_wasm::Message; type WSError = tokio_tungstenite_wasm::Error; - type Socket = tokio_tungstenite_wasm::WebSocketStream; + type Socket = WebSocketStreamProxy; type ConnectError = tokio_tungstenite_wasm::Error; /// Get the connector's runtime handle. @@ -37,7 +102,76 @@ impl ClientConnector for ClientConnectorWasm { panic!("client may not submit HTTP headers in WASM connection requests"); } let request_url = config.connect_url(); - let (socket, _) = tokio_tungstenite_wasm::connect(request_url).await?; + let socket = wasm_client_connect(String::from(request_url)).await?; Ok(socket) } } + +/// Proxy websocket that allows access to a WASM websocket. We need this because +/// `tokio_tungstenite_wasm::WebSocketStream` is !Send. +pub struct WebSocketStreamProxy { + inner: fragile::Fragile, +} + +impl futures_util::Stream for WebSocketStreamProxy { + type Item = tokio_tungstenite_wasm::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.poll_next(cx) + } +} + +impl futures_util::Sink for WebSocketStreamProxy { + type Error = tokio_tungstenite_wasm::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.poll_ready(cx) + } + + fn start_send( + mut self: Pin<&mut Self>, + item: tokio_tungstenite_wasm::Message, + ) -> Result<(), Self::Error> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.start_send(item) + } + + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + Ok(()).into() + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.poll_close(cx) + } +} + +async fn wasm_client_connect( + request_url: String, +) -> Result { + let (result_sender, result_receiver) = async_channel::bounded(1usize); + + wasm_bindgen_futures::spawn_local(async move { + let result = tokio_tungstenite_wasm::connect(request_url.as_str()).await; + result_sender + .send_blocking(result.map(|websocket| fragile::Fragile::new(websocket))) + .unwrap(); + }); + + let websocket = result_receiver + .recv() + .await + .unwrap_or(Err(tokio_tungstenite_wasm::Error::ConnectionClosed))?; + + Ok(WebSocketStreamProxy { inner: websocket }) +} diff --git a/src/lib.rs b/src/lib.rs index 3580033..96bfec9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ //! Refer to [`client`] or [`server`] module for detailed implementation guides. mod socket; +mod tungstenite_common; pub use socket::CloseCode; pub use socket::CloseFrame; @@ -25,7 +26,9 @@ cfg_if::cfg_if! { pub use client_connectors::*; + #[cfg(feature = "native_client")] pub use client::connect; + pub use client::connect_with; pub use client::ClientConfig; pub use client::ClientExt; diff --git a/src/server_runners/mod.rs b/src/server_runners/mod.rs index 0a4c59a..6e43800 100644 --- a/src/server_runners/mod.rs +++ b/src/server_runners/mod.rs @@ -5,8 +5,5 @@ cfg_if::cfg_if! { } } -#[cfg(feature = "tokio-tungstenite")] +#[cfg(feature = "tungstenite")] pub mod tungstenite; - -#[cfg(feature = "tungstenite_common")] -pub mod tungstenite_common; diff --git a/src/socket.rs b/src/socket.rs index 9aeec54..49c1c79 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -1,16 +1,17 @@ -use futures::{SinkExt, StreamExt, TryStreamExt}; +use futures::lock::Mutex; +use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; +use std::marker::PhantomData; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; -use std::time::Instant; -use std::{ - marker::PhantomData, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio::sync::Mutex; +use std::time::Duration; use tokio_tungstenite_wasm::Error as WSError; +#[cfg(not(target_family = "wasm"))] +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +#[cfg(target_family = "wasm")] +use wasmtimer::std::{Instant, SystemTime, UNIX_EPOCH}; + /// Wrapper trait for `Fn(Duration) -> RawMessage`. pub trait SocketHeartbeatPingFn: Fn(Duration) -> RawMessage + Sync + Send {} impl SocketHeartbeatPingFn for F where F: Fn(Duration) -> RawMessage + Sync + Send {} @@ -318,8 +319,8 @@ where M: From, S: SinkExt + Unpin, { - receiver: mpsc::UnboundedReceiver, - abort_receiver: oneshot::Receiver<()>, + receiver: async_channel::Receiver, + abort_receiver: async_channel::Receiver<()>, sink: S, phantom: PhantomData, } @@ -331,8 +332,11 @@ where { async fn run(&mut self) -> Result<(), WSError> { loop { - tokio::select! { - Some(mut inmessage) = self.receiver.recv() => { + futures::select! { + res = self.receiver.recv().fuse() => { + let Ok(mut inmessage) = res else { + break; + }; let Some(message) = inmessage.take_message() else { continue; }; @@ -346,12 +350,9 @@ where } } }, - Ok(()) = &mut self.abort_receiver => { + _ = &mut self.abort_receiver.recv().fuse() => { break; }, - else => { - break; - } } } Ok(()) @@ -360,20 +361,20 @@ where #[derive(Debug, Clone)] pub struct Sink { - sender: mpsc::UnboundedSender, + sender: async_channel::Sender, } impl Sink { fn new( sink: S, - abort_receiver: oneshot::Receiver<()>, + abort_receiver: async_channel::Receiver<()>, handle: impl enfync::Handle, ) -> (enfync::PendingResult>, Self) where M: From + Send + 'static, S: SinkExt + Unpin + Send + 'static, { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = async_channel::unbounded(); let mut actor = SinkActor { receiver, abort_receiver, @@ -391,15 +392,15 @@ impl Sink { pub async fn send( &self, inmessage: InMessage, - ) -> Result<(), mpsc::error::SendError> { - self.sender.send(inmessage.into()) + ) -> Result<(), async_channel::SendError> { + self.sender.send(inmessage.into()).await } pub(crate) async fn send_raw( &self, inmessage: InRawMessage, - ) -> Result<(), mpsc::error::SendError> { - self.sender.send(inmessage) + ) -> Result<(), async_channel::SendError> { + self.sender.send(inmessage).await } } @@ -409,7 +410,7 @@ where M: Into, S: StreamExt> + Unpin, { - sender: mpsc::UnboundedSender>, + sender: async_channel::Sender>, stream: S, last_alive: Arc>, } @@ -452,7 +453,7 @@ where }), Err(err) => Err(err), // maybe early return here? }; - if self.sender.send(message).is_err() { + if self.sender.send(message).await.is_err() { // In websockets, you always echo a close frame received from your connection partner back to them. // This means a normal close sequence will always end with the following line emitted by the socket of // the client/server that initiated the close sequence (in response to the close frame echoed by their @@ -470,7 +471,7 @@ where #[derive(Debug)] pub struct Stream { - receiver: mpsc::UnboundedReceiver>, + receiver: async_channel::Receiver>, } impl Stream { @@ -483,7 +484,7 @@ impl Stream { M: Into + std::fmt::Debug + Send + 'static, S: StreamExt> + Unpin + Send + 'static, { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = async_channel::unbounded(); let actor = StreamActor { sender, stream, @@ -495,7 +496,7 @@ impl Stream { } pub async fn recv(&mut self) -> Option> { - self.receiver.recv().await + self.receiver.recv().await.ok() } } @@ -503,7 +504,7 @@ impl Stream { pub struct Socket { pub sink: Sink, pub stream: Stream, - sink_result_receiver: Option>>, + sink_result_receiver: Option>>, } impl Socket { @@ -520,23 +521,23 @@ impl Socket { let last_alive = Instant::now(); let last_alive = Arc::new(Mutex::new(last_alive)); let (sink, stream) = socket.sink_err_into().err_into().split(); - let (sink_abort_sender, sink_abort_receiver) = oneshot::channel(); + let (sink_abort_sender, sink_abort_receiver) = async_channel::bounded(1usize); let ((mut sink_future, sink), (mut stream_future, stream)) = ( Sink::new(sink, sink_abort_receiver, handle.clone()), Stream::new(stream, last_alive.clone(), handle.clone()), ); - let (hearbeat_abort_sender, hearbeat_abort_receiver) = oneshot::channel(); + let (hearbeat_abort_sender, hearbeat_abort_receiver) = async_channel::bounded(1usize); let sink_clone = sink.clone(); handle.spawn(async move { socket_heartbeat(sink_clone, config, hearbeat_abort_receiver, last_alive).await }); - let (sink_result_sender, sink_result_receiver) = oneshot::channel(); + let (sink_result_sender, sink_result_receiver) = async_channel::bounded(1usize); handle.spawn(async move { let _ = stream_future.extract().await; - let _ = sink_abort_sender.send(()); - let _ = hearbeat_abort_sender.send(()); - let _ = sink_result_sender.send( + let _ = sink_abort_sender.send_blocking(()); + let _ = hearbeat_abort_sender.send_blocking(()); + let _ = sink_result_sender.send_blocking( sink_future .extract() .await @@ -554,14 +555,14 @@ impl Socket { pub async fn send( &self, message: InMessage, - ) -> Result<(), mpsc::error::SendError> { + ) -> Result<(), async_channel::SendError> { self.sink.send(message).await } pub async fn send_raw( &self, message: InRawMessage, - ) -> Result<(), mpsc::error::SendError> { + ) -> Result<(), async_channel::SendError> { self.sink.send_raw(message).await } @@ -574,15 +575,17 @@ impl Socket { return Err(WSError::AlreadyClosed); }; sink_result_receiver + .recv() .await .unwrap_or(Err(WSError::AlreadyClosed)) } } +#[cfg(not(target_family = "wasm"))] async fn socket_heartbeat( sink: Sink, config: SocketConfig, - mut abort_receiver: oneshot::Receiver<()>, + abort_receiver: async_channel::Receiver<()>, last_alive: Arc>, ) { let sleep = tokio::time::sleep(config.heartbeat); @@ -591,39 +594,75 @@ async fn socket_heartbeat( loop { tokio::select! { _ = &mut sleep => { - let elapsed_since_last_alive = last_alive.lock().await.elapsed(); - if elapsed_since_last_alive > config.timeout { - tracing::info!("closing connection due to timeout"); - let _ = sink - .send_raw(InRawMessage::new(RawMessage::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: String::from("remote partner is inactive"), - })))) - .await; - return; - } else if elapsed_since_last_alive < config.heartbeat { - // todo: this branch will needlessly fire at least once per heartbeat for idle connections since - // Pongs arrive after some delay - sleep.as_mut().reset( - tokio::time::Instant::now() + config.heartbeat.saturating_sub(elapsed_since_last_alive) - ); - continue; - } else { - sleep.as_mut().reset(tokio::time::Instant::now() + config.heartbeat); - } - let timestamp = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default(); - if sink - .send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp))) - .await - .is_err() - { + let Some(next_sleep_duration) = handle_heartbeat_sleep_elapsed(&sink, &config, &last_alive).await else { break; - } + }; + sleep.as_mut().reset(tokio::time::Instant::now() + next_sleep_duration); } - _ = &mut abort_receiver => break, - else => break, + _ = &mut abort_receiver.recv() => break, } } } + +#[cfg(target_family = "wasm")] +async fn socket_heartbeat( + sink: Sink, + config: SocketConfig, + abort_receiver: async_channel::Receiver<()>, + last_alive: Arc>, +) { + let mut sleep_duration = config.heartbeat; + + loop { + // It is better to use Sleep::reset(), but we can't do it here because fuse() consumes the sleep + // and we need futures::select since we can't use tokio on WASM targets. + let sleep = wasmtimer::tokio::sleep(sleep_duration).fuse(); + futures::pin_mut!(sleep); + futures::select! { + _ = sleep => { + let Some(next_sleep_duration) = handle_heartbeat_sleep_elapsed(&sink, &config, &last_alive).await else { + break; + }; + sleep_duration = next_sleep_duration; + } + _ = &mut abort_receiver.recv().fuse() => break, + } + } +} + +async fn handle_heartbeat_sleep_elapsed( + sink: &Sink, + config: &SocketConfig, + last_alive: &Arc>, +) -> Option { + // check last alive + let elapsed_since_last_alive = last_alive.lock().await.elapsed(); + if elapsed_since_last_alive > config.timeout { + tracing::info!("closing connection due to timeout"); + let _ = sink + .send_raw(InRawMessage::new(RawMessage::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: String::from("remote partner is inactive"), + })))) + .await; + return None; + } else if elapsed_since_last_alive < config.heartbeat { + // todo: this branch will needlessly fire at least once per heartbeat for idle connections since + // Pongs arrive after some delay + return Some(config.heartbeat.saturating_sub(elapsed_since_last_alive)); + } + + // send ping + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default(); + if sink + .send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp))) + .await + .is_err() + { + return None; + } + + Some(config.heartbeat) +} diff --git a/src/server_runners/tungstenite_common.rs b/src/tungstenite_common.rs similarity index 96% rename from src/server_runners/tungstenite_common.rs rename to src/tungstenite_common.rs index e0f1e4a..2044928 100644 --- a/src/server_runners/tungstenite_common.rs +++ b/src/tungstenite_common.rs @@ -1,11 +1,7 @@ -//! The `tungstenite_common` feature must be enabled in order to use this module. -//! - use crate::socket::RawMessage; use crate::CloseCode; use crate::CloseFrame; use crate::Message; -use tokio_tungstenite::tungstenite; use tungstenite::protocol::frame::coding::CloseCode as TungsteniteCloseCode; impl<'t> From> for CloseFrame { diff --git a/tests/chat.rs b/tests/chat.rs index d1c39de..81f6f0a 100644 --- a/tests/chat.rs +++ b/tests/chat.rs @@ -218,7 +218,7 @@ pub struct ChatClient { #[derive(Debug)] pub enum ChatClientMessage { Send(String), - Subscribe(oneshot::Sender>), + Subscribe(async_channel::Sender>), } impl ChatClient { @@ -252,7 +252,7 @@ impl ezsockets::ClientExt for ChatClient { let _ = self.handle.text(message).unwrap(); } ChatClientMessage::Subscribe(respond_to) => { - respond_to.send(self.messages.subscribe()).unwrap() + respond_to.send(self.messages.subscribe()).await.unwrap() } } Ok(())