diff --git a/relay_client/src/websocket.rs b/relay_client/src/websocket.rs index 06c3008..36a7a3d 100644 --- a/relay_client/src/websocket.rs +++ b/relay_client/src/websocket.rs @@ -3,7 +3,6 @@ use tokio::spawn; #[cfg(target_arch = "wasm32")] use wasm_bindgen_futures::spawn_local as spawn; use { - self::connection::connection_event_loop, crate::{ error::{ClientError, Error}, ConnectionOptions, @@ -34,7 +33,7 @@ use { }, }; pub use { - connection::{Connection, ConnectionControl}, + connection::{connection_event_loop, Connection, ConnectionControl}, fetch::*, inbound::*, outbound::*, @@ -155,6 +154,7 @@ impl Client { T: ConnectionHandler, { let (control_tx, control_rx) = mpsc::unbounded_channel(); + let control_rx = Arc::new(control_rx.into()); spawn(connection_event_loop(control_rx, handler)); @@ -167,6 +167,7 @@ impl Client { /// Creates a new managed [`Client`] with the provided handler. pub fn new_unmanaged() -> Self { let (control_tx, control_rx) = mpsc::unbounded_channel(); + Self { control_tx, control_rx: Some(Arc::new(control_rx.into())), diff --git a/relay_client/src/websocket/connection.rs b/relay_client/src/websocket/connection.rs index 20a4d82..d3aed3a 100644 --- a/relay_client/src/websocket/connection.rs +++ b/relay_client/src/websocket/connection.rs @@ -13,10 +13,11 @@ use { }, futures_util::{stream::FusedStream, Stream, StreamExt}, std::{ + sync::Arc, pin::Pin, task::{Context, Poll}, }, - tokio::sync::{mpsc::UnboundedReceiver, oneshot}, + tokio::sync::{mpsc::UnboundedReceiver,Mutex, oneshot}, }; pub enum ConnectionControl { @@ -32,12 +33,13 @@ pub enum ConnectionControl { OutboundRequest(OutboundRequest), } -pub(super) async fn connection_event_loop( - mut control_rx: UnboundedReceiver, +pub async fn connection_event_loop( + control_rx: Arc>>, mut handler: T, ) where T: ConnectionHandler, { + let mut control_rx = control_rx.lock().await; let mut conn = Connection::new(); loop {