Skip to content

Commit

Permalink
Merge branch 'main' into eviction
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Dec 4, 2024
2 parents 691f758 + e575af2 commit ef0f0da
Show file tree
Hide file tree
Showing 12 changed files with 504 additions and 423 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ jobs:
branch: ${{ github.ref }}
max_workers: 4
netsim_branch: "main"
sim_paths: "sims/iroh_v2/iroh.json,sims/integration_v2"
sim_paths: "sims/iroh/iroh.json,sims/integration"
pr_number: ${{ github.event.pull_request.number || '' }}

codespell:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/netsim.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
branch: "main"
max_workers: 1
netsim_branch: "main"
sim_paths: "sims/iroh_v2,sims/integration_v2"
sim_paths: "sims/iroh,sims/integration"
pr_number: ""
publish_metrics: true
build_profile: "optimized-release"
Expand All @@ -53,7 +53,7 @@ jobs:
branch: ${{inputs.branch}}
max_workers: 1
netsim_branch: ${{inputs.netsim_branch}}
sim_paths: "sims/iroh_v2"
sim_paths: "sims/iroh"
pr_number: ${{inputs.pr_number}}
publish_metrics: false
build_profile: "optimized-release"
Expand Down
38 changes: 4 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ tokio-rustls = { version = "0.26", default-features = false, features = [
"ring",
] }
tokio-rustls-acme = { version = "0.6", optional = true }
tokio-tungstenite = "0.24"
tokio-tungstenite = "0.21" # avoid duplicating this dependency as long as tokio-tungstenite-wasm isn't updated
tokio-tungstenite-wasm = "0.3"
tokio-util = { version = "0.7", features = ["io-util", "io", "codec", "rt"] }
toml = { version = "0.8", optional = true }
Expand Down
56 changes: 7 additions & 49 deletions iroh-relay/src/client/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
use std::{
net::SocketAddr,
num::NonZeroU32,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use anyhow::{anyhow, bail, ensure, Context as _, Result};
use anyhow::{anyhow, bail, ensure, Result};
use bytes::Bytes;
use futures_lite::Stream;
use futures_sink::Sink;
Expand Down Expand Up @@ -179,7 +178,7 @@ fn process_incoming_frame(frame: Frame) -> Result<ReceivedMessage> {
Frame::NodeGone { node_id } => Ok(ReceivedMessage::NodeGone(node_id)),
Frame::RecvPacket { src_key, content } => {
let packet = ReceivedMessage::ReceivedPacket {
source: src_key,
remote_node_id: src_key,
data: content,
};
Ok(packet)
Expand Down Expand Up @@ -229,15 +228,14 @@ enum ConnWriterMessage {
struct ConnWriterTasks {
recv_msgs: mpsc::Receiver<ConnWriterMessage>,
writer: ConnWriter,
rate_limiter: Option<RateLimiter>,
}

impl ConnWriterTasks {
async fn run(mut self) -> Result<()> {
while let Some(msg) = self.recv_msgs.recv().await {
match msg {
ConnWriterMessage::Packet((key, bytes)) => {
send_packet(&mut self.writer, &self.rate_limiter, key, bytes).await?;
send_packet(&mut self.writer, key, bytes).await?;
}
ConnWriterMessage::Pong(data) => {
write_frame(&mut self.writer, Frame::Pong { data }, None).await?;
Expand Down Expand Up @@ -360,7 +358,7 @@ impl ConnBuilder {
}
}

async fn server_handshake(&mut self) -> Result<Option<RateLimiter>> {
async fn server_handshake(&mut self) -> Result<()> {
debug!("server_handshake: started");
let client_info = ClientInfo {
version: PROTOCOL_VERSION,
Expand All @@ -369,22 +367,18 @@ impl ConnBuilder {
crate::protos::relay::send_client_key(&mut self.writer, &self.secret_key, &client_info)
.await?;

// TODO: add some actual configuration
let rate_limiter = RateLimiter::new(0, 0)?;

debug!("server_handshake: done");
Ok(rate_limiter)
Ok(())
}

pub async fn build(mut self) -> Result<(Conn, ConnReceiver)> {
// exchange information with the server
let rate_limiter = self.server_handshake().await?;
self.server_handshake().await?;

// create task to handle writing to the server
let (writer_sender, writer_recv) = mpsc::channel(PER_CLIENT_SEND_QUEUE_DEPTH);
let writer_task = tokio::task::spawn(
ConnWriterTasks {
rate_limiter,
writer: self.writer,
recv_msgs: writer_recv,
}
Expand Down Expand Up @@ -451,7 +445,7 @@ pub enum ReceivedMessage {
/// Represents an incoming packet.
ReceivedPacket {
/// The [`NodeId`] of the packet sender.
source: NodeId,
remote_node_id: NodeId,
/// The received packet bytes.
#[debug(skip)]
data: Bytes, // TODO: ref
Expand Down Expand Up @@ -494,7 +488,6 @@ pub enum ReceivedMessage {

pub(crate) async fn send_packet<S: Sink<Frame, Error = std::io::Error> + Unpin>(
mut writer: S,
rate_limiter: &Option<RateLimiter>,
dst: NodeId,
packet: Bytes,
) -> Result<()> {
Expand All @@ -508,43 +501,8 @@ pub(crate) async fn send_packet<S: Sink<Frame, Error = std::io::Error> + Unpin>(
dst_key: dst,
packet,
};
if let Some(rate_limiter) = rate_limiter {
if rate_limiter.check_n(frame.len()).is_err() {
tracing::debug!("dropping send: rate limit reached");
return Ok(());
}
}
writer.send(frame).await?;
writer.flush().await?;

Ok(())
}

pub(crate) struct RateLimiter {
inner: governor::DefaultDirectRateLimiter,
}

impl RateLimiter {
pub(crate) fn new(bytes_per_second: usize, bytes_burst: usize) -> Result<Option<Self>> {
if bytes_per_second == 0 || bytes_burst == 0 {
return Ok(None);
}
let bytes_per_second = NonZeroU32::new(u32::try_from(bytes_per_second)?)
.context("bytes_per_second not non-zero")?;
let bytes_burst =
NonZeroU32::new(u32::try_from(bytes_burst)?).context("bytes_burst not non-zero")?;
Ok(Some(Self {
inner: governor::RateLimiter::direct(
governor::Quota::per_second(bytes_per_second).allow_burst(bytes_burst),
),
}))
}

pub(crate) fn check_n(&self, n: usize) -> Result<()> {
let n = NonZeroU32::new(u32::try_from(n)?).context("n not non-zero")?;
match self.inner.check_n(n) {
Ok(_) => Ok(()),
Err(_) => bail!("batch cannot go through"),
}
}
}
48 changes: 36 additions & 12 deletions iroh-relay/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,12 @@ mod tests {
client_a.send(b_key, msg.clone()).await.unwrap();

let res = client_b_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(a_key, source);
if let ReceivedMessage::ReceivedPacket {
remote_node_id,
data,
} = res
{
assert_eq!(a_key, remote_node_id);
assert_eq!(msg, data);
} else {
panic!("client_b received unexpected message {res:?}");
Expand All @@ -926,8 +930,12 @@ mod tests {
client_b.send(a_key, msg.clone()).await.unwrap();

let res = client_a_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(b_key, source);
if let ReceivedMessage::ReceivedPacket {
remote_node_id,
data,
} = res
{
assert_eq!(b_key, remote_node_id);
assert_eq!(msg, data);
} else {
panic!("client_a received unexpected message {res:?}");
Expand Down Expand Up @@ -982,8 +990,12 @@ mod tests {
client_a.send(b_key, msg.clone()).await.unwrap();

let res = client_b_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(a_key, source);
if let ReceivedMessage::ReceivedPacket {
remote_node_id,
data,
} = res
{
assert_eq!(a_key, remote_node_id);
assert_eq!(msg, data);
} else {
panic!("client_b received unexpected message {res:?}");
Expand All @@ -994,8 +1006,12 @@ mod tests {
client_b.send(a_key, msg.clone()).await.unwrap();

let res = client_a_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(b_key, source);
if let ReceivedMessage::ReceivedPacket {
remote_node_id,
data,
} = res
{
assert_eq!(b_key, remote_node_id);
assert_eq!(msg, data);
} else {
panic!("client_a received unexpected message {res:?}");
Expand Down Expand Up @@ -1049,8 +1065,12 @@ mod tests {
client_a.send(b_key, msg.clone()).await.unwrap();

let res = client_b_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(a_key, source);
if let ReceivedMessage::ReceivedPacket {
remote_node_id,
data,
} = res
{
assert_eq!(a_key, remote_node_id);
assert_eq!(msg, data);
} else {
panic!("client_b received unexpected message {res:?}");
Expand All @@ -1061,8 +1081,12 @@ mod tests {
client_b.send(a_key, msg.clone()).await.unwrap();

let res = client_a_receiver.recv().await.unwrap().unwrap();
if let ReceivedMessage::ReceivedPacket { source, data } = res {
assert_eq!(b_key, source);
if let ReceivedMessage::ReceivedPacket {
remote_node_id,
data,
} = res
{
assert_eq!(b_key, remote_node_id);
assert_eq!(msg, data);
} else {
panic!("client_a received unexpected message {res:?}");
Expand Down
3 changes: 1 addition & 2 deletions iroh-relay/src/server/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,7 @@ mod tests {

// write message from b to a
let msg = b"hello world!";
crate::client::conn::send_packet(&mut b_io, &None, node_id_a, Bytes::from_static(msg))
.await?;
crate::client::conn::send_packet(&mut b_io, node_id_a, Bytes::from_static(msg)).await?;

// get message on a's reader
let frame = recv_frame(FrameType::RecvPacket, &mut a_io).await?;
Expand Down
6 changes: 3 additions & 3 deletions iroh-relay/src/server/client_conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ mod tests {
// send packet
println!(" send packet");
let data = b"hello world!";
conn::send_packet(&mut io_rw, &None, target, Bytes::from_static(data)).await?;
conn::send_packet(&mut io_rw, target, Bytes::from_static(data)).await?;
let msg = server_channel_r.recv().await.unwrap();
match msg {
actor::Message::SendPacket {
Expand All @@ -640,7 +640,7 @@ mod tests {
let mut disco_data = disco::MAGIC.as_bytes().to_vec();
disco_data.extend_from_slice(target.as_bytes());
disco_data.extend_from_slice(data);
conn::send_packet(&mut io_rw, &None, target, disco_data.clone().into()).await?;
conn::send_packet(&mut io_rw, target, disco_data.clone().into()).await?;
let msg = server_channel_r.recv().await.unwrap();
match msg {
actor::Message::SendDiscoPacket {
Expand Down Expand Up @@ -698,7 +698,7 @@ mod tests {
let data = b"hello world!";
let target = SecretKey::generate().public();

conn::send_packet(&mut io_rw, &None, target, Bytes::from_static(data)).await?;
conn::send_packet(&mut io_rw, target, Bytes::from_static(data)).await?;
let msg = server_channel_r.recv().await.unwrap();
match msg {
actor::Message::SendPacket {
Expand Down
Loading

0 comments on commit ef0f0da

Please sign in to comment.