Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh-net): Implement websocket protocol upgrade in iroh-relay #2387

Merged
merged 28 commits into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
96c1e03
feat: Implement `websocket` protocol upgrade in iroh-relay
matheus23 Jun 19, 2024
06c77b3
chore: Replace path dep with git dep for `fastwebsockets`
matheus23 Jun 19, 2024
72ef2e3
feat: Use `tokio-tungstenite-wasm` for client ws connecting
matheus23 Jun 20, 2024
805136f
feat: Add back codepath for derp protocol on the client
matheus23 Jun 20, 2024
88af1db
chore: Remove unused imports & fastwebsockets lib
matheus23 Jun 20, 2024
ddc932c
chore: Prefer `Pin::new` over `_unpin()`
matheus23 Jun 20, 2024
3feb916
feat: Determine ws/relay protocl based on relay url scheme
matheus23 Jun 20, 2024
b6af9b2
fix: Send `Connection: upgrade` header in ws response, reintroduce tests
matheus23 Jun 20, 2024
5a7f33f
feat: Break the websocket wire protocol to avoid copies
matheus23 Jun 20, 2024
8d02bfa
refactor: Move the `RelayIo` definition
matheus23 Jun 20, 2024
97f2be2
chore: Make clippy happy
matheus23 Jun 21, 2024
6314213
feat: Add `websocket_accepts` and similar rough metrics
matheus23 Jun 21, 2024
8803fdd
fix: Don't stop at websocket control messages
matheus23 Jun 21, 2024
fdd1214
refactor: Adjust mod exposure, rename & cleanup
matheus23 Jun 21, 2024
7def324
docs: Document `Protocol` struct
matheus23 Jun 21, 2024
861fc74
chore: Write (prop)tests for new `codec.rs` code
matheus23 Jun 21, 2024
0146574
chore: Only depend on a single `tungstenite` dep
matheus23 Jun 24, 2024
da4ca5e
refactor: Make `write_to` take an `impl BufMut`
matheus23 Jun 24, 2024
43cd801
chore: Write some docs for new fns in `codec::Frame`
matheus23 Jun 24, 2024
160a2a0
feat: Clarify tracing message
matheus23 Jun 24, 2024
e99e7af
refactor: Introduce `const SUPPORTED_WEBSOCKET_VERSION`
matheus23 Jun 24, 2024
22dfe72
refactor: Better unwrap tungstenite errors
matheus23 Jun 27, 2024
b600806
refactor: Use `&mut impl BufMut` instead of `mut dst: impl BufMut`
matheus23 Jun 27, 2024
c1e4286
chore: Better docs
matheus23 Jun 27, 2024
7fa9a3a
chore: Drop `Relay*` prefix for `RelayConnReader` & writer
matheus23 Jun 27, 2024
2449d40
Merge branch 'main' into matheus23/relay-websockets
matheus23 Jun 27, 2024
d956826
refactor: Rename `use_https` to `use_tls`
matheus23 Jun 27, 2024
2f27d26
refactor: Remove `into_ws_message`, etc. and some renames
matheus23 Jun 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-blobs/src/downloader/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use iroh_net::key::SecretKey;

use crate::{
get::{db::BlobId, progress::TransferState},
util::progress::{FlumeProgressSender, IdGenerator, ProgressSender},
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
util::progress::{FlumeProgressSender, IdGenerator},
};

use super::*;
Expand Down
5 changes: 1 addition & 4 deletions iroh-blobs/src/downloader/test/dialer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
//! Implementation of [`super::Dialer`] used for testing.

use std::{
collections::HashSet,
task::{Context, Poll},
};
use std::task::{Context, Poll};

use parking_lot::RwLock;

Expand Down
1 change: 0 additions & 1 deletion iroh-blobs/src/downloader/test/getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use futures_lite::{future::Boxed as BoxFuture, FutureExt};
use parking_lot::RwLock;
use std::{sync::Arc, time::Duration};

use super::*;

Expand Down
1 change: 0 additions & 1 deletion iroh-blobs/src/store/bao_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,6 @@ pub mod test_support {
BlockSize, ChunkRanges,
};
use futures_lite::{Stream, StreamExt};
use iroh_base::hash::Hash;
use iroh_io::AsyncStreamReader;
use rand::RngCore;
use range_collections::RangeSet2;
Expand Down
1 change: 0 additions & 1 deletion iroh-cli/src/commands/doctor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use iroh::{
};
use portable_atomic::AtomicU64;
use postcard::experimental::max_size::MaxSize;
use ratatui::backend::Backend;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, sync};

