Skip to content

Commit

Permalink
make gossipsub listener's send signal functions async
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwoodbury committed Nov 9, 2023
1 parent 32412e2 commit 2d2fb43
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 35 deletions.
47 changes: 18 additions & 29 deletions extensions/warp-blink-wrtc/src/blink_impl/blink_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,14 @@ async fn run(
mut cmd_rx: UnboundedReceiver<Cmd>,
notify: Arc<Notify>,
) {
let own_id = {
let notify2 = notify.clone();
let fut = gossipsub_sender.get_own_id();
tokio::select! {
_ = notify2.notified() => {
log::debug!("quitting blink event handler");
return;
}
r = fut => {
match r {
Ok(r) => r,
Err(e) => {
log::debug!("failed to get own id. quitting blink event handler: {e}");
return;
}
}
}
let own_id = match gossipsub_sender.get_own_id().await {
Ok(r) => r,
Err(e) => {
log::error!("failed to get own id. quitting blnk controller");
return;
}
};

// prevent accidental moves
let own_id = &own_id;

Expand Down Expand Up @@ -340,7 +329,7 @@ async fn run(
call_info: call_info.clone(),
};

if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
log::error!("failed to send signal: {e}");
}
}
Expand Down Expand Up @@ -406,7 +395,7 @@ async fn run(
let signal = CallSignal::Join;
if let Err(e) =
gossipsub_sender
.send_signal_aes(call_info.group_key(), signal, topic)
.send_signal_aes(call_info.group_key(), signal, topic).await
{
let _ = rsp.send(Err(Error::FailedToSendSignal(e.to_string())));
} else {
Expand Down Expand Up @@ -452,7 +441,7 @@ async fn run(
let topic = ipfs_routes::call_signal_route(&call_id);
let signal = CallSignal::Leave;
if let Err(e) = gossipsub_sender
.send_signal_aes(info.group_key(), signal, topic)
.send_signal_aes(info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
}
Expand All @@ -470,7 +459,7 @@ async fn run(
let signal = CallSignal::Muted;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic)
.send_signal_aes(data.info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
} else {
Expand All @@ -486,7 +475,7 @@ async fn run(
let signal = CallSignal::Unmuted;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic)
.send_signal_aes(data.info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
} else {
Expand All @@ -505,7 +494,7 @@ async fn run(
let signal = CallSignal::Deafened;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic)
.send_signal_aes(data.info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
}
Expand All @@ -522,7 +511,7 @@ async fn run(
let signal = CallSignal::Undeafened;
if let Err(e) =
gossipsub_sender
.send_signal_aes(data.info.group_key(), signal, topic)
.send_signal_aes(data.info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
}
Expand Down Expand Up @@ -564,7 +553,7 @@ async fn run(
let signal = CallSignal::Recording;
if let Err(e) =
gossipsub_sender
.send_signal_aes(info.group_key(), signal, topic)
.send_signal_aes(info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
}
Expand Down Expand Up @@ -592,7 +581,7 @@ async fn run(
let signal = CallSignal::NotRecording;
if let Err(e) =
gossipsub_sender
.send_signal_aes(info.group_key(), signal, topic)
.send_signal_aes(info.group_key(), signal, topic).await
{
log::error!("failed to send signal: {e}");
}
Expand Down Expand Up @@ -760,7 +749,7 @@ async fn run(
simple_webrtc::events::EmittedEvents::Ice { dest, candidate } => {
let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default());
let signal = PeerSignal::Ice(*candidate);
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
log::error!("failed to send signal: {e}");
}
},
Expand Down Expand Up @@ -806,15 +795,15 @@ async fn run(
simple_webrtc::events::EmittedEvents::Sdp { dest, sdp } => {
let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default());
let signal = PeerSignal::Sdp(*sdp);
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
log::error!("failed to send signal: {e}");
}
},
simple_webrtc::events::EmittedEvents::CallInitiated { dest, sdp } => {
log::debug!("sending dial signal");
let topic = ipfs_routes::peer_signal_route(&dest, &active_call.unwrap_or_default());
let signal = PeerSignal::Dial(*sdp);
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic) {
if let Err(e) = gossipsub_sender.send_signal_ecdh(dest, signal, topic).await {
log::error!("failed to send signal: {e}");
}
},
Expand Down
26 changes: 20 additions & 6 deletions extensions/warp-blink-wrtc/src/blink_impl/gossipsub_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ enum GossipSubCmd {
group_key: Vec<u8>,
signal: Vec<u8>,
topic: String,
rsp: oneshot::Sender<anyhow::Result<()>>,
},
SendEcdh {
dest: DID,
signal: Vec<u8>,
topic: String,
rsp: oneshot::Sender<anyhow::Result<()>>,
},
DecodeEcdh {
src: DID,
Expand Down Expand Up @@ -70,35 +72,39 @@ impl GossipSubSender {
Ok(id)
}

pub fn send_signal_aes<T: Serialize + Display>(
pub async fn send_signal_aes<T: Serialize + Display>(
&self,
group_key: Vec<u8>,
signal: T,
topic: String,
) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
let signal = serde_cbor::to_vec(&signal)?;
self.ch.send(GossipSubCmd::SendAes {
group_key,
signal,
topic,
rsp: tx,
})?;

rx.await??;
Ok(())
}

pub fn send_signal_ecdh<T: Serialize + Display>(
pub async fn send_signal_ecdh<T: Serialize + Display>(
&self,
dest: DID,
signal: T,
topic: String,
) -> anyhow::Result<()> {
let (tx, rx) = oneshot::channel();
let signal = serde_cbor::to_vec(&signal)?;
self.ch.send(GossipSubCmd::SendEcdh {
dest,
signal,
topic,
rsp: tx,
})?;

rx.await??;
Ok(())
}

Expand Down Expand Up @@ -176,28 +182,36 @@ async fn run(
GossipSubCmd::GetOwnId { rsp } => {
let _ = rsp.send(own_id.clone());
}
GossipSubCmd::SendAes { group_key, signal, topic } => {
GossipSubCmd::SendAes { group_key, signal, topic, rsp } => {
let encrypted = match Cipher::direct_encrypt(&signal, &group_key) {
Ok(r) => r,
Err(e) => {
log::error!("failed to encrypt aes message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
continue;
}
};
if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await {
log::error!("failed to publish message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
} else {
let _ = rsp.send(Ok(()));
}
},
GossipSubCmd::SendEcdh { dest, signal, topic } => {
GossipSubCmd::SendEcdh { dest, signal, topic, rsp } => {
let encrypted = match ecdh_encrypt(&own_id, &dest, signal) {
Ok(r) => r,
Err(e) => {
log::error!("failed to encrypt ecdh message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
continue;
}
};
if let Err(e) = ipfs.pubsub_publish(topic, encrypted).await {
log::error!("failed to publish message: {e}");
let _ = rsp.send(Err(anyhow::anyhow!(e)));
}else {
let _ = rsp.send(Ok(()));
}
}
GossipSubCmd::DecodeEcdh { src, data, rsp } => {
Expand Down
1 change: 1 addition & 0 deletions extensions/warp-blink-wrtc/src/blink_impl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod data;
mod blink_controller;
mod gossipsub_listener;
mod gossipsub_sender;
mod sender_queue;
mod signaling;
mod store;

Expand Down
Empty file.

0 comments on commit 2d2fb43

Please sign in to comment.