diff --git a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs index bb3fd2c14..b3d6959b5 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs @@ -292,10 +292,12 @@ async fn run( if let Some(data) = call_data_map.map.get_mut(&prev_active) { data.state.reset_self(); } - if active_call.replace(call_info.call_id()).is_some() { + let call_id = call_info.call_id(); + if active_call.as_ref().map(|x| x != &call_id).unwrap_or_default() { let _ = webrtc_controller.deinit().await; host_media::reset().await; } + active_call.replace(call_id); call_data_map.add_call(call_info.clone(), own_id); // automatically add an audio track @@ -318,7 +320,7 @@ async fn run( gossipsub_listener .subscribe_call(call_info.call_id(), call_info.group_key()); gossipsub_listener - .connect_webrtc(call_info.call_id(), own_id.clone()); + .subscribe_webrtc(call_info.call_id(), own_id.clone()); // todo: resend periodically. perhaps somewhere else let mut participants = call_info.participants(); @@ -359,10 +361,11 @@ async fn run( if let Some(data) = call_data_map.map.get_mut(&prev_active) { data.state.reset_self(); } - if active_call.replace(call_id).is_some() { + if active_call.as_ref().map(|x| x != &call_id).unwrap_or_default() { let _ = webrtc_controller.deinit().await; host_media::reset().await; } + active_call.replace(call_id); // automatically add an audio track let webrtc_codec = AudioCodec::default(); @@ -382,7 +385,7 @@ async fn run( match r { Ok(_) => { gossipsub_listener.subscribe_call(call_id, call_info.group_key()); - gossipsub_listener.connect_webrtc(call_id, own_id.clone()); + gossipsub_listener.subscribe_webrtc(call_id, own_id.clone()); let topic = ipfs_routes::call_signal_route(&call_id); // todo? periodically re-send join signals. perhaps somewhere else @@ -419,7 +422,18 @@ async fn run( }, Cmd::LeaveCall { call_id } => { let call_id = call_id.unwrap_or(active_call.unwrap_or_default()); - match call_data_map.get_call_info(call_id) { + let info = call_data_map.get_call_info(call_id); + if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { + call_data_map.leave_call(call_id); + let _ = active_call.take(); + let _ = webrtc_controller.deinit().await; + host_media::reset().await; + if let Err(e) = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id }) { + log::error!("failed to send CallTerminated Event: {e}"); + } + } + + match info { Some(info) => { let topic = ipfs_routes::call_signal_route(&call_id); let signal = CallSignal::Leave; @@ -433,15 +447,6 @@ async fn run( log::error!("failed to leave call - not found"); } } - if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { - call_data_map.leave_call(call_id); - let _ = active_call.take(); - let _ = webrtc_controller.deinit().await; - host_media::reset().await; - if let Err(e) = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id }) { - log::error!("failed to send CallTerminated Event: {e}"); - } - } }, Cmd::MuteSelf => { let call_id = active_call.unwrap_or_default(); @@ -655,6 +660,7 @@ async fn run( } } else if is_call_empty { call_data_map.remove_call(call_id); + gossipsub_listener.unsubscribe_call(call_id); if let Err(e) = ui_event_ch.send(BlinkEventKind::CallCancelled { call_id }) { log::error!("failed to send CallCancelled event: {e}"); } @@ -777,6 +783,9 @@ async fn run( host_media::reset().await; let event = BlinkEventKind::CallTerminated { call_id: ac }; let _ = ui_event_ch.send(event); + + gossipsub_listener.unsubscribe_call(ac); + gossipsub_listener.unsubscribe_webrtc(ac); } } }, diff --git a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs index 0db6edc48..46723f239 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs @@ -73,7 +73,7 @@ impl GossipSubListener { .send(GossipSubCmd::SubscribeCall { call_id, group_key }); } - pub fn connect_webrtc(&self, call_id: Uuid, peer: DID) { + pub fn subscribe_webrtc(&self, call_id: Uuid, peer: DID) { let _ = self.ch.send(GossipSubCmd::ConnectWebRtc { call_id, peer }); } @@ -112,8 +112,8 @@ async fn run( let mut current_call: Option = None; let mut subscribed_calls: HashMap> = HashMap::new(); - let webrtc_notify = Arc::new(Notify::new()); - let call_signal_notify = Arc::new(Notify::new()); + // replace webrtc_notify after notifying waiters + let mut webrtc_notify = Arc::new(Notify::new()); let call_offer_notify = Arc::new(Notify::new()); loop { tokio::select! { @@ -126,11 +126,13 @@ async fn run( if current_call.as_ref().map(|x| x == &call_id).unwrap_or_default(){ let _ = current_call.take(); webrtc_notify.notify_waiters(); + webrtc_notify = Arc::new(Notify::new()); } } GossipSubCmd::DisconnectWebrtc { call_id } => { if current_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { webrtc_notify.notify_waiters(); + webrtc_notify = Arc::new(Notify::new()); } } GossipSubCmd::SubscribeCall { call_id, group_key } => { @@ -194,6 +196,7 @@ async fn run( if !current_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { if current_call.is_some() { webrtc_notify.notify_waiters(); + webrtc_notify = Arc::new(Notify::new()); } current_call.replace(call_id); } @@ -250,7 +253,6 @@ async fn run( }); }, GossipSubCmd::ReceiveCalls { own_id } => { - call_offer_notify.notify_waiters(); let mut call_offer_stream = match ipfs .pubsub_subscribe(call_initiation_route(&own_id)) .await @@ -313,8 +315,8 @@ async fn run( } } + log::debug!("quitting gossipsub listener"); webrtc_notify.notify_waiters(); - call_signal_notify.notify_waiters(); call_offer_notify.notify_waiters(); } }