Skip to content

Commit

Permalink
implement resource control and refactor channel opening
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 13, 2024
1 parent d724430 commit 7e6d839
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 64 deletions.
56 changes: 53 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions iroh-willow/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iroh-willow"
version = "0.14.0"
version = "0.15.0"
edition = "2021"
readme = "README.md"
description = "willow protocol implementation for iroh"
Expand All @@ -16,11 +16,11 @@ workspace = true

[dependencies]
anyhow = "1"
derive_more = { version = "1.0.0-beta.1", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref", "try_from"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
iroh-base = { version = "0.14.0", path = "../iroh-base" }
iroh-metrics = { version = "0.14.0", path = "../iroh-metrics", optional = true }
iroh-base = { version = "0.15.0", path = "../iroh-base" }
iroh-metrics = { version = "0.15.0", path = "../iroh-metrics", optional = true }
num_enum = "0.7"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
Expand All @@ -38,7 +38,7 @@ redb = { version = "2.0.0" }
tempfile = { version = "3.4" }

# net
iroh-net = { version = "0.14.0", optional = true, path = "../iroh-net" }
iroh-net = { version = "0.15.0", optional = true, path = "../iroh-net" }
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
quinn = { version = "0.10", optional = true }
Expand All @@ -52,6 +52,8 @@ once_cell = "1.19.0"
rayon = "1.10.0"
smallvec = "1.13.2"
itertools = "0.12.1"
futures-lite = "2.3.0"
futures-concurrency = "7.6.0"

[dev-dependencies]
iroh-test = { path = "../iroh-test" }
Expand Down
4 changes: 2 additions & 2 deletions iroh-willow/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
store::Store,
};

pub const CHANNEL_CAP: usize = 1024;
pub const INBOX_CAP: usize = 1024;

pub type SessionId = NodeId;

