Skip to content

Commit

Permalink
Do gossip and blobs as custom handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 17, 2024
1 parent 806e152 commit 14b58a6
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 33 deletions.
3 changes: 2 additions & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use bao_tree::{
BaoTree, ChunkRanges,
};
use bytes::Bytes;
use derive_more::Debug;
use futures_lite::{Stream, StreamExt};
use genawaiter::rc::{Co, Gen};
use iroh_base::rpc::RpcError;
Expand Down Expand Up @@ -295,7 +296,7 @@ pub trait ReadableStore: Map {
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut {
pub trait Store: ReadableStore + MapMut + Debug {
/// This trait method imports a file from a local path.
///
/// `data` is the path to the file.
Expand Down
1 change: 1 addition & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tracing::debug;
use crate::client::RpcService;

mod builder;
mod protocol;
mod rpc;
mod rpc_status;

Expand Down
46 changes: 14 additions & 32 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol};
use super::{
protocol::BlobsProtocol, rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner, Protocol,
};

pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN];

Expand Down Expand Up @@ -368,7 +370,14 @@ where
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(self) -> Result<Node<D>> {
self.build().await?.spawn().await
let unspawned = self.build().await?;
let gossip = unspawned.gossip.clone();
let blobs = BlobsProtocol::new(unspawned.inner.db.clone(), unspawned.inner.rt.clone());
unspawned
.add_handler(GOSSIP_ALPN, Box::new(gossip))
.add_handler(iroh_blobs::protocol::ALPN, Box::new(blobs))
.spawn()
.await
}

/// Build a node without spawning it.
Expand Down Expand Up @@ -544,10 +553,7 @@ where
_ = cancel_token.cancelled() => {
// clean shutdown of the blobs db to close the write transaction
handler.inner.db.shutdown().await;

if let Err(err) = handler.inner.sync.shutdown().await {
warn!("sync shutdown error: {:?}", err);
}
handler.inner.sync.shutdown().await;
break
},
// handle rpc requests. This will do nothing if rpc is not configured, since
Expand Down Expand Up @@ -582,12 +588,10 @@ where
continue;
}
};
let gossip = gossip.clone();
let inner = handler.inner.clone();
let sync = handler.inner.sync.clone();
let handlers = handlers.clone();
tokio::task::spawn(async move {
if let Err(err) = handle_connection(connecting, alpn, inner, gossip, sync, &handlers).await {
if let Err(err) = handle_connection(connecting, alpn, sync, &handlers).await {
warn!("Handling incoming connection ended with error: {err}");
}
});
Expand Down Expand Up @@ -808,27 +812,14 @@ impl Default for GcPolicy {

// TODO: Restructure this code to not take all these arguments.
#[allow(clippy::too_many_arguments)]
async fn handle_connection<D: BaoStore>(
async fn handle_connection(
connecting: iroh_net::endpoint::Connecting,
alpn: String,
node: Arc<NodeInner<D>>,
gossip: Gossip,
sync: DocsEngine,
handlers: &BTreeMap<&'static [u8], Box<dyn Protocol>>,
) -> Result<()> {
match alpn.as_bytes() {
GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?,
DOCS_ALPN => sync.handle_connection(connecting).await?,
iroh_blobs::protocol::ALPN => {
let connection = connecting.await?;
iroh_blobs::provider::handle_connection(
connection,
node.db.clone(),
MockEventSender,
node.rt.clone(),
)
.await
}
alpn => {
let Some(handler) = handlers.get(alpn) else {
bail!("ignoring connection: unsupported ALPN protocol");
Expand Down Expand Up @@ -886,12 +877,3 @@ fn make_rpc_endpoint(

Ok((rpc_endpoint, actual_rpc_port))
}

#[derive(Debug, Clone)]
struct MockEventSender;

impl iroh_blobs::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
Box::pin(std::future::ready(()))
}
}
69 changes: 69 additions & 0 deletions iroh/src/node/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use anyhow::Result;
use futures_lite::future;
use iroh_net::endpoint::Connecting;
use std::ops::Deref;
use tracing::warn;

use super::{DocsEngine, Protocol};

#[derive(Debug)]
pub(crate) struct BlobsProtocol<S> {
rt: tokio_util::task::LocalPoolHandle,
store: S,
}

impl<S: iroh_blobs::store::Store> BlobsProtocol<S> {
pub fn new(store: S, rt: tokio_util::task::LocalPoolHandle) -> Self {
Self { rt, store }
}
}

impl<S: iroh_blobs::store::Store> Protocol for BlobsProtocol<S> {
fn accept(&self, conn: Connecting) -> future::Boxed<Result<()>> {
let store = self.store.clone();
let rt = self.rt.clone();
Box::pin(async move {
iroh_blobs::provider::handle_connection(conn.await?, store, MockEventSender, rt).await;
Ok(())
})
}

fn shutdown(&self) -> future::Boxed<()> {
let store = self.store.clone();
Box::pin(async move {
store.shutdown().await;
})
}
}

#[derive(Debug, Clone)]
struct MockEventSender;

impl iroh_blobs::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
Box::pin(std::future::ready(()))
}
}

impl Protocol for iroh_gossip::net::Gossip {
fn accept(&self, conn: Connecting) -> future::Boxed<Result<()>> {
let this = self.clone();
Box::pin(async move { this.handle_connection(conn.await?).await })
}
}

impl Protocol for DocsEngine {
fn accept(&self, conn: Connecting) -> future::Boxed<Result<()>> {
let this = self.clone();
Box::pin(async move { this.handle_connection(conn).await })
}

fn shutdown(&self) -> future::Boxed<()> {
let this = self.clone();
Box::pin(async move {
if let Err(err) = this.deref().shutdown().await {
warn!("Error while shutting down docs engine: {err:?}");
}
})
}
}

0 comments on commit 14b58a6

Please sign in to comment.