Skip to content

Commit

Permalink
add queue to gossipsub sender
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwoodbury committed Nov 9, 2023
1 parent 2d2fb43 commit d05a131
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 44 deletions.
26 changes: 14 additions & 12 deletions extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ async fn run(
call_info: call_info.clone(),
};

if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
log::error!("failed to send signal: {e}");
}
}
Expand Down Expand Up @@ -395,7 +395,7 @@ async fn run(
let signal = CallSignal::Join;
if let Err(e) =
gossipsub_sender
.send_signal_aes(call_info.group_key(), signal, topic).await
.send_signal_aes(call_info.group_key(), signal, topic)
{
let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string())));
} else {
Expand Down Expand Up @@ -428,6 +428,7 @@ async fn run(
let info = call_data_map.get_call_info(call_id);
if active_call.as_ref().map(|x| x == &call_id).unwrap_or_default() {
call_data_map.leave_call(call_id);
let _ = gossipsub_sender.empty_queue();
let _ = active_call.take();
let _ = webrtc_controller.deinit().await;
host_media::reset().await;
Expand All @@ -441,7 +442,7 @@ async fn run(
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Leave;
if let Err(e) = gossipsub_sender
.send_signal_aes(info.group_key(), signal, topic).await
.send_signal_aes(info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
}
Expand All @@ -459,7 +460,7 @@ async fn run(
let signal = CallSignal::Muted;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic).await
.send_signal_aes(data.info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
} else {
Expand All @@ -475,7 +476,7 @@ async fn run(
let signal = CallSignal::Unmuted;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic).await
.send_signal_aes(data.info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
} else {
Expand All @@ -494,7 +495,7 @@ async fn run(
let signal = CallSignal::Deafened;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic).await
.send_signal_aes(data.info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
}
Expand All @@ -511,7 +512,7 @@ async fn run(
let signal = CallSignal::Undeafened;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic).await
.send_signal_aes(data.info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
}
Expand Down Expand Up @@ -553,7 +554,7 @@ async fn run(
let signal = CallSignal::Recording;
if let Err(e) =
gossipsub_sender
.send_signal_aes(info.group_key(), signal, topic).await
.send_signal_aes(info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
}
Expand Down Expand Up @@ -581,7 +582,7 @@ async fn run(
let signal = CallSignal::NotRecording;
if let Err(e) =
gossipsub_sender
.send_signal_aes(info.group_key(), signal, topic).await
.send_signal_aes(info.group_key(), signal, topic)
{
log::error!("failed to send signal: {e}");
}
Expand Down Expand Up @@ -749,7 +750,7 @@ async fn run(
simple_webrtc::events::EmittedEvents::Ice { dest, candidate } => {
let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default());
let signal = PeerSignal::Ice(*candidate);
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
log::error!("failed to send signal: {e}");
}
},
Expand Down Expand Up @@ -789,21 +790,22 @@ async fn run(

gossipsub_listener.unsubscribe_call(ac);
gossipsub_listener.unsubscribe_webrtc(ac);
let _ = gossipsub_sender.empty_queue();
}
}
},
simple_webrtc::events::EmittedEvents::Sdp { dest, sdp } => {
let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default());
let signal = PeerSignal::Sdp(*sdp);
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
log::error!("failed to send signal: {e}");
}
},
simple_webrtc::events::EmittedEvents::CallInitiated { dest, sdp } => {
log::debug!("sending dial signal");
let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default());
let signal = PeerSignal::Dial(*sdp);
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
log::error!("failed to send signal: {e}");
}
},
Expand Down
77 changes: 46 additions & 31 deletions extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use std::{fmt::Display, sync::Arc, time::Duration};
use std::{
collections::{HashMap, VecDeque},
fmt::Display,
sync::Arc,
time::Duration,
};

use futures::channel::oneshot;
use rust_ipfs::Ipfs;
Expand All @@ -24,13 +29,11 @@ enum GossipSubCmd {
group_key: Vec<u8>,
signal: Vec<u8>,
topic: String,
rsp: oneshot::Sender<anyhow::Result<()>>,
},
SendEcdh {
dest: DID,
signal: Vec<u8>,
topic: String,
rsp: oneshot::Sender<anyhow::Result<()>>,
},
DecodeEcdh {
src: DID,
Expand All @@ -40,6 +43,7 @@ enum GossipSubCmd {
GetOwnId {
rsp: oneshot::Sender<DID>,
},
EmptyQueue,
}

#[derive(Clone)]
Expand Down Expand Up @@ -72,39 +76,33 @@ impl GossipSubSender {
Ok(id)
}

pub async fn send_signal_aes<T: Serialize + Display>(
pub fn send_signal_aes<T: Serialize + Display>(
&self,
group_key: Vec<u8>,
signal: T,
topic: String,
) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
let signal = serde_cbor::to_vec(&signal)?;
self.ch.send(GossipSubCmd::SendAes {
group_key,
signal,
topic,
rsp: tx,
})?;
rx.await??;
Ok(())
}

pub async fn send_signal_ecdh<T: Serialize + Display>(
pub fn send_signal_ecdh<T: Serialize + Display>(
&self,
dest: DID,
signal: T,
topic: String,
) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
let signal = serde_cbor::to_vec(&signal)?;
self.ch.send(GossipSubCmd::SendEcdh {
dest,
signal,
topic,
rsp: tx,
})?;
rx.await??;
Ok(())
}

Expand Down Expand Up @@ -134,6 +132,11 @@ impl GossipSubSender {
let data: T = serde_cbor::from_slice(&bytes)?;
Ok(data)
}

pub fn empty_queue(&self) -> anyhow::Result<()> {
self.ch.send(GossipSubCmd::EmptyQueue)?;
Ok(())
}
}

async fn run(
Expand Down Expand Up @@ -175,44 +178,56 @@ async fn run(
}
};

let mut ecdh_queue: HashMap<DID, VecDeque<GossipSubCmd>> = HashMap::new();

loop {
tokio::select! {
_ = timer.tick() => {
for (_dest, queue) in ecdh_queue.iter_mut() {
while let Some(cmd) = queue.pop_front() {
match cmd {
GossipSubCmd::SendEcdh { dest, signal, topic } => {
let encrypted = match ecdh_encrypt(&own_id, &dest, signal.clone()) {
Ok(r) => r,
Err(e) => {
log::error!("failed to encrypt ecdh message: {e}");
break;
}
};
if ipfs.pubsub_publish(topic.clone(), encrypted).await.is_err() {
queue.push_front(GossipSubCmd::SendEcdh { dest, signal, topic });
break;
}
}
_ => {}
}
}
}
}
opt = ch.recv() => match opt {
Some(cmd) => match cmd {
GossipSubCmd::EmptyQueue => {
ecdh_queue.clear();
}
GossipSubCmd::GetOwnId { rsp } => {
let _ = rsp.send(own_id.clone());
}
GossipSubCmd::SendAes { group_key, signal, topic, rsp } => {
GossipSubCmd::SendAes { group_key, signal, topic } => {
let encrypted = match Cipher::direct_encrypt(&signal, &group_key) {
Ok(r) => r,
Err(e) => {
log::error!("failed to encrypt aes message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
continue;
}
};
if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await {
log::error!("failed to publish message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
} else {
let _ = rsp.send(Ok(()));

}
},
GossipSubCmd::SendEcdh { dest, signal, topic, rsp } => {
let encrypted = match ecdh_encrypt(&own_id, &dest, signal) {
Ok(r) => r,
Err(e) => {
log::error!("failed to encrypt ecdh message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
continue;
}
};
if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await {
log::error!("failed to publish message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
}else {
let _ = rsp.send(Ok(()));
}
GossipSubCmd::SendEcdh { dest, signal, topic } => {
let queue = ecdh_queue.entry(dest.clone()).or_insert(VecDeque::new());
queue.push_back(GossipSubCmd::SendEcdh { dest, signal, topic });
}
GossipSubCmd::DecodeEcdh { src, data, rsp } => {
let r = || {
Expand Down
1 change: 0 additions & 1 deletion extensions/warp-blink-wrtc/src/blink_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ mod data;
mod blink_controller;
mod gossipsub_listener;
mod gossipsub_sender;
mod sender_queue;
mod signaling;
mod store;

Expand Down
Empty file.

0 comments on commit d05a131

Please sign in to comment.