Skip to content

Commit

Permalink
Split building and spawning into two parts
Browse files Browse the repository at this point in the history
First we build a passive node, then spawn it. Until the passive node is spawned,
you can add custom handlers.
  • Loading branch information
rklaehn committed Jun 17, 2024
1 parent e9075f3 commit 806e152
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 53 deletions.
13 changes: 13 additions & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use std::path::Path;
use std::sync::Arc;

use anyhow::{anyhow, Result};
use futures_lite::future::Boxed;
use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_docs::engine::Engine;
use iroh_net::endpoint::Connecting;
use iroh_net::util::AbortingJoinHandle;
use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use quic_rpc::transport::flume::FlumeConnection;
Expand Down Expand Up @@ -49,6 +51,17 @@ pub struct Node<D> {
client: crate::client::MemIroh,
}

/// Handler for incoming connections of a custom protocol.
///
/// Implementations are stored as `Box<dyn Protocol>` keyed by ALPN and are
/// invoked from the node's connection handling when an incoming connection's
/// ALPN is not one of the built-in protocols.
pub trait Protocol: Send + Sync + Debug + 'static {
/// Accept an incoming connection.
///
/// Receives the [`Connecting`] value as handed over by the node, i.e. before
/// it has been awaited, and returns a boxed future that resolves once the
/// handler is done with it. An `Err` is propagated to the caller.
fn accept(&self, conn: Connecting) -> Boxed<Result<()>>;

/// Shut down the protocol handler.
///
/// The default implementation returns a future that completes immediately
/// without doing anything.
// NOTE(review): no call site for `shutdown` is visible in this change — confirm
// when (or whether) the node invokes it.
fn shutdown(&self) -> Boxed<()> {
Box::pin(async move {})
}
}

#[derive(derive_more::Debug)]
struct NodeInner<D> {
db: D,
Expand Down
188 changes: 135 additions & 53 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::BTreeSet,
collections::{BTreeMap, BTreeSet},
net::{Ipv4Addr, SocketAddrV4},
path::{Path, PathBuf},
sync::Arc,
Expand All @@ -24,7 +24,9 @@ use iroh_net::{
Endpoint,
};
use quic_rpc::{
transport::{misc::DummyServerEndpoint, quinn::QuinnServerEndpoint},
transport::{
flume::FlumeServerEndpoint, misc::DummyServerEndpoint, quinn::QuinnServerEndpoint,
},
RpcServer, ServiceEndpoint,
};
use serde::{Deserialize, Serialize};
Expand All @@ -37,7 +39,7 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner};
use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol};

pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];

Expand Down Expand Up @@ -366,9 +368,14 @@ where
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(self) -> Result<Node<D>> {
self.build().await?.spawn().await
}

/// Build a node without spawning it.
pub async fn build(self) -> Result<UnspawnedNode<D, E>> {
// We clone the blob store to shut it down in case the node fails to spawn.
let blobs_store = self.blobs_store.clone();
match self.spawn_inner().await {
match self.build_inner().await {
Ok(node) => Ok(node),
Err(err) => {
debug!("failed to spawn node, shutting down");
Expand All @@ -378,8 +385,8 @@ where
}
}

async fn spawn_inner(mut self) -> Result<Node<D>> {
trace!("spawning node");
async fn build_inner(mut self) -> Result<UnspawnedNode<D, E>> {
trace!("building endpoint");
let lp = LocalPoolHandle::new(num_cpus::get());

let mut transport_config = quinn::TransportConfig::default();
Expand Down Expand Up @@ -481,7 +488,7 @@ where
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone()));

let inner = Arc::new(NodeInner {
let inner = NodeInner {
db: self.blobs_store,
endpoint: endpoint.clone(),
secret_key: self.secret_key,
Expand All @@ -491,54 +498,16 @@ where
rt: lp.clone(),
sync,
downloader,
});
let task = {
let gossip = gossip.clone();
let handler = rpc::Handler {
inner: inner.clone(),
};
let me = endpoint.node_id().fmt_short();
let ep = endpoint.clone();
tokio::task::spawn(
async move {
Self::run(
ep,
handler,
self.rpc_endpoint,
internal_rpc,
gossip,
)
.await
}
.instrument(error_span!("node", %me)),
)
};

let node = Node {
let node = UnspawnedNode {
gossip,
rpc_endpoint: self.rpc_endpoint,
inner,
task: Arc::new(task),
client,
internal_rpc,
handlers: Default::default(),
};

// spawn a task that updates the gossip endpoints.
// TODO: track task
let mut stream = endpoint.local_endpoints();
tokio::task::spawn(async move {
while let Some(eps) = stream.next().await {
if let Err(err) = gossip.update_endpoints(&eps) {
warn!("Failed to update gossip endpoints: {err:?}");
}
}
warn!("failed to retrieve local endpoints");
});

// Wait for a single endpoint update, to make sure
// we found some endpoints
tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next())
.await
.context("waiting for endpoint")?
.context("no endpoints")?;

Ok(node)
}

Expand All @@ -549,6 +518,7 @@ where
rpc: E,
internal_rpc: impl ServiceEndpoint<RpcService>,
gossip: Gossip,
handlers: Arc<BTreeMap<&'static [u8], Box<dyn Protocol>>>,
) {
let rpc = RpcServer::new(rpc);
let internal_rpc = RpcServer::new(internal_rpc);
Expand Down Expand Up @@ -615,8 +585,9 @@ where
let gossip = gossip.clone();
let inner = handler.inner.clone();
let sync = handler.inner.sync.clone();
let handlers = handlers.clone();
tokio::task::spawn(async move {
if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync).await {
if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync, &handlers).await {
warn!("Handling incoming connection ended with error: {err}");
}
});
Expand Down Expand Up @@ -715,6 +686,111 @@ where
}
}

