From 7153a38bc52a8cec877c8b874f37a37658b99370 Mon Sep 17 00:00:00 2001 From: Kasey Date: Wed, 5 Jun 2024 14:41:05 -0400 Subject: [PATCH 1/6] feat(iroh-gossip): configure the max message size (#2340) ## Description Add a configuration option for `max_message_size` to `iroh-gossip::proto::Config`. This `Config` gets used in `iroh-gossip::Gossip::from_endpoint`. `iroh-docs` still uses the default 4096 bytes. The `max_message_size` configuration is useful for folks using `iroh-gossip::Gossip` as a standalone library. closes #2312 ## Breaking Changes Adds: `iroh-gossip::Gossip::max_message_size`, which reports the configured maximum message size for the gossip actor. Changes: `iroh_gossip::net::util::read_message` now takes a `max_message_size: usize` parameter `iroh_gossip::net::util::write_message` now takes a `max_message_size: usize` parameter `iroh_gossip::net::util::read_lp` now takes a `max_message_size: usize` parameter Removes: the `iroh-gossip::proto::MAX_MESSAGE_SIZE` const ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] All breaking changes documented. --- iroh-docs/src/engine/live.rs | 2 +- iroh-gossip/src/net.rs | 30 +++++++++++++++++++++++------- iroh-gossip/src/net/util.rs | 13 ++++++++----- iroh-gossip/src/proto/state.rs | 5 +++++ iroh-gossip/src/proto/topic.rs | 25 ++++++++++++++++++++++++- iroh-net/src/net/interfaces/bsd.rs | 2 +- 6 files changed, 62 insertions(+), 15 deletions(-) diff --git a/iroh-docs/src/engine/live.rs b/iroh-docs/src/engine/live.rs index 5c7608722b..88f4b39e22 100644 --- a/iroh-docs/src/engine/live.rs +++ b/iroh-docs/src/engine/live.rs @@ -543,7 +543,7 @@ impl LiveActor { match details .outcome .heads_received - .encode(Some(iroh_gossip::net::MAX_MESSAGE_SIZE)) + .encode(Some(self.gossip.max_message_size())) { Err(err) => warn!(?err, "Failed to encode author heads for sync report"), Ok(heads) => { diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 4083e3a113..756ccfee68 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -26,10 +26,6 @@ pub mod util; /// ALPN protocol name pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0"; -/// Maximum message size is limited currently. The limit is more-or-less arbitrary. // TODO: Make the limit configurable. -pub const MAX_MESSAGE_SIZE: usize = 4096; - /// Channel capacity for all subscription broadcast channels (single) const SUBSCRIBE_ALL_CAP: usize = 2048; /// Channel capacity for topic subscription broadcast channels (one per topic) @@ -76,6 +72,7 @@ pub struct Gossip { to_actor_tx: mpsc::Sender, on_endpoints_tx: mpsc::Sender>, _actor_handle: Arc>>, + max_message_size: usize, } impl Gossip { @@ -94,6 +91,7 @@ impl Gossip { let (on_endpoints_tx, on_endpoints_rx) = mpsc::channel(ON_ENDPOINTS_CAP); let me = endpoint.node_id().fmt_short(); + let max_message_size = state.max_message_size(); let actor = Actor { endpoint, state, @@ -125,9 +123,15 @@ impl Gossip { to_actor_tx, on_endpoints_tx, _actor_handle: Arc::new(actor_handle), + max_message_size, } } + /// Get the maximum message size configured for this gossip actor. + pub fn max_message_size(&self) -> usize { + self.max_message_size + } + /// Join a topic and connect to peers.
/// /// @@ -427,12 +431,23 @@ impl Actor { let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP); self.conn_send_tx.insert(peer_id, send_tx.clone()); + let max_message_size = self.state.max_message_size(); + // Spawn a task for this connection let in_event_tx = self.in_event_tx.clone(); tokio::spawn( async move { debug!("connection established"); - match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await { + match connection_loop( + peer_id, + conn, + origin, + send_rx, + &in_event_tx, + max_message_size, + ) + .await + { Ok(()) => { debug!("connection closed without error") } @@ -605,6 +620,7 @@ async fn connection_loop( origin: ConnOrigin, mut send_rx: mpsc::Receiver, in_event_tx: &mpsc::Sender, + max_message_size: usize, ) -> anyhow::Result<()> { let (mut send, mut recv) = match origin { ConnOrigin::Accept => conn.accept_bi().await?, @@ -621,10 +637,10 @@ async fn connection_loop( // but the other side may still want to use it to // send data to us. Some(msg) = send_rx.recv(), if !send_rx.is_closed() => { - write_message(&mut send, &mut send_buf, &msg).await? + write_message(&mut send, &mut send_buf, &msg, max_message_size).await? } - msg = read_message(&mut recv, &mut recv_buf) => { + msg = read_message(&mut recv, &mut recv_buf, max_message_size) => { let msg = msg?; match msg { None => break, diff --git a/iroh-gossip/src/net/util.rs b/iroh-gossip/src/net/util.rs index 1101300292..2a45fa4961 100644 --- a/iroh-gossip/src/net/util.rs +++ b/iroh-gossip/src/net/util.rs @@ -11,16 +11,17 @@ use tokio::{ use crate::proto::util::TimerMap; -use super::{ProtoMessage, MAX_MESSAGE_SIZE}; +use super::ProtoMessage; /// Write a `ProtoMessage` as a length-prefixed, postcard-encoded message. pub async fn write_message( writer: &mut W, buffer: &mut BytesMut, frame: &ProtoMessage, + max_message_size: usize, ) -> Result<()> { let len = postcard::experimental::serialized_size(&frame)?; - ensure!(len < MAX_MESSAGE_SIZE); + ensure!(len < max_message_size); buffer.clear(); buffer.resize(len, 0u8); let slice = postcard::to_slice(&frame, buffer)?; @@ -33,8 +34,9 @@ pub async fn write_message( pub async fn read_message( reader: impl AsyncRead + Unpin, buffer: &mut BytesMut, + max_message_size: usize, ) -> Result> { - match read_lp(reader, buffer).await? { + match read_lp(reader, buffer, max_message_size).await? { None => Ok(None), Some(data) => { let message = postcard::from_bytes(&data)?; @@ -52,6 +54,7 @@ pub async fn read_message( pub async fn read_lp( mut reader: impl AsyncRead + Unpin, buffer: &mut BytesMut, + max_message_size: usize, ) -> Result> { let size = match reader.read_u32().await { Ok(size) => size, @@ -60,8 +63,8 @@ pub async fn read_lp( }; let mut reader = reader.take(size as u64); let size = usize::try_from(size).context("frame larger than usize")?; - if size > MAX_MESSAGE_SIZE { - bail!("Incoming message exceeds MAX_MESSAGE_SIZE"); + if size > max_message_size { + bail!("Incoming message exceeds the maximum message size of {max_message_size} bytes"); } buffer.reserve(size); loop { diff --git a/iroh-gossip/src/proto/state.rs b/iroh-gossip/src/proto/state.rs index f8b1ebd1e3..a841342014 100644 --- a/iroh-gossip/src/proto/state.rs +++ b/iroh-gossip/src/proto/state.rs @@ -196,6 +196,11 @@ impl State { .unwrap_or(false) } + /// Returns the maximum message size configured in the gossip protocol. + pub fn max_message_size(&self) -> usize { + self.config.max_message_size + } + /// Handle an [`InEvent`] /// /// This returns an iterator of [`OutEvent`]s that must be processed. 
diff --git a/iroh-gossip/src/proto/topic.rs b/iroh-gossip/src/proto/topic.rs index df36578dbb..0ac50d4f1f 100644 --- a/iroh-gossip/src/proto/topic.rs +++ b/iroh-gossip/src/proto/topic.rs @@ -18,6 +18,10 @@ use super::{ }; use super::{PeerData, PeerIdentity}; +/// The default maximum size in bytes for a gossip message. +/// This is a sane but arbitrary default and can be changed in the [`Config`]. +pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096; + /// Input event to the topic state handler. #[derive(Clone, Debug)] pub enum InEvent { @@ -170,13 +174,32 @@ impl IO for VecDeque> { self.push_back(event.into()) } } + /// Protocol configuration -#[derive(Clone, Default, Debug)] +#[derive(Clone, Debug)] pub struct Config { /// Configuration for the swarm membership layer pub membership: hyparview::Config, /// Configuration for the gossip broadcast layer pub broadcast: plumtree::Config, + /// Max message size in bytes. + /// + /// This size should be the same across a network to ensure all nodes can transmit and read large messages. + /// + /// At minimum, this size should be large enough to send gossip control messages. This can vary, depending on the size of the [`PeerIdentity`] you use and the size of the [`PeerData`] you transmit in your messages. + /// + /// The default is [`DEFAULT_MAX_MESSAGE_SIZE`]. + pub max_message_size: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + membership: Default::default(), + broadcast: Default::default(), + max_message_size: DEFAULT_MAX_MESSAGE_SIZE, + } + } } /// The topic state maintains the swarm membership and broadcast tree for a particular topic. diff --git a/iroh-net/src/net/interfaces/bsd.rs b/iroh-net/src/net/interfaces/bsd.rs index dd6ca7e3ca..7ef0cd1eb0 100644 --- a/iroh-net/src/net/interfaces/bsd.rs +++ b/iroh-net/src/net/interfaces/bsd.rs @@ -300,7 +300,7 @@ impl WireFormat { Ok(Some(WireMessage::Route(m))) } - #[cfg(any(target_os = "openbsd",))] + #[cfg(target_os = "openbsd")] MessageType::Route => { if data.len() < self.body_off { return Err(RouteError::MessageTooShort); From 98914ee4dcdb78f7477311f933d84f4f2478e168 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Thu, 6 Jun 2024 09:02:00 +0200 Subject: [PATCH 2/6] fix(docs): prevent deadlocks with streams returned from docs actor (#2346) ## Description Fixes #2345 The iroh-docs actor loop can easily be deadlocked from the client side: if you call any RPC method that returns a stream, and the stream is longer than what the RPC layer buffers, and you call and await any other docs method *while consuming the stream*, the docs actor will deadlock. (It only happens if the stream is longer than the capacity of the intermediate channel that goes from the actor to the RPC layer, which is why this does not *always* happen.) This is the case for all methods that return iterators. The solution is twofold: * Run a single-threaded executor in the iroh-docs actor loop * For actions returning iterators/streams, spawn a task on that executor to forward the store iterator into the stream, yielding when the receiver is not consuming fast enough To be able to spawn the iterators onto a task, they have to be `'static`, which they can be - but only when operating on snapshots. So this PR fixes the potential for deadlock. It has the downside, however, that whenever a docs client function that returns an iterator is called, the current write transaction will be committed first, which has a performance penalty. However, this is preferable to deadlocks, IMO.
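To illustrate the forwarding half of the fix, here is a minimal sketch of the pattern (not the actual iroh-docs code: it uses `tokio::sync::mpsc` in place of the `flume` channel the RPC layer uses, and a placeholder `Item` type):

```rust
use tokio::{sync::mpsc, task::JoinSet};

// Placeholder standing in for the real store iterator item type.
type Item = anyhow::Result<u64>;

struct Actor {
    // Owns the forwarding tasks; aborted when the actor shuts down.
    tasks: JoinSet<()>,
}

impl Actor {
    // Instead of draining the iterator inline (which blocks the actor loop and
    // deadlocks once the reply channel is full while the client is itself
    // awaiting another actor call), spawn a task that forwards the items.
    // The task yields at `send().await` whenever the receiver is slow.
    fn spawn_forward(
        &mut self,
        reply: mpsc::Sender<Item>,
        iter: impl Iterator<Item = Item> + 'static,
    ) {
        self.tasks.spawn_local(async move {
            for item in iter {
                if reply.send(item).await.is_err() {
                    break; // receiver dropped, stop forwarding
                }
            }
        });
    }
}
```

`JoinSet::spawn_local` must be called from within a `tokio::task::LocalSet`, which is why the actor loop switches to a single-threaded runtime driving a `LocalSet`.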
## Breaking Changes ## Notes & open questions This will need tests and likely documentation of the performance implications. ## Change checklist - [ ] Self-review. - [ ] Documentation updates if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --- iroh-docs/Cargo.toml | 6 +-- iroh-docs/src/actor.rs | 69 ++++++++++++++++++------- iroh-docs/src/store/fs.rs | 88 ++++++++++++++++---------------- iroh-docs/src/store/fs/query.rs | 27 +++++----- iroh-docs/src/store/fs/ranges.rs | 40 ++++++++++----- iroh/tests/sync.rs | 38 ++++++++++++++ 6 files changed, 174 insertions(+), 94 deletions(-) diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index 005d2e1ea3..f08c97fee0 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3.0" -futures-util = { version = "0.3.25", optional = true } +futures-util = { version = "0.3.25" } hex = "0.4" iroh-base = { version = "0.17.0", path = "../iroh-base" } iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] } @@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] } strum = { version = "0.25", features = ["derive"] } tempfile = { version = "3.4" } thiserror = "1" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "rt", "time", "macros"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tracing = "0.1" @@ -57,7 +57,7 @@ test-strategy = "0.3.1" [features] default = ["net", "metrics", "engine"] -net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"] metrics = ["dep:iroh-metrics"] engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"] diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index bbe91181cb..a48e8f55b3 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -10,9 +10,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use futures_util::FutureExt; use iroh_base::hash::Hash; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinSet}; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -253,6 +254,7 @@ impl SyncHandle { states: Default::default(), action_rx, content_status_callback, + tasks: Default::default(), }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) @@ -570,22 +572,37 @@ struct Actor { states: OpenReplicas, action_rx: flume::Receiver, content_status_callback: Option, + tasks: JoinSet<()>, } impl Actor { - fn run(mut self) -> Result<()> { + fn run(self) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build()?; + let local_set = tokio::task::LocalSet::new(); + local_set.block_on(&rt, async move { self.run_async().await }) + } + async fn run_async(mut self) -> Result<()> { loop { - let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) { - Ok(action) => action, - Err(flume::RecvTimeoutError::Timeout) => { + let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + tokio::pin!(timeout); + let action = tokio::select!
{ + _ = &mut timeout => { if let Err(cause) = self.store.flush() { error!(?cause, "failed to flush store"); } continue; } - Err(flume::RecvTimeoutError::Disconnected) => { - debug!("action channel disconnected"); - break; + action = self.action_rx.recv_async() => { + match action { + Ok(action) => action, + Err(flume::RecvError::Disconnected) => { + debug!("action channel disconnected"); + break; + } + + } } }; trace!(%action, "tick"); @@ -607,6 +624,7 @@ impl Actor { } } } + self.tasks.abort_all(); debug!("shutdown"); Ok(()) } @@ -636,13 +654,21 @@ impl Actor { } Ok(id) }), - Action::ListAuthors { reply } => iter_to_channel( - reply, - self.store + Action::ListAuthors { reply } => { + let iter = self + .store .list_authors() - .map(|a| a.map(|a| a.map(|a| a.id()))), - ), - Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()), + .map(|a| a.map(|a| a.map(|a| a.id()))); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } + Action::ListReplicas { reply } => { + let iter = self.store.list_namespaces(); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ContentHashes { reply } => { send_reply_with(reply, self, |this| this.store.content_hashes()) } @@ -657,7 +683,9 @@ impl Actor { ) -> Result<(), SendReplyError> { match action { ReplicaAction::Open { reply, opts } => { + tracing::trace!("open in"); let res = self.open(namespace, opts); + tracing::trace!("open out"); send_reply(reply, res) } ReplicaAction::Close { reply } => { @@ -759,7 +787,9 @@ impl Actor { .states .ensure_open(&namespace) .and_then(|_| self.store.get_many(namespace, query)); - iter_to_channel(reply, iter) + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { this.close(namespace); @@ -921,15 +951,18 @@ impl OpenReplicas { } } -fn iter_to_channel( +async fn iter_to_channel_async( channel: flume::Sender>, iter: Result>>, ) -> Result<(), SendReplyError> { match iter { - Err(err) => channel.send(Err(err)).map_err(send_reply_error)?, + Err(err) => channel + .send_async(Err(err)) + .await + .map_err(send_reply_error)?, Ok(iter) => { for item in iter { - channel.send(item).map_err(send_reply_error)?; + channel.send_async(item).await.map_err(send_reply_error)?; } } } diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index ab1171b756..981143ca86 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -154,6 +154,22 @@ impl Store { } } + /// Get an owned read-only snapshot of the database. + /// + /// This will open a new read transaction. The read transaction won't be reused for other + /// reads. + /// + /// This has the side effect of committing any open write transaction, + /// so it can be used as a way to ensure that the data is persisted. + pub fn snapshot_owned(&mut self) -> Result { + // make sure the current transaction is committed + self.flush()?; + assert!(matches!(self.transaction, CurrentTransaction::None)); + let tx = self.db.begin_read()?; + let tables = ReadOnlyTables::new(tx)?; + Ok(tables) + } + /// Get access to the tables to read from them. 
/// /// The underlying transaction is a write transaction, but with a non-mut @@ -223,8 +239,6 @@ impl Store { } } -type AuthorsIter = std::vec::IntoIter>; -type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -297,18 +311,16 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let namespaces: Vec<_> = tables - .namespaces - .iter()? - .map(|res| { - let capability = parse_capability(res?.1.value())?; - Ok((capability.id(), capability.kind())) - }) - .collect(); - Ok(namespaces.into_iter()) + pub fn list_namespaces( + &mut self, + ) -> Result>> { + let snapshot = self.snapshot()?; + let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?; + let iter = iter.map(|res| { + let capability = parse_capability(res?.1.value())?; + Ok((capability.id(), capability.kind())) + }); + Ok(iter) } /// Get an author key from the store. @@ -340,19 +352,16 @@ impl Store { } /// List all author keys in this store. - pub fn list_authors(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let authors: Vec<_> = tables + pub fn list_authors(&mut self) -> Result>> { + let tables = self.snapshot()?; + let iter = tables .authors - .iter()? + .range::<&'static [u8; 32]>(..)? .map(|res| match res { Ok((_key, value)) => Ok(Author::from_bytes(value.value())), Err(err) => Err(err.into()), - }) - .collect(); - - Ok(authors.into_iter()) + }); + Ok(iter) } /// Import a new replica namespace. @@ -413,7 +422,8 @@ impl Store { namespace: NamespaceId, query: impl Into, ) -> Result { - QueryIterator::new(self.tables()?, namespace, query.into()) + let tables = self.snapshot_owned()?; + QueryIterator::new(tables, namespace, query.into()) } /// Get an entry by key and author. @@ -435,13 +445,8 @@ impl Store { /// Get all content hashes of all replicas in the store. pub fn content_hashes(&mut self) -> Result { - // make sure the current transaction is committed - self.flush()?; - assert!(matches!(self.transaction, CurrentTransaction::None)); - let tx = self.db.begin_read()?; - let tables = ReadOnlyTables::new(tx)?; - let records = tables.records; - ContentHashesIterator::all(records) + let tables = self.snapshot_owned()?; + ContentHashesIterator::all(&tables.records) } /// Get the latest entry for each author in a namespace. @@ -870,14 +875,6 @@ impl Iterator for ParentIterator { } } -self_cell::self_cell!( - struct ContentHashesIteratorInner { - owner: RecordsTable, - #[covariant] - dependent: RecordsRange, - } -); - /// Iterator for all content hashes /// /// Note that you might get duplicate hashes. Also, the iterator will keep /// a database snapshot open until it is dropped. /// /// Also, this represents a snapshot of the database at the time of creation. /// It needs a copy of a redb::ReadOnlyTable to be self-contained. #[derive(derive_more::Debug)] pub struct ContentHashesIterator { #[debug(skip)] range: RecordsRange<'static>, } impl ContentHashesIterator { /// Create a new iterator over all content hashes.
- pub fn all(owner: RecordsTable) -> anyhow::Result { - let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?; - Ok(Self(inner)) + pub fn all(table: &RecordsTable) -> anyhow::Result { + let range = RecordsRange::all_static(table)?; + Ok(Self { range }) } } @@ -900,7 +900,7 @@ impl Iterator for ContentHashesIterator { type Item = Result; fn next(&mut self) -> Option { - let v = self.0.with_dependent_mut(|_, d| d.next())?; + let v = self.range.next()?; Some(v.map(|e| e.content_hash())) } } diff --git a/iroh-docs/src/store/fs/query.rs b/iroh-docs/src/store/fs/query.rs index a73dbcd8e7..f05b4ecfb3 100644 --- a/iroh-docs/src/store/fs/query.rs +++ b/iroh-docs/src/store/fs/query.rs @@ -3,6 +3,7 @@ use iroh_base::hash::Hash; use crate::{ store::{ + fs::tables::ReadOnlyTables, util::{IndexKind, LatestPerKeySelector, SelectorRes}, AuthorFilter, KeyFilter, Query, }, @@ -12,34 +13,33 @@ use crate::{ use super::{ bounds::{ByKeyBounds, RecordsBounds}, ranges::{RecordsByKeyRange, RecordsRange}, - tables::Tables, RecordsValue, }; /// A query iterator for entry queries. #[derive(Debug)] -pub struct QueryIterator<'a> { - range: QueryRange<'a>, +pub struct QueryIterator { + range: QueryRange, query: Query, offset: u64, count: u64, } #[derive(Debug)] -enum QueryRange<'a> { +enum QueryRange { AuthorKey { - range: RecordsRange<'a>, + range: RecordsRange<'static>, key_filter: KeyFilter, }, KeyAuthor { - range: RecordsByKeyRange<'a>, + range: RecordsByKeyRange, author_filter: AuthorFilter, selector: Option, }, } -impl<'a> QueryIterator<'a> { - pub fn new(tables: &'a Tables<'a>, namespace: NamespaceId, query: Query) -> Result { +impl QueryIterator { + pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result { let index_kind = IndexKind::from(&query); let range = match index_kind { IndexKind::AuthorKey { range, key_filter } => { @@ -53,7 +53,7 @@ impl<'a> QueryIterator<'a> { // no author set => full table scan with the provided key filter AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter), }; - let range = RecordsRange::with_bounds(&tables.records, bounds)?; + let range = RecordsRange::with_bounds_static(&tables.records, bounds)?; QueryRange::AuthorKey { range, key_filter: filter, @@ -65,11 +65,8 @@ impl<'a> QueryIterator<'a> { latest_per_key, } => { let bounds = ByKeyBounds::new(namespace, &range); - let range = RecordsByKeyRange::with_bounds( - &tables.records_by_key, - &tables.records, - bounds, - )?; + let range = + RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?; let selector = latest_per_key.then(LatestPerKeySelector::default); QueryRange::KeyAuthor { author_filter, @@ -88,7 +85,7 @@ impl<'a> QueryIterator<'a> { } } -impl<'a> Iterator for QueryIterator<'a> { +impl Iterator for QueryIterator { type Item = Result; fn next(&mut self) -> Option> { diff --git a/iroh-docs/src/store/fs/ranges.rs b/iroh-docs/src/store/fs/ranges.rs index 9219c620ac..f28d95ae63 100644 --- a/iroh-docs/src/store/fs/ranges.rs +++ b/iroh-docs/src/store/fs/ranges.rs @@ -1,6 +1,6 @@ //! 
Ranges and helpers for working with [`redb`] tables -use redb::{Key, Range, ReadableTable, Table, Value}; +use redb::{Key, Range, ReadOnlyTable, ReadableTable, Value}; use crate::{store::SortDirection, SignedEntry}; @@ -74,14 +74,9 @@ impl<'a, K: Key + 'static, V: Value + 'static> RangeExt for Range<'a, K, V #[debug("RecordsRange")] pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>); -impl<'a> RecordsRange<'a> { - pub(super) fn all( - records: &'a impl ReadableTable, RecordsValue<'static>>, - ) -> anyhow::Result { - let range = records.range::>(..)?; - Ok(Self(range)) - } +// pub type RecordsRange<'a> = Range<'a, RecordsId<'static>, RecordsValue<'static>>; +impl<'a> RecordsRange<'a> { pub(super) fn with_bounds( records: &'a impl ReadableTable, RecordsValue<'static>>, bounds: RecordsBounds, @@ -90,6 +85,7 @@ impl<'a> RecordsRange<'a> { Ok(Self(range)) } + // /// Get the next item in the range. /// /// Omit items for which the `matcher` function returns false. @@ -103,6 +99,22 @@ impl<'a> RecordsRange<'a> { } } +impl RecordsRange<'static> { + pub(super) fn all_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + ) -> anyhow::Result { + let range = records.range::>(..)?; + Ok(Self(range)) + } + pub(super) fn with_bounds_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + bounds: RecordsBounds, + ) -> anyhow::Result { + let range = records.range(bounds.as_ref())?; + Ok(Self(range)) + } +} + impl<'a> Iterator for RecordsRange<'a> { type Item = anyhow::Result; fn next(&mut self) -> Option { @@ -112,15 +124,15 @@ impl<'a> Iterator for RecordsRange<'a> { #[derive(derive_more::Debug)] #[debug("RecordsByKeyRange")] -pub struct RecordsByKeyRange<'a> { - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, - by_key_range: Range<'a, RecordsByKeyId<'static>, ()>, +pub struct RecordsByKeyRange { + records_table: ReadOnlyTable, RecordsValue<'static>>, + by_key_range: Range<'static, RecordsByKeyId<'static>, ()>, } -impl<'a> RecordsByKeyRange<'a> { +impl RecordsByKeyRange { pub fn with_bounds( - records_by_key_table: &'a impl ReadableTable, ()>, - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, + records_by_key_table: ReadOnlyTable, ()>, + records_table: ReadOnlyTable, RecordsValue<'static>>, bounds: ByKeyBounds, ) -> anyhow::Result { let by_key_range = records_by_key_table.range(bounds.as_ref())?; diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 556f5829a7..afa2591588 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -973,6 +973,44 @@ async fn sync_big() -> Result<()> { Ok(()) } +#[tokio::test] +#[cfg(feature = "test-utils")] +async fn test_list_docs_stream() -> Result<()> { + let node = Node::memory() + .node_discovery(iroh::node::DiscoveryConfig::None) + .relay_mode(iroh::net::relay::RelayMode::Disabled) + .spawn() + .await?; + let count = 200; + + // create docs + for _i in 0..count { + let doc = node.docs.create().await?; + doc.close().await?; + } + + // create doc stream + let mut stream = node.docs.list().await?; + + // process each doc and call into the docs actor. + // this makes sure that we don't deadlock the docs actor. + let mut i = 0; + let fut = async { + while let Some((id, _)) = stream.try_next().await.unwrap() { + let _doc = node.docs.open(id).await.unwrap().unwrap(); + i += 1; + } + }; + + tokio::time::timeout(Duration::from_secs(2), fut) + .await + .expect("not to timeout"); + + assert_eq!(i, count); + + Ok(()) +} + /// Get all entries of a document. 
async fn get_all(doc: &MemDoc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; From b047b28ddead8f357cb22c67c6e7ada23db5deb8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 6 Jun 2024 16:35:16 +0300 Subject: [PATCH 3/6] refactor(iroh-blobs)!: implement some collection related things on the client side (#2349) ## Description A collection is just one particular way to use a hashseq, so it feels a bit weird to have it baked into the iroh node. With this we can move some of it into the client. This is a part of https://github.com/n0-computer/iroh/pull/2272. We can make more similar changes once we have the batch API https://github.com/n0-computer/iroh/pull/2339. ## Breaking Changes ## Notes & open questions Note: I closed #2272 because half of the changes in that PR are here, the other half will be part of the batch PR, and I am not yet convinced about moving collections into iroh... ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh-blobs/src/export.rs | 2 +- iroh-blobs/src/format/collection.rs | 26 ++++++++-- iroh-cli/src/commands/blob.rs | 2 +- iroh/src/client/blobs.rs | 60 +++++++++++++++++----- iroh/src/client/tags.rs | 11 +++- iroh/src/node/rpc.rs | 79 +++-------------------------- iroh/src/rpc_protocol.rs | 70 ++++++++++++------------- 7 files changed, 119 insertions(+), 131 deletions(-) diff --git a/iroh-blobs/src/export.rs b/iroh-blobs/src/export.rs index 75b282fd6c..cdbda28881 100644 --- a/iroh-blobs/src/export.rs +++ b/iroh-blobs/src/export.rs @@ -46,7 +46,7 @@ pub async fn export_collection( progress: impl ProgressSender + IdGenerator, ) -> anyhow::Result<()> { tokio::fs::create_dir_all(&outpath).await?; - let collection = Collection::load(db, &hash).await?; + let collection = Collection::load_db(db, &hash).await?; for (name, hash) in collection.into_iter() { #[allow(clippy::needless_borrow)] let path = outpath.join(pathbuf_from_name(&name)); diff --git a/iroh-blobs/src/format/collection.rs b/iroh-blobs/src/format/collection.rs index ab13572cc1..cdf4448e98 100644 --- a/iroh-blobs/src/format/collection.rs +++ b/iroh-blobs/src/format/collection.rs @@ -1,5 +1,5 @@ //! The collection type used by iroh -use std::collections::BTreeMap; +use std::{collections::BTreeMap, future::Future}; use anyhow::Context; use bao_tree::blake3; @@ -64,6 +64,12 @@ impl IntoIterator for Collection { } } +/// A simple store trait for loading blobs +pub trait SimpleStore { + /// Load a blob from the store + fn load(&self, hash: Hash) -> impl Future> + Send + '_; +} + /// Metadata for a collection /// /// This is the wire format for the metadata blob. @@ -84,7 +90,7 @@ impl Collection { /// /// To persist the collection, write all the blobs to storage, and use the /// hash of the last blob as the collection hash. - pub fn to_blobs(&self) -> impl Iterator { + pub fn to_blobs(&self) -> impl DoubleEndedIterator { let meta = CollectionMeta { header: *Self::HEADER, names: self.names(), @@ -160,11 +166,25 @@ impl Collection { Ok((collection, res, stats)) } + /// Create a new collection from a hash sequence and metadata.
+ pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result { + let hs = store.load(root).await?; + let hs = HashSeq::try_from(hs)?; + let meta_hash = hs.iter().next().context("empty hash seq")?; + let meta = store.load(meta_hash).await?; + let meta: CollectionMeta = postcard::from_bytes(&meta)?; + anyhow::ensure!( + meta.names.len() + 1 == hs.len(), + "names and links length mismatch" + ); + Ok(Self::from_parts(hs.into_iter(), meta)) + } + /// Load a collection from a store given a root hash /// /// This assumes that both the links and the metadata of the collection is stored in the store. /// It does not require that all child blobs are stored in the store. - pub async fn load(db: &D, root: &Hash) -> anyhow::Result + pub async fn load_db(db: &D, root: &Hash) -> anyhow::Result where D: crate::store::Map, { diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 82ea5bd4e9..cb1a9fb2e6 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -467,7 +467,7 @@ impl ListCommands { } } Self::Collections => { - let mut response = iroh.blobs.list_collections().await?; + let mut response = iroh.blobs.list_collections()?; while let Some(item) = response.next().await { let CollectionInfo { tag, diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 61d075e7fc..e1e98cae2e 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -13,10 +13,11 @@ use anyhow::{anyhow, Result}; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; +use genawaiter::sync::{Co, Gen}; use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use iroh_blobs::{ export::ExportProgress as BytesExportProgress, - format::collection::Collection, + format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, BlobFormat, Hash, Tag, @@ -31,13 +32,12 @@ use tracing::warn; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, - BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, + BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, }; -use super::{flatten, Iroh}; +use super::{flatten, tags, Iroh}; /// Iroh blobs client. #[derive(Debug, Clone)] @@ -322,18 +322,35 @@ where /// Read the content of a collection. pub async fn get_collection(&self, hash: Hash) -> Result { - let BlobGetCollectionResponse { collection } = - self.rpc.rpc(BlobGetCollectionRequest { hash }).await??; - Ok(collection) + Collection::load(hash, self).await } /// List all collections. 
- pub async fn list_collections(&self) -> Result>> { - let stream = self - .rpc - .server_streaming(BlobListCollectionsRequest) - .await?; - Ok(flatten(stream)) + pub fn list_collections(&self) -> Result>> { + let this = self.clone(); + Ok(Gen::new(|co| async move { + if let Err(cause) = this.list_collections_impl(&co).await { + co.yield_(Err(cause)).await; + } + })) + } + + async fn list_collections_impl(&self, co: &Co>) -> Result<()> { + let tags = self.tags_client(); + let mut tags = tags.list_hash_seq().await?; + while let Some(tag) = tags.next().await { + let tag = tag?; + if let Ok(collection) = self.get_collection(tag.hash).await { + let info = CollectionInfo { + tag: tag.name, + hash: tag.hash, + total_blobs_count: Some(collection.len() as u64 + 1), + total_blobs_size: Some(0), + }; + co.yield_(Ok(info)).await; + } + } + Ok(()) } /// Delete a blob. @@ -366,6 +383,21 @@ where Ok(BlobStatus::Partial { size: reader.size }) } } + + fn tags_client(&self) -> tags::Client { + tags::Client { + rpc: self.rpc.clone(), + } + } +} + +impl SimpleStore for Client +where + C: ServiceConnection, +{ + async fn load(&self, hash: Hash) -> anyhow::Result { + self.read_to_bytes(hash).await + } } /// Whether to wrap the added data in a collection. @@ -929,7 +961,7 @@ mod tests { .create_collection(collection, SetTagOption::Auto, tags) .await?; - let collections: Vec<_> = client.blobs.list_collections().await?.try_collect().await?; + let collections: Vec<_> = client.blobs.list_collections()?.try_collect().await?; assert_eq!(collections.len(), 1); { diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index c2d4309977..c25111e3e3 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -20,7 +20,16 @@ where { /// List all tags. pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListTagsRequest).await?; + let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// List all tags with a hash_seq format. 
+ pub async fn list_hash_seq(&self) -> Result>> { + let stream = self + .rpc + .server_streaming(ListTagsRequest::hash_seq()) + .await?; Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index fc500b7566..56219110f1 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -17,7 +17,6 @@ use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, use iroh_blobs::util::progress::ProgressSender; use iroh_blobs::BlobFormat; use iroh_blobs::{ - hashseq::parse_hash_seq, provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, @@ -33,16 +32,13 @@ use quic_rpc::{ use tokio_util::task::LocalPoolHandle; use tracing::{debug, info}; -use crate::client::blobs::{ - BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption, -}; +use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; use crate::client::tags::TagInfo; use crate::client::NodeStatus; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, - BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, + BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, @@ -95,12 +91,7 @@ impl Handler { chan.server_streaming(msg, handler, Self::blob_list_incomplete) .await } - BlobListCollections(msg) => { - chan.server_streaming(msg, handler, Self::blob_list_collections) - .await - } CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await, - BlobGetCollection(msg) => chan.rpc(msg, handler, Self::blob_get_collection).await, ListTags(msg) => { chan.server_streaming(msg, handler, Self::blob_list_tags) .await @@ -348,39 +339,6 @@ impl Handler { Ok(()) } - async fn blob_list_collections_impl( - self, - co: &Co>, - ) -> anyhow::Result<()> { - let db = self.inner.db.clone(); - let local = self.inner.rt.clone(); - let tags = db.tags().await.unwrap(); - for item in tags { - let (name, HashAndFormat { hash, format }) = item?; - if !format.is_hash_seq() { - continue; - } - let Some(entry) = db.get(&hash).await? 
else { - continue; - }; - let count = local - .spawn_pinned(|| async move { - let reader = entry.data_reader().await?; - let (_collection, count) = parse_hash_seq(reader).await?; - anyhow::Ok(count) - }) - .await??; - co.yield_(Ok(CollectionInfo { - tag: name, - hash, - total_blobs_count: Some(count), - total_blobs_size: None, - })) - .await; - } - Ok(()) - } - fn blob_list( self, _msg: BlobListRequest, @@ -403,17 +361,6 @@ impl Handler { }) } - fn blob_list_collections( - self, - _msg: BlobListCollectionsRequest, - ) -> impl Stream> + Send + 'static { - Gen::new(move |co| async move { - if let Err(e) = self.blob_list_collections_impl(&co).await { - co.yield_(Err(e.into())).await; - } - }) - } - async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> { self.inner.db.set_tag(msg.name, None).await?; Ok(()) @@ -424,15 +371,16 @@ impl Handler { Ok(()) } - fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream + Send + 'static { + fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream + Send + 'static { tracing::info!("blob_list_tags"); Gen::new(|co| async move { let tags = self.inner.db.tags().await.unwrap(); #[allow(clippy::manual_flatten)] for item in tags { if let Ok((name, HashAndFormat { hash, format })) = item { - tracing::info!("{:?} {} {:?}", name, hash, format); - co.yield_(TagInfo { name, hash, format }).await; + if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { + co.yield_(TagInfo { name, hash, format }).await; + } } } }) @@ -1044,21 +992,6 @@ impl Handler { Ok(CreateCollectionResponse { hash, tag }) } - - async fn blob_get_collection( - self, - req: BlobGetCollectionRequest, - ) -> RpcResult { - let hash = req.hash; - let db = self.inner.db.clone(); - let collection = self - .rt() - .spawn_pinned(move || async move { Collection::load(&db, &hash).await }) - .await - .map_err(|_| anyhow!("join failed"))??; - - Ok(BlobGetCollectionResponse { collection }) - } } async fn download( diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index ccfbc45671..8fe71e7d6a 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -44,7 +44,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress}; use iroh_docs::engine::LiveEvent; use crate::client::{ - blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, docs::{ImportProgress, ShareMode}, tags::TagInfo, NodeStatus, @@ -205,22 +205,39 @@ impl ServerStreamingMsg for BlobListIncompleteRequest { /// /// Lists all collections that have been explicitly added to the database. #[derive(Debug, Serialize, Deserialize)] -pub struct BlobListCollectionsRequest; - -impl Msg for BlobListCollectionsRequest { - type Pattern = ServerStreaming; +pub struct ListTagsRequest { + /// List raw tags + pub raw: bool, + /// List hash seq tags + pub hash_seq: bool, +} + +impl ListTagsRequest { + /// List all tags + pub fn all() -> Self { + Self { + raw: true, + hash_seq: true, + } + } + + /// List raw tags + pub fn raw() -> Self { + Self { + raw: true, + hash_seq: false, + } + } + + /// List hash seq tags + pub fn hash_seq() -> Self { + Self { + raw: false, + hash_seq: true, + } + } } -impl ServerStreamingMsg for BlobListCollectionsRequest { - type Response = RpcResult; -} - -/// List all collections -/// -/// Lists all collections that have been explicitly added to the database. 
#[derive(Debug, Serialize, Deserialize)] pub struct ListTagsRequest; impl Msg for ListTagsRequest { type Pattern = ServerStreaming; } @@ -250,25 +267,6 @@ pub struct DeleteTagRequest { impl RpcMsg for DeleteTagRequest { type Response = RpcResult<()>; } - -/// Get a collection -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobGetCollectionRequest { - /// Hash of the collection - pub hash: Hash, -} - -impl RpcMsg for BlobGetCollectionRequest { - type Response = RpcResult; -} - -/// The response for a `BlobGetCollectionRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobGetCollectionResponse { - /// The collection. - pub collection: Collection, -} - /// Create a collection. #[derive(Debug, Serialize, Deserialize)] pub struct CreateCollectionRequest { @@ -1063,12 +1061,10 @@ pub enum Request { BlobExport(BlobExportRequest), BlobList(BlobListRequest), BlobListIncomplete(BlobListIncompleteRequest), - BlobListCollections(BlobListCollectionsRequest), BlobDeleteBlob(BlobDeleteBlobRequest), BlobValidate(BlobValidateRequest), BlobFsck(BlobConsistencyCheckRequest), CreateCollection(CreateCollectionRequest), - BlobGetCollection(BlobGetCollectionRequest), DeleteTag(DeleteTagRequest), ListTags(ListTagsRequest), @@ -1123,13 +1119,11 @@ pub enum Response { BlobAddPath(BlobAddPathResponse), BlobList(RpcResult), BlobListIncomplete(RpcResult), - BlobListCollections(RpcResult), BlobDownload(BlobDownloadResponse), BlobFsck(ConsistencyCheckProgress), BlobExport(BlobExportResponse), BlobValidate(ValidateProgress), CreateCollection(RpcResult), - BlobGetCollection(RpcResult), ListTags(TagInfo), DeleteTag(RpcResult<()>), From b2f0b0eb84ef8f4a9962d540805a148a103d1e2b Mon Sep 17 00:00:00 2001 From: Kasey Date: Thu, 6 Jun 2024 09:57:32 -0400 Subject: [PATCH 4/6] fix(iroh-net): return `Poll::Ready(Ok(n))` when we have no relay URL or direct addresses in `poll_send` (#2322) ## Description If we have no relay URL or direct addresses for a NodeId when sending transmits in `poll_send`, what do we do? Returning `Poll::Ready(Err(e))` causes the endpoint to error, which causes all connections to fail. If we return `Poll::Pending` (in this case), we have no mechanism for waking the waker once `poll_send` has returned. Also, even if we wake up and continue to poll, we will attempt to send the same transmits that we know we cannot send. If we return `Poll::Ready(Ok(0))`, we will get into a loop in Quinn that attempts to keep re-sending the same transmits. However, if we report back to Quinn that we *have* sent those transmits (by returning `Poll::Ready(Ok(n))`), then Quinn will move on and attempt to send new transmits. QUIC mechanisms might cause those transmits to be re-sent when we get no ACKs for them, but eventually, the connection will time out. closes #2226 ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented.
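For illustration, here is a minimal sketch of the trade-off described above (a hypothetical free function, not the real `poll_send` signature):

```rust
use std::{io, task::Poll};

// `n` is the number of consecutive transmits that all target a node for which
// we have neither a UDP address nor a relay URL.
fn poll_send_unreachable(n: usize) -> Poll<io::Result<usize>> {
    // - Poll::Ready(Err(..)) would fail every connection on the endpoint.
    // - Poll::Pending leaves no registered waker, so we would never be polled again.
    // - Poll::Ready(Ok(0)) makes Quinn loop, retrying the same transmits.
    // Claiming the transmits were sent effectively drops them; QUIC loss
    // detection then retransmits, or the connection eventually times out.
    Poll::Ready(Ok(n))
}
```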
--- iroh-net/src/magicsock.rs | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 080e99b985..7437c87694 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -531,12 +531,20 @@ impl MagicSock { } if udp_addr.is_none() && relay_url.is_none() { - // Handle no addresses being available - warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::NotConnected, - "no UDP or relay address available for node", - ))); + // Returning an error here would lock up the entire `Endpoint`. + // + // If we returned `Poll::Pending`, the waker driving the `poll_send` will never get woken up. + // + // Our best bet here is to log an error and return `Poll::Ready(Ok(n))`. + // + // `n` is the number of consecutive transmits in this batch that are meant for the same destination (a destination that we have no addresses for, and so we can never actually send). + // + // When we return `Poll::Ready(Ok(n))`, we are effectively dropping those n messages, by lying to QUIC and saying they were sent. + // (If we returned `Poll::Ready(Ok(0))` instead, QUIC would loop to attempt to re-send those messages, blocking other traffic.) + // + // When `QUIC` gets no `ACK`s for those messages, the connection will eventually timeout. + error!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); + return Poll::Ready(Ok(n)); } if (udp_addr.is_none() || udp_pending) && (relay_url.is_none() || relay_pending) { @@ -549,14 +557,16 @@ impl MagicSock { } if !relay_sent && !udp_sent && !pings_sent { - warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); + // Returning an error here would lock up the entire `Endpoint`. + // Instead, log an error and return `Poll::Pending`, the connection will timeout. let err = udp_error.unwrap_or_else(|| { io::Error::new( io::ErrorKind::NotConnected, "no UDP or relay address available for node", ) }); - return Poll::Ready(Err(err)); + error!(node = %public_key.fmt_short(), "{err:?}"); + return Poll::Pending; } trace!( From 7198cd0f69cd0a178db3b71b7ee58ea5f285b95e Mon Sep 17 00:00:00 2001 From: Kasey Date: Thu, 6 Jun 2024 17:11:57 -0400 Subject: [PATCH 5/6] chore(ci): update clippy (#2351) ## Description ensure we use the latest version of clippy in CI ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- .github/workflows/ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 074a3bea4a..d027f93ccf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -197,6 +197,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + components: clippy - name: Install sccache uses: mozilla-actions/sccache-action@v0.0.4 From 8d91b10e25e5a8363edde3c41a1bce4f9dc7455a Mon Sep 17 00:00:00 2001 From: Floris Bruynooghe Date: Fri, 7 Jun 2024 11:31:35 +0200 Subject: [PATCH 6/6] docs(iroh-net): Update endpoint docs (#2334) ## Description This rewrites quite a bit of the docs aiming to be more consistent and clearly describe the various parts of the API and how they interact. 
It also tries to get the style standardised following https://github.com/rust-lang/rfcs/blob/master/text/1574-more-api-documentation-conventions.md#appendix-a-full-conventions-text The order of the functions has been deliberately changed, the order is used by rust doc as well so directly affects how users see the documentation. Some `PublicKey` types have been changed into `NodeId`. They mostly already had `node_id` as parameter names and were described as such. But no other code changes have been made. ## Breaking Changes ## Notes & open questions The sorting makes the diff rather difficult to read, sorry about that. But maybe that's not so bad for the doc comments as the result is more important than the diff. I've taken to calling the logical thing the `Endpoint` controls an "iroh-net node". This is the thing that goes into the `NodeMap` etc, so is consistent with naming. But I'm usually using "iroh-net node" in prose to distinguish it from the `iroh::node::Node`. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - ~~[ ] Tests if relevant.~~ - ~~[ ] All breaking changes documented.~~ --------- Co-authored-by: Divma <26765164+divagant-martian@users.noreply.github.com> --- iroh-net/src/endpoint.rs | 872 +++++++++++++++++++++++---------------- 1 file changed, 506 insertions(+), 366 deletions(-) diff --git a/iroh-net/src/endpoint.rs b/iroh-net/src/endpoint.rs index dd1a56d274..dd1d219569 100644 --- a/iroh-net/src/endpoint.rs +++ b/iroh-net/src/endpoint.rs @@ -1,6 +1,15 @@ -//! An endpoint that leverages a [`quinn::Endpoint`] and transparently routes packages via direct -//! conenctions or a relay when necessary, optimizing the path to target nodes to ensure maximum -//! connectivity. +//! The [`Endpoint`] allows establishing connections to other iroh-net nodes. +//! +//! The [`Endpoint`] is the main API interface to manage a local iroh-net node. It allows +//! connecting to and accepting connections from other nodes. See the [module docs] for +//! more details on how iroh-net connections work. +//! +//! The main items in this module are: +//! +//! - [`Endpoint`] to establish iroh-net connections with other nodes. +//! - [`Builder`] to create an [`Endpoint`]. +//! +//! [module docs]: crate use std::any::Any; use std::future::Future; @@ -45,11 +54,19 @@ pub use super::magicsock::{ pub use iroh_base::node_addr::{AddrInfo, NodeAddr}; -/// The delay we add before starting a discovery in [`Endpoint::connect`] if the user provided -/// new direct addresses (to try these addresses before starting the discovery). +/// The delay to fall back to discovery when direct addresses fail. +/// +/// When a connection is attempted with a [`NodeAddr`] containing direct addresses the +/// [`Endpoint`] assumes one of those addresses probably works. If after this delay there +/// is still no connection the configured [`Discovery`] will be used however. const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500); -/// Builder for [Endpoint] +/// Builder for [`Endpoint`]. +/// +/// By default the endpoint will generate a new random [`SecretKey`], which will result in a +/// new [`NodeId`]. +/// +/// To create the [`Endpoint`] call [`Builder::bind`]. #[derive(Debug)] pub struct Builder { secret_key: Option, @@ -87,117 +104,139 @@ impl Default for Builder { } impl Builder { - /// Set a secret key to authenticate with other peers. + // The ordering of public methods is reflected directly in the documentation. This is + // roughly ordered by what is most commonly needed by users. 
+ + // # The final constructor that everyone needs. + + /// Binds the magic endpoint on the specified socket address. /// - /// This secret key's public key will be the [PublicKey] of this endpoint. + /// The *bind_port* is the port that should be bound locally. + /// The port will be used to bind an IPv4 and, if supported, and IPv6 socket. + /// You can pass `0` to let the operating system choose a free port for you. /// - /// If not set, a new secret key will be generated. - pub fn secret_key(mut self, secret_key: SecretKey) -> Self { - self.secret_key = Some(secret_key); - self - } + /// NOTE: This will be improved soon to add support for binding on specific addresses. + pub async fn bind(self, bind_port: u16) -> Result { + let relay_map = match self.relay_mode { + RelayMode::Disabled => RelayMap::empty(), + RelayMode::Default => default_relay_map(), + RelayMode::Custom(relay_map) => { + ensure!(!relay_map.is_empty(), "Empty custom relay server map",); + relay_map + } + }; + let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate); + let mut server_config = make_server_config( + &secret_key, + self.alpn_protocols, + self.transport_config, + self.keylog, + )?; + if let Some(c) = self.concurrent_connections { + server_config.concurrent_connections(c); + } + let dns_resolver = self + .dns_resolver + .unwrap_or_else(|| default_resolver().clone()); - /// Set the ALPN protocols that this endpoint will accept on incoming connections. - pub fn alpns(mut self, alpn_protocols: Vec>) -> Self { - self.alpn_protocols = alpn_protocols; - self + let msock_opts = magicsock::Options { + port: bind_port, + secret_key, + relay_map, + nodes_path: self.peers_path, + discovery: self.discovery, + proxy_url: self.proxy_url, + dns_resolver, + #[cfg(any(test, feature = "test-utils"))] + insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, + }; + Endpoint::bind(Some(server_config), msock_opts, self.keylog).await } - /// Set an explicit proxy url to proxy all HTTP(S) traffic through. - pub fn proxy_url(mut self, url: Url) -> Self { - self.proxy_url.replace(url); - self - } + // # The very common methods everyone basically needs. - /// Set the proxy url from the environment, in this order: + /// Sets a secret key to authenticate with other peers. /// - /// - `HTTP_PROXY` - /// - `http_proxy` - /// - `HTTPS_PROXY` - /// - `https_proxy` - pub fn proxy_from_env(mut self) -> Self { - self.proxy_url = proxy_url_from_env(); - self - } - - /// If *keylog* is `true` and the KEYLOGFILE environment variable is present it will be - /// considered a filename to which the TLS pre-master keys are logged. This can be useful - /// to be able to decrypt captured traffic for debugging purposes. - pub fn keylog(mut self, keylog: bool) -> Self { - self.keylog = keylog; + /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus + /// also its [`NodeId`] + /// + /// If not set, a new secret key will be generated. + pub fn secret_key(mut self, secret_key: SecretKey) -> Self { + self.secret_key = Some(secret_key); self } - /// Skip verification of SSL certificates from relay servers + /// Sets the [ALPN] protocols that this endpoint will accept on incoming connections. /// - /// May only be used in tests. 
- #[cfg(any(test, feature = "test-utils"))] - pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self { - self.insecure_skip_relay_cert_verify = skip_verify; + /// Not setting this will still allow creating connections, but to accept incoming + /// connections the [ALPN] must be set. + /// + /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation + pub fn alpns(mut self, alpn_protocols: Vec>) -> Self { + self.alpn_protocols = alpn_protocols; self } + // # Methods for common customisation items. + /// Sets the relay servers to assist in establishing connectivity. /// - /// relay servers are used to discover other peers by [`PublicKey`] and also help - /// establish connections between peers by being an initial relay for traffic while - /// assisting in holepunching to establish a direct connection between peers. + /// Relay servers are used to establish initial connection with another iroh-net node. + /// They also perform various functions related to hole punching, see the [crate docs] + /// for more details. + /// + /// By default the Number0 relay servers are used. /// /// When using [RelayMode::Custom], the provided `relay_map` must contain at least one /// configured relay node. If an invalid [`RelayMap`] is provided [`bind`] /// will result in an error. /// /// [`bind`]: Builder::bind + /// [crate docs]: crate pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self { self.relay_mode = relay_mode; self } - /// Set a custom [quinn::TransportConfig] for this endpoint. + /// Optionally sets a discovery mechanism for this endpoint. /// - /// The transport config contains parameters governing the QUIC state machine. + /// If you want to combine multiple discovery services, you can pass a + /// [`crate::discovery::ConcurrentDiscovery`]. /// - /// If unset, the default config is used. Default values should be suitable for most internet - /// applications. Applications protocols which forbid remotely-initiated streams should set - /// `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to zero. - pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self { - self.transport_config = Some(transport_config); - self - } - - /// Maximum number of simultaneous connections to accept. + /// If no discovery service is set, connecting to a node without providing its + /// direct addresses or relay URLs will fail. /// - /// New incoming connections are only accepted if the total number of incoming or outgoing - /// connections is less than this. Outgoing connections are unaffected. - pub fn concurrent_connections(mut self, concurrent_connections: u32) -> Self { - self.concurrent_connections = Some(concurrent_connections); + /// See the documentation of the [`Discovery`] trait for details. + pub fn discovery(mut self, discovery: Box) -> Self { + self.discovery = Some(discovery); self } - /// Optionally set the path where peer info should be stored. + /// Optionally sets the path where peer info should be stored. /// - /// If the file exists, it will be used to populate an initial set of peers. Peers will be - /// saved periodically and on shutdown to this path. + /// If the file exists, it will be used to populate an initial set of peers. Peers will + /// be saved periodically and on shutdown to this path. pub fn peers_data_path(mut self, path: PathBuf) -> Self { self.peers_path = Some(path); self } - /// Optionally set a discovery mechanism for this endpoint. 
     pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
         self.relay_mode = relay_mode;
         self
     }

-    /// Set a custom [quinn::TransportConfig] for this endpoint.
+    /// Optionally sets a discovery mechanism for this endpoint.
     ///
-    /// The transport config contains parameters governing the QUIC state machine.
+    /// If you want to combine multiple discovery services, you can pass a
+    /// [`crate::discovery::ConcurrentDiscovery`].
     ///
-    /// If unset, the default config is used. Default values should be suitable for most internet
-    /// applications. Applications protocols which forbid remotely-initiated streams should set
-    /// `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to zero.
-    pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self {
-        self.transport_config = Some(transport_config);
-        self
-    }
-
-    /// Maximum number of simultaneous connections to accept.
+    /// If no discovery service is set, connecting to a node without providing its
+    /// direct addresses or relay URLs will fail.
     ///
-    /// New incoming connections are only accepted if the total number of incoming or outgoing
-    /// connections is less than this. Outgoing connections are unaffected.
-    pub fn concurrent_connections(mut self, concurrent_connections: u32) -> Self {
-        self.concurrent_connections = Some(concurrent_connections);
+    /// See the documentation of the [`Discovery`] trait for details.
+    pub fn discovery(mut self, discovery: Box<dyn Discovery>) -> Self {
+        self.discovery = Some(discovery);
         self
     }

-    /// Optionally set the path where peer info should be stored.
+    /// Optionally sets the path where peer info should be stored.
     ///
-    /// If the file exists, it will be used to populate an initial set of peers. Peers will be
-    /// saved periodically and on shutdown to this path.
+    /// If the file exists, it will be used to populate an initial set of peers. Peers will
+    /// be saved periodically and on shutdown to this path.
     pub fn peers_data_path(mut self, path: PathBuf) -> Self {
         self.peers_path = Some(path);
         self
     }

-    /// Optionally set a discovery mechanism for this endpoint.
-    ///
-    /// If you want to combine multiple discovery services, you can pass a
-    /// [`crate::discovery::ConcurrentDiscovery`].
+    // # Methods for more specialist customisation.
+
+    /// Sets a custom [`quinn::TransportConfig`] for this endpoint.
     ///
-    /// If no discovery service is set, connecting to a node without providing its
-    /// direct addresses or relay URLs will fail.
+    /// The transport config contains parameters governing the QUIC state machine.
     ///
-    /// See the documentation of the [`Discovery`] trait for details.
-    pub fn discovery(mut self, discovery: Box<dyn Discovery>) -> Self {
-        self.discovery = Some(discovery);
+    /// If unset, the default config is used. Default values should be suitable for most
+    /// internet applications. Application protocols which forbid remotely-initiated
+    /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
+    /// zero.
+    pub fn transport_config(mut self, transport_config: quinn::TransportConfig) -> Self {
+        self.transport_config = Some(transport_config);
         self
     }

-    /// Optionally set a custom DNS resolver to use for this endpoint.
+    /// Optionally sets a custom DNS resolver to use for this endpoint.
     ///
     /// The DNS resolver is used to resolve relay hostnames, and node addresses if
     /// [`crate::discovery::dns::DnsDiscovery`] is configured.
@@ -210,51 +249,55 @@ impl Builder {
         self
     }

-    /// Bind the magic endpoint on the specified socket address.
+    /// Sets an explicit proxy URL to proxy all HTTP(S) traffic through.
+    pub fn proxy_url(mut self, url: Url) -> Self {
+        self.proxy_url.replace(url);
+        self
+    }
+
+    /// Sets the proxy URL from the environment, in this order:
     ///
-    /// The *bind_port* is the port that should be bound locally.
-    /// The port will be used to bind an IPv4 and, if supported, and IPv6 socket.
-    /// You can pass `0` to let the operating system choose a free port for you.
-    /// NOTE: This will be improved soon to add support for binding on specific addresses.
-    pub async fn bind(self, bind_port: u16) -> Result<Endpoint> {
-        let relay_map = match self.relay_mode {
-            RelayMode::Disabled => RelayMap::empty(),
-            RelayMode::Default => default_relay_map(),
-            RelayMode::Custom(relay_map) => {
-                ensure!(!relay_map.is_empty(), "Empty custom relay server map",);
-                relay_map
-            }
-        };
-        let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate);
-        let mut server_config = make_server_config(
-            &secret_key,
-            self.alpn_protocols,
-            self.transport_config,
-            self.keylog,
-        )?;
-        if let Some(c) = self.concurrent_connections {
-            server_config.concurrent_connections(c);
-        }
-        let dns_resolver = self
-            .dns_resolver
-            .unwrap_or_else(|| default_resolver().clone());
+    /// - `HTTP_PROXY`
+    /// - `http_proxy`
+    /// - `HTTPS_PROXY`
+    /// - `https_proxy`
+    pub fn proxy_from_env(mut self) -> Self {
+        self.proxy_url = proxy_url_from_env();
+        self
+    }

-        let msock_opts = magicsock::Options {
-            port: bind_port,
-            secret_key,
-            relay_map,
-            nodes_path: self.peers_path,
-            discovery: self.discovery,
-            proxy_url: self.proxy_url,
-            dns_resolver,
-            #[cfg(any(test, feature = "test-utils"))]
-            insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
-        };
-        Endpoint::bind(Some(server_config), msock_opts, self.keylog).await
+    /// Enables saving the TLS pre-master key for connections.
+    ///
+    /// This key should normally remain secret but can be useful to debug networking issues
+    /// by decrypting captured traffic.
+    ///
+    /// If *keylog* is `true`, then setting the `KEYLOGFILE` environment variable to a
+    /// filename will result in this file being used to log the TLS pre-master keys.
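+    ///
+    /// For example, a sketch of enabling keylogging (the file path is illustrative and
+    /// comes from the environment, not from this API):
+    ///
+    /// ```
+    /// use iroh_net::Endpoint;
+    ///
+    /// # async fn wrapper() -> anyhow::Result<()> {
+    /// // With e.g. KEYLOGFILE=/tmp/keys.log set in the environment, the TLS pre-master
+    /// // keys are appended to that file, usable by tools such as Wireshark.
+    /// let _endpoint = Endpoint::builder().keylog(true).bind(0).await?;
+    /// # Ok(())
+    /// # }
+    /// ```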
+    pub fn keylog(mut self, keylog: bool) -> Self {
+        self.keylog = keylog;
+        self
+    }
+
+    /// Skips verification of SSL certificates from relay servers.
+    ///
+    /// May only be used in tests.
+    #[cfg(any(test, feature = "test-utils"))]
+    pub fn insecure_skip_relay_cert_verify(mut self, skip_verify: bool) -> Self {
+        self.insecure_skip_relay_cert_verify = skip_verify;
+        self
+    }
+
+    /// Sets the maximum number of simultaneous connections to accept.
+    ///
+    /// New incoming connections are only accepted if the total number of incoming or
+    /// outgoing connections is less than this. Outgoing connections are unaffected.
+    pub fn concurrent_connections(mut self, concurrent_connections: u32) -> Self {
+        self.concurrent_connections = Some(concurrent_connections);
+        self
     }
 }

-/// Create a [`quinn::ServerConfig`] with the given secret key and limits.
+/// Creates a [`quinn::ServerConfig`] with the given secret key and limits.
 pub fn make_server_config(
     secret_key: &SecretKey,
     alpn_protocols: Vec<Vec<u8>>,
@@ -268,15 +311,28 @@ pub fn make_server_config(
     Ok(server_config)
 }

-/// Iroh connectivity layer.
+/// Controls an iroh-net node, establishing connections with other nodes.
 ///
-/// This is responsible for routing packets to nodes based on node IDs, it will initially route
-/// packets via a relay and transparently try and establish a node-to-node connection and upgrade
-/// to it. It will also keep looking for better connections as the network details of both nodes
-/// change.
+/// This is the main API interface to create connections to, and accept connections from
+/// other iroh-net nodes. The connections are peer-to-peer and encrypted; a Relay server is
+/// used to make the connections reliable. See the [crate docs] for a more detailed
+/// overview of iroh-net.
 ///
-/// It is usually only necessary to use a single [`Endpoint`] instance in an application, it
-/// means any QUIC endpoints on top will be sharing as much information about nodes as possible.
+/// It is recommended to only create a single instance per application. This ensures all
+/// the connections made share the same peer-to-peer connections to other iroh-net nodes,
+/// while still remaining independent connections. This results in better network
+/// behaviour.
+///
+/// New connections are typically created using the [`Endpoint::connect`] and
+/// [`Endpoint::accept`] methods. Once established, the [`Connection`] gives access to most
+/// [QUIC] features. Individual streams to send data to the peer are created using the
+/// [`Connection::open_bi`], [`Connection::accept_bi`], [`Connection::open_uni`] and
+/// [`Connection::accept_uni`] functions.
+///
+/// Note that due to the light-weight properties of streams, a stream will only be accepted
+/// once the initiating peer has sent some data on it.
+///
+/// [QUIC]: https://quicwg.org
 #[derive(Clone, Debug)]
 pub struct Endpoint {
     secret_key: Arc<SecretKey>,
@@ -288,12 +344,18 @@ pub struct Endpoint {
 }

 impl Endpoint {
-    /// Build an [`Endpoint`]
+    // The ordering of public methods is reflected directly in the documentation. This is
+    // roughly ordered by what is most commonly needed by users, but grouped in similar
+    // items.
+
+    // # Methods relating to construction.
+
+    /// Returns the builder for an [`Endpoint`].
     pub fn builder() -> Builder {
         Builder::default()
     }

-    /// Create a quinn endpoint backed by a magicsock.
+    /// Creates a quinn endpoint backed by a magicsock.
     ///
     /// This is for internal use, the public interface is the [`Builder`] obtained from
     /// [Self::builder]. See the methods on the builder for documentation of the parameters.
@@ -334,242 +396,391 @@ impl Endpoint {
         })
     }

-    /// Accept an incoming connection on the socket.
-    pub fn accept(&self) -> Accept<'_> {
-        Accept {
-            inner: self.endpoint.accept(),
-            magic_ep: self.clone(),
+    // # Methods for establishing connectivity.
+
+    /// Connects to a remote [`Endpoint`].
+    ///
+    /// A [`NodeAddr`] is required. It must contain the [`NodeId`] to dial and may also
+    /// contain a [`RelayUrl`] and direct addresses. If direct addresses are provided, they
+    /// will be used to try and establish a direct connection without involving a relay
+    /// server.
+    ///
+    /// If neither a [`RelayUrl`] nor direct addresses are configured in the [`NodeAddr`],
+    /// it may still be possible to establish a connection. This depends on other calls
+    /// to [`Endpoint::add_node_addr`], which may provide contact information, or on the
+    /// [`Discovery`] service configured using [`Builder::discovery`]. The discovery
+    /// service will also be used if the remote node is not reachable on the provided direct
+    /// addresses and there is no [`RelayUrl`].
+    ///
+    /// If addresses or relay servers are neither provided nor can be discovered, the
+    /// connection attempt will fail with an error.
+    ///
+    /// The `alpn`, or application-level protocol identifier, is also required. The remote
+    /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
+    /// an error.
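+    ///
+    /// For example, a sketch of dialing a remote node (the ALPN and the origin of
+    /// `node_addr` are placeholders):
+    ///
+    /// ```
+    /// use iroh_net::{Endpoint, NodeAddr};
+    ///
+    /// # async fn wrapper(node_addr: NodeAddr) -> anyhow::Result<()> {
+    /// let endpoint = Endpoint::builder().bind(0).await?;
+    /// // `node_addr` must carry the remote NodeId, and usually a relay URL or direct
+    /// // addresses as well.
+    /// let connection = endpoint.connect(node_addr, b"my-alpn/1").await?;
+    /// let (mut send, _recv) = connection.open_bi().await?;
+    /// send.write_all(b"hello").await?;
+    /// # Ok(())
+    /// # }
+    /// ```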
+    pub async fn connect(&self, node_addr: NodeAddr, alpn: &[u8]) -> Result<quinn::Connection> {
+        // Connecting to ourselves is not supported.
+        if node_addr.node_id == self.node_id() {
+            bail!(
+                "Connecting to ourself is not supported ({} is the node id of this node)",
+                node_addr.node_id.fmt_short()
+            );
         }
-    }

-    /// Get the node id of this endpoint.
-    pub fn node_id(&self) -> NodeId {
-        self.secret_key.public()
-    }
+        if !node_addr.info.is_empty() {
+            self.add_node_addr(node_addr.clone())?;
+        }

-    /// Get the secret_key of this endpoint.
-    pub fn secret_key(&self) -> &SecretKey {
-        &self.secret_key
-    }
+        let NodeAddr { node_id, info } = node_addr.clone();

-    /// Optional reference to the discovery mechanism.
-    pub fn discovery(&self) -> Option<&dyn Discovery> {
-        self.msock.discovery()
+        // Get the mapped IPv6 address from the magic socket. Quinn will connect to this address.
+        // Start discovery for this node if it's enabled and we have no valid or verified
+        // address information for this node.
+        let (addr, discovery) = self
+            .get_mapping_addr_and_maybe_start_discovery(node_addr)
+            .await?;
+
+        debug!(
+            "connecting to {}: (via {} - {:?})",
+            node_id, addr, info.direct_addresses
+        );
+
+        // Start connecting via quinn. This will time out after 10 seconds if no reachable address
+        // is available.
+        let conn = self.connect_quinn(&node_id, alpn, addr).await;
+
+        // Cancel the node discovery task (if still running).
+        if let Some(discovery) = discovery {
+            discovery.cancel();
+        }
+
+        conn
     }

-    /// Get the local endpoint addresses on which the underlying magic socket is bound.
+    /// Connects to a remote endpoint, using just the node's [`NodeId`].
     ///
-    /// Returns a tuple of the IPv4 and the optional IPv6 address.
-    pub fn local_addr(&self) -> (SocketAddr, Option<SocketAddr>) {
-        self.msock.local_addr()
+    /// This is a convenience function for [`Endpoint::connect`]. It relies on addressing
+    /// information being provided by either the discovery service or using
+    /// [`Endpoint::add_node_addr`]. See [`Endpoint::connect`] for the details of how it
+    /// uses the discovery service to establish a connection to a remote node.
+    pub async fn connect_by_node_id(
+        &self,
+        node_id: &NodeId,
+        alpn: &[u8],
+    ) -> Result<quinn::Connection> {
+        let addr = NodeAddr::new(*node_id);
+        self.connect(addr, alpn).await
     }

-    /// Returns the local endpoints as a stream.
-    ///
-    /// The [`Endpoint`] continuously monitors the local endpoints, the network
-    /// addresses it can listen on, for changes. Whenever changes are detected this stream
-    /// will yield a new list of endpoints.
-    ///
-    /// Upon the first creation, the first local endpoint discovery might still be underway, in
-    /// this case the first item of the stream will not be immediately available. Once this first
-    /// set of local endpoints are discovered the stream will always return the first set of
-    /// endpoints immediately, which are the most recently discovered endpoints.
-    ///
-    /// The list of endpoints yielded contains both the locally-bound addresses and the
-    /// endpoint's publicly-reachable addresses, if they could be discovered through STUN or
-    /// port mapping.
-    ///
-    /// # Examples
-    ///
-    /// To get the current endpoints, drop the stream after the first item was received:
-    /// ```
-    /// use futures_lite::StreamExt;
-    /// use iroh_net::Endpoint;
-    ///
-    /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
-    /// # rt.block_on(async move {
-    /// let mep = Endpoint::builder().bind(0).await.unwrap();
-    /// let _endpoints = mep.local_endpoints().next().await;
-    /// # });
-    /// ```
-    pub fn local_endpoints(&self) -> LocalEndpointsStream {
-        self.msock.local_endpoints()
+    async fn connect_quinn(
+        &self,
+        node_id: &PublicKey,
+        alpn: &[u8],
+        addr: SocketAddr,
+    ) -> Result<quinn::Connection> {
+        let client_config = {
+            let alpn_protocols = vec![alpn.to_vec()];
+            let tls_client_config = tls::make_client_config(
+                &self.secret_key,
+                Some(*node_id),
+                alpn_protocols,
+                self.keylog,
+            )?;
+            let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config));
+            let mut transport_config = quinn::TransportConfig::default();
+            transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
+            client_config.transport_config(Arc::new(transport_config));
+            client_config
+        };
+
+        // TODO: We'd eventually want to replace "localhost" with something that makes more sense.
+        let connect = self
+            .endpoint
+            .connect_with(client_config, addr, "localhost")?;
+
+        let connection = connect.await.context("failed connecting to provider")?;
+
+        let rtt_msg = RttMessage::NewConnection {
+            connection: connection.weak_handle(),
+            conn_type_changes: self.conn_type_stream(node_id)?,
+            node_id: *node_id,
+        };
+        if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await {
+            // If this actor is dead, that's not great but we can still function.
+            warn!("rtt-actor not reachable: {err:#}");
+        }
+
+        Ok(connection)
     }

-    /// Get the relay url we are connected to with the lowest latency.
+    /// Accepts an incoming connection on the endpoint.
     ///
-    /// Returns `None` if we are not connected to any relayer.
-    pub fn my_relay(&self) -> Option<RelayUrl> {
-        self.msock.my_relay()
+    /// Only connections with the ALPNs configured in [`Builder::alpns`] will be accepted.
+    /// If multiple ALPNs have been configured, the ALPN can be inspected before accepting
+    /// the connection using [`Connecting::alpn`].
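+    ///
+    /// For example, a sketch of a minimal accept loop:
+    ///
+    /// ```
+    /// use iroh_net::Endpoint;
+    ///
+    /// # async fn wrapper(endpoint: Endpoint) -> anyhow::Result<()> {
+    /// while let Some(connecting) = endpoint.accept().await {
+    ///     // Drives the QUIC handshake to completion; a real application would
+    ///     // spawn a task per connection here.
+    ///     let _connection = connecting.await?;
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```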
+    pub fn accept(&self) -> Accept<'_> {
+        Accept {
+            inner: self.endpoint.accept(),
+            magic_ep: self.clone(),
+        }
     }

-    /// Get the [`NodeAddr`] for this endpoint.
+    // # Methods for manipulating the internal state about other nodes.
+
+    /// Informs this [`Endpoint`] about addresses of a remote iroh-net node.
+    ///
+    /// This updates the local state for the remote node. If the provided [`NodeAddr`]
+    /// contains a [`RelayUrl`], this will be used as the new relay server for this node. If
+    /// it contains any new IP endpoints they will also be stored and tried when next
+    /// connecting to this node.
+    ///
+    /// # Errors
+    ///
+    /// Will return an error if we attempt to add our own [`PublicKey`] to the node map.
+    pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> {
+        // Connecting to ourselves is not supported.
+        if node_addr.node_id == self.node_id() {
+            bail!(
+                "Adding our own address is not supported ({} is the node id of this node)",
+                node_addr.node_id.fmt_short()
+            );
+        }
+        self.msock.add_node_addr(node_addr);
+        Ok(())
+    }
+
+    // # Getter methods for properties of this Endpoint itself.
+
+    /// Returns the secret key of this endpoint.
+    pub fn secret_key(&self) -> &SecretKey {
+        &self.secret_key
+    }
+
+    /// Returns the node id of this endpoint.
+    ///
+    /// This ID is the unique addressing information of this node and other peers must know
+    /// it to be able to connect to this node.
+    pub fn node_id(&self) -> NodeId {
+        self.secret_key.public()
+    }
+
+    /// Returns the current [`NodeAddr`] for this endpoint.
+    ///
+    /// The returned [`NodeAddr`] will have the current [`RelayUrl`] and local IP endpoints
+    /// as they would be returned by [`Endpoint::my_relay`] and
+    /// [`Endpoint::local_endpoints`].
     pub async fn my_addr(&self) -> Result<NodeAddr> {
         let addrs = self
             .local_endpoints()
             .next()
             .await
-            .ok_or(anyhow!("No endpoints found"))?;
+            .ok_or(anyhow!("No IP endpoints found"))?;
         let relay = self.my_relay();
         let addrs = addrs.into_iter().map(|x| x.addr).collect();
         Ok(NodeAddr::from_parts(self.node_id(), relay, addrs))
     }

-    /// Get the [`NodeAddr`] for this endpoint, while providing the endpoints.
+    /// Returns the [`NodeAddr`] for this endpoint with the provided endpoints.
+    ///
+    /// Like [`Endpoint::my_addr`] but uses the provided IP endpoints rather than those from
+    /// [`Endpoint::local_endpoints`].
     pub fn my_addr_with_endpoints(&self, eps: Vec<config::Endpoint>) -> Result<NodeAddr> {
         let relay = self.my_relay();
         let addrs = eps.into_iter().map(|x| x.addr).collect();
         Ok(NodeAddr::from_parts(self.node_id(), relay, addrs))
     }

-    /// Watch for changes to the home relay.
+    /// Returns the [`RelayUrl`] of the Relay server used as home relay.
+    ///
+    /// Every endpoint has a home Relay server which it chooses as the server with the
+    /// lowest latency out of the configured servers provided by [`Builder::relay_mode`].
+    /// This is the server other iroh-net nodes can use to reliably establish a connection
+    /// to this node.
+    ///
+    /// Returns `None` if we are not connected to any Relay server.
+    ///
+    /// Note that this will be `None` right after the [`Endpoint`] is created since it takes
+    /// some time to find and connect to the home relay server. Use
+    /// [`Endpoint::watch_home_relay`] to wait until the home relay server is available.
+    pub fn my_relay(&self) -> Option<RelayUrl> {
+        self.msock.my_relay()
+    }
+
+    /// Watches for changes to the home relay.
+    ///
+    /// If there is currently a home relay it will be yielded immediately as the first item
+    /// in the stream. This makes it possible to use this function to wait for the initial
+    /// home relay to be known.
     ///
-    /// Note that this can be used to wait for the initial home relay to be known. If the home
-    /// relay is known at this point, it will be the first item in the stream.
+    /// Note that it is not guaranteed that a home relay will ever become available. If no
+    /// servers are configured with [`Builder::relay_mode`] this stream will never yield an
+    /// item.
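+    ///
+    /// For example, a sketch of waiting for the initial home relay:
+    ///
+    /// ```
+    /// use futures_lite::StreamExt;
+    /// use iroh_net::Endpoint;
+    ///
+    /// # async fn wrapper(endpoint: Endpoint) -> anyhow::Result<()> {
+    /// let _home_relay = endpoint.watch_home_relay().next().await;
+    /// # Ok(())
+    /// # }
+    /// ```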
     pub fn watch_home_relay(&self) -> impl Stream<Item = RelayUrl> {
         self.msock.watch_home_relay()
     }

-    /// Get information on all the nodes we have connection information about.
+    /// Returns the direct addresses of this [`Endpoint`].
     ///
-    /// Includes the node's [`PublicKey`], potential relay Url, its addresses with any known
-    /// latency, and its [`ConnectionType`], which let's us know if we are currently communicating
-    /// with that node over a `Direct` (UDP) or `Relay` (relay) connection.
+    /// The direct addresses of the [`Endpoint`] are those that could be used by other
+    /// iroh-net nodes to establish direct connectivity, depending on the network
+    /// situation. The yielded lists of direct addresses contain both the locally-bound
+    /// addresses and the [`Endpoint`]'s publicly reachable addresses discovered through
+    /// mechanisms such as [STUN] and port mapping. Hence usually only a subset of these
+    /// will be applicable to a certain remote iroh-net node.
     ///
-    /// Connections are currently only pruned on user action (when we explicitly add a new address
-    /// to the internal addressbook through [`Endpoint::add_node_addr`]), so these connections
-    /// are not necessarily active connections.
-    pub fn connection_infos(&self) -> Vec<ConnectionInfo> {
-        self.msock.connection_infos()
+    /// The [`Endpoint`] continuously monitors the direct addresses for changes as its own
+    /// location in the network might change. Whenever changes are detected this stream
+    /// will yield a new list of direct addresses.
+    ///
+    /// When issuing the first call to this method the first direct address discovery might
+    /// still be underway; in this case the first item of the returned stream will not be
+    /// immediately available. Once this first set of local IP endpoints is discovered, the
+    /// stream will always return the first set of IP endpoints immediately, which are the
+    /// most recently discovered IP endpoints.
+    ///
+    /// # Examples
+    ///
+    /// To get the current endpoints, drop the stream after the first item was received:
+    /// ```
+    /// use futures_lite::StreamExt;
+    /// use iroh_net::Endpoint;
+    ///
+    /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
+    /// # rt.block_on(async move {
+    /// let mep = Endpoint::builder().bind(0).await.unwrap();
+    /// let _endpoints = mep.local_endpoints().next().await;
+    /// # });
+    /// ```
+    ///
+    /// [STUN]: https://en.wikipedia.org/wiki/STUN
+    pub fn local_endpoints(&self) -> LocalEndpointsStream {
+        self.msock.local_endpoints()
     }

-    /// Get connection information about a specific node.
+    /// Returns the local socket addresses on which the underlying sockets are bound.
     ///
-    /// Includes the node's [`PublicKey`], potential relay Url, its addresses with any known
-    /// latency, and its [`ConnectionType`], which let's us know if we are currently communicating
-    /// with that node over a `Direct` (UDP) or `Relay` (relay) connection.
-    pub fn connection_info(&self, node_id: PublicKey) -> Option<ConnectionInfo> {
-        self.msock.connection_info(node_id)
+    /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
+    /// address if available.
+    pub fn local_addr(&self) -> (SocketAddr, Option<SocketAddr>) {
+        self.msock.local_addr()
     }

-    pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> {
-        self.cancel_token.cancelled()
-    }
+    // # Getter methods for information about other nodes.

-    /// Connect to a remote endpoint, using just the nodes's [`PublicKey`].
-    pub async fn connect_by_node_id(
-        &self,
-        node_id: &PublicKey,
-        alpn: &[u8],
-    ) -> Result<quinn::Connection> {
-        let addr = NodeAddr::new(*node_id);
-        self.connect(addr, alpn).await
+    /// Returns connection information about a specific node.
+    ///
+    /// The [`Endpoint`] stores some information about all the other iroh-net nodes it has
+    /// information about. This includes information about the relay server in use, any
+    /// known direct addresses, when there was last any contact with this node and what kind
+    /// of connection this was.
+    pub fn connection_info(&self, node_id: NodeId) -> Option<ConnectionInfo> {
+        self.msock.connection_info(node_id)
     }

-    /// Returns a stream that reports changes in the [`ConnectionType`] for the given `node_id`.
+    /// Returns information on all the nodes we have connection information about.
     ///
-    /// # Errors
+    /// This returns the same information as [`Endpoint::connection_info`] for each node
+    /// known to this [`Endpoint`].
     ///
-    /// Will error if we do not have any address information for the given `node_id`
-    pub fn conn_type_stream(&self, node_id: &PublicKey) -> Result<ConnectionTypeStream> {
-        self.msock.conn_type_stream(node_id)
+    /// Connections are currently only pruned on user action when using
+    /// [`Endpoint::add_node_addr`], so these connections are not necessarily active
+    /// connections.
+    pub fn connection_infos(&self) -> Vec<ConnectionInfo> {
+        self.msock.connection_infos()
    }

-    /// Connect to a remote endpoint.
+    // # Methods for less common getters.
+    //
+    // Partially they return things passed into the builder.
+
+    /// Returns a stream that reports connection type changes for the remote node.
     ///
-    /// A [`NodeAddr`] is required. It must contain the [`NodeId`] to dial and may also contain a
-    /// relay URL and direct addresses. If direct addresses are provided, they will be used to
-    /// try and establish a direct connection without involving a relay server.
+    /// This returns a stream of [`ConnectionType`] items; each time the underlying
+    /// connection to a remote node changes, it yields an item. These changes occur
+    /// when the connection switches between using the Relay server and a direct connection.
     ///
-    /// The `alpn`, or application-level protocol identifier, is also required. The remote endpoint
-    /// must support this `alpn`, otherwise the connection attempt will fail with an error.
+    /// If there is currently a connection with the remote node, the stream will
+    /// immediately yield the current connection type as its first item.
     ///
-    /// If the [`NodeAddr`] contains only [`NodeId`] and no direct addresses and no relay servers,
-    /// a discovery service will be invoked, if configured, to try and discover the node's
-    /// addressing information. The discovery services must be configured globally per [`Endpoint`]
-    /// with [`Builder::discovery`]. The discovery service will also be invoked if
-    /// none of the existing or provided direct addresses are reachable.
+    /// Note that this does not guarantee each connection change is yielded in the stream.
+    /// If the connection type changes several times before this stream is polled, only the
+    /// last recorded state is returned. This can be observed e.g. right at the start of a
+    /// connection when the switch from a relayed to a direct connection can be so fast that
+    /// the relayed state is never exposed.
     ///
-    /// If addresses or relay servers are neither provided nor can be discovered, the connection
-    /// attempt will fail with an error.
-    pub async fn connect(&self, node_addr: NodeAddr, alpn: &[u8]) -> Result<quinn::Connection> {
-        // Connecting to ourselves is not supported.
-        if node_addr.node_id == self.node_id() {
-            bail!(
-                "Connecting to ourself is not supported ({} is the node id of this node)",
-                node_addr.node_id.fmt_short()
-            );
-        }
-
-        if !node_addr.info.is_empty() {
-            self.add_node_addr(node_addr.clone())?;
-        }
-
-        let NodeAddr { node_id, info } = node_addr.clone();
-
-        // Get the mapped IPv6 address from the magic socket. Quinn will connect to this address.
-        // Start discovery for this node if it's enabled and we have no valid or verified
-        // address information for this node.
-        let (addr, discovery) = self
-            .get_mapping_addr_and_maybe_start_discovery(node_addr)
-            .await?;
+    /// # Errors
+    ///
+    /// Will error if we do not have any address information for the given `node_id`.
+    pub fn conn_type_stream(&self, node_id: &NodeId) -> Result<ConnectionTypeStream> {
+        self.msock.conn_type_stream(node_id)
+    }

-        debug!(
-            "connecting to {}: (via {} - {:?})",
-            node_id, addr, info.direct_addresses
-        );
+    /// Returns the DNS resolver used in this [`Endpoint`].
+    ///
+    /// See [`Builder::dns_resolver`].
+    pub fn dns_resolver(&self) -> &DnsResolver {
+        self.msock.dns_resolver()
+    }

-        // Start connecting via quinn. This will time out after 10 seconds if no reachable address
-        // is available.
-        let conn = self.connect_quinn(&node_id, alpn, addr).await;
+    /// Returns the discovery mechanism, if configured.
+    ///
+    /// See [`Builder::discovery`].
+    pub fn discovery(&self) -> Option<&dyn Discovery> {
+        self.msock.discovery()
+    }

-        // Cancel the node discovery task (if still running).
-        if let Some(discovery) = discovery {
-            discovery.cancel();
-        }
+    // # Methods for less common state updates.

-        conn
+    /// Notifies the system of potential network changes.
+    ///
+    /// On many systems iroh is able to detect network changes by itself; however,
+    /// some systems, like Android, do not expose this functionality to native code.
+    /// Android does, however, provide this functionality to Java code. This
+    /// function allows for notifying iroh of any potential network changes like
+    /// this.
+    ///
+    /// Even when the network did not change, or iroh was already able to detect
+    /// the network change itself, there is no harm in calling this function.
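+    ///
+    /// For example, a sketch of forwarding a platform connectivity callback (the
+    /// surrounding plumbing is application-specific):
+    ///
+    /// ```
+    /// use iroh_net::Endpoint;
+    ///
+    /// # async fn on_connectivity_changed(endpoint: Endpoint) {
+    /// // Called e.g. from a callback bridged over from platform code.
+    /// endpoint.network_change().await;
+    /// # }
+    /// ```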
+    pub async fn network_change(&self) {
+        self.msock.network_change().await;
     }

-    async fn connect_quinn(
-        &self,
-        node_id: &PublicKey,
-        alpn: &[u8],
-        addr: SocketAddr,
-    ) -> Result<quinn::Connection> {
-        let client_config = {
-            let alpn_protocols = vec![alpn.to_vec()];
-            let tls_client_config = tls::make_client_config(
-                &self.secret_key,
-                Some(*node_id),
-                alpn_protocols,
-                self.keylog,
-            )?;
-            let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config));
-            let mut transport_config = quinn::TransportConfig::default();
-            transport_config.keep_alive_interval(Some(Duration::from_secs(1)));
-            client_config.transport_config(Arc::new(transport_config));
-            client_config
-        };
+    // # Methods for terminating the endpoint.

-        // TODO: We'd eventually want to replace "localhost" with something that makes more sense.
-        let connect = self
-            .endpoint
-            .connect_with(client_config, addr, "localhost")?;
+    /// Closes the QUIC endpoint and the magic socket.
+    ///
+    /// This will close all open QUIC connections with the provided error_code and
+    /// reason. See [`quinn::Connection`] for details on how these are interpreted.
+    ///
+    /// It will then wait for all connections to actually be shut down, and afterwards
+    /// close the magic socket.
+    ///
+    /// Returns an error if closing the magic socket failed.
+    /// TODO: Document error cases.
+    pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> {
+        let Endpoint {
+            msock,
+            endpoint,
+            cancel_token,
+            ..
+        } = self;
+        cancel_token.cancel();
+        tracing::debug!("Closing connections");
+        endpoint.close(error_code, reason);
+        endpoint.wait_idle().await;
+        // In case this is the last clone of `Endpoint`, dropping the `quinn::Endpoint` will
+        // make it more likely that the underlying socket is not polled by quinn anymore after this.
+        drop(endpoint);
+        tracing::debug!("Connections closed");

-        let connection = connect.await.context("failed connecting to provider")?;
+        msock.close().await?;
+        Ok(())
+    }

-        let rtt_msg = RttMessage::NewConnection {
-            connection: connection.weak_handle(),
-            conn_type_changes: self.conn_type_stream(node_id)?,
-            node_id: *node_id,
-        };
-        if let Err(err) = self.rtt_actor.msg_tx.send(rtt_msg).await {
-            // If this actor is dead, that's not great but we can still function.
-            warn!("rtt-actor not reachable: {err:#}");
-        }
+    // # Remaining private methods

-        Ok(connection)
+    pub(crate) fn cancelled(&self) -> WaitForCancellationFuture<'_> {
+        self.cancel_token.cancelled()
     }

     /// Return the quic mapped address for this `node_id` and possibly start discovery
@@ -631,77 +842,6 @@ impl Endpoint {
         }
     }

-    /// Inform the magic socket about addresses of the peer.
-    ///
-    /// This updates the magic socket's *netmap* with these addresses, which are used as candidates
-    /// when connecting to this peer (in addition to addresses obtained from a relay server).
-    ///
-    /// Note: updating the magic socket's *netmap* will also prune any connections that are *not*
-    /// present in the netmap.
-    ///
-    /// # Errors
-    /// Will return an error if we attempt to add our own [`PublicKey`] to the node map.
-    pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<()> {
-        // Connecting to ourselves is not supported.
-        if node_addr.node_id == self.node_id() {
-            bail!(
-                "Adding our own address is not supported ({} is the node id of this node)",
-                node_addr.node_id.fmt_short()
-            );
-        }
-        self.msock.add_node_addr(node_addr);
-        Ok(())
-    }
-
-    /// Get a reference to the DNS resolver used in this [`Endpoint`].
- pub fn dns_resolver(&self) -> &DnsResolver { - self.msock.dns_resolver() - } - - /// Close the QUIC endpoint and the magic socket. - /// - /// This will close all open QUIC connections with the provided error_code and reason. See - /// [quinn::Connection] for details on how these are interpreted. - /// - /// It will then wait for all connections to actually be shutdown, and afterwards - /// close the magic socket. - /// - /// Returns an error if closing the magic socket failed. - /// TODO: Document error cases. - pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> { - let Endpoint { - msock, - endpoint, - cancel_token, - .. - } = self; - cancel_token.cancel(); - tracing::debug!("Closing connections"); - endpoint.close(error_code, reason); - endpoint.wait_idle().await; - // In case this is the last clone of `Endpoint`, dropping the `quinn::Endpoint` will - // make it more likely that the underlying socket is not polled by quinn anymore after this - drop(endpoint); - tracing::debug!("Connections closed"); - - msock.close().await?; - Ok(()) - } - - /// Call to notify the system of potential network changes. - /// - /// On many systems iroh is able to detect network changes by itself, however - /// some systems like android do not expose this functionality to native code. - /// Android does however provide this functionality to Java code. This - /// function allows for notifying iroh of any potential network changes like - /// this. - /// - /// Even when the network did not change, or iroh was already able to detect - /// the network change itself, there is no harm in calling this function. - pub async fn network_change(&self) { - self.msock.network_change().await; - } - #[cfg(test)] pub(crate) fn magic_sock(&self) -> Handle { self.msock.clone()