diff --git a/extensions/warp-blink-wrtc/src/host_media/mod.rs b/extensions/warp-blink-wrtc/src/host_media/mod.rs index bbb347bda..dc384202b 100644 --- a/extensions/warp-blink-wrtc/src/host_media/mod.rs +++ b/extensions/warp-blink-wrtc/src/host_media/mod.rs @@ -25,7 +25,9 @@ struct Data { audio_output_device: Option, audio_source_track: Option>, audio_sink_tracks: HashMap>, - is_recording: bool, + recording: bool, + muted: bool, + deafened: bool, } static LOCK: Lazy> = Lazy::new(|| RwLock::new(())); @@ -36,7 +38,9 @@ static mut DATA: Lazy = Lazy::new(|| { audio_output_device: cpal_host.default_output_device(), audio_source_track: None, audio_sink_tracks: HashMap::new(), - is_recording: false, + recording: false, + muted: false, + deafened: false, } }); @@ -61,7 +65,9 @@ pub async fn reset() { unsafe { DATA.audio_source_track.take(); DATA.audio_sink_tracks.clear(); - DATA.is_recording = false; + DATA.recording = false; + DATA.muted = false; + DATA.deafened = false; } mp4_logger::deinit().await; } @@ -89,6 +95,8 @@ pub async fn create_audio_source_track( } }; + let muted = unsafe { DATA.muted }; + let source_track = create_source_track( own_id, event_ch, @@ -98,9 +106,12 @@ pub async fn create_audio_source_track( source_config, ) .map_err(|e| anyhow::anyhow!("{e}: failed to create source track"))?; - source_track - .play() - .map_err(|e| anyhow::anyhow!("{e}: failed to play source track"))?; + + if !muted { + source_track + .play() + .map_err(|e| anyhow::anyhow!("{e}: failed to play source track"))?; + } unsafe { if let Some(mut track) = DATA.audio_source_track.replace(source_track) { @@ -109,7 +120,7 @@ pub async fn create_audio_source_track( log::error!("failed to remove mp4 logger when replacing source track: {e}"); } } - if DATA.is_recording { + if DATA.recording { if let Some(source_track) = DATA.audio_source_track.as_mut() { if let Err(e) = source_track.init_mp4_logger() { log::error!("failed to init mp4 logger for sink track: {e}"); @@ -143,7 +154,7 @@ pub async fn create_audio_sink_track( bail!("no audio output device selected"); } }; - + let deafened = unsafe { DATA.deafened }; let sink_track = create_sink_track( peer_id.clone(), event_ch, @@ -152,9 +163,13 @@ pub async fn create_audio_sink_track( webrtc_codec, sink_config, )?; - sink_track - .play() - .map_err(|e| anyhow::anyhow!("{e}: failed to play sink track"))?; + + if !deafened { + sink_track + .play() + .map_err(|e| anyhow::anyhow!("{e}: failed to play sink track"))?; + } + unsafe { // don't want two tracks logging at the same time if let Some(mut track) = DATA.audio_sink_tracks.insert(peer_id.clone(), sink_track) { @@ -162,7 +177,7 @@ pub async fn create_audio_sink_track( log::error!("failed to remove mp4 logger when replacing sink track: {e}"); } } - if DATA.is_recording { + if DATA.recording { if let Some(sink_track) = DATA.audio_sink_tracks.get_mut(&peer_id) { if let Err(e) = sink_track.init_mp4_logger() { log::error!("failed to init mp4 logger for sink track: {e}"); @@ -221,45 +236,57 @@ pub async fn remove_sink_track(peer_id: DID) -> anyhow::Result<()> { Ok(()) } -pub async fn mute_peer(peer_id: DID) -> anyhow::Result<()> { +pub async fn mute_self() -> anyhow::Result<()> { let _lock = LOCK.write().await; - if let Some(track) = unsafe { DATA.audio_sink_tracks.get_mut(&peer_id) } { + unsafe { + DATA.muted = true; + } + if let Some(track) = unsafe { DATA.audio_source_track.as_mut() } { track .pause() .map_err(|e| anyhow::anyhow!("failed to pause (mute) track: {e}"))?; } - Ok(()) } -pub async fn unmute_peer(peer_id: DID) -> anyhow::Result<()> { +pub async fn unmute_self() -> anyhow::Result<()> { let _lock = LOCK.write().await; - if let Some(track) = unsafe { DATA.audio_sink_tracks.get_mut(&peer_id) } { + unsafe { + DATA.muted = false; + } + if let Some(track) = unsafe { DATA.audio_source_track.as_mut() } { track .play() .map_err(|e| anyhow::anyhow!("failed to play (unmute) track: {e}"))?; } - Ok(()) } -pub async fn mute_self() -> anyhow::Result<()> { +pub async fn deafen() -> anyhow::Result<()> { let _lock = LOCK.write().await; - if let Some(track) = unsafe { DATA.audio_source_track.as_mut() } { - track - .pause() - .map_err(|e| anyhow::anyhow!("failed to pause (mute) track: {e}"))?; + unsafe { + DATA.deafened = true; + for (_id, track) in DATA.audio_sink_tracks.iter() { + track + .pause() + .map_err(|e| anyhow::anyhow!("failed to pause (mute) track: {e}"))?; + } } + Ok(()) } -pub async fn unmute_self() -> anyhow::Result<()> { +pub async fn undeafen() -> anyhow::Result<()> { let _lock = LOCK.write().await; - if let Some(track) = unsafe { DATA.audio_source_track.as_mut() } { - track - .play() - .map_err(|e| anyhow::anyhow!("failed to play (unmute) track: {e}"))?; + unsafe { + DATA.deafened = false; + for (_id, track) in DATA.audio_sink_tracks.iter() { + track + .play() + .map_err(|e| anyhow::anyhow!("failed to play (unmute) track: {e}"))?; + } } + Ok(()) } @@ -271,17 +298,17 @@ pub async fn init_recording(config: Mp4LoggerConfig) -> anyhow::Result<()> { let _lock = LOCK.write().await; unsafe { - if DATA.is_recording { + if DATA.recording { // this function was called twice for the same call. assume they mean to resume mp4_logger::resume(); return Ok(()); } - DATA.is_recording = true; + DATA.recording = true; } mp4_logger::init(config).await?; unsafe { - DATA.is_recording = true; + DATA.recording = true; } for track in unsafe { DATA.audio_sink_tracks.values_mut() } { diff --git a/extensions/warp-blink-wrtc/src/lib.rs b/extensions/warp-blink-wrtc/src/lib.rs index ee1f8cac2..e98f7bd99 100644 --- a/extensions/warp-blink-wrtc/src/lib.rs +++ b/extensions/warp-blink-wrtc/src/lib.rs @@ -47,7 +47,7 @@ use tokio::{ }; use uuid::Uuid; use warp::{ - blink::{AudioDeviceConfig, Blink, BlinkEventKind, BlinkEventStream, CallInfo}, + blink::{AudioDeviceConfig, Blink, BlinkEventKind, BlinkEventStream, CallConfig, CallInfo}, crypto::{did_key::Generate, zeroize::Zeroizing, DIDKey, Ed25519KeyPair, Fingerprint, DID}, error::Error, module::Module, @@ -93,6 +93,7 @@ struct ActiveCall { call: CallInfo, connected_participants: HashMap, call_state: CallState, + call_config: CallConfig, } #[derive(Clone, Eq, PartialEq)] @@ -119,6 +120,7 @@ impl From for ActiveCall { call: value, connected_participants: HashMap::new(), call_state: CallState::Uninitialized, + call_config: CallConfig::default(), } } } @@ -629,7 +631,27 @@ async fn handle_webrtc(params: WebRtcHandlerParams, mut webrtc_event_stream: Web } webrtc_controller.write().await.hang_up(&sender).await; if let Err(e) = ch.send(BlinkEventKind::ParticipantLeft { call_id, peer_id: sender }) { - log::error!("failed to send ParticipantLeft Event: {e}"); + log::error!("failed to send ParticipantLeft event: {e}"); + } + }, + CallSignal::Muted => { + if let Err(e) = ch.send(BlinkEventKind::ParticipantMuted { peer_id: sender }) { + log::error!("failed to send ParticipantMuted event: {e}"); + } + }, + CallSignal::Unmuted => { + if let Err(e) = ch.send(BlinkEventKind::ParticipantUnmuted { peer_id: sender }) { + log::error!("failed to send ParticipantUnmuted event: {e}"); + } + } + CallSignal::Deafened => { + if let Err(e) = ch.send(BlinkEventKind::ParticipantDeafened { peer_id: sender }) { + log::error!("failed to send ParticipantDeafened event: {e}"); + } + }, + CallSignal::Undeafened => { + if let Err(e) = ch.send(BlinkEventKind::ParticipantUndeafened { peer_id: sender }) { + log::error!("failed to send ParticipantUndeafened event: {e}"); } }, } @@ -880,9 +902,7 @@ impl Blink for BlinkImpl { mut participants: Vec, ) -> Result { if self.ipfs.read().await.is_none() { - return Err(Error::OtherWithContext( - "no ipfs - received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } if let Some(ac) = self.active_call.read().await.as_ref() { if ac.call_state != CallState::Closed { @@ -896,9 +916,7 @@ impl Blink for BlinkImpl { let own_id = match lock.as_ref() { Some(r) => r, None => { - return Err(Error::OtherWithContext( - "no own_id - received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; @@ -914,18 +932,14 @@ impl Blink for BlinkImpl { let own_id = match lock.as_ref() { Some(r) => r, None => { - return Err(Error::OtherWithContext( - "no own_id - received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; let lock = self.ipfs.read().await; let ipfs = match lock.as_ref() { Some(r) => r, None => { - return Err(Error::OtherWithContext( - "no ipfs - received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; for dest in participants { @@ -946,9 +960,7 @@ impl Blink for BlinkImpl { /// accept/join a call. Automatically send and receive audio async fn answer_call(&mut self, call_id: Uuid) -> Result<(), Error> { if self.ipfs.read().await.is_none() { - return Err(Error::OtherWithContext( - "received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } if let Some(ac) = self.active_call.read().await.as_ref() { if ac.call_state != CallState::Closed { @@ -975,9 +987,7 @@ impl Blink for BlinkImpl { Some(r) => r, None => { // should never happen - return Err(Error::OtherWithContext( - "received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; if let Err(e) = send_signal_aes(ipfs, &call.group_key(), signal, topic).await { @@ -991,9 +1001,7 @@ impl Blink for BlinkImpl { let ipfs = match lock.as_ref() { Some(r) => r, None => { - return Err(Error::OtherWithContext( - "received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; if let Some(pc) = self.pending_calls.write().await.remove(&call_id) { @@ -1015,9 +1023,7 @@ impl Blink for BlinkImpl { let own_id = match lock.as_ref() { Some(r) => r, None => { - return Err(Error::OtherWithContext( - "received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; @@ -1025,9 +1031,7 @@ impl Blink for BlinkImpl { let ipfs = match lock.as_ref() { Some(r) => r, None => { - return Err(Error::OtherWithContext( - "received signal before blink is initialized".into(), - )); + return Err(Error::BlinkNotInitialized); } }; if let Some(ac) = self.active_call.write().await.as_mut() { @@ -1050,7 +1054,6 @@ impl Blink for BlinkImpl { }; let call_id = ac.call.call_id(); - let topic = ipfs_routes::call_signal_route(&call_id); let signal = CallSignal::Leave { call_id }; if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { @@ -1126,15 +1129,125 @@ impl Blink for BlinkImpl { // ------ Media controls ------ async fn mute_self(&mut self) -> Result<(), Error> { - host_media::mute_self() - .await - .map_err(|e| warp::error::Error::OtherWithContext(e.to_string())) + if self.active_call.read().await.is_none() { + return Err(Error::CallNotInProgress); + } + host_media::mute_self().await?; + + let lock = self.ipfs.read().await; + let ipfs = match lock.as_ref() { + Some(r) => r, + None => { + return Err(Error::BlinkNotInitialized); + } + }; + + if let Some(ac) = self.active_call.write().await.as_mut() { + ac.call_config.self_muted = true; + let call_id = ac.call.call_id(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Muted; + if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { + log::error!("failed to send signal: {e}"); + } else { + log::debug!("sent signal to mute self"); + } + } + + Ok(()) } async fn unmute_self(&mut self) -> Result<(), Error> { - host_media::unmute_self() + if self.active_call.read().await.is_none() { + return Err(Error::CallNotInProgress); + } + host_media::unmute_self().await?; + + let lock = self.ipfs.read().await; + let ipfs = match lock.as_ref() { + Some(r) => r, + None => { + return Err(Error::BlinkNotInitialized); + } + }; + + if let Some(ac) = self.active_call.write().await.as_mut() { + ac.call_config.self_muted = false; + let call_id = ac.call.call_id(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Unmuted; + if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { + log::error!("failed to send signal: {e}"); + } else { + log::debug!("sent signal to unmute self"); + } + } + + Ok(()) + } + async fn silence_call(&mut self) -> Result<(), Error> { + if self.active_call.read().await.is_none() { + return Err(Error::CallNotInProgress); + } + host_media::deafen().await?; + let lock = self.ipfs.read().await; + let ipfs = match lock.as_ref() { + Some(r) => r, + None => { + return Err(Error::BlinkNotInitialized); + } + }; + + if let Some(ac) = self.active_call.write().await.as_mut() { + ac.call_config.self_deafened = true; + let call_id = ac.call.call_id(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Deafened; + if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { + log::error!("failed to send signal: {e}"); + } else { + log::debug!("sent signal to deafen self"); + } + } + + Ok(()) + } + async fn unsilence_call(&mut self) -> Result<(), Error> { + if self.active_call.read().await.is_none() { + return Err(Error::CallNotInProgress); + } + host_media::undeafen().await?; + let lock = self.ipfs.read().await; + let ipfs = match lock.as_ref() { + Some(r) => r, + None => { + return Err(Error::BlinkNotInitialized); + } + }; + + if let Some(ac) = self.active_call.write().await.as_mut() { + ac.call_config.self_deafened = false; + let call_id = ac.call.call_id(); + let topic = ipfs_routes::call_signal_route(&call_id); + let signal = CallSignal::Undeafened; + if let Err(e) = send_signal_aes(ipfs, &ac.call.group_key(), signal, topic).await { + log::error!("failed to send signal: {e}"); + } else { + log::debug!("sent signal to undeafen self"); + } + } + + Ok(()) + } + + async fn get_call_config(&self) -> Result, Error> { + Ok(self + .active_call + .read() .await - .map_err(|e| warp::error::Error::OtherWithContext(e.to_string())) + .as_ref() + .map(|x| x.call_config.clone())) } + async fn enable_camera(&mut self) -> Result<(), Error> { Err(Error::Unimplemented) } @@ -1143,7 +1256,7 @@ impl Blink for BlinkImpl { } async fn record_call(&mut self, output_dir: &str) -> Result<(), Error> { match self.active_call.read().await.as_ref() { - None => return Err(Error::OtherWithContext("no call to record".into())), + None => return Err(Error::CallNotInProgress), Some(ActiveCall { call, .. }) => { host_media::init_recording(Mp4LoggerConfig { call_id: call.call_id(), @@ -1159,7 +1272,7 @@ impl Blink for BlinkImpl { } async fn stop_recording(&mut self) -> Result<(), Error> { match self.active_call.read().await.as_ref() { - None => return Err(Error::OtherWithContext("no call to pause".into())), + None => return Err(Error::CallNotInProgress), Some(_) => { host_media::pause_recording().await?; } diff --git a/extensions/warp-blink-wrtc/src/signaling.rs b/extensions/warp-blink-wrtc/src/signaling.rs index 9cee7da5b..323c2db65 100644 --- a/extensions/warp-blink-wrtc/src/signaling.rs +++ b/extensions/warp-blink-wrtc/src/signaling.rs @@ -28,6 +28,15 @@ pub enum CallSignal { Join { call_id: Uuid }, #[display(fmt = "Leave")] Leave { call_id: Uuid }, + + #[display(fmt = "Muted")] + Muted, + #[display(fmt = "Unmuted")] + Unmuted, + #[display(fmt = "Deafened")] + Deafened, + #[display(fmt = "Undeafened")] + Undeafened, } #[derive(Serialize, Deserialize, Display)] diff --git a/tools/blink-repl/src/main.rs b/tools/blink-repl/src/main.rs index 4510da682..0f5480289 100644 --- a/tools/blink-repl/src/main.rs +++ b/tools/blink-repl/src/main.rs @@ -59,6 +59,10 @@ enum Repl { MuteSelf, /// unmute self UnmuteSelf, + /// silence the call + Deafen, + /// unsilence the call + Undeafen, /// enable automute (enabled by default) EnableAutomute, /// disable automute @@ -157,6 +161,8 @@ async fn handle_command( Repl::UnmuteSelf => { blink.unmute_self().await?; } + Repl::Deafen => blink.silence_call().await?, + Repl::Undeafen => blink.unsilence_call().await?, Repl::EnableAutomute => { blink.enable_automute()?; } diff --git a/warp/src/blink/call_config.rs b/warp/src/blink/call_config.rs new file mode 100644 index 000000000..dcc9a1938 --- /dev/null +++ b/warp/src/blink/call_config.rs @@ -0,0 +1,10 @@ +use crate::crypto::DID; + +#[derive(Default, Debug, Clone)] +pub struct CallConfig { + pub recording: bool, + pub self_muted: bool, + pub self_deafened: bool, + pub participants_muted: Vec, + pub participants_deafened: Vec, +} diff --git a/warp/src/blink/mod.rs b/warp/src/blink/mod.rs index b6497a9f4..d3a662594 100644 --- a/warp/src/blink/mod.rs +++ b/warp/src/blink/mod.rs @@ -18,6 +18,8 @@ use mime_types::*; use uuid::Uuid; mod audio_config; pub use audio_config::*; +mod call_config; +pub use call_config::*; use crate::{ crypto::DID, @@ -77,11 +79,15 @@ pub trait Blink: Sync + Send + SingleHandle + DynClone { async fn mute_self(&mut self) -> Result<(), Error>; async fn unmute_self(&mut self) -> Result<(), Error>; + async fn silence_call(&mut self) -> Result<(), Error>; + async fn unsilence_call(&mut self) -> Result<(), Error>; async fn enable_camera(&mut self) -> Result<(), Error>; async fn disable_camera(&mut self) -> Result<(), Error>; async fn record_call(&mut self, output_dir: &str) -> Result<(), Error>; async fn stop_recording(&mut self) -> Result<(), Error>; + async fn get_call_config(&self) -> Result, Error>; + fn enable_automute(&mut self) -> Result<(), Error>; fn disable_automute(&mut self) -> Result<(), Error>; @@ -127,6 +133,14 @@ pub enum BlinkEventKind { ParticipantSpeaking { peer_id: DID }, #[display(fmt = "SelfSpeaking")] SelfSpeaking, + #[display(fmt = "ParticipantMuted")] + ParticipantMuted { peer_id: DID }, + #[display(fmt = "ParticipantUnmuted")] + ParticipantUnmuted { peer_id: DID }, + #[display(fmt = "ParticipantDeafened")] + ParticipantDeafened { peer_id: DID }, + #[display(fmt = "ParticipantUndeafened")] + ParticipantUndeafened { peer_id: DID }, /// audio packets were dropped for the peer #[display(fmt = "AudioDegradation")] AudioDegradation { peer_id: DID }, diff --git a/warp/src/error.rs b/warp/src/error.rs index 2df70f62d..950297533 100644 --- a/warp/src/error.rs +++ b/warp/src/error.rs @@ -222,6 +222,10 @@ pub enum Error { // indicates a problem enumerating audio I/O devices #[error("AudioHostError: {_0}")] AudioHostError(String), + #[error("CallNotInProgress")] + CallNotInProgress, + #[error("BlinkNotInitialized")] + BlinkNotInitialized, //Misc #[error("Length for '{context}' is invalid. Current length: {current}. Minimum Length: {minimum:?}, Maximum: {maximum:?}")]