From c5da78ddf7df32adaad30a778eeabda0c7746fec Mon Sep 17 00:00:00 2001 From: koe Date: Sun, 8 Oct 2023 01:18:20 -0500 Subject: [PATCH 01/16] fix features --- Cargo.toml | 10 ++++------ src/lib.rs | 1 + src/server_runners/mod.rs | 5 +---- src/{server_runners => }/tungstenite_common.rs | 3 --- 4 files changed, 6 insertions(+), 13 deletions(-) rename src/{server_runners => }/tungstenite_common.rs (97%) diff --git a/Cargo.toml b/Cargo.toml index 6c11ee2..3cf79da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,10 +31,10 @@ futures-util = { version = "0.3.25", default-features = false, features = ["allo http-body = { version = "0.4.5", optional = true } hyper = { version = "0.14.23", optional = true } sha-1 = { version = "0.10.1", optional = true } +tokio-tungstenite = { version = "0.20.0" } tokio-tungstenite-wasm = { version = "0.2.0", optional = true } -tokio-tungstenite = { version = "0.20.0", optional = true } tokio-rustls = { version = "0.24.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true } @@ -42,12 +42,10 @@ tokio-native-tls = { version = "0.3.1", optional = true } default = ["native_client", "server"] client = ["tokio-tungstenite-wasm"] -native_client = ["client", "tokio-tungstenite", "tokio/rt"] -wasm_client = ["client", "tokio-tungstenite-wasm"] +native_client = ["client", "tokio/rt"] +wasm_client = ["client"] -tungstenite_common = ["tokio-tungstenite"] - -server = ["tungstenite_common", "tokio-tungstenite-wasm", "tokio/rt"] +server = ["tokio-tungstenite-wasm", "tokio/rt"] tungstenite = ["server"] axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"] diff --git a/src/lib.rs b/src/lib.rs index 3580033..fca71cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ //! Refer to [`client`] or [`server`] module for detailed implementation guides. mod socket; +mod tungstenite_common; pub use socket::CloseCode; pub use socket::CloseFrame; diff --git a/src/server_runners/mod.rs b/src/server_runners/mod.rs index 0a4c59a..6e43800 100644 --- a/src/server_runners/mod.rs +++ b/src/server_runners/mod.rs @@ -5,8 +5,5 @@ cfg_if::cfg_if! { } } -#[cfg(feature = "tokio-tungstenite")] +#[cfg(feature = "tungstenite")] pub mod tungstenite; - -#[cfg(feature = "tungstenite_common")] -pub mod tungstenite_common; diff --git a/src/server_runners/tungstenite_common.rs b/src/tungstenite_common.rs similarity index 97% rename from src/server_runners/tungstenite_common.rs rename to src/tungstenite_common.rs index e0f1e4a..3145e03 100644 --- a/src/server_runners/tungstenite_common.rs +++ b/src/tungstenite_common.rs @@ -1,6 +1,3 @@ -//! The `tungstenite_common` feature must be enabled in order to use this module. -//! - use crate::socket::RawMessage; use crate::CloseCode; use crate::CloseFrame; From 31189d055c3a45887c1db185f1b35a86a27e2ad4 Mon Sep 17 00:00:00 2001 From: koe Date: Wed, 11 Oct 2023 19:51:25 -0500 Subject: [PATCH 02/16] remove tokio-tungstenite dependency from clients --- Cargo.toml | 5 +++-- src/tungstenite_common.rs | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3cf79da..8f2194e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ futures = "0.3.21" http = "0.2.8" tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } tracing = "0.1.31" +tungstenite = "0.20.0" url = "2.2.2" cfg-if = "1.0.0" @@ -31,7 +32,7 @@ futures-util = { version = "0.3.25", default-features = false, features = ["allo http-body = { version = "0.4.5", optional = true } hyper = { version = "0.14.23", optional = true } sha-1 = { version = "0.10.1", optional = true } -tokio-tungstenite = { version = "0.20.0" } +tokio-tungstenite = { version = "0.20.0", optional = true } tokio-tungstenite-wasm = { version = "0.2.0", optional = true } @@ -45,7 +46,7 @@ client = ["tokio-tungstenite-wasm"] native_client = ["client", "tokio/rt"] wasm_client = ["client"] -server = ["tokio-tungstenite-wasm", "tokio/rt"] +server = ["tokio-tungstenite", "tokio-tungstenite-wasm", "tokio/rt"] tungstenite = ["server"] axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"] diff --git a/src/tungstenite_common.rs b/src/tungstenite_common.rs index 3145e03..2044928 100644 --- a/src/tungstenite_common.rs +++ b/src/tungstenite_common.rs @@ -2,7 +2,6 @@ use crate::socket::RawMessage; use crate::CloseCode; use crate::CloseFrame; use crate::Message; -use tokio_tungstenite::tungstenite; use tungstenite::protocol::frame::coding::CloseCode as TungsteniteCloseCode; impl<'t> From> for CloseFrame { From ab52e65a5892b4984a4cad7b92dc18607ddfac88 Mon Sep 17 00:00:00 2001 From: koe Date: Wed, 11 Oct 2023 22:20:15 -0500 Subject: [PATCH 03/16] remove tokio dependency from clients for WASM builds --- CHANGELOG.md | 1 + Cargo.toml | 9 +- benches/my_benchmark.rs | 8 +- src/client.rs | 86 +++++----- .../client_connector_wasm.rs | 2 +- src/lib.rs | 2 + src/socket.rs | 155 +++++++++++------- tests/chat.rs | 4 +- 8 files changed, 163 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e6dc2f..5a1d7fa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Use [`enfync`](https://github.com/UkoeHB/enfync) runtime handles internally to better support cross-platform clients. Default clients continue to use tokio. - Add `ClientConnector` abstraction for connecting clients and add `ezsockets::client::connect_with`. - Add `ClientConnectorWasm` and `wasm_client` feature. +- Refactor `Socket` and `Client` to not depend on `tokio` when compiling to WASM. This is a breaking change as the `Client` API now exposes `async_channel` error types instead of `tokio` error types, and `Client::call_with()` now takes an `async_channel::Sender` instead of a `tokio` oneshot. Migration guide: diff --git a/Cargo.toml b/Cargo.toml index 8f2194e..352739d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,17 +9,18 @@ repository = "https://github.com/gbaranski/ezsockets" license = "MIT" keywords = ["websocket", "networking", "async"] categories = ["asynchronous", "network-programming", "web-programming::websocket"] +resolver = "2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-channel = "1.9.0" async-trait = "0.1.52" atomic_enum = "0.2.0" base64 = "0.21.0" enfync = "0.1.0" futures = "0.3.21" http = "0.2.8" -tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } tracing = "0.1.31" tungstenite = "0.20.0" url = "2.2.2" @@ -39,6 +40,12 @@ tokio-tungstenite-wasm = { version = "0.2.0", optional = true } tokio-rustls = { version = "0.24.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true } +[target.'cfg(not(target_family = "wasm"))'.dependencies] +tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } + +[target.'cfg(target_family = "wasm")'.dependencies] +wasmtimer = "0.2.0" + [features] default = ["native_client", "server"] diff --git a/benches/my_benchmark.rs b/benches/my_benchmark.rs index 57b8b16..e879bd6 100644 --- a/benches/my_benchmark.rs +++ b/benches/my_benchmark.rs @@ -54,7 +54,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap(); ezsockets_server::run(listener).await; }); - sleep(Duration::from_millis(1000)); + sleep(Duration::from_millis(100)); let mut client = tungstenite::connect(format!("ws://127.0.0.1:{}", port)) .unwrap() .0; @@ -63,5 +63,9 @@ pub fn criterion_benchmark(c: &mut Criterion) { task.abort(); } -criterion_group!(benches, criterion_benchmark); +criterion_group! { + name = benches; + config = Criterion::default().sample_size(10); + targets = criterion_benchmark +} criterion_main!(benches); diff --git a/src/client.rs b/src/client.rs index 67ce786..cada8ea 100644 --- a/src/client.rs +++ b/src/client.rs @@ -58,18 +58,21 @@ use crate::Socket; use async_trait::async_trait; use base64::Engine; use enfync::Handle; -use futures::{SinkExt, StreamExt}; +use futures::{FutureExt, SinkExt, StreamExt}; use http::header::HeaderName; use http::HeaderValue; use std::fmt; use std::future::Future; use std::time::Duration; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio_tungstenite::tungstenite; use tokio_tungstenite_wasm::Error as WSError; use url::Url; +#[cfg(not(target_family = "wasm"))] +use tokio::time::sleep; + +#[cfg(target_family = "wasm")] +use wasmtimer::tokio::sleep; + pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::new(5, 0); #[derive(Debug)] @@ -297,8 +300,8 @@ pub trait ClientConnector { /// An `ezsockets` client. #[derive(Debug)] pub struct Client { - to_socket_sender: mpsc::UnboundedSender, - client_call_sender: mpsc::UnboundedSender, + to_socket_sender: async_channel::Sender, + client_call_sender: async_channel::Sender, } impl Clone for Client { @@ -310,7 +313,7 @@ impl Clone for Client { } } -impl From> for mpsc::UnboundedSender { +impl From> for async_channel::Sender { fn from(client: Client) -> Self { client.client_call_sender } @@ -323,11 +326,11 @@ impl Client { pub fn text( &self, text: impl Into, - ) -> Result> { + ) -> Result> { let inmessage = InMessage::new(Message::Text(text.into())); let inmessage_signal = inmessage.clone_signal().unwrap(); //safety: always available on construction self.to_socket_sender - .send(inmessage) + .send_blocking(inmessage) .map(|_| inmessage_signal) } @@ -337,19 +340,19 @@ impl Client { pub fn binary( &self, bytes: impl Into>, - ) -> Result> { + ) -> Result> { let inmessage = InMessage::new(Message::Binary(bytes.into())); let inmessage_signal = inmessage.clone_signal().unwrap(); //safety: always available on construction self.to_socket_sender - .send(inmessage) + .send_blocking(inmessage) .map(|_| inmessage_signal) } /// Call a custom method on the Client. /// /// Refer to `ClientExt::on_call`. - pub fn call(&self, message: E::Call) -> Result<(), mpsc::error::SendError> { - self.client_call_sender.send(message) + pub fn call(&self, message: E::Call) -> Result<(), async_channel::SendError> { + self.client_call_sender.send_blocking(message) } /// Call a custom method on the Client, with a reply from the `ClientExt::on_call`. @@ -357,15 +360,15 @@ impl Client { /// This works just as syntactic sugar for `Client::call(sender)` pub async fn call_with( &self, - f: impl FnOnce(oneshot::Sender) -> E::Call, + f: impl FnOnce(async_channel::Sender) -> E::Call, ) -> Option { - let (sender, receiver) = oneshot::channel(); + let (sender, receiver) = async_channel::bounded(1usize); let call = f(sender); - let Ok(_) = self.client_call_sender.send(call) else { + let Ok(_) = self.client_call_sender.send(call).await else { return None; }; - let Ok(result) = receiver.await else { + let Ok(result) = receiver.recv().await else { return None; }; Some(result) @@ -380,11 +383,11 @@ impl Client { pub fn close( &self, frame: Option, - ) -> Result> { + ) -> Result> { let inmessage = InMessage::new(Message::Close(frame)); let inmessage_signal = inmessage.clone_signal().unwrap(); //safety: always available on construction self.to_socket_sender - .send(inmessage) + .send_blocking(inmessage) .map(|_| inmessage_signal) } } @@ -414,8 +417,8 @@ pub fn connect_with( config: ClientConfig, client_connector: impl ClientConnector + Send + Sync + 'static, ) -> (Client, enfync::PendingResult>) { - let (to_socket_sender, mut to_socket_receiver) = mpsc::unbounded_channel(); - let (client_call_sender, client_call_receiver) = mpsc::unbounded_channel(); + let (to_socket_sender, mut to_socket_receiver) = async_channel::unbounded(); + let (client_call_sender, client_call_receiver) = async_channel::unbounded(); let handle = Client { to_socket_sender, client_call_sender, @@ -453,8 +456,8 @@ pub fn connect_with( struct ClientActor { client: E, - to_socket_receiver: mpsc::UnboundedReceiver, - client_call_receiver: mpsc::UnboundedReceiver, + to_socket_receiver: async_channel::Receiver, + client_call_receiver: async_channel::Receiver, socket: Socket, config: ClientConfig, client_connector: C, @@ -463,8 +466,11 @@ struct ClientActor { impl ClientActor { async fn run(&mut self) -> Result<(), Error> { loop { - tokio::select! { - Some(inmessage) = self.to_socket_receiver.recv() => { + futures::select! { + res = self.to_socket_receiver.recv().fuse() => { + let Ok(inmessage) = res else { + break; + }; let closed_self = matches!(inmessage.message, Some(Message::Close(_))); if self.socket.send(inmessage).await.is_err() { let result = self.socket.await_sink_close().await; @@ -480,7 +486,7 @@ impl ClientActor { // A) The connection was closed via the close protocol, so we will allow the stream to // handle it. // B) We already tried and failed to submit another message, so now we are - // waiting for other parts of the tokio::select to shut us down. + // waiting for other parts of the select! to shut us down. // C) An IO error means the connection closed unexpectedly, so we can try to reconnect when // the stream fails. } @@ -493,10 +499,13 @@ impl ClientActor { return Ok(()) } } - Some(call) = self.client_call_receiver.recv() => { + res = self.client_call_receiver.recv().fuse() => { + let Ok(call) = res else { + break; + }; self.client.on_call(call).await?; } - result = self.socket.stream.recv() => { + result = self.socket.stream.recv().fuse() => { match result { Some(Ok(message)) => { match message.to_owned() { @@ -548,7 +557,6 @@ impl ClientActor { } }; } - else => break, } } @@ -561,7 +569,7 @@ async fn client_connect( max_attempts: usize, config: &ClientConfig, client_connector: &Connector, - to_socket_receiver: &mut mpsc::UnboundedReceiver, + to_socket_receiver: &mut async_channel::Receiver, client: &mut E, ) -> Result, Error> { for i in 1.. { @@ -601,12 +609,16 @@ async fn client_connect( // Discard messages until either the connect interval passes, the socket receiver disconnects, or // the user sends a close message. - let sleep = tokio::time::sleep(config.reconnect_interval); - tokio::pin!(sleep); + let sleep = sleep(config.reconnect_interval).fuse(); + futures::pin_mut!(sleep); loop { - tokio::select! { - _ = &mut sleep => break, - Some(inmessage) = to_socket_receiver.recv() => { + futures::select! { + _ = sleep => break, + res = to_socket_receiver.recv().fuse() => { + let Ok(inmessage) = res else { + tracing::warn!("client is dead, aborting connection attempts"); + return Err(Error::from("client died while trying to connect")); + }; match &inmessage.message { Some(Message::Close(frame)) => { @@ -619,10 +631,6 @@ async fn client_connect( } } }, - else => { - tracing::warn!("client is dead, aborting connection attempts"); - return Err(Error::from("client died while trying to connect")); - }, } } } diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index 1d33dad..3ddb1b7 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -37,7 +37,7 @@ impl ClientConnector for ClientConnectorWasm { panic!("client may not submit HTTP headers in WASM connection requests"); } let request_url = config.connect_url(); - let (socket, _) = tokio_tungstenite_wasm::connect(request_url).await?; + let socket = tokio_tungstenite_wasm::connect(request_url).await?; Ok(socket) } } diff --git a/src/lib.rs b/src/lib.rs index fca71cd..96bfec9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,9 @@ cfg_if::cfg_if! { pub use client_connectors::*; + #[cfg(feature = "native_client")] pub use client::connect; + pub use client::connect_with; pub use client::ClientConfig; pub use client::ClientExt; diff --git a/src/socket.rs b/src/socket.rs index 9aeec54..5e78492 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -1,4 +1,5 @@ -use futures::{SinkExt, StreamExt, TryStreamExt}; +use futures::lock::Mutex; +use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; use std::time::Instant; @@ -6,9 +7,6 @@ use std::{ marker::PhantomData, time::{Duration, SystemTime, UNIX_EPOCH}, }; -use tokio::sync::mpsc; -use tokio::sync::oneshot; -use tokio::sync::Mutex; use tokio_tungstenite_wasm::Error as WSError; /// Wrapper trait for `Fn(Duration) -> RawMessage`. @@ -318,8 +316,8 @@ where M: From, S: SinkExt + Unpin, { - receiver: mpsc::UnboundedReceiver, - abort_receiver: oneshot::Receiver<()>, + receiver: async_channel::Receiver, + abort_receiver: async_channel::Receiver<()>, sink: S, phantom: PhantomData, } @@ -331,8 +329,9 @@ where { async fn run(&mut self) -> Result<(), WSError> { loop { - tokio::select! { - Some(mut inmessage) = self.receiver.recv() => { + futures::select! { + res = self.receiver.recv().fuse() => { + let Ok(mut inmessage) = res else { break; }; let Some(message) = inmessage.take_message() else { continue; }; @@ -346,10 +345,10 @@ where } } }, - Ok(()) = &mut self.abort_receiver => { + _ = &mut self.abort_receiver.recv().fuse() => { break; }, - else => { + complete => { break; } } @@ -360,20 +359,20 @@ where #[derive(Debug, Clone)] pub struct Sink { - sender: mpsc::UnboundedSender, + sender: async_channel::Sender, } impl Sink { fn new( sink: S, - abort_receiver: oneshot::Receiver<()>, + abort_receiver: async_channel::Receiver<()>, handle: impl enfync::Handle, ) -> (enfync::PendingResult>, Self) where M: From + Send + 'static, S: SinkExt + Unpin + Send + 'static, { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = async_channel::unbounded(); let mut actor = SinkActor { receiver, abort_receiver, @@ -391,15 +390,15 @@ impl Sink { pub async fn send( &self, inmessage: InMessage, - ) -> Result<(), mpsc::error::SendError> { - self.sender.send(inmessage.into()) + ) -> Result<(), async_channel::SendError> { + self.sender.send(inmessage.into()).await } pub(crate) async fn send_raw( &self, inmessage: InRawMessage, - ) -> Result<(), mpsc::error::SendError> { - self.sender.send(inmessage) + ) -> Result<(), async_channel::SendError> { + self.sender.send(inmessage).await } } @@ -409,7 +408,7 @@ where M: Into, S: StreamExt> + Unpin, { - sender: mpsc::UnboundedSender>, + sender: async_channel::Sender>, stream: S, last_alive: Arc>, } @@ -452,7 +451,7 @@ where }), Err(err) => Err(err), // maybe early return here? }; - if self.sender.send(message).is_err() { + if self.sender.send(message).await.is_err() { // In websockets, you always echo a close frame received from your connection partner back to them. // This means a normal close sequence will always end with the following line emitted by the socket of // the client/server that initiated the close sequence (in response to the close frame echoed by their @@ -470,7 +469,7 @@ where #[derive(Debug)] pub struct Stream { - receiver: mpsc::UnboundedReceiver>, + receiver: async_channel::Receiver>, } impl Stream { @@ -483,7 +482,7 @@ impl Stream { M: Into + std::fmt::Debug + Send + 'static, S: StreamExt> + Unpin + Send + 'static, { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = async_channel::unbounded(); let actor = StreamActor { sender, stream, @@ -495,7 +494,7 @@ impl Stream { } pub async fn recv(&mut self) -> Option> { - self.receiver.recv().await + self.receiver.recv().await.ok() } } @@ -503,7 +502,7 @@ impl Stream { pub struct Socket { pub sink: Sink, pub stream: Stream, - sink_result_receiver: Option>>, + sink_result_receiver: Option>>, } impl Socket { @@ -520,18 +519,18 @@ impl Socket { let last_alive = Instant::now(); let last_alive = Arc::new(Mutex::new(last_alive)); let (sink, stream) = socket.sink_err_into().err_into().split(); - let (sink_abort_sender, sink_abort_receiver) = oneshot::channel(); + let (sink_abort_sender, sink_abort_receiver) = async_channel::bounded(1usize); let ((mut sink_future, sink), (mut stream_future, stream)) = ( Sink::new(sink, sink_abort_receiver, handle.clone()), Stream::new(stream, last_alive.clone(), handle.clone()), ); - let (hearbeat_abort_sender, hearbeat_abort_receiver) = oneshot::channel(); + let (hearbeat_abort_sender, hearbeat_abort_receiver) = async_channel::bounded(1usize); let sink_clone = sink.clone(); handle.spawn(async move { socket_heartbeat(sink_clone, config, hearbeat_abort_receiver, last_alive).await }); - let (sink_result_sender, sink_result_receiver) = oneshot::channel(); + let (sink_result_sender, sink_result_receiver) = async_channel::bounded(1usize); handle.spawn(async move { let _ = stream_future.extract().await; let _ = sink_abort_sender.send(()); @@ -554,14 +553,14 @@ impl Socket { pub async fn send( &self, message: InMessage, - ) -> Result<(), mpsc::error::SendError> { + ) -> Result<(), async_channel::SendError> { self.sink.send(message).await } pub async fn send_raw( &self, message: InRawMessage, - ) -> Result<(), mpsc::error::SendError> { + ) -> Result<(), async_channel::SendError> { self.sink.send_raw(message).await } @@ -574,15 +573,17 @@ impl Socket { return Err(WSError::AlreadyClosed); }; sink_result_receiver + .recv() .await .unwrap_or(Err(WSError::AlreadyClosed)) } } +#[cfg(not(target_family = "wasm"))] async fn socket_heartbeat( sink: Sink, config: SocketConfig, - mut abort_receiver: oneshot::Receiver<()>, + abort_receiver: async_channel::Receiver<()>, last_alive: Arc>, ) { let sleep = tokio::time::sleep(config.heartbeat); @@ -591,39 +592,75 @@ async fn socket_heartbeat( loop { tokio::select! { _ = &mut sleep => { - let elapsed_since_last_alive = last_alive.lock().await.elapsed(); - if elapsed_since_last_alive > config.timeout { - tracing::info!("closing connection due to timeout"); - let _ = sink - .send_raw(InRawMessage::new(RawMessage::Close(Some(CloseFrame { - code: CloseCode::Normal, - reason: String::from("remote partner is inactive"), - })))) - .await; - return; - } else if elapsed_since_last_alive < config.heartbeat { - // todo: this branch will needlessly fire at least once per heartbeat for idle connections since - // Pongs arrive after some delay - sleep.as_mut().reset( - tokio::time::Instant::now() + config.heartbeat.saturating_sub(elapsed_since_last_alive) - ); - continue; - } else { - sleep.as_mut().reset(tokio::time::Instant::now() + config.heartbeat); - } - let timestamp = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default(); - if sink - .send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp))) - .await - .is_err() - { + let Some(next_sleep_duration) = handle_heartbeat_sleep_elapsed(&sink, &config, &last_alive).await else { + break; + }; + sleep.as_mut().reset(tokio::time::Instant::now() + next_sleep_duration); + } + _ = &mut abort_receiver.recv() => break, + } + } +} + +#[cfg(target_family = "wasm")] +async fn socket_heartbeat( + sink: Sink, + config: SocketConfig, + abort_receiver: async_channel::Receiver<()>, + last_alive: Arc>, +) { + let mut sleep_duration = config.heartbeat; + + loop { + // It is better to use Sleep::reset(), but we can't do it here because fuse() consumes the sleep + // and we need futures::select to avoid depending on tokio. + let sleep = wasmtimer::tokio::sleep(sleep_duration).fuse(); + futures::pin_mut!(sleep); + futures::select! { + _ = sleep => { + let Some(next_sleep_duration) = handle_heartbeat_sleep_elapsed(&sink, &config, &last_alive) else { break; } + sleep_duration = next_sleep_duration; } - _ = &mut abort_receiver => break, - else => break, + _ = &mut abort_receiver.recv().fuse() => break, } } } + +async fn handle_heartbeat_sleep_elapsed( + sink: &Sink, + config: &SocketConfig, + last_alive: &Arc>, +) -> Option { + // check last alive + let elapsed_since_last_alive = last_alive.lock().await.elapsed(); + if elapsed_since_last_alive > config.timeout { + tracing::info!("closing connection due to timeout"); + let _ = sink + .send_raw(InRawMessage::new(RawMessage::Close(Some(CloseFrame { + code: CloseCode::Normal, + reason: String::from("remote partner is inactive"), + })))) + .await; + return None; + } else if elapsed_since_last_alive < config.heartbeat { + // todo: this branch will needlessly fire at least once per heartbeat for idle connections since + // Pongs arrive after some delay + return Some(config.heartbeat.saturating_sub(elapsed_since_last_alive)); + } + + // send ping + let timestamp = SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default(); + if sink + .send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp))) + .await + .is_err() + { + return None; + } + + Some(config.heartbeat) +} diff --git a/tests/chat.rs b/tests/chat.rs index d1c39de..81f6f0a 100644 --- a/tests/chat.rs +++ b/tests/chat.rs @@ -218,7 +218,7 @@ pub struct ChatClient { #[derive(Debug)] pub enum ChatClientMessage { Send(String), - Subscribe(oneshot::Sender>), + Subscribe(async_channel::Sender>), } impl ChatClient { @@ -252,7 +252,7 @@ impl ezsockets::ClientExt for ChatClient { let _ = self.handle.text(message).unwrap(); } ChatClientMessage::Subscribe(respond_to) => { - respond_to.send(self.messages.subscribe()).unwrap() + respond_to.send(self.messages.subscribe()).await.unwrap() } } Ok(()) From 398f652e00f33eaf88d784d1383aee3520a87042 Mon Sep 17 00:00:00 2001 From: koe Date: Wed, 11 Oct 2023 23:23:19 -0500 Subject: [PATCH 04/16] wasm fixes --- Cargo.toml | 3 +- .../client_connector_wasm.rs | 51 +++++++++++++++++++ src/socket.rs | 4 +- 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 352739d..0b353f4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,8 @@ hyper = { version = "0.14.23", optional = true } sha-1 = { version = "0.10.1", optional = true } tokio-tungstenite = { version = "0.20.0", optional = true } -tokio-tungstenite-wasm = { version = "0.2.0", optional = true } +#tokio-tungstenite-wasm = { version = "0.2.0", optional = true } +tokio-tungstenite-wasm = { git = "https://github.com/UkoeHB/tokio-tungstenite-wasm", rev = "503cf12", optional = true } tokio-rustls = { version = "0.24.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true } diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index 3ddb1b7..22cbcde 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -1,4 +1,55 @@ use crate::client::{ClientConfig, ClientConnector}; +use crate::socket::{CloseFrame, Message, RawMessage}; + +impl<'t> From> for CloseFrame { + fn from(frame: tokio_tungstenite_wasm::CloseFrame) -> Self { + Self { + code: Into::::into(Into::::into(frame.code)).into(), + reason: frame.reason.into(), + } + } +} + +impl<'t> From for tokio_tungstenite_wasm::CloseFrame<'t> { + fn from(frame: CloseFrame) -> Self { + Self { + code: Into::::into(Into::::into(frame.code)).into(), + reason: frame.reason.into(), + } + } +} + +impl From for tokio_tungstenite_wasm::Message { + fn from(message: RawMessage) -> Self { + match message { + RawMessage::Text(text) => Self::Text(text), + RawMessage::Binary(bytes) => Self::Binary(bytes), + RawMessage::Ping(_) => Self::Close(None), + RawMessage::Pong(_) => Self::Close(None), + RawMessage::Close(frame) => Self::Close(frame.map(CloseFrame::into)), + } + } +} + +impl From for RawMessage { + fn from(message: tokio_tungstenite_wasm::Message) -> Self { + match message { + tokio_tungstenite_wasm::Message::Text(text) => Self::Text(text), + tokio_tungstenite_wasm::Message::Binary(bytes) => Self::Binary(bytes), + tokio_tungstenite_wasm::Message::Close(frame) => Self::Close(frame.map(CloseFrame::from)), + } + } +} + +impl From for tokio_tungstenite_wasm::Message { + fn from(message: Message) -> Self { + match message { + Message::Text(text) => tokio_tungstenite_wasm::Message::Text(text), + Message::Binary(bytes) => tokio_tungstenite_wasm::Message::Binary(bytes), + Message::Close(frame) => tokio_tungstenite_wasm::Message::Close(frame.map(CloseFrame::into)), + } + } +} /// Implementation of [`ClientConnector`] for tokio runtimes. #[derive(Clone)] diff --git a/src/socket.rs b/src/socket.rs index 5e78492..bebcf49 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -618,9 +618,9 @@ async fn socket_heartbeat( futures::pin_mut!(sleep); futures::select! { _ = sleep => { - let Some(next_sleep_duration) = handle_heartbeat_sleep_elapsed(&sink, &config, &last_alive) else { + let Some(next_sleep_duration) = handle_heartbeat_sleep_elapsed(&sink, &config, &last_alive).await else { break; - } + }; sleep_duration = next_sleep_duration; } _ = &mut abort_receiver.recv().fuse() => break, From f3c043a8c2b750995a1cad9561b15717c8f98c02 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 17:37:18 -0500 Subject: [PATCH 05/16] add demo WASM client; bug fixes --- Cargo.toml | 13 ++- examples/chat-client-wasm/Cargo.toml | 18 ++++ examples/chat-client-wasm/src/main.rs | 74 ++++++++++++++++ examples/chat-server/src/main.rs | 11 ++- src/client.rs | 3 +- .../client_connector_wasm.rs | 88 +++++++++++++++++-- src/socket.rs | 15 ++-- 7 files changed, 202 insertions(+), 20 deletions(-) create mode 100644 examples/chat-client-wasm/Cargo.toml create mode 100644 examples/chat-client-wasm/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 0b353f4..ce9d834 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,8 @@ keywords = ["websocket", "networking", "async"] categories = ["asynchronous", "network-programming", "web-programming::websocket"] resolver = "2" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[lib] +crate-type = ["cdylib", "rlib"] [dependencies] async-channel = "1.9.0" @@ -20,6 +21,7 @@ atomic_enum = "0.2.0" base64 = "0.21.0" enfync = "0.1.0" futures = "0.3.21" +futures-util = { version = "0.3.25", default-features = false } http = "0.2.8" tracing = "0.1.31" tungstenite = "0.20.0" @@ -29,7 +31,7 @@ cfg-if = "1.0.0" axum = { version = "0.6.1", optional = true } axum-core = { version = "0.3.0", optional = true } bytes = { version = "1.3.0", optional = true } -futures-util = { version = "0.3.25", default-features = false, features = ["alloc"], optional = true } +fragile = { version = "2.0", optional = true } http-body = { version = "0.4.5", optional = true } hyper = { version = "0.14.23", optional = true } sha-1 = { version = "0.10.1", optional = true } @@ -45,6 +47,8 @@ tokio-native-tls = { version = "0.3.1", optional = true } tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } [target.'cfg(target_family = "wasm")'.dependencies] +getrandom = { version = "0.2", features = ["js"] } +wasm-bindgen-futures = { version = "0.4" } wasmtimer = "0.2.0" [features] @@ -52,11 +56,11 @@ default = ["native_client", "server"] client = ["tokio-tungstenite-wasm"] native_client = ["client", "tokio/rt"] -wasm_client = ["client"] +wasm_client = ["client", "fragile"] server = ["tokio-tungstenite", "tokio-tungstenite-wasm", "tokio/rt"] tungstenite = ["server"] -axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util", "http-body", "hyper", "sha-1"] +axum = ["server", "dep:axum", "axum-core", "bytes", "futures-util/alloc", "http-body", "hyper", "sha-1"] tls = [] native-tls = ["tls", "tokio-native-tls", "tokio-tungstenite/native-tls"] @@ -76,6 +80,7 @@ rustdoc-args = ["--cfg", "docsrs"] [workspace] members = [ "examples/chat-client", + "examples/chat-client-wasm", "examples/chat-server", "examples/chat-server-axum", "examples/echo-server", diff --git a/examples/chat-client-wasm/Cargo.toml b/examples/chat-client-wasm/Cargo.toml new file mode 100644 index 0000000..eeb9b91 --- /dev/null +++ b/examples/chat-client-wasm/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "ezsockets-chat-client-wasm" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1.52" +console_error_panic_hook = "0.1" +ezsockets = { path = "../../", default-features = false, features = ["wasm_client"] } +tracing = "0.1.32" +tracing-subscriber = "0.3.9" +tracing-wasm = { version = "0.2" } +url = "2.2.2" +wasm-bindgen = { version = "0.2" } +wasm-bindgen-futures = { version = "0.4" } +wasmtimer = "0.2.0" diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs new file mode 100644 index 0000000..7e56eb4 --- /dev/null +++ b/examples/chat-client-wasm/src/main.rs @@ -0,0 +1,74 @@ +#![allow(unused_imports)] + +use async_trait::async_trait; +use ezsockets::{ClientConfig, RawMessage, SocketConfig}; +use std::io::BufRead; +use std::sync::Arc; +use std::time::Duration; + +struct Client {} + +#[async_trait] +impl ezsockets::ClientExt for Client { + type Call = (); + + async fn on_text(&mut self, text: String) -> Result<(), ezsockets::Error> { + tracing::info!("received message: {text}"); + Ok(()) + } + + async fn on_binary(&mut self, bytes: Vec) -> Result<(), ezsockets::Error> { + tracing::info!("received bytes: {bytes:?}"); + Ok(()) + } + + async fn on_call(&mut self, call: Self::Call) -> Result<(), ezsockets::Error> { + let () = call; + Ok(()) + } +} + +#[wasm_bindgen::prelude::wasm_bindgen(main)] +async fn main() -> Result<(), wasm_bindgen::JsValue> { + // setup tracing + console_error_panic_hook::set_once(); + tracing_wasm::set_as_global_default(); + + // make client + let config = ClientConfig::new("ws://localhost:8080/websocket") + .socket_config( + SocketConfig { + heartbeat: Duration::from_secs(5), + timeout: Duration::from_secs(10), + heartbeat_ping_msg_fn: Arc::new(|_t: Duration| { + RawMessage::Binary("ping".into()) + }), + } + ); + let (client, mut handle) = ezsockets::connect_with( + |_client| Client {}, + config, + ezsockets::ClientConnectorWasm::default(), + ); + + // collect inputs + wasm_bindgen_futures::spawn_local( + async move { + loop { + let stdin = std::io::stdin(); + let lines = stdin.lock().lines(); + for line in lines { + let line = line.unwrap(); + tracing::info!("sending {line}"); + client.text(line).unwrap(); + } + + wasmtimer::tokio::sleep(Duration::from_secs(1)).await; + } + } + ); + + handle.extract().await.unwrap().unwrap(); + + Ok(()) +} diff --git a/examples/chat-server/src/main.rs b/examples/chat-server/src/main.rs index 47e0a85..80f4493 100644 --- a/examples/chat-server/src/main.rs +++ b/examples/chat-server/src/main.rs @@ -42,9 +42,10 @@ impl ezsockets::ServerExt for ChatServer { ) -> Result> { let id = (0..).find(|i| !self.sessions.contains_key(i)).unwrap_or(0); let session = Session::create( - |_handle| SessionActor { + |session_handle| SessionActor { id, server: self.handle.clone(), + session: session_handle, room: DEFAULT_ROOM.to_string(), }, id, @@ -129,6 +130,7 @@ impl ezsockets::ServerExt for ChatServer { struct SessionActor { id: SessionID, server: Server, + session: Session, room: String, } @@ -168,8 +170,11 @@ impl ezsockets::SessionExt for SessionActor { Ok(()) } - async fn on_binary(&mut self, _bytes: Vec) -> Result<(), Error> { - unimplemented!() + async fn on_binary(&mut self, bytes: Vec) -> Result<(), Error> { + // echo bytes back (we use this for a hacky ping/pong protocol for the client demo) + tracing::info!("echoing bytes: {bytes:?}"); + self.session.binary("pong".as_bytes())?; + Ok(()) } async fn on_call(&mut self, call: Self::Call) -> Result<(), Error> { diff --git a/src/client.rs b/src/client.rs index cada8ea..f154104 100644 --- a/src/client.rs +++ b/src/client.rs @@ -62,7 +62,6 @@ use futures::{FutureExt, SinkExt, StreamExt}; use http::header::HeaderName; use http::HeaderValue; use std::fmt; -use std::future::Future; use std::time::Duration; use tokio_tungstenite_wasm::Error as WSError; use url::Url; @@ -399,7 +398,7 @@ impl Client { pub async fn connect( client_fn: impl FnOnce(Client) -> E, config: ClientConfig, -) -> (Client, impl Future>) { +) -> (Client, impl std::future::Future>) { let client_connector = crate::ClientConnectorTokio::default(); let (handle, mut future) = connect_with(client_fn, config, client_connector); let future = async move { diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index 22cbcde..8187764 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -1,10 +1,14 @@ use crate::client::{ClientConfig, ClientConnector}; use crate::socket::{CloseFrame, Message, RawMessage}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tungstenite::protocol::frame::coding::CloseCode as TungsteniteCloseCode; + impl<'t> From> for CloseFrame { fn from(frame: tokio_tungstenite_wasm::CloseFrame) -> Self { Self { - code: Into::::into(Into::::into(frame.code)).into(), + code: Into::::into(Into::::into(frame.code)).into(), reason: frame.reason.into(), } } @@ -13,7 +17,7 @@ impl<'t> From> for CloseFrame { impl<'t> From for tokio_tungstenite_wasm::CloseFrame<'t> { fn from(frame: CloseFrame) -> Self { Self { - code: Into::::into(Into::::into(frame.code)).into(), + code: Into::::into(Into::::into(frame.code)).into(), reason: frame.reason.into(), } } @@ -51,7 +55,7 @@ impl From for tokio_tungstenite_wasm::Message { } } -/// Implementation of [`ClientConnector`] for tokio runtimes. +/// Implementation of [`ClientConnector`] for WASM targets. #[derive(Clone)] pub struct ClientConnectorWasm { handle: enfync::builtin::wasm::WASMHandle, @@ -69,7 +73,7 @@ impl ClientConnector for ClientConnectorWasm { type Handle = enfync::builtin::wasm::WASMHandle; type Message = tokio_tungstenite_wasm::Message; type WSError = tokio_tungstenite_wasm::Error; - type Socket = tokio_tungstenite_wasm::WebSocketStream; + type Socket = WebSocketStreamProxy; type ConnectError = tokio_tungstenite_wasm::Error; /// Get the connector's runtime handle. @@ -88,7 +92,81 @@ impl ClientConnector for ClientConnectorWasm { panic!("client may not submit HTTP headers in WASM connection requests"); } let request_url = config.connect_url(); - let socket = tokio_tungstenite_wasm::connect(request_url).await?; + let socket = wasm_client_connect(String::from(request_url)).await?; Ok(socket) } } + +/// Proxy websocket that allows access to a WASM websocket. We need this because +/// `tokio_tungstenite_wasm::WebSocketStream` is !Send. +pub struct WebSocketStreamProxy { + inner: fragile::Fragile, +} + +impl futures_util::Stream for WebSocketStreamProxy{ + type Item = tokio_tungstenite_wasm::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.poll_next(cx) + } +} + +impl futures_util::Sink for WebSocketStreamProxy{ + type Error = tokio_tungstenite_wasm::Error; + + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.poll_ready(cx) + } + + fn start_send(mut self: Pin<&mut Self>, item: tokio_tungstenite_wasm::Message) -> Result<(), Self::Error> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.start_send(item) + } + + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { + Ok(()).into() + } + + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let inner = self.inner.get_mut(); + futures::pin_mut!(inner); + + inner.poll_close(cx) + } +} + +async fn wasm_client_connect( + request_url: String, +) -> Result { + // connect the websocket + let (result_sender, result_receiver) = async_channel::bounded(1usize); + + wasm_bindgen_futures::spawn_local( + async move { + let result = tokio_tungstenite_wasm::connect(request_url.as_str()).await; + result_sender.send_blocking(result.map(|websocket| fragile::Fragile::new(websocket))).unwrap(); + } + ); + + let websocket = result_receiver.recv().await.unwrap_or(Err(tokio_tungstenite_wasm::Error::ConnectionClosed))?; + + // build proxy + Ok(WebSocketStreamProxy{ inner: websocket }) +} diff --git a/src/socket.rs b/src/socket.rs index bebcf49..6d3edcc 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -1,14 +1,17 @@ use futures::lock::Mutex; use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; +use std::marker::PhantomData; use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::Arc; -use std::time::Instant; -use std::{ - marker::PhantomData, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; +use std::time::Duration; use tokio_tungstenite_wasm::Error as WSError; +#[cfg(not(target_family = "wasm"))] +use std::time::{Instant, SystemTime, UNIX_EPOCH}; + +#[cfg(target_family = "wasm")] +use wasmtimer::std::{Instant, SystemTime, UNIX_EPOCH}; + /// Wrapper trait for `Fn(Duration) -> RawMessage`. pub trait SocketHeartbeatPingFn: Fn(Duration) -> RawMessage + Sync + Send {} impl SocketHeartbeatPingFn for F where F: Fn(Duration) -> RawMessage + Sync + Send {} @@ -652,7 +655,7 @@ async fn handle_heartbeat_sleep_elapsed( // send ping let timestamp = SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) + .duration_since(UNIX_EPOCH) .unwrap_or_default(); if sink .send_raw(InRawMessage::new((config.heartbeat_ping_msg_fn)(timestamp))) From 914a1c692c61e21ca334636874ce4769abf03ed2 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 17:41:24 -0500 Subject: [PATCH 06/16] cleanup --- examples/chat-client-wasm/src/main.rs | 39 ++++++-------- src/client.rs | 5 +- .../client_connector_wasm.rs | 53 ++++++++++--------- src/socket.rs | 2 +- 4 files changed, 48 insertions(+), 51 deletions(-) diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs index 7e56eb4..1088038 100644 --- a/examples/chat-client-wasm/src/main.rs +++ b/examples/chat-client-wasm/src/main.rs @@ -35,16 +35,11 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { tracing_wasm::set_as_global_default(); // make client - let config = ClientConfig::new("ws://localhost:8080/websocket") - .socket_config( - SocketConfig { - heartbeat: Duration::from_secs(5), - timeout: Duration::from_secs(10), - heartbeat_ping_msg_fn: Arc::new(|_t: Duration| { - RawMessage::Binary("ping".into()) - }), - } - ); + let config = ClientConfig::new("ws://localhost:8080/websocket").socket_config(SocketConfig { + heartbeat: Duration::from_secs(5), + timeout: Duration::from_secs(10), + heartbeat_ping_msg_fn: Arc::new(|_t: Duration| RawMessage::Binary("ping".into())), + }); let (client, mut handle) = ezsockets::connect_with( |_client| Client {}, config, @@ -52,21 +47,19 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { ); // collect inputs - wasm_bindgen_futures::spawn_local( - async move { - loop { - let stdin = std::io::stdin(); - let lines = stdin.lock().lines(); - for line in lines { - let line = line.unwrap(); - tracing::info!("sending {line}"); - client.text(line).unwrap(); - } - - wasmtimer::tokio::sleep(Duration::from_secs(1)).await; + wasm_bindgen_futures::spawn_local(async move { + loop { + let stdin = std::io::stdin(); + let lines = stdin.lock().lines(); + for line in lines { + let line = line.unwrap(); + tracing::info!("sending {line}"); + client.text(line).unwrap(); } + + wasmtimer::tokio::sleep(Duration::from_secs(1)).await; } - ); + }); handle.extract().await.unwrap().unwrap(); diff --git a/src/client.rs b/src/client.rs index f154104..7467280 100644 --- a/src/client.rs +++ b/src/client.rs @@ -398,7 +398,10 @@ impl Client { pub async fn connect( client_fn: impl FnOnce(Client) -> E, config: ClientConfig, -) -> (Client, impl std::future::Future>) { +) -> ( + Client, + impl std::future::Future>, +) { let client_connector = crate::ClientConnectorTokio::default(); let (handle, mut future) = connect_with(client_fn, config, client_connector); let future = async move { diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index 8187764..afa4c7e 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -40,7 +40,9 @@ impl From for RawMessage { match message { tokio_tungstenite_wasm::Message::Text(text) => Self::Text(text), tokio_tungstenite_wasm::Message::Binary(bytes) => Self::Binary(bytes), - tokio_tungstenite_wasm::Message::Close(frame) => Self::Close(frame.map(CloseFrame::from)), + tokio_tungstenite_wasm::Message::Close(frame) => { + Self::Close(frame.map(CloseFrame::from)) + } } } } @@ -50,7 +52,9 @@ impl From for tokio_tungstenite_wasm::Message { match message { Message::Text(text) => tokio_tungstenite_wasm::Message::Text(text), Message::Binary(bytes) => tokio_tungstenite_wasm::Message::Binary(bytes), - Message::Close(frame) => tokio_tungstenite_wasm::Message::Close(frame.map(CloseFrame::into)), + Message::Close(frame) => { + tokio_tungstenite_wasm::Message::Close(frame.map(CloseFrame::into)) + } } } } @@ -103,7 +107,7 @@ pub struct WebSocketStreamProxy { inner: fragile::Fragile, } -impl futures_util::Stream for WebSocketStreamProxy{ +impl futures_util::Stream for WebSocketStreamProxy { type Item = tokio_tungstenite_wasm::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { @@ -114,37 +118,31 @@ impl futures_util::Stream for WebSocketStreamProxy{ } } -impl futures_util::Sink for WebSocketStreamProxy{ +impl futures_util::Sink for WebSocketStreamProxy { type Error = tokio_tungstenite_wasm::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let inner = self.inner.get_mut(); futures::pin_mut!(inner); - + inner.poll_ready(cx) } - fn start_send(mut self: Pin<&mut Self>, item: tokio_tungstenite_wasm::Message) -> Result<(), Self::Error> { + fn start_send( + mut self: Pin<&mut Self>, + item: tokio_tungstenite_wasm::Message, + ) -> Result<(), Self::Error> { let inner = self.inner.get_mut(); futures::pin_mut!(inner); inner.start_send(item) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ok(()).into() } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let inner = self.inner.get_mut(); futures::pin_mut!(inner); @@ -158,15 +156,18 @@ async fn wasm_client_connect( // connect the websocket let (result_sender, result_receiver) = async_channel::bounded(1usize); - wasm_bindgen_futures::spawn_local( - async move { - let result = tokio_tungstenite_wasm::connect(request_url.as_str()).await; - result_sender.send_blocking(result.map(|websocket| fragile::Fragile::new(websocket))).unwrap(); - } - ); + wasm_bindgen_futures::spawn_local(async move { + let result = tokio_tungstenite_wasm::connect(request_url.as_str()).await; + result_sender + .send_blocking(result.map(|websocket| fragile::Fragile::new(websocket))) + .unwrap(); + }); - let websocket = result_receiver.recv().await.unwrap_or(Err(tokio_tungstenite_wasm::Error::ConnectionClosed))?; + let websocket = result_receiver + .recv() + .await + .unwrap_or(Err(tokio_tungstenite_wasm::Error::ConnectionClosed))?; // build proxy - Ok(WebSocketStreamProxy{ inner: websocket }) + Ok(WebSocketStreamProxy { inner: websocket }) } diff --git a/src/socket.rs b/src/socket.rs index 6d3edcc..1a5febb 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -616,7 +616,7 @@ async fn socket_heartbeat( loop { // It is better to use Sleep::reset(), but we can't do it here because fuse() consumes the sleep - // and we need futures::select to avoid depending on tokio. + // and we need futures::select since we can't use tokio on WASM targets. let sleep = wasmtimer::tokio::sleep(sleep_duration).fuse(); futures::pin_mut!(sleep); futures::select! { From 58518c9e9e2bf8ca2d49b4498f09765a9a256785 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 17:50:14 -0500 Subject: [PATCH 07/16] cleanup --- examples/chat-client-wasm/README.md | 3 +++ examples/chat-client-wasm/src/main.rs | 16 ++-------------- 2 files changed, 5 insertions(+), 14 deletions(-) create mode 100644 examples/chat-client-wasm/README.md diff --git a/examples/chat-client-wasm/README.md b/examples/chat-client-wasm/README.md new file mode 100644 index 0000000..3c7a711 --- /dev/null +++ b/examples/chat-client-wasm/README.md @@ -0,0 +1,3 @@ +Install [wasm-server-runner] and use `cargo run --target wasm32-unknown-unknown` from in this directory. + +This demo does not currently have a way to pass messages into the socket, but you can see the console log to read messages from native clients. diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs index 1088038..29df354 100644 --- a/examples/chat-client-wasm/src/main.rs +++ b/examples/chat-client-wasm/src/main.rs @@ -46,21 +46,9 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { ezsockets::ClientConnectorWasm::default(), ); - // collect inputs - wasm_bindgen_futures::spawn_local(async move { - loop { - let stdin = std::io::stdin(); - let lines = stdin.lock().lines(); - for line in lines { - let line = line.unwrap(); - tracing::info!("sending {line}"); - client.text(line).unwrap(); - } - - wasmtimer::tokio::sleep(Duration::from_secs(1)).await; - } - }); + // collect inputs: todo + // keep main alive until it is manually terminated handle.extract().await.unwrap().unwrap(); Ok(()) From 09319de726de6ef4f5318c3511d28afcc2e76063 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 21:04:17 -0500 Subject: [PATCH 08/16] fix native_client feature --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ce9d834..67ec3da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ wasmtimer = "0.2.0" default = ["native_client", "server"] client = ["tokio-tungstenite-wasm"] -native_client = ["client", "tokio/rt"] +native_client = ["client", "tokio/rt", "tokio-tungstenite"] wasm_client = ["client", "fragile"] server = ["tokio-tungstenite", "tokio-tungstenite-wasm", "tokio/rt"] From a7b704c113b5515e00e9825e8f6beaead6d260d1 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 21:54:16 -0500 Subject: [PATCH 09/16] raw ping/pong close frames --- src/client_connectors/client_connector_wasm.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index afa4c7e..92fb5cf 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -28,8 +28,14 @@ impl From for tokio_tungstenite_wasm::Message { match message { RawMessage::Text(text) => Self::Text(text), RawMessage::Binary(bytes) => Self::Binary(bytes), - RawMessage::Ping(_) => Self::Close(None), - RawMessage::Pong(_) => Self::Close(None), + RawMessage::Ping(_) => Self::Close(Some(tokio_tungstenite_wasm::CloseFrame { + code: tokio_tungstenite_wasm::CloseCode::Abnormal, + reason: "raw pings not supported".into(), + })), + RawMessage::Pong(_) => Self::Close(Some(tokio_tungstenite_wasm::CloseFrame { + code: tokio_tungstenite_wasm::CloseCode::Abnormal, + reason: "raw pongs not supported".into(), + })), RawMessage::Close(frame) => Self::Close(frame.map(CloseFrame::into)), } } From d8e40cda5975a5ca7061db9fbe532fe888043bf1 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 22:33:43 -0500 Subject: [PATCH 10/16] changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a1d7fa..77b1317 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - Use [`tokio-tungstenite-wasm`](https://github.com/TannerRogalsky/tokio-tungstenite-wasm) errors internally to better support cross-platform clients. - Use [`enfync`](https://github.com/UkoeHB/enfync) runtime handles internally to better support cross-platform clients. Default clients continue to use tokio. - Add `ClientConnector` abstraction for connecting clients and add `ezsockets::client::connect_with`. -- Add `ClientConnectorWasm` and `wasm_client` feature. +- Add `ClientConnectorWasm` and `wasm_client` feature. Added `chat-client-wasm` example to show a WASM client that compiles. It currently only listens to the chat and can't input anything since browser does not have a terminal. - Refactor `Socket` and `Client` to not depend on `tokio` when compiling to WASM. This is a breaking change as the `Client` API now exposes `async_channel` error types instead of `tokio` error types, and `Client::call_with()` now takes an `async_channel::Sender` instead of a `tokio` oneshot. From 4c2028c971d783a194b52c38b948e56db0e77b31 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 22:44:33 -0500 Subject: [PATCH 11/16] cleanup --- Cargo.toml | 5 ++--- README.md | 3 ++- benches/my_benchmark.rs | 8 ++------ examples/chat-client-wasm/Cargo.toml | 2 -- examples/chat-client-wasm/README.md | 8 ++++++-- examples/chat-client/Cargo.toml | 2 -- examples/chat-server-axum/Cargo.toml | 2 -- examples/chat-server/Cargo.toml | 2 -- examples/counter-server/Cargo.toml | 2 -- examples/echo-server-native-tls/Cargo.toml | 2 -- examples/echo-server/Cargo.toml | 2 -- examples/simple-client/Cargo.toml | 2 -- 12 files changed, 12 insertions(+), 28 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 67ec3da..030f336 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,8 @@ description = "WebSockets server & client made easy" readme = "README.md" repository = "https://github.com/gbaranski/ezsockets" license = "MIT" -keywords = ["websocket", "networking", "async"] +keywords = ["websocket", "networking", "async", "wasm"] categories = ["asynchronous", "network-programming", "web-programming::websocket"] -resolver = "2" [lib] crate-type = ["cdylib", "rlib"] @@ -48,7 +47,7 @@ tokio = { version = "1.17.0", features = ["sync", "macros", "time"] } [target.'cfg(target_family = "wasm")'.dependencies] getrandom = { version = "0.2", features = ["js"] } -wasm-bindgen-futures = { version = "0.4" } +wasm-bindgen-futures = "0.4" wasmtimer = "0.2.0" [features] diff --git a/README.md b/README.md index 9dcfe07..765a9ea 100644 --- a/README.md +++ b/README.md @@ -15,11 +15,12 @@ Creating a WebSocket server or a client in Rust can be troublesome. This crate f View the full documentation at [docs.rs/ezsockets](http://docs.rs/ezsockets) ## Examples -- [`simple-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-client) - a simplest WebSocket client which uses stdin as input. +- [`simple-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/simple-client) - a simplest WebSocket client which uses stdin as input. - [`echo-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/echo-server) - server that echoes back every message it receives. - [`echo-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/echo-server-native-tls) - same as `echo-server`, but with `native-tls`. - [`counter-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/counter-server) - server that increments global value every second and shares it with client - [`chat-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-client) - chat client for `chat-server` and `chat-server-axum` examples +- [`wasm-client`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-client-wasm) - chat client for `chat-server` and `chat-server-axum` examples that runs in the browser (only listens to the chat) - [`chat-server`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-server) - chat server with support of rooms - [`chat-server-axum`](https://github.com/gbaranski/ezsockets/tree/master/examples/chat-server-axum) - same as above, but using `axum` as a back-end diff --git a/benches/my_benchmark.rs b/benches/my_benchmark.rs index e879bd6..57b8b16 100644 --- a/benches/my_benchmark.rs +++ b/benches/my_benchmark.rs @@ -54,7 +54,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).unwrap(); ezsockets_server::run(listener).await; }); - sleep(Duration::from_millis(100)); + sleep(Duration::from_millis(1000)); let mut client = tungstenite::connect(format!("ws://127.0.0.1:{}", port)) .unwrap() .0; @@ -63,9 +63,5 @@ pub fn criterion_benchmark(c: &mut Criterion) { task.abort(); } -criterion_group! { - name = benches; - config = Criterion::default().sample_size(10); - targets = criterion_benchmark -} +criterion_group!(benches, criterion_benchmark); criterion_main!(benches); diff --git a/examples/chat-client-wasm/Cargo.toml b/examples/chat-client-wasm/Cargo.toml index eeb9b91..2742215 100644 --- a/examples/chat-client-wasm/Cargo.toml +++ b/examples/chat-client-wasm/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-client-wasm" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" console_error_panic_hook = "0.1" diff --git a/examples/chat-client-wasm/README.md b/examples/chat-client-wasm/README.md index 3c7a711..3d4712b 100644 --- a/examples/chat-client-wasm/README.md +++ b/examples/chat-client-wasm/README.md @@ -1,3 +1,7 @@ -Install [wasm-server-runner] and use `cargo run --target wasm32-unknown-unknown` from in this directory. +1. install [wasm-server-runner](https://github.com/jakobhellermann/wasm-server-runner) +2. `cd` to this directory +3. `cargo run --target wasm32-unknown-unknown` +4. paste the address provided into a new browser window +5. open the browser console to view the chat logs -This demo does not currently have a way to pass messages into the socket, but you can see the console log to read messages from native clients. +Note: This demo does not currently have a way to pass messages into the chat. diff --git a/examples/chat-client/Cargo.toml b/examples/chat-client/Cargo.toml index de2dcce..812d8d6 100644 --- a/examples/chat-client/Cargo.toml +++ b/examples/chat-client/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-client" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["client"] } diff --git a/examples/chat-server-axum/Cargo.toml b/examples/chat-server-axum/Cargo.toml index 19aad1d..ac926dc 100644 --- a/examples/chat-server-axum/Cargo.toml +++ b/examples/chat-server-axum/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-server-axum" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" axum = "0.6.1" diff --git a/examples/chat-server/Cargo.toml b/examples/chat-server/Cargo.toml index c5d216d..7110f4f 100644 --- a/examples/chat-server/Cargo.toml +++ b/examples/chat-server/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-chat-server" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite"] } diff --git a/examples/counter-server/Cargo.toml b/examples/counter-server/Cargo.toml index 87a9d94..f735c6c 100644 --- a/examples/counter-server/Cargo.toml +++ b/examples/counter-server/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-counter-server" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite"] } diff --git a/examples/echo-server-native-tls/Cargo.toml b/examples/echo-server-native-tls/Cargo.toml index 34ba1af..1883235 100644 --- a/examples/echo-server-native-tls/Cargo.toml +++ b/examples/echo-server-native-tls/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsocket-echo-server-native-tls" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite", "native-tls"] } diff --git a/examples/echo-server/Cargo.toml b/examples/echo-server/Cargo.toml index e877510..5c5f461 100644 --- a/examples/echo-server/Cargo.toml +++ b/examples/echo-server/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsocket-echo-server" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["tungstenite"] } diff --git a/examples/simple-client/Cargo.toml b/examples/simple-client/Cargo.toml index f98f00a..683c1c3 100644 --- a/examples/simple-client/Cargo.toml +++ b/examples/simple-client/Cargo.toml @@ -3,8 +3,6 @@ name = "ezsockets-simple-client" version = "0.1.0" edition = "2021" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - [dependencies] async-trait = "0.1.52" ezsockets = { path = "../../", features = ["rustls"] } From 15d1cf70633fb686aee62e48f0b8eeaa9b2db318 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 23:02:35 -0500 Subject: [PATCH 12/16] cleanup --- examples/chat-client-wasm/Cargo.toml | 6 +++--- examples/chat-client-wasm/src/main.rs | 5 +++-- examples/chat-server/src/main.rs | 2 +- src/client_connectors/client_connector_wasm.rs | 2 -- src/socket.rs | 7 +++---- 5 files changed, 10 insertions(+), 12 deletions(-) diff --git a/examples/chat-client-wasm/Cargo.toml b/examples/chat-client-wasm/Cargo.toml index 2742215..845184a 100644 --- a/examples/chat-client-wasm/Cargo.toml +++ b/examples/chat-client-wasm/Cargo.toml @@ -9,8 +9,8 @@ console_error_panic_hook = "0.1" ezsockets = { path = "../../", default-features = false, features = ["wasm_client"] } tracing = "0.1.32" tracing-subscriber = "0.3.9" -tracing-wasm = { version = "0.2" } +tracing-wasm = "0.2" url = "2.2.2" -wasm-bindgen = { version = "0.2" } -wasm-bindgen-futures = { version = "0.4" } +wasm-bindgen = "0.2" +wasm-bindgen-futures = "0.4" wasmtimer = "0.2.0" diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs index 29df354..6fb3a8c 100644 --- a/examples/chat-client-wasm/src/main.rs +++ b/examples/chat-client-wasm/src/main.rs @@ -35,10 +35,11 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { tracing_wasm::set_as_global_default(); // make client + // - note: we use a hacky custom Ping/Pong protocol to keep the client alive (see the 'chat-server' example + // for the Ping side via SessionExt::on_binary()) let config = ClientConfig::new("ws://localhost:8080/websocket").socket_config(SocketConfig { - heartbeat: Duration::from_secs(5), - timeout: Duration::from_secs(10), heartbeat_ping_msg_fn: Arc::new(|_t: Duration| RawMessage::Binary("ping".into())), + ..Default::default() }); let (client, mut handle) = ezsockets::connect_with( |_client| Client {}, diff --git a/examples/chat-server/src/main.rs b/examples/chat-server/src/main.rs index 80f4493..5973b1a 100644 --- a/examples/chat-server/src/main.rs +++ b/examples/chat-server/src/main.rs @@ -171,7 +171,7 @@ impl ezsockets::SessionExt for SessionActor { } async fn on_binary(&mut self, bytes: Vec) -> Result<(), Error> { - // echo bytes back (we use this for a hacky ping/pong protocol for the client demo) + // echo bytes back (we use this for a hacky ping/pong protocol for the wasm client demo) tracing::info!("echoing bytes: {bytes:?}"); self.session.binary("pong".as_bytes())?; Ok(()) diff --git a/src/client_connectors/client_connector_wasm.rs b/src/client_connectors/client_connector_wasm.rs index 92fb5cf..a40c545 100644 --- a/src/client_connectors/client_connector_wasm.rs +++ b/src/client_connectors/client_connector_wasm.rs @@ -159,7 +159,6 @@ impl futures_util::Sink for WebSocketStreamProx async fn wasm_client_connect( request_url: String, ) -> Result { - // connect the websocket let (result_sender, result_receiver) = async_channel::bounded(1usize); wasm_bindgen_futures::spawn_local(async move { @@ -174,6 +173,5 @@ async fn wasm_client_connect( .await .unwrap_or(Err(tokio_tungstenite_wasm::Error::ConnectionClosed))?; - // build proxy Ok(WebSocketStreamProxy { inner: websocket }) } diff --git a/src/socket.rs b/src/socket.rs index 1a5febb..affeb6c 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -334,7 +334,9 @@ where loop { futures::select! { res = self.receiver.recv().fuse() => { - let Ok(mut inmessage) = res else { break; }; + let Ok(mut inmessage) = res else { + break; + }; let Some(message) = inmessage.take_message() else { continue; }; @@ -351,9 +353,6 @@ where _ = &mut self.abort_receiver.recv().fuse() => { break; }, - complete => { - break; - } } } Ok(()) From f61f32134add12419fcd8eaca5b4913c2d8ba893 Mon Sep 17 00:00:00 2001 From: koe Date: Thu, 12 Oct 2023 23:07:53 -0500 Subject: [PATCH 13/16] lints --- examples/chat-client-wasm/README.md | 2 +- src/socket.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/chat-client-wasm/README.md b/examples/chat-client-wasm/README.md index 3d4712b..bcb242d 100644 --- a/examples/chat-client-wasm/README.md +++ b/examples/chat-client-wasm/README.md @@ -2,6 +2,6 @@ 2. `cd` to this directory 3. `cargo run --target wasm32-unknown-unknown` 4. paste the address provided into a new browser window -5. open the browser console to view the chat logs +5. open the browser window console to view the chat logs Note: This demo does not currently have a way to pass messages into the chat. diff --git a/src/socket.rs b/src/socket.rs index affeb6c..49c1c79 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -535,9 +535,9 @@ impl Socket { let (sink_result_sender, sink_result_receiver) = async_channel::bounded(1usize); handle.spawn(async move { let _ = stream_future.extract().await; - let _ = sink_abort_sender.send(()); - let _ = hearbeat_abort_sender.send(()); - let _ = sink_result_sender.send( + let _ = sink_abort_sender.send_blocking(()); + let _ = hearbeat_abort_sender.send_blocking(()); + let _ = sink_result_sender.send_blocking( sink_future .extract() .await From f6ddaadd97d170bd9bff9ff77516dca26996383c Mon Sep 17 00:00:00 2001 From: koe Date: Sat, 14 Oct 2023 13:29:40 -0500 Subject: [PATCH 14/16] compilation fixes --- Cargo.toml | 2 +- examples/chat-client-wasm/src/main.rs | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 030f336..ddef370 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ sha-1 = { version = "0.10.1", optional = true } tokio-tungstenite = { version = "0.20.0", optional = true } #tokio-tungstenite-wasm = { version = "0.2.0", optional = true } -tokio-tungstenite-wasm = { git = "https://github.com/UkoeHB/tokio-tungstenite-wasm", rev = "503cf12", optional = true } +tokio-tungstenite-wasm = { git = "https://github.com/UkoeHB/tokio-tungstenite-wasm", rev = "15e61e4", optional = true } tokio-rustls = { version = "0.24.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true } diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs index 6fb3a8c..4446a02 100644 --- a/examples/chat-client-wasm/src/main.rs +++ b/examples/chat-client-wasm/src/main.rs @@ -28,6 +28,7 @@ impl ezsockets::ClientExt for Client { } } +#[cfg(target_family = "wasm")] #[wasm_bindgen::prelude::wasm_bindgen(main)] async fn main() -> Result<(), wasm_bindgen::JsValue> { // setup tracing @@ -41,7 +42,7 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { heartbeat_ping_msg_fn: Arc::new(|_t: Duration| RawMessage::Binary("ping".into())), ..Default::default() }); - let (client, mut handle) = ezsockets::connect_with( + let (_client, mut handle) = ezsockets::connect_with( |_client| Client {}, config, ezsockets::ClientConnectorWasm::default(), @@ -54,3 +55,8 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { Ok(()) } + +#[cfg(not(target_family = "wasm"))] +fn main() { + unreachable!() +} From 7237beb3659b4b46d8d89a6340698d93b01f5352 Mon Sep 17 00:00:00 2001 From: koe Date: Sun, 15 Oct 2023 13:05:18 -0500 Subject: [PATCH 15/16] comment --- examples/chat-client-wasm/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/chat-client-wasm/src/main.rs b/examples/chat-client-wasm/src/main.rs index 4446a02..5f0653a 100644 --- a/examples/chat-client-wasm/src/main.rs +++ b/examples/chat-client-wasm/src/main.rs @@ -58,5 +58,6 @@ async fn main() -> Result<(), wasm_bindgen::JsValue> { #[cfg(not(target_family = "wasm"))] fn main() { + // need per-package targets https://github.com/rust-lang/cargo/issues/9406 unreachable!() } From e12ad18b3cb56d3ce62d54f56dee663bec3291a0 Mon Sep 17 00:00:00 2001 From: koe Date: Mon, 16 Oct 2023 10:42:35 -0500 Subject: [PATCH 16/16] dep update --- Cargo.toml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ddef370..bebd819 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,8 +36,7 @@ hyper = { version = "0.14.23", optional = true } sha-1 = { version = "0.10.1", optional = true } tokio-tungstenite = { version = "0.20.0", optional = true } -#tokio-tungstenite-wasm = { version = "0.2.0", optional = true } -tokio-tungstenite-wasm = { git = "https://github.com/UkoeHB/tokio-tungstenite-wasm", rev = "15e61e4", optional = true } +tokio-tungstenite-wasm = { version = "0.2.1", optional = true } tokio-rustls = { version = "0.24.1", optional = true } tokio-native-tls = { version = "0.3.1", optional = true }