Skip to content

Commit

Permalink
Add fn add_iroh_protocols in UnspawnedNode
Browse files Browse the repository at this point in the history
Helper to add the default iroh protocols in case you are also adding
custom protocols.
  • Loading branch information
rklaehn committed Jun 17, 2024
1 parent e0ca1d8 commit 3280cd0
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
69 changes: 47 additions & 22 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
use anyhow::{bail, Context, Result};
use futures_lite::StreamExt;
use iroh_base::key::SecretKey;
use iroh_blobs::protocol::ALPN as BLOBS_ALPN;
use iroh_blobs::{
downloader::Downloader,
protocol::Closed,
Expand Down Expand Up @@ -43,8 +44,6 @@ use super::{
protocol::BlobsProtocol, rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol,
};

pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
pub const DEFAULT_BIND_PORT: u16 = 11204;
Expand Down Expand Up @@ -370,23 +369,15 @@ where
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(self) -> Result<Node<D>> {
let unspawned = self.build().await?;
let gossip = unspawned.gossip.clone();
let blobs = BlobsProtocol::new(unspawned.inner.db.clone(), unspawned.inner.rt.clone());
let docs = unspawned.inner.sync.clone();
unspawned
.add_handler(GOSSIP_ALPN, Box::new(gossip))
.add_handler(iroh_blobs::protocol::ALPN, Box::new(blobs))
.add_handler(DOCS_ALPN, Box::new(docs))
.spawn()
.await
let unspawned = self.build(&[GOSSIP_ALPN, BLOBS_ALPN, DOCS_ALPN]).await?;
unspawned.add_iroh_protocols()?.spawn().await
}

/// Build a node without spawning it.
pub async fn build(self) -> Result<UnspawnedNode<D, E>> {
pub async fn build(self, protocols: &[&'static [u8]]) -> Result<UnspawnedNode<D, E>> {
// We clone the blob store to shut it down in case the node fails to spawn.
let blobs_store = self.blobs_store.clone();
match self.build_inner().await {
match self.build_inner(protocols).await {
Ok(node) => Ok(node),
Err(err) => {
debug!("failed to spawn node, shutting down");
Expand All @@ -396,7 +387,7 @@ where
}
}

async fn build_inner(mut self) -> Result<UnspawnedNode<D, E>> {
async fn build_inner(mut self, protocols: &[&'static [u8]]) -> Result<UnspawnedNode<D, E>> {
trace!("building endpoint");
let lp = LocalPoolHandle::new(num_cpus::get());

Expand All @@ -422,7 +413,7 @@ where
let endpoint = Endpoint::builder()
.secret_key(self.secret_key.clone())
.proxy_from_env()
.alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect())
.alpns(protocols.iter().map(|p| p.to_vec()).collect())
.keylog(self.keylog)
.transport_config(transport_config)
.concurrent_connections(MAX_CONNECTIONS)
Expand Down Expand Up @@ -517,6 +508,7 @@ where
inner,
client,
internal_rpc,
protocols: protocols.into_iter().copied().collect(),
handlers: Default::default(),
};
Ok(node)
Expand All @@ -529,7 +521,7 @@ where
rpc: E,
internal_rpc: impl ServiceEndpoint<RpcService>,
gossip: Gossip,
handlers: Arc<BTreeMap<&'static [u8], Box<dyn Protocol>>>,
handlers: Arc<BTreeMap<&'static [u8], Arc<dyn Protocol>>>,
) {
let rpc = RpcServer::new(rpc);
let internal_rpc = RpcServer::new(internal_rpc);
Expand Down Expand Up @@ -699,10 +691,21 @@ pub struct UnspawnedNode<D, E> {
internal_rpc: FlumeServerEndpoint<RpcService>,
gossip: Gossip,
rpc_endpoint: E,
handlers: BTreeMap<&'static [u8], Box<dyn Protocol>>,
protocols: BTreeSet<&'static [u8]>,
handlers: BTreeMap<&'static [u8], Arc<dyn Protocol>>,
}

impl<D: BaoStore, E: ServiceEndpoint<RpcService>> UnspawnedNode<D, E> {
/// The blobs db
pub fn blobs_db(&self) -> &D {
&self.inner.db
}

/// A local pool handle to run IO tasks
pub fn local_rt(&self) -> &LocalPoolHandle {
&self.inner.rt
}

/// The endpoint
pub fn endpoint(&self) -> &Endpoint {
&self.inner.endpoint
Expand All @@ -714,10 +717,27 @@ impl<D: BaoStore, E: ServiceEndpoint<RpcService>> UnspawnedNode<D, E> {
}

/// Add a handler
pub fn add_handler(self, alpn: &'static [u8], handler: Box<dyn Protocol>) -> Self {
pub fn add_handler(self, alpn: &'static [u8], handler: Arc<dyn Protocol>) -> Result<Self> {
anyhow::ensure!(
self.protocols.contains(alpn),
"unexpected protocol {alpn:?}"
);
let mut this = self;
this.handlers.insert(alpn, handler);
this
Ok(this)
}

/// Add the three default iroh protocols (blobs, gossip and docs)
///
/// This requires that the endpoint is configured with the required ALPNs.
pub fn add_iroh_protocols(self) -> Result<Self> {
let gossip = self.gossip.clone();
let blobs = BlobsProtocol::new(self.blobs_db().clone(), self.local_rt().clone());
let docs = self.inner.sync.clone();
Ok(self
.add_handler(GOSSIP_ALPN, Arc::new(gossip))?
.add_handler(BLOBS_ALPN, Arc::new(blobs))?
.add_handler(DOCS_ALPN, Arc::new(docs))?)
}

/// Spawn an active node with an accept loop.
Expand All @@ -741,7 +761,12 @@ impl<D: BaoStore, E: ServiceEndpoint<RpcService>> UnspawnedNode<D, E> {
handlers,
gossip,
rpc_endpoint,
protocols,
} = self;
anyhow::ensure!(
handlers.len() == protocols.len(),
"missing protocol handler!"
);
let inner = Arc::new(inner);
let endpoint = inner.endpoint.clone();

Expand Down Expand Up @@ -808,12 +833,12 @@ impl Default for GcPolicy {
async fn handle_connection(
connecting: iroh_net::endpoint::Connecting,
alpn: String,
handlers: &BTreeMap<&'static [u8], Box<dyn Protocol>>,
handlers: &BTreeMap<&'static [u8], Arc<dyn Protocol>>,
) -> Result<()> {
let Some(handler) = handlers.get(alpn.as_bytes()) else {
bail!("ignoring connection: unsupported ALPN protocol");
};
handler.accept(connecting).await
handler.clone().accept(connecting).await
}

const DEFAULT_RPC_PORT: u16 = 0x1337;
Expand Down
1 change: 0 additions & 1 deletion iroh/src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ impl Protocol for DocsEngine {
let this = self.clone();
Box::pin(async move { this.handle_connection(conn).await })
}

fn shutdown(&self) -> future::Boxed<()> {
let this = self.clone();
Box::pin(async move {
Expand Down

0 comments on commit 3280cd0

Please sign in to comment.