Skip to content

Commit

Permalink
refactor: Move the RelayIo definition
Browse files Browse the repository at this point in the history
  • Loading branch information
matheus23 committed Jun 20, 2024
1 parent 4c80a04 commit 500c90c
Showing 1 changed file with 55 additions and 56 deletions.
111 changes: 55 additions & 56 deletions iroh-net/src/relay/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,62 +173,6 @@ impl Clone for ClientConnHandler {
}
}

// TODO(matheus23): Move to streams.rs, next to `MaybeTlsStream`, or perhaps to codec.rs next to `DerpCodec`
#[derive(Debug)]
pub(crate) enum RelayIo {
Derp(Framed<MaybeTlsStream, DerpCodec>),
Ws(WebSocketStream<MaybeTlsStream>),
}

fn tung_to_io_err(e: tungstenite::Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
}

impl Sink<Frame> for RelayIo {
type Error = std::io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_ready(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_ready(cx).map_err(tung_to_io_err),
}
}

fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).start_send(item),
Self::Ws(ref mut ws) => Pin::new(ws)
.start_send(item.into_ws_message()?)
.map_err(tung_to_io_err),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_flush(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_flush(cx).map_err(tung_to_io_err),
}
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_close(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_close(cx).map_err(tung_to_io_err),
}
}
}

impl Stream for RelayIo {
type Item = anyhow::Result<Frame>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_next(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_next(cx).map(Frame::from_ws_message),
}
}
}

impl ClientConnHandler {
/// Adds a new connection to the server and serves it.
///
Expand Down Expand Up @@ -415,6 +359,61 @@ fn init_meta_cert(server_key: &PublicKey) -> Vec<u8> {
.expect("fixed allocations")
}

#[derive(Debug)]
pub(crate) enum RelayIo {
Derp(Framed<MaybeTlsStream, DerpCodec>),
Ws(WebSocketStream<MaybeTlsStream>),
}

fn tung_to_io_err(e: tungstenite::Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
}

impl Sink<Frame> for RelayIo {
type Error = std::io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_ready(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_ready(cx).map_err(tung_to_io_err),
}
}

fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).start_send(item),
Self::Ws(ref mut ws) => Pin::new(ws)
.start_send(item.into_ws_message()?)
.map_err(tung_to_io_err),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_flush(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_flush(cx).map_err(tung_to_io_err),
}
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_close(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_close(cx).map_err(tung_to_io_err),
}
}
}

impl Stream for RelayIo {
type Item = anyhow::Result<Frame>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
Self::Derp(ref mut framed) => Pin::new(framed).poll_next(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_next(cx).map(Frame::from_ws_message),
}
}
}

/// Whether or not the underlying [`tokio::net::TcpStream`] is served over Tls
#[derive(Debug)]
pub enum MaybeTlsStream {
Expand Down

0 comments on commit 500c90c

Please sign in to comment.