diff --git a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs index c774f063d..5f7d5f69d 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs @@ -329,7 +329,7 @@ async fn run( call_info: call_info.clone(), }; - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { log::error!("failed to send signal: {e}"); } } @@ -395,7 +395,7 @@ async fn run( let signal = CallSignal::Join; if let Err(e) = gossipsub_sender - .send_signal_aes(call_info.group_key(), signal, topic).await + .send_signal_aes(call_info.group_key(), signal, topic) { let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string()))); } else { @@ -428,6 +428,7 @@ async fn run( let info = call_data_map.get_call_info(call_id); if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { call_data_map.leave_call(call_id); + let _ = gossipsub_sender.empty_queue(); let _ = active_call.take(); let _ = webrtc_controller.deinit().await; host_media::reset().await; @@ -441,7 +442,7 @@ async fn run( let topic = ipfs_routes::call_signal_route(&call_id); let signal = CallSignal::Leave; if let Err(e) = gossipsub_sender - .send_signal_aes(info.group_key(), signal, topic).await + .send_signal_aes(info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } @@ -459,7 +460,7 @@ async fn run( let signal = CallSignal::Muted; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic).await + .send_signal_aes(data.info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } else { @@ -475,7 +476,7 @@ async fn run( let signal = CallSignal::Unmuted; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic).await + .send_signal_aes(data.info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } else { @@ -494,7 +495,7 @@ async fn run( let signal = CallSignal::Deafened; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic).await + .send_signal_aes(data.info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } @@ -511,7 +512,7 @@ async fn run( let signal = CallSignal::Undeafened; if let Err(e) = gossipsub_sender - .send_signal_aes(data.info.group_key(), signal, topic).await + .send_signal_aes(data.info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } @@ -553,7 +554,7 @@ async fn run( let signal = CallSignal::Recording; if let Err(e) = gossipsub_sender - .send_signal_aes(info.group_key(), signal, topic).await + .send_signal_aes(info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } @@ -581,7 +582,7 @@ async fn run( let signal = CallSignal::NotRecording; if let Err(e) = gossipsub_sender - .send_signal_aes(info.group_key(), signal, topic).await + .send_signal_aes(info.group_key(), signal, topic) { log::error!("failed to send signal: {e}"); } @@ -749,7 +750,7 @@ async fn run( simple_webrtc::events::EmittedEvents::Ice { dest, candidate } => { let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default()); let signal = PeerSignal::Ice(*candidate); - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { log::error!("failed to send signal: {e}"); } }, @@ -789,13 +790,14 @@ async fn run( gossipsub_listener.unsubscribe_call(ac); gossipsub_listener.unsubscribe_webrtc(ac); + let _ = gossipsub_sender.empty_queue(); } } }, simple_webrtc::events::EmittedEvents::Sdp { dest, sdp } => { let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default()); let signal = PeerSignal::Sdp(*sdp); - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { log::error!("failed to send signal: {e}"); } }, @@ -803,7 +805,7 @@ async fn run( log::debug!("sending dial signal"); let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default()); let signal = PeerSignal::Dial(*sdp); - if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await { + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { log::error!("failed to send signal: {e}"); } }, diff --git a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs index 9aa5323cc..98850a1fe 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs @@ -1,4 +1,9 @@ -use std::{fmt::Display, sync::Arc, time::Duration}; +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, + sync::Arc, + time::Duration, +}; use futures::channel::oneshot; use rust_ipfs::Ipfs; @@ -24,13 +29,11 @@ enum GossipSubCmd { group_key: Vec, signal: Vec, topic: String, - rsp: oneshot::Sender>, }, SendEcdh { dest: DID, signal: Vec, topic: String, - rsp: oneshot::Sender>, }, DecodeEcdh { src: DID, @@ -40,6 +43,7 @@ enum GossipSubCmd { GetOwnId { rsp: oneshot::Sender, }, + EmptyQueue, } #[derive(Clone)] @@ -72,39 +76,33 @@ impl GossipSubSender { Ok(id) } - pub async fn send_signal_aes( + pub fn send_signal_aes( &self, group_key: Vec, signal: T, topic: String, ) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); let signal = serde_cbor::to_vec(&signal)?; self.ch.send(GossipSubCmd::SendAes { group_key, signal, topic, - rsp: tx, })?; - rx.await??; Ok(()) } - pub async fn send_signal_ecdh( + pub fn send_signal_ecdh( &self, dest: DID, signal: T, topic: String, ) -> anyhow::Result<()> { - let (tx, rx) = oneshot::channel(); let signal = serde_cbor::to_vec(&signal)?; self.ch.send(GossipSubCmd::SendEcdh { dest, signal, topic, - rsp: tx, })?; - rx.await??; Ok(()) } @@ -134,6 +132,11 @@ impl GossipSubSender { let data: T = serde_cbor::from_slice(&bytes)?; Ok(data) } + + pub fn empty_queue(&self) -> anyhow::Result<()> { + self.ch.send(GossipSubCmd::EmptyQueue)?; + Ok(()) + } } async fn run( @@ -175,44 +178,56 @@ async fn run( } }; + let mut ecdh_queue: HashMap> = HashMap::new(); + loop { tokio::select! { + _ = timer.tick() => { + for (_dest, queue) in ecdh_queue.iter_mut() { + while let Some(cmd) = queue.pop_front() { + match cmd { + GossipSubCmd::SendEcdh { dest, signal, topic } => { + let encrypted = match ecdh_encrypt(&own_id, &dest, signal.clone()) { + Ok(r) => r, + Err(e) => { + log::error!("failed to encrypt ecdh message: {e}"); + break; + } + }; + if ipfs.pubsub_publish(topic.clone(), encrypted).await.is_err() { + queue.push_front(GossipSubCmd::SendEcdh { dest, signal, topic }); + break; + } + } + _ => {} + } + } + } + } opt = ch.recv() => match opt { Some(cmd) => match cmd { + GossipSubCmd::EmptyQueue => { + ecdh_queue.clear(); + } GossipSubCmd::GetOwnId { rsp } => { let _ = rsp.send(own_id.clone()); } - GossipSubCmd::SendAes { group_key, signal, topic, rsp } => { + GossipSubCmd::SendAes { group_key, signal, topic } => { let encrypted = match Cipher::direct_encrypt(&signal, &group_key) { Ok(r) => r, Err(e) => { log::error!("failed to encrypt aes message: {e}"); - let _ = rsp.send(Err(anyhow::anyhow!(e))); continue; } }; if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await { log::error!("failed to publish message: {e}"); - let _ = rsp.send(Err(anyhow::anyhow!(e))); - } else { - let _ = rsp.send(Ok(())); + } }, - GossipSubCmd::SendEcdh { dest, signal, topic, rsp } => { - let encrypted = match ecdh_encrypt(&own_id, &dest, signal) { - Ok(r) => r, - Err(e) => { - log::error!("failed to encrypt ecdh message: {e}"); - let _ = rsp.send(Err(anyhow::anyhow!(e))); - continue; - } - }; - if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await { - log::error!("failed to publish message: {e}"); - let _ = rsp.send(Err(anyhow::anyhow!(e))); - }else { - let _ = rsp.send(Ok(())); - } + GossipSubCmd::SendEcdh { dest, signal, topic } => { + let queue = ecdh_queue.entry(dest.clone()).or_insert(VecDeque::new()); + queue.push_back(GossipSubCmd::SendEcdh { dest, signal, topic }); } GossipSubCmd::DecodeEcdh { src, data, rsp } => { let r = || { diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index 56a77d2ff..75df5fced 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -3,7 +3,6 @@ mod data; mod blink_controller; mod gossipsub_listener; mod gossipsub_sender; -mod sender_queue; mod signaling; mod store; diff --git a/extensions/warp-blink-wrtc/src/blink_impl/sender_queue.rs b/extensions/warp-blink-wrtc/src/blink_impl/sender_queue.rs deleted file mode 100644 index e69de29bb..000000000