Skip to content

Commit

Permalink
refactor: use new protocols api
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 12, 2024
1 parent 85e40de commit f750306
Show file tree
Hide file tree
Showing 14 changed files with 533 additions and 258 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pub trait ReadableStore: Map {
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut {
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
/// This trait method imports a file from a local path.
///
/// `data` is the path to the file.
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Engine {
/// Handle an incoming iroh-docs connection.
pub async fn handle_connection(
&self,
conn: iroh_net::endpoint::Connecting,
conn: iroh_net::endpoint::Connection,
) -> anyhow::Result<()> {
self.to_live_actor
.send(ToLiveActor::HandleConnection { conn })
Expand Down
4 changes: 2 additions & 2 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub enum ToLiveActor {
reply: sync::oneshot::Sender<Result<()>>,
},
HandleConnection {
conn: iroh_net::endpoint::Connecting,
conn: iroh_net::endpoint::Connection,
},
AcceptSyncRequest {
namespace: NamespaceId,
Expand Down Expand Up @@ -749,7 +749,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}

#[instrument("accept", skip_all)]
pub async fn handle_connection(&mut self, conn: iroh_net::endpoint::Connecting) {
pub async fn handle_connection(&mut self, conn: iroh_net::endpoint::Connection) {
let to_actor_tx = self.sync_actor_tx.clone();
let accept_request_cb = move |namespace, peer| {
let to_actor_tx = to_actor_tx.clone();
Expand Down
3 changes: 1 addition & 2 deletions iroh-docs/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,14 @@ pub enum AcceptOutcome {
/// Handle an iroh-docs connection and sync all shared documents in the replica store.
pub async fn handle_connection<F, Fut>(
sync: SyncHandle,
connecting: iroh_net::endpoint::Connecting,
connection: iroh_net::endpoint::Connection,
accept_cb: F,
) -> Result<SyncFinished, AcceptError>
where
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = AcceptOutcome>,
{
let t_start = Instant::now();
let connection = connecting.await.map_err(AcceptError::connect)?;
let peer = get_remote_node_id(&connection).map_err(AcceptError::connect)?;
let (mut send_stream, mut recv_stream) = connection
.accept_bi()
Expand Down
6 changes: 4 additions & 2 deletions iroh/examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async fn main() -> Result<()> {
let args = Cli::parse();
// create a new node
let node = iroh::node::Node::memory()
.accept(EXAMPLE_ALPN, |node| ExampleProto::build(node))
.accept(EXAMPLE_ALPN, |node| {
Box::pin(async move { Ok(ExampleProto::build(node)) })
})
.spawn()
.await?;

Expand Down Expand Up @@ -66,7 +68,7 @@ impl<S: Store + fmt::Debug> Protocol for ExampleProto<S> {
self
}

fn accept(self: Arc<Self>, conn: quinn::Connection) -> Boxed<Result<()>> {
fn handle_connection(self: Arc<Self>, conn: quinn::Connection) -> Boxed<Result<()>> {
Box::pin(async move { self.handle_connection(conn).await })
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
///
/// The default author can be set with [`Self::set_default`].
pub async fn default(&self) -> Result<AuthorId> {
let res = self.rpc.rpc(AuthorGetDefaultRequest).await?;
let res = self.rpc.rpc(AuthorGetDefaultRequest).await??;
Ok(res.author_id)
}

Expand Down
32 changes: 18 additions & 14 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,32 @@
//! To shut down the node, call [`Node::shutdown`].
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::{any::Any, path::Path};
use std::path::Path;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Result};
use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_docs::engine::Engine;
use iroh_net::util::AbortingJoinHandle;

use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::RpcClient;
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::debug;

use crate::{client::RpcService, node::builder::ProtocolMap};
use crate::{client::RpcService, node::protocol::ProtocolMap};

mod builder;
mod protocol;
mod rpc;
mod rpc_status;

pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;
pub use protocol::Protocol;

Expand All @@ -50,6 +50,7 @@ pub struct Node<D> {
task: Arc<JoinHandle<()>>,
client: crate::client::MemIroh,
protocols: ProtocolMap,
tasks: Arc<Mutex<Option<JoinSet<()>>>>,
}

#[derive(derive_more::Debug)]
Expand All @@ -59,11 +60,11 @@ struct NodeInner<D> {
secret_key: SecretKey,
cancel_token: CancellationToken,
controller: FlumeConnection<RpcService>,
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
// #[allow(dead_code)]
// gc_task: Option<AbortingJoinHandle<()>>,
#[debug("rt")]
rt: LocalPoolHandle,
pub(crate) sync: DocsEngine,
// pub(crate) sync: DocsEngine,
downloader: Downloader,
}

Expand Down Expand Up @@ -155,11 +156,11 @@ impl<D: BaoStore> Node<D> {

/// Returns the protocol handler for a alpn.
pub fn get_protocol<P: Protocol>(&self, alpn: &[u8]) -> Option<Arc<P>> {
let protocols = self.protocols.read().unwrap();
let protocol: Arc<dyn Protocol> = protocols.get(alpn)?.clone();
let protocol_any: Arc<dyn Any + Send + Sync> = protocol.as_arc_any();
let protocol_ref = Arc::downcast(protocol_any).ok()?;
Some(protocol_ref)
self.protocols.get::<P>(alpn)
}

fn downloader(&self) -> &Downloader {
&self.inner.downloader
}

/// Aborts the node.
Expand All @@ -176,6 +177,9 @@ impl<D: BaoStore> Node<D> {
if let Ok(task) = Arc::try_unwrap(self.task) {
task.await?;
}
if let Some(mut tasks) = self.tasks.lock().unwrap().take() {
tasks.abort_all();
}
Ok(())
}

Expand Down
Loading

0 comments on commit f750306

Please sign in to comment.