Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(blink): resend signals to ensure all peers connect #358

Merged
merged 43 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
6bf6913
improve CallConfig
sdwoodbury Nov 6, 2023
5feb3f6
add file to handle sending stuff over gossipsub
sdwoodbury Nov 6, 2023
7e6fccc
add gossipsub listener
sdwoodbury Nov 6, 2023
0b83737
wire up public functions for gossipsub_listener
sdwoodbury Nov 7, 2023
ffc4a9d
saving progress
sdwoodbury Nov 7, 2023
c70e77e
saving progress
sdwoodbury Nov 8, 2023
0c3716b
fix some clippy
sdwoodbury Nov 8, 2023
87ed70f
fix clippy
sdwoodbury Nov 8, 2023
99d34f9
rename event_handler to blink_controller
sdwoodbury Nov 8, 2023
7382c2d
move some stuff into blink_impl
sdwoodbury Nov 8, 2023
a3b04e5
fix clippy
sdwoodbury Nov 8, 2023
b344d70
fix(fmt)
sdwoodbury Nov 8, 2023
7eaa8e2
Merge branch 'main' into fix/blink-signaling2
sdwoodbury Nov 8, 2023
b027b86
fix clippy
sdwoodbury Nov 8, 2023
94a5dff
wire up connectionclosed
sdwoodbury Nov 8, 2023
0865bb6
fix clippy
sdwoodbury Nov 8, 2023
6377a4b
change ConnectionClosedEvent
sdwoodbury Nov 8, 2023
a346e9d
add logging
sdwoodbury Nov 8, 2023
9fafe0e
remove useless logging
sdwoodbury Nov 8, 2023
8ae71da
change logging
sdwoodbury Nov 8, 2023
d721714
handle connectionclosed with other events again
sdwoodbury Nov 8, 2023
d68e01e
try to fix blink
sdwoodbury Nov 8, 2023
652075d
fix gossipsub listener
sdwoodbury Nov 8, 2023
c551e8d
fix some stuff
sdwoodbury Nov 8, 2023
32412e2
added logging
sdwoodbury Nov 9, 2023
2d2fb43
make gossipsub listener's send signal functions async
sdwoodbury Nov 9, 2023
d05a131
add queue to gossipsub sender
sdwoodbury Nov 9, 2023
5218ead
fix clippy
sdwoodbury Nov 9, 2023
2203293
fix typo
sdwoodbury Nov 9, 2023
9594e8d
resend join signal. might be a bug
sdwoodbury Nov 9, 2023
e3368cf
change signaling to periodically announce presence and then
sdwoodbury Nov 9, 2023
96469af
fix clippy
sdwoodbury Nov 9, 2023
7c277cc
increase duration of dial timer
sdwoodbury Nov 9, 2023
451397c
fix signalling - use announce
sdwoodbury Nov 9, 2023
2416ae8
try to fix logic for leaving call
sdwoodbury Nov 9, 2023
5439793
add logging
sdwoodbury Nov 9, 2023
38053c3
remove some logging
sdwoodbury Nov 9, 2023
81ffb62
added readme
sdwoodbury Nov 9, 2023
615d30f
fix(signaling): send participant state in the announce event
sdwoodbury Nov 9, 2023
0037af7
fix clippy
sdwoodbury Nov 9, 2023
c93703a
fix clippy
sdwoodbury Nov 9, 2023
864cb89
reset announce timer in response to GossipSubCmd::Announce
sdwoodbury Nov 9, 2023
74221df
fix fmt
sdwoodbury Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
812 changes: 812 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs

Large diffs are not rendered by default.

101 changes: 0 additions & 101 deletions extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs

This file was deleted.

204 changes: 173 additions & 31 deletions extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,47 +1,189 @@
use std::collections::{HashMap, HashSet};
use std::collections::HashMap;
use uuid::Uuid;
use warp::{
blink::{CallConfig, CallInfo},
blink::{CallInfo, CallState, ParticipantState},
crypto::DID,
};

mod notify_wrapper;
pub use notify_wrapper::*;

