From 2d2fb43ed0112dcaafc9693a1f348040ea9d33ba Mon Sep 17 00:00:00 2001 From: Stuart Woodbury Date: Wed, 8 Nov 2023 20:45:27 -0500 Subject: [PATCH] make gossipsub listener's send signal functions async --- .../src/blink_impl/blink_controller.rs | 47 +++++++------------ .../src/blink_impl/gossipsub_sender.rs | 26 +++++++--- .../warp-blink-wrtc/src/blink_impl/mod.rs | 1 + .../src/blink_impl/sender_queue.rs | 0 4 files changed, 39 insertions(+), 35 deletions(-) create mode 100644 extensions/warp-blink-wrtc/src/blink_impl/sender_queue.rs diff --git a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs index e03c7c9e7..c774f063d 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs @@ -247,25 +247,14 @@ async fn run( mut cmd_rx: UnboundedReceiver, notify: Arc, ) { - let own_id = { - let notify2 = notify.clone(); - let fut = gossipsub_sender.get_own_id(); - tokio::select! { - _ = notify2.notified() => { - log::debug!("quitting blink event handler"); - return; - } - r = fut => { - match r { - Ok(r) => r, - Err(e) => { - log::debug!("failed to get own id. quitting blink event handler: {e}"); - return; - } - } - } + let own_id = match gossipsub_sender.get_own_id().await { + Ok(r) => r, + Err(e) => { + log::error!("failed to get own id. quitting blnk controller"); + return; } }; + // prevent accidental moves let own_id = &own_id; @@ -340,7 +329,7 @@ async fn run( call_info: call_info.clone(), }; - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { log::error!("failed to send signal: {e}"); } } @@ -406,7 +395,7 @@ async fn run( let signal = CallSignal::Join; if let Err(e) = gossipsub_sender - .send_signal_aes(call_info.group_key(), signal, topic) + .send_signal_aes(call_info.group_key(), signal, topic).await { let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string()))); } else { @@ -452,7 +441,7 @@ async fn run( let topic = ipfs_routes::call_signal_route(&call_id); let signal = CallSignal::Leave; if let Err(e) = gossipsub_sender - .send_signal_aes(info.group_key(), signal, topic) + .send_signal_aes(info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } @@ -470,7 +459,7 @@ async fn run( let signal = CallSignal::Muted; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic) + .send_signal_aes(data.info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } else { @@ -486,7 +475,7 @@ async fn run( let signal = CallSignal::Unmuted; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic) + .send_signal_aes(data.info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } else { @@ -505,7 +494,7 @@ async fn run( let signal = CallSignal::Deafened; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic) + .send_signal_aes(data.info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } @@ -522,7 +511,7 @@ async fn run( let signal = CallSignal::Undeafened; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic) + .send_signal_aes(data.info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } @@ -564,7 +553,7 @@ async fn run( let signal = CallSignal::Recording; if let Err(e) = gossipsub_sender - .send_signal_aes(info.group_key(), signal, topic) + .send_signal_aes(info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } @@ -592,7 +581,7 @@ async fn run( let signal = CallSignal::NotRecording; if let Err(e) = gossipsub_sender - .send_signal_aes(info.group_key(), signal, topic) + .send_signal_aes(info.group_key(), signal, topic).await { log::error!("failed to send signal: {e}"); } @@ -760,7 +749,7 @@ async fn run( simple_webrtc::events::EmittedEvents::Ice { dest, candidate } => { let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default()); let signal = PeerSignal::Ice(*candidate); - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { log::error!("failed to send signal: {e}"); } }, @@ -806,7 +795,7 @@ async fn run( simple_webrtc::events::EmittedEvents::Sdp { dest, sdp } => { let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default()); let signal = PeerSignal::Sdp(*sdp); - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { log::error!("failed to send signal: {e}"); } }, @@ -814,7 +803,7 @@ async fn run( log::debug!("sending dial signal"); let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default()); let signal = PeerSignal::Dial(*sdp); - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { log::error!("failed to send signal: {e}"); } }, diff --git a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs index b54fda7c2..9aa5323cc 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs @@ -24,11 +24,13 @@ enum GossipSubCmd { group_key: Vec, signal: Vec, topic: String, + rsp: oneshot::Sender>, }, SendEcdh { dest: DID, signal: Vec, topic: String, + rsp: oneshot::Sender>, }, DecodeEcdh { src: DID, @@ -70,35 +72,39 @@ impl GossipSubSender { Ok(id) } - pub fn send_signal_aes( + pub async fn send_signal_aes( &self, group_key: Vec, signal: T, topic: String, ) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); let signal = serde_cbor::to_vec(&signal)?; self.ch.send(GossipSubCmd::SendAes { group_key, signal, topic, + rsp: tx, })?; - + rx.await??; Ok(()) } - pub fn send_signal_ecdh( + pub async fn send_signal_ecdh( &self, dest: DID, signal: T, topic: String, ) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); let signal = serde_cbor::to_vec(&signal)?; self.ch.send(GossipSubCmd::SendEcdh { dest, signal, topic, + rsp: tx, })?; - + rx.await??; Ok(()) } @@ -176,28 +182,36 @@ async fn run( GossipSubCmd::GetOwnId { rsp } => { let _ = rsp.send(own_id.clone()); } - GossipSubCmd::SendAes { group_key, signal, topic } => { + GossipSubCmd::SendAes { group_key, signal, topic, rsp } => { let encrypted = match Cipher::direct_encrypt(&signal, &group_key) { Ok(r) => r, Err(e) => { log::error!("failed to encrypt aes message: {e}"); + let _ = rsp.send(Err(anyhow::anyhow!(e))); continue; } }; if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await { log::error!("failed to publish message: {e}"); + let _ = rsp.send(Err(anyhow::anyhow!(e))); + } else { + let _ = rsp.send(Ok(())); } }, - GossipSubCmd::SendEcdh { dest, signal, topic } => { + GossipSubCmd::SendEcdh { dest, signal, topic, rsp } => { let encrypted = match ecdh_encrypt(&own_id, &dest, signal) { Ok(r) => r, Err(e) => { log::error!("failed to encrypt ecdh message: {e}"); + let _ = rsp.send(Err(anyhow::anyhow!(e))); continue; } }; if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await { log::error!("failed to publish message: {e}"); + let _ = rsp.send(Err(anyhow::anyhow!(e))); + }else { + let _ = rsp.send(Ok(())); } } GossipSubCmd::DecodeEcdh { src, data, rsp } => { diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index 75df5fced..56a77d2ff 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -3,6 +3,7 @@ mod data; mod blink_controller; mod gossipsub_listener; mod gossipsub_sender; +mod sender_queue; mod signaling; mod store; diff --git a/extensions/warp-blink-wrtc/src/blink_impl/sender_queue.rs b/extensions/warp-blink-wrtc/src/blink_impl/sender_queue.rs new file mode 100644 index 000000000..e69de29bb