diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 4290943494..5e3bf025f1 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -9,11 +9,13 @@ use std::path::Path; use std::sync::Arc; use anyhow::{anyhow, Result}; +use futures_lite::future::Boxed; use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::downloader::Downloader; use iroh_blobs::store::Store as BaoStore; use iroh_docs::engine::Engine; +use iroh_net::endpoint::Connecting; use iroh_net::util::AbortingJoinHandle; use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint}; use quic_rpc::transport::flume::FlumeConnection; @@ -49,6 +51,17 @@ pub struct Node { client: crate::client::MemIroh, } +/// +pub trait Protocol: Send + Sync + Debug + 'static { + /// Accept a connection + fn accept(&self, conn: Connecting) -> Boxed>; + + /// Shutdown + fn shutdown(&self) -> Boxed<()> { + Box::pin(async move {}) + } +} + #[derive(derive_more::Debug)] struct NodeInner { db: D, diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 59782b2010..ce48966871 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -1,5 +1,5 @@ use std::{ - collections::BTreeSet, + collections::{BTreeMap, BTreeSet}, net::{Ipv4Addr, SocketAddrV4}, path::{Path, PathBuf}, sync::Arc, @@ -24,7 +24,9 @@ use iroh_net::{ Endpoint, }; use quic_rpc::{ - transport::{misc::DummyServerEndpoint, quinn::QuinnServerEndpoint}, + transport::{ + flume::FlumeServerEndpoint, misc::DummyServerEndpoint, quinn::QuinnServerEndpoint, + }, RpcServer, ServiceEndpoint, }; use serde::{Deserialize, Serialize}; @@ -37,7 +39,7 @@ use crate::{ util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner}; +use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol}; pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN]; @@ -366,9 +368,14 @@ where /// connections. The returned [`Node`] can be used to control the task as well as /// get information about it. pub async fn spawn(self) -> Result> { + self.build().await?.spawn().await + } + + /// Build a node without spawning it. + pub async fn build(self) -> Result> { // We clone the blob store to shut it down in case the node fails to spawn. let blobs_store = self.blobs_store.clone(); - match self.spawn_inner().await { + match self.build_inner().await { Ok(node) => Ok(node), Err(err) => { debug!("failed to spawn node, shutting down"); @@ -378,8 +385,8 @@ where } } - async fn spawn_inner(mut self) -> Result> { - trace!("spawning node"); + async fn build_inner(mut self) -> Result> { + trace!("building endpoint"); let lp = LocalPoolHandle::new(num_cpus::get()); let mut transport_config = quinn::TransportConfig::default(); @@ -481,7 +488,7 @@ where let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); - let inner = Arc::new(NodeInner { + let inner = NodeInner { db: self.blobs_store, endpoint: endpoint.clone(), secret_key: self.secret_key, @@ -491,54 +498,16 @@ where rt: lp.clone(), sync, downloader, - }); - let task = { - let gossip = gossip.clone(); - let handler = rpc::Handler { - inner: inner.clone(), - }; - let me = endpoint.node_id().fmt_short(); - let ep = endpoint.clone(); - tokio::task::spawn( - async move { - Self::run( - ep, - handler, - self.rpc_endpoint, - internal_rpc, - gossip, - ) - .await - } - .instrument(error_span!("node", %me)), - ) }; - let node = Node { + let node = UnspawnedNode { + gossip, + rpc_endpoint: self.rpc_endpoint, inner, - task: Arc::new(task), client, + internal_rpc, + handlers: Default::default(), }; - - // spawn a task that updates the gossip endpoints. - // TODO: track task - let mut stream = endpoint.local_endpoints(); - tokio::task::spawn(async move { - while let Some(eps) = stream.next().await { - if let Err(err) = gossip.update_endpoints(&eps) { - warn!("Failed to update gossip endpoints: {err:?}"); - } - } - warn!("failed to retrieve local endpoints"); - }); - - // Wait for a single endpoint update, to make sure - // we found some endpoints - tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next()) - .await - .context("waiting for endpoint")? - .context("no endpoints")?; - Ok(node) } @@ -549,6 +518,7 @@ where rpc: E, internal_rpc: impl ServiceEndpoint, gossip: Gossip, + handlers: Arc>>, ) { let rpc = RpcServer::new(rpc); let internal_rpc = RpcServer::new(internal_rpc); @@ -615,8 +585,9 @@ where let gossip = gossip.clone(); let inner = handler.inner.clone(); let sync = handler.inner.sync.clone(); + let handlers = handlers.clone(); tokio::task::spawn(async move { - if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync).await { + if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync, &handlers).await { warn!("Handling incoming connection ended with error: {err}"); } }); @@ -715,6 +686,111 @@ where } } +/// Unspawned node +#[derive(Debug)] +pub struct UnspawnedNode { + /// + inner: NodeInner, + /// + client: crate::client::MemIroh, + /// + internal_rpc: FlumeServerEndpoint, + /// + gossip: Gossip, + /// + rpc_endpoint: E, + /// + handlers: BTreeMap<&'static [u8], Box>, +} + +impl> UnspawnedNode { + /// + pub fn endpoint(&self) -> &Endpoint { + &self.inner.endpoint + } + + /// + pub fn client(&self) -> &crate::client::MemIroh { + &self.client + } + + /// + pub fn add_handler(self, alpn: &'static [u8], handler: Box) -> Self { + let mut this = self; + this.handlers.insert(alpn, handler); + this + } + + /// + pub async fn spawn(self) -> Result> { + let blobs_store = self.inner.db.clone(); + match self.spawn_inner().await { + Ok(node) => Ok(node), + Err(err) => { + debug!("failed to spawn node, shutting down"); + blobs_store.shutdown().await; + Err(err) + } + } + } + + async fn spawn_inner(self) -> Result> { + let UnspawnedNode { + inner, + client, + internal_rpc, + handlers, + gossip, + rpc_endpoint, + } = self; + let inner = Arc::new(inner); + let endpoint = inner.endpoint.clone(); + + let task = { + let gossip = gossip.clone(); + let handler = rpc::Handler { + inner: inner.clone(), + }; + let handlers = Arc::new(handlers); + let me = endpoint.node_id().fmt_short(); + let ep = endpoint.clone(); + tokio::task::spawn( + async move { + Builder::run(ep, handler, rpc_endpoint, internal_rpc, gossip, handlers).await + } + .instrument(error_span!("node", %me)), + ) + }; + + let node = Node { + inner, + task: Arc::new(task), + client, + }; + + // spawn a task that updates the gossip endpoints. + // TODO: track task + let mut stream = endpoint.local_endpoints(); + tokio::task::spawn(async move { + while let Some(eps) = stream.next().await { + if let Err(err) = gossip.update_endpoints(&eps) { + warn!("Failed to update gossip endpoints: {err:?}"); + } + } + warn!("failed to retrieve local endpoints"); + }); + + // Wait for a single endpoint update, to make sure + // we found some endpoints + tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next()) + .await + .context("waiting for endpoint")? + .context("no endpoints")?; + + Ok(node) + } +} + /// Policy for garbage collection. #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] pub enum GcPolicy { @@ -738,11 +814,12 @@ async fn handle_connection( node: Arc>, gossip: Gossip, sync: DocsEngine, + handlers: &BTreeMap<&'static [u8], Box>, ) -> Result<()> { match alpn.as_bytes() { GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, DOCS_ALPN => sync.handle_connection(connecting).await?, - alpn if alpn == iroh_blobs::protocol::ALPN => { + iroh_blobs::protocol::ALPN => { let connection = connecting.await?; iroh_blobs::provider::handle_connection( connection, @@ -752,7 +829,12 @@ async fn handle_connection( ) .await } - _ => bail!("ignoring connection: unsupported ALPN protocol"), + alpn => { + let Some(handler) = handlers.get(alpn) else { + bail!("ignoring connection: unsupported ALPN protocol"); + }; + handler.accept(connecting).await?; + } } Ok(()) }