From 0e6d4415d88afc148e838e3a95d176de092c8348 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Thu, 20 Jun 2024 12:46:29 +0200 Subject: [PATCH 1/2] feat(iroh): allow to disable docs engine completely (#2390) ## Description Make the docs engine optional on the iroh node. * Add `Builder::disable_docs()` to disable the docs engine completely * If called, the docs engine will not be spawned and the docs protocol will not be registered. Incoming docs connnections will be dropped, and all docs-related RPC calls will return an error "docs are disabled". ## Breaking Changes * `iroh::node::Builder::with_db_and_store` now takes a `DocsStorage` enum instead of a `iroh_docs::store::Store`. ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [ ] Tests if relevant. - [x] All breaking changes documented. --- iroh/src/client/authors.rs | 2 +- iroh/src/node.rs | 71 ++++++++------- iroh/src/node/builder.rs | 75 +++++++++------- iroh/src/node/docs.rs | 55 ++++++++++++ iroh/src/node/protocol.rs | 8 -- iroh/src/node/rpc.rs | 176 +++++++++++++++++++++++-------------- iroh/src/rpc_protocol.rs | 4 +- iroh/tests/gc.rs | 24 ++--- iroh/tests/provide.rs | 6 +- 9 files changed, 265 insertions(+), 156 deletions(-) create mode 100644 iroh/src/node/docs.rs diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index bf642fc3d9..dfe8837d6d 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -42,7 +42,7 @@ where /// /// The default author can be set with [`Self::set_default`]. pub async fn default(&self) -> Result { - let res = self.rpc.rpc(AuthorGetDefaultRequest).await?; + let res = self.rpc.rpc(AuthorGetDefaultRequest).await??; Ok(res.author_id) } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 85df39cc22..836eeda509 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -13,7 +13,6 @@ use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore}; use iroh_blobs::{downloader::Downloader, protocol::Closed}; -use iroh_docs::engine::Engine; use iroh_gossip::net::Gossip; use iroh_net::key::SecretKey; use iroh_net::Endpoint; @@ -24,14 +23,18 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::{debug, error, info, warn}; -use crate::{client::RpcService, node::protocol::ProtocolMap}; +use crate::{ + client::RpcService, + node::{docs::DocsEngine, protocol::ProtocolMap}, +}; mod builder; +mod docs; mod protocol; mod rpc; mod rpc_status; -pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; +pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig}; pub use self::rpc_status::RpcStatus; pub use protocol::ProtocolHandler; @@ -55,7 +58,7 @@ pub struct Node { #[derive(derive_more::Debug)] struct NodeInner { db: D, - docs: DocsEngine, + docs: Option, endpoint: Endpoint, gossip: Gossip, secret_key: SecretKey, @@ -314,9 +317,22 @@ impl NodeInner { join_set.shutdown().await; } + /// Shutdown the different parts of the node concurrently. async fn shutdown(&self, protocols: Arc) { - // Shutdown the different parts of the node concurrently. let error_code = Closed::ProviderTerminating; + + // Shutdown future for the docs engine, if enabled. + let docs_shutdown = { + let docs = self.docs.clone(); + async move { + if let Some(docs) = docs { + docs.shutdown().await + } else { + Ok(()) + } + } + }; + // We ignore all errors during shutdown. 
let _ = tokio::join!( // Close the endpoint. @@ -326,8 +342,8 @@ impl NodeInner { self.endpoint .clone() .close(error_code.into(), error_code.reason()), - // Shutdown sync engine. - self.docs.shutdown(), + // Shutdown docs engine. + docs_shutdown, // Shutdown blobs store engine. self.db.shutdown(), // Shutdown protocol handlers. @@ -342,7 +358,6 @@ impl NodeInner { ) { tracing::info!("Starting GC task with interval {:?}", gc_period); let db = &self.db; - let docs = &self.docs; let mut live = BTreeSet::new(); 'outer: loop { if let Err(cause) = db.gc_start().await { @@ -356,22 +371,24 @@ impl NodeInner { tracing::debug!("Starting GC"); live.clear(); - let doc_hashes = match docs.sync.content_hashes().await { - Ok(hashes) => hashes, - Err(err) => { - tracing::warn!("Error getting doc hashes: {}", err); - continue 'outer; - } - }; - for hash in doc_hashes { - match hash { - Ok(hash) => { - live.insert(hash); - } + if let Some(docs) = &self.docs { + let doc_hashes = match docs.sync.content_hashes().await { + Ok(hashes) => hashes, Err(err) => { - tracing::error!("Error getting doc hash: {}", err); + tracing::warn!("Error getting doc hashes: {}", err); continue 'outer; } + }; + for hash in doc_hashes { + match hash { + Ok(hash) => { + live.insert(hash); + } + Err(err) => { + tracing::error!("Error getting doc hash: {}", err); + continue 'outer; + } + } } } @@ -436,17 +453,6 @@ async fn handle_connection( } } -/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. -#[derive(Debug, Clone)] -pub(crate) struct DocsEngine(Engine); - -impl std::ops::Deref for DocsEngine { - type Target = Engine; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - #[cfg(test)] mod tests { use std::time::Duration; @@ -655,7 +661,6 @@ mod tests { } #[cfg(feature = "fs-store")] - #[ignore = "flaky"] #[tokio::test] async fn test_default_author_persist() -> Result<()> { use crate::util::path::IrohPaths; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f64297da98..5b5164ed64 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -12,7 +12,7 @@ use iroh_blobs::{ downloader::Downloader, store::{Map, Store as BaoStore}, }; -use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_docs::engine::DefaultAuthorStorage; use iroh_docs::net::DOCS_ALPN; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ @@ -41,7 +41,7 @@ use crate::{ util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc_status::RpcStatus, DocsEngine, Node, NodeInner}; +use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner}; /// Default bind address for the node. /// 11204 is "iroh" in leetspeak @@ -56,6 +56,17 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; +/// Storage backend for documents. +#[derive(Debug, Clone)] +pub enum DocsStorage { + /// Disable docs completely. + Disabled, + /// In-memory storage. + Memory, + /// File-based persistent storage. + Persistent(PathBuf), +} + /// Builder for the [`Node`]. /// /// You must supply a blob store and a document store. 
@@ -85,7 +96,7 @@ where gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, - docs_store: iroh_docs::store::Store, + docs_storage: DocsStorage, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -146,7 +157,7 @@ impl Default for Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store: iroh_docs::store::Store::memory(), + docs_storage: DocsStorage::Memory, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -159,7 +170,7 @@ impl Builder { /// Creates a new builder for [`Node`] using the given databases. pub fn with_db_and_store( blobs_store: D, - docs_store: iroh_docs::store::Store, + docs_storage: DocsStorage, storage: StorageConfig, ) -> Self { Self { @@ -172,7 +183,7 @@ impl Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store, + docs_storage, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -200,8 +211,7 @@ where .with_context(|| { format!("Failed to load blobs database from {}", blob_dir.display()) })?; - let docs_store = - iroh_docs::store::fs::Store::persistent(IrohPaths::DocsDatabase.with_root(root))?; + let docs_storage = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root)); let v0 = blobs_store .import_flat_store(iroh_blobs::store::fs::FlatStorePaths { @@ -237,7 +247,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store, + docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -258,7 +268,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -284,7 +294,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -300,6 +310,12 @@ where self } + /// Disables documents support on this node completely. + pub fn disable_docs(mut self) -> Self { + self.docs_storage = DocsStorage::Disabled; + self + } + /// Sets the relay servers to assist in establishing connectivity. /// /// Relay servers are used to discover other nodes by `PublicKey` and also help @@ -405,7 +421,6 @@ where async fn build_inner(self) -> Result> { trace!("building node"); let lp = LocalPoolHandle::new(num_cpus::get()); - let endpoint = { let mut transport_config = quinn::TransportConfig::default(); transport_config @@ -461,25 +476,22 @@ where let addr = endpoint.node_addr().await?; trace!("endpoint address: {addr:?}"); - // initialize the gossip protocol + // Initialize the gossip protocol. let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info); - - // initialize the downloader + // Initialize the downloader. 
let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); - // load or create the default author for documents - // spawn the docs engine - let docs = DocsEngine( - Engine::spawn( - endpoint.clone(), - gossip.clone(), - self.docs_store, - self.blobs_store.clone(), - downloader.clone(), - self.storage.default_author_storage(), - ) - .await?, - ); + // Spawn the docs engine, if enabled. + // This returns None for DocsStorage::Disabled, otherwise Some(DocsEngine). + let docs = DocsEngine::spawn( + self.docs_storage, + self.blobs_store.clone(), + self.storage.default_author_storage(), + endpoint.clone(), + gossip.clone(), + downloader.clone(), + ) + .await?; // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); @@ -637,9 +649,10 @@ impl> ProtocolBuilde let gossip = self.gossip().clone(); self = self.accept(GOSSIP_ALPN, Arc::new(gossip)); - // Register docs. - let docs = self.inner.docs.clone(); - self = self.accept(DOCS_ALPN, Arc::new(docs)); + // Register docs, if enabled. + if let Some(docs) = self.inner.docs.clone() { + self = self.accept(DOCS_ALPN, Arc::new(docs)); + } self } diff --git a/iroh/src/node/docs.rs b/iroh/src/node/docs.rs new file mode 100644 index 0000000000..7f3ee8b183 --- /dev/null +++ b/iroh/src/node/docs.rs @@ -0,0 +1,55 @@ +use std::{ops::Deref, sync::Arc}; + +use anyhow::Result; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_blobs::downloader::Downloader; +use iroh_gossip::net::Gossip; + +use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_net::{endpoint::Connecting, Endpoint}; + +use crate::node::{DocsStorage, ProtocolHandler}; + +/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. +#[derive(Debug, Clone)] +pub(crate) struct DocsEngine(Engine); + +impl DocsEngine { + pub async fn spawn( + storage: DocsStorage, + blobs_store: S, + default_author_storage: DefaultAuthorStorage, + endpoint: Endpoint, + gossip: Gossip, + downloader: Downloader, + ) -> anyhow::Result> { + let docs_store = match storage { + DocsStorage::Disabled => return Ok(None), + DocsStorage::Memory => iroh_docs::store::fs::Store::memory(), + DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?, + }; + let engine = Engine::spawn( + endpoint, + gossip, + docs_store, + blobs_store, + downloader, + default_author_storage, + ) + .await?; + Ok(Some(DocsEngine(engine))) + } +} + +impl Deref for DocsEngine { + type Target = Engine; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ProtocolHandler for DocsEngine { + fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { + Box::pin(async move { self.handle_connection(conn).await }) + } +} diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 3c50368b42..a0f5b53be5 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -5,8 +5,6 @@ use futures_lite::future::Boxed as BoxedFuture; use futures_util::future::join_all; use iroh_net::endpoint::Connecting; -use crate::node::DocsEngine; - /// Handler for incoming connections. /// /// An iroh node can accept connections for arbitrary ALPN protocols. 
By default, the iroh node @@ -119,9 +117,3 @@ impl ProtocolHandler for iroh_gossip::net::Gossip { Box::pin(async move { self.handle_connection(conn.await?).await }) } } - -impl ProtocolHandler for DocsEngine { - fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { - Box::pin(async move { self.handle_connection(conn).await }) - } -} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 697b6d63cd..cc79655e87 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; -use iroh_base::rpc::RpcResult; +use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; @@ -30,12 +30,15 @@ use quic_rpc::{ ServiceEndpoint, }; use tokio::task::JoinSet; -use tokio_util::task::LocalPoolHandle; +use tokio_util::{either::Either, task::LocalPoolHandle}; use tracing::{debug, info, warn}; -use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; -use crate::client::tags::TagInfo; -use crate::client::NodeStatus; +use crate::client::{ + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + tags::TagInfo, + NodeStatus, +}; +use crate::node::{docs::DocsEngine, NodeInner}; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, @@ -49,8 +52,6 @@ use crate::rpc_protocol::{ NodeWatchResponse, Request, RpcService, SetTagOption, }; -use super::NodeInner; - mod docs; const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); @@ -73,6 +74,38 @@ impl Handler { } impl Handler { + fn docs(&self) -> Option<&DocsEngine> { + self.inner.docs.as_ref() + } + + async fn with_docs(self, f: F) -> RpcResult + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> Fut, + Fut: std::future::Future>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + f(docs).await + } else { + Err(docs_disabled()) + } + } + + fn with_docs_stream(self, f: F) -> impl Stream> + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> S, + S: Stream>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + Either::Left(f(docs)) + } else { + Either::Right(futures_lite::stream::once(Err(docs_disabled()))) + } + } + pub(crate) fn spawn_rpc_request>( inner: Arc>, join_set: &mut JoinSet>, @@ -131,92 +164,95 @@ impl Handler { BlobReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, BlobAddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, BlobAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + AuthorList(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.author_list(req) + handler.with_docs_stream(|docs| docs.author_list(req)) }) .await } AuthorCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_create(req).await }) }) .await } AuthorImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_import(req).await }) }) .await } AuthorExport(msg) => { - 
chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_export(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_export(req).await }) }) .await } AuthorDelete(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_delete(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_delete(req).await }) }) .await } AuthorGetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_default(req) + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { Ok(docs.author_default(req)) }) }) .await } AuthorSetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_set_default(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_set_default(req).await }) }) .await } DocOpen(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_open(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_open(req).await }) }) .await } DocClose(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_close(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_close(req).await }) }) .await } DocStatus(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_status(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_status(req).await }) }) .await } DocList(msg) => { - chan.server_streaming(msg, self, |handler, req| handler.inner.docs.doc_list(req)) - .await + chan.server_streaming(msg, self, |handler, req| { + handler.with_docs_stream(|docs| docs.doc_list(req)) + }) + .await } DocCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_create(req).await }) }) .await } DocDrop(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_drop(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_drop(req).await }) }) .await } DocImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_import(req).await }) }) .await } DocSet(msg) => { - let bao_store = self.inner.db.clone(); - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set(&bao_store, req).await + let blobs_store = self.inner.db.clone(); + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set(&blobs_store, req).await }) }) .await } @@ -229,68 +265,70 @@ impl Handler { .await } DocDel(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_del(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_del(req).await }) }) .await } DocSetHash(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_hash(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_hash(req).await }) }) .await } DocGet(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.doc_get_many(req) + 
handler.with_docs_stream(|docs| docs.doc_get_many(req)) }) .await } DocGetExact(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_exact(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_exact(req).await }) }) .await } DocStartSync(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_start_sync(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_start_sync(req).await }) }) .await } DocLeave(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_leave(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_leave(req).await }) }) .await } DocShare(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_share(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_share(req).await }) }) .await } DocSubscribe(msg) => { chan.try_server_streaming(msg, self, |handler, req| async move { - handler.inner.docs.doc_subscribe(req).await + handler + .with_docs(|docs| async move { docs.doc_subscribe(req).await }) + .await }) .await } DocSetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_download_policy(req).await }) }) .await } DocGetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_download_policy(req).await }) }) .await } DocGetSyncPeers(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_sync_peers(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_sync_peers(req).await }) }) .await } @@ -467,6 +505,7 @@ impl Handler { msg: DocImportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; @@ -519,16 +558,14 @@ impl Handler { let hash_and_format = temp_tag.inner(); let HashAndFormat { hash, .. 
} = *hash_and_format; - self.inner - .docs - .doc_set_hash(DocSetHashRequest { - doc_id, - author_id, - key: key.clone(), - hash, - size, - }) - .await?; + docs.doc_set_hash(DocSetHashRequest { + doc_id, + author_id, + key: key.clone(), + hash, + size, + }) + .await?; drop(temp_tag); progress.send(DocImportProgress::AllDone { key }).await?; Ok(()) @@ -553,6 +590,7 @@ impl Handler { msg: DocExportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; let progress = FlumeProgressSender::new(progress); let DocExportFileRequest { entry, path, mode } = msg; let key = bytes::Bytes::from(entry.key().to_vec()); @@ -1128,3 +1166,7 @@ where res.map_err(Into::into) } + +fn docs_disabled() -> RpcError { + anyhow!("docs are disabled").into() +} diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 8fe71e7d6a..8334590a11 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -439,7 +439,7 @@ pub struct AuthorCreateResponse { pub struct AuthorGetDefaultRequest; impl RpcMsg for AuthorGetDefaultRequest { - type Response = AuthorGetDefaultResponse; + type Response = RpcResult; } /// Response for [`AuthorGetDefaultRequest`] @@ -1153,7 +1153,7 @@ pub enum Response { AuthorList(RpcResult), AuthorCreate(RpcResult), - AuthorGetDefault(AuthorGetDefaultResponse), + AuthorGetDefault(RpcResult), AuthorSetDefault(RpcResult), AuthorImport(RpcResult), AuthorExport(RpcResult), diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index dcca0893b5..e032691df9 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; -use iroh::node::{self, Node}; +use iroh::node::{self, DocsStorage, Node}; use rand::RngCore; use iroh_blobs::{ @@ -41,17 +41,19 @@ async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume:: where S: iroh_blobs::store::Store, { - let doc_store = iroh_docs::store::Store::memory(); let (gc_send, gc_recv) = flume::unbounded(); - let node = - node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem) - .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) - .register_gc_done_cb(Box::new(move || { - gc_send.send(()).ok(); - })) - .spawn() - .await - .unwrap(); + let node = node::Builder::with_db_and_store( + bao_store, + DocsStorage::Memory, + iroh::node::StorageConfig::Mem, + ) + .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) + .register_gc_done_cb(Box::new(move || { + gc_send.send(()).ok(); + })) + .spawn() + .await + .unwrap(); (node, gc_recv) } diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index 13376273dd..7b9abf9648 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{Context, Result}; use bytes::Bytes; use futures_lite::FutureExt; -use iroh::node::Builder; +use iroh::node::{Builder, DocsStorage}; use iroh_base::node_addr::AddrInfoOptions; use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId}; use quic_rpc::transport::misc::DummyServerEndpoint; @@ -40,8 +40,8 @@ async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result(db: D) -> Builder { - let store = iroh_docs::store::Store::memory(); - iroh::node::Builder::with_db_and_store(db, store, iroh::node::StorageConfig::Mem).bind_port(0) + iroh::node::Builder::with_db_and_store(db, DocsStorage::Memory, iroh::node::StorageConfig::Mem) + .bind_port(0) } #[tokio::test] From 
abc7f5e9f3f72158222d7cd2680c52cd797d787d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?=
Date: Thu, 20 Jun 2024 14:18:35 +0300
Subject: [PATCH 2/2] refactor(iroh): use boxed client to get rid of the C type parameter (#2353)

## Description

I implemented a boxed connection and a boxed service endpoint in quic-rpc. With this we can get rid of the `C` type parameter and make the quinn and mem client/server sides the same type.

The nice thing about this approach is that it will not lead to additional boxing on the mem path, and for the quinn (or any other I/O) path the boxing will probably not matter much compared to everything else going on.

## Breaking Changes

Breaking changes affect solely the iroh client. The iroh client as well as all the subsystem clients no longer need a type parameter `C` to distinguish between an in-memory connection and a remote connection.

- Code that directly uses the clients should be unaffected. E.g. `iroh.blobs().read(hash)` will still compile.
- Code that takes a client as an argument will have to be modified to remove the type parameter. E.g.
```rust
fn download<C>(client: blobs::Client<C>)
where C: ...
```
will become just
```rust
fn download(client: blobs::Client)
```
(a fuller usage sketch follows at the end of this description).

The type aliases `iroh::client::MemIroh` and `iroh::client::QuicIroh` for an iroh client specialized for memory or remote use have been retained, but will be removed in one of the next releases.

In detail:

- `iroh::client::Iroh` loses the `C` type parameter
- `iroh::client::blobs::Client` loses the `C` type parameter
- `iroh::client::tags::Client` loses the `C` type parameter
- `iroh::client::authors::Client` loses the `C` type parameter
- `iroh::client::docs::Client` loses the `C` type parameter

## Notes & open questions

Note: I marked the old type aliases MemIroh, QuicIroh etc. as deprecated. That does not seem to actually do anything, but it serves as a reminder to remove them in the near future.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
~~- [x] Tests if relevant.~~
- [x] All breaking changes documented.
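For downstream code, the practical effect is that helpers taking a client need no generics at all. A minimal sketch of the new shape (the `read_to_bytes` method and the `iroh::blobs::Hash` re-export are assumed from the existing client API, not added by this PR):

```rust
use anyhow::Result;
use bytes::Bytes;
use iroh::blobs::Hash;
use iroh::client::Iroh;

// The same signature now works whether `iroh` is an in-process client from
// `Node::client()` or a remote client from `Iroh::connect()`; no
// `C: ServiceConnection<...>` bound is required anymore.
async fn read_blob(iroh: &Iroh, hash: Hash) -> Result<Bytes> {
    iroh.blobs().read_to_bytes(hash).await
}
```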
--- Cargo.lock | 5 ++- iroh-cli/Cargo.toml | 2 +- iroh-cli/src/commands.rs | 6 +-- iroh-cli/src/commands/author.rs | 8 +--- iroh-cli/src/commands/blob.rs | 36 +++++------------- iroh-cli/src/commands/console.rs | 8 +--- iroh-cli/src/commands/doc.rs | 42 +++++---------------- iroh-cli/src/commands/node.rs | 7 +--- iroh-cli/src/commands/rpc.rs | 8 +--- iroh-cli/src/commands/start.rs | 4 +- iroh-cli/src/commands/tag.rs | 8 +--- iroh-cli/src/config.rs | 18 ++------- iroh/Cargo.toml | 2 +- iroh/examples/custom-protocol.rs | 6 +-- iroh/src/client.rs | 36 ++++++++++-------- iroh/src/client/authors.rs | 14 +++---- iroh/src/client/blobs.rs | 35 +++++++---------- iroh/src/client/docs.rs | 65 +++++++++++--------------------- iroh/src/client/mem.rs | 19 ---------- iroh/src/client/node.rs | 8 +--- iroh/src/client/quic.rs | 14 ++----- iroh/src/client/tags.rs | 13 +++---- iroh/src/node.rs | 6 +-- iroh/src/node/builder.rs | 9 +++-- iroh/src/node/rpc_status.rs | 2 +- iroh/tests/sync.rs | 14 +++---- 26 files changed, 134 insertions(+), 261 deletions(-) delete mode 100644 iroh/src/client/mem.rs diff --git a/Cargo.lock b/Cargo.lock index 2c2ffd4b8b..0c680d19b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4041,10 +4041,11 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d69b05325e19956f123fce85ebc4d99226552a0bb24bba4c886106297e708b" +checksum = "5e56dc58272a3f9c151b1c3a6df0e3caca083fd843b337e60f706fae2d974b6b" dependencies = [ + "anyhow", "bincode", "derive_more", "educe", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index f08cc7b784..360bf6c76d 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -45,7 +45,7 @@ parking_lot = "0.12.1" pkarr = { version = "1.1.5", default-features = false } portable-atomic = "1" postcard = "1.0.8" -quic-rpc = { version = "0.10.0", features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] } rand = "0.8.5" ratatui = "0.26.2" reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] } diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 7e3502c275..bd1bc0d18a 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::{ensure, Context, Result}; use clap::Parser; use derive_more::FromStr; -use iroh::client::QuicIroh; +use iroh::client::Iroh; use crate::config::{ConsoleEnv, NodeConfig}; @@ -130,7 +130,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_console(data_dir_owned, &iroh).await?; console::run(&iroh, &env).await } @@ -151,7 +151,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_cli(data_dir_owned, &iroh).await?; command.run(&iroh, &env).await } diff --git a/iroh-cli/src/commands/author.rs b/iroh-cli/src/commands/author.rs index 2ad98a48b6..8493e4fd2d 100644 --- a/iroh-cli/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -4,9 +4,8 @@ use derive_more::FromStr; use futures_lite::StreamExt; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, 
RpcService}; +use iroh::client::Iroh; use iroh::docs::{Author, AuthorId}; -use quic_rpc::ServiceConnection; use crate::config::ConsoleEnv; @@ -38,10 +37,7 @@ pub enum AuthorCommands { } impl AuthorCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { author } => { env.set_author(author)?; diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 9e2c7208c6..6e4c0aeba2 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -30,11 +30,10 @@ use iroh::{ BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions, IncompleteBlobInfo, WrapOption, }, - Iroh, RpcService, + Iroh, }, net::{key::PublicKey, relay::RelayUrl, NodeAddr}, }; -use quic_rpc::ServiceConnection; use tokio::io::AsyncWriteExt; #[allow(clippy::large_enum_variant)] @@ -182,10 +181,7 @@ impl std::str::FromStr for TicketOrHash { } impl BlobCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Get { ticket, @@ -447,10 +443,7 @@ pub enum ListCommands { } impl ListCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blobs => { let mut response = iroh.blobs().list().await?; @@ -507,10 +500,7 @@ pub enum DeleteCommands { } impl DeleteCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blob { hash } => { let response = iroh.blobs().delete_blob(hash).await; @@ -540,10 +530,7 @@ fn apply_report_level(text: String, level: ReportLevel) -> console::StyledObject } } -pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut response = iroh.blobs().consistency_check(repair).await?; let verbosity = get_report_level(verbose); let print = |level: ReportLevel, entry: Option, message: String| { @@ -584,10 +571,7 @@ where Ok(()) } -pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut state = ValidateProgressState::new(); let mut response = iroh.blobs().validate(repair).await?; let verbosity = get_report_level(verbose); @@ -807,8 +791,8 @@ pub enum TicketOption { Print, } -pub async fn add_with_opts>( - client: &iroh::client::Iroh, +pub async fn add_with_opts( + client: &iroh::client::Iroh, source: BlobSource, opts: BlobAddOptions, ) -> Result<()> { @@ -840,8 +824,8 @@ pub async fn add_with_opts>( } /// Add data to iroh, either from a path or, if path is `None`, from STDIN. 
-pub async fn add>( - client: &iroh::client::Iroh, +pub async fn add( + client: &iroh::client::Iroh, source: BlobSourceIroh, tag: SetTagOption, ticket: TicketOption, diff --git a/iroh-cli/src/commands/console.rs b/iroh-cli/src/commands/console.rs index df10126d75..5f2d8bc383 100644 --- a/iroh-cli/src/commands/console.rs +++ b/iroh-cli/src/commands/console.rs @@ -2,8 +2,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use colored::Colorize; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use rustyline::{error::ReadlineError, Config, DefaultEditor}; use tokio::sync::{mpsc, oneshot}; @@ -12,10 +11,7 @@ use crate::{ config::{ConsoleEnv, ConsolePaths}, }; -pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { println!("{}", "Welcome to the Iroh console!".purple().bold()); println!("Type `{}` for a list of commands.", "help".bold()); let mut from_repl = Repl::spawn(env.clone()); diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index b2a13b3596..d26f785912 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -13,7 +13,6 @@ use dialoguer::Confirm; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; -use quic_rpc::ServiceConnection; use tokio::io::AsyncReadExt; use iroh::{ @@ -22,7 +21,7 @@ use iroh::{ client::{ blobs::WrapOption, docs::{Doc, Entry, LiveEvent, Origin, ShareMode}, - Iroh, RpcService, + Iroh, }, docs::{ store::{DownloadPolicy, FilterKind, Query, SortDirection}, @@ -303,10 +302,7 @@ impl From for iroh::docs::store::SortBy { } impl DocCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { id: doc } => { env.set_doc(doc)?; @@ -673,14 +669,7 @@ impl DocCommands { } } -async fn get_doc( - iroh: &Iroh, - env: &ConsoleEnv, - id: Option, -) -> anyhow::Result> -where - C: ServiceConnection, -{ +async fn get_doc(iroh: &Iroh, env: &ConsoleEnv, id: Option) -> anyhow::Result { iroh.docs() .open(env.doc(id)?) .await? @@ -688,14 +677,7 @@ where } /// Format the content. If an error occurs it's returned in a formatted, friendly way. -async fn fmt_content( - doc: &Doc, - entry: &Entry, - mode: DisplayContentMode, -) -> Result -where - C: ServiceConnection, -{ +async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Result { let read_failed = |err: anyhow::Error| format!(""); let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes())); let as_utf8 = |buf: Vec| String::from_utf8(buf).map(|repr| format!("\"{repr}\"")); @@ -743,10 +725,7 @@ fn human_len(entry: &Entry) -> HumanBytes { } #[must_use = "this won't be printed, you need to print it yourself"] -async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String -where - C: ServiceConnection, -{ +async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String { let key = std::str::from_utf8(entry.key()) .unwrap_or("") .bold(); @@ -776,18 +755,15 @@ fn tag_from_file_name(path: &Path) -> anyhow::Result { /// document via the hash of the blob. /// It also creates and powers the `ImportProgressBar`. 
#[tracing::instrument(skip_all)] -async fn import_coordinator( - doc: Doc, +async fn import_coordinator( + doc: Doc, author_id: AuthorId, root: PathBuf, prefix: String, blob_add_progress: impl Stream> + Send + Unpin + 'static, expected_size: u64, expected_entries: u64, -) -> Result<()> -where - C: ServiceConnection, -{ +) -> Result<()> { let imp = ImportProgressBar::new( &root.display().to_string(), doc.id(), @@ -982,7 +958,7 @@ mod tests { let cli = ConsoleEnv::for_console(data_dir.path().to_owned(), &node) .await .context("ConsoleEnv")?; - let iroh = iroh::client::QuicIroh::connect(data_dir.path()) + let iroh = iroh::client::Iroh::connect(data_dir.path()) .await .context("rpc connect")?; diff --git a/iroh-cli/src/commands/node.rs b/iroh-cli/src/commands/node.rs index 4d2b8ad1bf..b85422069e 100644 --- a/iroh-cli/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -8,12 +8,10 @@ use comfy_table::{presets::NOTHING, Cell}; use futures_lite::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; -use iroh::client::RpcService; use iroh::net::{ endpoint::{ConnectionInfo, DirectAddrInfo}, key::PublicKey, }; -use quic_rpc::ServiceConnection; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -38,10 +36,7 @@ pub enum NodeCommands { } impl NodeCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Connections => { let connections = iroh.connections().await?; diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index 1d26121681..414a894ddb 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -1,7 +1,6 @@ use anyhow::Result; use clap::Subcommand; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use crate::config::ConsoleEnv; @@ -58,10 +57,7 @@ pub enum RpcCommands { } impl RpcCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Node { command } => command.run(iroh).await, Self::Blob { command } => command.run(iroh).await, diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index ec22c26de8..694e484424 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -33,7 +33,7 @@ pub async fn run_with_command( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let _guard = crate::logging::init_terminal_and_file_logging(&config.file_logs, iroh_data_root)?; @@ -68,7 +68,7 @@ async fn run_with_command_inner( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let relay_map = config.relay_map()?; diff --git a/iroh-cli/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs index 42c228266b..fa4a298429 100644 --- a/iroh-cli/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -3,8 +3,7 @@ use bytes::Bytes; use clap::Subcommand; use futures_lite::StreamExt; use iroh::blobs::Tag; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -20,10 +19,7 @@ pub enum TagCommands { } impl TagCommands { - 
pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::List => { let mut response = iroh.tags().list().await?; diff --git a/iroh-cli/src/config.rs b/iroh-cli/src/config.rs index 249b91af10..e7b7d1ac08 100644 --- a/iroh-cli/src/config.rs +++ b/iroh-cli/src/config.rs @@ -15,11 +15,10 @@ use iroh::net::{ }; use iroh::node::GcPolicy; use iroh::{ - client::{Iroh, RpcService}, + client::Iroh, docs::{AuthorId, NamespaceId}, }; use parking_lot::RwLock; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use tracing::warn; @@ -133,10 +132,7 @@ struct ConsoleEnvInner { impl ConsoleEnv { /// Read from environment variables and the console config file. - pub(crate) async fn for_console>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_console(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let console_data_dir = ConsolePaths::root(&iroh_data_dir); tokio::fs::create_dir_all(&console_data_dir) .await @@ -161,10 +157,7 @@ impl ConsoleEnv { } /// Read only from environment variables. - pub(crate) async fn for_cli>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_cli(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let author = env_author(None, iroh).await?; let env = ConsoleEnvInner { author, @@ -278,10 +271,7 @@ impl ConsoleEnv { } } -async fn env_author>( - from_config: Option, - iroh: &Iroh, -) -> Result { +async fn env_author(from_config: Option, iroh: &Iroh) -> Result { if let Some(author) = env::var(ENV_AUTHOR) .ok() .map(|s| { diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 15404847da..307ec354ba 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" } iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" } parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.10.0", default-features = false, features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] } quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 4a12687725..6f76f87636 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -4,7 +4,7 @@ use anyhow::Result; use clap::Parser; use futures_lite::future::Boxed as BoxedFuture; use iroh::{ - client::MemIroh, + client::Iroh, net::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeId, @@ -59,7 +59,7 @@ const EXAMPLE_ALPN: &[u8] = b"example-proto/0"; #[derive(Debug, Clone)] struct ExampleProto { - client: MemIroh, + client: Iroh, endpoint: Endpoint, } @@ -89,7 +89,7 @@ impl ProtocolHandler for ExampleProto { } impl ExampleProto { - pub fn new(client: MemIroh, endpoint: Endpoint) -> Self { + pub fn new(client: Iroh, endpoint: Endpoint) -> Self { Self { client, endpoint } } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index e158bfb355..ce9647f5fd 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -1,18 +1,23 @@ //! Client to an Iroh node. 
use futures_lite::{Stream, StreamExt}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; #[doc(inline)] pub use crate::rpc_protocol::RpcService; -mod mem; mod quic; -pub use self::mem::{Doc as MemDoc, Iroh as MemIroh, RpcClient as MemRpcClient}; +#[deprecated] +pub use self::docs::Doc as MemDoc; +#[deprecated] +pub use self::docs::Doc as QuicDoc; +pub use self::docs::Doc; pub use self::node::NodeStatus; -pub use self::quic::{Doc as QuicDoc, Iroh as QuicIroh, RpcClient as QuicRpcClient}; +#[deprecated] +pub use self::Iroh as MemIroh; +#[deprecated] +pub use self::Iroh as QuicIroh; pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; @@ -23,38 +28,39 @@ pub mod tags; mod node; +/// Iroh rpc client - boxed so that we can have a concrete type. +pub(crate) type RpcClient = + quic_rpc::RpcClient>; + /// Iroh client. #[derive(Debug, Clone)] -pub struct Iroh { - rpc: RpcClient, +pub struct Iroh { + rpc: RpcClient, } -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Create a new high-level client to a Iroh node from the low-level RPC client. - pub fn new(rpc: RpcClient) -> Self { + pub fn new(rpc: RpcClient) -> Self { Self { rpc } } /// Blobs client - pub fn blobs(&self) -> &blobs::Client { + pub fn blobs(&self) -> &blobs::Client { blobs::Client::ref_cast(&self.rpc) } /// Docs client - pub fn docs(&self) -> &docs::Client { + pub fn docs(&self) -> &docs::Client { docs::Client::ref_cast(&self.rpc) } /// Authors client - pub fn authors(&self) -> &authors::Client { + pub fn authors(&self) -> &authors::Client { authors::Client::ref_cast(&self.rpc) } /// Tags client - pub fn tags(&self) -> &tags::Client { + pub fn tags(&self) -> &tags::Client { tags::Client::ref_cast(&self.rpc) } } diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index dfe8837d6d..7c87c0a4fa 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -3,27 +3,23 @@ use anyhow::Result; use futures_lite::{stream::StreamExt, Stream}; use iroh_docs::{Author, AuthorId}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use crate::rpc_protocol::{ AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest, - AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService, + AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, }; -use super::flatten; +use super::{flatten, RpcClient}; /// Iroh authors client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document author. 
/// /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 53245acd3d..861060b751 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -24,7 +24,7 @@ use iroh_blobs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicU64, Ordering}; -use quic_rpc::{client::BoxStreamSync, RpcClient, ServiceConnection}; +use quic_rpc::client::BoxStreamSync; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -35,28 +35,25 @@ use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, + CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption, }; -use super::{flatten, tags, Iroh}; +use super::{flatten, tags, Iroh, RpcClient}; /// Iroh blobs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl<'a, C: ServiceConnection> From<&'a Iroh> for &'a RpcClient { - fn from(client: &'a Iroh) -> &'a RpcClient { +impl<'a> From<&'a Iroh> for &'a RpcClient { + fn from(client: &'a Iroh) -> &'a RpcClient { &client.blobs().rpc } } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Stream the contents of a a single blob. /// /// Returns a [`Reader`], which can report the size of the blob before reading it. @@ -386,17 +383,14 @@ where } } - fn tags_client(&self) -> tags::Client { + fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), } } } -impl SimpleStore for Client -where - C: ServiceConnection, -{ +impl SimpleStore for Client { async fn load(&self, hash: Hash) -> anyhow::Result { self.read_to_bytes(hash).await } @@ -786,15 +780,12 @@ impl Reader { } } - pub(crate) async fn from_rpc_read>( - rpc: &RpcClient, - hash: Hash, - ) -> anyhow::Result { + pub(crate) async fn from_rpc_read(rpc: &RpcClient, hash: Hash) -> anyhow::Result { Self::from_rpc_read_at(rpc, hash, 0, None).await } - async fn from_rpc_read_at>( - rpc: &RpcClient, + async fn from_rpc_read_at( + rpc: &RpcClient, hash: Hash, offset: u64, len: Option, diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index cdc15614fc..95d184a045 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -21,7 +21,7 @@ use iroh_docs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicBool, Ordering}; -use quic_rpc::{message::RpcMsg, RpcClient, ServiceConnection}; +use quic_rpc::message::RpcMsg; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; @@ -36,21 +36,18 @@ use crate::rpc_protocol::{ #[doc(inline)] pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; -use super::{blobs, flatten}; +use super::{blobs, flatten, RpcClient}; /// Iroh docs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document. 
- pub async fn create(&self) -> Result> { + pub async fn create(&self) -> Result { let res = self.rpc.rpc(DocCreateRequest {}).await??; let doc = Doc::new(self.rpc.clone(), res.id); Ok(doc) @@ -69,14 +66,14 @@ where /// Import a document from a namespace capability. /// /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync. - pub async fn import_namespace(&self, capability: Capability) -> Result> { + pub async fn import_namespace(&self, capability: Capability) -> Result { let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); Ok(doc) } /// Import a document from a ticket and join all peers in the ticket. - pub async fn import(&self, ticket: DocTicket) -> Result> { + pub async fn import(&self, ticket: DocTicket) -> Result { let DocTicket { capability, nodes } = ticket; let doc = self.import_namespace(capability).await?; doc.start_sync(nodes).await?; @@ -92,7 +89,7 @@ where pub async fn import_and_subscribe( &self, ticket: DocTicket, - ) -> Result<(Doc, impl Stream>)> { + ) -> Result<(Doc, impl Stream>)> { let DocTicket { capability, nodes } = ticket; let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); @@ -108,7 +105,7 @@ where } /// Get a [`Doc`] client for a single document. Return None if the document cannot be found. - pub async fn open(&self, id: NamespaceId) -> Result>> { + pub async fn open(&self, id: NamespaceId) -> Result> { self.rpc.rpc(DocOpenRequest { doc_id: id }).await??; let doc = Doc::new(self.rpc.clone(), id); Ok(Some(doc)) @@ -117,28 +114,25 @@ where /// Document handle #[derive(Debug, Clone)] -pub struct Doc>(Arc>); +pub struct Doc(Arc); -impl> PartialEq for Doc { +impl PartialEq for Doc { fn eq(&self, other: &Self) -> bool { self.0.id == other.0.id } } -impl> Eq for Doc {} +impl Eq for Doc {} #[derive(Debug)] -struct DocInner> { +struct DocInner { id: NamespaceId, - rpc: RpcClient, + rpc: RpcClient, closed: AtomicBool, rt: tokio::runtime::Handle, } -impl Drop for DocInner -where - C: ServiceConnection, -{ +impl Drop for DocInner { fn drop(&mut self) { let doc_id = self.id; let rpc = self.rpc.clone(); @@ -150,11 +144,8 @@ where } } -impl Doc -where - C: ServiceConnection, -{ - fn new(rpc: RpcClient, id: NamespaceId) -> Self { +impl Doc { + fn new(rpc: RpcClient, id: NamespaceId) -> Self { Self(Arc::new(DocInner { rpc, id, @@ -420,8 +411,8 @@ where } } -impl<'a, C: ServiceConnection> From<&'a Doc> for &'a RpcClient { - fn from(doc: &'a Doc) -> &'a RpcClient { +impl<'a> From<&'a Doc> for &'a RpcClient { + fn from(doc: &'a Doc) -> &'a RpcClient { &doc.0.rpc } } @@ -476,26 +467,14 @@ impl Entry { /// Read the content of an [`Entry`] as a streaming [`blobs::Reader`]. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_reader( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_reader(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await } /// Read all content of an [`Entry`] into a buffer. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_bytes( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_bytes(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()) .await? 
.read_to_bytes() diff --git a/iroh/src/client/mem.rs b/iroh/src/client/mem.rs deleted file mode 100644 index 85f65a5d93..0000000000 --- a/iroh/src/client/mem.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Type declarations for an in-memory client to an iroh node running in the same process. -//! -//! The in-memory client is obtained directly from a running node through -//! [`crate::node::Node::client`] - -use quic_rpc::transport::flume::FlumeConnection; - -use crate::rpc_protocol::RpcService; - -/// RPC client to an iroh node running in the same process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// In-memory client to an iroh node running in the same process. -/// -/// This is obtained from [`crate::node::Node::client`]. -pub type Iroh = super::Iroh>; - -/// In-memory document client to an iroh node running in the same process. -pub type Doc = super::docs::Doc>; diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 6f8460b376..96ce9d87c2 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -6,21 +6,17 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_base::key::PublicKey; use iroh_net::{endpoint::ConnectionInfo, relay::RelayUrl, NodeAddr, NodeId}; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ CounterStats, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, - NodeStatusRequest, RpcService, + NodeStatusRequest, }; use super::{flatten, Iroh}; -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Get statistics of the running node. pub async fn stats(&self) -> Result> { let res = self.rpc.rpc(NodeStatsRequest {}).await??; diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index 7f7333810f..6351c9632d 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -8,8 +8,9 @@ use std::{ }; use anyhow::{bail, Context}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection}; +use super::Iroh; use crate::{ node::RpcStatus, rpc_protocol::{NodeStatusRequest, RpcService}, @@ -20,15 +21,7 @@ use crate::{ pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; /// RPC client to an iroh node running in a separate process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// Client to an iroh node running in a separate process. -/// -/// This is obtained from [`Iroh::connect`]. -pub type Iroh = super::Iroh>; - -/// RPC document client to an iroh node running in a separate process. -pub type Doc = super::docs::Doc>; +pub type RpcClient = quic_rpc::RpcClient>; impl Iroh { /// Connect to an iroh node running on the same computer, but in a different process. @@ -51,6 +44,7 @@ pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port); let server_name = "localhost".to_string(); let connection = QuinnConnection::::new(endpoint, addr, server_name); + let connection = BoxedConnection::new(connection); let client = RpcClient::new(connection); // Do a status request to check if the server is running. 
let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(NodeStatusRequest)) diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index 9c3ef34f12..66166a396c 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -3,23 +3,20 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_blobs::{BlobFormat, Hash, Tag}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; -use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest, RpcService}; +use super::RpcClient; +use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest}; /// Iroh tags client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// List all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 836eeda509..91e6febc61 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -63,7 +63,7 @@ struct NodeInner { gossip: Gossip, secret_key: SecretKey, cancel_token: CancellationToken, - client: crate::client::MemIroh, + client: crate::client::Iroh, #[debug("rt")] rt: LocalPoolHandle, downloader: Downloader, @@ -136,7 +136,7 @@ impl Node { } /// Return a client to control this node over an in-memory channel. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } @@ -183,7 +183,7 @@ impl Node { } impl std::ops::Deref for Node { - type Target = crate::client::MemIroh; + type Target = crate::client::Iroh; fn deref(&self) -> &Self::Target { &self.inner.client diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 5b5164ed64..d2917eb2ca 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -495,6 +495,9 @@ where // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); + // box the controller. Boxing has a special case for the flume channel that avoids allocations, + // so this has zero overhead. + let controller = quic_rpc::transport::boxed::Connection::new(controller); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); let inner = Arc::new(NodeInner { @@ -557,7 +560,7 @@ impl> ProtocolBuilde /// # use std::sync::Arc; /// # use anyhow::Result; /// # use futures_lite::future::Boxed as BoxedFuture; - /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::MemIroh}; + /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::Iroh}; /// # /// # #[tokio::main] /// # async fn main() -> Result<()> { @@ -566,7 +569,7 @@ impl> ProtocolBuilde /// /// #[derive(Debug)] /// struct MyProtocol { - /// client: MemIroh + /// client: Iroh /// } /// /// impl ProtocolHandler for MyProtocol { @@ -601,7 +604,7 @@ impl> ProtocolBuilde /// /// Note that RPC calls performed with the client will not complete until the node is /// spawned. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } diff --git a/iroh/src/node/rpc_status.rs b/iroh/src/node/rpc_status.rs index 00ad0c8ab2..29f633eb81 100644 --- a/iroh/src/node/rpc_status.rs +++ b/iroh/src/node/rpc_status.rs @@ -16,7 +16,7 @@ pub enum RpcStatus { /// The port we are connected on. 
port: u16, /// Actual connected RPC client. - client: crate::client::QuicRpcClient, + client: crate::client::RpcClient, }, } diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index a5e9b8a463..b2ff42fedd 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -13,7 +13,7 @@ use iroh::{ base::node_addr::AddrInfoOptions, client::{ docs::{Entry, LiveEvent, ShareMode}, - MemDoc, + Doc, }, net::key::{PublicKey, SecretKey}, node::{Builder, Node}, @@ -1012,14 +1012,14 @@ async fn test_list_docs_stream() -> Result<()> { } /// Get all entries of a document. -async fn get_all(doc: &MemDoc) -> anyhow::Result> { +async fn get_all(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.collect::>().await; entries.into_iter().collect() } /// Get all entries of a document with the blob content. -async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result> { +async fn get_all_with_content(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.and_then(|entry| async { let content = entry.content_bytes(doc).await; @@ -1031,7 +1031,7 @@ async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result, n: usize, cb: impl Fn(usize, usize) -> (AuthorId, String, String), @@ -1090,7 +1090,7 @@ async fn wait_for_events( } async fn assert_all_docs( - docs: &[MemDoc], + docs: &[Doc], node_ids: &[PublicKey], expected: &Vec, label: &str, @@ -1203,12 +1203,12 @@ async fn sync_drop_doc() -> Result<()> { Ok(()) } -async fn assert_latest(doc: &MemDoc, key: &[u8], value: &[u8]) { +async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { let content = get_latest(doc, key).await.unwrap(); assert_eq!(content, value.to_vec()); } -async fn get_latest(doc: &MemDoc, key: &[u8]) -> anyhow::Result> { +async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { let query = Query::single_latest_per_key().key_exact(key); let entry = doc .get_many(query)