From 14b58a6c4965082785facbc680f5bc9574521be2 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Mon, 17 Jun 2024 11:12:29 +0300 Subject: [PATCH] Do gossip and blobs as custom handlers --- iroh-blobs/src/store/traits.rs | 3 +- iroh/src/node.rs | 1 + iroh/src/node/builder.rs | 46 +++++++---------------- iroh/src/node/protocol.rs | 69 ++++++++++++++++++++++++++++++++++ 4 files changed, 86 insertions(+), 33 deletions(-) create mode 100644 iroh/src/node/protocol.rs diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index e0ec3e6b39..64555f1131 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -6,6 +6,7 @@ use bao_tree::{ BaoTree, ChunkRanges, }; use bytes::Bytes; +use derive_more::Debug; use futures_lite::{Stream, StreamExt}; use genawaiter::rc::{Co, Gen}; use iroh_base::rpc::RpcError; @@ -295,7 +296,7 @@ pub trait ReadableStore: Map { } /// The mutable part of a Bao store. -pub trait Store: ReadableStore + MapMut { +pub trait Store: ReadableStore + MapMut + Debug { /// This trait method imports a file from a local path. /// /// `data` is the path to the file. diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 5e3bf025f1..56b451c86a 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -28,6 +28,7 @@ use tracing::debug; use crate::client::RpcService; mod builder; +mod protocol; mod rpc; mod rpc_status; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index ce48966871..aa2628860e 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -39,7 +39,9 @@ use crate::{ util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol}; +use super::{ + protocol::BlobsProtocol, rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol, +}; pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN]; @@ -368,7 +370,14 @@ where /// connections. The returned [`Node`] can be used to control the task as well as /// get information about it. pub async fn spawn(self) -> Result> { - self.build().await?.spawn().await + let unspawned = self.build().await?; + let gossip = unspawned.gossip.clone(); + let blobs = BlobsProtocol::new(unspawned.inner.db.clone(), unspawned.inner.rt.clone()); + unspawned + .add_handler(GOSSIP_ALPN, Box::new(gossip)) + .add_handler(iroh_blobs::protocol::ALPN, Box::new(blobs)) + .spawn() + .await } /// Build a node without spawning it. @@ -544,10 +553,7 @@ where _ = cancel_token.cancelled() => { // clean shutdown of the blobs db to close the write transaction handler.inner.db.shutdown().await; - - if let Err(err) = handler.inner.sync.shutdown().await { - warn!("sync shutdown error: {:?}", err); - } + handler.inner.sync.shutdown().await; break }, // handle rpc requests. This will do nothing if rpc is not configured, since @@ -582,12 +588,10 @@ where continue; } }; - let gossip = gossip.clone(); - let inner = handler.inner.clone(); let sync = handler.inner.sync.clone(); let handlers = handlers.clone(); tokio::task::spawn(async move { - if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync, &handlers).await { + if let Err(err) = handle_connection(connecting, alpn, sync, &handlers).await { warn!("Handling incoming connection ended with error: {err}"); } }); @@ -808,27 +812,14 @@ impl Default for GcPolicy { // TODO: Restructure this code to not take all these arguments. #[allow(clippy::too_many_arguments)] -async fn handle_connection( +async fn handle_connection( connecting: iroh_net::endpoint::Connecting, alpn: String, - node: Arc>, - gossip: Gossip, sync: DocsEngine, handlers: &BTreeMap<&'static [u8], Box>, ) -> Result<()> { match alpn.as_bytes() { - GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, DOCS_ALPN => sync.handle_connection(connecting).await?, - iroh_blobs::protocol::ALPN => { - let connection = connecting.await?; - iroh_blobs::provider::handle_connection( - connection, - node.db.clone(), - MockEventSender, - node.rt.clone(), - ) - .await - } alpn => { let Some(handler) = handlers.get(alpn) else { bail!("ignoring connection: unsupported ALPN protocol"); @@ -886,12 +877,3 @@ fn make_rpc_endpoint( Ok((rpc_endpoint, actual_rpc_port)) } - -#[derive(Debug, Clone)] -struct MockEventSender; - -impl iroh_blobs::provider::EventSender for MockEventSender { - fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> { - Box::pin(std::future::ready(())) - } -} diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs new file mode 100644 index 0000000000..7dd291fc04 --- /dev/null +++ b/iroh/src/node/protocol.rs @@ -0,0 +1,69 @@ +use anyhow::Result; +use futures_lite::future; +use iroh_net::endpoint::Connecting; +use std::ops::Deref; +use tracing::warn; + +use super::{DocsEngine, Protocol}; + +#[derive(Debug)] +pub(crate) struct BlobsProtocol { + rt: tokio_util::task::LocalPoolHandle, + store: S, +} + +impl BlobsProtocol { + pub fn new(store: S, rt: tokio_util::task::LocalPoolHandle) -> Self { + Self { rt, store } + } +} + +impl Protocol for BlobsProtocol { + fn accept(&self, conn: Connecting) -> future::Boxed> { + let store = self.store.clone(); + let rt = self.rt.clone(); + Box::pin(async move { + iroh_blobs::provider::handle_connection(conn.await?, store, MockEventSender, rt).await; + Ok(()) + }) + } + + fn shutdown(&self) -> future::Boxed<()> { + let store = self.store.clone(); + Box::pin(async move { + store.shutdown().await; + }) + } +} + +#[derive(Debug, Clone)] +struct MockEventSender; + +impl iroh_blobs::provider::EventSender for MockEventSender { + fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> { + Box::pin(std::future::ready(())) + } +} + +impl Protocol for iroh_gossip::net::Gossip { + fn accept(&self, conn: Connecting) -> future::Boxed> { + let this = self.clone(); + Box::pin(async move { this.handle_connection(conn.await?).await }) + } +} + +impl Protocol for DocsEngine { + fn accept(&self, conn: Connecting) -> future::Boxed> { + let this = self.clone(); + Box::pin(async move { this.handle_connection(conn).await }) + } + + fn shutdown(&self) -> future::Boxed<()> { + let this = self.clone(); + Box::pin(async move { + if let Err(err) = this.deref().shutdown().await { + warn!("Error while shutting down docs engine: {err:?}"); + } + }) + } +}