diff --git a/iroh-net/src/relay/server.rs b/iroh-net/src/relay/server.rs index acbbe3de735..8768203cda8 100644 --- a/iroh-net/src/relay/server.rs +++ b/iroh-net/src/relay/server.rs @@ -173,62 +173,6 @@ impl Clone for ClientConnHandler { } } -// TODO(matheus23): Move to streams.rs, next to `MaybeTlsStream`, or perhaps to codec.rs next to `DerpCodec` -#[derive(Debug)] -pub(crate) enum RelayIo { - Derp(Framed), - Ws(WebSocketStream), -} - -fn tung_to_io_err(e: tungstenite::Error) -> std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) -} - -impl Sink for RelayIo { - type Error = std::io::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match *self { - Self::Derp(ref mut framed) => Pin::new(framed).poll_ready(cx), - Self::Ws(ref mut ws) => Pin::new(ws).poll_ready(cx).map_err(tung_to_io_err), - } - } - - fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> { - match *self { - Self::Derp(ref mut framed) => Pin::new(framed).start_send(item), - Self::Ws(ref mut ws) => Pin::new(ws) - .start_send(item.into_ws_message()?) - .map_err(tung_to_io_err), - } - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match *self { - Self::Derp(ref mut framed) => Pin::new(framed).poll_flush(cx), - Self::Ws(ref mut ws) => Pin::new(ws).poll_flush(cx).map_err(tung_to_io_err), - } - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match *self { - Self::Derp(ref mut framed) => Pin::new(framed).poll_close(cx), - Self::Ws(ref mut ws) => Pin::new(ws).poll_close(cx).map_err(tung_to_io_err), - } - } -} - -impl Stream for RelayIo { - type Item = anyhow::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match *self { - Self::Derp(ref mut framed) => Pin::new(framed).poll_next(cx), - Self::Ws(ref mut ws) => Pin::new(ws).poll_next(cx).map(Frame::from_ws_message), - } - } -} - impl ClientConnHandler { /// Adds a new connection to the server and serves it. /// @@ -415,6 +359,61 @@ fn init_meta_cert(server_key: &PublicKey) -> Vec { .expect("fixed allocations") } +#[derive(Debug)] +pub(crate) enum RelayIo { + Derp(Framed), + Ws(WebSocketStream), +} + +fn tung_to_io_err(e: tungstenite::Error) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) +} + +impl Sink for RelayIo { + type Error = std::io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match *self { + Self::Derp(ref mut framed) => Pin::new(framed).poll_ready(cx), + Self::Ws(ref mut ws) => Pin::new(ws).poll_ready(cx).map_err(tung_to_io_err), + } + } + + fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> { + match *self { + Self::Derp(ref mut framed) => Pin::new(framed).start_send(item), + Self::Ws(ref mut ws) => Pin::new(ws) + .start_send(item.into_ws_message()?) + .map_err(tung_to_io_err), + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match *self { + Self::Derp(ref mut framed) => Pin::new(framed).poll_flush(cx), + Self::Ws(ref mut ws) => Pin::new(ws).poll_flush(cx).map_err(tung_to_io_err), + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match *self { + Self::Derp(ref mut framed) => Pin::new(framed).poll_close(cx), + Self::Ws(ref mut ws) => Pin::new(ws).poll_close(cx).map_err(tung_to_io_err), + } + } +} + +impl Stream for RelayIo { + type Item = anyhow::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match *self { + Self::Derp(ref mut framed) => Pin::new(framed).poll_next(cx), + Self::Ws(ref mut ws) => Pin::new(ws).poll_next(cx).map(Frame::from_ws_message), + } + } +} + /// Whether or not the underlying [`tokio::net::TcpStream`] is served over Tls #[derive(Debug)] pub enum MaybeTlsStream {