Skip to content

Commit

Permalink
chore(blink): split Blink extension's lib file into multiple files (#353
Browse files Browse the repository at this point in the history
)
  • Loading branch information
sdwoodbury authored Nov 3, 2023
1 parent c937a67 commit 0255837
Show file tree
Hide file tree
Showing 6 changed files with 1,295 additions and 1,317 deletions.
101 changes: 101 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use futures::StreamExt;
use rust_ipfs::SubscriptionStream;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::{broadcast::Sender, RwLock};
use uuid::Uuid;
use warp::{blink::BlinkEventKind, crypto::DID, error::Error};

use crate::{
signaling::InitiationSignal,
store::{decode_gossipsub_msg_ecdh, PeerIdExt},
};

use super::data::PendingCall;

pub async fn run(
own_id: Arc<RwLock<Option<DID>>>,
pending_calls: Arc<RwLock<HashMap<Uuid, PendingCall>>>,
mut stream: SubscriptionStream,
ch: Sender<BlinkEventKind>,
) {
while let Some(msg) = stream.next().await {
let sender = match msg.source.and_then(|s| s.to_did().ok()) {
Some(id) => id,
None => {
log::error!("msg received without source");
continue;
}
};

let signal: InitiationSignal = {
let lock = own_id.read().await;
let own_id = match lock.as_ref().ok_or(Error::BlinkNotInitialized) {
Ok(r) => r,
Err(e) => {
log::error!("{e}");
continue;
}
};

match decode_gossipsub_msg_ecdh(own_id, &sender, &msg) {
Ok(s) => s,
Err(e) => {
log::error!("failed to decode msg from call initiation stream: {e}");
continue;
}
}
};

match signal {
InitiationSignal::Offer { call_info } => {
if !call_info.participants().contains(&sender) {
log::warn!("someone offered a call for which they weren't a participant");
continue;
}
let call_id = call_info.call_id();
let evt = BlinkEventKind::IncomingCall {
call_id,
conversation_id: call_info.conversation_id(),
sender: sender.clone(),
participants: call_info.participants(),
};

let pc = PendingCall {
call: call_info,
connected_participants: HashSet::from_iter(vec![sender].drain(..)),
};
pending_calls.write().await.insert(call_id, pc);
if let Err(e) = ch.send(evt) {
log::error!("failed to send IncomingCall event: {e}");
}
}
InitiationSignal::Join { call_id } => {
if let Some(pc) = pending_calls.write().await.get_mut(&call_id) {
if !pc.call.participants().contains(&sender) {
log::warn!("someone who wasn't a participant tried to cancel the call");
continue;
}
pc.connected_participants.insert(sender);
}
}
InitiationSignal::Leave { call_id } => {
if let Some(pc) = pending_calls.write().await.get_mut(&call_id) {
if !pc.call.participants().contains(&sender) {
log::warn!("someone who wasn't a participant tried to cancel the call");
continue;
}
pc.connected_participants.remove(&sender);
if pc.connected_participants.is_empty() {
let evt = BlinkEventKind::CallCancelled { call_id };
if let Err(e) = ch.send(evt) {
log::error!("failed to send CallCancelled event: {e}");
}
}
}
}
}
}
}
47 changes: 47 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::collections::{HashMap, HashSet};
use warp::{
blink::{CallConfig, CallInfo},
crypto::DID,
};

#[derive(Clone)]
pub struct ActiveCall {
pub call: CallInfo,
pub connected_participants: HashMap<DID, PeerState>,
pub call_state: CallState,
pub call_config: CallConfig,
}

#[derive(Clone, Eq, PartialEq)]
pub enum PeerState {
Disconnected,
Initializing,
Connected,
Closed,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CallState {
// the call was offered but no one joined and there is no peer connection
Uninitialized,
// at least one peer has connected
Started,
Closing,
Closed,
}

// used when a call is accepted
impl From<CallInfo> for ActiveCall {
fn from(value: CallInfo) -> Self {
Self {
call: value,
connected_participants: HashMap::new(),
call_state: CallState::Uninitialized,
call_config: CallConfig::default(),
}
}
}

pub struct PendingCall {
pub call: CallInfo,
pub connected_participants: HashSet<DID>,
}
Loading

0 comments on commit 0255837

Please sign in to comment.