From 99325816d2ed17c7f0adbfec636e3c921ede2404 Mon Sep 17 00:00:00 2001 From: Stuart Woodbury Date: Fri, 3 Nov 2023 14:57:53 -0400 Subject: [PATCH 1/4] add retry when dialing someone --- .../warp-blink-wrtc/src/blink_impl/mod.rs | 71 +++++++++++++++---- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index 46ae58460..8e93da124 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -5,6 +5,7 @@ mod data; use data::*; mod webrtc_handler; +use tokio::sync::Notify; use webrtc_handler::run as handle_webrtc; use webrtc_handler::WebRtcHandlerParams; @@ -63,6 +64,8 @@ pub struct BlinkImpl { // handles 3 streams: one for webrtc events and two IPFS topics // pertains to the active_call, which is stored in STATIC_DATA webrtc_handler: Arc>>>, + // call initiation signals frequently fail. this will retry while the call is active + notify: Arc, // prevents the UI from running multiple tests simultaneously audio_device_config: Arc>, @@ -138,6 +141,7 @@ impl BlinkImpl { pending_calls: Arc::new(RwLock::new(HashMap::new())), active_call: Arc::new(RwLock::new(None)), webrtc_controller: Arc::new(RwLock::new(simple_webrtc::Controller::new()?)), + notify: Arc::new(Notify::new()), own_id: Arc::new(RwLock::new(None)), ui_event_ch, audio_source_config: Arc::new(RwLock::new(source_config)), @@ -414,6 +418,12 @@ impl BlinkImpl { Ok(()) } + + fn reinit_notify(&mut self) { + self.notify.notify_waiters(); + let new_notify = Arc::new(Notify::new()); + let _to_drop = std::mem::replace(&mut self.notify, new_notify); + } } impl Extension for BlinkImpl { @@ -469,6 +479,7 @@ impl Blink for BlinkImpl { mut participants: Vec, ) -> Result { self.ensure_call_not_in_progress().await?; + self.reinit_notify(); let ipfs = self.get_ipfs().await?; // need to drop lock to self.own_id before calling self.init_call @@ -483,27 +494,58 @@ impl Blink for BlinkImpl { let call_info = CallInfo::new(conversation_id, participants.clone()); self.init_call(call_info.clone()).await?; - let lock = self.own_id.read().await; - let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - - for dest in participants { - if dest == *own_id { - continue; - } - let topic = ipfs_routes::call_initiation_route(&dest); - let signal = InitiationSignal::Offer { - call_info: call_info.clone(), + let call_id = call_info.call_id(); + let notify = self.notify.clone(); + let own_id = self.own_id.clone(); + tokio::task::spawn(async move { + let handle_signals = async { + loop { + let mut new_participants = vec![]; + while let Some(dest) = participants.pop() { + let lock = own_id.read().await; + let own_id = match lock.as_ref() { + Some(r) => r, + None => { + new_participants.push(dest); + continue; + } + }; + + let topic = ipfs_routes::call_initiation_route(&dest); + let signal = InitiationSignal::Offer { + call_info: call_info.clone(), + }; + + if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await + { + log::error!("failed to send signal: {e}"); + new_participants.push(dest); + continue; + } + } + participants = new_participants; + if participants.is_empty() { + break; + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } }; - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await { - log::error!("failed to send signal: {e}"); + tokio::select! { + _ = handle_signals => { + log::debug!("all signals sent successfully"); + }, + _ = notify.notified() => {} } - } - Ok(call_info.call_id()) + }); + + Ok(call_id) } /// accept/join a call. Automatically send and receive audio async fn answer_call(&mut self, call_id: Uuid) -> Result<(), Error> { self.ensure_call_not_in_progress().await?; + self.reinit_notify(); let call = match self.pending_calls.write().await.remove(&call_id) { Some(r) => r.call, None => { @@ -541,6 +583,7 @@ impl Blink for BlinkImpl { } /// end/leave the current call async fn leave_call(&mut self) -> Result<(), Error> { + self.reinit_notify(); let lock = self.own_id.read().await; let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; let ipfs = self.get_ipfs().await?; From e4fbfecf29644c1b7c7c5055e75ad8b02e3adaea Mon Sep 17 00:00:00 2001 From: Stuart Woodbury Date: Fri, 3 Nov 2023 15:10:26 -0400 Subject: [PATCH 2/4] don't send signal to self --- extensions/warp-blink-wrtc/src/blink_impl/mod.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index 8e93da124..d4622ce81 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -511,6 +511,10 @@ impl Blink for BlinkImpl { } }; + if dest == *own_id { + continue; + } + let topic = ipfs_routes::call_initiation_route(&dest); let signal = InitiationSignal::Offer { call_info: call_info.clone(), @@ -518,7 +522,7 @@ impl Blink for BlinkImpl { if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await { - log::error!("failed to send signal: {e}"); + log::error!("failed to send offer signal: {e}"); new_participants.push(dest); continue; } @@ -536,7 +540,9 @@ impl Blink for BlinkImpl { _ = handle_signals => { log::debug!("all signals sent successfully"); }, - _ = notify.notified() => {} + _ = notify.notified() => { + log::debug!("call retry task successfully cancelled"); + } } }); From 0fd4dfea89245e0777c4539393e2d3d340254e92 Mon Sep 17 00:00:00 2001 From: Stuart Woodbury Date: Fri, 3 Nov 2023 17:37:00 -0400 Subject: [PATCH 3/4] add retry for join signal and remove active_call when hanging up --- .../warp-blink-wrtc/src/blink_impl/mod.rs | 152 +++++++++++++----- .../src/blink_impl/webrtc_handler.rs | 5 + 2 files changed, 120 insertions(+), 37 deletions(-) diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index d4622ce81..4e1a91d66 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -5,7 +5,10 @@ mod data; use data::*; mod webrtc_handler; +use futures::stream::futures_unordered; +use futures::StreamExt; use tokio::sync::Notify; +use warp::multipass::identity::IdentityStatus; use webrtc_handler::run as handle_webrtc; use webrtc_handler::WebRtcHandlerParams; @@ -69,6 +72,9 @@ pub struct BlinkImpl { // prevents the UI from running multiple tests simultaneously audio_device_config: Arc>, + + // used to determine if someone is online + multipass: Box, } impl Drop for BlinkImpl { @@ -152,6 +158,7 @@ impl BlinkImpl { selected_speaker, selected_microphone, ))), + multipass: account.clone(), }; let ipfs = blink_impl.ipfs.clone(); @@ -497,51 +504,69 @@ impl Blink for BlinkImpl { let call_id = call_info.call_id(); let notify = self.notify.clone(); let own_id = self.own_id.clone(); + let multipass = self.multipass.clone(); tokio::task::spawn(async move { let handle_signals = async { - loop { - let mut new_participants = vec![]; - while let Some(dest) = participants.pop() { + 'OFFER_CALL_OUTER: loop { + // the inner loop is used as a scope to let the lock on own_id be dropped before sleeping. + // it's basically a goto. + #[allow(clippy::never_loop)] + 'OFFER_CALL_INNER: loop { let lock = own_id.read().await; let own_id = match lock.as_ref() { Some(r) => r, None => { + // sleep and retry + break 'OFFER_CALL_INNER; + } + }; + let mut new_participants = vec![]; + while let Some(dest) = participants.pop() { + // if not online and available, don't try to send the signal + if multipass + .identity_status(&dest) + .await + .map(|x| matches!(x, IdentityStatus::Online)) + .unwrap_or_default() + { new_participants.push(dest); continue; } - }; - if dest == *own_id { - continue; - } + if dest == *own_id { + continue; + } - let topic = ipfs_routes::call_initiation_route(&dest); - let signal = InitiationSignal::Offer { - call_info: call_info.clone(), - }; + let topic = ipfs_routes::call_initiation_route(&dest); + let signal = InitiationSignal::Offer { + call_info: call_info.clone(), + }; - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await - { - log::error!("failed to send offer signal: {e}"); - new_participants.push(dest); - continue; + if let Err(e) = + send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await + { + log::error!("failed to send offer signal: {e}"); + new_participants.push(dest); + continue; + } } + participants = new_participants; + if participants.is_empty() { + break 'OFFER_CALL_OUTER; + } + // the label isn't needed but helps with readability + break 'OFFER_CALL_INNER; } - participants = new_participants; - if participants.is_empty() { - break; - } else { - tokio::time::sleep(Duration::from_secs(1)).await; - } + tokio::time::sleep(Duration::from_secs(2)).await; } }; tokio::select! { _ = handle_signals => { - log::debug!("all signals sent successfully"); + log::debug!("all offers were sent successfully"); }, _ = notify.notified() => { - log::debug!("call retry task successfully cancelled"); + log::debug!("offer retry task was successfully cancelled"); } } }); @@ -562,14 +587,70 @@ impl Blink for BlinkImpl { }; self.init_call(call.clone()).await?; - let call_id = call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Join { call_id }; + let active_call = self.active_call.clone(); + let participants = call.participants(); let ipfs = self.get_ipfs().await?; - send_signal_aes(&ipfs, &call.group_key(), signal, topic) - .await - .map_err(|e| Error::FailedToSendSignal(e.to_string())) + let call_id = call.call_id(); + let notify = self.notify.clone(); + let multipass = self.multipass.clone(); + tokio::task::spawn(async move { + let handle_signals = async { + loop { + // this scope is to make sure the locks drop + { + let connected_participants: Vec = active_call + .read() + .await + .as_ref() + .map(|x| x.connected_participants.keys().cloned().collect()) + .unwrap_or_default(); + + let to_connect: Vec<_> = participants + .iter() + .filter(|x| !connected_participants.contains(x)) + .collect(); + + // if the other participants, who aren't yet connected to you, aren't online, don't bother sending signals + let futures = to_connect.iter().map(|x| async { + let status = multipass.identity_status(x).await; + // note that if they joined the call, they may appear as busy. + matches!(status, Ok(IdentityStatus::Online | IdentityStatus::Busy)) + }); + let fut = futures_unordered::FuturesUnordered::from_iter(futures); + let vals: Vec = fut.collect().await; + if vals.iter().any(|x| *x) { + // participants contains your own id. + if to_connect.len() > 1 { + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Join { call_id }; + + if let Err(e) = + send_signal_aes(&ipfs, &call.group_key(), signal, topic).await + { + log::error!("failed to send join signal: {e}"); + } + } else { + break; + } + } + } + // todo: make this random + tokio::time::sleep(Duration::from_secs(5)).await; + } + }; + + tokio::select! { + _ = handle_signals => { + log::debug!("all participants have been joined"); + }, + _ = notify.notified() => { + log::debug!("join retry task successfully cancelled"); + } + } + }); + + Ok(()) } /// use the Leave signal as a courtesy, to let the group know not to expect you to join. async fn reject_call(&mut self, call_id: Uuid) -> Result<(), Error> { @@ -593,23 +674,20 @@ impl Blink for BlinkImpl { let lock = self.own_id.read().await; let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; let ipfs = self.get_ipfs().await?; - if let Some(ac) = self.active_call.write().await.as_mut() { - match ac.call_state.clone() { - CallState::Started => { - ac.call_state = CallState::Closing; - } + if let Some(ac) = self.active_call.write().await.take() { + match &ac.call_state { CallState::Closed => { log::info!("call already closed"); return Ok(()); } CallState::Uninitialized => { log::info!("cancelling call"); - ac.call_state = CallState::Closed; } CallState::Closing => { log::warn!("leave_call when call_state is: {:?}", ac.call_state); return Ok(()); } + _ => {} }; let call_id = ac.call.call_id(); @@ -621,7 +699,7 @@ impl Blink for BlinkImpl { log::debug!("sent signal to leave call"); } - // send extra quit signal + // send extra quit signal to participants who aren't connected - to update their pending call info for participant in ac .call .participants() diff --git a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs index 8af337f7d..c5cc798f3 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs @@ -85,6 +85,11 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve continue; } } + if let Some(state) = active_call.connected_participants.get(&sender) { + if matches!(state, PeerState::Connected | PeerState::Initializing) { + continue; + } + } active_call.connected_participants.insert(sender.clone(), PeerState::Initializing); // todo: properly hang up on error. // emits CallInitiated Event, which returns the local sdp. will be sent to the peer with the dial signal From 401b3c5e1071e5b02718abb9513b52845b92e19c Mon Sep 17 00:00:00 2001 From: Stuart Woodbury Date: Mon, 6 Nov 2023 09:56:54 -0500 Subject: [PATCH 4/4] fix(blink): send InitiationSignal::Join when accepting a call --- .../src/blink_impl/data/mod.rs | 5 +++ .../warp-blink-wrtc/src/blink_impl/mod.rs | 37 +++++++++++++++++++ .../src/blink_impl/webrtc_handler.rs | 2 + 3 files changed, 44 insertions(+) diff --git a/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs index 4e007ed10..c57d5cd55 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs @@ -8,15 +8,19 @@ use warp::{ pub struct ActiveCall { pub call: CallInfo, pub connected_participants: HashMap, + // participants who refused the call or hung up + pub left_call: HashSet, pub call_state: CallState, pub call_config: CallConfig, } #[derive(Clone, Eq, PartialEq)] pub enum PeerState { + // one of the webrtc transport layers got disconnected. Disconnected, Initializing, Connected, + // the the webrtc controller hung up. Closed, } #[derive(Debug, Clone, Eq, PartialEq)] @@ -35,6 +39,7 @@ impl From for ActiveCall { Self { call: value, connected_participants: HashMap::new(), + left_call: HashSet::new(), call_state: CallState::Uninitialized, call_config: CallConfig::default(), } diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index 4e1a91d66..6b4cce98c 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -501,6 +501,7 @@ impl Blink for BlinkImpl { let call_info = CallInfo::new(conversation_id, participants.clone()); self.init_call(call_info.clone()).await?; + let active_call = self.active_call.clone(); let call_id = call_info.call_id(); let notify = self.notify.clone(); let own_id = self.own_id.clone(); @@ -522,6 +523,15 @@ impl Blink for BlinkImpl { }; let mut new_participants = vec![]; while let Some(dest) = participants.pop() { + if active_call + .read() + .await + .as_ref() + .map(|x| x.left_call.contains(&dest)) + .unwrap_or_default() + { + continue; + } // if not online and available, don't try to send the signal if multipass .identity_status(&dest) @@ -588,6 +598,7 @@ impl Blink for BlinkImpl { self.init_call(call.clone()).await?; + let own_id = self.own_id.clone(); let active_call = self.active_call.clone(); let participants = call.participants(); let ipfs = self.get_ipfs().await?; @@ -596,6 +607,32 @@ impl Blink for BlinkImpl { let multipass = self.multipass.clone(); tokio::task::spawn(async move { let handle_signals = async { + // this block ensures the lock gets dropped + { + // send InitiationSignal::Join to everyone so they can update their pending calls correctly + let lock = own_id.read().await; + let own_id = match lock.as_ref() { + Some(r) => r, + None => { + log::error!("error accepting call. couldn't get own_id"); + return; + } + }; + // send extra quit signal to participants who aren't connected - to update their pending call info + for participant in participants.iter() { + if participant == own_id { + continue; + } + let topic = ipfs_routes::call_initiation_route(participant); + let signal = InitiationSignal::Join { call_id }; + if let Err(e) = + send_signal_ecdh(&ipfs, own_id, participant, signal, topic).await + { + log::error!("failed to send signal: {e}"); + } + } + } + loop { // this scope is to make sure the locks drop { diff --git a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs index c5cc798f3..8ad5afb7c 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs @@ -91,6 +91,7 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve } } active_call.connected_participants.insert(sender.clone(), PeerState::Initializing); + active_call.left_call.remove(&sender); // todo: properly hang up on error. // emits CallInitiated Event, which returns the local sdp. will be sent to the peer with the dial signal if let Err(e) = webrtc_controller.write().await.dial(&sender).await { @@ -116,6 +117,7 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve log::error!("participant tried to leave call who wasn't part of the call"); continue; } + active_call.left_call.insert(sender.clone()); webrtc_controller.write().await.hang_up(&sender).await; if let Err(e) = ch.send(BlinkEventKind::ParticipantLeft { call_id, peer_id: sender }) { log::error!("failed to send ParticipantLeft event: {e}");