Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blink): fix signaling when offering a call #355

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ use warp::{
pub struct ActiveCall {
pub call: CallInfo,
pub connected_participants: HashMap<DID, PeerState>,
// participants who refused the call or hung up
pub left_call: HashSet<DID>,
pub call_state: CallState,
pub call_config: CallConfig,
}

#[derive(Clone, Eq, PartialEq)]
pub enum PeerState {
// one of the webrtc transport layers got disconnected.
Disconnected,
Initializing,
Connected,
// the the webrtc controller hung up.
Closed,
}
#[derive(Debug, Clone, Eq, PartialEq)]
Expand All @@ -35,6 +39,7 @@ impl From<CallInfo> for ActiveCall {
Self {
call: value,
connected_participants: HashMap::new(),
left_call: HashSet::new(),
call_state: CallState::Uninitialized,
call_config: CallConfig::default(),
}
Expand Down
218 changes: 191 additions & 27 deletions extensions/warp-blink-wrtc/src/blink_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ mod data;
use data::*;

mod webrtc_handler;
use futures::stream::futures_unordered;
use futures::StreamExt;
use tokio::sync::Notify;
use warp::multipass::identity::IdentityStatus;
use webrtc_handler::run as handle_webrtc;
use webrtc_handler::WebRtcHandlerParams;

Expand Down Expand Up @@ -63,9 +67,14 @@ pub struct BlinkImpl {
// handles 3 streams: one for webrtc events and two IPFS topics
// pertains to the active_call, which is stored in STATIC_DATA
webrtc_handler: Arc<warp::sync::RwLock<Option<JoinHandle<()>>>>,
// call initiation signals frequently fail. this will retry while the call is active
notify: Arc<Notify>,

// prevents the UI from running multiple tests simultaneously
audio_device_config: Arc<RwLock<host_media::audio::DeviceConfig>>,

// used to determine if someone is online
multipass: Box<dyn MultiPass>,
}

impl Drop for BlinkImpl {
Expand Down Expand Up @@ -138,6 +147,7 @@ impl BlinkImpl {
pending_calls: Arc::new(RwLock::new(HashMap::new())),
active_call: Arc::new(RwLock::new(None)),
webrtc_controller: Arc::new(RwLock::new(simple_webrtc::Controller::new()?)),
notify: Arc::new(Notify::new()),
own_id: Arc::new(RwLock::new(None)),
ui_event_ch,
audio_source_config: Arc::new(RwLock::new(source_config)),
Expand All @@ -148,6 +158,7 @@ impl BlinkImpl {
selected_speaker,
selected_microphone,
))),
multipass: account.clone(),
};

let ipfs = blink_impl.ipfs.clone();
Expand Down Expand Up @@ -414,6 +425,12 @@ impl BlinkImpl {

Ok(())
}

fn reinit_notify(&mut self) {
self.notify.notify_waiters();
let new_notify = Arc::new(Notify::new());
let _to_drop = std::mem::replace(&mut self.notify, new_notify);
}
}

