diff --git a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs index 1fa56834c..3b4101182 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs @@ -5,11 +5,14 @@ use super::signaling::{ self, ipfs_routes, CallSignal, GossipSubSignal, InitiationSignal, PeerSignal, }; -use std::sync::Arc; -use tokio::sync::{ - broadcast, - mpsc::{self, UnboundedReceiver, UnboundedSender}, - Notify, +use std::{sync::Arc, time::Duration}; +use tokio::{ + sync::{ + broadcast, + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Notify, + }, + time::Instant, }; use uuid::Uuid; use warp::{ @@ -260,6 +263,12 @@ async fn run( let mut call_data_map = CallDataMap::new(own_id.clone()); let mut active_call: Option = None; + // if you aren't the one to offer the call, then this will get set to true. + let mut resend_join = false; + let mut timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(5000), + Duration::from_millis(5000), + ); loop { tokio::select! { @@ -267,6 +276,24 @@ async fn run( log::debug!("quitting blink event handler"); break; }, + _ = timer.tick() => { + if !resend_join { + continue; + } + if let Some(call_id) = active_call.as_ref() { + let topic = ipfs_routes::call_signal_route(call_id); + if let Some(data) = call_data_map.map.get(call_id) { + if data.state.participants_joined.len() == data.info.participants().len() { + continue; + } + let signal = CallSignal::Join; + let _ = gossipsub_sender + .send_signal_aes(data.info.group_key(), signal, topic); + } + } + + + } opt = cmd_rx.recv() => { let cmd = match opt { Some(r) => r, @@ -391,7 +418,6 @@ async fn run( let topic = ipfs_routes::call_signal_route(&call_id); log::debug!("answering call. sending join signal"); - // todo? periodically re-send join signals. perhaps somewhere else let signal = CallSignal::Join; if let Err(e) = gossipsub_sender @@ -399,6 +425,7 @@ async fn run( { let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string()))); } else { + resend_join = true; let _ = rsp.send(Ok(())); } } @@ -424,6 +451,7 @@ async fn run( let _ = webrtc_controller.remove_media_source(source_id).await; }, Cmd::LeaveCall { call_id } => { + resend_join = false; let call_id = call_id.unwrap_or(active_call.unwrap_or_default()); let info = call_data_map.get_call_info(call_id); if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { @@ -641,6 +669,10 @@ async fn run( continue; } signaling::CallSignal::Join => { + // ignore extra join signals + if call_data_map.contains_participant(call_id, &sender) { + continue; + } call_data_map.add_participant(call_id, &sender); if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { log::debug!("dialing peer"); @@ -780,6 +812,7 @@ async fn run( if let Some(data) = call_data_map.map.get(&ac) { if data.info.participants().len() == 2 && data.state.participants_joined.len() <= 1 { log::info!("all participants have successfully been disconnected"); + resend_join = false; if let Err(e) = webrtc_controller.deinit().await { log::error!("webrtc deinit failed: {e}"); } diff --git a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs index 98850a1fe..d339f32f9 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs @@ -179,6 +179,10 @@ async fn run( }; let mut ecdh_queue: HashMap> = HashMap::new(); + let mut timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(2000), + Duration::from_millis(2000), + ); loop { tokio::select! { @@ -221,7 +225,7 @@ async fn run( } }; if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await { - log::error!("failed to publish message: {e}"); + log::error!("failed to publish aes message: {e}"); } },