/// A node that has been built but not yet spawned.
///
/// Produced by the builder's `build` step. Custom protocol handlers can be
/// registered on it before it is consumed by `spawn`, which turns it into a
/// running [`Node`].
#[derive(Debug)]
pub struct UnspawnedNode<D, E> {
/// Node state; wrapped in an `Arc` on spawn and shared between the [`Node`]
/// and the RPC handler.
inner: NodeInner<D>,
/// In-memory client, handed over unchanged to the spawned [`Node`].
client: crate::client::MemIroh,
/// Server side of the in-memory (flume) RPC channel; passed to the node's
/// run loop on spawn.
internal_rpc: FlumeServerEndpoint<RpcService>,
/// Gossip instance; passed to the run loop and fed with local endpoint
/// updates once the node is spawned.
gossip: Gossip,
/// RPC endpoint of type `E`; passed to the node's run loop on spawn.
rpc_endpoint: E,
/// Custom protocol handlers, keyed by the ALPN they were registered for.
handlers: BTreeMap<&'static [u8], Box<dyn Protocol>>,
}

impl<D: BaoStore, E: ServiceEndpoint<RpcService>> UnspawnedNode<D, E> {
    /// Returns the [`Endpoint`] held by this not-yet-spawned node.
    pub fn endpoint(&self) -> &Endpoint {
        &self.inner.endpoint
    }

    /// Returns the in-memory client that will also be handed to the spawned node.
    pub fn client(&self) -> &crate::client::MemIroh {
        &self.client
    }

    /// Registers `handler` for connections using `alpn` and returns the node
    /// so calls can be chained.
    ///
    /// Registering a second handler for the same ALPN replaces the first one.
    pub fn add_handler(mut self, alpn: &'static [u8], handler: Box<dyn Protocol>) -> Self {
        self.handlers.insert(alpn, handler);
        self
    }

    /// Spawns the node and returns the running [`Node`].
    ///
    /// # Errors
    ///
    /// If spawning fails, the blob store is shut down before the error is
    /// returned.
    pub async fn spawn(self) -> Result<Node<D>> {
        // Keep a handle to the blob store: `spawn_inner` consumes `self`, and
        // the store must still be reachable for cleanup on failure.
        let store = self.inner.db.clone();
        let err = match self.spawn_inner().await {
            Ok(node) => return Ok(node),
            Err(err) => err,
        };
        debug!("failed to spawn node, shutting down");
        store.shutdown().await;
        Err(err)
    }

    /// Starts the node's main task and the gossip endpoint updater, then waits
    /// (bounded by `ENDPOINT_WAIT`) for the first local endpoint update.
    async fn spawn_inner(self) -> Result<Node<D>> {
        let Self {
            inner,
            client,
            internal_rpc,
            handlers,
            gossip,
            rpc_endpoint,
        } = self;
        let inner = Arc::new(inner);
        let endpoint = inner.endpoint.clone();

        // Everything the main task needs is prepared up front and moved into it.
        let rpc_handler = rpc::Handler {
            inner: inner.clone(),
        };
        let handlers = Arc::new(handlers);
        let run_gossip = gossip.clone();
        let run_endpoint = endpoint.clone();
        let me = endpoint.node_id().fmt_short();
        let task = tokio::task::spawn(
            async move {
                Builder::run(
                    run_endpoint,
                    rpc_handler,
                    rpc_endpoint,
                    internal_rpc,
                    run_gossip,
                    handlers,
                )
                .await
            }
            .instrument(error_span!("node", %me)),
        );

        let node = Node {
            inner,
            task: Arc::new(task),
            client,
        };

        // Forward every local endpoint update to gossip in a background task.
        // TODO: track task
        let mut updates = endpoint.local_endpoints();
        tokio::task::spawn(async move {
            while let Some(eps) = updates.next().await {
                if let Err(err) = gossip.update_endpoints(&eps) {
                    warn!("Failed to update gossip endpoints: {err:?}");
                }
            }
            warn!("failed to retrieve local endpoints");
        });

        // Wait for a single endpoint update so we know at least some
        // endpoints were found before handing the node out.
        let first_update =
            tokio::time::timeout(ENDPOINT_WAIT, endpoint.local_endpoints().next()).await;
        first_update
            .context("waiting for endpoint")?
            .context("no endpoints")?;

        Ok(node)
    }
}

/// Policy for garbage collection.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum GcPolicy {
Expand All @@ -738,11 +814,12 @@ async fn handle_connection<D: BaoStore>(
node: Arc<NodeInner<D>>,
gossip: Gossip,
sync: DocsEngine,
handlers: &BTreeMap<&'static [u8], Box<dyn Protocol>>,
) -> Result<()> {
match alpn.as_bytes() {
GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?,
DOCS_ALPN => sync.handle_connection(connecting).await?,
alpn if alpn == iroh_blobs::protocol::ALPN => {
iroh_blobs::protocol::ALPN => {
let connection = connecting.await?;
iroh_blobs::provider::handle_connection(
connection,
Expand All @@ -752,7 +829,12 @@ async fn handle_connection<D: BaoStore>(
)
.await
}
_ => bail!("ignoring connection: unsupported ALPN protocol"),
alpn => {
let Some(handler) = handlers.get(alpn) else {
bail!("ignoring connection: unsupported ALPN protocol");
};
handler.accept(connecting).await?;
}
}
Ok(())
}
Expand Down

0 comments on commit 806e152

Please sign in to comment.