diff --git a/Cargo.lock b/Cargo.lock index aba23419..ff1081af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3047,6 +3047,7 @@ dependencies = [ "async-tungstenite", "bincode", "bytes", + "cfg-if", "futures", "futures-channel", "futures-test", diff --git a/matchbox_demo/src/main.rs b/matchbox_demo/src/main.rs index 907b2390..a5c01b42 100644 --- a/matchbox_demo/src/main.rs +++ b/matchbox_demo/src/main.rs @@ -91,7 +91,7 @@ fn start_matchbox_socket(mut commands: Commands, args: Res) { let room_url = format!("{}/{}", &args.matchbox, room_id); info!("connecting to matchbox server: {:?}", room_url); - let (socket, message_loop) = WebRtcSocket::new(room_url); + let (socket, message_loop) = WebRtcSocket::new_unreliable(room_url); // The message loop needs to be awaited, or nothing will happen. // We do this here using bevy's task system. diff --git a/matchbox_server/src/signaling.rs b/matchbox_server/src/signaling.rs index 00d7edfc..0dbf6d35 100644 --- a/matchbox_server/src/signaling.rs +++ b/matchbox_server/src/signaling.rs @@ -286,7 +286,7 @@ async fn handle_ws( #[cfg(test)] mod tests { - use crate::signaling::PeerEvent; + use crate::{signaling::PeerEvent, ws_handler, ServerState}; use axum::{routing::get, Router}; use futures::{lock::Mutex, pin_mut, SinkExt, StreamExt}; use std::{ @@ -297,8 +297,6 @@ mod tests { use tokio::{net::TcpStream, select, time}; use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; - use super::{ws_handler, ServerState}; - fn app() -> Router { Router::new() .route("/:room_id", get(ws_handler)) diff --git a/matchbox_simple_demo/src/main.rs b/matchbox_simple_demo/src/main.rs index e527fa20..7e0f7e00 100644 --- a/matchbox_simple_demo/src/main.rs +++ b/matchbox_simple_demo/src/main.rs @@ -25,7 +25,7 @@ async fn main() { async fn async_main() { info!("Connecting to matchbox"); - let (mut socket, loop_fut) = WebRtcSocket::new("ws://localhost:3536/example_room"); + let (mut socket, loop_fut) = WebRtcSocket::new_unreliable("ws://localhost:3536/example_room"); info!("my id is {:?}", socket.id()); diff --git a/matchbox_socket/Cargo.toml b/matchbox_socket/Cargo.toml index 19e25077..339a4354 100644 --- a/matchbox_socket/Cargo.toml +++ b/matchbox_socket/Cargo.toml @@ -24,6 +24,7 @@ serde_json = { version = "1.0", default-features = false, features = ["alloc"] } uuid = { version = "1.0", default-features = false, features = ["v4"] } log = { version = "0.4", default-features = false } thiserror = "1.0" +cfg-if = "1.0" # ggrs-socket ggrs = { version = "0.9.3", default-features = false, optional = true } diff --git a/matchbox_socket/src/webrtc_socket/error.rs b/matchbox_socket/src/webrtc_socket/error.rs index 19313628..9e098464 100644 --- a/matchbox_socket/src/webrtc_socket/error.rs +++ b/matchbox_socket/src/webrtc_socket/error.rs @@ -1,9 +1,12 @@ +use crate::webrtc_socket::messages::PeerEvent; +use futures_channel::mpsc::TrySendError; + /// An error that can occur with WebRTC signalling. #[derive(Debug, thiserror::Error)] pub enum SignallingError { // Common #[error("failed to send event to signalling server")] - Undeliverable(#[from] futures_channel::mpsc::TrySendError), + Undeliverable(#[from] TrySendError), #[error("The stream is exhausted")] StreamExhausted, diff --git a/matchbox_socket/src/webrtc_socket/messages.rs b/matchbox_socket/src/webrtc_socket/messages.rs index 33c3d249..c7e6a8cf 100644 --- a/matchbox_socket/src/webrtc_socket/messages.rs +++ b/matchbox_socket/src/webrtc_socket/messages.rs @@ -19,6 +19,7 @@ pub enum PeerRequest { KeepAlive, } +/// Signals go from peer to peer #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum PeerSignal { IceCandidate(String), diff --git a/matchbox_socket/src/webrtc_socket/mod.rs b/matchbox_socket/src/webrtc_socket/mod.rs index 966b0692..b82b7e98 100644 --- a/matchbox_socket/src/webrtc_socket/mod.rs +++ b/matchbox_socket/src/webrtc_socket/mod.rs @@ -1,440 +1,41 @@ use crate::Error; -use futures::{future::Fuse, stream::FusedStream, Future, FutureExt, StreamExt}; -use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; -use futures_util::select; -use log::{debug, error}; +use cfg_if::cfg_if; +use futures::Future; use messages::*; use std::pin::Pin; -use uuid::Uuid; -pub(crate) mod error; mod messages; mod signal_peer; +mod socket; +// TODO: Should be a WebRtcConfig field /// The duration, in milliseconds, to send "Keep Alive" requests const KEEP_ALIVE_INTERVAL: u64 = 10_000; /// The raw format of data being sent and received. type Packet = Box<[u8]>; -// TODO: maybe use cfg-if to make this slightly tidier -#[cfg(not(target_arch = "wasm32"))] -mod native { - mod message_loop; - mod signalling_loop; - pub use message_loop::*; - pub use signalling_loop::*; -} -#[cfg(not(target_arch = "wasm32"))] -use native::*; - -#[cfg(target_arch = "wasm32")] -mod wasm { - mod message_loop; - mod signalling_loop; - pub use message_loop::*; - pub use signalling_loop::*; -} -#[cfg(target_arch = "wasm32")] -use wasm::*; - -/// General configuration options for a WebRtc connection. -/// -/// See [`WebRtcSocket::new_with_config`] -#[derive(Debug)] -pub struct WebRtcSocketConfig { - /// The url for the room to connect to - /// - /// This is a websocket url, starting with `ws://` or `wss://` followed by - /// the hostname and path to a matchbox server, followed by a room id and - /// optional query parameters. - /// - /// e.g.: `wss://matchbox.example.com/your_game` - /// - /// or: `wss://matchbox.example.com/your_game?next=2` - /// - /// The last form will pair player in the order they connect. - pub room_url: String, - /// Configuration for the (single) ICE server - pub ice_server: RtcIceServerConfig, - /// Configuration for one or multiple reliable or unreliable data channels - pub channels: Vec, -} - -/// Configuration options for an ICE server connection. -/// See also: -#[derive(Debug)] -pub struct RtcIceServerConfig { - /// An ICE server instance can have several URLs - pub urls: Vec, - /// A username for authentication with the ICE server - /// - /// See: - pub username: Option, - /// A password or token when authenticating with a turn server - /// - /// See: - pub credential: Option, -} - -/// Configuration options for a data channel -/// See also: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel -#[derive(Debug)] -pub struct ChannelConfig { - /// Whether messages sent on the channel are guaranteed to arrive in order - /// See also: - pub ordered: bool, - /// Maximum number of retransmit attempts of a message before giving up - /// See also: - pub max_retransmits: Option, -} - -impl ChannelConfig { - /// Messages sent via an unreliable channel may arrive in any order or not at all, but arrive as - /// quickly as possible - pub fn unreliable() -> Self { - ChannelConfig { - ordered: false, - max_retransmits: Some(0), - } - } - - /// Messages sent via a reliable channel are guaranteed to arrive in order and will be resent - /// until they arrive - pub fn reliable() -> Self { - ChannelConfig { - ordered: true, - max_retransmits: None, - } - } -} - -impl Default for WebRtcSocketConfig { - fn default() -> Self { - WebRtcSocketConfig { - room_url: "ws://localhost:3536/example_room".to_string(), - ice_server: RtcIceServerConfig::default(), - channels: vec![ChannelConfig::unreliable()], - } - } -} - -impl Default for RtcIceServerConfig { - fn default() -> Self { - Self { - urls: vec![ - "stun:stun.l.google.com:19302".to_string(), - "stun:stun1.l.google.com:19302".to_string(), - "stun:stun2.l.google.com:19302".to_string(), - "stun:stun3.l.google.com:19302".to_string(), - "stun:stun4.l.google.com:19302".to_string(), - ], - username: Default::default(), - credential: Default::default(), - } - } -} - -/// Contains the interface end of a full-mesh web rtc connection -/// -/// Used to send and receive messages from other peers -#[derive(Debug)] -pub struct WebRtcSocket { - messages_from_peers: Vec>, - new_connected_peers: futures_channel::mpsc::UnboundedReceiver, - disconnected_peers: futures_channel::mpsc::UnboundedReceiver, - peer_messages_out: Vec>, - peers: Vec, - id: PeerId, -} - -#[cfg(not(target_arch = "wasm32"))] -pub(crate) type MessageLoopFuture = Pin> + Send>>; -// TODO: figure out if it's possible to implement Send in wasm as well -#[cfg(target_arch = "wasm32")] -pub(crate) type MessageLoopFuture = Pin>>>; - -impl WebRtcSocket { - /// Create a new connection to the given room with a single unreliable data channel - /// - /// See [`WebRtcSocketConfig::room_url`] for details on the room url. - /// - /// The returned future should be awaited in order for messages to be sent and received. - #[must_use] - pub fn new>(room_url: T) -> (Self, MessageLoopFuture) { - WebRtcSocket::new_with_config(WebRtcSocketConfig { - room_url: room_url.into(), - ..Default::default() - }) - } - - /// Create a new connection with the given [`WebRtcSocketConfig`] - /// - /// The returned future should be awaited in order for messages to be sent and received. - #[must_use] - pub fn new_with_config(config: WebRtcSocketConfig) -> (Self, MessageLoopFuture) { - if config.channels.is_empty() { - panic!("You need to configure at least one channel in WebRtcSocketConfig"); - } - - let (messages_from_peers_tx, messages_from_peers) = new_senders_and_receivers(&config); - let (new_connected_peers_tx, new_connected_peers) = futures_channel::mpsc::unbounded(); - let (disconnected_peers_tx, disconnected_peers) = futures_channel::mpsc::unbounded(); - let (peer_messages_out_tx, peer_messages_out_rx) = new_senders_and_receivers(&config); - - // Would perhaps be smarter to let signalling server decide this... - let id = Uuid::new_v4().to_string(); - - ( - Self { - id: id.clone(), - messages_from_peers, - peer_messages_out: peer_messages_out_tx, - new_connected_peers, - disconnected_peers, - peers: vec![], - }, - Box::pin(run_socket( - config, - id, - peer_messages_out_rx, - new_connected_peers_tx, - disconnected_peers_tx, - messages_from_peers_tx, - )), - ) - } - - /// Returns a future that resolves when the given number of peers have connected - pub async fn wait_for_peers(&mut self, peers: usize) -> Vec { - debug!("waiting for peers to join"); - let mut addrs = vec![]; - while let Some(id) = self.new_connected_peers.next().await { - addrs.push(id.clone()); - if addrs.len() == peers { - debug!("all peers joined"); - self.peers.extend(addrs.clone()); - return addrs; - } - } - panic!("Signal server died") - } - - /// Check if new peers have connected and if so add them as peers - pub fn accept_new_connections(&mut self) -> Vec { - let mut ids = Vec::new(); - while let Ok(Some(id)) = self.new_connected_peers.try_next() { - self.peers.push(id.clone()); - ids.push(id); - } - ids - } - - /// Check for peer disconnections and return a Vec of ids of disconnected peers. - /// - /// See also: [`WebRtcSocket::connected_peers`] - pub fn disconnected_peers(&mut self) -> Vec { - let mut ids = vec![]; - // Collect all disconnected peers - while let Ok(Some(id)) = self.disconnected_peers.try_next() { - if let Some(index) = self.peers.iter().position(|x| x == &id) { - self.peers.remove(index); - } - ids.push(id); - } - // If the channel dropped or becomes terminated, flush all peers - if self.disconnected_peers.is_terminated() { - ids.append(&mut self.peers); - } - ids - } - - /// Returns a Vec of the ids of the connected peers. - /// - /// See also: [`WebRtcSocket::disconnected_peers`] - pub fn connected_peers(&self) -> Vec { - self.peers.clone() // TODO: could probably be an iterator or reference instead? - } - - /// Call this where you want to handle new received messages from the default channel (with - /// index 0) which will be the only channel if you didn't configure any explicitly - /// - /// messages are removed from the socket when called - /// - /// See also: [`WebRtcSocket::receive_on_channel`] - pub fn receive(&mut self) -> Vec<(PeerId, Packet)> { - self.receive_on_channel(0) - } - - /// Call this where you want to handle new received messages from a specific channel as - /// configured in [`WebRtcSocketConfig::channels`]. The index of a channel is its index in - /// the vec [`WebRtcSocketConfig::channels`] as you configured it before (or 0 for the - /// default channel if you use the default configuration). - /// - /// messages are removed from the socket when called - pub fn receive_on_channel(&mut self, index: usize) -> Vec<(PeerId, Packet)> { - std::iter::repeat_with(|| { - self.messages_from_peers - .get_mut(index) - .unwrap_or_else(|| panic!("No data channel with index {}", index)) - .try_next() - }) - // .map_while(|poll| match p { // map_while is nightly-only :( - .take_while(|p| !p.is_err()) - .map(|p| match p.unwrap() { - Some((peer_id, packet)) => (peer_id, packet), - None => todo!("Handle connection closed??"), - }) - .collect() - } - - /// Send a packet to the given peer on the default channel (with index 0) which will be the only - /// channel if you didn't configure any explicitly - /// - /// See also [`WebRtcSocket::send_on_channel`] - pub fn send>(&mut self, packet: Packet, id: T) { - self.send_on_channel(packet, id, 0); - } - - /// Send a packet to the given peer on a specific channel as configured in - /// [`WebRtcSocketConfig::channels`]. - /// - /// The index of a channel is its index in the vec [`WebRtcSocketConfig::channels`] as you - /// configured it before (or 0 for the default channel if you use the default - /// configuration). - pub fn send_on_channel>(&mut self, packet: Packet, id: T, index: usize) { - self.peer_messages_out - .get(index) - .unwrap_or_else(|| panic!("No data channel with index {}", index)) - .unbounded_send((id.into(), packet)) - .expect("send_to failed"); - } - - /// Send a packet to all connected peers on a specific channel as configured in - /// [`WebRtcSocketConfig::channels`]. - /// - /// The index of a channel is its index in the vec [`WebRtcSocketConfig::channels`] on socket - /// creation. - pub fn broadcast_on_channel(&mut self, packet: Packet, index: usize) { - let sender = self - .peer_messages_out - .get(index) - .unwrap_or_else(|| panic!("No data channel with index {}", index)); - - for peer_id in self.connected_peers() { - sender - .unbounded_send((peer_id, packet.clone())) - .expect("send_to failed"); - } - } - - /// Returns the id of this peer - pub fn id(&self) -> &PeerId { - &self.id - } -} - -/// All the channels needed for the messaging loop. -pub struct MessageLoopChannels { - requests_sender: futures_channel::mpsc::UnboundedSender, - events_receiver: futures_channel::mpsc::UnboundedReceiver, - peer_messages_out_rx: Vec>, - new_connected_peers_tx: futures_channel::mpsc::UnboundedSender, - disconnected_peers_tx: futures_channel::mpsc::UnboundedSender, - messages_from_peers_tx: Vec>, -} - -async fn run_socket( - config: WebRtcSocketConfig, - id: PeerId, - peer_messages_out_rx: Vec>, - new_connected_peers_tx: futures_channel::mpsc::UnboundedSender, - disconnected_peers_tx: futures_channel::mpsc::UnboundedSender, - messages_from_peers_tx: Vec>, -) -> Result<(), Error> { - debug!("Starting WebRtcSocket message loop"); - - let (requests_sender, requests_receiver) = futures_channel::mpsc::unbounded::(); - let (events_sender, events_receiver) = futures_channel::mpsc::unbounded::(); - - let signalling_loop_fut = - signalling_loop(config.room_url.clone(), requests_receiver, events_sender); - - let channels = MessageLoopChannels { - requests_sender, - events_receiver, - peer_messages_out_rx, - new_connected_peers_tx, - disconnected_peers_tx, - messages_from_peers_tx, - }; - let message_loop_fut = message_loop(id, config, channels); - - let mut message_loop_done = Box::pin(message_loop_fut.fuse()); - let mut signalling_loop_done = Box::pin(signalling_loop_fut.fuse()); - loop { - select! { - _ = message_loop_done => { - debug!("Message loop completed"); - break; - } - - sigloop = signalling_loop_done => { - match sigloop { - Ok(()) => debug!("Signalling loop completed"), - Err(e) => { - // TODO: Reconnect X attempts if configured to reconnect. - error!("{e:?}"); - return Err(Error::from(e)); - }, - } - } - - complete => break - } - } - Ok(()) -} - -pub(crate) fn new_senders_and_receivers( - config: &WebRtcSocketConfig, -) -> (Vec>, Vec>) { - (0..config.channels.len()) - .map(|_| futures_channel::mpsc::unbounded()) - .unzip() -} - -fn create_data_channels_ready_fut( - config: &WebRtcSocketConfig, -) -> ( - Vec>, - Pin>>>, -) { - let (senders, receivers) = (0..config.channels.len()) - .map(|_| futures_channel::mpsc::channel(1)) - .unzip(); - - (senders, Box::pin(wait_for_ready(receivers).fuse())) -} - -async fn wait_for_ready(channel_ready_rx: Vec>) { - for mut receiver in channel_ready_rx { - if receiver.next().await.is_none() { - panic!("Sender closed before channel was ready"); - } - } -} - -#[cfg(test)] -mod test { - use crate::{Error, WebRtcSocket}; - - #[futures_test::test] - async fn unreachable_server() { - // .invalid is a reserved tld for testing and documentation - let (_socket, fut) = WebRtcSocket::new("wss://example.invalid"); - - let result = fut.await; - assert!(result.is_err()); - assert!(matches!(result.unwrap_err(), Error::Signalling(_))); +pub(crate) mod error; +pub(crate) use socket::MessageLoopChannels; +pub use socket::{ChannelConfig, RtcIceServerConfig, WebRtcSocket, WebRtcSocketConfig}; +cfg_if! { + if #[cfg(target_arch = "wasm32")] { + // TODO: figure out if it's possible to implement Send in wasm as well + type MessageLoopFuture = Pin>>>; + mod wasm { + mod message_loop; + mod signalling_loop; + pub use message_loop::*; + pub use signalling_loop::*; + } + use wasm::*; + } else { + type MessageLoopFuture = Pin> + Send>>; + mod native { + mod message_loop; + mod signalling_loop; + pub use message_loop::*; + pub use signalling_loop::*; + } + use native::*; } } diff --git a/matchbox_socket/src/webrtc_socket/native/message_loop.rs b/matchbox_socket/src/webrtc_socket/native/message_loop.rs index 28a69543..6a232a91 100644 --- a/matchbox_socket/src/webrtc_socket/native/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/message_loop.rs @@ -1,8 +1,7 @@ use crate::webrtc_socket::{ - create_data_channels_ready_fut, messages::{PeerEvent, PeerId, PeerRequest, PeerSignal}, - new_senders_and_receivers, signal_peer::SignalPeer, + socket::{create_data_channels_ready_fut, new_senders_and_receivers}, ChannelConfig, MessageLoopChannels, Packet, WebRtcSocketConfig, KEEP_ALIVE_INTERVAL, }; use async_compat::CompatExt; @@ -123,9 +122,9 @@ async fn message_loop_impl(id: PeerId, config: &WebRtcSocketConfig, channels: Me match message { Some((channel_index, Some((peer, packet)))) => { let senders = connected_peers.get_mut(&peer) - .unwrap_or_else(|| panic!("couldn't find data channel for peer {}", peer)); + .unwrap_or_else(|| panic!("couldn't find data channel for peer {peer}")); let sender = senders.get_mut(channel_index) - .unwrap_or_else(|| panic!("Unexpected data channel index during send: {}", channel_index)); + .unwrap_or_else(|| panic!("Unexpected data channel index during send: {channel_index}")); sender.unbounded_send(packet).unwrap(); }, Some((_, None)) | None => { diff --git a/matchbox_socket/src/webrtc_socket/native/signalling_loop.rs b/matchbox_socket/src/webrtc_socket/native/signalling_loop.rs index 74a50cf3..c2be7935 100644 --- a/matchbox_socket/src/webrtc_socket/native/signalling_loop.rs +++ b/matchbox_socket/src/webrtc_socket/native/signalling_loop.rs @@ -35,7 +35,7 @@ pub async fn signalling_loop( Some(Ok(Message::Text(message))) => { debug!("{}", message); let event: PeerEvent = serde_json::from_str(&message) - .unwrap_or_else(|err| panic!("couldn't parse peer event: {}.\nEvent: {}", err, message)); + .unwrap_or_else(|err| panic!("couldn't parse peer event: {err}.\nEvent: {message}")); events_sender.unbounded_send(event).map_err(SignallingError::from)?; }, Some(Ok(message)) => { diff --git a/matchbox_socket/src/webrtc_socket/signal_peer.rs b/matchbox_socket/src/webrtc_socket/signal_peer.rs index 9089790a..b3e068ef 100644 --- a/matchbox_socket/src/webrtc_socket/signal_peer.rs +++ b/matchbox_socket/src/webrtc_socket/signal_peer.rs @@ -1,7 +1,6 @@ +use crate::webrtc_socket::{PeerId, PeerRequest, PeerSignal}; use futures_channel::mpsc::UnboundedSender; -use super::{PeerId, PeerRequest, PeerSignal}; - #[derive(Debug, Clone)] pub struct SignalPeer { pub id: PeerId, diff --git a/matchbox_socket/src/webrtc_socket/socket.rs b/matchbox_socket/src/webrtc_socket/socket.rs new file mode 100644 index 00000000..59add921 --- /dev/null +++ b/matchbox_socket/src/webrtc_socket/socket.rs @@ -0,0 +1,413 @@ +use crate::{ + webrtc_socket::{ + message_loop, messages::PeerId, signalling_loop, MessageLoopFuture, Packet, PeerEvent, + PeerRequest, + }, + Error, +}; +use futures::{future::Fuse, select, stream::FusedStream, Future, FutureExt, StreamExt}; +use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender}; +use log::{debug, error}; +use std::pin::Pin; +use uuid::Uuid; + +/// Configuration options for an ICE server connection. +/// See also: +#[derive(Debug)] +pub struct RtcIceServerConfig { + /// An ICE server instance can have several URLs + pub urls: Vec, + /// A username for authentication with the ICE server + /// + /// See: + pub username: Option, + /// A password or token when authenticating with a turn server + /// + /// See: + pub credential: Option, +} + +/// Configuration options for a data channel +/// See also: https://developer.mozilla.org/en-US/docs/Web/API/RTCDataChannel +#[derive(Debug)] +pub struct ChannelConfig { + /// Whether messages sent on the channel are guaranteed to arrive in order + /// See also: + pub ordered: bool, + /// Maximum number of retransmit attempts of a message before giving up + /// See also: + pub max_retransmits: Option, +} + +impl ChannelConfig { + /// Messages sent via an unreliable channel may arrive in any order or not at all, but arrive as + /// quickly as possible + pub fn unreliable() -> Self { + ChannelConfig { + ordered: false, + max_retransmits: Some(0), + } + } + + /// Messages sent via a reliable channel are guaranteed to arrive in order and will be resent + /// until they arrive + pub fn reliable() -> Self { + ChannelConfig { + ordered: true, + max_retransmits: None, + } + } +} + +impl Default for RtcIceServerConfig { + fn default() -> Self { + Self { + urls: vec![ + "stun:stun.l.google.com:19302".to_string(), + "stun:stun1.l.google.com:19302".to_string(), + "stun:stun2.l.google.com:19302".to_string(), + "stun:stun3.l.google.com:19302".to_string(), + "stun:stun4.l.google.com:19302".to_string(), + ], + username: Default::default(), + credential: Default::default(), + } + } +} + +/// General configuration options for a WebRtc connection. +/// +/// See [`WebRtcSocket::new_with_config`] +#[derive(Debug)] +pub struct WebRtcSocketConfig { + /// The url for the room to connect to + /// + /// This is a websocket url, starting with `ws://` or `wss://` followed by + /// the hostname and path to a matchbox server, followed by a room id and + /// optional query parameters. + /// + /// e.g.: `wss://matchbox.example.com/your_game` + /// + /// or: `wss://matchbox.example.com/your_game?next=2` + /// + /// The last form will pair player in the order they connect. + pub room_url: String, + /// Configuration for the (single) ICE server + pub ice_server: RtcIceServerConfig, + /// Configuration for one or multiple reliable or unreliable data channels + pub channels: Vec, +} + +/// Contains the interface end of a full-mesh web rtc connection +/// +/// Used to send and receive messages from other peers +#[derive(Debug)] +pub struct WebRtcSocket { + messages_from_peers: Vec>, + new_connected_peers: futures_channel::mpsc::UnboundedReceiver, + disconnected_peers: futures_channel::mpsc::UnboundedReceiver, + peer_messages_out: Vec>, + peers: Vec, + id: PeerId, +} + +impl WebRtcSocket { + /// Create a new connection to the given room with a single unreliable data channel + /// + /// See [`WebRtcSocketConfig::room_url`] for details on the room url. + /// + /// The returned future should be awaited in order for messages to be sent and received. + #[must_use] + pub fn new_unreliable(room_url: impl Into) -> (Self, MessageLoopFuture) { + WebRtcSocket::new_with_config(WebRtcSocketConfig { + room_url: room_url.into(), + ice_server: RtcIceServerConfig::default(), + channels: vec![ChannelConfig::unreliable()], + }) + } + + /// Create a new connection to the given room with a single reliable data channel + /// + /// See [`WebRtcSocketConfig::room_url`] for details on the room url. + /// + /// The returned future should be awaited in order for messages to be sent and received. + #[must_use] + pub fn new_reliable(room_url: impl Into) -> (Self, MessageLoopFuture) { + WebRtcSocket::new_with_config(WebRtcSocketConfig { + room_url: room_url.into(), + ice_server: RtcIceServerConfig::default(), + channels: vec![ChannelConfig::reliable()], + }) + } + + /// Create a new connection with the given [`WebRtcSocketConfig`] + /// + /// The returned future should be awaited in order for messages to be sent and received. + #[must_use] + pub fn new_with_config(config: WebRtcSocketConfig) -> (Self, MessageLoopFuture) { + if config.channels.is_empty() { + panic!("You need to configure at least one channel in WebRtcSocketConfig"); + } + + let (messages_from_peers_tx, messages_from_peers) = new_senders_and_receivers(&config); + let (new_connected_peers_tx, new_connected_peers) = futures_channel::mpsc::unbounded(); + let (disconnected_peers_tx, disconnected_peers) = futures_channel::mpsc::unbounded(); + let (peer_messages_out_tx, peer_messages_out_rx) = new_senders_and_receivers(&config); + + // Would perhaps be smarter to let signalling server decide this... + let id = Uuid::new_v4().to_string(); + + ( + Self { + id: id.clone(), + messages_from_peers, + peer_messages_out: peer_messages_out_tx, + new_connected_peers, + disconnected_peers, + peers: vec![], + }, + Box::pin(run_socket( + config, + id, + peer_messages_out_rx, + new_connected_peers_tx, + disconnected_peers_tx, + messages_from_peers_tx, + )), + ) + } + + /// Returns a future that resolves when the given number of peers have connected + pub async fn wait_for_peers(&mut self, peers: usize) -> Vec { + debug!("waiting for peers to join"); + let mut addrs = vec![]; + while let Some(id) = self.new_connected_peers.next().await { + addrs.push(id.clone()); + if addrs.len() == peers { + debug!("all peers joined"); + self.peers.extend(addrs.clone()); + return addrs; + } + } + panic!("Signal server died") + } + + /// Check if new peers have connected and if so add them as peers + pub fn accept_new_connections(&mut self) -> Vec { + let mut ids = Vec::new(); + while let Ok(Some(id)) = self.new_connected_peers.try_next() { + self.peers.push(id.clone()); + ids.push(id); + } + ids + } + + /// Check for peer disconnections and return a Vec of ids of disconnected peers. + /// + /// See also: [`WebRtcSocket::connected_peers`] + pub fn disconnected_peers(&mut self) -> Vec { + let mut ids = vec![]; + // Collect all disconnected peers + while let Ok(Some(id)) = self.disconnected_peers.try_next() { + if let Some(index) = self.peers.iter().position(|x| x == &id) { + self.peers.remove(index); + } + ids.push(id); + } + // If the channel dropped or becomes terminated, flush all peers + if self.disconnected_peers.is_terminated() { + ids.append(&mut self.peers); + } + ids + } + + /// Returns a Vec of the ids of the connected peers. + /// + /// See also: [`WebRtcSocket::disconnected_peers`] + pub fn connected_peers(&self) -> Vec { + self.peers.clone() // TODO: could probably be an iterator or reference instead? + } + + /// Call this where you want to handle new received messages from the default channel (with + /// index 0) which will be the only channel if you didn't configure any explicitly + /// + /// messages are removed from the socket when called + /// + /// See also: [`WebRtcSocket::receive_on_channel`] + pub fn receive(&mut self) -> Vec<(PeerId, Packet)> { + self.receive_on_channel(0) + } + + /// Call this where you want to handle new received messages from a specific channel as + /// configured in [`WebRtcSocketConfig::channels`]. The index of a channel is its index in + /// the vec [`WebRtcSocketConfig::channels`] as you configured it before (or 0 for the + /// default channel if you use the default configuration). + /// + /// messages are removed from the socket when called + pub fn receive_on_channel(&mut self, index: usize) -> Vec<(PeerId, Packet)> { + std::iter::repeat_with(|| { + self.messages_from_peers + .get_mut(index) + .unwrap_or_else(|| panic!("No data channel with index {index}")) + .try_next() + }) + // .map_while(|poll| match p { // map_while is nightly-only :( + .take_while(|p| !p.is_err()) + .map(|p| match p.unwrap() { + Some((peer_id, packet)) => (peer_id, packet), + None => todo!("Handle connection closed??"), + }) + .collect() + } + + /// Send a packet to the given peer on the default channel (with index 0) which will be the only + /// channel if you didn't configure any explicitly + /// + /// See also [`WebRtcSocket::send_on_channel`] + pub fn send>(&mut self, packet: Packet, id: T) { + self.send_on_channel(packet, id, 0); + } + + /// Send a packet to the given peer on a specific channel as configured in + /// [`WebRtcSocketConfig::channels`]. + /// + /// The index of a channel is its index in the vec [`WebRtcSocketConfig::channels`] as you + /// configured it before (or 0 for the default channel if you use the default + /// configuration). + pub fn send_on_channel>(&mut self, packet: Packet, id: T, index: usize) { + self.peer_messages_out + .get(index) + .unwrap_or_else(|| panic!("No data channel with index {index}")) + .unbounded_send((id.into(), packet)) + .expect("send_to failed"); + } + + /// Send a packet to all connected peers on a specific channel as configured in + /// [`WebRtcSocketConfig::channels`]. + /// + /// The index of a channel is its index in the vec [`WebRtcSocketConfig::channels`] on socket + /// creation. + pub fn broadcast_on_channel(&mut self, packet: Packet, index: usize) { + let sender = self + .peer_messages_out + .get(index) + .unwrap_or_else(|| panic!("No data channel with index {index}")); + + for peer_id in self.connected_peers() { + sender + .unbounded_send((peer_id, packet.clone())) + .expect("send_to failed"); + } + } + + /// Returns the id of this peer + pub fn id(&self) -> &PeerId { + &self.id + } +} + +pub(crate) fn new_senders_and_receivers( + config: &WebRtcSocketConfig, +) -> (Vec>, Vec>) { + (0..config.channels.len()) + .map(|_| futures_channel::mpsc::unbounded()) + .unzip() +} + +pub(crate) fn create_data_channels_ready_fut( + config: &WebRtcSocketConfig, +) -> ( + Vec>, + Pin>>>, +) { + let (senders, receivers) = (0..config.channels.len()) + .map(|_| futures_channel::mpsc::channel(1)) + .unzip(); + + (senders, Box::pin(wait_for_ready(receivers).fuse())) +} + +async fn wait_for_ready(channel_ready_rx: Vec>) { + for mut receiver in channel_ready_rx { + if receiver.next().await.is_none() { + panic!("Sender closed before channel was ready"); + } + } +} + +/// All the channels needed for the messaging loop. +pub struct MessageLoopChannels { + pub requests_sender: futures_channel::mpsc::UnboundedSender, + pub events_receiver: futures_channel::mpsc::UnboundedReceiver, + pub peer_messages_out_rx: Vec>, + pub new_connected_peers_tx: futures_channel::mpsc::UnboundedSender, + pub disconnected_peers_tx: futures_channel::mpsc::UnboundedSender, + pub messages_from_peers_tx: Vec>, +} + +async fn run_socket( + config: WebRtcSocketConfig, + id: PeerId, + peer_messages_out_rx: Vec>, + new_connected_peers_tx: futures_channel::mpsc::UnboundedSender, + disconnected_peers_tx: futures_channel::mpsc::UnboundedSender, + messages_from_peers_tx: Vec>, +) -> Result<(), Error> { + debug!("Starting WebRtcSocket"); + + let (requests_sender, requests_receiver) = futures_channel::mpsc::unbounded::(); + let (events_sender, events_receiver) = futures_channel::mpsc::unbounded::(); + + let signalling_loop_fut = + signalling_loop(config.room_url.clone(), requests_receiver, events_sender); + + let channels = MessageLoopChannels { + requests_sender, + events_receiver, + peer_messages_out_rx, + new_connected_peers_tx, + disconnected_peers_tx, + messages_from_peers_tx, + }; + let message_loop_fut = message_loop(id, config, channels); + + let mut message_loop_done = Box::pin(message_loop_fut.fuse()); + let mut signalling_loop_done = Box::pin(signalling_loop_fut.fuse()); + loop { + select! { + _ = message_loop_done => { + debug!("Message loop completed"); + break; + } + + sigloop = signalling_loop_done => { + match sigloop { + Ok(()) => debug!("Signalling loop completed"), + Err(e) => { + // TODO: Reconnect X attempts if configured to reconnect. + error!("{e:?}"); + return Err(Error::from(e)); + }, + } + } + + complete => break + } + } + Ok(()) +} + +#[cfg(test)] +mod test { + use crate::{Error, WebRtcSocket}; + + #[futures_test::test] + async fn unreachable_server() { + // .invalid is a reserved tld for testing and documentation + let (_socket, fut) = WebRtcSocket::new_reliable("wss://example.invalid"); + + let result = fut.await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), Error::Signalling(_))); + } +} diff --git a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs index 7d894482..6a0e6b9b 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/message_loop.rs @@ -1,7 +1,7 @@ use crate::webrtc_socket::{ - create_data_channels_ready_fut, messages::{PeerEvent, PeerId, PeerRequest, PeerSignal}, signal_peer::SignalPeer, + socket::create_data_channels_ready_fut, ChannelConfig, MessageLoopChannels, Packet, WebRtcSocketConfig, KEEP_ALIVE_INTERVAL, }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; @@ -122,7 +122,7 @@ pub async fn message_loop(id: PeerId, config: WebRtcSocketConfig, channels: Mess let data_channel = data_channels.get(&peer) .expect("couldn't find data channel for peer") .get(channel_index) - .unwrap_or_else(|| panic!("couldn't find data channel with index {}", channel_index)); + .unwrap_or_else(|| panic!("couldn't find data channel with index {channel_index}")); if let Err(err) = data_channel.send_with_u8_array(&packet) { // This likely means the other peer disconnected diff --git a/matchbox_socket/src/webrtc_socket/wasm/signalling_loop.rs b/matchbox_socket/src/webrtc_socket/wasm/signalling_loop.rs index bb2af870..6c764a1c 100644 --- a/matchbox_socket/src/webrtc_socket/wasm/signalling_loop.rs +++ b/matchbox_socket/src/webrtc_socket/wasm/signalling_loop.rs @@ -28,7 +28,7 @@ pub async fn signalling_loop( Some(WsMessage::Text(message)) => { debug!("{}", message); let event: PeerEvent = serde_json::from_str(&message) - .unwrap_or_else(|_| panic!("couldn't parse peer event {}", message)); + .unwrap_or_else(|_| panic!("couldn't parse peer event {message}")); events_sender.unbounded_send(event).map_err(SignallingError::from)?; }, Some(WsMessage::Binary(_)) => {