Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(blink): split Blink extension's lib file into multiple files #353

Merged
merged 9 commits into from
Nov 3, 2023
101 changes: 101 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use futures::StreamExt;
use rust_ipfs::SubscriptionStream;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::sync::{broadcast::Sender, RwLock};
use uuid::Uuid;
use warp::{blink::BlinkEventKind, crypto::DID, error::Error};

use crate::{
signaling::InitiationSignal,
store::{decode_gossipsub_msg_ecdh, PeerIdExt},
};

use super::data::PendingCall;

pub async fn run(
own_id: Arc<RwLock<Option<DID>>>,
pending_calls: Arc<RwLock<HashMap<Uuid, PendingCall>>>,
mut stream: SubscriptionStream,
ch: Sender<BlinkEventKind>,
) {
while let Some(msg) = stream.next().await {
let sender = match msg.source.and_then(|s| s.to_did().ok()) {
Some(id) => id,
None => {
log::error!("msg received without source");
continue;
}
};

let signal: InitiationSignal = {
let lock = own_id.read().await;
let own_id = match lock.as_ref().ok_or(Error::BlinkNotInitialized) {
Ok(r) => r,
Err(e) => {
log::error!("{e}");
continue;
}
};

match decode_gossipsub_msg_ecdh(own_id, &sender, &msg) {
Ok(s) => s,
Err(e) => {
log::error!("failed to decode msg from call initiation stream: {e}");
continue;
}
}
};

match signal {
InitiationSignal::Offer { call_info } => {
if !call_info.participants().contains(&sender) {
log::warn!("someone offered a call for which they weren't a participant");
continue;
}
let call_id = call_info.call_id();
let evt = BlinkEventKind::IncomingCall {
call_id,
conversation_id: call_info.conversation_id(),
sender: sender.clone(),
participants: call_info.participants(),
};

let pc = PendingCall {
call: call_info,
connected_participants: HashSet::from_iter(vec![sender].drain(..)),
};
pending_calls.write().await.insert(call_id, pc);
if let Err(e) = ch.send(evt) {
log::error!("failed to send IncomingCall event: {e}");
}
}
InitiationSignal::Join { call_id } => {
if let Some(pc) = pending_calls.write().await.get_mut(&call_id) {
if !pc.call.participants().contains(&sender) {
log::warn!("someone who wasn't a participant tried to cancel the call");
continue;
}
pc.connected_participants.insert(sender);
}
}
InitiationSignal::Leave { call_id } => {
if let Some(pc) = pending_calls.write().await.get_mut(&call_id) {
if !pc.call.participants().contains(&sender) {
log::warn!("someone who wasn't a participant tried to cancel the call");
continue;
}
pc.connected_participants.remove(&sender);
if pc.connected_participants.is_empty() {
let evt = BlinkEventKind::CallCancelled { call_id };
if let Err(e) = ch.send(evt) {
log::error!("failed to send CallCancelled event: {e}");
}
}
}
}
}
}
}
47 changes: 47 additions & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::collections::{HashMap, HashSet};
use warp::{
blink::{CallConfig, CallInfo},
crypto::DID,
};

#[derive(Clone)]
pub struct ActiveCall {
pub call: CallInfo,
pub connected_participants: HashMap<DID, PeerState>,
pub call_state: CallState,
pub call_config: CallConfig,
}

#[derive(Clone, Eq, PartialEq)]
pub enum PeerState {
Disconnected,
Initializing,
Connected,
Closed,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum CallState {
// the call was offered but no one joined and there is no peer connection
Uninitialized,
// at least one peer has connected
Started,
Closing,
Closed,
}

// used when a call is accepted
impl From<CallInfo> for ActiveCall {
fn from(value: CallInfo) -> Self {
Self {
call: value,
connected_participants: HashMap::new(),
call_state: CallState::Uninitialized,
call_config: CallConfig::default(),
}
}
}

pub struct PendingCall {
pub call: CallInfo,
pub connected_participants: HashSet<DID>,
}
Loading
Loading