Expand Down Expand Up @@ -87,7 +87,7 @@ impl Notifier {

impl StoreHandle {
pub fn spawn<S: Store>(store: S, me: NodeId) -> StoreHandle {
let (tx, rx) = flume::bounded(CHANNEL_CAP);
let (tx, rx) = flume::bounded(INBOX_CAP);
// This channel only tracks wake to resume messages to coroutines, which are a sinlge u64
// per wakeup. We want to issue wake calls synchronosuly without blocking, so we use an
// unbounded channel here. The actual capacity is bounded by the number of sessions times
Expand Down
133 changes: 111 additions & 22 deletions iroh-willow/src/net.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::ensure;
use futures::TryFutureExt;
use futures_concurrency::future::TryJoin;
use iroh_base::{hash::Hash, key::NodeId};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
Expand All @@ -14,11 +15,16 @@ use crate::{
AccessChallenge, ChallengeHash, LogicalChannel, Message, CHALLENGE_HASH_LENGTH,
MAX_PAYLOAD_SIZE_POWER,
},
session::{channels::Channels, Role, SessionInit, SessionState},
util::channel::{inbound_channel, outbound_channel, Reader, Receiver, Sender, Writer},
session::{
channels::{Channels, LogicalChannelReceivers, LogicalChannelSenders},
Role, SessionInit, SessionState,
},
util::channel::{
inbound_channel, outbound_channel, Guarantees, Reader, Receiver, Sender, Writer,
},
};

const CHANNEL_CAP: usize = 1024 * 64;
pub const CHANNEL_CAP: usize = 1024 * 64;

#[instrument(skip_all, name = "willow_net", fields(me=%me.fmt_short(), peer=%peer.fmt_short()))]
pub async fn run(
Expand Down Expand Up @@ -50,30 +56,19 @@ pub async fn run(
&mut join_set,
LogicalChannel::Control,
CHANNEL_CAP,
CHANNEL_CAP,
Guarantees::Unlimited,
control_send_stream,
control_recv_stream,
);

let (mut reconciliation_send_stream, mut reconciliation_recv_stream) = match our_role {
Role::Alfie => conn.open_bi().await?,
Role::Betty => conn.accept_bi().await?,
};
reconciliation_send_stream.write_u8(0u8).await?;
reconciliation_recv_stream.read_u8().await?;
let (reconciliation_send, reconciliation_recv) = spawn_channel(
&mut join_set,
LogicalChannel::Reconciliation,
CHANNEL_CAP,
reconciliation_send_stream,
reconciliation_recv_stream,
);
let (logical_send, logical_recv) = open_logical_channels(&mut join_set, conn, our_role).await?;
debug!("channels opened");

let channels = Channels {
control_send,
control_recv,
reconciliation_send,
reconciliation_recv,
logical_send,
logical_recv,
};
let state = SessionState::new(our_role, our_nonce, received_commitment, max_payload_size);

Expand Down Expand Up @@ -115,15 +110,109 @@ async fn join_all(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<(
final_result
}

#[derive(Debug, thiserror::Error)]
#[error("missing channel: {0:?}")]
struct MissingChannel(LogicalChannel);

async fn open_logical_channels(
join_set: &mut JoinSet<anyhow::Result<()>>,
conn: quinn::Connection,
our_role: Role,
) -> anyhow::Result<(LogicalChannelSenders, LogicalChannelReceivers)> {
let cap = CHANNEL_CAP;
let channels = [LogicalChannel::Reconciliation, LogicalChannel::StaticToken];
let mut channels = match our_role {
Role::Alfie => {
channels
.map(|ch| {
let conn = conn.clone();
async move {
let ch_id = ch as u8;
let (mut send, recv) = conn.open_bi().await?;
send.write_u8(ch_id).await?;
Result::<_, anyhow::Error>::Ok((ch, Some((send, recv))))
}
})
.try_join()
.await
}
Role::Betty => {
channels
.map(|_| async {
let (send, mut recv) = conn.accept_bi().await?;
let channel_id = recv.read_u8().await?;
let channel = LogicalChannel::try_from(channel_id)?;
Result::<_, anyhow::Error>::Ok((channel, Some((send, recv))))
})
.try_join()
.await
}
}?;

let mut take_channel = |ch| {
channels
.iter_mut()
.find(|(c, _)| *c == ch)
.map(|(_, streams)| streams.take())
.flatten()
.ok_or(MissingChannel(ch))
.map(|(send_stream, recv_stream)| {
spawn_channel(
join_set,
ch,
cap,
cap,
Guarantees::Limited(0),
send_stream,
recv_stream,
)
})
};

let rec = take_channel(LogicalChannel::Reconciliation)?;
let stt = take_channel(LogicalChannel::StaticToken)?;
Ok((
LogicalChannelSenders {
reconciliation: rec.0,
static_tokens: stt.0,
},
LogicalChannelReceivers {
reconciliation: rec.1,
static_tokens: stt.1,
},
))
}

// async fn open_logical_channel(
// join_set: &mut JoinSet<anyhow::Result<()>>,
// conn: &quinn::Connection,
// ch: LogicalChannel,
// ) -> anyhow::Result<(Sender<Message>, Receiver<Message>)> {
// let (mut send_stream, recv_stream) = conn.open_bi().await?;
// send_stream.write_u8(ch as u8).await?;
// let cap = CHANNEL_CAP;
// Ok(spawn_channel(
// join_set,
// ch,
// cap,
// cap,
// Guarantees::Limited(0),
// send_stream,
// recv_stream,
// ))
// }

fn spawn_channel(
join_set: &mut JoinSet<anyhow::Result<()>>,
ch: LogicalChannel,
cap: usize,
send_cap: usize,
recv_cap: usize,
guarantees: Guarantees,
send_stream: quinn::SendStream,
recv_stream: quinn::RecvStream,
) -> (Sender<Message>, Receiver<Message>) {
let (sender, outbound_reader) = outbound_channel(cap);
let (inbound_writer, recveiver) = inbound_channel(cap);
let (sender, outbound_reader) = outbound_channel(send_cap, guarantees);
let (inbound_writer, recveiver) = inbound_channel(recv_cap);

let recv_fut = recv_loop(recv_stream, inbound_writer)
.map_err(move |e| e.context(format!("receive loop for {ch:?} failed")))
Expand Down
Loading

0 comments on commit 7e6d839

Please sign in to comment.