diff --git a/Cargo.lock b/Cargo.lock index 1330281..1797c83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -319,6 +319,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener 2.5.3", +] + [[package]] name = "async-once-cell" version = "0.5.3" @@ -1182,9 +1191,11 @@ dependencies = [ name = "ewebsock" version = "0.5.0" dependencies = [ + "async-mutex", "async-stream", "document-features", "futures", + "futures-channel", "futures-util", "js-sys", "log", @@ -2453,16 +2464,17 @@ checksum = "216080ab382b992234dda86873c18d4c48358f5cfcb70fd693d7f6f2131b628b" [[package]] name = "ring" -version = "0.17.7" +version = "0.17.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74" +checksum = "c17fa4cb658e3583423e915b9f3acc01cceaee1860e33d59ebae66adc3a2dc0d" dependencies = [ "cc", + "cfg-if", "getrandom", "libc", "spin", "untrusted", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -2520,9 +2532,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "048a63e5b3ac996d78d402940b5fa47973d2d080c6c6fffa1d0f19c4445310b7" +checksum = "5ede67b28608b4c60685c7d54122d4400d90f62b40caee7700e700380a390fa8" [[package]] name = "rustls-webpki" diff --git a/ewebsock/Cargo.toml b/ewebsock/Cargo.toml index 6db4041..34d93ef 100644 --- a/ewebsock/Cargo.toml +++ b/ewebsock/Cargo.toml @@ -38,8 +38,11 @@ tokio = [ [dependencies] +async-mutex = "1.4.0" document-features = "0.2" +futures-channel = "0.3.30" log = "0.4" +futures-util = { version = "0.3.21", default-features = false, features = ["sink"] } # native: [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -60,6 +63,7 @@ tokio-tungstenite = { version = ">=0.17, <=0.21", optional = true } wasm-bindgen = "0.2" js-sys = "0.3" wasm-bindgen-futures = "0.4" +futures = "0.3" [dependencies.web-sys] version = "0.3" diff --git a/ewebsock/src/lib.rs b/ewebsock/src/lib.rs index ae588eb..4155b69 100644 --- a/ewebsock/src/lib.rs +++ b/ewebsock/src/lib.rs @@ -13,9 +13,9 @@ //! ## Feature flags #![doc = document_features::document_features!()] //! +//! #![warn(missing_docs)] // let's keep ewebsock well-documented - #[cfg(not(target_arch = "wasm32"))] #[cfg(not(feature = "tokio"))] mod native_tungstenite; @@ -41,6 +41,10 @@ mod web; #[cfg(target_arch = "wasm32")] pub use web::*; +use futures_channel::mpsc::{ Receiver, Sender, channel}; +use async_mutex::Mutex; +use futures_util::stream::StreamExt; + // ---------------------------------------------------------------------------- /// A web-socket message. @@ -81,7 +85,7 @@ pub enum WsEvent { /// Receiver for incoming [`WsEvent`]s. pub struct WsReceiver { - rx: std::sync::mpsc::Receiver, + pub rx: Mutex>, } impl WsReceiver { @@ -94,22 +98,29 @@ impl WsReceiver { /// /// This can be used to wake up the UI thread. pub fn new_with_callback(wake_up: impl Fn() + Send + Sync + 'static) -> (Self, EventHandler) { - let (tx, rx) = std::sync::mpsc::channel(); + let (mut tx, rx) = channel(0); + let on_event = Box::new(move |event| { wake_up(); // wake up UI thread - if tx.send(event).is_ok() { + if tx.try_send(event).is_ok() { std::ops::ControlFlow::Continue(()) } else { std::ops::ControlFlow::Break(()) } }); - let ws_receiver = WsReceiver { rx }; + + let ws_receiver = WsReceiver { rx: Mutex::new(rx) }; (ws_receiver, on_event) } /// Try receiving a new event without blocking. - pub fn try_recv(&self) -> Option { - self.rx.try_recv().ok() + pub async fn try_recv(&self) -> Option { + self.rx.lock().await.try_next().ok().flatten() + } + + /// Get next message + pub async fn next(&mut self) -> std::option::Option { + self.rx.lock().await.next().await } } @@ -119,7 +130,7 @@ pub type Error = String; /// Short for `Result`. pub type Result = std::result::Result; -pub(crate) type EventHandler = Box std::ops::ControlFlow<()>>; +pub(crate) type EventHandler = Box std::ops::ControlFlow<()>>; /// Options for a connection. #[derive(Clone, Copy, Debug, PartialEq, Eq)] diff --git a/ewebsock/src/native_tungstenite.rs b/ewebsock/src/native_tungstenite.rs index e2e38cb..ea7f7cc 100644 --- a/ewebsock/src/native_tungstenite.rs +++ b/ewebsock/src/native_tungstenite.rs @@ -47,11 +47,15 @@ impl WsSender { } } -pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHandler) -> Result<()> { +pub(crate) fn ws_receive_impl( + url: String, + options: Options, + mut on_event: EventHandler, +) -> Result<()> { std::thread::Builder::new() .name("ewebsock".to_owned()) .spawn(move || { - if let Err(err) = ws_receiver_blocking(&url, options, &on_event) { + if let Err(err) = ws_receiver_blocking(&url, options, &mut on_event) { on_event(WsEvent::Error(err)); } else { log::debug!("WebSocket connection closed."); @@ -68,7 +72,11 @@ pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHand /// /// # Errors /// All errors are returned to the caller, and NOT reported via `on_event`. -pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler) -> Result<()> { +pub fn ws_receiver_blocking( + url: &str, + options: Options, + on_event: &mut EventHandler, +) -> Result<()> { let config = tungstenite::protocol::WebSocketConfig::from(options); let max_redirects = 3; // tungstenite default @@ -122,14 +130,14 @@ pub fn ws_receiver_blocking(url: &str, options: Options, on_event: &EventHandler pub(crate) fn ws_connect_impl( url: String, options: Options, - on_event: EventHandler, + mut on_event: EventHandler, ) -> Result { let (tx, rx) = std::sync::mpsc::channel(); std::thread::Builder::new() .name("ewebsock".to_owned()) .spawn(move || { - if let Err(err) = ws_connect_blocking(&url, options, &on_event, &rx) { + if let Err(err) = ws_connect_blocking(&url, options, &mut on_event, &rx) { on_event(WsEvent::Error(err)); } else { log::debug!("WebSocket connection closed."); @@ -149,7 +157,7 @@ pub(crate) fn ws_connect_impl( pub fn ws_connect_blocking( url: &str, options: Options, - on_event: &EventHandler, + on_event: &mut EventHandler, rx: &Receiver, ) -> Result<()> { let config = tungstenite::protocol::WebSocketConfig::from(options); diff --git a/ewebsock/src/native_tungstenite_tokio.rs b/ewebsock/src/native_tungstenite_tokio.rs index 63906f7..e4609ef 100644 --- a/ewebsock/src/native_tungstenite_tokio.rs +++ b/ewebsock/src/native_tungstenite_tokio.rs @@ -47,7 +47,7 @@ async fn ws_connect_async( url: String, options: Options, outgoing_messages_stream: impl futures::Stream, - on_event: EventHandler, + mut on_event: EventHandler, ) { use futures::StreamExt as _; @@ -118,13 +118,13 @@ async fn ws_connect_async( pub(crate) fn ws_connect_impl( url: String, options: Options, - on_event: EventHandler, + mut on_event: EventHandler, ) -> Result { Ok(ws_connect_native(url, options, on_event)) } /// Like [`ws_connect`], but cannot fail. Only available on native builds. -fn ws_connect_native(url: String, options: Options, on_event: EventHandler) -> WsSender { +fn ws_connect_native(url: String, options: Options, mut on_event: EventHandler) -> WsSender { let (tx, mut rx) = tokio::sync::mpsc::channel(1000); let outgoing_messages_stream = async_stream::stream! { diff --git a/ewebsock/src/web.rs b/ewebsock/src/web.rs index ae9a81b..5853ee4 100644 --- a/ewebsock/src/web.rs +++ b/ewebsock/src/web.rs @@ -1,5 +1,8 @@ use crate::{EventHandler, Options, Result, WsEvent, WsMessage}; +use async_mutex::Mutex; +use wasm_bindgen_futures::spawn_local; + #[allow(clippy::needless_pass_by_value)] fn string_from_js_value(s: wasm_bindgen::JsValue) -> String { s.as_string().unwrap_or(format!("{:#?}", s)) @@ -63,14 +66,18 @@ impl WsSender { } } -pub(crate) fn ws_receive_impl(url: String, options: Options, on_event: EventHandler) -> Result<()> { +pub(crate) fn ws_receive_impl( + url: String, + options: Options, + mut on_event: EventHandler, +) -> Result<()> { ws_connect_impl(url, options, on_event).map(|sender| sender.forget()) } pub(crate) fn ws_connect_impl( url: String, _ignored_options: Options, - on_event: EventHandler, + mut on_event: EventHandler, ) -> Result { // Based on https://rustwasm.github.io/wasm-bindgen/examples/websockets.html @@ -84,43 +91,57 @@ pub(crate) fn ws_connect_impl( ws.set_binary_type(web_sys::BinaryType::Arraybuffer); // Allow it to be shared by the different callbacks: - let on_event: std::rc::Rc std::ops::ControlFlow<()>> = - on_event.into(); + let on_event = std::rc::Rc::new(Mutex::new(on_event)); // onmessage callback { let on_event = on_event.clone(); let onmessage_callback = Closure::wrap(Box::new(move |e: web_sys::MessageEvent| { + let on_event = on_event.clone(); // Handle difference Text/Binary,... if let Ok(abuf) = e.data().dyn_into::() { let array = js_sys::Uint8Array::new(&abuf); - on_event(WsEvent::Message(WsMessage::Binary(array.to_vec()))); + spawn_local(async move { + on_event.lock().await(WsEvent::Message(WsMessage::Binary(array.to_vec()))); + }); } else if let Ok(blob) = e.data().dyn_into::() { // better alternative to juggling with FileReader is to use https://crates.io/crates/gloo-file - let file_reader = web_sys::FileReader::new().expect("Failed to create FileReader"); + let file_reader = + web_sys::FileReader::new().expect("Failed to create FileReader"); let file_reader_clone = file_reader.clone(); // create onLoadEnd callback - let on_event = on_event.clone(); + let onloadend_cb = Closure::wrap(Box::new(move |_e: web_sys::ProgressEvent| { let array = js_sys::Uint8Array::new(&file_reader_clone.result().unwrap()); - on_event(WsEvent::Message(WsMessage::Binary(array.to_vec()))); + let on_event = on_event.clone(); + spawn_local(async move { + on_event.lock().await(WsEvent::Message(WsMessage::Binary( + array.to_vec(), + ))); + }); }) as Box); + file_reader.set_onloadend(Some(onloadend_cb.as_ref().unchecked_ref())); file_reader .read_as_array_buffer(&blob) .expect("blob not readable"); onloadend_cb.forget(); } else if let Ok(txt) = e.data().dyn_into::() { - on_event(WsEvent::Message(WsMessage::Text(string_from_js_string( - txt, - )))); + spawn_local(async move { + on_event.lock().await(WsEvent::Message(WsMessage::Text( + string_from_js_string(txt), + ))); + }); } else { log::debug!("Unknown websocket message received: {:?}", e.data()); - on_event(WsEvent::Message(WsMessage::Unknown(string_from_js_value( - e.data(), - )))); + spawn_local(async move { + on_event.lock().await(WsEvent::Message(WsMessage::Unknown( + string_from_js_value(e.data()), + ))); + }); } + }) as Box); // set message event handler on WebSocket @@ -131,15 +152,21 @@ pub(crate) fn ws_connect_impl( } { + + // let on_event_cb = &on_event.clone(); let on_event = on_event.clone(); let onerror_callback = Closure::wrap(Box::new(move |error_event: web_sys::ErrorEvent| { - log::error!( - "error event: {}: {:?}", - error_event.message(), - error_event.error() - ); - on_event(WsEvent::Error(error_event.message())); + let on_event = on_event.clone(); + spawn_local(async move { + log::error!( + "error event: {}: {:?}", + error_event.message(), + error_event.error() + ); + on_event.clone().lock().await(WsEvent::Error(error_event.message())); + }); }) as Box); + ws.set_onerror(Some(onerror_callback.as_ref().unchecked_ref())); onerror_callback.forget(); } @@ -147,16 +174,24 @@ pub(crate) fn ws_connect_impl( { let on_event = on_event.clone(); let onopen_callback = Closure::wrap(Box::new(move |_| { - on_event(WsEvent::Opened); + let on_event = on_event.clone(); + spawn_local(async move { + on_event.lock().await(WsEvent::Opened); + }); }) as Box); ws.set_onopen(Some(onopen_callback.as_ref().unchecked_ref())); onopen_callback.forget(); } { + let on_event = on_event.clone(); let onclose_callback = Closure::wrap(Box::new(move |_| { - on_event(WsEvent::Closed); + let on_event = on_event.clone(); + spawn_local(async move { + on_event.lock().await(WsEvent::Closed); + }); }) as Box); + ws.set_onclose(Some(onclose_callback.as_ref().unchecked_ref())); onclose_callback.forget(); } diff --git a/example_app/src/app.rs b/example_app/src/app.rs index 6ce5926..35d950b 100644 --- a/example_app/src/app.rs +++ b/example_app/src/app.rs @@ -93,8 +93,8 @@ impl FrontEnd { } } - fn ui(&mut self, ctx: &egui::Context) { - while let Some(event) = self.ws_receiver.try_recv() { + async fn ui(&mut self, ctx: &egui::Context) { + while let Some(event) = self.ws_receiver.try_recv().await { self.events.push(event); }