From 0fd4dfea89245e0777c4539393e2d3d340254e92 Mon Sep 17 00:00:00 2001 From: Stuart Woodbury Date: Fri, 3 Nov 2023 17:37:00 -0400 Subject: [PATCH] add retry for join signal and remove active_call when hanging up --- .../warp-blink-wrtc/src/blink_impl/mod.rs | 152 +++++++++++++----- .../src/blink_impl/webrtc_handler.rs | 5 + 2 files changed, 120 insertions(+), 37 deletions(-) diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index d4622ce81..4e1a91d66 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -5,7 +5,10 @@ mod data; use data::*; mod webrtc_handler; +use futures::stream::futures_unordered; +use futures::StreamExt; use tokio::sync::Notify; +use warp::multipass::identity::IdentityStatus; use webrtc_handler::run as handle_webrtc; use webrtc_handler::WebRtcHandlerParams; @@ -69,6 +72,9 @@ pub struct BlinkImpl { // prevents the UI from running multiple tests simultaneously audio_device_config: Arc>, + + // used to determine if someone is online + multipass: Box, } impl Drop for BlinkImpl { @@ -152,6 +158,7 @@ impl BlinkImpl { selected_speaker, selected_microphone, ))), + multipass: account.clone(), }; let ipfs = blink_impl.ipfs.clone(); @@ -497,51 +504,69 @@ impl Blink for BlinkImpl { let call_id = call_info.call_id(); let notify = self.notify.clone(); let own_id = self.own_id.clone(); + let multipass = self.multipass.clone(); tokio::task::spawn(async move { let handle_signals = async { - loop { - let mut new_participants = vec![]; - while let Some(dest) = participants.pop() { + 'OFFER_CALL_OUTER: loop { + // the inner loop is used as a scope to let the lock on own_id be dropped before sleeping. + // it's basically a goto. + #[allow(clippy::never_loop)] + 'OFFER_CALL_INNER: loop { let lock = own_id.read().await; let own_id = match lock.as_ref() { Some(r) => r, None => { + // sleep and retry + break 'OFFER_CALL_INNER; + } + }; + let mut new_participants = vec![]; + while let Some(dest) = participants.pop() { + // if not online and available, don't try to send the signal + if multipass + .identity_status(&dest) + .await + .map(|x| matches!(x, IdentityStatus::Online)) + .unwrap_or_default() + { new_participants.push(dest); continue; } - }; - if dest == *own_id { - continue; - } + if dest == *own_id { + continue; + } - let topic = ipfs_routes::call_initiation_route(&dest); - let signal = InitiationSignal::Offer { - call_info: call_info.clone(), - }; + let topic = ipfs_routes::call_initiation_route(&dest); + let signal = InitiationSignal::Offer { + call_info: call_info.clone(), + }; - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await - { - log::error!("failed to send offer signal: {e}"); - new_participants.push(dest); - continue; + if let Err(e) = + send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await + { + log::error!("failed to send offer signal: {e}"); + new_participants.push(dest); + continue; + } } + participants = new_participants; + if participants.is_empty() { + break 'OFFER_CALL_OUTER; + } + // the label isn't needed but helps with readability + break 'OFFER_CALL_INNER; } - participants = new_participants; - if participants.is_empty() { - break; - } else { - tokio::time::sleep(Duration::from_secs(1)).await; - } + tokio::time::sleep(Duration::from_secs(2)).await; } }; tokio::select! { _ = handle_signals => { - log::debug!("all signals sent successfully"); + log::debug!("all offers were sent successfully"); }, _ = notify.notified() => { - log::debug!("call retry task successfully cancelled"); + log::debug!("offer retry task was successfully cancelled"); } } }); @@ -562,14 +587,70 @@ impl Blink for BlinkImpl { }; self.init_call(call.clone()).await?; - let call_id = call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Join { call_id }; + let active_call = self.active_call.clone(); + let participants = call.participants(); let ipfs = self.get_ipfs().await?; - send_signal_aes(&ipfs, &call.group_key(), signal, topic) - .await - .map_err(|e| Error::FailedToSendSignal(e.to_string())) + let call_id = call.call_id(); + let notify = self.notify.clone(); + let multipass = self.multipass.clone(); + tokio::task::spawn(async move { + let handle_signals = async { + loop { + // this scope is to make sure the locks drop + { + let connected_participants: Vec = active_call + .read() + .await + .as_ref() + .map(|x| x.connected_participants.keys().cloned().collect()) + .unwrap_or_default(); + + let to_connect: Vec<_> = participants + .iter() + .filter(|x| !connected_participants.contains(x)) + .collect(); + + // if the other participants, who aren't yet connected to you, aren't online, don't bother sending signals + let futures = to_connect.iter().map(|x| async { + let status = multipass.identity_status(x).await; + // note that if they joined the call, they may appear as busy. + matches!(status, Ok(IdentityStatus::Online | IdentityStatus::Busy)) + }); + let fut = futures_unordered::FuturesUnordered::from_iter(futures); + let vals: Vec = fut.collect().await; + if vals.iter().any(|x| *x) { + // participants contains your own id. + if to_connect.len() > 1 { + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Join { call_id }; + + if let Err(e) = + send_signal_aes(&ipfs, &call.group_key(), signal, topic).await + { + log::error!("failed to send join signal: {e}"); + } + } else { + break; + } + } + } + // todo: make this random + tokio::time::sleep(Duration::from_secs(5)).await; + } + }; + + tokio::select! { + _ = handle_signals => { + log::debug!("all participants have been joined"); + }, + _ = notify.notified() => { + log::debug!("join retry task successfully cancelled"); + } + } + }); + + Ok(()) } /// use the Leave signal as a courtesy, to let the group know not to expect you to join. async fn reject_call(&mut self, call_id: Uuid) -> Result<(), Error> { @@ -593,23 +674,20 @@ impl Blink for BlinkImpl { let lock = self.own_id.read().await; let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; let ipfs = self.get_ipfs().await?; - if let Some(ac) = self.active_call.write().await.as_mut() { - match ac.call_state.clone() { - CallState::Started => { - ac.call_state = CallState::Closing; - } + if let Some(ac) = self.active_call.write().await.take() { + match &ac.call_state { CallState::Closed => { log::info!("call already closed"); return Ok(()); } CallState::Uninitialized => { log::info!("cancelling call"); - ac.call_state = CallState::Closed; } CallState::Closing => { log::warn!("leave_call when call_state is: {:?}", ac.call_state); return Ok(()); } + _ => {} }; let call_id = ac.call.call_id(); @@ -621,7 +699,7 @@ impl Blink for BlinkImpl { log::debug!("sent signal to leave call"); } - // send extra quit signal + // send extra quit signal to participants who aren't connected - to update their pending call info for participant in ac .call .participants() diff --git a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs index 8af337f7d..c5cc798f3 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs @@ -85,6 +85,11 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve continue; } } + if let Some(state) = active_call.connected_participants.get(&sender) { + if matches!(state, PeerState::Connected | PeerState::Initializing) { + continue; + } + } active_call.connected_participants.insert(sender.clone(), PeerState::Initializing); // todo: properly hang up on error. // emits CallInitiated Event, which returns the local sdp. will be sent to the peer with the dial signal