Expand Down
3 changes: 3 additions & 0 deletions iroh-net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"], optional = tr
# metrics
iroh-metrics = { version = "0.18.0", path = "../iroh-metrics", default-features = false }
strum = { version = "0.26.2", features = ["derive"] }
tungstenite = "0.23.0"
tokio-tungstenite = "0.23.1"
tokio-tungstenite-wasm = "0.3.1"

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
netlink-packet-core = "0.7.0"
Expand Down
1 change: 0 additions & 1 deletion iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2738,7 +2738,6 @@ impl NetInfo {
#[cfg(test)]
mod tests {
use anyhow::Context;
use futures_lite::StreamExt;
use iroh_test::CallOnDrop;
use rand::RngCore;

Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
pub(crate) mod client;
pub(crate) mod client_conn;
pub(crate) mod clients;
mod codec;
pub(crate) mod codec;
pub mod http;
pub mod iroh_relay;
mod map;
Expand Down
114 changes: 93 additions & 21 deletions iroh-net/src/relay/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
//! based on tailscale/derp/derp_client.go
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use anyhow::{anyhow, bail, ensure, Result};
use bytes::Bytes;
use futures_lite::StreamExt;
use futures_lite::Stream;
use futures_sink::Sink;
use futures_util::sink::SinkExt;
use tokio::io::AsyncWrite;
use futures_util::stream::{SplitSink, SplitStream, StreamExt};
use futures_util::SinkExt;
use tokio::sync::mpsc;
use tokio_tungstenite_wasm::WebSocketStream;
use tokio_util::codec::{FramedRead, FramedWrite};
use tracing::{debug, info_span, trace, Instrument};

Expand Down Expand Up @@ -64,12 +67,12 @@ impl ClientReceiver {
}
}

type RelayReader = FramedRead<MaybeTlsStreamReader, DerpCodec>;

#[derive(derive_more::Debug)]
pub struct InnerClient {
// our local addrs
local_addr: SocketAddr,
/// Our local address, if known.
///
/// Is `None` in tests or when using websockets (because we don't control connection establishment in browsers).
local_addr: Option<SocketAddr>,
/// Channel on which to communicate to the server. The associated [`mpsc::Receiver`] will close
/// if there is ever an error writing to the server.
writer_channel: mpsc::Sender<ClientWriterMessage>,
Expand Down Expand Up @@ -123,8 +126,10 @@ impl Client {
}

/// The local address that the [`Client`] is listening on.
pub fn local_addr(&self) -> Result<SocketAddr> {
Ok(self.inner.local_addr)
///
/// `None`, when run in a testing environment or when using websockets.
pub fn local_addr(&self) -> Option<SocketAddr> {
self.inner.local_addr
}

/// Whether or not this [`Client`] is closed.
Expand Down Expand Up @@ -209,13 +214,13 @@ enum ClientWriterMessage {
///
/// Shutsdown when you send a [`ClientWriterMessage::Shutdown`], or if there is an error writing to
/// the server.
struct ClientWriter<W: AsyncWrite + Unpin + Send + 'static> {
struct ClientWriter {
recv_msgs: mpsc::Receiver<ClientWriterMessage>,
writer: FramedWrite<W, DerpCodec>,
writer: RelayConnWriter,
rate_limiter: Option<RateLimiter>,
}

impl<W: AsyncWrite + Unpin + Send + 'static> ClientWriter<W> {
impl ClientWriter {
async fn run(mut self) -> Result<()> {
while let Some(msg) = self.recv_msgs.recv().await {
match msg {
Expand Down Expand Up @@ -244,25 +249,92 @@ impl<W: AsyncWrite + Unpin + Send + 'static> ClientWriter<W> {
}
}

/// The Builder returns a [`Client`] starts a [`ClientWriter`] run task.
/// The Builder returns a [`Client`] and a started [`ClientWriter`] run task.
pub struct ClientBuilder {
secret_key: SecretKey,
reader: RelayReader,
writer: FramedWrite<MaybeTlsStreamWriter, DerpCodec>,
local_addr: SocketAddr,
reader: RelayConnReader,
writer: RelayConnWriter,
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
local_addr: Option<SocketAddr>,
}

pub(crate) enum RelayConnReader {
Derp(FramedRead<MaybeTlsStreamReader, DerpCodec>),
Ws(SplitStream<WebSocketStream>),
}

pub(crate) enum RelayConnWriter {
Derp(FramedWrite<MaybeTlsStreamWriter, DerpCodec>),
Ws(SplitSink<WebSocketStream, tokio_tungstenite_wasm::Message>),
}

fn tung_wasm_to_io_err(e: tokio_tungstenite_wasm::Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, e.to_string())
matheus23 marked this conversation as resolved.
Show resolved Hide resolved
}

impl Stream for RelayConnReader {
type Item = Result<Frame>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match *self {
Self::Derp(ref mut ws) => Pin::new(ws).poll_next(cx),
Self::Ws(ref mut ws) => match Pin::new(ws).poll_next(cx) {
Poll::Ready(Some(item)) => match Frame::from_wasm_ws_message(item) {
Some(frame) => Poll::Ready(Some(frame)),
// We filter websocket messages that don't carry `Frame`s.
None => Poll::Pending,
},
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
},
}
}
}

impl Sink<Frame> for RelayConnWriter {
type Error = std::io::Error;

fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut ws) => Pin::new(ws).poll_ready(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_ready(cx).map_err(tung_wasm_to_io_err),
}
}

fn start_send(mut self: Pin<&mut Self>, item: Frame) -> Result<(), Self::Error> {
match *self {
Self::Derp(ref mut ws) => Pin::new(ws).start_send(item),
Self::Ws(ref mut ws) => Pin::new(ws)
.start_send(item.into_wasm_ws_message()?)
.map_err(tung_wasm_to_io_err),
}
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut ws) => Pin::new(ws).poll_flush(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_flush(cx).map_err(tung_wasm_to_io_err),
}
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match *self {
Self::Derp(ref mut ws) => Pin::new(ws).poll_close(cx),
Self::Ws(ref mut ws) => Pin::new(ws).poll_close(cx).map_err(tung_wasm_to_io_err),
}
}
}

impl ClientBuilder {
pub fn new(
secret_key: SecretKey,
local_addr: SocketAddr,
reader: MaybeTlsStreamReader,
writer: MaybeTlsStreamWriter,
local_addr: Option<SocketAddr>,
reader: RelayConnReader,
writer: RelayConnWriter,
) -> Self {
Self {
secret_key,
reader: FramedRead::new(reader, DerpCodec),
writer: FramedWrite::new(writer, DerpCodec),
reader,
writer,
local_addr,
}
}
Expand Down
Loading
Loading