From af3f2d74c855c4a596d7c0dbdc3ada178d43c8f9 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 20 Jun 2024 10:52:04 +0200 Subject: [PATCH 1/4] feat(iroh): allow to disable docs engine completely --- iroh/src/client/authors.rs | 2 +- iroh/src/node.rs | 71 +++++++------ iroh/src/node/builder.rs | 65 ++++++----- iroh/src/node/docs.rs | 54 ++++++++++ iroh/src/node/protocol.rs | 8 -- iroh/src/node/rpc.rs | 213 ++++++++++++++++++++++++------------- iroh/src/node/rpc/docs.rs | 57 +++++----- iroh/src/rpc_protocol.rs | 4 +- iroh/tests/gc.rs | 24 +++-- iroh/tests/provide.rs | 6 +- 10 files changed, 313 insertions(+), 191 deletions(-) create mode 100644 iroh/src/node/docs.rs diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index bf642fc3d9..dfe8837d6d 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -42,7 +42,7 @@ where /// /// The default author can be set with [`Self::set_default`]. pub async fn default(&self) -> Result { - let res = self.rpc.rpc(AuthorGetDefaultRequest).await?; + let res = self.rpc.rpc(AuthorGetDefaultRequest).await??; Ok(res.author_id) } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 85df39cc22..836eeda509 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -13,7 +13,6 @@ use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore}; use iroh_blobs::{downloader::Downloader, protocol::Closed}; -use iroh_docs::engine::Engine; use iroh_gossip::net::Gossip; use iroh_net::key::SecretKey; use iroh_net::Endpoint; @@ -24,14 +23,18 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::{debug, error, info, warn}; -use crate::{client::RpcService, node::protocol::ProtocolMap}; +use crate::{ + client::RpcService, + node::{docs::DocsEngine, protocol::ProtocolMap}, +}; mod builder; +mod docs; mod protocol; mod rpc; mod rpc_status; -pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; +pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig}; pub use self::rpc_status::RpcStatus; pub use protocol::ProtocolHandler; @@ -55,7 +58,7 @@ pub struct Node { #[derive(derive_more::Debug)] struct NodeInner { db: D, - docs: DocsEngine, + docs: Option, endpoint: Endpoint, gossip: Gossip, secret_key: SecretKey, @@ -314,9 +317,22 @@ impl NodeInner { join_set.shutdown().await; } + /// Shutdown the different parts of the node concurrently. async fn shutdown(&self, protocols: Arc) { - // Shutdown the different parts of the node concurrently. let error_code = Closed::ProviderTerminating; + + // Shutdown future for the docs engine, if enabled. + let docs_shutdown = { + let docs = self.docs.clone(); + async move { + if let Some(docs) = docs { + docs.shutdown().await + } else { + Ok(()) + } + } + }; + // We ignore all errors during shutdown. let _ = tokio::join!( // Close the endpoint. @@ -326,8 +342,8 @@ impl NodeInner { self.endpoint .clone() .close(error_code.into(), error_code.reason()), - // Shutdown sync engine. - self.docs.shutdown(), + // Shutdown docs engine. + docs_shutdown, // Shutdown blobs store engine. self.db.shutdown(), // Shutdown protocol handlers. 
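(Note: the `docs_shutdown` future above is the pattern this series uses for
optional subsystems: turn the `Option` into a future that is always present,
so it can participate in a single `tokio::join!` next to the mandatory
parts. A minimal standalone sketch of that shape; the helper name
`shutdown_optional` is illustrative and not part of this patch:

    // Run a subsystem's shutdown if it exists, otherwise succeed immediately,
    // so the caller always has one future to join.
    async fn shutdown_optional<T, Fut>(
        part: Option<T>,
        shutdown: impl FnOnce(T) -> Fut,
    ) -> anyhow::Result<()>
    where
        Fut: std::future::Future<Output = anyhow::Result<()>>,
    {
        match part {
            Some(part) => shutdown(part).await,
            None => Ok(()),
        }
    }

With such a helper, the call site would read
`shutdown_optional(self.docs.clone(), |docs| async move { docs.shutdown().await })`.)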
@@ -342,7 +358,6 @@ impl NodeInner { ) { tracing::info!("Starting GC task with interval {:?}", gc_period); let db = &self.db; - let docs = &self.docs; let mut live = BTreeSet::new(); 'outer: loop { if let Err(cause) = db.gc_start().await { @@ -356,22 +371,24 @@ impl NodeInner { tracing::debug!("Starting GC"); live.clear(); - let doc_hashes = match docs.sync.content_hashes().await { - Ok(hashes) => hashes, - Err(err) => { - tracing::warn!("Error getting doc hashes: {}", err); - continue 'outer; - } - }; - for hash in doc_hashes { - match hash { - Ok(hash) => { - live.insert(hash); - } + if let Some(docs) = &self.docs { + let doc_hashes = match docs.sync.content_hashes().await { + Ok(hashes) => hashes, Err(err) => { - tracing::error!("Error getting doc hash: {}", err); + tracing::warn!("Error getting doc hashes: {}", err); continue 'outer; } + }; + for hash in doc_hashes { + match hash { + Ok(hash) => { + live.insert(hash); + } + Err(err) => { + tracing::error!("Error getting doc hash: {}", err); + continue 'outer; + } + } } } @@ -436,17 +453,6 @@ async fn handle_connection( } } -/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. -#[derive(Debug, Clone)] -pub(crate) struct DocsEngine(Engine); - -impl std::ops::Deref for DocsEngine { - type Target = Engine; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - #[cfg(test)] mod tests { use std::time::Duration; @@ -655,7 +661,6 @@ mod tests { } #[cfg(feature = "fs-store")] - #[ignore = "flaky"] #[tokio::test] async fn test_default_author_persist() -> Result<()> { use crate::util::path::IrohPaths; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f64297da98..ed95a367be 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -12,7 +12,7 @@ use iroh_blobs::{ downloader::Downloader, store::{Map, Store as BaoStore}, }; -use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_docs::engine::DefaultAuthorStorage; use iroh_docs::net::DOCS_ALPN; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ @@ -41,7 +41,7 @@ use crate::{ util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc_status::RpcStatus, DocsEngine, Node, NodeInner}; +use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner}; /// Default bind address for the node. /// 11204 is "iroh" in leetspeak @@ -56,6 +56,15 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; +/// Storage backend for documents. +#[derive(Debug, Clone)] +pub enum DocsStorage { + /// In-memory storage. + Memory, + /// File-based persistent storage. + Persistent(PathBuf), +} + /// Builder for the [`Node`]. /// /// You must supply a blob store and a document store. @@ -85,7 +94,7 @@ where gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, - docs_store: iroh_docs::store::Store, + docs_store: Option, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -146,7 +155,7 @@ impl Default for Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store: iroh_docs::store::Store::memory(), + docs_store: Some(DocsStorage::Memory), node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -159,7 +168,7 @@ impl Builder { /// Creates a new builder for [`Node`] using the given databases. 
pub fn with_db_and_store( blobs_store: D, - docs_store: iroh_docs::store::Store, + docs_store: DocsStorage, storage: StorageConfig, ) -> Self { Self { @@ -172,7 +181,7 @@ impl Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store, + docs_store: Some(docs_store), node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -200,8 +209,7 @@ where .with_context(|| { format!("Failed to load blobs database from {}", blob_dir.display()) })?; - let docs_store = - iroh_docs::store::fs::Store::persistent(IrohPaths::DocsDatabase.with_root(root))?; + let docs_store = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root)); let v0 = blobs_store .import_flat_store(iroh_blobs::store::fs::FlatStorePaths { @@ -237,7 +245,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store, + docs_store: Some(docs_store), node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -300,6 +308,12 @@ where self } + /// Disables documents support on this node completely. + pub fn disable_docs(mut self) -> Self { + self.docs_store = None; + self + } + /// Sets the relay servers to assist in establishing connectivity. /// /// Relay servers are used to discover other nodes by `PublicKey` and also help @@ -405,7 +419,6 @@ where async fn build_inner(self) -> Result> { trace!("building node"); let lp = LocalPoolHandle::new(num_cpus::get()); - let endpoint = { let mut transport_config = quinn::TransportConfig::default(); transport_config @@ -461,25 +474,26 @@ where let addr = endpoint.node_addr().await?; trace!("endpoint address: {addr:?}"); - // initialize the gossip protocol + // Initialize the gossip protocol. let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info); - - // initialize the downloader + // Initialize the downloader. let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); - // load or create the default author for documents - // spawn the docs engine - let docs = DocsEngine( - Engine::spawn( + // Spawn the docs engine, if enabled. + let docs = if let Some(docs_storage) = &self.docs_store { + let docs = DocsEngine::spawn( + docs_storage, + self.blobs_store.clone(), + self.storage.default_author_storage(), endpoint.clone(), gossip.clone(), - self.docs_store, - self.blobs_store.clone(), downloader.clone(), - self.storage.default_author_storage(), ) - .await?, - ); + .await?; + Some(docs) + } else { + None + }; // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); @@ -637,9 +651,10 @@ impl> ProtocolBuilde let gossip = self.gossip().clone(); self = self.accept(GOSSIP_ALPN, Arc::new(gossip)); - // Register docs. - let docs = self.inner.docs.clone(); - self = self.accept(DOCS_ALPN, Arc::new(docs)); + // Register docs, if enabled. 
+ if let Some(docs) = self.inner.docs.clone() { + self = self.accept(DOCS_ALPN, Arc::new(docs)); + } self } diff --git a/iroh/src/node/docs.rs b/iroh/src/node/docs.rs new file mode 100644 index 0000000000..4203e9e59a --- /dev/null +++ b/iroh/src/node/docs.rs @@ -0,0 +1,54 @@ +use std::{ops::Deref, sync::Arc}; + +use anyhow::Result; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_blobs::downloader::Downloader; +use iroh_gossip::net::Gossip; + +use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_net::{endpoint::Connecting, Endpoint}; + +use crate::node::{DocsStorage, ProtocolHandler}; + +/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. +#[derive(Debug, Clone)] +pub(crate) struct DocsEngine(Engine); + +impl DocsEngine { + pub async fn spawn( + storage: &DocsStorage, + blobs_store: S, + default_author_storage: DefaultAuthorStorage, + endpoint: Endpoint, + gossip: Gossip, + downloader: Downloader, + ) -> anyhow::Result { + let docs_store = match storage { + DocsStorage::Memory => iroh_docs::store::fs::Store::memory(), + DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?, + }; + let engine = Engine::spawn( + endpoint, + gossip, + docs_store, + blobs_store, + downloader, + default_author_storage, + ) + .await?; + Ok(DocsEngine(engine)) + } +} + +impl Deref for DocsEngine { + type Target = Engine; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ProtocolHandler for DocsEngine { + fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { + Box::pin(async move { self.handle_connection(conn).await }) + } +} diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 3c50368b42..a0f5b53be5 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -5,8 +5,6 @@ use futures_lite::future::Boxed as BoxedFuture; use futures_util::future::join_all; use iroh_net::endpoint::Connecting; -use crate::node::DocsEngine; - /// Handler for incoming connections. /// /// An iroh node can accept connections for arbitrary ALPN protocols. 
By default, the iroh node @@ -119,9 +117,3 @@ impl ProtocolHandler for iroh_gossip::net::Gossip { Box::pin(async move { self.handle_connection(conn.await?).await }) } } - -impl ProtocolHandler for DocsEngine { - fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { - Box::pin(async move { self.handle_connection(conn).await }) - } -} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 697b6d63cd..bafca9586a 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; -use iroh_base::rpc::RpcResult; +use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; @@ -33,24 +33,26 @@ use tokio::task::JoinSet; use tokio_util::task::LocalPoolHandle; use tracing::{debug, info, warn}; -use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; -use crate::client::tags::TagInfo; -use crate::client::NodeStatus; +use crate::client::{ + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + tags::TagInfo, + NodeStatus, +}; +use crate::node::{docs::DocsEngine, rpc::docs::ITER_CHANNEL_CAP, NodeInner}; use crate::rpc_protocol::{ - BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, - BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, - BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest, - BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, - DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, - ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, - NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, - NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, - NodeWatchResponse, Request, RpcService, SetTagOption, + AuthorListResponse, BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, + BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, + BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse, + BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, + BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, + DocExportFileRequest, DocExportFileResponse, DocGetManyResponse, DocImportFileRequest, + DocImportFileResponse, DocListResponse, DocSetHashRequest, ListTagsRequest, NodeAddrRequest, + NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, + NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, + NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, NodeWatchResponse, + Request, RpcService, SetTagOption, }; -use super::NodeInner; - mod docs; const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); @@ -73,6 +75,24 @@ impl Handler { } impl Handler { + fn docs(&self) -> Option<&DocsEngine> { + self.inner.docs.as_ref() + } + + async fn with_docs(self, f: F) -> RpcResult + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> Fut, + Fut: std::future::Future>, + { + if let Some(docs) = 
self.docs() { + let docs = docs.clone(); + f(docs).await + } else { + Err(docs_disabled()) + } + } + pub(crate) fn spawn_rpc_request>( inner: Arc>, join_set: &mut JoinSet>, @@ -131,92 +151,115 @@ impl Handler { BlobReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, BlobAddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, BlobAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + AuthorList(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.author_list(req) + let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + if let Some(docs) = handler.docs() { + docs.author_list(req, tx); + } else { + tx.send(Err(anyhow!("docs are disabled"))) + .expect("has capacity"); + } + rx.into_stream().map(|r| { + r.map(|author_id| AuthorListResponse { author_id }) + .map_err(Into::into) + }) }) .await } AuthorCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_create(req).await }) }) .await } AuthorImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_import(req).await }) }) .await } AuthorExport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_export(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_export(req).await }) }) .await } AuthorDelete(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_delete(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_delete(req).await }) }) .await } AuthorGetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_default(req) + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { Ok(docs.author_default(req)) }) }) .await } AuthorSetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_set_default(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_set_default(req).await }) }) .await } DocOpen(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_open(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_open(req).await }) }) .await } DocClose(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_close(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_close(req).await }) }) .await } DocStatus(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_status(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_status(req).await }) }) .await } DocList(msg) => { - chan.server_streaming(msg, self, |handler, req| handler.inner.docs.doc_list(req)) - .await + chan.server_streaming(msg, self, |handler, req| { + let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + if let Some(docs) = handler.docs() { + docs.doc_list(req, tx); + } else { + tx.send(Err(anyhow!("docs are disabled"))) + .expect("has capacity"); + } + rx.into_stream().map(|r| { + r.map(|(id, capability)| DocListResponse { id, capability }) + .map_err(Into::into) + }) + }) + .await } 
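(Note: the streaming arms here, AuthorList, DocList, and DocGet below,
cannot go through `with_docs`, because `server_streaming` expects a stream
rather than a future; the disabled case is therefore surfaced as a single
`Err` item primed into the response channel. A hedged sketch of that
fallback shape, assuming `anyhow` and `futures_lite`; the name
`disabled_stream` is illustrative only:

    // A stream that yields exactly one error, for use when docs are disabled.
    fn disabled_stream<T: Send + 'static>(
    ) -> impl futures_lite::Stream<Item = anyhow::Result<T>> {
        futures_lite::stream::once(Err(anyhow::anyhow!("docs are disabled")))
    }

Patch 3 below replaces the channel priming with essentially this, using
`tokio_util::either::Either` to unify the two stream types.)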
DocCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_create(req).await }) }) .await } DocDrop(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_drop(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_drop(req).await }) }) .await } DocImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_import(req).await }) }) .await } DocSet(msg) => { - let bao_store = self.inner.db.clone(); - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set(&bao_store, req).await + let blobs_store = self.inner.db.clone(); + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set(&blobs_store, req).await }) }) .await } @@ -229,68 +272,82 @@ impl Handler { .await } DocDel(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_del(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_del(req).await }) }) .await } DocSetHash(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_hash(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_hash(req).await }) }) .await } DocGet(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.doc_get_many(req) + let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + if let Some(docs) = handler.docs() { + docs.doc_get_many(req, tx); + } else { + tx.send(Err(anyhow!("docs are disabled"))) + .expect("has capacity"); + } + rx.into_stream().map(|r| { + r.map(|entry| DocGetManyResponse { entry }) + .map_err(Into::into) + }) }) .await } DocGetExact(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_exact(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_exact(req).await }) }) .await } DocStartSync(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_start_sync(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_start_sync(req).await }) }) .await } DocLeave(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_leave(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_leave(req).await }) }) .await } DocShare(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_share(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_share(req).await }) }) .await } DocSubscribe(msg) => { chan.try_server_streaming(msg, self, |handler, req| async move { - handler.inner.docs.doc_subscribe(req).await + if let Some(docs) = handler.docs() { + docs.doc_subscribe(req).await + } else { + Err(docs_disabled()) + } }) .await } DocSetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_download_policy(req).await }) }) .await } DocGetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - 
handler.inner.docs.doc_get_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_download_policy(req).await }) }) .await } DocGetSyncPeers(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_sync_peers(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_sync_peers(req).await }) }) .await } @@ -467,6 +524,7 @@ impl Handler { msg: DocImportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; @@ -519,16 +577,14 @@ impl Handler { let hash_and_format = temp_tag.inner(); let HashAndFormat { hash, .. } = *hash_and_format; - self.inner - .docs - .doc_set_hash(DocSetHashRequest { - doc_id, - author_id, - key: key.clone(), - hash, - size, - }) - .await?; + docs.doc_set_hash(DocSetHashRequest { + doc_id, + author_id, + key: key.clone(), + hash, + size, + }) + .await?; drop(temp_tag); progress.send(DocImportProgress::AllDone { key }).await?; Ok(()) @@ -553,6 +609,7 @@ impl Handler { msg: DocExportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; let progress = FlumeProgressSender::new(progress); let DocExportFileRequest { entry, path, mode } = msg; let key = bytes::Bytes::from(entry.key().to_vec()); @@ -1128,3 +1185,7 @@ where res.map_err(Into::into) } + +fn docs_disabled() -> RpcError { + anyhow!("docs are disabled").into() +} diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index 4fbabf64ff..4d74d8d983 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -3,29 +3,30 @@ use anyhow::anyhow; use futures_lite::Stream; use iroh_blobs::{store::Store as BaoStore, BlobFormat}; -use iroh_docs::{Author, DocTicket, NamespaceSecret}; +use iroh_docs::{ + Author, AuthorId, CapabilityKind, DocTicket, NamespaceId, NamespaceSecret, SignedEntry, +}; use tokio_stream::StreamExt; use crate::client::docs::ShareMode; -use crate::node::DocsEngine; +use crate::node::docs::DocsEngine; use crate::rpc_protocol::{ AuthorCreateRequest, AuthorCreateResponse, AuthorDeleteRequest, AuthorDeleteResponse, AuthorExportRequest, AuthorExportResponse, AuthorGetDefaultRequest, AuthorGetDefaultResponse, - AuthorImportRequest, AuthorImportResponse, AuthorListRequest, AuthorListResponse, - AuthorSetDefaultRequest, AuthorSetDefaultResponse, DocCloseRequest, DocCloseResponse, - DocCreateRequest, DocCreateResponse, DocDelRequest, DocDelResponse, DocDropRequest, - DocDropResponse, DocGetDownloadPolicyRequest, DocGetDownloadPolicyResponse, DocGetExactRequest, - DocGetExactResponse, DocGetManyRequest, DocGetManyResponse, DocGetSyncPeersRequest, - DocGetSyncPeersResponse, DocImportRequest, DocImportResponse, DocLeaveRequest, - DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, DocOpenResponse, - DocSetDownloadPolicyRequest, DocSetDownloadPolicyResponse, DocSetHashRequest, - DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, - DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, DocStatusResponse, - DocSubscribeRequest, DocSubscribeResponse, RpcResult, + AuthorImportRequest, AuthorImportResponse, AuthorListRequest, AuthorSetDefaultRequest, + AuthorSetDefaultResponse, DocCloseRequest, DocCloseResponse, 
DocCreateRequest, + DocCreateResponse, DocDelRequest, DocDelResponse, DocDropRequest, DocDropResponse, + DocGetDownloadPolicyRequest, DocGetDownloadPolicyResponse, DocGetExactRequest, + DocGetExactResponse, DocGetManyRequest, DocGetSyncPeersRequest, DocGetSyncPeersResponse, + DocImportRequest, DocImportResponse, DocLeaveRequest, DocLeaveResponse, DocListRequest, + DocOpenRequest, DocOpenResponse, DocSetDownloadPolicyRequest, DocSetDownloadPolicyResponse, + DocSetHashRequest, DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, + DocShareResponse, DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, + DocStatusResponse, DocSubscribeRequest, DocSubscribeResponse, RpcResult, }; /// Capacity for the flume channels to forward sync store iterators to async RPC streams. -const ITER_CHANNEL_CAP: usize = 64; +pub(super) const ITER_CHANNEL_CAP: usize = 64; #[allow(missing_docs)] impl DocsEngine { @@ -57,8 +58,8 @@ impl DocsEngine { pub fn author_list( &self, _req: AuthorListRequest, - ) -> impl Stream> { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + tx: flume::Sender>, + ) { let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. @@ -68,10 +69,6 @@ impl DocsEngine { tx2.send_async(Err(err)).await.ok(); } }); - rx.into_stream().map(|r| { - r.map(|author_id| AuthorListResponse { author_id }) - .map_err(Into::into) - }) } pub async fn author_import(&self, req: AuthorImportRequest) -> RpcResult { @@ -108,8 +105,12 @@ impl DocsEngine { Ok(DocDropResponse {}) } - pub fn doc_list(&self, _req: DocListRequest) -> impl Stream> { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + pub fn doc_list( + &self, + _req: DocListRequest, + tx: flume::Sender>, + ) { + // let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. @@ -119,10 +120,6 @@ impl DocsEngine { tx2.send_async(Err(err)).await.ok(); } }); - rx.into_stream().map(|r| { - r.map(|(id, capability)| DocListResponse { id, capability }) - .map_err(Into::into) - }) } pub async fn doc_open(&self, req: DocOpenRequest) -> RpcResult { @@ -249,9 +246,9 @@ impl DocsEngine { pub fn doc_get_many( &self, req: DocGetManyRequest, - ) -> impl Stream> { + tx: flume::Sender>, + ) { let DocGetManyRequest { doc_id, query } = req; - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. 
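(Note: these engine methods must stay synchronous, so they bridge to the
async RPC stream by spawning a task that feeds a bounded flume channel;
this patch moves the receiving half of that bridge to the caller in
rpc.rs. A self-contained sketch of the bridge shape, with
`bridge_to_stream` as an illustrative name and a plain thread standing in
for the spawned task in the real code:

    // Run a synchronous producer on its own thread and expose the results
    // as an async stream via a bounded flume channel.
    fn bridge_to_stream<T: Send + 'static>(
        produce: impl FnOnce(flume::Sender<anyhow::Result<T>>) + Send + 'static,
    ) -> impl futures_lite::Stream<Item = anyhow::Result<T>> {
        let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP);
        std::thread::spawn(move || produce(tx));
        // `into_stream` requires flume's "async" feature.
        rx.into_stream()
    }
)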
@@ -261,10 +258,6 @@ impl DocsEngine { tx2.send_async(Err(err)).await.ok(); } }); - rx.into_stream().map(|r| { - r.map(|entry| DocGetManyResponse { entry }) - .map_err(Into::into) - }) } pub async fn doc_get_exact(&self, req: DocGetExactRequest) -> RpcResult { diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 8fe71e7d6a..8334590a11 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -439,7 +439,7 @@ pub struct AuthorCreateResponse { pub struct AuthorGetDefaultRequest; impl RpcMsg for AuthorGetDefaultRequest { - type Response = AuthorGetDefaultResponse; + type Response = RpcResult; } /// Response for [`AuthorGetDefaultRequest`] @@ -1153,7 +1153,7 @@ pub enum Response { AuthorList(RpcResult), AuthorCreate(RpcResult), - AuthorGetDefault(AuthorGetDefaultResponse), + AuthorGetDefault(RpcResult), AuthorSetDefault(RpcResult), AuthorImport(RpcResult), AuthorExport(RpcResult), diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index dcca0893b5..e032691df9 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; -use iroh::node::{self, Node}; +use iroh::node::{self, DocsStorage, Node}; use rand::RngCore; use iroh_blobs::{ @@ -41,17 +41,19 @@ async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume:: where S: iroh_blobs::store::Store, { - let doc_store = iroh_docs::store::Store::memory(); let (gc_send, gc_recv) = flume::unbounded(); - let node = - node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem) - .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) - .register_gc_done_cb(Box::new(move || { - gc_send.send(()).ok(); - })) - .spawn() - .await - .unwrap(); + let node = node::Builder::with_db_and_store( + bao_store, + DocsStorage::Memory, + iroh::node::StorageConfig::Mem, + ) + .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) + .register_gc_done_cb(Box::new(move || { + gc_send.send(()).ok(); + })) + .spawn() + .await + .unwrap(); (node, gc_recv) } diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index 13376273dd..7b9abf9648 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{Context, Result}; use bytes::Bytes; use futures_lite::FutureExt; -use iroh::node::Builder; +use iroh::node::{Builder, DocsStorage}; use iroh_base::node_addr::AddrInfoOptions; use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId}; use quic_rpc::transport::misc::DummyServerEndpoint; @@ -40,8 +40,8 @@ async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result(db: D) -> Builder { - let store = iroh_docs::store::Store::memory(); - iroh::node::Builder::with_db_and_store(db, store, iroh::node::StorageConfig::Mem).bind_port(0) + iroh::node::Builder::with_db_and_store(db, DocsStorage::Memory, iroh::node::StorageConfig::Mem) + .bind_port(0) } #[tokio::test] From 356e530d2753538101d7b7c88522de76006e6492 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 20 Jun 2024 11:44:12 +0200 Subject: [PATCH 2/4] rename docs_store to docs_storage --- iroh/src/node/builder.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index ed95a367be..39d9015fb0 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -94,7 +94,7 @@ where gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, - docs_store: Option, + 
docs_storage: Option, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -155,7 +155,7 @@ impl Default for Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store: Some(DocsStorage::Memory), + docs_storage: Some(DocsStorage::Memory), node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -168,7 +168,7 @@ impl Builder { /// Creates a new builder for [`Node`] using the given databases. pub fn with_db_and_store( blobs_store: D, - docs_store: DocsStorage, + docs_storage: DocsStorage, storage: StorageConfig, ) -> Self { Self { @@ -181,7 +181,7 @@ impl Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store: Some(docs_store), + docs_storage: Some(docs_storage), node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -209,7 +209,7 @@ where .with_context(|| { format!("Failed to load blobs database from {}", blob_dir.display()) })?; - let docs_store = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root)); + let docs_storage = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root)); let v0 = blobs_store .import_flat_store(iroh_blobs::store::fs::FlatStorePaths { @@ -245,7 +245,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: Some(docs_store), + docs_storage: Some(docs_storage), node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -266,7 +266,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -292,7 +292,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -310,7 +310,7 @@ where /// Disables documents support on this node completely. pub fn disable_docs(mut self) -> Self { - self.docs_store = None; + self.docs_storage = None; self } @@ -480,7 +480,7 @@ where let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); // Spawn the docs engine, if enabled. 
- let docs = if let Some(docs_storage) = &self.docs_store { + let docs = if let Some(docs_storage) = &self.docs_storage { let docs = DocsEngine::spawn( docs_storage, self.blobs_store.clone(), From 3e91647a8cd5e9da74b723c161fd2cb4535e1305 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 20 Jun 2024 11:53:08 +0200 Subject: [PATCH 3/4] refactor: use Either for docs rpc streams --- iroh/src/node/rpc.rs | 83 +++++++++++++++------------------------ iroh/src/node/rpc/docs.rs | 57 +++++++++++++++------------ 2 files changed, 64 insertions(+), 76 deletions(-) diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index bafca9586a..cc79655e87 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -30,7 +30,7 @@ use quic_rpc::{ ServiceEndpoint, }; use tokio::task::JoinSet; -use tokio_util::task::LocalPoolHandle; +use tokio_util::{either::Either, task::LocalPoolHandle}; use tracing::{debug, info, warn}; use crate::client::{ @@ -38,19 +38,18 @@ use crate::client::{ tags::TagInfo, NodeStatus, }; -use crate::node::{docs::DocsEngine, rpc::docs::ITER_CHANNEL_CAP, NodeInner}; +use crate::node::{docs::DocsEngine, NodeInner}; use crate::rpc_protocol::{ - AuthorListResponse, BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, - BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, - BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse, - BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, - BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, - DocExportFileRequest, DocExportFileResponse, DocGetManyResponse, DocImportFileRequest, - DocImportFileResponse, DocListResponse, DocSetHashRequest, ListTagsRequest, NodeAddrRequest, - NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, - NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, - NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, NodeWatchResponse, - Request, RpcService, SetTagOption, + BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, + BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, + BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest, + BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, + CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, + DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, + ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, + NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, + NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, + NodeWatchResponse, Request, RpcService, SetTagOption, }; mod docs; @@ -93,6 +92,20 @@ impl Handler { } } + fn with_docs_stream(self, f: F) -> impl Stream> + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> S, + S: Stream>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + Either::Left(f(docs)) + } else { + Either::Right(futures_lite::stream::once(Err(docs_disabled()))) + } + } + pub(crate) fn spawn_rpc_request>( inner: Arc>, join_set: &mut JoinSet>, @@ -154,17 +167,7 @@ impl Handler { AuthorList(msg) => { chan.server_streaming(msg, self, |handler, req| { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); - if 
let Some(docs) = handler.docs() { - docs.author_list(req, tx); - } else { - tx.send(Err(anyhow!("docs are disabled"))) - .expect("has capacity"); - } - rx.into_stream().map(|r| { - r.map(|author_id| AuthorListResponse { author_id }) - .map_err(Into::into) - }) + handler.with_docs_stream(|docs| docs.author_list(req)) }) .await } @@ -224,17 +227,7 @@ impl Handler { } DocList(msg) => { chan.server_streaming(msg, self, |handler, req| { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); - if let Some(docs) = handler.docs() { - docs.doc_list(req, tx); - } else { - tx.send(Err(anyhow!("docs are disabled"))) - .expect("has capacity"); - } - rx.into_stream().map(|r| { - r.map(|(id, capability)| DocListResponse { id, capability }) - .map_err(Into::into) - }) + handler.with_docs_stream(|docs| docs.doc_list(req)) }) .await } @@ -285,17 +278,7 @@ impl Handler { } DocGet(msg) => { chan.server_streaming(msg, self, |handler, req| { - let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); - if let Some(docs) = handler.docs() { - docs.doc_get_many(req, tx); - } else { - tx.send(Err(anyhow!("docs are disabled"))) - .expect("has capacity"); - } - rx.into_stream().map(|r| { - r.map(|entry| DocGetManyResponse { entry }) - .map_err(Into::into) - }) + handler.with_docs_stream(|docs| docs.doc_get_many(req)) }) .await } @@ -325,11 +308,9 @@ impl Handler { } DocSubscribe(msg) => { chan.try_server_streaming(msg, self, |handler, req| async move { - if let Some(docs) = handler.docs() { - docs.doc_subscribe(req).await - } else { - Err(docs_disabled()) - } + handler + .with_docs(|docs| async move { docs.doc_subscribe(req).await }) + .await }) .await } diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index 4d74d8d983..4fbabf64ff 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -3,30 +3,29 @@ use anyhow::anyhow; use futures_lite::Stream; use iroh_blobs::{store::Store as BaoStore, BlobFormat}; -use iroh_docs::{ - Author, AuthorId, CapabilityKind, DocTicket, NamespaceId, NamespaceSecret, SignedEntry, -}; +use iroh_docs::{Author, DocTicket, NamespaceSecret}; use tokio_stream::StreamExt; use crate::client::docs::ShareMode; -use crate::node::docs::DocsEngine; +use crate::node::DocsEngine; use crate::rpc_protocol::{ AuthorCreateRequest, AuthorCreateResponse, AuthorDeleteRequest, AuthorDeleteResponse, AuthorExportRequest, AuthorExportResponse, AuthorGetDefaultRequest, AuthorGetDefaultResponse, - AuthorImportRequest, AuthorImportResponse, AuthorListRequest, AuthorSetDefaultRequest, - AuthorSetDefaultResponse, DocCloseRequest, DocCloseResponse, DocCreateRequest, - DocCreateResponse, DocDelRequest, DocDelResponse, DocDropRequest, DocDropResponse, - DocGetDownloadPolicyRequest, DocGetDownloadPolicyResponse, DocGetExactRequest, - DocGetExactResponse, DocGetManyRequest, DocGetSyncPeersRequest, DocGetSyncPeersResponse, - DocImportRequest, DocImportResponse, DocLeaveRequest, DocLeaveResponse, DocListRequest, - DocOpenRequest, DocOpenResponse, DocSetDownloadPolicyRequest, DocSetDownloadPolicyResponse, - DocSetHashRequest, DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, - DocShareResponse, DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, - DocStatusResponse, DocSubscribeRequest, DocSubscribeResponse, RpcResult, + AuthorImportRequest, AuthorImportResponse, AuthorListRequest, AuthorListResponse, + AuthorSetDefaultRequest, AuthorSetDefaultResponse, DocCloseRequest, DocCloseResponse, + DocCreateRequest, DocCreateResponse, DocDelRequest, DocDelResponse, DocDropRequest, + 
DocDropResponse, DocGetDownloadPolicyRequest, DocGetDownloadPolicyResponse, DocGetExactRequest, + DocGetExactResponse, DocGetManyRequest, DocGetManyResponse, DocGetSyncPeersRequest, + DocGetSyncPeersResponse, DocImportRequest, DocImportResponse, DocLeaveRequest, + DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, DocOpenResponse, + DocSetDownloadPolicyRequest, DocSetDownloadPolicyResponse, DocSetHashRequest, + DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, + DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, DocStatusResponse, + DocSubscribeRequest, DocSubscribeResponse, RpcResult, }; /// Capacity for the flume channels to forward sync store iterators to async RPC streams. -pub(super) const ITER_CHANNEL_CAP: usize = 64; +const ITER_CHANNEL_CAP: usize = 64; #[allow(missing_docs)] impl DocsEngine { @@ -58,8 +57,8 @@ impl DocsEngine { pub fn author_list( &self, _req: AuthorListRequest, - tx: flume::Sender>, - ) { + ) -> impl Stream> { + let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. @@ -69,6 +68,10 @@ impl DocsEngine { tx2.send_async(Err(err)).await.ok(); } }); + rx.into_stream().map(|r| { + r.map(|author_id| AuthorListResponse { author_id }) + .map_err(Into::into) + }) } pub async fn author_import(&self, req: AuthorImportRequest) -> RpcResult { @@ -105,12 +108,8 @@ impl DocsEngine { Ok(DocDropResponse {}) } - pub fn doc_list( - &self, - _req: DocListRequest, - tx: flume::Sender>, - ) { - // let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); + pub fn doc_list(&self, _req: DocListRequest) -> impl Stream> { + let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. @@ -120,6 +119,10 @@ impl DocsEngine { tx2.send_async(Err(err)).await.ok(); } }); + rx.into_stream().map(|r| { + r.map(|(id, capability)| DocListResponse { id, capability }) + .map_err(Into::into) + }) } pub async fn doc_open(&self, req: DocOpenRequest) -> RpcResult { @@ -246,9 +249,9 @@ impl DocsEngine { pub fn doc_get_many( &self, req: DocGetManyRequest, - tx: flume::Sender>, - ) { + ) -> impl Stream> { let DocGetManyRequest { doc_id, query } = req; + let (tx, rx) = flume::bounded(ITER_CHANNEL_CAP); let sync = self.sync.clone(); // we need to spawn a task to send our request to the sync handle, because the method // itself must be sync. @@ -258,6 +261,10 @@ impl DocsEngine { tx2.send_async(Err(err)).await.ok(); } }); + rx.into_stream().map(|r| { + r.map(|entry| DocGetManyResponse { entry }) + .map_err(Into::into) + }) } pub async fn doc_get_exact(&self, req: DocGetExactRequest) -> RpcResult { From 4689cd34473cb6547a93abe23651f00e4660c563 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 20 Jun 2024 11:57:17 +0200 Subject: [PATCH 4/4] use DocsStorage::Disabled instead of option --- iroh/src/node/builder.rs | 36 +++++++++++++++++------------------- iroh/src/node/docs.rs | 7 ++++--- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 39d9015fb0..5b5164ed64 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -59,6 +59,8 @@ const MAX_STREAMS: u64 = 10; /// Storage backend for documents. #[derive(Debug, Clone)] pub enum DocsStorage { + /// Disable docs completely. + Disabled, /// In-memory storage. 
Memory, /// File-based persistent storage. @@ -94,7 +96,7 @@ where gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, - docs_storage: Option, + docs_storage: DocsStorage, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -155,7 +157,7 @@ impl Default for Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_storage: Some(DocsStorage::Memory), + docs_storage: DocsStorage::Memory, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -181,7 +183,7 @@ impl Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_storage: Some(docs_storage), + docs_storage, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -245,7 +247,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_storage: Some(docs_storage), + docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -310,7 +312,7 @@ where /// Disables documents support on this node completely. pub fn disable_docs(mut self) -> Self { - self.docs_storage = None; + self.docs_storage = DocsStorage::Disabled; self } @@ -480,20 +482,16 @@ where let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); // Spawn the docs engine, if enabled. - let docs = if let Some(docs_storage) = &self.docs_storage { - let docs = DocsEngine::spawn( - docs_storage, - self.blobs_store.clone(), - self.storage.default_author_storage(), - endpoint.clone(), - gossip.clone(), - downloader.clone(), - ) - .await?; - Some(docs) - } else { - None - }; + // This returns None for DocsStorage::Disabled, otherwise Some(DocsEngine). + let docs = DocsEngine::spawn( + self.docs_storage, + self.blobs_store.clone(), + self.storage.default_author_storage(), + endpoint.clone(), + gossip.clone(), + downloader.clone(), + ) + .await?; // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); diff --git a/iroh/src/node/docs.rs b/iroh/src/node/docs.rs index 4203e9e59a..7f3ee8b183 100644 --- a/iroh/src/node/docs.rs +++ b/iroh/src/node/docs.rs @@ -16,14 +16,15 @@ pub(crate) struct DocsEngine(Engine); impl DocsEngine { pub async fn spawn( - storage: &DocsStorage, + storage: DocsStorage, blobs_store: S, default_author_storage: DefaultAuthorStorage, endpoint: Endpoint, gossip: Gossip, downloader: Downloader, - ) -> anyhow::Result { + ) -> anyhow::Result> { let docs_store = match storage { + DocsStorage::Disabled => return Ok(None), DocsStorage::Memory => iroh_docs::store::fs::Store::memory(), DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?, }; @@ -36,7 +37,7 @@ impl DocsEngine { default_author_storage, ) .await?; - Ok(DocsEngine(engine)) + Ok(Some(DocsEngine(engine))) } }
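(Note: taken together, the series ends with `DocsStorage::Disabled` as a
third enum variant and `Builder::disable_docs()` as its public switch. A
usage sketch in the style of the tests above; the in-memory blobs store
constructor is an assumption:

    // Build an in-memory node with the docs engine disabled.
    let node = iroh::node::Builder::with_db_and_store(
        iroh_blobs::store::mem::Store::default(),
        DocsStorage::Disabled, // equivalent to calling .disable_docs()
        iroh::node::StorageConfig::Mem,
    )
    .spawn()
    .await?;
    // Docs RPC calls on this node now fail with "docs are disabled"
    // instead of the node carrying an engine it never uses.
)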