From 79071f4d871598fb2390cc9d07fe2b48da85c06b Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 11 Jun 2024 18:37:05 +0200 Subject: [PATCH 01/10] feat: custom protocols --- iroh/src/node.rs | 16 +++++++++-- iroh/src/node/builder.rs | 60 ++++++++++++++++++++++++++++++++++++--- iroh/src/node/protocol.rs | 19 +++++++++++++ 3 files changed, 89 insertions(+), 6 deletions(-) create mode 100644 iroh/src/node/protocol.rs diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 3b9173c706..0b4f3d9a0c 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -5,8 +5,8 @@ //! To shut down the node, call [`Node::shutdown`]. use std::fmt::Debug; use std::net::SocketAddr; -use std::path::Path; use std::sync::Arc; +use std::{any::Any, path::Path}; use anyhow::{anyhow, Result}; use futures_lite::StreamExt; @@ -23,14 +23,16 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::debug; -use crate::client::RpcService; +use crate::{client::RpcService, node::builder::ProtocolMap}; mod builder; +mod protocol; mod rpc; mod rpc_status; pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; pub use self::rpc_status::RpcStatus; +pub use protocol::Protocol; /// A server which implements the iroh node. /// @@ -47,6 +49,7 @@ pub struct Node { inner: Arc>, task: Arc>, client: crate::client::MemIroh, + protocols: ProtocolMap, } #[derive(derive_more::Debug)] @@ -150,6 +153,15 @@ impl Node { self.inner.endpoint.my_relay() } + /// Returns the protocol handler for a alpn. + pub fn get_protocol(&self, alpn: &[u8]) -> Option> { + let protocols = self.protocols.read().unwrap(); + let protocol: Arc = protocols.get(alpn)?.clone(); + let protocol_any: Arc = protocol.as_arc_any(); + let protocol_ref = Arc::downcast(protocol_any).ok()?; + Some(protocol_ref) + } + /// Aborts the node. /// /// This does not gracefully terminate currently: all connections are closed and diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index db935479f2..2e1f38ed25 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -1,8 +1,8 @@ use std::{ - collections::BTreeSet, + collections::{BTreeSet, HashMap}, net::{Ipv4Addr, SocketAddrV4}, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, RwLock}, time::Duration, }; @@ -28,11 +28,13 @@ use quic_rpc::{ RpcServer, ServiceEndpoint, }; use serde::{Deserialize, Serialize}; +use tokio::sync::oneshot; use tokio_util::{sync::CancellationToken, task::LocalPoolHandle}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ client::RPC_ALPN, + node::Protocol, rpc_protocol::RpcService, util::{fs::load_secret_key, path::IrohPaths}, }; @@ -54,6 +56,9 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; +pub(super) type ProtocolMap = Arc>>>; +type ProtocolBuilders = Vec<(&'static [u8], Box) -> Arc>)>; + /// Builder for the [`Node`]. /// /// You must supply a blob store and a document store. @@ -84,6 +89,7 @@ where dns_resolver: Option, node_discovery: DiscoveryConfig, docs_store: iroh_docs::store::fs::Store, + protocols: ProtocolBuilders, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -133,6 +139,7 @@ impl Default for Builder { rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, docs_store: iroh_docs::store::Store::memory(), + protocols: Default::default(), node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -160,6 +167,7 @@ impl Builder { gc_policy: GcPolicy::Disabled, docs_store, node_discovery: Default::default(), + protocols: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, gc_done_callback: None, @@ -223,6 +231,7 @@ where gc_policy: self.gc_policy, docs_store, node_discovery: self.node_discovery, + protocols: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, gc_done_callback: self.gc_done_callback, @@ -244,6 +253,7 @@ where gc_policy: self.gc_policy, docs_store: self.docs_store, node_discovery: self.node_discovery, + protocols: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, gc_done_callback: self.gc_done_callback, @@ -270,6 +280,7 @@ where gc_policy: self.gc_policy, docs_store: self.docs_store, node_discovery: self.node_discovery, + protocols: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, gc_done_callback: self.gc_done_callback, @@ -343,6 +354,16 @@ where self } + /// Accept a custom protocol. + pub fn accept( + mut self, + alpn: &'static [u8], + protocol: impl FnOnce(Node) -> Arc + 'static, + ) -> Self { + self.protocols.push((alpn, Box::new(protocol))); + self + } + /// Register a callback for when GC is done. #[cfg(any(test, feature = "test-utils"))] pub fn register_gc_done_cb(mut self, cb: Box) -> Self { @@ -481,6 +502,8 @@ where let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); + let protocols = Arc::new(RwLock::new(HashMap::new())); + let inner = Arc::new(NodeInner { db: self.blobs_store, endpoint: endpoint.clone(), @@ -492,7 +515,9 @@ where sync, downloader, }); + let (ready_tx, ready_rx) = oneshot::channel(); let task = { + let protocols = Arc::clone(&protocols); let gossip = gossip.clone(); let handler = rpc::Handler { inner: inner.clone(), @@ -501,8 +526,11 @@ where let ep = endpoint.clone(); tokio::task::spawn( async move { + // Wait until the protocol builders have run. + ready_rx.await.expect("cannot fail"); Self::run( ep, + protocols, handler, self.rpc_endpoint, internal_rpc, @@ -518,8 +546,17 @@ where inner, task: Arc::new(task), client, + protocols, }; + for (alpn, p) in self.protocols { + let protocol = p(node.clone()); + node.protocols.write().unwrap().insert(alpn, protocol); + } + + // Notify the run task that the protocols are now built. + ready_tx.send(()).expect("cannot fail"); + // spawn a task that updates the gossip endpoints. // TODO: track task let mut stream = endpoint.local_endpoints(); @@ -545,6 +582,7 @@ where #[allow(clippy::too_many_arguments)] async fn run( server: Endpoint, + protocols: ProtocolMap, handler: rpc::Handler, rpc: E, internal_rpc: impl ServiceEndpoint, @@ -615,8 +653,9 @@ where let gossip = gossip.clone(); let inner = handler.inner.clone(); let sync = handler.inner.sync.clone(); + let protocols = protocols.clone(); tokio::task::spawn(async move { - if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync).await { + if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync, protocols).await { warn!("Handling incoming connection ended with error: {err}"); } }); @@ -738,6 +777,7 @@ async fn handle_connection( node: Arc>, gossip: Gossip, sync: DocsEngine, + protocols: ProtocolMap, ) -> Result<()> { match alpn.as_bytes() { GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, @@ -752,7 +792,19 @@ async fn handle_connection( ) .await } - _ => bail!("ignoring connection: unsupported ALPN protocol"), + alpn => { + let protocol = { + let protocols = protocols.read().unwrap(); + protocols.get(alpn).cloned() + }; + if let Some(protocol) = protocol { + drop(protocols); + let connection = connecting.await?; + protocol.accept(connection).await?; + } else { + bail!("ignoring connection: unsupported ALPN protocol"); + } + } } Ok(()) } diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs new file mode 100644 index 0000000000..4dc7dbb29d --- /dev/null +++ b/iroh/src/node/protocol.rs @@ -0,0 +1,19 @@ +use std::{any::Any, fmt, future::Future, pin::Pin, sync::Arc}; + +use iroh_net::endpoint::Connection; + +/// Trait for iroh protocol handlers. +pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static { + /// Return `self` as `dyn Any`. + /// + /// Implementations can simply return `self` here. + fn as_arc_any(self: Arc) -> Arc; + + /// Accept an incoming connection. + /// + /// This runs on a freshly spawned tokio task so this can be long-running. + fn accept( + &self, + conn: Connection, + ) -> Pin> + 'static + Send + Sync>>; +} From aa1d78a861cbda50d217fba0e7e57e4ca8c87b57 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 11 Jun 2024 19:02:02 +0200 Subject: [PATCH 02/10] fixes and add example --- iroh/Cargo.toml | 4 + iroh/examples/custom-protocol.rs | 134 +++++++++++++++++++++++++++++++ iroh/src/node/builder.rs | 8 +- iroh/src/node/protocol.rs | 11 ++- 4 files changed, 150 insertions(+), 7 deletions(-) create mode 100644 iroh/examples/custom-protocol.rs diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index aac6f9a645..5130f336c2 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -101,3 +101,7 @@ required-features = ["examples"] [[example]] name = "client" required-features = ["examples"] + +[[example]] +name = "custom-protocol" +required-features = ["examples"] diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs new file mode 100644 index 0000000000..e67896f76b --- /dev/null +++ b/iroh/examples/custom-protocol.rs @@ -0,0 +1,134 @@ +use std::{any::Any, fmt, sync::Arc}; + +use anyhow::Result; +use clap::Parser; +use futures_lite::future::Boxed; +use iroh::{ + blobs::store::Store, + net::{ + endpoint::{get_remote_node_id, Connection}, + NodeId, + }, + node::{Node, Protocol}, +}; +use tracing_subscriber::{prelude::*, EnvFilter}; + +const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0"; + +#[derive(Debug, Parser)] +pub struct Cli { + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, Parser)] +pub enum Command { + Accept, + Connect { node: NodeId }, +} + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + let args = Cli::parse(); + // create a new node + let node = iroh::node::Node::memory() + .accept(EXAMPLE_ALPN, |node| ExampleProtocol::build(node)) + .spawn() + .await?; + + // print the ticket if this is the accepting side + match args.command { + Command::Accept => { + let node_id = node.node_id(); + println!("node id: {node_id}"); + // wait until ctrl-c + tokio::signal::ctrl_c().await?; + } + Command::Connect { node: node_id } => { + let proto = ExampleProtocol::from_node(&node, EXAMPLE_ALPN).expect("it is registered"); + proto.connect(node_id).await?; + } + } + + node.shutdown().await?; + + Ok(()) +} + +#[derive(Debug)] +struct ExampleProtocol { + node: Node, +} + +impl Protocol for ExampleProtocol { + fn as_arc_any(self: Arc) -> Arc { + self + } + + fn accept(self: Arc, conn: quinn::Connection) -> Boxed> { + Box::pin(async move { self.handle_connection(conn).await }) + } +} + +impl ExampleProtocol { + fn build(node: Node) -> Arc { + Arc::new(Self { node }) + } + + fn from_node(node: &Node, alpn: &'static [u8]) -> Option> { + node.get_protocol::>(alpn) + } + + async fn handle_connection(&self, conn: Connection) -> Result<()> { + let remote_node_id = get_remote_node_id(&conn)?; + println!("accepting new connection from {remote_node_id}"); + let mut send_stream = conn.open_uni().await?; + println!("stream open!"); + // not that this is something that you wanted to do, but let's create a new blob for each + // incoming connection. this could be any mechanism, but we want to demonstrate how to use a + // custom protocol together with built-in iroh functionality + let content = format!("this blob is created for my beloved peer {remote_node_id} ♥"); + let hash = self + .node + .blobs() + .add_bytes(content.as_bytes().to_vec()) + .await?; + // send the hash over our custom proto + send_stream.write_all(hash.hash.as_bytes()).await?; + send_stream.finish().await?; + Ok(()) + } + + pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> { + println!("connecting to {remote_node_id}"); + let conn = self + .node + .endpoint() + .connect_by_node_id(&remote_node_id, EXAMPLE_ALPN) + .await?; + let mut recv_stream = conn.accept_uni().await?; + let hash_bytes = recv_stream.read_to_end(32).await?; + let hash = iroh::blobs::Hash::from_bytes(*(&hash_bytes.try_into().unwrap())); + println!("received hash: {hash}"); + self.node + .blobs() + .download(hash, remote_node_id.into()) + .await? + .await?; + println!("blob downloaded"); + let content = self.node.blobs().read_to_bytes(hash).await?; + let message = String::from_utf8(content.to_vec())?; + println!("blob content: {message}"); + Ok(()) + } +} + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 2e1f38ed25..c0668c7843 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -422,10 +422,16 @@ where } }; + let alpns = PROTOCOLS + .iter() + .chain(self.protocols.iter().map(|(alpn, _)| alpn)) + .map(|p| p.to_vec()) + .collect(); + let endpoint = Endpoint::builder() .secret_key(self.secret_key.clone()) .proxy_from_env() - .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) + .alpns(alpns) .keylog(self.keylog) .transport_config(transport_config) .concurrent_connections(MAX_CONNECTIONS) diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 4dc7dbb29d..55046952e6 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -1,9 +1,11 @@ -use std::{any::Any, fmt, future::Future, pin::Pin, sync::Arc}; +use std::{any::Any, fmt, sync::Arc}; +use anyhow::Result; +use futures_lite::future::Boxed; use iroh_net::endpoint::Connection; /// Trait for iroh protocol handlers. -pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static { +pub trait Protocol: Send + Sync + Any + fmt::Debug + 'static { /// Return `self` as `dyn Any`. /// /// Implementations can simply return `self` here. @@ -12,8 +14,5 @@ pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static { /// Accept an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept( - &self, - conn: Connection, - ) -> Pin> + 'static + Send + Sync>>; + fn accept(self: Arc, conn: Connection) -> Boxed>; } From 62dda4812798ca14b2daf976de7e94c4a4c088a7 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 11 Jun 2024 19:08:35 +0200 Subject: [PATCH 03/10] improve example --- iroh/examples/custom-protocol.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index e67896f76b..ded75add1a 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -82,9 +82,8 @@ impl ExampleProtocol { async fn handle_connection(&self, conn: Connection) -> Result<()> { let remote_node_id = get_remote_node_id(&conn)?; - println!("accepting new connection from {remote_node_id}"); + println!("accepted connection from {remote_node_id}"); let mut send_stream = conn.open_uni().await?; - println!("stream open!"); // not that this is something that you wanted to do, but let's create a new blob for each // incoming connection. this could be any mechanism, but we want to demonstrate how to use a // custom protocol together with built-in iroh functionality @@ -97,10 +96,12 @@ impl ExampleProtocol { // send the hash over our custom proto send_stream.write_all(hash.hash.as_bytes()).await?; send_stream.finish().await?; + println!("closing connection from {remote_node_id}"); Ok(()) } pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> { + println!("our node id: {}", self.node.node_id()); println!("connecting to {remote_node_id}"); let conn = self .node From 2b149866018b326885edde4d57a0fd6daf5774d8 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 11 Jun 2024 19:12:43 +0200 Subject: [PATCH 04/10] improve example --- iroh/examples/custom-protocol.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index ded75add1a..2516eef8d7 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -13,8 +13,6 @@ use iroh::{ }; use tracing_subscriber::{prelude::*, EnvFilter}; -const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0"; - #[derive(Debug, Parser)] pub struct Cli { #[clap(subcommand)] @@ -33,7 +31,7 @@ async fn main() -> Result<()> { let args = Cli::parse(); // create a new node let node = iroh::node::Node::memory() - .accept(EXAMPLE_ALPN, |node| ExampleProtocol::build(node)) + .accept(ExampleProto::ALPN, |node| ExampleProto::build(node)) .spawn() .await?; @@ -46,7 +44,7 @@ async fn main() -> Result<()> { tokio::signal::ctrl_c().await?; } Command::Connect { node: node_id } => { - let proto = ExampleProtocol::from_node(&node, EXAMPLE_ALPN).expect("it is registered"); + let proto = ExampleProto::get_from_node(&node, EXAMPLE_ALPN).expect("it is registered"); proto.connect(node_id).await?; } } @@ -57,11 +55,11 @@ async fn main() -> Result<()> { } #[derive(Debug)] -struct ExampleProtocol { +struct ExampleProto { node: Node, } -impl Protocol for ExampleProtocol { +impl Protocol for ExampleProto { fn as_arc_any(self: Arc) -> Arc { self } @@ -71,13 +69,15 @@ impl Protocol for ExampleProtocol { } } -impl ExampleProtocol { +impl ExampleProto { + const ALPN: &'static [u8] = b"example-proto/0"; + fn build(node: Node) -> Arc { Arc::new(Self { node }) } - fn from_node(node: &Node, alpn: &'static [u8]) -> Option> { - node.get_protocol::>(alpn) + fn get_from_node(node: &Node, alpn: &'static [u8]) -> Option> { + node.get_protocol::>(alpn) } async fn handle_connection(&self, conn: Connection) -> Result<()> { @@ -100,7 +100,7 @@ impl ExampleProtocol { Ok(()) } - pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> { + async fn connect(&self, remote_node_id: NodeId) -> Result<()> { println!("our node id: {}", self.node.node_id()); println!("connecting to {remote_node_id}"); let conn = self From 9261259f0bfb50a02187c0c29b553b61c2b04491 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 11 Jun 2024 19:17:12 +0200 Subject: [PATCH 05/10] make builder send again --- iroh/src/node/builder.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index c0668c7843..b7f5a9aded 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -57,7 +57,10 @@ const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; pub(super) type ProtocolMap = Arc>>>; -type ProtocolBuilders = Vec<(&'static [u8], Box) -> Arc>)>; +type ProtocolBuilders = Vec<( + &'static [u8], + Box) -> Arc + Send + 'static>, +)>; /// Builder for the [`Node`]. /// @@ -358,7 +361,7 @@ where pub fn accept( mut self, alpn: &'static [u8], - protocol: impl FnOnce(Node) -> Arc + 'static, + protocol: impl FnOnce(Node) -> Arc + Send + 'static, ) -> Self { self.protocols.push((alpn, Box::new(protocol))); self From 85e40de75b113bad4db4d4780ed1add6e51e735f Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Tue, 11 Jun 2024 19:22:17 +0200 Subject: [PATCH 06/10] fix & clippy --- iroh/examples/custom-protocol.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 2516eef8d7..f898b75ed3 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -31,7 +31,7 @@ async fn main() -> Result<()> { let args = Cli::parse(); // create a new node let node = iroh::node::Node::memory() - .accept(ExampleProto::ALPN, |node| ExampleProto::build(node)) + .accept(EXAMPLE_ALPN, |node| ExampleProto::build(node)) .spawn() .await?; @@ -54,6 +54,8 @@ async fn main() -> Result<()> { Ok(()) } +const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0"; + #[derive(Debug)] struct ExampleProto { node: Node, @@ -70,8 +72,6 @@ impl Protocol for ExampleProto { } impl ExampleProto { - const ALPN: &'static [u8] = b"example-proto/0"; - fn build(node: Node) -> Arc { Arc::new(Self { node }) } @@ -110,7 +110,7 @@ impl ExampleProto { .await?; let mut recv_stream = conn.accept_uni().await?; let hash_bytes = recv_stream.read_to_end(32).await?; - let hash = iroh::blobs::Hash::from_bytes(*(&hash_bytes.try_into().unwrap())); + let hash = iroh::blobs::Hash::from_bytes(hash_bytes.try_into().unwrap()); println!("received hash: {hash}"); self.node .blobs() From ee043e5df29f91e767e2ae9c4c8b4b5d80a04583 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 12 Jun 2024 17:49:34 +0200 Subject: [PATCH 07/10] cleanups and PR review --- iroh/examples/custom-protocol.rs | 10 +++++----- iroh/src/node/builder.rs | 4 +--- iroh/src/node/protocol.rs | 8 ++++---- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index f898b75ed3..5507e3b098 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -2,11 +2,11 @@ use std::{any::Any, fmt, sync::Arc}; use anyhow::Result; use clap::Parser; -use futures_lite::future::Boxed; +use futures_lite::future::Boxed as BoxedFuture; use iroh::{ blobs::store::Store, net::{ - endpoint::{get_remote_node_id, Connection}, + endpoint::{get_remote_node_id, Connecting, Connection}, NodeId, }, node::{Node, Protocol}, @@ -54,7 +54,7 @@ async fn main() -> Result<()> { Ok(()) } -const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0"; +const EXAMPLE_ALPN: &[u8] = b"example-proto/0"; #[derive(Debug)] struct ExampleProto { @@ -66,8 +66,8 @@ impl Protocol for ExampleProto { self } - fn accept(self: Arc, conn: quinn::Connection) -> Boxed> { - Box::pin(async move { self.handle_connection(conn).await }) + fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture> { + Box::pin(async move { self.handle_connection(conn.await?).await }) } } diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index b7f5a9aded..941ff8915f 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -807,9 +807,7 @@ async fn handle_connection( protocols.get(alpn).cloned() }; if let Some(protocol) = protocol { - drop(protocols); - let connection = connecting.await?; - protocol.accept(connection).await?; + protocol.handle_connection(connecting).await?; } else { bail!("ignoring connection: unsupported ALPN protocol"); } diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 55046952e6..dd9db9d84c 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -1,8 +1,8 @@ use std::{any::Any, fmt, sync::Arc}; use anyhow::Result; -use futures_lite::future::Boxed; -use iroh_net::endpoint::Connection; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_net::endpoint::Connecting; /// Trait for iroh protocol handlers. pub trait Protocol: Send + Sync + Any + fmt::Debug + 'static { @@ -11,8 +11,8 @@ pub trait Protocol: Send + Sync + Any + fmt::Debug + 'static { /// Implementations can simply return `self` here. fn as_arc_any(self: Arc) -> Arc; - /// Accept an incoming connection. + /// Handle an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept(self: Arc, conn: Connection) -> Boxed>; + fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture>; } From db3513638382c0fab7961406516032ab59138f28 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 12 Jun 2024 18:07:01 +0200 Subject: [PATCH 08/10] improve code structure --- Cargo.lock | 1 + iroh/Cargo.toml | 1 + iroh/examples/custom-protocol.rs | 4 --- iroh/src/node.rs | 16 +++++----- iroh/src/node/builder.rs | 48 ++++++++++++---------------- iroh/src/node/protocol.rs | 54 +++++++++++++++++++++++++++----- 6 files changed, 76 insertions(+), 48 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a63e49d931..ef3fa7ca76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2437,6 +2437,7 @@ dependencies = [ "iroh-quinn", "iroh-test", "num_cpus", + "once_cell", "parking_lot", "portable-atomic", "postcard", diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 5130f336c2..5462a5f2ff 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -35,6 +35,7 @@ num_cpus = { version = "1.15.0" } portable-atomic = "1" iroh-docs = { version = "0.18.0", path = "../iroh-docs" } iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" } +once_cell = "1.18.0" parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } quic-rpc = { version = "0.10.0", default-features = false, features = ["flume-transport", "quinn-transport"] } diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 5507e3b098..be68c7e44a 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -62,10 +62,6 @@ struct ExampleProto { } impl Protocol for ExampleProto { - fn as_arc_any(self: Arc) -> Arc { - self - } - fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture> { Box::pin(async move { self.handle_connection(conn.await?).await }) } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 0b4f3d9a0c..36cf4705a9 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -5,8 +5,8 @@ //! To shut down the node, call [`Node::shutdown`]. use std::fmt::Debug; use std::net::SocketAddr; +use std::path::Path; use std::sync::Arc; -use std::{any::Any, path::Path}; use anyhow::{anyhow, Result}; use futures_lite::StreamExt; @@ -16,6 +16,7 @@ use iroh_blobs::store::Store as BaoStore; use iroh_docs::engine::Engine; use iroh_net::util::AbortingJoinHandle; use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint}; +use once_cell::sync::OnceCell; use quic_rpc::transport::flume::FlumeConnection; use quic_rpc::RpcClient; use tokio::task::JoinHandle; @@ -23,7 +24,7 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::debug; -use crate::{client::RpcService, node::builder::ProtocolMap}; +use crate::{client::RpcService, node::protocol::ProtocolMap}; mod builder; mod protocol; @@ -47,7 +48,7 @@ pub use protocol::Protocol; #[derive(Debug, Clone)] pub struct Node { inner: Arc>, - task: Arc>, + task: Arc>>, client: crate::client::MemIroh, protocols: ProtocolMap, } @@ -155,11 +156,7 @@ impl Node { /// Returns the protocol handler for a alpn. pub fn get_protocol(&self, alpn: &[u8]) -> Option> { - let protocols = self.protocols.read().unwrap(); - let protocol: Arc = protocols.get(alpn)?.clone(); - let protocol_any: Arc = protocol.as_arc_any(); - let protocol_ref = Arc::downcast(protocol_any).ok()?; - Some(protocol_ref) + self.protocols.get(alpn) } /// Aborts the node. @@ -173,7 +170,8 @@ impl Node { pub async fn shutdown(self) -> Result<()> { self.inner.cancel_token.cancel(); - if let Ok(task) = Arc::try_unwrap(self.task) { + if let Ok(mut task) = Arc::try_unwrap(self.task) { + let task = task.take().expect("cannot be empty"); task.await?; } Ok(()) diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 941ff8915f..34724b64e1 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -1,8 +1,8 @@ use std::{ - collections::{BTreeSet, HashMap}, + collections::BTreeSet, net::{Ipv4Addr, SocketAddrV4}, path::{Path, PathBuf}, - sync::{Arc, RwLock}, + sync::Arc, time::Duration, }; @@ -28,13 +28,12 @@ use quic_rpc::{ RpcServer, ServiceEndpoint, }; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; use tokio_util::{sync::CancellationToken, task::LocalPoolHandle}; use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ client::RPC_ALPN, - node::Protocol, + node::{protocol::ProtocolMap, Protocol}, rpc_protocol::RpcService, util::{fs::load_secret_key, path::IrohPaths}, }; @@ -56,7 +55,6 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; -pub(super) type ProtocolMap = Arc>>>; type ProtocolBuilders = Vec<( &'static [u8], Box) -> Arc + Send + 'static>, @@ -511,7 +509,7 @@ where let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); - let protocols = Arc::new(RwLock::new(HashMap::new())); + let protocols = ProtocolMap::default(); let inner = Arc::new(NodeInner { db: self.blobs_store, @@ -524,9 +522,21 @@ where sync, downloader, }); - let (ready_tx, ready_rx) = oneshot::channel(); + + let node = Node { + inner: inner.clone(), + task: Default::default(), + client, + protocols: protocols.clone(), + }; + + for (alpn, p) in self.protocols { + let protocol = p(node.clone()); + protocols.insert(alpn, protocol); + } + let task = { - let protocols = Arc::clone(&protocols); + let protocols = protocols.clone(); let gossip = gossip.clone(); let handler = rpc::Handler { inner: inner.clone(), @@ -535,8 +545,6 @@ where let ep = endpoint.clone(); tokio::task::spawn( async move { - // Wait until the protocol builders have run. - ready_rx.await.expect("cannot fail"); Self::run( ep, protocols, @@ -551,20 +559,7 @@ where ) }; - let node = Node { - inner, - task: Arc::new(task), - client, - protocols, - }; - - for (alpn, p) in self.protocols { - let protocol = p(node.clone()); - node.protocols.write().unwrap().insert(alpn, protocol); - } - - // Notify the run task that the protocols are now built. - ready_tx.send(()).expect("cannot fail"); + node.task.set(task).expect("was empty"); // spawn a task that updates the gossip endpoints. // TODO: track task @@ -802,10 +797,7 @@ async fn handle_connection( .await } alpn => { - let protocol = { - let protocols = protocols.read().unwrap(); - protocols.get(alpn).cloned() - }; + let protocol = protocols.get_any(alpn); if let Some(protocol) = protocol { protocol.handle_connection(connecting).await?; } else { diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index dd9db9d84c..0bba70cc73 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -1,18 +1,58 @@ -use std::{any::Any, fmt, sync::Arc}; +use std::{ + any::Any, + collections::HashMap, + fmt, + sync::{Arc, RwLock}, +}; use anyhow::Result; use futures_lite::future::Boxed as BoxedFuture; use iroh_net::endpoint::Connecting; /// Trait for iroh protocol handlers. -pub trait Protocol: Send + Sync + Any + fmt::Debug + 'static { - /// Return `self` as `dyn Any`. - /// - /// Implementations can simply return `self` here. - fn as_arc_any(self: Arc) -> Arc; - +pub trait Protocol: Send + Sync + IntoArcAny + fmt::Debug + 'static { /// Handle an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture>; } + +/// Helper trait to facilite casting from `Arc` to `Arc`. +/// +/// This trait has a blanket implementation so there is no need to implement this yourself. +pub trait IntoArcAny { + fn into_arc_any(self: Arc) -> Arc; +} + +impl IntoArcAny for T { + fn into_arc_any(self: Arc) -> Arc { + self + } +} + +/// Map of registered protocol handlers. +#[allow(clippy::type_complexity)] +#[derive(Debug, Clone, Default)] +pub struct ProtocolMap(Arc>>>); + +impl ProtocolMap { + /// Returns the registered protocol handler for an ALPN as a concrete type. + pub fn get(&self, alpn: &[u8]) -> Option> { + let protocols = self.0.read().unwrap(); + let protocol: Arc = protocols.get(alpn)?.clone(); + let protocol_any: Arc = protocol.into_arc_any(); + let protocol_ref = Arc::downcast(protocol_any).ok()?; + Some(protocol_ref) + } + + /// Returns the registered protocol handler for an ALPN as a `dyn Protocol`. + pub fn get_any(&self, alpn: &[u8]) -> Option> { + let protocols = self.0.read().unwrap(); + let protocol: Arc = protocols.get(alpn)?.clone(); + Some(protocol) + } + + pub(super) fn insert(&self, alpn: &'static [u8], protocol: Arc) { + self.0.write().unwrap().insert(alpn, protocol); + } +} From c7517ce8f99edae25d77b40eab1e6808dd7ec992 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 12 Jun 2024 18:13:29 +0200 Subject: [PATCH 09/10] fixup --- iroh/examples/custom-protocol.rs | 45 ++++++++++++++++---------------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index be68c7e44a..4553718142 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -1,4 +1,4 @@ -use std::{any::Any, fmt, sync::Arc}; +use std::{fmt, sync::Arc}; use anyhow::Result; use clap::Parser; @@ -6,7 +6,7 @@ use futures_lite::future::Boxed as BoxedFuture; use iroh::{ blobs::store::Store, net::{ - endpoint::{get_remote_node_id, Connecting, Connection}, + endpoint::{get_remote_node_id, Connecting}, NodeId, }, node::{Node, Protocol}, @@ -63,7 +63,26 @@ struct ExampleProto { impl Protocol for ExampleProto { fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture> { - Box::pin(async move { self.handle_connection(conn.await?).await }) + Box::pin(async move { + let conn = conn.await?; + let remote_node_id = get_remote_node_id(&conn)?; + println!("accepted connection from {remote_node_id}"); + let mut send_stream = conn.open_uni().await?; + // not that this is something that you wanted to do, but let's create a new blob for each + // incoming connection. this could be any mechanism, but we want to demonstrate how to use a + // custom protocol together with built-in iroh functionality + let content = format!("this blob is created for my beloved peer {remote_node_id} ♥"); + let hash = self + .node + .blobs() + .add_bytes(content.as_bytes().to_vec()) + .await?; + // send the hash over our custom proto + send_stream.write_all(hash.hash.as_bytes()).await?; + send_stream.finish().await?; + println!("closing connection from {remote_node_id}"); + Ok(()) + }) } } @@ -76,26 +95,6 @@ impl ExampleProto { node.get_protocol::>(alpn) } - async fn handle_connection(&self, conn: Connection) -> Result<()> { - let remote_node_id = get_remote_node_id(&conn)?; - println!("accepted connection from {remote_node_id}"); - let mut send_stream = conn.open_uni().await?; - // not that this is something that you wanted to do, but let's create a new blob for each - // incoming connection. this could be any mechanism, but we want to demonstrate how to use a - // custom protocol together with built-in iroh functionality - let content = format!("this blob is created for my beloved peer {remote_node_id} ♥"); - let hash = self - .node - .blobs() - .add_bytes(content.as_bytes().to_vec()) - .await?; - // send the hash over our custom proto - send_stream.write_all(hash.hash.as_bytes()).await?; - send_stream.finish().await?; - println!("closing connection from {remote_node_id}"); - Ok(()) - } - async fn connect(&self, remote_node_id: NodeId) -> Result<()> { println!("our node id: {}", self.node.node_id()); println!("connecting to {remote_node_id}"); From 11a609f51ee2a4115f6f3286ca685c14365924ce Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Wed, 12 Jun 2024 18:32:16 +0200 Subject: [PATCH 10/10] rename back to accept --- iroh/examples/custom-protocol.rs | 2 +- iroh/src/node/builder.rs | 2 +- iroh/src/node/protocol.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 4553718142..c973b22063 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -62,7 +62,7 @@ struct ExampleProto { } impl Protocol for ExampleProto { - fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture> { + fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { Box::pin(async move { let conn = conn.await?; let remote_node_id = get_remote_node_id(&conn)?; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 34724b64e1..d23732a08c 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -799,7 +799,7 @@ async fn handle_connection( alpn => { let protocol = protocols.get_any(alpn); if let Some(protocol) = protocol { - protocol.handle_connection(connecting).await?; + protocol.accept(connecting).await?; } else { bail!("ignoring connection: unsupported ALPN protocol"); } diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 0bba70cc73..139ebbda8a 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -14,7 +14,7 @@ pub trait Protocol: Send + Sync + IntoArcAny + fmt::Debug + 'static { /// Handle an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn handle_connection(self: Arc, conn: Connecting) -> BoxedFuture>; + fn accept(self: Arc, conn: Connecting) -> BoxedFuture>; } /// Helper trait to facilite casting from `Arc` to `Arc`.