Skip to content

Commit

Permalink
try to fix blink
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwoodbury committed Nov 8, 2023
1 parent d721714 commit d68e01e
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
37 changes: 23 additions & 14 deletions extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,12 @@ async fn run(
if let Some(data) = call_data_map.map.get_mut(&prev_active) {
data.state.reset_self();
}
if active_call.replace(call_info.call_id()).is_some() {
let call_id = call_info.call_id();
if active_call.as_ref().map(|x| x != &call_id).unwrap_or_default() {
let _ = webrtc_controller.deinit().await;
host_media::reset().await;
}
active_call.replace(call_id);
call_data_map.add_call(call_info.clone(), own_id);

// automatically add an audio track
Expand All @@ -318,7 +320,7 @@ async fn run(
gossipsub_listener
.subscribe_call(call_info.call_id(), call_info.group_key());
gossipsub_listener
.connect_webrtc(call_info.call_id(), own_id.clone());
.subscribe_webrtc(call_info.call_id(), own_id.clone());

// todo: resend periodically. perhaps somewhere else
let mut participants = call_info.participants();
Expand Down Expand Up @@ -359,10 +361,11 @@ async fn run(
if let Some(data) = call_data_map.map.get_mut(&prev_active) {
data.state.reset_self();
}
if active_call.replace(call_id).is_some() {
if active_call.as_ref().map(|x| x != &call_id).unwrap_or_default() {
let _ = webrtc_controller.deinit().await;
host_media::reset().await;
}
active_call.replace(call_id);

// automatically add an audio track
let webrtc_codec = AudioCodec::default();
Expand All @@ -382,7 +385,7 @@ async fn run(
match r {
Ok(_) => {
gossipsub_listener.subscribe_call(call_id, call_info.group_key());
gossipsub_listener.connect_webrtc(call_id, own_id.clone());
gossipsub_listener.subscribe_webrtc(call_id, own_id.clone());
let topic = ipfs_routes::call_signal_route(&call_id);

// todo? periodically re-send join signals. perhaps somewhere else
Expand Down Expand Up @@ -419,7 +422,18 @@ async fn run(
},
Cmd::LeaveCall { call_id } => {
let call_id = call_id.unwrap_or(active_call.unwrap_or_default());
match call_data_map.get_call_info(call_id) {
let info = call_data_map.get_call_info(call_id);
if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
call_data_map.leave_call(call_id);
let _ = active_call.take();
let _ = webrtc_controller.deinit().await;
host_media::reset().await;
if let Err(e) = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id }) {
log::error!("failed to send CallTerminated Event: {e}");
}
}

match info {
Some(info) => {
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Leave;
Expand All @@ -433,15 +447,6 @@ async fn run(
log::error!("failed to leave call - not found");
}
}
if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
call_data_map.leave_call(call_id);
let _ = active_call.take();
let _ = webrtc_controller.deinit().await;
host_media::reset().await;
if let Err(e) = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id }) {
log::error!("failed to send CallTerminated Event: {e}");
}
}
},
Cmd::MuteSelf => {
let call_id = active_call.unwrap_or_default();
Expand Down Expand Up @@ -655,6 +660,7 @@ async fn run(
}
} else if is_call_empty {
call_data_map.remove_call(call_id);
gossipsub_listener.unsubscribe_call(call_id);
if let Err(e) = ui_event_ch.send(BlinkEventKind::CallCancelled { call_id }) {
log::error!("failed to send CallCancelled event: {e}");
}
Expand Down Expand Up @@ -777,6 +783,9 @@ async fn run(
host_media::reset().await;
let event = BlinkEventKind::CallTerminated { call_id: ac };
let _ = ui_event_ch.send(event);

gossipsub_listener.unsubscribe_call(ac);
gossipsub_listener.unsubscribe_webrtc(ac);
}
}
},
Expand Down
12 changes: 7 additions & 5 deletions extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl GossipSubListener {
.send(GossipSubCmd::SubscribeCall { call_id, group_key });
}

pub fn connect_webrtc(&self, call_id: Uuid, peer: DID) {
pub fn subscribe_webrtc(&self, call_id: Uuid, peer: DID) {
let _ = self.ch.send(GossipSubCmd::ConnectWebRtc { call_id, peer });
}

Expand Down Expand Up @@ -112,8 +112,8 @@ async fn run(
let mut current_call: Option<Uuid> = None;
let mut subscribed_calls: HashMap<Uuid, Arc<Notify>> = HashMap::new();

let webrtc_notify = Arc::new(Notify::new());
let call_signal_notify = Arc::new(Notify::new());
// replace webrtc_notify after notifying waiters
let mut webrtc_notify = Arc::new(Notify::new());
let call_offer_notify = Arc::new(Notify::new());
loop {
tokio::select! {
Expand All @@ -126,11 +126,13 @@ async fn run(
if current_call.as_ref().map(|x| x == &call_id).unwrap_or_default(){
let _ = current_call.take();
webrtc_notify.notify_waiters();
webrtc_notify = Arc::new(Notify::new());
}
}
GossipSubCmd::DisconnectWebrtc { call_id } => {
if current_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
webrtc_notify.notify_waiters();
webrtc_notify = Arc::new(Notify::new());
}
}
GossipSubCmd::SubscribeCall { call_id, group_key } => {
Expand Down Expand Up @@ -194,6 +196,7 @@ async fn run(
if !current_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
if current_call.is_some() {
webrtc_notify.notify_waiters();
webrtc_notify = Arc::new(Notify::new());
}
current_call.replace(call_id);
}
Expand Down Expand Up @@ -250,7 +253,6 @@ async fn run(
});
},
GossipSubCmd::ReceiveCalls { own_id } => {
call_offer_notify.notify_waiters();
let mut call_offer_stream = match ipfs
.pubsub_subscribe(call_initiation_route(&own_id))
.await
Expand Down Expand Up @@ -313,8 +315,8 @@ async fn run(
}
}

log::debug!("quitting gossipsub listener");
webrtc_notify.notify_waiters();
call_signal_notify.notify_waiters();
call_offer_notify.notify_waiters();
}
}

0 comments on commit d68e01e

Please sign in to comment.