diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index a8756be58fa..ab26a6212c0 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -9,6 +9,7 @@ use std::{ use anyhow::{bail, Context, Result}; use futures_lite::StreamExt; use iroh_base::key::SecretKey; +use iroh_blobs::protocol::ALPN as BLOBS_ALPN; use iroh_blobs::{ downloader::Downloader, protocol::Closed, @@ -43,8 +44,6 @@ use super::{ protocol::BlobsProtocol, rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol, }; -pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN]; - /// Default bind address for the node. /// 11204 is "iroh" in leetspeak pub const DEFAULT_BIND_PORT: u16 = 11204; @@ -370,23 +369,15 @@ where /// connections. The returned [`Node`] can be used to control the task as well as /// get information about it. pub async fn spawn(self) -> Result> { - let unspawned = self.build().await?; - let gossip = unspawned.gossip.clone(); - let blobs = BlobsProtocol::new(unspawned.inner.db.clone(), unspawned.inner.rt.clone()); - let docs = unspawned.inner.sync.clone(); - unspawned - .add_handler(GOSSIP_ALPN, Box::new(gossip)) - .add_handler(iroh_blobs::protocol::ALPN, Box::new(blobs)) - .add_handler(DOCS_ALPN, Box::new(docs)) - .spawn() - .await + let unspawned = self.build(&[GOSSIP_ALPN, BLOBS_ALPN, DOCS_ALPN]).await?; + unspawned.add_iroh_protocols()?.spawn().await } /// Build a node without spawning it. - pub async fn build(self) -> Result> { + pub async fn build(self, protocols: &[&'static [u8]]) -> Result> { // We clone the blob store to shut it down in case the node fails to spawn. let blobs_store = self.blobs_store.clone(); - match self.build_inner().await { + match self.build_inner(protocols).await { Ok(node) => Ok(node), Err(err) => { debug!("failed to spawn node, shutting down"); @@ -396,7 +387,7 @@ where } } - async fn build_inner(mut self) -> Result> { + async fn build_inner(mut self, protocols: &[&'static [u8]]) -> Result> { trace!("building endpoint"); let lp = LocalPoolHandle::new(num_cpus::get()); @@ -422,7 +413,7 @@ where let endpoint = Endpoint::builder() .secret_key(self.secret_key.clone()) .proxy_from_env() - .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) + .alpns(protocols.iter().map(|p| p.to_vec()).collect()) .keylog(self.keylog) .transport_config(transport_config) .concurrent_connections(MAX_CONNECTIONS) @@ -517,6 +508,7 @@ where inner, client, internal_rpc, + protocols: protocols.into_iter().copied().collect(), handlers: Default::default(), }; Ok(node) @@ -529,7 +521,7 @@ where rpc: E, internal_rpc: impl ServiceEndpoint, gossip: Gossip, - handlers: Arc>>, + handlers: Arc>>, ) { let rpc = RpcServer::new(rpc); let internal_rpc = RpcServer::new(internal_rpc); @@ -699,10 +691,21 @@ pub struct UnspawnedNode { internal_rpc: FlumeServerEndpoint, gossip: Gossip, rpc_endpoint: E, - handlers: BTreeMap<&'static [u8], Box>, + protocols: BTreeSet<&'static [u8]>, + handlers: BTreeMap<&'static [u8], Arc>, } impl> UnspawnedNode { + /// The blobs db + pub fn blobs_db(&self) -> &D { + &self.inner.db + } + + /// A local pool handle to run IO tasks + pub fn local_rt(&self) -> &LocalPoolHandle { + &self.inner.rt + } + /// The endpoint pub fn endpoint(&self) -> &Endpoint { &self.inner.endpoint @@ -714,10 +717,27 @@ impl> UnspawnedNode { } /// Add a handler - pub fn add_handler(self, alpn: &'static [u8], handler: Box) -> Self { + pub fn add_handler(self, alpn: &'static [u8], handler: Arc) -> Result { + anyhow::ensure!( + self.protocols.contains(alpn), + "unexpected protocol {alpn:?}" + ); let mut this = self; this.handlers.insert(alpn, handler); - this + Ok(this) + } + + /// Add the three default iroh protocols (blobs, gossip and docs) + /// + /// This requires that the endpoint is configured with the required ALPNs. + pub fn add_iroh_protocols(self) -> Result { + let gossip = self.gossip.clone(); + let blobs = BlobsProtocol::new(self.blobs_db().clone(), self.local_rt().clone()); + let docs = self.inner.sync.clone(); + Ok(self + .add_handler(GOSSIP_ALPN, Arc::new(gossip))? + .add_handler(BLOBS_ALPN, Arc::new(blobs))? + .add_handler(DOCS_ALPN, Arc::new(docs))?) } /// Spawn an active node with an accept loop. @@ -741,7 +761,12 @@ impl> UnspawnedNode { handlers, gossip, rpc_endpoint, + protocols, } = self; + anyhow::ensure!( + handlers.len() == protocols.len(), + "missing protocol handler!" + ); let inner = Arc::new(inner); let endpoint = inner.endpoint.clone(); @@ -808,12 +833,12 @@ impl Default for GcPolicy { async fn handle_connection( connecting: iroh_net::endpoint::Connecting, alpn: String, - handlers: &BTreeMap<&'static [u8], Box>, + handlers: &BTreeMap<&'static [u8], Arc>, ) -> Result<()> { let Some(handler) = handlers.get(alpn.as_bytes()) else { bail!("ignoring connection: unsupported ALPN protocol"); }; - handler.accept(connecting).await + handler.clone().accept(connecting).await } const DEFAULT_RPC_PORT: u16 = 0x1337; diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 7dd291fc04c..ed1921d5d84 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -57,7 +57,6 @@ impl Protocol for DocsEngine { let this = self.clone(); Box::pin(async move { this.handle_connection(conn).await }) } - fn shutdown(&self) -> future::Boxed<()> { let this = self.clone(); Box::pin(async move {