Skip to content

Commit

Permalink
refactor: use new protocols api and allow to disable docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 12, 2024
1 parent 11a609f commit a369048
Show file tree
Hide file tree
Showing 15 changed files with 478 additions and 260 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pub trait ReadableStore: Map {
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut {
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
/// This trait method imports a file from a local path.
///
/// `data` is the path to the file.
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Engine {
/// Handle an incoming iroh-docs connection.
pub async fn handle_connection(
&self,
conn: iroh_net::endpoint::Connecting,
conn: iroh_net::endpoint::Connection,
) -> anyhow::Result<()> {
self.to_live_actor
.send(ToLiveActor::HandleConnection { conn })
Expand Down
4 changes: 2 additions & 2 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub enum ToLiveActor {
reply: sync::oneshot::Sender<Result<()>>,
},
HandleConnection {
conn: iroh_net::endpoint::Connecting,
conn: iroh_net::endpoint::Connection,
},
AcceptSyncRequest {
namespace: NamespaceId,
Expand Down Expand Up @@ -749,7 +749,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}

#[instrument("accept", skip_all)]
pub async fn handle_connection(&mut self, conn: iroh_net::endpoint::Connecting) {
pub async fn handle_connection(&mut self, conn: iroh_net::endpoint::Connection) {
let to_actor_tx = self.sync_actor_tx.clone();
let accept_request_cb = move |namespace, peer| {
let to_actor_tx = to_actor_tx.clone();
Expand Down
3 changes: 1 addition & 2 deletions iroh-docs/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,14 @@ pub enum AcceptOutcome {
/// Handle an iroh-docs connection and sync all shared documents in the replica store.
pub async fn handle_connection<F, Fut>(
sync: SyncHandle,
connecting: iroh_net::endpoint::Connecting,
connection: iroh_net::endpoint::Connection,
accept_cb: F,
) -> Result<SyncFinished, AcceptError>
where
F: Fn(NamespaceId, PublicKey) -> Fut,
Fut: Future<Output = AcceptOutcome>,
{
let t_start = Instant::now();
let connection = connecting.await.map_err(AcceptError::connect)?;
let peer = get_remote_node_id(&connection).map_err(AcceptError::connect)?;
let (mut send_stream, mut recv_stream) = connection
.accept_bi()
Expand Down
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.18.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.18.0", path = "../iroh-net" }
num_cpus = { version = "1.15.0" }
once_cell = "1.17.0"
portable-atomic = "1"
iroh-docs = { version = "0.18.0", path = "../iroh-docs" }
iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" }
once_cell = "1.18.0"
parking_lot = "0.12.1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quic-rpc = { version = "0.10.0", default-features = false, features = ["flume-transport", "quinn-transport"] }
Expand Down
4 changes: 3 additions & 1 deletion iroh/examples/custom-protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ async fn main() -> Result<()> {
let args = Cli::parse();
// create a new node
let node = iroh::node::Node::memory()
.accept(EXAMPLE_ALPN, |node| ExampleProto::build(node))
.accept(EXAMPLE_ALPN, |node| {
Box::pin(async move { Ok(ExampleProto::build(node)) })
})
.spawn()
.await?;

Expand Down
2 changes: 1 addition & 1 deletion iroh/src/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ where
///
/// The default author can be set with [`Self::set_default`].
pub async fn default(&self) -> Result<AuthorId> {
let res = self.rpc.rpc(AuthorGetDefaultRequest).await?;
let res = self.rpc.rpc(AuthorGetDefaultRequest).await??;
Ok(res.author_id)
}

Expand Down
23 changes: 13 additions & 10 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::sync::{Arc, Mutex};

use anyhow::{anyhow, Result};
use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_docs::engine::Engine;
use iroh_net::util::AbortingJoinHandle;
use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint};
use once_cell::sync::OnceCell;
use quic_rpc::transport::flume::FlumeConnection;
use quic_rpc::RpcClient;
use tokio::task::JoinHandle;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::debug;
Expand All @@ -31,7 +30,7 @@ mod protocol;
mod rpc;
mod rpc_status;

pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;
pub use protocol::Protocol;

Expand Down Expand Up @@ -60,12 +59,10 @@ struct NodeInner<D> {
secret_key: SecretKey,
cancel_token: CancellationToken,
controller: FlumeConnection<RpcService>,
#[allow(dead_code)]
gc_task: Option<AbortingJoinHandle<()>>,
#[debug("rt")]
rt: LocalPoolHandle,
pub(crate) sync: DocsEngine,
downloader: Downloader,
tasks: Mutex<Option<JoinSet<()>>>,
}

/// In memory node.
Expand Down Expand Up @@ -156,7 +153,11 @@ impl<D: BaoStore> Node<D> {

/// Returns the protocol handler for a alpn.
pub fn get_protocol<P: Protocol>(&self, alpn: &[u8]) -> Option<Arc<P>> {
self.protocols.get(alpn)
self.protocols.get::<P>(alpn)
}

fn downloader(&self) -> &Downloader {
&self.inner.downloader
}

/// Aborts the node.
Expand All @@ -171,8 +172,10 @@ impl<D: BaoStore> Node<D> {
self.inner.cancel_token.cancel();

if let Ok(mut task) = Arc::try_unwrap(self.task) {
let task = task.take().expect("cannot be empty");
task.await?;
task.take().expect("cannot be empty").await?;
}
if let Some(mut tasks) = self.inner.tasks.lock().unwrap().take() {
tasks.abort_all();
}
Ok(())
}
Expand Down
Loading

0 comments on commit a369048

Please sign in to comment.