diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index bf642fc3d9..dfe8837d6d 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -42,7 +42,7 @@ where /// /// The default author can be set with [`Self::set_default`]. pub async fn default(&self) -> Result { - let res = self.rpc.rpc(AuthorGetDefaultRequest).await?; + let res = self.rpc.rpc(AuthorGetDefaultRequest).await??; Ok(res.author_id) } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 85df39cc22..836eeda509 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -13,7 +13,6 @@ use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore}; use iroh_blobs::{downloader::Downloader, protocol::Closed}; -use iroh_docs::engine::Engine; use iroh_gossip::net::Gossip; use iroh_net::key::SecretKey; use iroh_net::Endpoint; @@ -24,14 +23,18 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::{debug, error, info, warn}; -use crate::{client::RpcService, node::protocol::ProtocolMap}; +use crate::{ + client::RpcService, + node::{docs::DocsEngine, protocol::ProtocolMap}, +}; mod builder; +mod docs; mod protocol; mod rpc; mod rpc_status; -pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; +pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig}; pub use self::rpc_status::RpcStatus; pub use protocol::ProtocolHandler; @@ -55,7 +58,7 @@ pub struct Node { #[derive(derive_more::Debug)] struct NodeInner { db: D, - docs: DocsEngine, + docs: Option, endpoint: Endpoint, gossip: Gossip, secret_key: SecretKey, @@ -314,9 +317,22 @@ impl NodeInner { join_set.shutdown().await; } + /// Shutdown the different parts of the node concurrently. async fn shutdown(&self, protocols: Arc) { - // Shutdown the different parts of the node concurrently. let error_code = Closed::ProviderTerminating; + + // Shutdown future for the docs engine, if enabled. + let docs_shutdown = { + let docs = self.docs.clone(); + async move { + if let Some(docs) = docs { + docs.shutdown().await + } else { + Ok(()) + } + } + }; + // We ignore all errors during shutdown. let _ = tokio::join!( // Close the endpoint. @@ -326,8 +342,8 @@ impl NodeInner { self.endpoint .clone() .close(error_code.into(), error_code.reason()), - // Shutdown sync engine. - self.docs.shutdown(), + // Shutdown docs engine. + docs_shutdown, // Shutdown blobs store engine. self.db.shutdown(), // Shutdown protocol handlers. @@ -342,7 +358,6 @@ impl NodeInner { ) { tracing::info!("Starting GC task with interval {:?}", gc_period); let db = &self.db; - let docs = &self.docs; let mut live = BTreeSet::new(); 'outer: loop { if let Err(cause) = db.gc_start().await { @@ -356,22 +371,24 @@ impl NodeInner { tracing::debug!("Starting GC"); live.clear(); - let doc_hashes = match docs.sync.content_hashes().await { - Ok(hashes) => hashes, - Err(err) => { - tracing::warn!("Error getting doc hashes: {}", err); - continue 'outer; - } - }; - for hash in doc_hashes { - match hash { - Ok(hash) => { - live.insert(hash); - } + if let Some(docs) = &self.docs { + let doc_hashes = match docs.sync.content_hashes().await { + Ok(hashes) => hashes, Err(err) => { - tracing::error!("Error getting doc hash: {}", err); + tracing::warn!("Error getting doc hashes: {}", err); continue 'outer; } + }; + for hash in doc_hashes { + match hash { + Ok(hash) => { + live.insert(hash); + } + Err(err) => { + tracing::error!("Error getting doc hash: {}", err); + continue 'outer; + } + } } } @@ -436,17 +453,6 @@ async fn handle_connection( } } -/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. -#[derive(Debug, Clone)] -pub(crate) struct DocsEngine(Engine); - -impl std::ops::Deref for DocsEngine { - type Target = Engine; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - #[cfg(test)] mod tests { use std::time::Duration; @@ -655,7 +661,6 @@ mod tests { } #[cfg(feature = "fs-store")] - #[ignore = "flaky"] #[tokio::test] async fn test_default_author_persist() -> Result<()> { use crate::util::path::IrohPaths; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f64297da98..5b5164ed64 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -12,7 +12,7 @@ use iroh_blobs::{ downloader::Downloader, store::{Map, Store as BaoStore}, }; -use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_docs::engine::DefaultAuthorStorage; use iroh_docs::net::DOCS_ALPN; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ @@ -41,7 +41,7 @@ use crate::{ util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc_status::RpcStatus, DocsEngine, Node, NodeInner}; +use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner}; /// Default bind address for the node. /// 11204 is "iroh" in leetspeak @@ -56,6 +56,17 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; +/// Storage backend for documents. +#[derive(Debug, Clone)] +pub enum DocsStorage { + /// Disable docs completely. + Disabled, + /// In-memory storage. + Memory, + /// File-based persistent storage. + Persistent(PathBuf), +} + /// Builder for the [`Node`]. /// /// You must supply a blob store and a document store. @@ -85,7 +96,7 @@ where gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, - docs_store: iroh_docs::store::Store, + docs_storage: DocsStorage, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -146,7 +157,7 @@ impl Default for Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store: iroh_docs::store::Store::memory(), + docs_storage: DocsStorage::Memory, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -159,7 +170,7 @@ impl Builder { /// Creates a new builder for [`Node`] using the given databases. pub fn with_db_and_store( blobs_store: D, - docs_store: iroh_docs::store::Store, + docs_storage: DocsStorage, storage: StorageConfig, ) -> Self { Self { @@ -172,7 +183,7 @@ impl Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store, + docs_storage, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -200,8 +211,7 @@ where .with_context(|| { format!("Failed to load blobs database from {}", blob_dir.display()) })?; - let docs_store = - iroh_docs::store::fs::Store::persistent(IrohPaths::DocsDatabase.with_root(root))?; + let docs_storage = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root)); let v0 = blobs_store .import_flat_store(iroh_blobs::store::fs::FlatStorePaths { @@ -237,7 +247,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store, + docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -258,7 +268,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -284,7 +294,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -300,6 +310,12 @@ where self } + /// Disables documents support on this node completely. + pub fn disable_docs(mut self) -> Self { + self.docs_storage = DocsStorage::Disabled; + self + } + /// Sets the relay servers to assist in establishing connectivity. /// /// Relay servers are used to discover other nodes by `PublicKey` and also help @@ -405,7 +421,6 @@ where async fn build_inner(self) -> Result> { trace!("building node"); let lp = LocalPoolHandle::new(num_cpus::get()); - let endpoint = { let mut transport_config = quinn::TransportConfig::default(); transport_config @@ -461,25 +476,22 @@ where let addr = endpoint.node_addr().await?; trace!("endpoint address: {addr:?}"); - // initialize the gossip protocol + // Initialize the gossip protocol. let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info); - - // initialize the downloader + // Initialize the downloader. let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); - // load or create the default author for documents - // spawn the docs engine - let docs = DocsEngine( - Engine::spawn( - endpoint.clone(), - gossip.clone(), - self.docs_store, - self.blobs_store.clone(), - downloader.clone(), - self.storage.default_author_storage(), - ) - .await?, - ); + // Spawn the docs engine, if enabled. + // This returns None for DocsStorage::Disabled, otherwise Some(DocsEngine). + let docs = DocsEngine::spawn( + self.docs_storage, + self.blobs_store.clone(), + self.storage.default_author_storage(), + endpoint.clone(), + gossip.clone(), + downloader.clone(), + ) + .await?; // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); @@ -637,9 +649,10 @@ impl> ProtocolBuilde let gossip = self.gossip().clone(); self = self.accept(GOSSIP_ALPN, Arc::new(gossip)); - // Register docs. - let docs = self.inner.docs.clone(); - self = self.accept(DOCS_ALPN, Arc::new(docs)); + // Register docs, if enabled. + if let Some(docs) = self.inner.docs.clone() { + self = self.accept(DOCS_ALPN, Arc::new(docs)); + } self } diff --git a/iroh/src/node/docs.rs b/iroh/src/node/docs.rs new file mode 100644 index 0000000000..7f3ee8b183 --- /dev/null +++ b/iroh/src/node/docs.rs @@ -0,0 +1,55 @@ +use std::{ops::Deref, sync::Arc}; + +use anyhow::Result; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_blobs::downloader::Downloader; +use iroh_gossip::net::Gossip; + +use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_net::{endpoint::Connecting, Endpoint}; + +use crate::node::{DocsStorage, ProtocolHandler}; + +/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. +#[derive(Debug, Clone)] +pub(crate) struct DocsEngine(Engine); + +impl DocsEngine { + pub async fn spawn( + storage: DocsStorage, + blobs_store: S, + default_author_storage: DefaultAuthorStorage, + endpoint: Endpoint, + gossip: Gossip, + downloader: Downloader, + ) -> anyhow::Result> { + let docs_store = match storage { + DocsStorage::Disabled => return Ok(None), + DocsStorage::Memory => iroh_docs::store::fs::Store::memory(), + DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?, + }; + let engine = Engine::spawn( + endpoint, + gossip, + docs_store, + blobs_store, + downloader, + default_author_storage, + ) + .await?; + Ok(Some(DocsEngine(engine))) + } +} + +impl Deref for DocsEngine { + type Target = Engine; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ProtocolHandler for DocsEngine { + fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { + Box::pin(async move { self.handle_connection(conn).await }) + } +} diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 3c50368b42..a0f5b53be5 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -5,8 +5,6 @@ use futures_lite::future::Boxed as BoxedFuture; use futures_util::future::join_all; use iroh_net::endpoint::Connecting; -use crate::node::DocsEngine; - /// Handler for incoming connections. /// /// An iroh node can accept connections for arbitrary ALPN protocols. By default, the iroh node @@ -119,9 +117,3 @@ impl ProtocolHandler for iroh_gossip::net::Gossip { Box::pin(async move { self.handle_connection(conn.await?).await }) } } - -impl ProtocolHandler for DocsEngine { - fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { - Box::pin(async move { self.handle_connection(conn).await }) - } -} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 697b6d63cd..cc79655e87 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; -use iroh_base::rpc::RpcResult; +use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; @@ -30,12 +30,15 @@ use quic_rpc::{ ServiceEndpoint, }; use tokio::task::JoinSet; -use tokio_util::task::LocalPoolHandle; +use tokio_util::{either::Either, task::LocalPoolHandle}; use tracing::{debug, info, warn}; -use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; -use crate::client::tags::TagInfo; -use crate::client::NodeStatus; +use crate::client::{ + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + tags::TagInfo, + NodeStatus, +}; +use crate::node::{docs::DocsEngine, NodeInner}; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, @@ -49,8 +52,6 @@ use crate::rpc_protocol::{ NodeWatchResponse, Request, RpcService, SetTagOption, }; -use super::NodeInner; - mod docs; const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); @@ -73,6 +74,38 @@ impl Handler { } impl Handler { + fn docs(&self) -> Option<&DocsEngine> { + self.inner.docs.as_ref() + } + + async fn with_docs(self, f: F) -> RpcResult + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> Fut, + Fut: std::future::Future>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + f(docs).await + } else { + Err(docs_disabled()) + } + } + + fn with_docs_stream(self, f: F) -> impl Stream> + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> S, + S: Stream>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + Either::Left(f(docs)) + } else { + Either::Right(futures_lite::stream::once(Err(docs_disabled()))) + } + } + pub(crate) fn spawn_rpc_request>( inner: Arc>, join_set: &mut JoinSet>, @@ -131,92 +164,95 @@ impl Handler { BlobReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, BlobAddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, BlobAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + AuthorList(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.author_list(req) + handler.with_docs_stream(|docs| docs.author_list(req)) }) .await } AuthorCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_create(req).await }) }) .await } AuthorImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_import(req).await }) }) .await } AuthorExport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_export(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_export(req).await }) }) .await } AuthorDelete(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_delete(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_delete(req).await }) }) .await } AuthorGetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_default(req) + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { Ok(docs.author_default(req)) }) }) .await } AuthorSetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_set_default(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_set_default(req).await }) }) .await } DocOpen(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_open(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_open(req).await }) }) .await } DocClose(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_close(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_close(req).await }) }) .await } DocStatus(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_status(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_status(req).await }) }) .await } DocList(msg) => { - chan.server_streaming(msg, self, |handler, req| handler.inner.docs.doc_list(req)) - .await + chan.server_streaming(msg, self, |handler, req| { + handler.with_docs_stream(|docs| docs.doc_list(req)) + }) + .await } DocCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_create(req).await }) }) .await } DocDrop(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_drop(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_drop(req).await }) }) .await } DocImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_import(req).await }) }) .await } DocSet(msg) => { - let bao_store = self.inner.db.clone(); - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set(&bao_store, req).await + let blobs_store = self.inner.db.clone(); + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set(&blobs_store, req).await }) }) .await } @@ -229,68 +265,70 @@ impl Handler { .await } DocDel(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_del(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_del(req).await }) }) .await } DocSetHash(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_hash(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_hash(req).await }) }) .await } DocGet(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.doc_get_many(req) + handler.with_docs_stream(|docs| docs.doc_get_many(req)) }) .await } DocGetExact(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_exact(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_exact(req).await }) }) .await } DocStartSync(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_start_sync(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_start_sync(req).await }) }) .await } DocLeave(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_leave(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_leave(req).await }) }) .await } DocShare(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_share(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_share(req).await }) }) .await } DocSubscribe(msg) => { chan.try_server_streaming(msg, self, |handler, req| async move { - handler.inner.docs.doc_subscribe(req).await + handler + .with_docs(|docs| async move { docs.doc_subscribe(req).await }) + .await }) .await } DocSetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_download_policy(req).await }) }) .await } DocGetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_download_policy(req).await }) }) .await } DocGetSyncPeers(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_sync_peers(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_sync_peers(req).await }) }) .await } @@ -467,6 +505,7 @@ impl Handler { msg: DocImportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; @@ -519,16 +558,14 @@ impl Handler { let hash_and_format = temp_tag.inner(); let HashAndFormat { hash, .. } = *hash_and_format; - self.inner - .docs - .doc_set_hash(DocSetHashRequest { - doc_id, - author_id, - key: key.clone(), - hash, - size, - }) - .await?; + docs.doc_set_hash(DocSetHashRequest { + doc_id, + author_id, + key: key.clone(), + hash, + size, + }) + .await?; drop(temp_tag); progress.send(DocImportProgress::AllDone { key }).await?; Ok(()) @@ -553,6 +590,7 @@ impl Handler { msg: DocExportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; let progress = FlumeProgressSender::new(progress); let DocExportFileRequest { entry, path, mode } = msg; let key = bytes::Bytes::from(entry.key().to_vec()); @@ -1128,3 +1166,7 @@ where res.map_err(Into::into) } + +fn docs_disabled() -> RpcError { + anyhow!("docs are disabled").into() +} diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 8fe71e7d6a..8334590a11 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -439,7 +439,7 @@ pub struct AuthorCreateResponse { pub struct AuthorGetDefaultRequest; impl RpcMsg for AuthorGetDefaultRequest { - type Response = AuthorGetDefaultResponse; + type Response = RpcResult; } /// Response for [`AuthorGetDefaultRequest`] @@ -1153,7 +1153,7 @@ pub enum Response { AuthorList(RpcResult), AuthorCreate(RpcResult), - AuthorGetDefault(AuthorGetDefaultResponse), + AuthorGetDefault(RpcResult), AuthorSetDefault(RpcResult), AuthorImport(RpcResult), AuthorExport(RpcResult), diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index dcca0893b5..e032691df9 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; -use iroh::node::{self, Node}; +use iroh::node::{self, DocsStorage, Node}; use rand::RngCore; use iroh_blobs::{ @@ -41,17 +41,19 @@ async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume:: where S: iroh_blobs::store::Store, { - let doc_store = iroh_docs::store::Store::memory(); let (gc_send, gc_recv) = flume::unbounded(); - let node = - node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem) - .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) - .register_gc_done_cb(Box::new(move || { - gc_send.send(()).ok(); - })) - .spawn() - .await - .unwrap(); + let node = node::Builder::with_db_and_store( + bao_store, + DocsStorage::Memory, + iroh::node::StorageConfig::Mem, + ) + .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) + .register_gc_done_cb(Box::new(move || { + gc_send.send(()).ok(); + })) + .spawn() + .await + .unwrap(); (node, gc_recv) } diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index 13376273dd..7b9abf9648 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{Context, Result}; use bytes::Bytes; use futures_lite::FutureExt; -use iroh::node::Builder; +use iroh::node::{Builder, DocsStorage}; use iroh_base::node_addr::AddrInfoOptions; use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId}; use quic_rpc::transport::misc::DummyServerEndpoint; @@ -40,8 +40,8 @@ async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result(db: D) -> Builder { - let store = iroh_docs::store::Store::memory(); - iroh::node::Builder::with_db_and_store(db, store, iroh::node::StorageConfig::Mem).bind_port(0) + iroh::node::Builder::with_db_and_store(db, DocsStorage::Memory, iroh::node::StorageConfig::Mem) + .bind_port(0) } #[tokio::test]