Skip to content

Commit

Permalink
resend join signal. might be a bug
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwoodbury committed Nov 9, 2023
1 parent 2203293 commit 9594e8d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
45 changes: 39 additions & 6 deletions extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use super::signaling::{
self, ipfs_routes, CallSignal, GossipSubSignal, InitiationSignal, PeerSignal,
};

use std::sync::Arc;
use tokio::sync::{
broadcast,
mpsc::{self, UnboundedReceiver, UnboundedSender},
Notify,
use std::{sync::Arc, time::Duration};
use tokio::{
sync::{
broadcast,
mpsc::{self, UnboundedReceiver, UnboundedSender},
Notify,
},
time::Instant,
};
use uuid::Uuid;
use warp::{
Expand Down Expand Up @@ -260,13 +263,37 @@ async fn run(

let mut call_data_map = CallDataMap::new(own_id.clone());
let mut active_call: Option<Uuid> = None;
// if you aren't the one to offer the call, then this will get set to true.
let mut resend_join = false;
let mut timer = tokio::time::interval_at(
Instant::now() + Duration::from_millis(5000),
Duration::from_millis(5000),
);

loop {
tokio::select! {
_ = notify.notified() => {
log::debug!("quitting blink event handler");
break;
},
_ = timer.tick() => {
if !resend_join {
continue;
}
if let Some(call_id) = active_call.as_ref() {
let topic = ipfs_routes::call_signal_route(call_id);
if let Some(data) = call_data_map.map.get(call_id) {
if data.state.participants_joined.len() == data.info.participants().len() {
continue;
}
let signal = CallSignal::Join;
let _ = gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic);
}
}


}
opt = cmd_rx.recv() => {
let cmd = match opt {
Some(r) => r,
Expand Down Expand Up @@ -391,14 +418,14 @@ async fn run(
let topic = ipfs_routes::call_signal_route(&call_id);

log::debug!("answering call. sending join signal");
// todo? periodically re-send join signals. perhaps somewhere else
let signal = CallSignal::Join;
if let Err(e) =
gossipsub_sender
.send_signal_aes(call_info.group_key(), signal, topic)
{
let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string())));
} else {
resend_join = true;
let _ = rsp.send(Ok(()));
}
}
Expand All @@ -424,6 +451,7 @@ async fn run(
let _ = webrtc_controller.remove_media_source(source_id).await;
},
Cmd::LeaveCall { call_id } => {
resend_join = false;
let call_id = call_id.unwrap_or(active_call.unwrap_or_default());
let info = call_data_map.get_call_info(call_id);
if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
Expand Down Expand Up @@ -641,6 +669,10 @@ async fn run(
continue;
}
signaling::CallSignal::Join => {
// ignore extra join signals
if call_data_map.contains_participant(call_id, &sender) {
continue;
}
call_data_map.add_participant(call_id, &sender);
if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
log::debug!("dialing peer");
Expand Down Expand Up @@ -780,6 +812,7 @@ async fn run(
if let Some(data) = call_data_map.map.get(&ac) {
if data.info.participants().len() == 2 && data.state.participants_joined.len() <= 1 {
log::info!("all participants have successfully been disconnected");
resend_join = false;
if let Err(e) = webrtc_controller.deinit().await {
log::error!("webrtc deinit failed: {e}");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ async fn run(
};

let mut ecdh_queue: HashMap<DID, VecDeque<GossipSubCmd>> = HashMap::new();
let mut timer = tokio::time::interval_at(
Instant::now() + Duration::from_millis(2000),
Duration::from_millis(2000),
);

loop {
tokio::select! {
Expand Down Expand Up @@ -221,7 +225,7 @@ async fn run(
}
};
if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await {
log::error!("failed to publish message: {e}");
log::error!("failed to publish aes message: {e}");

}
},
Expand Down

0 comments on commit 9594e8d

Please sign in to comment.