From a20f88f36049a30d3cbe57c8be68a5f1fce982d6 Mon Sep 17 00:00:00 2001 From: sdwoodbury Date: Mon, 13 Nov 2023 11:40:05 -0500 Subject: [PATCH] fix(blink): resend signals to ensure all peers connect (#358) --- .../src/blink_impl/blink_controller.rs | 812 ++++++++++++++++++ .../src/blink_impl/call_initiation.rs | 101 --- .../src/blink_impl/data/mod.rs | 204 ++++- .../src/blink_impl/data/notify_wrapper.rs | 12 + .../src/blink_impl/gossipsub_listener.rs | 322 +++++++ .../src/blink_impl/gossipsub_sender.rs | 321 +++++++ .../warp-blink-wrtc/src/blink_impl/mod.rs | 642 +++----------- .../warp-blink-wrtc/src/blink_impl/readme.md | 19 + .../src/{ => blink_impl}/signaling.rs | 55 +- .../src/{ => blink_impl}/store.rs | 4 +- .../src/blink_impl/webrtc_handler.rs | 328 ------- .../warp-blink-wrtc/src/host_media/mod.rs | 103 ++- extensions/warp-blink-wrtc/src/lib.rs | 2 - .../warp-blink-wrtc/src/simple_webrtc/mod.rs | 12 +- warp/src/blink/call_config.rs | 10 - warp/src/blink/call_state.rs | 75 ++ warp/src/blink/mod.rs | 25 +- warp/src/error.rs | 4 + 18 files changed, 1983 insertions(+), 1068 deletions(-) create mode 100644 extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs delete mode 100644 extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs create mode 100644 extensions/warp-blink-wrtc/src/blink_impl/data/notify_wrapper.rs create mode 100644 extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs create mode 100644 extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs create mode 100644 extensions/warp-blink-wrtc/src/blink_impl/readme.md rename extensions/warp-blink-wrtc/src/{ => blink_impl}/signaling.rs (60%) rename extensions/warp-blink-wrtc/src/{ => blink_impl}/store.rs (96%) delete mode 100644 extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs delete mode 100644 warp/src/blink/call_config.rs create mode 100644 warp/src/blink/call_state.rs diff --git a/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs new file mode 100644 index 000000000..d146cb6ac --- /dev/null +++ b/extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs @@ -0,0 +1,812 @@ +use futures::channel::oneshot; +use futures::StreamExt; + +use super::signaling::{ + self, ipfs_routes, CallSignal, GossipSubSignal, InitiationSignal, PeerSignal, +}; + +use std::{cmp, sync::Arc, time::Duration}; +use tokio::{ + sync::{ + broadcast, + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Notify, + }, + time::Instant, +}; +use uuid::Uuid; +use warp::{ + blink::{BlinkEventKind, CallInfo, CallState}, + error::Error, +}; +use webrtc::{ + rtp_transceiver::rtp_codec::RTCRtpCodecCapability, + track::track_local::track_local_static_rtp::TrackLocalStaticRTP, +}; + +use crate::{ + host_media::{self, audio::AudioCodec, mp4_logger::Mp4LoggerConfig}, + simple_webrtc::{self, events::WebRtcEventStream, MediaSourceId}, +}; + +use super::{ + data::{CallDataMap, NotifyWrapper}, + gossipsub_listener::GossipSubListener, + gossipsub_sender::GossipSubSender, +}; + +#[derive(Debug)] +enum Cmd { + OfferCall { + call_info: CallInfo, + rsp: oneshot::Sender>, + }, + AnswerCall { + call_id: Uuid, + rsp: oneshot::Sender>, + }, + AddMediaSource { + source_id: MediaSourceId, + codec: RTCRtpCodecCapability, + rsp: oneshot::Sender>>, + }, + RemoveMediaSource { + source_id: MediaSourceId, + }, + GetCallInfo { + call_id: Uuid, + rsp: oneshot::Sender>, + }, + LeaveCall { + call_id: Option, + }, + MuteSelf, + UnmuteSelf, + SilenceCall, + UnsilenceCall, + GetPendingCalls { + rsp: oneshot::Sender>, + }, + GetActiveCallInfo { + rsp: oneshot::Sender>, + }, + GetActiveCallState { + rsp: oneshot::Sender>, + }, + RecordCall { + output_dir: String, + rsp: oneshot::Sender>, + }, + StopRecording { + rsp: oneshot::Sender>, + }, +} + +#[derive(Clone)] +pub struct BlinkController { + ch: UnboundedSender, + notify: Arc, +} + +pub struct Args { + pub webrtc_controller: simple_webrtc::Controller, + pub webrtc_event_stream: WebRtcEventStream, + pub gossipsub_sender: GossipSubSender, + pub gossipsub_listener: GossipSubListener, + pub signal_rx: UnboundedReceiver, + pub ui_event_ch: broadcast::Sender, +} + +impl BlinkController { + pub fn new(args: Args) -> Self { + let Args { + webrtc_controller, + webrtc_event_stream, + gossipsub_sender, + gossipsub_listener, + signal_rx, + ui_event_ch, + } = args; + + let (tx, cmd_rx) = mpsc::unbounded_channel(); + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + tokio::spawn(async move { + run( + webrtc_controller, + webrtc_event_stream, + gossipsub_sender, + gossipsub_listener, + signal_rx, + ui_event_ch, + cmd_rx, + notify2, + ) + .await; + }); + Self { + ch: tx, + notify: Arc::new(NotifyWrapper { notify }), + } + } + + pub async fn offer_call(&self, call_info: CallInfo) -> anyhow::Result<()> { + let (tx, rx) = oneshot::channel(); + self.ch.send(Cmd::OfferCall { call_info, rsp: tx })?; + rx.await??; + Ok(()) + } + + pub async fn answer_call(&self, call_id: Uuid) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + self.ch + .send(Cmd::AnswerCall { call_id, rsp: tx }) + .map_err(|x| Error::OtherWithContext(x.to_string()))?; + rx.await + .map_err(|x| Error::FailedToSendSignal(x.to_string()))? + } + + pub async fn add_media_source( + &self, + source_id: MediaSourceId, + codec: RTCRtpCodecCapability, + ) -> anyhow::Result> { + let (tx, rx) = oneshot::channel(); + self.ch.send(Cmd::AddMediaSource { + source_id, + codec, + rsp: tx, + })?; + rx.await? + } + + pub fn remove_media_source(&self, source_id: MediaSourceId) -> anyhow::Result<()> { + self.ch.send(Cmd::RemoveMediaSource { source_id })?; + Ok(()) + } + + pub async fn get_call_info(&self, call_id: Uuid) -> Option { + let (tx, rx) = oneshot::channel(); + self.ch.send(Cmd::GetCallInfo { call_id, rsp: tx }).ok()?; + rx.await.ok()? + } + + pub fn leave_call(&self, call_id: Option) -> anyhow::Result<()> { + self.ch.send(Cmd::LeaveCall { call_id })?; + Ok(()) + } + + pub fn mute_self(&self) -> anyhow::Result<()> { + self.ch.send(Cmd::MuteSelf)?; + Ok(()) + } + + pub fn unmute_self(&self) -> anyhow::Result<()> { + self.ch.send(Cmd::UnmuteSelf)?; + Ok(()) + } + pub fn silence_call(&self) -> anyhow::Result<()> { + self.ch.send(Cmd::SilenceCall)?; + Ok(()) + } + pub fn unsilence_call(&self) -> anyhow::Result<()> { + self.ch.send(Cmd::UnsilenceCall)?; + Ok(()) + } + + pub async fn get_pending_calls(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.ch + .send(Cmd::GetPendingCalls { rsp: tx }) + .map_err(|x| Error::OtherWithContext(x.to_string()))?; + rx.await.map_err(|x| Error::OtherWithContext(x.to_string())) + } + + pub async fn get_active_call_info(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.ch + .send(Cmd::GetActiveCallInfo { rsp: tx }) + .map_err(|x| Error::OtherWithContext(x.to_string()))?; + rx.await.map_err(|x| Error::OtherWithContext(x.to_string())) + } + + pub async fn get_active_call_state(&self) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.ch + .send(Cmd::GetActiveCallState { rsp: tx }) + .map_err(|x| Error::OtherWithContext(x.to_string()))?; + rx.await.map_err(|x| Error::OtherWithContext(x.to_string())) + } + pub async fn record_call(&self, output_dir: String) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + self.ch + .send(Cmd::RecordCall { + output_dir, + rsp: tx, + }) + .map_err(|x| Error::OtherWithContext(x.to_string()))?; + rx.await + .map_err(|x| Error::OtherWithContext(x.to_string()))? + } + + pub async fn stop_recording(&self) -> Result<(), Error> { + let (tx, rx) = oneshot::channel(); + self.ch + .send(Cmd::StopRecording { rsp: tx }) + .map_err(|x| Error::OtherWithContext(x.to_string()))?; + rx.await + .map_err(|x| Error::OtherWithContext(x.to_string()))? + } +} + +#[allow(clippy::too_many_arguments)] +async fn run( + mut webrtc_controller: simple_webrtc::Controller, + mut webrtc_event_stream: WebRtcEventStream, + gossipsub_sender: GossipSubSender, + gossipsub_listener: GossipSubListener, + mut signal_rx: UnboundedReceiver, + ui_event_ch: broadcast::Sender, + mut cmd_rx: UnboundedReceiver, + notify: Arc, +) { + let own_id = match gossipsub_sender.get_own_id().await { + Ok(r) => r, + Err(e) => { + log::error!("failed to get own id. quitting blink controller: {e}"); + return; + } + }; + + // prevent accidental moves + let own_id = &own_id; + let own_id_str = own_id.to_string(); + + let mut call_data_map = CallDataMap::new(own_id.clone()); + let mut dial_timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(3000), + Duration::from_millis(3000), + ); + + loop { + tokio::select! { + _ = notify.notified() => { + log::debug!("quitting blink event handler"); + break; + }, + _ = dial_timer.tick() => { + //log::trace!("dial timer: tick"); + if let Some(data) = call_data_map.get_active() { + let call_id = data.info.call_id(); + for (peer_id, _) in data.state.participants_joined.iter() { + if peer_id == own_id { + continue; + } + if webrtc_controller.is_connected(peer_id) { + continue; + } + let peer_str = peer_id.to_string(); + let mut should_dial = false; + for (l, r) in std::iter::zip(peer_str.as_bytes(), own_id_str.as_bytes()) { + match l.cmp(r) { + cmp::Ordering::Less => { + should_dial = true; + break; + } + cmp::Ordering::Greater => { + break; + } + _ => {} + } + } + if should_dial { + if let Err(e) = webrtc_controller.dial(peer_id).await { + log::error!("failed to dial peer: {e}"); + continue; + } + if let Err(e) = ui_event_ch.send(BlinkEventKind::ParticipantJoined { call_id, peer_id: peer_id.clone() }) { + log::error!("failed to send ParticipantJoined Event: {e}"); + } + } + } + } + } + opt = cmd_rx.recv() => { + let cmd = match opt { + Some(r) => r, + None => { + log::debug!("blink handler cmd_rx channel is closed. quitting"); + break; + } + }; + match cmd { + Cmd::OfferCall { call_info, rsp } => { + if call_data_map.is_active_call(call_info.call_id()) { + log::debug!("tried to offer call which is already in progress"); + let _ = rsp.send(Err(Error::CallAlreadyInProgress)); + continue; + } + if let Some(data) = call_data_map.get_active_mut() { + data.state.reset_self(); + let call_id = data.info.call_id(); + let _ = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id}); + let _ = webrtc_controller.deinit().await; + host_media::reset().await; + } + let call_id = call_info.call_id(); + call_data_map.add_call(call_info.clone(), own_id); + call_data_map.set_active(call_id); + + // automatically add an audio track + let webrtc_codec = AudioCodec::default(); + let rtc_rtp_codec: RTCRtpCodecCapability = RTCRtpCodecCapability { + mime_type: webrtc_codec.mime_type(), + clock_rate: webrtc_codec.sample_rate(), + channels: 1, + ..Default::default() + }; + match webrtc_controller.add_media_source(host_media::AUDIO_SOURCE_ID.into(), rtc_rtp_codec).await { + Ok(track) => { + match host_media::create_audio_source_track( + own_id.clone(), + ui_event_ch.clone(), + track, + webrtc_codec).await + { + Ok(_) => { + log::debug!("sending offer signal"); + let call_id = call_info.call_id(); + gossipsub_listener + .subscribe_call(call_id, call_info.group_key()); + gossipsub_listener + .subscribe_webrtc(call_id, own_id.clone()); + + let mut participants = call_info.participants(); + participants.retain(|x| x != own_id); + for dest in participants { + let topic = ipfs_routes::call_initiation_route(&dest); + let signal = InitiationSignal::Offer { + call_info: call_info.clone(), + }; + + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + log::error!("failed to send signal: {e}"); + } + } + + let own_state = call_data_map.get_own_state().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(call_info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + let _ = rsp.send(Ok(())); + } + Err(e) => { + let _ = webrtc_controller.remove_media_source(host_media::AUDIO_SOURCE_ID.into()).await; + let _ = rsp.send(Err(e)); + } + } + } + Err(e) => { + let _ = rsp.send(Err(Error::OtherWithContext(e.to_string()))); + } + } + }, + Cmd::AnswerCall { call_id, rsp } => { + if call_data_map.is_active_call(call_id) { + log::debug!("tried to answer call which is already in progress"); + let _ = rsp.send(Err(Error::CallAlreadyInProgress)); + continue; + } + + let call_info = match call_data_map.get_call_info(call_id) { + Some(r) => r, + None => { + let _ = rsp.send(Err(Error::CallNotFound)); + continue; + } + }; + + if let Some(data) = call_data_map.get_active_mut() { + data.state.reset_self(); + let _ = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id: data.info.call_id() }); + let _ = webrtc_controller.deinit().await; + host_media::reset().await; + } + + call_data_map.set_active(call_id); + + // automatically add an audio track + let webrtc_codec = AudioCodec::default(); + let rtc_rtp_codec: RTCRtpCodecCapability = RTCRtpCodecCapability { + mime_type: webrtc_codec.mime_type(), + clock_rate: webrtc_codec.sample_rate(), + channels: 1, + ..Default::default() + }; + match webrtc_controller.add_media_source(host_media::AUDIO_SOURCE_ID.into(), rtc_rtp_codec).await { + Ok(track) => { + let r = host_media::create_audio_source_track( + own_id.clone(), + ui_event_ch.clone(), + track, + webrtc_codec).await; + match r { + Ok(_) => { + log::debug!("answering call"); + gossipsub_listener.subscribe_call(call_id, call_info.group_key()); + gossipsub_listener.subscribe_webrtc(call_id, own_id.clone()); + + let own_state = call_data_map.get_own_state().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(call_info.group_key(), signal, topic) + { + let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string()))); + } else { + let _ = rsp.send(Ok(())); + } + } + Err(e) => { + let _ = webrtc_controller.remove_media_source(host_media::AUDIO_SOURCE_ID.into()).await; + let _ = rsp.send(Err(e)); + } + } + } + Err(e) => { + let _ = rsp.send(Err(e.into())); + } + } + } + Cmd::AddMediaSource { source_id, codec, rsp } => { + let r = webrtc_controller.add_media_source(source_id, codec).await; + let _ = rsp.send(r); + }, + Cmd::GetCallInfo { call_id, rsp } => { + let _ = rsp.send(call_data_map.get_call_info(call_id)); + } + Cmd::RemoveMediaSource { source_id } => { + let _ = webrtc_controller.remove_media_source(source_id).await; + }, + Cmd::LeaveCall { call_id } => { + let call_id = call_id.unwrap_or(call_data_map.active_call.unwrap_or_default()); + if call_data_map.is_active_call(call_id) { + call_data_map.leave_call(call_id); + let _ = gossipsub_sender.empty_queue(); + let _ = webrtc_controller.deinit().await; + host_media::reset().await; + if let Err(e) = ui_event_ch.send(BlinkEventKind::CallTerminated { call_id }) { + log::error!("failed to send CallTerminated Event: {e}"); + } + } + + // todo: if someone tries to dial you when you left the call, resend the leave signal + match call_data_map.get_call_info(call_id) { + Some(info) => { + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Leave; + if let Err(e) = gossipsub_sender + .send_signal_aes(info.group_key(), signal, topic) + { + log::error!("failed to send signal: {e}"); + } + } + None => { + log::error!("failed to leave call - not found"); + } + } + }, + Cmd::MuteSelf => { + if let Some(data) = call_data_map.get_active_mut() { + let call_id = data.info.call_id(); + data.state.set_self_muted(true); + let own_state = data.state.participants_joined.get(own_id).cloned().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(data.info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + } + } + Cmd::UnmuteSelf => { + if let Some(data) = call_data_map.get_active_mut() { + let call_id = data.info.call_id(); + data.state.set_self_muted(false); + let own_state = data.state.participants_joined.get(own_id).cloned().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(data.info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + } + } + Cmd::SilenceCall => { + if let Some(data) = call_data_map.get_active_mut() { + let call_id = data.info.call_id(); + if let Err(e) = host_media::deafen().await { + log::error!("{e}"); + } + data.state.set_deafened(own_id, true); + let own_state = data.state.participants_joined.get(own_id).cloned().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(data.info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + } + } + Cmd::UnsilenceCall => { + if let Some(data) = call_data_map.get_active_mut() { + let call_id = data.info.call_id(); + if let Err(e) = host_media::undeafen().await { + log::error!("{e}"); + } + data.state.set_deafened(own_id, false); + let own_state = data.state.participants_joined.get(own_id).cloned().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(data.info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + } + } + Cmd::GetPendingCalls { rsp } => { + let _ = rsp.send(call_data_map.get_pending_calls()); + } + Cmd::GetActiveCallState { rsp } => { + let _ = rsp.send(call_data_map.get_active().map(|data| data.get_state())); + } + Cmd::GetActiveCallInfo { rsp } => { + let _ = rsp.send(call_data_map.get_active().map(|data| data.get_info())); + } + Cmd::RecordCall { output_dir, rsp } => { + if let Some(data) = call_data_map.get_active_mut() { + let info = data.get_info(); + match + host_media::init_recording(Mp4LoggerConfig { + call_id: info.call_id(), + participants: info.participants(), + audio_codec: AudioCodec::default(), + log_path: output_dir.into(), + }) + .await + { + Ok(_) => { + data.state.set_self_recording(true); + let own_state = data.state.participants_joined.get(own_id).cloned().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&info.call_id()); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(data.info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + let _ = rsp.send(Ok(())); + } + Err(e) => { + let _ = rsp.send(Err(Error::OtherWithContext(e.to_string()))); + } + } + } else { + let _ = rsp.send(Err(Error::CallNotInProgress)); + } + } + Cmd::StopRecording { rsp } => { + if let Some(data) = call_data_map.get_active_mut() { + match + host_media::pause_recording() + .await + { + Ok(_) => { + data.state.set_self_recording(false); + let own_state = data.state.participants_joined.get(own_id).cloned().unwrap_or_default(); + let topic = ipfs_routes::call_signal_route(&data.info.call_id()); + let signal = CallSignal::Announce { participant_state: own_state }; + if let Err(e) = + gossipsub_sender.announce(data.info.group_key(), signal, topic) + { + log::error!("failed to send announce signal: {e}"); + } + let _ = rsp.send(Ok(())); + } + Err(e) => { + let _ = rsp.send(Err(Error::OtherWithContext(e.to_string()))); + } + } + } else { + let _ = rsp.send(Err(Error::CallNotInProgress)); + } + } + } + }, + opt = signal_rx.recv() => { + let signal = match opt { + Some(r) => r, + None => { + log::debug!("blink handler signal_rx channel is closed. quitting"); + break; + } + }; + match signal { + GossipSubSignal::Peer { sender, call_id, signal } => match *signal { + _ if !call_data_map.is_active_call(call_id) => { + log::debug!("received webrtc signal for non-active call"); + continue; + } + _ if !call_data_map.contains_participant(call_id, &sender) => { + log::debug!("received signal from someone who isn't part of the call"); + continue; + } + signaling::PeerSignal::Ice(ice) => { + if let Err(e) = webrtc_controller.recv_ice(&sender, ice).await { + log::error!("failed to recv_ice {}", e); + } + }, + signaling::PeerSignal::Sdp(sdp) => { + log::debug!("received signal: SDP"); + if let Err(e) = webrtc_controller.recv_sdp(&sender, sdp).await { + log::error!("failed to recv_sdp: {}", e); + } + }, + signaling::PeerSignal::Dial(sdp) => { + log::debug!("received signal: Dial"); + // emits the SDP Event, which is sent to the peer via the SDP signal + if let Err(e) = webrtc_controller.accept_call(&sender, sdp).await { + log::error!("failed to accept_call: {}", e); + } + }, + }, + GossipSubSignal::Call { sender, call_id, signal } => match signal { + _ if !call_data_map.contains_participant(call_id, &sender) => { + log::debug!("received signal from someone who isn't part of the call"); + continue; + } + signaling::CallSignal::Announce { participant_state } => { + //log::trace!("received announce from {}", &sender); + let prev_state = call_data_map.get_participant_state(call_id, &sender); + let state_changed = prev_state.as_ref().map(|x| x != &participant_state).unwrap_or(true); + call_data_map.add_participant(call_id, &sender, participant_state.clone()); + if state_changed { + let _ = ui_event_ch.send(BlinkEventKind::ParticipantStateChanged { peer_id: sender, state: participant_state }); + } + }, + signaling::CallSignal::Leave => { + call_data_map.remove_participant(call_id, &sender); + let is_call_empty = call_data_map.call_empty(call_id); + + if call_data_map.is_active_call(call_id) { + webrtc_controller.hang_up(&sender).await; + if let Err(e) = ui_event_ch.send(BlinkEventKind::ParticipantLeft { call_id, peer_id: sender }) { + log::error!("failed to send ParticipantLeft event: {e}"); + } + } else if is_call_empty { + call_data_map.remove_call(call_id); + gossipsub_listener.unsubscribe_call(call_id); + if let Err(e) = ui_event_ch.send(BlinkEventKind::CallCancelled { call_id }) { + log::error!("failed to send CallCancelled event: {e}"); + } + } + }, + }, + GossipSubSignal::Initiation { sender, signal } => match signal { + signaling::InitiationSignal::Offer { call_info } => { + let call_id = call_info.call_id(); + let conversation_id = call_info.conversation_id(); + let participants = call_info.participants(); + call_data_map.add_call(call_info, &sender); + + if let Err(e) = ui_event_ch.send(BlinkEventKind::IncomingCall { call_id, conversation_id, sender, participants }) { + log::error!("failed to send IncomingCall event: {e}"); + } + }, + }, + } + } + opt = webrtc_event_stream.next() => { + let event = match opt { + Some(r) => r, + None => { + log::debug!("webrtc_event_stream closed!"); + // todo: get new webrtc controller or something + continue; + } + }; + match event { + simple_webrtc::events::EmittedEvents::Ice { dest, candidate } => { + if let Some(data) = call_data_map.get_active() { + let topic = ipfs_routes::peer_signal_route(&dest, &data.info.call_id()); + let signal = PeerSignal::Ice(*candidate); + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + log::error!("failed to send signal: {e}"); + } + } else { + log::warn!("received EmittedEvents::Ice without active call"); + } + }, + simple_webrtc::events::EmittedEvents::Connected { peer } => { + if let Some(data) = call_data_map.get_active() { + let call_id = data.info.call_id(); + if !call_data_map.contains_participant(call_id, &peer) { + log::warn!("webrtc controller connected to a peer who wasn't in the list for the active call"); + webrtc_controller.hang_up(&peer).await; + } + } else { + log::warn!("received EmittedEvents::Connected without active call"); + } + }, + simple_webrtc::events::EmittedEvents::Disconnected { peer } + | simple_webrtc::events::EmittedEvents::ConnectionFailed { peer } + | simple_webrtc::events::EmittedEvents::ConnectionClosed { peer } => { + log::debug!("webrtc: closed, disconnected or connection failed"); + + webrtc_controller.hang_up(&peer).await; + if let Err(e) = host_media::remove_sink_track(peer.clone()).await { + log::error!("failed to send media_track command: {e}"); + } + + if let Some(data) = call_data_map.get_active_mut() { + let call_id = data.info.call_id(); + if data.info.contains_participant(&peer) { + data.state.remove_participant(&peer); + } + if data.info.participants().len() == 2 && data.state.participants_joined.len() <= 1 { + log::info!("all participants have successfully been disconnected"); + if let Err(e) = webrtc_controller.deinit().await { + log::error!("webrtc deinit failed: {e}"); + } + //rtp_logger::deinit().await; + host_media::reset().await; + let event = BlinkEventKind::CallTerminated { call_id }; + let _ = ui_event_ch.send(event); + + gossipsub_listener.unsubscribe_call(call_id); + gossipsub_listener.unsubscribe_webrtc(call_id); + let _ = gossipsub_sender.empty_queue(); + } + } + }, + simple_webrtc::events::EmittedEvents::Sdp { dest, sdp } => { + if let Some(data) = call_data_map.get_active() { + let topic = ipfs_routes::peer_signal_route(&dest, &data.info.call_id()); + let signal = PeerSignal::Sdp(*sdp); + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + log::error!("failed to send signal: {e}"); + } + } else { + log::warn!("received EmittedEvents::Sdp without active call"); + } + }, + simple_webrtc::events::EmittedEvents::CallInitiated { dest, sdp } => { + if let Some(data) = call_data_map.get_active() { + log::debug!("sending dial signal"); + let topic = ipfs_routes::peer_signal_route(&dest, &data.info.call_id()); + let signal = PeerSignal::Dial(*sdp); + if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) { + log::error!("failed to send signal: {e}"); + } + } else { + log::warn!("dialing without active call"); + } + }, + simple_webrtc::events::EmittedEvents::TrackAdded { peer, track } => { + if let Err(e) = host_media::create_audio_sink_track(peer.clone(), ui_event_ch.clone(), track, AudioCodec::default()).await { + log::error!("failed to send media_track command: {e}"); + } + }, + } + } + } + } +} diff --git a/extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs b/extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs deleted file mode 100644 index 36110dc06..000000000 --- a/extensions/warp-blink-wrtc/src/blink_impl/call_initiation.rs +++ /dev/null @@ -1,101 +0,0 @@ -use futures::StreamExt; -use rust_ipfs::SubscriptionStream; -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; -use tokio::sync::{broadcast::Sender, RwLock}; -use uuid::Uuid; -use warp::{blink::BlinkEventKind, crypto::DID, error::Error}; - -use crate::{ - signaling::InitiationSignal, - store::{decode_gossipsub_msg_ecdh, PeerIdExt}, -}; - -use super::data::PendingCall; - -pub async fn run( - own_id: Arc>>, - pending_calls: Arc>>, - mut stream: SubscriptionStream, - ch: Sender, -) { - while let Some(msg) = stream.next().await { - let sender = match msg.source.and_then(|s| s.to_did().ok()) { - Some(id) => id, - None => { - log::error!("msg received without source"); - continue; - } - }; - - let signal: InitiationSignal = { - let lock = own_id.read().await; - let own_id = match lock.as_ref().ok_or(Error::BlinkNotInitialized) { - Ok(r) => r, - Err(e) => { - log::error!("{e}"); - continue; - } - }; - - match decode_gossipsub_msg_ecdh(own_id, &sender, &msg) { - Ok(s) => s, - Err(e) => { - log::error!("failed to decode msg from call initiation stream: {e}"); - continue; - } - } - }; - - match signal { - InitiationSignal::Offer { call_info } => { - if !call_info.participants().contains(&sender) { - log::warn!("someone offered a call for which they weren't a participant"); - continue; - } - let call_id = call_info.call_id(); - let evt = BlinkEventKind::IncomingCall { - call_id, - conversation_id: call_info.conversation_id(), - sender: sender.clone(), - participants: call_info.participants(), - }; - - let pc = PendingCall { - call: call_info, - connected_participants: HashSet::from_iter(vec![sender].drain(..)), - }; - pending_calls.write().await.insert(call_id, pc); - if let Err(e) = ch.send(evt) { - log::error!("failed to send IncomingCall event: {e}"); - } - } - InitiationSignal::Join { call_id } => { - if let Some(pc) = pending_calls.write().await.get_mut(&call_id) { - if !pc.call.participants().contains(&sender) { - log::warn!("someone who wasn't a participant tried to cancel the call"); - continue; - } - pc.connected_participants.insert(sender); - } - } - InitiationSignal::Leave { call_id } => { - if let Some(pc) = pending_calls.write().await.get_mut(&call_id) { - if !pc.call.participants().contains(&sender) { - log::warn!("someone who wasn't a participant tried to cancel the call"); - continue; - } - pc.connected_participants.remove(&sender); - if pc.connected_participants.is_empty() { - let evt = BlinkEventKind::CallCancelled { call_id }; - if let Err(e) = ch.send(evt) { - log::error!("failed to send CallCancelled event: {e}"); - } - } - } - } - } - } -} diff --git a/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs index 4e007ed10..566c4fd7b 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/data/mod.rs @@ -1,47 +1,189 @@ -use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; +use uuid::Uuid; use warp::{ - blink::{CallConfig, CallInfo}, + blink::{CallInfo, CallState, ParticipantState}, crypto::DID, }; +mod notify_wrapper; +pub use notify_wrapper::*; + #[derive(Clone)] -pub struct ActiveCall { - pub call: CallInfo, - pub connected_participants: HashMap, - pub call_state: CallState, - pub call_config: CallConfig, +pub struct CallData { + pub info: CallInfo, + pub state: CallState, } -#[derive(Clone, Eq, PartialEq)] -pub enum PeerState { - Disconnected, - Initializing, - Connected, - Closed, +impl CallData { + pub fn new(info: CallInfo, state: CallState) -> Self { + Self { info, state } + } + + pub fn get_info(&self) -> CallInfo { + self.info.clone() + } + + pub fn get_state(&self) -> CallState { + self.state.clone() + } } -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum CallState { - // the call was offered but no one joined and there is no peer connection - Uninitialized, - // at least one peer has connected - Started, - Closing, - Closed, + +pub struct CallDataMap { + pub own_id: DID, + pub active_call: Option, + pub map: HashMap, } -// used when a call is accepted -impl From for ActiveCall { - fn from(value: CallInfo) -> Self { +impl CallDataMap { + pub fn new(own_id: DID) -> Self { Self { - call: value, - connected_participants: HashMap::new(), - call_state: CallState::Uninitialized, - call_config: CallConfig::default(), + own_id, + active_call: None, + map: HashMap::default(), + } + } + pub fn add_call(&mut self, info: CallInfo, sender: &DID) { + let call_id = info.call_id(); + if self.map.contains_key(&call_id) { + log::warn!("tried to add a call for which a key already exists"); + return; } + + let mut state = CallState::new(self.own_id.clone()); + state.add_participant(sender, ParticipantState::default()); + self.map.insert(call_id, CallData::new(info, state)); + } + + pub fn get_pending_calls(&self) -> Vec { + self.map.values().map(|x| x.get_info()).collect() + } + + pub fn is_active_call(&self, call_id: Uuid) -> bool { + self.active_call + .as_ref() + .map(|x| x == &call_id) + .unwrap_or_default() + } + + pub fn get_mut(&mut self, call_id: Uuid) -> Option<&mut CallData> { + self.map.get_mut(&call_id) + } + + pub fn get_active_mut(&mut self) -> Option<&mut CallData> { + match self.active_call { + None => None, + Some(call_id) => self.map.get_mut(&call_id), + } + } + + pub fn get_active(&self) -> Option<&CallData> { + match self.active_call { + None => None, + Some(call_id) => self.map.get(&call_id), + } + } + + pub fn set_active(&mut self, call_id: Uuid) { + self.active_call.replace(call_id); } } -pub struct PendingCall { - pub call: CallInfo, - pub connected_participants: HashSet, +impl CallDataMap { + pub fn add_participant( + &mut self, + call_id: Uuid, + peer_id: &DID, + participant_state: ParticipantState, + ) { + if let Some(data) = self.map.get_mut(&call_id) { + if data.info.contains_participant(peer_id) { + data.state.add_participant(peer_id, participant_state); + } + } + } + + pub fn call_empty(&self, call_id: Uuid) -> bool { + self.map + .get(&call_id) + .map(|data| data.state.participants_joined.is_empty()) + .unwrap_or(true) + } + + pub fn contains_participant(&self, call_id: Uuid, peer_id: &DID) -> bool { + self.map + .get(&call_id) + .map(|data| data.info.contains_participant(peer_id)) + .unwrap_or_default() + } + + pub fn get_call_info(&self, id: Uuid) -> Option { + self.map.get(&id).map(|x| x.get_info()) + } + + pub fn get_call_state(&self, id: Uuid) -> Option { + self.map.get(&id).map(|x| x.get_state()) + } + + pub fn get_own_state(&self) -> Option { + self.get_active().cloned().and_then(|data| { + data.get_state() + .participants_joined + .get(&self.own_id) + .cloned() + }) + } + + pub fn get_participant_state(&self, call_id: Uuid, peer_id: &DID) -> Option { + self.get_call_state(call_id) + .and_then(|state| state.participants_joined.get(peer_id).cloned()) + } + + pub fn insert(&mut self, id: Uuid, data: CallData) { + self.map.insert(id, data); + } + + pub fn get_call_config(&self, id: Uuid) -> Option { + self.map.get(&id).map(|x| x.get_state()) + } + + pub fn leave_call(&mut self, call_id: Uuid) { + if self.is_active_call(call_id) { + self.active_call.take(); + } + if let Some(data) = self.map.get_mut(&call_id) { + data.state.reset_self(); + } + } + + pub fn remove_call(&mut self, call_id: Uuid) { + self.map.remove(&call_id); + } + + pub fn remove_participant(&mut self, call_id: Uuid, peer_id: &DID) { + if let Some(data) = self.map.get_mut(&call_id) { + if data.info.contains_participant(peer_id) { + data.state.remove_participant(peer_id); + } + } + } +} + +impl CallDataMap { + pub fn set_muted(&mut self, call_id: Uuid, participant: &DID, value: bool) { + if let Some(data) = self.map.get_mut(&call_id) { + data.state.set_muted(participant, value); + } + } + + pub fn set_deafened(&mut self, call_id: Uuid, participant: &DID, value: bool) { + if let Some(data) = self.map.get_mut(&call_id) { + data.state.set_deafened(participant, value); + } + } + + pub fn set_recording(&mut self, call_id: Uuid, participant: &DID, value: bool) { + if let Some(data) = self.map.get_mut(&call_id) { + data.state.set_recording(participant, value); + } + } } diff --git a/extensions/warp-blink-wrtc/src/blink_impl/data/notify_wrapper.rs b/extensions/warp-blink-wrtc/src/blink_impl/data/notify_wrapper.rs new file mode 100644 index 000000000..1c39ff0d8 --- /dev/null +++ b/extensions/warp-blink-wrtc/src/blink_impl/data/notify_wrapper.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; +use tokio::sync::Notify; + +pub struct NotifyWrapper { + pub notify: Arc, +} + +impl Drop for NotifyWrapper { + fn drop(&mut self) { + self.notify.notify_waiters(); + } +} diff --git a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs new file mode 100644 index 000000000..e3f90154f --- /dev/null +++ b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_listener.rs @@ -0,0 +1,322 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use futures::StreamExt; +use rust_ipfs::Ipfs; +use tokio::{ + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Notify, + }, + time::Instant, +}; +use uuid::Uuid; +use warp::{crypto::DID, sync::RwLock}; + +use super::{ + signaling::{ + ipfs_routes::{call_initiation_route, call_signal_route, peer_signal_route}, + CallSignal, GossipSubSignal, InitiationSignal, PeerSignal, + }, + store::PeerIdExt, +}; + +use super::{data::NotifyWrapper, gossipsub_sender::GossipSubSender}; + +enum GossipSubCmd { + // unsubscribe from the call and close any webrtc connections + UnsubscribeCall { call_id: Uuid }, + DisconnectWebrtc { call_id: Uuid }, + // receive call wide broadcasts + SubscribeCall { call_id: Uuid, group_key: Vec }, + // webrtc signaling for a peer + ConnectWebRtc { call_id: Uuid, peer: DID }, + // allow peers to offer calls + ReceiveCalls { own_id: DID }, +} +#[derive(Clone)] +pub struct GossipSubListener { + ch: UnboundedSender, + // when GossipSubSender gets cloned, NotifyWrapper doesn't get cloned. + // when NotifyWrapper finally gets dropped, then it's ok to call notify_waiters + notify: Arc, +} + +impl GossipSubListener { + pub fn new( + ipfs: Arc>>, + signal_tx: UnboundedSender, + gossipsub_sender: GossipSubSender, + ) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + tokio::spawn(async move { + run(ipfs, rx, signal_tx, gossipsub_sender, notify2).await; + }); + Self { + ch: tx, + notify: Arc::new(NotifyWrapper { notify }), + } + } + + pub fn unsubscribe_call(&self, call_id: Uuid) { + let _ = self.ch.send(GossipSubCmd::UnsubscribeCall { call_id }); + } + + pub fn unsubscribe_webrtc(&self, call_id: Uuid) { + let _ = self.ch.send(GossipSubCmd::DisconnectWebrtc { call_id }); + } + + pub fn subscribe_call(&self, call_id: Uuid, group_key: Vec) { + let _ = self + .ch + .send(GossipSubCmd::SubscribeCall { call_id, group_key }); + } + + pub fn subscribe_webrtc(&self, call_id: Uuid, peer: DID) { + let _ = self.ch.send(GossipSubCmd::ConnectWebRtc { call_id, peer }); + } + + pub fn receive_calls(&self, own_id: DID) { + let _ = self.ch.send(GossipSubCmd::ReceiveCalls { own_id }); + } +} + +async fn run( + ipfs: Arc>>, + mut cmd_rx: UnboundedReceiver, + signal_tx: UnboundedSender, + gossipsub_sender: GossipSubSender, + notify: Arc, +) { + let notify2 = notify.clone(); + let mut timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(100), + Duration::from_millis(100), + ); + let ipfs = loop { + tokio::select! { + _ = notify2.notified() => { + log::debug!("GossibSubListener channel closed"); + return; + }, + _ = timer.tick() => { + if ipfs.read().is_some() { + break ipfs.read().clone().unwrap(); + } + } + } + }; + + // for tracking webrtc subscriptions + let mut current_call: Option = None; + let mut subscribed_calls: HashMap> = HashMap::new(); + + // replace webrtc_notify after notifying waiters + let mut webrtc_notify = Arc::new(Notify::new()); + let call_offer_notify = Arc::new(Notify::new()); + loop { + tokio::select! { + opt = cmd_rx.recv() => match opt { + Some(cmd) => match cmd { + GossipSubCmd::UnsubscribeCall { call_id } => { + if let Some(call) = subscribed_calls.remove(&call_id) { + call.notify_waiters(); + } + if current_call.as_ref().map(|x| x == &call_id).unwrap_or_default(){ + let _ = current_call.take(); + webrtc_notify.notify_waiters(); + webrtc_notify = Arc::new(Notify::new()); + } + } + GossipSubCmd::DisconnectWebrtc { call_id } => { + if current_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { + webrtc_notify.notify_waiters(); + webrtc_notify = Arc::new(Notify::new()); + } + } + GossipSubCmd::SubscribeCall { call_id, group_key } => { + let notify = Arc::new(Notify::new()); + if let Some(prev) = subscribed_calls.insert(call_id, notify.clone()) { + prev.notify_waiters(); + } + + let mut call_signal_stream = match ipfs + .pubsub_subscribe(call_signal_route(&call_id)) + .await + { + Ok(s) => s, + Err(e) => { + log::error!("failed to subscribe to call signal stream: {e}"); + continue; + } + }; + + let ch = signal_tx.clone(); + let gossipsub_sender = gossipsub_sender.clone(); + tokio::spawn(async move { + loop { + tokio::select!{ + _ = notify.notified() => { + log::debug!("call signal stream terminated by notify"); + break; + } + opt = call_signal_stream.next() => match opt { + Some(msg) => { + let sender = match msg.source.and_then(|s| s.to_did().ok()) { + Some(id) => id, + None => { + log::error!("msg received without source"); + continue + } + }; + match gossipsub_sender.decode_signal_aes::(group_key.clone(), msg.data.clone()).await { + Ok(msg) => { + let _ = ch.send(GossipSubSignal::Call{ + sender, + call_id, + signal: msg + }); + }, + Err(e) => { + log::error!("failed to decode call signal: {e}"); + } + }; + } + None => { + log::debug!("call signal stream terminated!"); + break; + } + } + }; + } + }); + }, + GossipSubCmd::ConnectWebRtc { call_id, peer } => { + if !current_call.as_ref().map(|x| x == &call_id).unwrap_or_default() { + if current_call.is_some() { + webrtc_notify.notify_waiters(); + webrtc_notify = Arc::new(Notify::new()); + } + current_call.replace(call_id); + } + + let mut peer_signal_stream = match ipfs + .pubsub_subscribe(peer_signal_route(&peer, &call_id)) + .await + { + Ok(s) => s, + Err(e) => { + log::error!("failed to subscribe to peer signal stream: {e}"); + continue; + } + }; + let ch = signal_tx.clone(); + let notify = webrtc_notify.clone(); + let gossipsub_sender = gossipsub_sender.clone(); + tokio::spawn(async move { + loop { + tokio::select!{ + _ = notify.notified() => { + log::debug!("peer signal stream terminated by notify"); + break; + } + opt = peer_signal_stream.next() => match opt { + Some(msg) => { + let sender = match msg.source.and_then(|s| s.to_did().ok()) { + Some(id) => id, + None => { + log::error!("msg received without source"); + continue + } + }; + match gossipsub_sender.decode_signal_ecdh::(sender.clone(), msg.data.clone()).await { + Ok(msg) => { + let _ = ch.send(GossipSubSignal::Peer { + sender, + call_id, + signal: Box::new(msg) + }); + }, + Err(e) => { + log::error!("failed to decode peer signal: {e}"); + } + }; + } + None => { + log::debug!("peer signal stream closed!"); + break; + } + } + }; + } + }); + }, + GossipSubCmd::ReceiveCalls { own_id } => { + let mut call_offer_stream = match ipfs + .pubsub_subscribe(call_initiation_route(&own_id)) + .await + { + Ok(s) => s, + Err(e) => { + log::error!("failed to subscribe to call offer stream: {e}"); + continue; + } + }; + let ch = signal_tx.clone(); + let notify = call_offer_notify.clone(); + let gossipsub_sender = gossipsub_sender.clone(); + tokio::spawn(async move { + loop { + tokio::select!{ + _ = notify.notified() => { + log::debug!("call offer stream terminated by notify"); + break; + } + opt = call_offer_stream.next() => match opt { + Some(msg) => { + let sender = match msg.source.and_then(|s| s.to_did().ok()) { + Some(id) => id, + None => { + log::error!("msg received without source"); + continue + } + }; + match gossipsub_sender.decode_signal_ecdh::(sender.clone(), msg.data.clone()).await { + Ok(msg) => { + let _ = ch.send(GossipSubSignal::Initiation{ + sender, + signal: msg + }); + }, + Err(e) => { + log::error!("failed to decode call offer: {e}"); + } + }; + } + None => { + log::debug!("call offer stream closed!"); + break; + } + } + }; + } + }); + }, + } + None => { + log::debug!("GossipSubListener channel closed"); + break; + } + }, + _ = notify.notified() => { + log::debug!("GossipSubListener terminated"); + break; + } + } + } + + log::debug!("quitting gossipsub listener"); + webrtc_notify.notify_waiters(); + call_offer_notify.notify_waiters(); +} diff --git a/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs new file mode 100644 index 000000000..adf66fdf6 --- /dev/null +++ b/extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs @@ -0,0 +1,321 @@ +use std::{ + collections::{HashMap, VecDeque}, + fmt::Display, + sync::Arc, + time::Duration, +}; + +use futures::channel::oneshot; +use rust_ipfs::Ipfs; +use serde::{de::DeserializeOwned, Serialize}; +use tokio::{ + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Notify, + }, + time::Instant, +}; +use warp::{ + crypto::{cipher::Cipher, DID}, + sync::RwLock, +}; + +use super::store::{ecdh_decrypt, ecdh_encrypt}; + +use super::data::NotifyWrapper; + +enum GossipSubCmd { + SendAes { + group_key: Vec, + signal: Vec, + topic: String, + }, + // if this command fails, it will periodically be resent + SendEcdh { + dest: DID, + signal: Vec, + topic: String, + }, + // when someone joins a call, they need to periodically announce their presence. + Announce { + group_key: Vec, + signal: Vec, + topic: String, + }, + DecodeEcdh { + src: DID, + data: Vec, + rsp: oneshot::Sender>>, + }, + GetOwnId { + rsp: oneshot::Sender, + }, + // drop resending signals (failed ECDH signals and the announce signal) + EmptyQueue, +} + +#[derive(Clone)] +pub struct GossipSubSender { + // used for signing messages + ch: UnboundedSender, + // when GossipSubSender gets cloned, NotifyWrapper doesn't get cloned. + // when NotifyWrapper finally gets dropped, then it's ok to call notify_waiters + notify: Arc, +} + +impl GossipSubSender { + pub fn new(own_id: Arc>>, ipfs: Arc>>) -> Self { + let (tx, rx) = mpsc::unbounded_channel(); + let notify = Arc::new(Notify::new()); + let notify2 = notify.clone(); + tokio::spawn(async move { + run(own_id, ipfs, rx, notify2).await; + }); + Self { + ch: tx, + notify: Arc::new(NotifyWrapper { notify }), + } + } + + pub async fn get_own_id(&self) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + self.ch.send(GossipSubCmd::GetOwnId { rsp: tx })?; + let id = rx.await?; + Ok(id) + } + + pub fn send_signal_aes( + &self, + group_key: Vec, + signal: T, + topic: String, + ) -> anyhow::Result<()> { + let signal = serde_cbor::to_vec(&signal)?; + self.ch.send(GossipSubCmd::SendAes { + group_key, + signal, + topic, + })?; + Ok(()) + } + + pub fn send_signal_ecdh( + &self, + dest: DID, + signal: T, + topic: String, + ) -> anyhow::Result<()> { + let signal = serde_cbor::to_vec(&signal)?; + self.ch.send(GossipSubCmd::SendEcdh { + dest, + signal, + topic, + })?; + Ok(()) + } + + pub fn announce( + &self, + group_key: Vec, + signal: T, + topic: String, + ) -> anyhow::Result<()> { + let signal = serde_cbor::to_vec(&signal)?; + self.ch.send(GossipSubCmd::Announce { + group_key, + signal, + topic, + })?; + Ok(()) + } + + // this one doesn't require access to own_id. it can be decrypted using just the group key. + pub async fn decode_signal_aes( + &self, + group_key: Vec, + message: Vec, + ) -> anyhow::Result { + let decrypted = Cipher::direct_decrypt(&message, &group_key)?; + let data: T = serde_cbor::from_slice(&decrypted)?; + Ok(data) + } + + pub async fn decode_signal_ecdh( + &self, + src: DID, + message: Vec, + ) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + self.ch.send(GossipSubCmd::DecodeEcdh { + src, + data: message, + rsp: tx, + })?; + let bytes = rx.await??; + let data: T = serde_cbor::from_slice(&bytes)?; + Ok(data) + } + + pub fn empty_queue(&self) -> anyhow::Result<()> { + self.ch.send(GossipSubCmd::EmptyQueue)?; + Ok(()) + } +} + +async fn run( + own_id: Arc>>, + ipfs: Arc>>, + mut ch: UnboundedReceiver, + notify: Arc, +) { + let notify2 = notify.clone(); + let mut timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(100), + Duration::from_millis(100), + ); + let own_id = loop { + tokio::select! { + _ = notify2.notified() => { + log::debug!("GossibSubSender channel closed"); + return; + }, + _ = timer.tick() => { + if own_id.read().is_some() { + break own_id.write().take().unwrap(); + } + } + } + }; + + let ipfs = loop { + tokio::select! { + _ = notify2.notified() => { + log::debug!("GossibSubSender channel closed"); + return; + }, + _ = timer.tick() => { + if ipfs.read().is_some() { + break ipfs.read().clone().unwrap(); + } + } + } + }; + + let mut to_announce: Option = None; + let mut ecdh_queue: HashMap> = HashMap::new(); + let mut retry_timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(2000), + Duration::from_millis(2000), + ); + + let mut announce_timer = tokio::time::interval_at( + Instant::now() + Duration::from_millis(5000), + Duration::from_millis(5000), + ); + + loop { + tokio::select! { + _ = retry_timer.tick() => { + for (_dest, queue) in ecdh_queue.iter_mut() { + while let Some(cmd) = queue.pop_front() { + if let GossipSubCmd::SendEcdh { dest, signal, topic } = cmd { + let encrypted = match ecdh_encrypt(&own_id, &dest, signal.clone()) { + Ok(r) => r, + Err(e) => { + log::error!("failed to encrypt ecdh message: {e}"); + continue; + } + }; + if ipfs.pubsub_publish(topic.clone(), encrypted).await.is_err() { + queue.push_front(GossipSubCmd::SendEcdh { dest, signal, topic }); + break; + } + } + } + } + } + _ = announce_timer.tick() => { + if let Some(GossipSubCmd::Announce { group_key, signal, topic }) = to_announce.as_ref() { + let encrypted = match Cipher::direct_encrypt(signal, group_key) { + Ok(r) => r, + Err(e) => { + log::error!("failed to encrypt aes message: {e}"); + continue; + } + }; + if let Err(e) = ipfs.pubsub_publish(topic.clone(), encrypted).await { + log::error!("failed to publish aes message: {e}"); + } + } + } + opt = ch.recv() => match opt { + Some(cmd) => match cmd { + GossipSubCmd::EmptyQueue => { + ecdh_queue.clear(); + to_announce.take(); + } + GossipSubCmd::GetOwnId { rsp } => { + let _ = rsp.send(own_id.clone()); + } + GossipSubCmd::SendAes { group_key, signal, topic } => { + let encrypted = match Cipher::direct_encrypt(&signal, &group_key) { + Ok(r) => r, + Err(e) => { + log::error!("failed to encrypt aes message: {e}"); + continue; + } + }; + if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await { + log::error!("failed to publish aes message: {e}"); + + } + }, + GossipSubCmd::SendEcdh { dest, signal, topic } => { + // only add to the queue if sending fails + let encrypted = match ecdh_encrypt(&own_id, &dest, signal.clone()) { + Ok(r) => r, + Err(e) => { + log::error!("failed to encrypt ecdh message: {e}"); + continue; + } + }; + if ipfs.pubsub_publish(topic.clone(), encrypted).await.is_err() { + let queue = ecdh_queue.entry(dest.clone()).or_default(); + queue.push_back(GossipSubCmd::SendEcdh { dest, signal, topic }); + } + } + GossipSubCmd::Announce { group_key, signal, topic } => { + let encrypted = match Cipher::direct_encrypt(&signal, &group_key) { + Ok(r) => r, + Err(e) => { + log::error!("failed to encrypt aes message: {e}"); + continue; + } + }; + if let Err(e) = ipfs.pubsub_publish(topic.clone(), encrypted).await { + log::error!("failed to publish aes message: {e}"); + } + to_announce.replace(GossipSubCmd::Announce { group_key, signal, topic }); + announce_timer.reset(); + }, + GossipSubCmd::DecodeEcdh { src, data, rsp } => { + let r = || { + let bytes = ecdh_decrypt(&own_id, &src, &data)?; + Ok(bytes) + }; + + let _ = rsp.send(r()); + } + } + None => { + log::debug!("GossibSubSender channel closed"); + return; + } + }, + _ = notify.notified() => { + log::debug!("GossibSubSender terminated"); + return; + } + } + } +} diff --git a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs index 46ae58460..75f0c61e4 100644 --- a/extensions/warp-blink-wrtc/src/blink_impl/mod.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/mod.rs @@ -1,89 +1,63 @@ -mod call_initiation; -use call_initiation::run as handle_call_initiation; - mod data; -use data::*; -mod webrtc_handler; -use webrtc_handler::run as handle_webrtc; -use webrtc_handler::WebRtcHandlerParams; +mod blink_controller; +mod gossipsub_listener; +mod gossipsub_sender; +mod signaling; +mod store; -use anyhow::{bail, Context}; +use anyhow::bail; use async_trait::async_trait; use cpal::traits::{DeviceTrait, HostTrait}; use rust_ipfs::{Ipfs, Keypair}; -use std::{any::Any, collections::HashMap, str::FromStr, sync::Arc, time::Duration}; -use tokio::{ - sync::{ - broadcast::{self}, - RwLock, - }, - task::JoinHandle, +use std::{any::Any, str::FromStr, sync::Arc, time::Duration}; +use tokio::sync::{ + broadcast::{self}, + mpsc, }; use uuid::Uuid; use warp::{ - blink::{AudioDeviceConfig, Blink, BlinkEventKind, BlinkEventStream, CallConfig, CallInfo}, + blink::{AudioDeviceConfig, Blink, BlinkEventKind, BlinkEventStream, CallInfo, CallState}, crypto::{did_key::Generate, zeroize::Zeroizing, DIDKey, Ed25519KeyPair, Fingerprint, DID}, error::Error, module::Module, multipass::MultiPass, Extension, SingleHandle, }; -use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; use crate::{ + blink_impl::blink_controller::BlinkController, host_media::{ self, - audio::{ - automute::{AutoMuteCmd, AUDIO_CMD_CH}, - AudioCodec, AudioHardwareConfig, AudioSampleRate, - }, - mp4_logger::Mp4LoggerConfig, + audio::automute::{AutoMuteCmd, AUDIO_CMD_CH}, }, - signaling::{ipfs_routes, CallSignal, InitiationSignal}, - simple_webrtc::{self, events::WebRtcEventStream}, - store::{send_signal_aes, send_signal_ecdh}, + simple_webrtc::{self}, }; +use self::gossipsub_listener::GossipSubListener; +use self::gossipsub_sender::GossipSubSender; + // implements Blink #[derive(Clone)] pub struct BlinkImpl { - ipfs: Arc>>, - pending_calls: Arc>>, - active_call: Arc>>, - webrtc_controller: Arc>, - // the DID generated from Multipass, never cloned. contains the private key - own_id: Arc>>, + // the DID generated from Multipass. has been cloned. doesn't contain the private key anymore. + own_id: Arc>>, ui_event_ch: broadcast::Sender, - audio_source_config: Arc>, - audio_sink_config: Arc>, - // subscribes to IPFS topic to receive incoming calls - offer_handler: Arc>>, - // handles 3 streams: one for webrtc events and two IPFS topics - // pertains to the active_call, which is stored in STATIC_DATA - webrtc_handler: Arc>>>, + gossipsub_listener: GossipSubListener, + gossipsub_sender: GossipSubSender, + blink_controller: blink_controller::BlinkController, - // prevents the UI from running multiple tests simultaneously - audio_device_config: Arc>, + drop_handler: Arc, } -impl Drop for BlinkImpl { +struct DropHandler {} +impl Drop for DropHandler { fn drop(&mut self) { - let webrtc_handler = std::mem::take(&mut *self.webrtc_handler.write()); - if let Some(handle) = webrtc_handler { - handle.abort(); - } - self.offer_handler.write().abort(); - let webrtc_controller = self.webrtc_controller.clone(); tokio::spawn(async move { - if let Err(e) = webrtc_controller.write().await.deinit().await { - log::error!("error in webrtc_controller deinit: {e}"); - } host_media::audio::automute::stop(); host_media::reset().await; - //rtp_logger::deinit().await; - log::debug!("deinit finished"); + log::debug!("blink drop handler finished"); }); } } @@ -92,69 +66,51 @@ impl BlinkImpl { pub async fn new(account: Box) -> anyhow::Result> { log::trace!("initializing WebRTC"); - // check SupportedStreamConfigs. if those channels aren't supported, use the default. - let mut source_config = AudioHardwareConfig { - sample_rate: AudioSampleRate::High, - channels: 1, - }; - - let mut sink_config = AudioHardwareConfig { - sample_rate: AudioSampleRate::High, - channels: 1, - }; - - let mut selected_speaker = None; - let mut selected_microphone = None; let cpal_host = cpal::default_host(); if let Some(input_device) = cpal_host.default_input_device() { - selected_microphone = input_device.name().ok(); - match Self::get_min_source_channels(&input_device) { - Ok(channels) => { - source_config.channels = channels; - host_media::change_audio_input(input_device, source_config.clone()).await?; - } - Err(e) => log::error!("{e}"), - } + host_media::change_audio_input(input_device).await?; } else { log::warn!("blink started with no input device"); } if let Some(output_device) = cpal_host.default_output_device() { - selected_speaker = output_device.name().ok(); - match Self::get_min_sink_channels(&output_device) { - Ok(channels) => { - sink_config.channels = channels; - host_media::change_audio_output(output_device, sink_config.clone()).await?; - } - Err(e) => log::error!("{e}"), - } + host_media::change_audio_output(output_device).await?; } else { log::warn!("blink started with no output device"); } + // todo: ensure rx doesn't get dropped let (ui_event_ch, _rx) = broadcast::channel(1024); + let (gossipsub_tx, gossipsub_rx) = mpsc::unbounded_channel(); + + let ipfs = Arc::new(warp::sync::RwLock::new(None)); + let own_id_private = Arc::new(warp::sync::RwLock::new(None)); + let gossipsub_sender = GossipSubSender::new(own_id_private.clone(), ipfs.clone()); + let gossipsub_listener = + GossipSubListener::new(ipfs.clone(), gossipsub_tx, gossipsub_sender.clone()); + + let webrtc_controller = simple_webrtc::Controller::new()?; + let webrtc_event_stream = webrtc_controller.get_event_stream(); + let blink_controller = BlinkController::new(blink_controller::Args { + webrtc_controller, + webrtc_event_stream, + gossipsub_sender: gossipsub_sender.clone(), + gossipsub_listener: gossipsub_listener.clone(), + signal_rx: gossipsub_rx, + ui_event_ch: ui_event_ch.clone(), + }); + let blink_impl = Self { - ipfs: Arc::new(RwLock::new(None)), - pending_calls: Arc::new(RwLock::new(HashMap::new())), - active_call: Arc::new(RwLock::new(None)), - webrtc_controller: Arc::new(RwLock::new(simple_webrtc::Controller::new()?)), - own_id: Arc::new(RwLock::new(None)), + own_id: Arc::new(warp::sync::RwLock::new(None)), ui_event_ch, - audio_source_config: Arc::new(RwLock::new(source_config)), - audio_sink_config: Arc::new(RwLock::new(sink_config)), - offer_handler: Arc::new(warp::sync::RwLock::new(tokio::spawn(async {}))), - webrtc_handler: Arc::new(warp::sync::RwLock::new(None)), - audio_device_config: Arc::new(RwLock::new(host_media::audio::DeviceConfig::new( - selected_speaker, - selected_microphone, - ))), + gossipsub_sender, + gossipsub_listener, + blink_controller, + drop_handler: Arc::new(DropHandler {}), }; - let ipfs = blink_impl.ipfs.clone(); let own_id = blink_impl.own_id.clone(); - let offer_handler = blink_impl.offer_handler.clone(); - let pending_calls = blink_impl.pending_calls.clone(); - let ui_event_ch = blink_impl.ui_event_ch.clone(); + let gossipsub_listener = blink_impl.gossipsub_listener.clone(); tokio::spawn(async move { let f = async move { @@ -176,35 +132,15 @@ impl BlinkImpl { None => bail!("Unable to use IPFS Handle"), }; - let call_offer_stream = match _ipfs - .pubsub_subscribe(ipfs_routes::call_initiation_route(&identity.did_key())) - .await - { - Ok(s) => s, - Err(e) => { - log::error!("failed to subscribe to call_broadcast_route: {e}"); - return Err(e); - } - }; - let _own_id = get_keypair_did(_ipfs.keypair()?)?; - own_id.write().await.replace(_own_id); - - let _offer_handler = tokio::spawn(async move { - handle_call_initiation( - own_id.clone(), - pending_calls, - call_offer_stream, - ui_event_ch, - ) - .await; - }); - - let mut x = offer_handler.write(); - *x = _offer_handler; - - // set ipfs last to quickly detect that webrtc hasn't been initialized. - ipfs.write().await.replace(_ipfs); + let public_did = identity.did_key(); + // this one better not be cloned + own_id_private.write().replace(_own_id); + // this one is for blink and can be cloned. might not even be needed. + own_id.write().replace(public_did.clone()); + ipfs.write().replace(_ipfs); + + gossipsub_listener.receive_calls(public_did); log::trace!("finished initializing WebRTC"); Ok(()) }; @@ -219,146 +155,6 @@ impl BlinkImpl { Ok(Box::new(blink_impl)) } - async fn init_call(&mut self, call: CallInfo) -> Result<(), Error> { - //rtp_logger::init(call.call_id(), std::path::PathBuf::from("")).await?; - let lock = self.own_id.read().await; - let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - let ipfs = self.get_ipfs().await?; - - self.active_call.write().await.replace(call.clone().into()); - let audio_source_config = self.audio_source_config.read().await; - let webrtc_codec = AudioCodec::default(); - // ensure there is an audio source track - let rtc_rtp_codec: RTCRtpCodecCapability = RTCRtpCodecCapability { - mime_type: webrtc_codec.mime_type(), - clock_rate: webrtc_codec.sample_rate(), - channels: 1, - ..Default::default() - }; - let track = self - .webrtc_controller - .write() - .await - .add_media_source(host_media::AUDIO_SOURCE_ID.into(), rtc_rtp_codec) - .await?; - if let Err(e) = host_media::create_audio_source_track( - own_id.clone(), - self.ui_event_ch.clone(), - track, - webrtc_codec, - audio_source_config.clone(), - ) - .await - { - let _ = self - .webrtc_controller - .write() - .await - .remove_media_source(host_media::AUDIO_SOURCE_ID.into()) - .await; - return Err(e); - } - - // next, create event streams and pass them to a task - let call_signaling_stream = ipfs - .pubsub_subscribe(ipfs_routes::call_signal_route(&call.call_id())) - .await - .context("failed to subscribe to call_broadcast_route")?; - - let peer_signaling_stream = ipfs - .pubsub_subscribe(ipfs_routes::peer_signal_route(own_id, &call.call_id())) - .await - .context("failed to subscribe to call_signaling_route")?; - - let webrtc_event_stream = WebRtcEventStream(Box::pin( - self.webrtc_controller - .read() - .await - .get_event_stream() - .context("failed to get webrtc event stream")?, - )); - - let webrtc_handler = std::mem::take(&mut *self.webrtc_handler.write()); - if let Some(handle) = webrtc_handler { - // just to be safe - handle.abort(); - } - - let own_id = self.own_id.clone(); - let active_call = self.active_call.clone(); - let webrtc_controller = self.webrtc_controller.clone(); - let audio_sink_config = self.audio_sink_config.clone(); - let ui_event_ch = self.ui_event_ch.clone(); - let event_ch2 = ui_event_ch.clone(); - - let webrtc_handle = tokio::task::spawn(async move { - handle_webrtc( - WebRtcHandlerParams { - own_id, - event_ch: event_ch2, - ipfs, - active_call, - webrtc_controller, - audio_sink_config, - ch: ui_event_ch, - call_signaling_stream, - peer_signaling_stream, - }, - webrtc_event_stream, - ) - .await; - }); - - self.webrtc_handler.write().replace(webrtc_handle); - Ok(()) - } - - async fn update_audio_source_config( - &mut self, - input_device: &cpal::Device, - ) -> anyhow::Result<()> { - let min_channels = Self::get_min_source_channels(input_device)?; - self.audio_source_config.write().await.channels = min_channels; - Ok(()) - } - - fn get_min_source_channels(input_device: &cpal::Device) -> anyhow::Result { - let min_channels = - input_device - .supported_input_configs()? - .fold(None, |acc: Option, x| match acc { - None => Some(x.channels()), - Some(y) => Some(std::cmp::min(x.channels(), y)), - }); - let channels = min_channels.ok_or(anyhow::anyhow!( - "unsupported audio input device - no input configuration available" - ))?; - Ok(channels) - } - - async fn update_audio_sink_config( - &mut self, - output_device: &cpal::Device, - ) -> anyhow::Result<()> { - let min_channels = Self::get_min_sink_channels(output_device)?; - self.audio_sink_config.write().await.channels = min_channels; - Ok(()) - } - - fn get_min_sink_channels(output_device: &cpal::Device) -> anyhow::Result { - let min_channels = - output_device - .supported_output_configs()? - .fold(None, |acc: Option, x| match acc { - None => Some(x.channels()), - Some(y) => Some(std::cmp::min(x.channels(), y)), - }); - let channels = min_channels.ok_or(anyhow::anyhow!( - "unsupported audio output device. no output configuration available" - ))?; - Ok(channels) - } - async fn select_microphone(&mut self, device_name: &str) -> Result<(), Error> { let host = cpal::default_host(); let device: cpal::Device = if device_name.to_ascii_lowercase().eq("default") { @@ -372,9 +168,7 @@ impl BlinkImpl { r.ok_or(Error::AudioDeviceNotFound)? }; - self.update_audio_source_config(&device).await?; - host_media::change_audio_input(device, self.audio_source_config.read().await.clone()) - .await?; + host_media::change_audio_input(device).await?; Ok(()) } @@ -391,27 +185,7 @@ impl BlinkImpl { r.ok_or(Error::AudioDeviceNotFound)? }; - self.update_audio_sink_config(&device).await?; - host_media::change_audio_output(device, self.audio_sink_config.read().await.clone()) - .await?; - Ok(()) - } -} - -impl BlinkImpl { - async fn get_ipfs(&self) -> Result { - let lock = self.ipfs.read().await; - let ipfs = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - Ok(ipfs.clone()) - } - - async fn ensure_call_not_in_progress(&self) -> Result<(), Error> { - if let Some(ac) = self.active_call.read().await.as_ref() { - if ac.call_state != CallState::Closed { - return Err(Error::OtherWithContext("previous call not finished".into())); - } - } - + host_media::change_audio_output(device).await?; Ok(()) } } @@ -468,157 +242,48 @@ impl Blink for BlinkImpl { conversation_id: Option, mut participants: Vec, ) -> Result { - self.ensure_call_not_in_progress().await?; - let ipfs = self.get_ipfs().await?; - - // need to drop lock to self.own_id before calling self.init_call - { - let lock = self.own_id.read().await; - let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - if !participants.contains(own_id) { - participants.push(DID::from_str(&own_id.fingerprint())?); - }; - } + let own_id = self + .own_id + .read() + .clone() + .ok_or(Error::BlinkNotInitialized)?; - let call_info = CallInfo::new(conversation_id, participants.clone()); - self.init_call(call_info.clone()).await?; + if !participants.contains(&own_id) { + participants.push(DID::from_str(&own_id.fingerprint())?); + }; - let lock = self.own_id.read().await; - let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; + let call_info = CallInfo::new(conversation_id, participants); + let call_id = call_info.call_id(); + self.blink_controller.offer_call(call_info).await?; - for dest in participants { - if dest == *own_id { - continue; - } - let topic = ipfs_routes::call_initiation_route(&dest); - let signal = InitiationSignal::Offer { - call_info: call_info.clone(), - }; - - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await { - log::error!("failed to send signal: {e}"); - } - } - Ok(call_info.call_id()) + Ok(call_id) } /// accept/join a call. Automatically send and receive audio async fn answer_call(&mut self, call_id: Uuid) -> Result<(), Error> { - self.ensure_call_not_in_progress().await?; - let call = match self.pending_calls.write().await.remove(&call_id) { - Some(r) => r.call, - None => { - return Err(Error::OtherWithContext( - "could not answer call: not found".into(), - )) - } - }; - - self.init_call(call.clone()).await?; - let call_id = call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Join { call_id }; - - let ipfs = self.get_ipfs().await?; - send_signal_aes(&ipfs, &call.group_key(), signal, topic) - .await - .map_err(|e| Error::FailedToSendSignal(e.to_string())) + self.blink_controller.answer_call(call_id).await + // todo: periodically re-send join signals } /// use the Leave signal as a courtesy, to let the group know not to expect you to join. async fn reject_call(&mut self, call_id: Uuid) -> Result<(), Error> { - let ipfs = self.get_ipfs().await?; - if let Some(pc) = self.pending_calls.write().await.remove(&call_id) { - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Leave { call_id }; - if let Err(e) = send_signal_aes(&ipfs, &pc.call.group_key(), signal, topic).await { - log::error!("failed to send signal: {e}"); - } - Ok(()) - } else { - Err(Error::OtherWithContext( - "could not reject call: not found".into(), - )) - } + self.blink_controller.leave_call(Some(call_id))?; + Ok(()) } /// end/leave the current call async fn leave_call(&mut self) -> Result<(), Error> { - let lock = self.own_id.read().await; - let own_id = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - let ipfs = self.get_ipfs().await?; - if let Some(ac) = self.active_call.write().await.as_mut() { - match ac.call_state.clone() { - CallState::Started => { - ac.call_state = CallState::Closing; - } - CallState::Closed => { - log::info!("call already closed"); - return Ok(()); - } - CallState::Uninitialized => { - log::info!("cancelling call"); - ac.call_state = CallState::Closed; - } - CallState::Closing => { - log::warn!("leave_call when call_state is: {:?}", ac.call_state); - return Ok(()); - } - }; - - let call_id = ac.call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Leave { call_id }; - if let Err(e) = send_signal_aes(&ipfs, &ac.call.group_key(), signal, topic).await { - log::error!("failed to send signal: {e}"); - } else { - log::debug!("sent signal to leave call"); - } - - // send extra quit signal - for participant in ac - .call - .participants() - .iter() - .filter(|x| !ac.connected_participants.contains_key(x)) - { - if participant == own_id { - continue; - } - let topic = ipfs_routes::call_initiation_route(participant); - let signal = InitiationSignal::Leave { - call_id: ac.call.call_id(), - }; - if let Err(e) = send_signal_ecdh(&ipfs, own_id, participant, signal, topic).await { - log::error!("failed to send signal: {e}"); - } - } - - let r = self.webrtc_controller.write().await.deinit().await; - host_media::reset().await; - //rtp_logger::deinit().await; - let _ = r?; - Ok(()) - } else { - Err(Error::OtherWithContext( - "tried to leave nonexistent call".into(), - )) - } + self.blink_controller.leave_call(None)?; + Ok(()) } // ------ Select input/output devices ------ async fn get_audio_device_config(&self) -> Box { - Box::new(self.audio_device_config.read().await.clone()) + Box::new(host_media::get_audio_device_config().await) } async fn set_audio_device_config( &mut self, config: Box, ) -> Result<(), Error> { - let audio_device_config = host_media::audio::DeviceConfig::new( - config.speaker_device_name(), - config.microphone_device_name(), - ); - *self.audio_device_config.write().await = audio_device_config; - if let Some(device_name) = config.speaker_device_name() { self.select_speaker(&device_name).await?; } @@ -639,103 +304,24 @@ impl Blink for BlinkImpl { // ------ Media controls ------ async fn mute_self(&mut self) -> Result<(), Error> { - if self.active_call.read().await.is_none() { - return Err(Error::CallNotInProgress); - } - host_media::mute_self().await?; - - let lock = self.ipfs.read().await; - let ipfs = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - - if let Some(ac) = self.active_call.write().await.as_mut() { - ac.call_config.self_muted = true; - let call_id = ac.call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Muted; - if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { - log::error!("failed to send signal: {e}"); - } else { - log::debug!("sent signal to mute self"); - } - } - + self.blink_controller.mute_self()?; Ok(()) } async fn unmute_self(&mut self) -> Result<(), Error> { - if self.active_call.read().await.is_none() { - return Err(Error::CallNotInProgress); - } - host_media::unmute_self().await?; - - let lock = self.ipfs.read().await; - let ipfs = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - - if let Some(ac) = self.active_call.write().await.as_mut() { - ac.call_config.self_muted = false; - let call_id = ac.call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Unmuted; - if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { - log::error!("failed to send signal: {e}"); - } else { - log::debug!("sent signal to unmute self"); - } - } - + self.blink_controller.unmute_self()?; Ok(()) } async fn silence_call(&mut self) -> Result<(), Error> { - if self.active_call.read().await.is_none() { - return Err(Error::CallNotInProgress); - } - host_media::deafen().await?; - let lock = self.ipfs.read().await; - let ipfs = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - - if let Some(ac) = self.active_call.write().await.as_mut() { - ac.call_config.self_deafened = true; - let call_id = ac.call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Deafened; - if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { - log::error!("failed to send signal: {e}"); - } else { - log::debug!("sent signal to deafen self"); - } - } - + self.blink_controller.silence_call()?; Ok(()) } async fn unsilence_call(&mut self) -> Result<(), Error> { - if self.active_call.read().await.is_none() { - return Err(Error::CallNotInProgress); - } - host_media::undeafen().await?; - let lock = self.ipfs.read().await; - let ipfs = lock.as_ref().ok_or(Error::BlinkNotInitialized)?; - - if let Some(ac) = self.active_call.write().await.as_mut() { - ac.call_config.self_deafened = false; - let call_id = ac.call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); - let signal = CallSignal::Undeafened; - if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { - log::error!("failed to send signal: {e}"); - } else { - log::debug!("sent signal to undeafen self"); - } - } - + self.blink_controller.unsilence_call()?; Ok(()) } - async fn get_call_config(&self) -> Result, Error> { - Ok(self - .active_call - .read() - .await - .as_ref() - .map(|x| x.call_config.clone())) + async fn get_call_state(&self) -> Result, Error> { + self.blink_controller.get_active_call_state().await } async fn enable_camera(&mut self) -> Result<(), Error> { @@ -745,30 +331,10 @@ impl Blink for BlinkImpl { Err(Error::Unimplemented) } async fn record_call(&mut self, output_dir: &str) -> Result<(), Error> { - match self.active_call.read().await.as_ref() { - None => return Err(Error::CallNotInProgress), - Some(ActiveCall { call, .. }) => { - host_media::init_recording(Mp4LoggerConfig { - call_id: call.call_id(), - participants: call.participants(), - audio_codec: AudioCodec::default(), - log_path: output_dir.into(), - }) - .await?; - } - } - - Ok(()) + self.blink_controller.record_call(output_dir.into()).await } async fn stop_recording(&mut self) -> Result<(), Error> { - match self.active_call.read().await.as_ref() { - None => return Err(Error::CallNotInProgress), - Some(_) => { - host_media::pause_recording().await?; - } - } - - Ok(()) + self.blink_controller.stop_recording().await } fn enable_automute(&mut self) -> Result<(), Error> { @@ -790,20 +356,22 @@ impl Blink for BlinkImpl { // ------ Utility Functions ------ async fn pending_calls(&self) -> Vec { - Vec::from_iter( - self.pending_calls - .read() - .await - .values() - .map(|x| x.call.clone()), - ) + match self.blink_controller.get_pending_calls().await { + Ok(r) => r, + Err(e) => { + log::error!("{e}"); + vec![] + } + } } async fn current_call(&self) -> Option { - self.active_call - .read() - .await - .as_ref() - .map(|x| x.call.clone()) + match self.blink_controller.get_active_call_info().await { + Ok(r) => r, + Err(e) => { + log::error!("{e}"); + None + } + } } } diff --git a/extensions/warp-blink-wrtc/src/blink_impl/readme.md b/extensions/warp-blink-wrtc/src/blink_impl/readme.md new file mode 100644 index 000000000..8f12e73ed --- /dev/null +++ b/extensions/warp-blink-wrtc/src/blink_impl/readme.md @@ -0,0 +1,19 @@ +### BlinkImpl spawns three long running tasks +- `GossipSubListener`: receives messages via IPFS and forwards them to the `BlinkController` +- `GossipSubSender`: contains the user's full DID - both the public and private key - and is responsible for sending and decoding messages. GossipSubSender also can provide a clone of the DID (which returns just the public key) upon request. +- `BlinkController`: contains the instance of `SimpleWebrtc`. receives all the gossipsub messages, webrtc events, and user commands (invoked by BlinkImpl) + +### when BlinkImpl offers a call +- an Offer signal is sent (and retried) +- the sender subscribes to a gossip channel specific to that call +- all recipients subscribe to that gossip channel too, even if they don't join the call (this allows them to detect when all the other participants have left the call - in this case the call would be considered terminated). +- The sender automatically joins the call. + +### when someone joins the call +- they subscribe to another gossip channel, using the call id and their DID. This one is for receiving webrtc specific signals (SDP and ICE mostly). +- they broadcast an `Announce` signal on the call-wide channel periodically. +- they track all the `Announce` and `Leave` signals. +- they periodically go through a list of all participants who joined the call but to whom they aren't yet connected. they compare DIDs and based off of that, one of the peers will initiate a webrtc connection via the `Dial` signal. + +### in response to a dial signal +- the other side automatically accepts and proceeds with the webrtc connection process. \ No newline at end of file diff --git a/extensions/warp-blink-wrtc/src/signaling.rs b/extensions/warp-blink-wrtc/src/blink_impl/signaling.rs similarity index 60% rename from extensions/warp-blink-wrtc/src/signaling.rs rename to extensions/warp-blink-wrtc/src/blink_impl/signaling.rs index 323c2db65..48cabc7ec 100644 --- a/extensions/warp-blink-wrtc/src/signaling.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/signaling.rs @@ -2,13 +2,33 @@ use derive_more::Display; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use warp::blink::CallInfo; +use warp::{ + blink::{CallInfo, ParticipantState}, + crypto::DID, +}; use webrtc::{ ice_transport::ice_candidate::RTCIceCandidate, peer_connection::sdp::session_description::RTCSessionDescription, }; +#[derive(Clone)] +pub enum GossipSubSignal { + Peer { + sender: DID, + call_id: Uuid, + signal: Box, + }, + Call { + sender: DID, + call_id: Uuid, + signal: CallSignal, + }, + Initiation { + sender: DID, + signal: InitiationSignal, + }, +} -#[derive(Serialize, Deserialize, Display)] +#[derive(Serialize, Deserialize, Display, Clone)] pub enum PeerSignal { #[display(fmt = "Ice")] Ice(RTCIceCandidate), @@ -22,40 +42,19 @@ pub enum PeerSignal { // this is used for webrtc signaling. // it is somewhat redundant but for now i'll leave it in. -#[derive(Serialize, Deserialize, Display)] +#[derive(Serialize, Deserialize, Display, Clone)] pub enum CallSignal { - #[display(fmt = "Join")] - Join { call_id: Uuid }, + #[display(fmt = "Announce")] + Announce { participant_state: ParticipantState }, #[display(fmt = "Leave")] - Leave { call_id: Uuid }, - - #[display(fmt = "Muted")] - Muted, - #[display(fmt = "Unmuted")] - Unmuted, - #[display(fmt = "Deafened")] - Deafened, - #[display(fmt = "Undeafened")] - Undeafened, + Leave, } -#[derive(Serialize, Deserialize, Display)] +#[derive(Serialize, Deserialize, Display, Clone)] pub enum InitiationSignal { /// invite a peer to join a call #[display(fmt = "Offer")] Offer { call_info: CallInfo }, - /// used to dismiss an incoming call dialog - /// is needed when someone offers a call and - /// everyone who joined the call leaves. if this - /// happens and someone hasn't rejected the call, - /// they may have a call dialog displayed. they need - /// to track how many people joined and left the call to - /// know when to dismiss the dialog. - #[display(fmt = "Join")] - Join { call_id: Uuid }, - /// used to dismiss an incoming call dialog - #[display(fmt = "Leave")] - Leave { call_id: Uuid }, } pub mod ipfs_routes { diff --git a/extensions/warp-blink-wrtc/src/store.rs b/extensions/warp-blink-wrtc/src/blink_impl/store.rs similarity index 96% rename from extensions/warp-blink-wrtc/src/store.rs rename to extensions/warp-blink-wrtc/src/blink_impl/store.rs index e05cc46b1..a7d0ba6b1 100644 --- a/extensions/warp-blink-wrtc/src/store.rs +++ b/extensions/warp-blink-wrtc/src/blink_impl/store.rs @@ -78,7 +78,7 @@ pub fn decode_gossipsub_msg_aes( Ok(data) } -fn ecdh_encrypt>(own_did: &DID, recipient: &DID, data: K) -> Result> { +pub fn ecdh_encrypt>(own_did: &DID, recipient: &DID, data: K) -> Result> { let prikey = Ed25519KeyPair::from_secret_key(&own_did.private_key_bytes()).get_x25519(); let did_pubkey = recipient.public_key_bytes(); @@ -89,7 +89,7 @@ fn ecdh_encrypt>(own_did: &DID, recipient: &DID, data: K) -> Resu Ok(data) } -fn ecdh_decrypt>(own_did: &DID, sender: &DID, data: K) -> Result> { +pub fn ecdh_decrypt>(own_did: &DID, sender: &DID, data: K) -> Result> { let prikey = Ed25519KeyPair::from_secret_key(&own_did.private_key_bytes()).get_x25519(); let did_pubkey = sender.public_key_bytes(); diff --git a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs b/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs deleted file mode 100644 index 8af337f7d..000000000 --- a/extensions/warp-blink-wrtc/src/blink_impl/webrtc_handler.rs +++ /dev/null @@ -1,328 +0,0 @@ -use super::data::*; -use crate::host_media; -use crate::host_media::audio::AudioCodec; -use crate::simple_webrtc; -use futures::StreamExt; -use rust_ipfs::{Ipfs, SubscriptionStream}; -use std::sync::Arc; -use tokio::sync::{ - broadcast::{self, Sender}, - RwLock, -}; - -use warp::{blink::BlinkEventKind, crypto::DID, error::Error}; - -use crate::{ - host_media::audio::AudioHardwareConfig, - signaling::{ipfs_routes, CallSignal, PeerSignal}, - simple_webrtc::events::{EmittedEvents, WebRtcEventStream}, - store::{decode_gossipsub_msg_aes, decode_gossipsub_msg_ecdh, send_signal_ecdh, PeerIdExt}, -}; - -pub struct WebRtcHandlerParams { - pub own_id: Arc>>, - pub event_ch: broadcast::Sender, - pub ipfs: Ipfs, - pub active_call: Arc>>, - pub webrtc_controller: Arc>, - pub audio_sink_config: Arc>, - pub ch: Sender, - pub call_signaling_stream: SubscriptionStream, - pub peer_signaling_stream: SubscriptionStream, -} - -pub async fn run(params: WebRtcHandlerParams, mut webrtc_event_stream: WebRtcEventStream) { - let WebRtcHandlerParams { - own_id, - event_ch, - ipfs, - active_call, - webrtc_controller, - audio_sink_config: audio_sink_codec, - ch, - call_signaling_stream, - peer_signaling_stream, - } = params; - futures::pin_mut!(call_signaling_stream); - futures::pin_mut!(peer_signaling_stream); - - loop { - tokio::select! { - opt = call_signaling_stream.next() => { - let msg = match opt { - Some(m) => m, - None => continue - }; - let sender = match msg.source.and_then(|s| s.to_did().ok()) { - Some(id) => id, - None => { - log::error!("msg received without source"); - continue - } - }; - let mut lock = active_call.write().await; - let active_call = match lock.as_mut() { - Some(r) => r, - None => { - log::error!("received call signal without an active call"); - continue; - } - }; - let signal: CallSignal = match decode_gossipsub_msg_aes(&active_call.call.group_key(), &msg) { - Ok(s) => s, - Err(e) => { - log::error!("failed to decode msg from call signaling stream: {e}"); - continue; - }, - }; - match signal { - CallSignal::Join { call_id } => { - log::debug!("received signal: Join"); - match active_call.call_state.clone() { - CallState::Uninitialized => active_call.call_state = CallState::Started, - x => if x != CallState::Started { - log::error!("someone tried to join call with state: {:?}", active_call.call_state); - continue; - } - } - active_call.connected_participants.insert(sender.clone(), PeerState::Initializing); - // todo: properly hang up on error. - // emits CallInitiated Event, which returns the local sdp. will be sent to the peer with the dial signal - if let Err(e) = webrtc_controller.write().await.dial(&sender).await { - log::error!("failed to dial peer: {e}"); - continue; - } - if let Err(e) = ch.send(BlinkEventKind::ParticipantJoined { call_id, peer_id: sender }) { - log::error!("failed to send ParticipantJoined Event: {e}"); - } - - } - CallSignal::Leave { call_id } => { - log::debug!("received signal: Leave"); - if active_call.call_state == CallState::Closed { - log::error!("participant tried to leave a call which was already closed"); - continue; - } - if active_call.call.call_id() != call_id { - log::error!("participant tried to leave call which wasn't active"); - continue; - } - if !active_call.call.participants().contains(&sender) { - log::error!("participant tried to leave call who wasn't part of the call"); - continue; - } - webrtc_controller.write().await.hang_up(&sender).await; - if let Err(e) = ch.send(BlinkEventKind::ParticipantLeft { call_id, peer_id: sender }) { - log::error!("failed to send ParticipantLeft event: {e}"); - } - }, - CallSignal::Muted => { - if let Err(e) = ch.send(BlinkEventKind::ParticipantMuted { peer_id: sender }) { - log::error!("failed to send ParticipantMuted event: {e}"); - } - }, - CallSignal::Unmuted => { - if let Err(e) = ch.send(BlinkEventKind::ParticipantUnmuted { peer_id: sender }) { - log::error!("failed to send ParticipantUnmuted event: {e}"); - } - } - CallSignal::Deafened => { - if let Err(e) = ch.send(BlinkEventKind::ParticipantDeafened { peer_id: sender }) { - log::error!("failed to send ParticipantDeafened event: {e}"); - } - }, - CallSignal::Undeafened => { - if let Err(e) = ch.send(BlinkEventKind::ParticipantUndeafened { peer_id: sender }) { - log::error!("failed to send ParticipantUndeafened event: {e}"); - } - }, - } - }, - opt = peer_signaling_stream.next() => { - let msg = match opt { - Some(m) => m, - None => continue - }; - let sender = match msg.source.and_then(|s| s.to_did().ok()) { - Some(id) => id, - None => { - log::error!("msg received without source"); - continue - } - }; - - let signal: PeerSignal = { - let lock = own_id.read().await; - let own_id = match lock.as_ref().ok_or(Error::BlinkNotInitialized) { - Ok(r) => r, - Err(e) => { - log::error!("{e}"); - continue; - } - }; - match decode_gossipsub_msg_ecdh(own_id, &sender, &msg) { - Ok(s) => s, - Err(e) => { - log::error!("failed to decode msg from call signaling stream: {e}"); - continue; - }, - } - }; - - let mut lock = active_call.write().await; - let active_call = match lock.as_mut() { - Some(r) => r, - None => { - log::error!("received a peer_signal when there is no active call"); - continue; - } - }; - if matches!(active_call.call_state, CallState::Closing | CallState::Closed) { - log::warn!("received a signal for a call which is being closed"); - continue; - } - if !active_call.call.participants().contains(&sender) { - log::error!("received a signal from a peer who isn't part of the call"); - continue; - } - - let mut webrtc_controller = webrtc_controller.write().await; - - match signal { - PeerSignal::Ice(ice) => { - if active_call.call_state != CallState::Started { - log::error!("ice received for uninitialized call"); - continue; - } - if let Err(e) = webrtc_controller.recv_ice(&sender, ice).await { - log::error!("failed to recv_ice {}", e); - } - } - PeerSignal::Sdp(sdp) => { - if active_call.call_state != CallState::Started { - log::error!("sdp received for uninitialized call"); - continue; - } - log::debug!("received signal: SDP"); - if let Err(e) = webrtc_controller.recv_sdp(&sender, sdp).await { - log::error!("failed to recv_sdp: {}", e); - } - } - PeerSignal::Dial(sdp) => { - if active_call.call_state == CallState::Uninitialized { - active_call.call_state = CallState::Started; - } - log::debug!("received signal: Dial"); - // emits the SDP Event, which is sent to the peer via the SDP signal - if let Err(e) = webrtc_controller.accept_call(&sender, sdp).await { - log::error!("failed to accept_call: {}", e); - } - } - } - }, - opt = webrtc_event_stream.next() => { - match opt { - Some(event) => { - if let EmittedEvents::Ice{ .. } = event { - // don't log this event. it is too noisy. - // would use matches! but the enum's fields don't implement PartialEq - } else { - log::debug!("webrtc event: {event}"); - } - let lock = own_id.read().await; - let own_id = match lock.as_ref().ok_or(Error::BlinkNotInitialized) { - Ok(r) => r, - Err(e) => { - log::error!("{e}"); - continue; - } - }; - let mut lock = active_call.write().await; - let active_call = match lock.as_mut() { - Some(ac) => ac, - None => { - log::error!("event emitted but no active call"); - continue; - } - }; - let mut webrtc_controller = webrtc_controller.write().await; - let call_id = active_call.call.call_id(); - match event { - EmittedEvents::TrackAdded { peer, track } => { - if peer == *own_id { - log::warn!("got TrackAdded event for own id"); - continue; - } - let audio_sink_codec = audio_sink_codec.read().await.clone(); - if let Err(e) = host_media::create_audio_sink_track(peer.clone(), event_ch.clone(), track, AudioCodec::default(), audio_sink_codec).await { - log::error!("failed to send media_track command: {e}"); - } - } - EmittedEvents::Connected { peer } => { - active_call.connected_participants.insert(peer.clone(), PeerState::Connected); - let event = BlinkEventKind::ParticipantJoined { call_id, peer_id: peer}; - let _ = ch.send(event); - } - EmittedEvents::ConnectionClosed { peer } => { - // sometimes this event triggers without Disconnected being triggered. - // need to hang_up here as well. - active_call.connected_participants.insert(peer.clone(), PeerState::Closed); - let all_closed = !active_call.connected_participants.iter().any(|(_k, v)| *v != PeerState::Closed); - if all_closed { - active_call.call_state = CallState::Closed; - } - // have to use data after active_call or there will be 2 mutable borrows, which isn't allowed - webrtc_controller.hang_up(&peer).await; - // only autoclose for 2-person calls (group or direct). - // library user should respond to CallTerminated event. - if all_closed && active_call.call.participants().len() == 2 { - log::info!("all participants have successfully been disconnected"); - if let Err(e) = webrtc_controller.deinit().await { - log::error!("webrtc deinit failed: {e}"); - } - //rtp_logger::deinit().await; - host_media::reset().await; - let event = BlinkEventKind::CallTerminated { call_id }; - let _ = ch.send(event); - // terminate the task on purpose. - return; - } - } - EmittedEvents::Disconnected { peer } - | EmittedEvents::ConnectionFailed { peer } => { - // todo: could need to retry - active_call.connected_participants.insert(peer.clone(), PeerState::Disconnected); - if let Err(e) = host_media::remove_sink_track(peer.clone()).await { - log::error!("failed to send media_track command: {e}"); - } - webrtc_controller.hang_up(&peer).await; - } - EmittedEvents::CallInitiated { dest, sdp } => { - let topic = ipfs_routes::peer_signal_route(&dest, &call_id); - let signal = PeerSignal::Dial(*sdp); - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await { - log::error!("failed to send signal: {e}"); - } - } - EmittedEvents::Sdp { dest, sdp } => { - let topic = ipfs_routes::peer_signal_route(&dest, &call_id); - let signal = PeerSignal::Sdp(*sdp); - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await { - log::error!("failed to send signal: {e}"); - } - } - EmittedEvents::Ice { dest, candidate } => { - let topic = ipfs_routes::peer_signal_route(&dest, &call_id); - let signal = PeerSignal::Ice(*candidate); - if let Err(e) = send_signal_ecdh(&ipfs, own_id, &dest, signal, topic).await { - log::error!("failed to send signal: {e}"); - } - } - } - } - None => todo!() - } - } - } - } -} diff --git a/extensions/warp-blink-wrtc/src/host_media/mod.rs b/extensions/warp-blink-wrtc/src/host_media/mod.rs index afc2c7715..c92f8d612 100644 --- a/extensions/warp-blink-wrtc/src/host_media/mod.rs +++ b/extensions/warp-blink-wrtc/src/host_media/mod.rs @@ -16,13 +16,16 @@ use webrtc::track::track_remote::TrackRemote; pub(crate) mod audio; use audio::{create_sink_track, create_source_track, AudioCodec, AudioHardwareConfig}; -use self::mp4_logger::Mp4LoggerConfig; +use self::audio::DeviceConfig; +use self::{audio::AudioSampleRate, mp4_logger::Mp4LoggerConfig}; pub(crate) mod mp4_logger; struct Data { audio_input_device: Option, audio_output_device: Option, + audio_source_config: AudioHardwareConfig, + audio_sink_config: AudioHardwareConfig, audio_source_track: Option>, audio_sink_tracks: HashMap>, recording: bool, @@ -36,6 +39,14 @@ static mut DATA: Lazy = Lazy::new(|| { Data { audio_input_device: cpal_host.default_input_device(), audio_output_device: cpal_host.default_output_device(), + audio_source_config: AudioHardwareConfig { + sample_rate: AudioSampleRate::High, + channels: 1, + }, + audio_sink_config: AudioHardwareConfig { + sample_rate: AudioSampleRate::High, + channels: 1, + }, audio_source_track: None, audio_sink_tracks: HashMap::new(), recording: false, @@ -85,7 +96,6 @@ pub async fn create_audio_source_track( event_ch: broadcast::Sender, track: Arc, webrtc_codec: AudioCodec, - source_config: AudioHardwareConfig, ) -> Result<(), Error> { let _lock = LOCK.write().await; let input_device = match unsafe { DATA.audio_input_device.as_ref() } { @@ -94,7 +104,7 @@ pub async fn create_audio_source_track( }; let muted = unsafe { DATA.muted }; - + let source_config = unsafe { DATA.audio_source_config.clone() }; let source_track = create_source_track( own_id, event_ch, @@ -140,7 +150,6 @@ pub async fn create_audio_sink_track( track: Arc, // the format to decode to. Opus supports encoding and decoding to arbitrary sample rates and number of channels. webrtc_codec: AudioCodec, - sink_config: AudioHardwareConfig, ) -> anyhow::Result<()> { let _lock = LOCK.write().await; let output_device = match unsafe { DATA.audio_output_device.as_ref() } { @@ -150,6 +159,7 @@ pub async fn create_audio_sink_track( } }; let deafened = unsafe { DATA.deafened }; + let sink_config = unsafe { DATA.audio_sink_config.clone() }; let sink_track = create_sink_track( peer_id.clone(), event_ch, @@ -183,12 +193,12 @@ pub async fn create_audio_sink_track( Ok(()) } -pub async fn change_audio_input( - device: cpal::Device, - source_config: AudioHardwareConfig, -) -> anyhow::Result<()> { +pub async fn change_audio_input(device: cpal::Device) -> anyhow::Result<()> { let _lock = LOCK.write().await; + let mut source_config = unsafe { DATA.audio_source_config.clone() }; + source_config.channels = get_min_source_channels(&device)?; + // change_input_device destroys the audio stream. if that function fails. there should be // no audio_input. unsafe { @@ -196,19 +206,32 @@ pub async fn change_audio_input( } if let Some(source) = unsafe { DATA.audio_source_track.as_mut() } { - source.change_input_device(&device, source_config)?; + source.change_input_device(&device, source_config.clone())?; } unsafe { DATA.audio_input_device.replace(device); + DATA.audio_source_config = source_config; } Ok(()) } -pub async fn change_audio_output( - device: cpal::Device, - sink_config: AudioHardwareConfig, -) -> anyhow::Result<()> { +pub async fn set_audio_source_config(source_config: AudioHardwareConfig) { let _lock = LOCK.write().await; + unsafe { + DATA.audio_source_config = source_config; + } +} + +pub async fn get_audio_source_config() -> AudioHardwareConfig { + let _lock = LOCK.write().await; + unsafe { DATA.audio_source_config.clone() } +} + +pub async fn change_audio_output(device: cpal::Device) -> anyhow::Result<()> { + let _lock = LOCK.write().await; + + let mut sink_config = unsafe { DATA.audio_sink_config.clone() }; + sink_config.channels = get_min_sink_channels(&device)?; // todo: if this fails, return an error or keep going? for (_k, v) in unsafe { DATA.audio_sink_tracks.iter_mut() } { @@ -219,10 +242,37 @@ pub async fn change_audio_output( unsafe { DATA.audio_output_device.replace(device); + DATA.audio_sink_config = sink_config; } Ok(()) } +pub async fn set_audio_sink_config(sink_config: AudioHardwareConfig) { + let _lock = LOCK.write().await; + unsafe { + DATA.audio_sink_config = sink_config; + } +} + +pub async fn get_audio_sink_config() -> AudioHardwareConfig { + let _lock = LOCK.write().await; + unsafe { DATA.audio_sink_config.clone() } +} + +pub async fn get_audio_device_config() -> DeviceConfig { + let _lock = LOCK.write().await; + unsafe { + DeviceConfig::new( + DATA.audio_input_device + .as_ref() + .map(|x| x.name().unwrap_or_default()), + DATA.audio_output_device + .as_ref() + .map(|x| x.name().unwrap_or_default()), + ) + } +} + pub async fn remove_sink_track(peer_id: DID) -> anyhow::Result<()> { let _lock = LOCK.write().await; unsafe { @@ -343,3 +393,30 @@ pub async fn set_peer_audio_gain(peer_id: DID, multiplier: f32) -> anyhow::Resul Ok(()) } + +fn get_min_source_channels(input_device: &cpal::Device) -> anyhow::Result { + let min_channels = input_device + .supported_input_configs()? + .fold(None, |acc: Option, x| match acc { + None => Some(x.channels()), + Some(y) => Some(std::cmp::min(x.channels(), y)), + }); + let channels = min_channels.ok_or(anyhow::anyhow!( + "unsupported audio input device - no input configuration available" + ))?; + Ok(channels) +} + +fn get_min_sink_channels(output_device: &cpal::Device) -> anyhow::Result { + let min_channels = + output_device + .supported_output_configs()? + .fold(None, |acc: Option, x| match acc { + None => Some(x.channels()), + Some(y) => Some(std::cmp::min(x.channels(), y)), + }); + let channels = min_channels.ok_or(anyhow::anyhow!( + "unsupported audio output device. no output configuration available" + ))?; + Ok(channels) +} diff --git a/extensions/warp-blink-wrtc/src/lib.rs b/extensions/warp-blink-wrtc/src/lib.rs index 29e4327a5..0d8c8b915 100644 --- a/extensions/warp-blink-wrtc/src/lib.rs +++ b/extensions/warp-blink-wrtc/src/lib.rs @@ -12,8 +12,6 @@ mod host_media; // mod rtp_logger; mod blink_impl; -mod signaling; mod simple_webrtc; -mod store; pub use blink_impl::*; diff --git a/extensions/warp-blink-wrtc/src/simple_webrtc/mod.rs b/extensions/warp-blink-wrtc/src/simple_webrtc/mod.rs index e0aa4d762..a59306648 100644 --- a/extensions/warp-blink-wrtc/src/simple_webrtc/mod.rs +++ b/extensions/warp-blink-wrtc/src/simple_webrtc/mod.rs @@ -17,7 +17,7 @@ //! use anyhow::{bail, Result}; -use futures::Stream; + use std::collections::HashMap; use std::sync::Arc; use tokio::sync::broadcast; @@ -51,7 +51,7 @@ pub mod events; pub use webrtc::rtp_transceiver::rtp_codec::RTCRtpCodecCapability; use webrtc::rtp_transceiver::rtp_sender::RTCRtpSender; -use self::events::EmittedEvents; +use self::events::{EmittedEvents, WebRtcEventStream}; /// simple-webrtc /// This library augments the [webrtc-rs](https://github.com/webrtc-rs/webrtc) library, hopefully @@ -141,7 +141,7 @@ impl Controller { bail!("peers is not empty after deinit") } } - pub fn get_event_stream(&self) -> anyhow::Result> { + pub fn get_event_stream(&self) -> WebRtcEventStream { let mut rx = self.event_ch.subscribe(); let stream = async_stream::stream! { loop { @@ -152,7 +152,7 @@ impl Controller { }; } }; - Ok(Box::pin(stream)) + WebRtcEventStream(Box::pin(stream)) } /// creates a RTCPeerConnection, sets the local SDP object, emits a CallInitiatedEvent, @@ -223,6 +223,10 @@ impl Controller { } } + pub fn is_connected(&self, peer_id: &DID) -> bool { + self.peers.contains_key(peer_id) + } + /// Spawns a MediaWorker which will receive RTP packets and forward them to all peers /// todo: the peers may want to agree on the MimeType pub async fn add_media_source( diff --git a/warp/src/blink/call_config.rs b/warp/src/blink/call_config.rs deleted file mode 100644 index dcc9a1938..000000000 --- a/warp/src/blink/call_config.rs +++ /dev/null @@ -1,10 +0,0 @@ -use crate::crypto::DID; - -#[derive(Default, Debug, Clone)] -pub struct CallConfig { - pub recording: bool, - pub self_muted: bool, - pub self_deafened: bool, - pub participants_muted: Vec, - pub participants_deafened: Vec, -} diff --git a/warp/src/blink/call_state.rs b/warp/src/blink/call_state.rs new file mode 100644 index 000000000..1b160d7ce --- /dev/null +++ b/warp/src/blink/call_state.rs @@ -0,0 +1,75 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::crypto::DID; + +#[derive(Debug, Clone)] +pub struct CallState { + pub own_id: DID, + pub participants_joined: HashMap, +} + +#[derive(Default, Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] +pub struct ParticipantState { + pub muted: bool, + pub deafened: bool, + pub recording: bool, +} + +impl CallState { + pub fn new(own_id: DID) -> Self { + Self { + own_id, + participants_joined: HashMap::default(), + } + } + pub fn add_participant(&mut self, id: &DID, state: ParticipantState) { + self.participants_joined.insert(id.clone(), state); + } + + pub fn is_call_empty(&self) -> bool { + self.participants_joined.is_empty() + } + + pub fn remove_participant(&mut self, id: &DID) { + self.participants_joined.remove(id); + } + + pub fn set_muted(&mut self, id: &DID, muted: bool) { + if let Some(participant) = self.participants_joined.get_mut(id) { + participant.muted = muted; + } + } + + pub fn set_deafened(&mut self, id: &DID, deafened: bool) { + if let Some(participant) = self.participants_joined.get_mut(id) { + participant.deafened = deafened; + } + } + + pub fn set_recording(&mut self, id: &DID, recording: bool) { + if let Some(participant) = self.participants_joined.get_mut(id) { + participant.recording = recording; + } + } + + pub fn set_self_muted(&mut self, muted: bool) { + let own_id = self.own_id.clone(); + self.set_muted(&own_id, muted); + } + + pub fn set_self_deafened(&mut self, deafened: bool) { + let own_id = self.own_id.clone(); + self.set_deafened(&own_id, deafened); + } + + pub fn set_self_recording(&mut self, recording: bool) { + let own_id = self.own_id.clone(); + self.set_recording(&own_id, recording); + } + + pub fn reset_self(&mut self) { + self.participants_joined.remove(&self.own_id); + } +} diff --git a/warp/src/blink/mod.rs b/warp/src/blink/mod.rs index 5d0927c0b..5ee3a147d 100644 --- a/warp/src/blink/mod.rs +++ b/warp/src/blink/mod.rs @@ -18,8 +18,8 @@ use mime_types::*; use uuid::Uuid; mod audio_config; pub use audio_config::*; -mod call_config; -pub use call_config::*; +mod call_state; +pub use call_state::*; use crate::{ crypto::DID, @@ -86,7 +86,7 @@ pub trait Blink: Sync + Send + SingleHandle + DynClone { async fn record_call(&mut self, output_dir: &str) -> Result<(), Error>; async fn stop_recording(&mut self) -> Result<(), Error>; - async fn get_call_config(&self) -> Result, Error>; + async fn get_call_state(&self) -> Result, Error>; fn enable_automute(&mut self) -> Result<(), Error>; fn disable_automute(&mut self) -> Result<(), Error>; @@ -133,14 +133,11 @@ pub enum BlinkEventKind { ParticipantSpeaking { peer_id: DID }, #[display(fmt = "SelfSpeaking")] SelfSpeaking, - #[display(fmt = "ParticipantMuted")] - ParticipantMuted { peer_id: DID }, - #[display(fmt = "ParticipantUnmuted")] - ParticipantUnmuted { peer_id: DID }, - #[display(fmt = "ParticipantDeafened")] - ParticipantDeafened { peer_id: DID }, - #[display(fmt = "ParticipantUndeafened")] - ParticipantUndeafened { peer_id: DID }, + #[display(fmt = "ParticipantStateChanged")] + ParticipantStateChanged { + peer_id: DID, + state: ParticipantState, + }, /// audio packets were dropped for the peer #[display(fmt = "AudioDegradation")] AudioDegradation { peer_id: DID }, @@ -152,7 +149,7 @@ pub enum BlinkEventKind { AudioStreamError, } -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct CallInfo { call_id: Uuid, conversation_id: Option, @@ -185,6 +182,10 @@ impl CallInfo { self.participants.clone() } + pub fn contains_participant(&self, id: &DID) -> bool { + self.participants.contains(id) + } + pub fn group_key(&self) -> Vec { self.group_key.clone() } diff --git a/warp/src/error.rs b/warp/src/error.rs index ad59180ad..5a9e06ede 100644 --- a/warp/src/error.rs +++ b/warp/src/error.rs @@ -224,8 +224,12 @@ pub enum Error { AudioHostError(String), #[error("BlinkNotInitialized")] BlinkNotInitialized, + #[error("CallNotFound")] + CallNotFound, #[error("CallNotInProgress")] CallNotInProgress, + #[error("CallAlreadyInProgress")] + CallAlreadyInProgress, #[error("FailedToSendSignal: {_0}")] FailedToSendSignal(String), #[error("Invalid MIME type: {_0}")]