Skip to content

Commit

Permalink
add retry for join signal and remove active_call when hanging up
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwoodbury committed Nov 3, 2023
1 parent e4fbfec commit 0fd4dfe
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 37 deletions.
152 changes: 115 additions & 37 deletions extensions/warp-blink-wrtc/src/blink_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ mod data;
use data::*;

mod webrtc_handler;
use futures::stream::futures_unordered;
use futures::StreamExt;
use tokio::sync::Notify;
use warp::multipass::identity::IdentityStatus;
use webrtc_handler::run as handle_webrtc;
use webrtc_handler::WebRtcHandlerParams;

Expand Down Expand Up @@ -69,6 +72,9 @@ pub struct BlinkImpl {

// prevents the UI from running multiple tests simultaneously
audio_device_config: Arc<RwLock<host_media::audio::DeviceConfig>>,

// used to determine if someone is online
multipass: Box<dyn MultiPass>,
}

impl Drop for BlinkImpl {
Expand Down Expand Up @@ -152,6 +158,7 @@ impl BlinkImpl {
selected_speaker,
selected_microphone,
))),
multipass: account.clone(),
};

let ipfs = blink_impl.ipfs.clone();
Expand Down Expand Up @@ -497,51 +504,69 @@ impl Blink for BlinkImpl {
let call_id = call_info.call_id();
let notify = self.notify.clone();
let own_id = self.own_id.clone();
let multipass = self.multipass.clone();
tokio::task::spawn(async move {
let handle_signals = async {
loop {
let mut new_participants = vec![];
while let Some(dest) = participants.pop() {
'OFFER_CALL_OUTER: loop {
// the inner loop is used as a scope to let the lock on own_id be dropped before sleeping.
// it's basically a goto.
#[allow(clippy::never_loop)]
'OFFER_CALL_INNER: loop {
let lock = own_id.read().await;
let own_id = match lock.as_ref() {
Some(r) => r,
None => {
// sleep and retry
break 'OFFER_CALL_INNER;
}
};
let mut new_participants = vec![];
while let Some(dest) = participants.pop() {
// if not online and available, don't try to send the signal
if multipass
.identity_status(&dest)
.await
.map(|x| matches!(x, IdentityStatus::Online))
.unwrap_or_default()
{
new_participants.push(dest);
continue;
}
};

if dest == *own_id {
continue;
}
if dest == *own_id {
continue;
}

let topic = ipfs_routes::call_initiation_route(&dest);
let signal = InitiationSignal::Offer {
call_info: call_info.clone(),
};
let topic = ipfs_routes::call_initiation_route(&dest);
let signal = InitiationSignal::Offer {
call_info: call_info.clone(),
};

if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await
{
log::error!("failed to send offer signal: {e}");
new_participants.push(dest);
continue;
if let Err(e) =
send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await
{
log::error!("failed to send offer signal: {e}");
new_participants.push(dest);
continue;
}
}
participants = new_participants;
if participants.is_empty() {
break 'OFFER_CALL_OUTER;
}
// the label isn't needed but helps with readability
break 'OFFER_CALL_INNER;
}
participants = new_participants;
if participants.is_empty() {
break;
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
};

tokio::select! {
_ = handle_signals => {
log::debug!("all signals sent successfully");
log::debug!("all offers were sent successfully");
},
_ = notify.notified() => {
log::debug!("call retry task successfully cancelled");
log::debug!("offer retry task was successfully cancelled");
}
}
});
Expand All @@ -562,14 +587,70 @@ impl Blink for BlinkImpl {
};

self.init_call(call.clone()).await?;
let call_id = call.call_id();
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Join { call_id };

let active_call = self.active_call.clone();
let participants = call.participants();
let ipfs = self.get_ipfs().await?;
send_signal_aes(&ipfs, &call.group_key(), signal, topic)
.await
.map_err(|e| Error::FailedToSendSignal(e.to_string()))
let call_id = call.call_id();
let notify = self.notify.clone();
let multipass = self.multipass.clone();
tokio::task::spawn(async move {
let handle_signals = async {
loop {
// this scope is to make sure the locks drop
{
let connected_participants: Vec<DID> = active_call
.read()
.await
.as_ref()
.map(|x| x.connected_participants.keys().cloned().collect())
.unwrap_or_default();

let to_connect: Vec<_> = participants
.iter()
.filter(|x| !connected_participants.contains(x))
.collect();

// if the other participants, who aren't yet connected to you, aren't online, don't bother sending signals
let futures = to_connect.iter().map(|x| async {
let status = multipass.identity_status(x).await;
// note that if they joined the call, they may appear as busy.
matches!(status, Ok(IdentityStatus::Online | IdentityStatus::Busy))
});
let fut = futures_unordered::FuturesUnordered::from_iter(futures);
let vals: Vec<bool> = fut.collect().await;
if vals.iter().any(|x| *x) {
// participants contains your own id.
if to_connect.len() > 1 {
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Join { call_id };

if let Err(e) =
send_signal_aes(&ipfs, &call.group_key(), signal, topic).await
{
log::error!("failed to send join signal: {e}");
}
} else {
break;
}
}
}
// todo: make this random
tokio::time::sleep(Duration::from_secs(5)).await;
}
};

tokio::select! {
_ = handle_signals => {
log::debug!("all participants have been joined");
},
_ = notify.notified() => {
log::debug!("join retry task successfully cancelled");
}
}
});

Ok(())
}
/// use the Leave signal as a courtesy, to let the group know not to expect you to join.
async fn reject_call(&mut self, call_id: Uuid) -> Result<(), Error> {
Expand All @@ -593,23 +674,20 @@ impl Blink for BlinkImpl {
let lock = self.own_id.read().await;
let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?;
let ipfs = self.get_ipfs().await?;
if let Some(ac) = self.active_call.write().await.as_mut() {
match ac.call_state.clone() {
CallState::Started => {
ac.call_state = CallState::Closing;
}
if let Some(ac) = self.active_call.write().await.take() {
match &ac.call_state {
CallState::Closed => {
log::info!("call already closed");
return Ok(());
}
CallState::Uninitialized => {
log::info!("cancelling call");
ac.call_state = CallState::Closed;
}
CallState::Closing => {
log::warn!("leave_call when call_state is: {:?}", ac.call_state);
return Ok(());
}
_ => {}
};

let call_id = ac.call.call_id();
Expand All @@ -621,7 +699,7 @@ impl Blink for BlinkImpl {
log::debug!("sent signal to leave call");
}

// send extra quit signal
// send extra quit signal to participants who aren't connected - to update their pending call info
for participant in ac
.call
.participants()
Expand Down
5 changes: 5 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve
continue;
}
}
if let Some(state) = active_call.connected_participants.get(&sender) {
if matches!(state, PeerState::Connected | PeerState::Initializing) {
continue;
}
}
active_call.connected_participants.insert(sender.clone(), PeerState::Initializing);
// todo: properly hang up on error.
// emits CallInitiated Event, which returns the local sdp. will be sent to the peer with the dial signal
Expand Down

0 comments on commit 0fd4dfe

Please sign in to comment.