impl Extension for BlinkImpl {
Expand Down Expand Up @@ -469,6 +486,7 @@ impl Blink for BlinkImpl {
mut participants: Vec<DID>,
) -> Result<Uuid, Error> {
self.ensure_call_not_in_progress().await?;
self.reinit_notify();
let ipfs = self.get_ipfs().await?;

// need to drop lock to self.own_id before calling self.init_call
Expand All @@ -483,27 +501,92 @@ impl Blink for BlinkImpl {
let call_info = CallInfo::new(conversation_id, participants.clone());
self.init_call(call_info.clone()).await?;

let lock = self.own_id.read().await;
let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?;

for dest in participants {
if dest == *own_id {
continue;
}
let topic = ipfs_routes::call_initiation_route(&dest);
let signal = InitiationSignal::Offer {
call_info: call_info.clone(),
let active_call = self.active_call.clone();
let call_id = call_info.call_id();
let notify = self.notify.clone();
let own_id = self.own_id.clone();
let multipass = self.multipass.clone();
tokio::task::spawn(async move {
let handle_signals = async {
'OFFER_CALL_OUTER: loop {
// the inner loop is used as a scope to let the lock on own_id be dropped before sleeping.
// it's basically a goto.
#[allow(clippy::never_loop)]
'OFFER_CALL_INNER: loop {
let lock = own_id.read().await;
let own_id = match lock.as_ref() {
Some(r) => r,
None => {
// sleep and retry
break 'OFFER_CALL_INNER;
}
};
let mut new_participants = vec![];
while let Some(dest) = participants.pop() {
if active_call
.read()
.await
.as_ref()
.map(|x| x.left_call.contains(&dest))
.unwrap_or_default()
{
continue;
}
// if not online and available, don't try to send the signal
if multipass
.identity_status(&dest)
.await
.map(|x| matches!(x, IdentityStatus::Online))
.unwrap_or_default()
{
new_participants.push(dest);
continue;
}

if dest == *own_id {
continue;
}

let topic = ipfs_routes::call_initiation_route(&dest);
let signal = InitiationSignal::Offer {
call_info: call_info.clone(),
};

if let Err(e) =
send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await
{
log::error!("failed to send offer signal: {e}");
new_participants.push(dest);
continue;
}
}
participants = new_participants;
if participants.is_empty() {
break 'OFFER_CALL_OUTER;
}
// the label isn't needed but helps with readability
break 'OFFER_CALL_INNER;
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
};

if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await {
log::error!("failed to send signal: {e}");
tokio::select! {
_ = handle_signals => {
log::debug!("all offers were sent successfully");
},
_ = notify.notified() => {
log::debug!("offer retry task was successfully cancelled");
}
}
}
Ok(call_info.call_id())
});

Ok(call_id)
}
/// accept/join a call. Automatically send and receive audio
async fn answer_call(&mut self, call_id: Uuid) -> Result<(), Error> {
self.ensure_call_not_in_progress().await?;
self.reinit_notify();
let call = match self.pending_calls.write().await.remove(&call_id) {
Some(r) => r.call,
None => {
Expand All @@ -514,14 +597,97 @@ impl Blink for BlinkImpl {
};

self.init_call(call.clone()).await?;
let call_id = call.call_id();
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Join { call_id };

let own_id = self.own_id.clone();
let active_call = self.active_call.clone();
let participants = call.participants();
let ipfs = self.get_ipfs().await?;
send_signal_aes(&ipfs, &call.group_key(), signal, topic)
.await
.map_err(|e| Error::FailedToSendSignal(e.to_string()))
let call_id = call.call_id();
let notify = self.notify.clone();
let multipass = self.multipass.clone();
tokio::task::spawn(async move {
let handle_signals = async {
// this block ensures the lock gets dropped
{
// send InitiationSignal::Join to everyone so they can update their pending calls correctly
let lock = own_id.read().await;
let own_id = match lock.as_ref() {
Some(r) => r,
None => {
log::error!("error accepting call. couldn't get own_id");
return;
}
};
// send extra quit signal to participants who aren't connected - to update their pending call info
for participant in participants.iter() {
if participant == own_id {
continue;
}
let topic = ipfs_routes::call_initiation_route(participant);
let signal = InitiationSignal::Join { call_id };
if let Err(e) =
send_signal_ecdh(&ipfs, own_id, participant, signal, topic).await
{
log::error!("failed to send signal: {e}");
}
}
}

loop {
// this scope is to make sure the locks drop
{
let connected_participants: Vec<DID> = active_call
.read()
.await
.as_ref()
.map(|x| x.connected_participants.keys().cloned().collect())
.unwrap_or_default();

let to_connect: Vec<_> = participants
.iter()
.filter(|x| !connected_participants.contains(x))
.collect();

// if the other participants, who aren't yet connected to you, aren't online, don't bother sending signals
let futures = to_connect.iter().map(|x| async {
let status = multipass.identity_status(x).await;
// note that if they joined the call, they may appear as busy.
matches!(status, Ok(IdentityStatus::Online | IdentityStatus::Busy))
});
let fut = futures_unordered::FuturesUnordered::from_iter(futures);
let vals: Vec<bool> = fut.collect().await;
if vals.iter().any(|x| *x) {
// participants contains your own id.
if to_connect.len() > 1 {
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Join { call_id };

if let Err(e) =
send_signal_aes(&ipfs, &call.group_key(), signal, topic).await
{
log::error!("failed to send join signal: {e}");
}
} else {
break;
}
}
}
// todo: make this random
tokio::time::sleep(Duration::from_secs(5)).await;
}
};

tokio::select! {
_ = handle_signals => {
log::debug!("all participants have been joined");
},
_ = notify.notified() => {
log::debug!("join retry task successfully cancelled");
}
}
});

Ok(())
}
/// use the Leave signal as a courtesy, to let the group know not to expect you to join.
async fn reject_call(&mut self, call_id: Uuid) -> Result<(), Error> {
Expand All @@ -541,26 +707,24 @@ impl Blink for BlinkImpl {
}
/// end/leave the current call
async fn leave_call(&mut self) -> Result<(), Error> {
self.reinit_notify();
let lock = self.own_id.read().await;
let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?;
let ipfs = self.get_ipfs().await?;
if let Some(ac) = self.active_call.write().await.as_mut() {
match ac.call_state.clone() {
CallState::Started => {
ac.call_state = CallState::Closing;
}
if let Some(ac) = self.active_call.write().await.take() {
match &ac.call_state {
CallState::Closed => {
log::info!("call already closed");
return Ok(());
}
CallState::Uninitialized => {
log::info!("cancelling call");
ac.call_state = CallState::Closed;
}
CallState::Closing => {
log::warn!("leave_call when call_state is: {:?}", ac.call_state);
return Ok(());
}
_ => {}
};

let call_id = ac.call.call_id();
Expand All @@ -572,7 +736,7 @@ impl Blink for BlinkImpl {
log::debug!("sent signal to leave call");
}

// send extra quit signal
// send extra quit signal to participants who aren't connected - to update their pending call info
for participant in ac
.call
.participants()
Expand Down
7 changes: 7 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve
continue;
}
}
if let Some(state) = active_call.connected_participants.get(&sender) {
if matches!(state, PeerState::Connected | PeerState::Initializing) {
continue;
}
}
active_call.connected_participants.insert(sender.clone(), PeerState::Initializing);
active_call.left_call.remove(&sender);
// todo: properly hang up on error.
// emits CallInitiated Event, which returns the local sdp. will be sent to the peer with the dial signal
if let Err(e) = webrtc_controller.write().await.dial(&sender).await {
Expand All @@ -111,6 +117,7 @@ pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEve
log::error!("participant tried to leave call who wasn't part of the call");
continue;
}
active_call.left_call.insert(sender.clone());
webrtc_controller.write().await.hang_up(&sender).await;
if let Err(e) = ch.send(BlinkEventKind::ParticipantLeft { call_id, peer_id: sender }) {
log::error!("failed to send ParticipantLeft event: {e}");
Expand Down
Loading