Skip to content

Commit

Permalink
use add_handler for all the internal protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 17, 2024
1 parent 14b58a6 commit e0ca1d8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 26 deletions.
2 changes: 1 addition & 1 deletion iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct Node<D> {
client: crate::client::MemIroh,
}

///
/// A protocol
pub trait Protocol: Send + Sync + Debug + 'static {
/// Accept a connection
fn accept(&self, conn: Connecting) -> Boxed<Result<()>>;
Expand Down
36 changes: 11 additions & 25 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,11 @@ where
let unspawned = self.build().await?;
let gossip = unspawned.gossip.clone();
let blobs = BlobsProtocol::new(unspawned.inner.db.clone(), unspawned.inner.rt.clone());
let docs = unspawned.inner.sync.clone();
unspawned
.add_handler(GOSSIP_ALPN, Box::new(gossip))
.add_handler(iroh_blobs::protocol::ALPN, Box::new(blobs))
.add_handler(DOCS_ALPN, Box::new(docs))
.spawn()
.await
}
Expand Down Expand Up @@ -588,10 +590,9 @@ where
continue;
}
};
let sync = handler.inner.sync.clone();
let handlers = handlers.clone();
tokio::task::spawn(async move {
if let Err(err) = handle_connection(connecting, alpn, sync, &handlers).await {
if let Err(err) = handle_connection(connecting, alpn, &handlers).await {
warn!("Handling incoming connection ended with error: {err}");
}
});
Expand Down Expand Up @@ -693,39 +694,33 @@ where
/// Unspawned node
#[derive(Debug)]
pub struct UnspawnedNode<D, E> {
///
inner: NodeInner<D>,
///
client: crate::client::MemIroh,
///
internal_rpc: FlumeServerEndpoint<RpcService>,
///
gossip: Gossip,
///
rpc_endpoint: E,
///
handlers: BTreeMap<&'static [u8], Box<dyn Protocol>>,
}

impl<D: BaoStore, E: ServiceEndpoint<RpcService>> UnspawnedNode<D, E> {
///
/// The endpoint
pub fn endpoint(&self) -> &Endpoint {
&self.inner.endpoint
}

///
/// The client, not useable as of now, but you can store it for later
pub fn client(&self) -> &crate::client::MemIroh {
&self.client
}

///
/// Add a handler
pub fn add_handler(self, alpn: &'static [u8], handler: Box<dyn Protocol>) -> Self {
let mut this = self;
this.handlers.insert(alpn, handler);
this
}

///
/// Spawn an active node with an accept loop.
pub async fn spawn(self) -> Result<Node<D>> {
let blobs_store = self.inner.db.clone();
match self.spawn_inner().await {
Expand Down Expand Up @@ -810,24 +805,15 @@ impl Default for GcPolicy {
}
}

// TODO: Restructure this code to not take all these arguments.
#[allow(clippy::too_many_arguments)]
async fn handle_connection(
connecting: iroh_net::endpoint::Connecting,
alpn: String,
sync: DocsEngine,
handlers: &BTreeMap<&'static [u8], Box<dyn Protocol>>,
) -> Result<()> {
match alpn.as_bytes() {
DOCS_ALPN => sync.handle_connection(connecting).await?,
alpn => {
let Some(handler) = handlers.get(alpn) else {
bail!("ignoring connection: unsupported ALPN protocol");
};
handler.accept(connecting).await?;
}
}
Ok(())
let Some(handler) = handlers.get(alpn.as_bytes()) else {
bail!("ignoring connection: unsupported ALPN protocol");
};
handler.accept(connecting).await
}

const DEFAULT_RPC_PORT: u16 = 0x1337;
Expand Down

0 comments on commit e0ca1d8

Please sign in to comment.