#[derive(Clone)]
pub struct ActiveCall {
pub call: CallInfo,
pub connected_participants: HashMap<DID, PeerState>,
pub call_state: CallState,
pub call_config: CallConfig,
pub struct CallData {
pub info: CallInfo,
pub state: CallState,
}

#[derive(Clone, Eq, PartialEq)]
pub enum PeerState {
Disconnected,
Initializing,
Connected,
Closed,
impl CallData {
pub fn new(info: CallInfo, state: CallState) -> Self {
Self { info, state }
}

pub fn get_info(&self) -> CallInfo {
self.info.clone()
}

pub fn get_state(&self) -> CallState {
self.state.clone()
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CallState {
// the call was offered but no one joined and there is no peer connection
Uninitialized,
// at least one peer has connected
Started,
Closing,
Closed,

pub struct CallDataMap {
pub own_id: DID,
pub active_call: Option<Uuid>,
pub map: HashMap<Uuid, CallData>,
}

// used when a call is accepted
impl From<CallInfo> for ActiveCall {
fn from(value: CallInfo) -> Self {
impl CallDataMap {
pub fn new(own_id: DID) -> Self {
Self {
call: value,
connected_participants: HashMap::new(),
call_state: CallState::Uninitialized,
call_config: CallConfig::default(),
own_id,
active_call: None,
map: HashMap::default(),
}
}
pub fn add_call(&mut self, info: CallInfo, sender: &DID) {
let call_id = info.call_id();
if self.map.contains_key(&call_id) {
log::warn!("tried to add a call for which a key already exists");
return;
}

let mut state = CallState::new(self.own_id.clone());
state.add_participant(sender, ParticipantState::default());
self.map.insert(call_id, CallData::new(info, state));
}

pub fn get_pending_calls(&self) -> Vec<CallInfo> {
self.map.values().map(|x| x.get_info()).collect()
}

pub fn is_active_call(&self, call_id: Uuid) -> bool {
self.active_call
.as_ref()
.map(|x| x == &call_id)
.unwrap_or_default()
}

pub fn get_mut(&mut self, call_id: Uuid) -> Option<&mut CallData> {
self.map.get_mut(&call_id)
}

pub fn get_active_mut(&mut self) -> Option<&mut CallData> {
match self.active_call {
None => None,
Some(call_id) => self.map.get_mut(&call_id),
}
}

pub fn get_active(&self) -> Option<&CallData> {
match self.active_call {
None => None,
Some(call_id) => self.map.get(&call_id),
}
}

pub fn set_active(&mut self, call_id: Uuid) {
self.active_call.replace(call_id);
}
}

pub struct PendingCall {
pub call: CallInfo,
pub connected_participants: HashSet<DID>,
impl CallDataMap {
pub fn add_participant(
&mut self,
call_id: Uuid,
peer_id: &DID,
participant_state: ParticipantState,
) {
if let Some(data) = self.map.get_mut(&call_id) {
if data.info.contains_participant(peer_id) {
data.state.add_participant(peer_id, participant_state);
}
}
}

pub fn call_empty(&self, call_id: Uuid) -> bool {
self.map
.get(&call_id)
.map(|data| data.state.participants_joined.is_empty())
.unwrap_or(true)
}

pub fn contains_participant(&self, call_id: Uuid, peer_id: &DID) -> bool {
self.map
.get(&call_id)
.map(|data| data.info.contains_participant(peer_id))
.unwrap_or_default()
}

pub fn get_call_info(&self, id: Uuid) -> Option<CallInfo> {
self.map.get(&id).map(|x| x.get_info())
}

pub fn get_call_state(&self, id: Uuid) -> Option<CallState> {
self.map.get(&id).map(|x| x.get_state())
}

pub fn get_own_state(&self) -> Option<ParticipantState> {
self.get_active().cloned().and_then(|data| {
data.get_state()
.participants_joined
.get(&self.own_id)
.cloned()
})
}

pub fn get_participant_state(&self, call_id: Uuid, peer_id: &DID) -> Option<ParticipantState> {
self.get_call_state(call_id)
.and_then(|state| state.participants_joined.get(peer_id).cloned())
}

pub fn insert(&mut self, id: Uuid, data: CallData) {
self.map.insert(id, data);
}

pub fn get_call_config(&self, id: Uuid) -> Option<CallState> {
self.map.get(&id).map(|x| x.get_state())
}

pub fn leave_call(&mut self, call_id: Uuid) {
if self.is_active_call(call_id) {
self.active_call.take();
}
if let Some(data) = self.map.get_mut(&call_id) {
data.state.reset_self();
}
}

pub fn remove_call(&mut self, call_id: Uuid) {
self.map.remove(&call_id);
}

pub fn remove_participant(&mut self, call_id: Uuid, peer_id: &DID) {
if let Some(data) = self.map.get_mut(&call_id) {
if data.info.contains_participant(peer_id) {
data.state.remove_participant(peer_id);
}
}
}
}

impl CallDataMap {
pub fn set_muted(&mut self, call_id: Uuid, participant: &DID, value: bool) {
if let Some(data) = self.map.get_mut(&call_id) {
data.state.set_muted(participant, value);
}
}

pub fn set_deafened(&mut self, call_id: Uuid, participant: &DID, value: bool) {
if let Some(data) = self.map.get_mut(&call_id) {
data.state.set_deafened(participant, value);
}
}

pub fn set_recording(&mut self, call_id: Uuid, participant: &DID, value: bool) {
if let Some(data) = self.map.get_mut(&call_id) {
data.state.set_recording(participant, value);
}
}
}
12 changes: 12 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/data/notify_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use std::sync::Arc;
use tokio::sync::Notify;

pub struct NotifyWrapper {
pub notify: Arc<Notify>,
}

impl Drop for NotifyWrapper {
fn drop(&mut self) {
self.notify.notify_waiters();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should probably check the strong count of the reference so if its the only thing left, it can signal its waiters. Eg

if Arc::strong_count(&self.notify) == 1 {
   self.notify.notify_waiters();
}

Copy link
Contributor Author

@sdwoodbury sdwoodbury Nov 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notify gets cloned and passed to some tasks. I think that dropping NotifyWrapper wouldn't necessarily mean the strong count would be one. I hoped that by putting NotifyWrapper in an Arc, Drop wouldn't be called until all the Arc<NotifyWrapper>s were dropped.

}
}
Loading
Loading