Skip to content

Commit

Permalink
add handling for data channels
Browse files Browse the repository at this point in the history
  • Loading branch information
sdwoodbury committed Nov 2, 2023
1 parent ece6cfe commit b4c664e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 2 deletions.
3 changes: 3 additions & 0 deletions extensions/warp-blink-wrtc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,9 @@ async fn handle_webrtc(params: WebRtcHandlerParams, mut webrtc_event_stream: Web
log::error!("failed to send signal: {e}");
}
}
EmittedEvents::DataChannelCreated { peer, data_channel } => {},
EmittedEvents::DataChannelOpened { peer } => {}
EmittedEvents::DataChannelClosed { peer } => {}
}
}
None => todo!()
Expand Down
21 changes: 19 additions & 2 deletions extensions/warp-blink-wrtc/src/simple_webrtc/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use futures::stream::BoxStream;
use warp::crypto::DID;
use webrtc::{
ice_transport::ice_candidate::RTCIceCandidate,
data_channel::RTCDataChannel, ice_transport::ice_candidate::RTCIceCandidate,
peer_connection::sdp::session_description::RTCSessionDescription,
track::track_remote::TrackRemote,
};
Expand All @@ -23,7 +23,7 @@ impl core::ops::DerefMut for WebRtcEventStream {
}
}

#[derive(Debug, Clone, derive_more::Display)]
#[derive(Clone, derive_more::Display)]
pub enum EmittedEvents {
#[display(fmt = "Ice")]
Ice {
Expand Down Expand Up @@ -55,8 +55,25 @@ pub enum EmittedEvents {
/// and processing the output
#[display(fmt = "TrackAdded")]
TrackAdded { peer: DID, track: Arc<TrackRemote> },

#[display(fmt = "DataChannelOpened")]
DataChannelCreated {
peer: DID,
data_channel: Arc<RTCDataChannel>,
},
#[display(fmt = "DataChannelClosed")]
DataChannelClosed { peer: DID },
#[display(fmt = "DataChannelOpened")]
DataChannelOpened { peer: DID },
// todo: when a track is removed, does the RTCPeerConnectionState::Disconnected event fire?
// it appears that WebRTC doesn't emit an event for this. perhaps the track is automatically
// closed on the remote side when the local side calls `remove_track`
// TrackRemoved,
}

// needed because RTcDAtaChannel doesn't implement Debug
impl std::fmt::Debug for EmittedEvents {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
48 changes: 48 additions & 0 deletions extensions/warp-blink-wrtc/src/simple_webrtc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use warp::crypto::DID;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::api::APIBuilder;
use webrtc::data_channel::RTCDataChannel;
use webrtc::ice_transport::ice_candidate::{RTCIceCandidate, RTCIceCandidateInit};
use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
use webrtc::ice_transport::ice_server::RTCIceServer;
Expand Down Expand Up @@ -475,6 +476,53 @@ impl Controller {
},
));

let tx = self.event_ch.clone();
let dest = peer_id.clone();
peer.connection
.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
let tx2 = tx.clone();
let dest2 = dest.clone();
d.on_close(Box::new(move || {
if let Err(e) = tx2.send(EmittedEvents::DataChannelClosed {
peer: dest2.clone(),
}) {
log::error!(
"failed to send data channel closed event for peer {}: {}",
&dest2,
e
);
}
Box::pin(async {})
}));

let tx2 = tx.clone();
let dest2 = dest.clone();
d.on_open(Box::new(move || {
if let Err(e) = tx2.send(EmittedEvents::DataChannelOpened {
peer: dest2.clone(),
}) {
log::error!(
"failed to send data channel opened event for peer {}: {}",
&dest2,
e
);
}
Box::pin(async {})
}));

if let Err(e) = tx.send(EmittedEvents::DataChannelCreated {
peer: dest.clone(),
data_channel: d,
}) {
log::error!(
"failed to send data channel opened event for peer {}: {}",
&dest,
e
);
}
Box::pin(async {})
}));

// attach all media sources to the peer
for (source_id, track) in &self.media_sources {
match peer.connection.add_track(track.clone()).await {
Expand Down

0 comments on commit b4c664e

Please sign in to comment.