feat(iroh): allow to disable docs engine completely (n0-computer#2390)
Make the docs engine optional on the iroh node.

* Add `Builder::disable_docs()` to disable the docs engine completely.
* If called, the docs engine will not be spawned and the docs protocol
  will not be registered. Incoming docs connections will be dropped, and
  all docs-related RPC calls will return a "docs are disabled" error
  (see the sketch after this list).
* `iroh::node::Builder::with_db_and_store` now takes a `DocsStorage`
  enum instead of an `iroh_docs::store::Store`.
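
For context, a minimal usage sketch of the new surface. This is not part of the diff; it assumes the pre-existing `Node::memory()`, `spawn()`, and `shutdown()` entry points, and only `disable_docs()` and `DocsStorage` come from this commit.

```rust
use anyhow::Result;
use iroh::node::{DocsStorage, Node};

async fn spawn_without_docs() -> Result<()> {
    // Spawn an in-memory node with the docs engine disabled: the docs
    // ALPN is never registered, incoming docs connections are dropped,
    // and docs-related RPC calls fail with "docs are disabled".
    let node = Node::memory().disable_docs().spawn().await?;

    // When building from explicit stores, pass the new enum instead of
    // an `iroh_docs::store::Store`, e.g.:
    //     Builder::with_db_and_store(blobs_store, DocsStorage::Disabled, storage_config)
    // Other variants: DocsStorage::Memory, DocsStorage::Persistent(path).
    let _ = DocsStorage::Memory;

    node.shutdown().await?;
    Ok(())
}
```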


- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
- [x] All breaking changes documented.
Frando authored and ppodolsky committed Jun 22, 2024
1 parent 8eab045 commit 02e404e
Showing 9 changed files with 265 additions and 155 deletions.
2 changes: 1 addition & 1 deletion iroh/src/client/authors.rs
@@ -42,7 +42,7 @@ where
///
/// The default author can be set with [`Self::set_default`].
pub async fn default(&self) -> Result<AuthorId> {
let res = self.rpc.rpc(AuthorGetDefaultRequest).await?;
let res = self.rpc.rpc(AuthorGetDefaultRequest).await??;
Ok(res.author_id)
}

70 changes: 38 additions & 32 deletions iroh/src/node.rs
@@ -25,14 +25,18 @@ use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, error, info, warn};

use crate::{client::RpcService, node::protocol::ProtocolMap};
use crate::{
client::RpcService,
node::{docs::DocsEngine, protocol::ProtocolMap},
};

mod builder;
mod docs;
mod protocol;
mod rpc;
mod rpc_status;

pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;
pub use protocol::ProtocolHandler;

@@ -56,7 +60,7 @@ pub struct Node<D> {
#[derive(derive_more::Debug)]
struct NodeInner<D> {
db: D,
docs: DocsEngine,
docs: Option<DocsEngine>,
endpoint: Endpoint,
gossip: Gossip,
secret_key: SecretKey,
@@ -325,9 +329,22 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
join_set.shutdown().await;
}

/// Shutdown the different parts of the node concurrently.
async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
// Shutdown the different parts of the node concurrently.
let error_code = Closed::ProviderTerminating;

// Shutdown future for the docs engine, if enabled.
let docs_shutdown = {
let docs = self.docs.clone();
async move {
if let Some(docs) = docs {
docs.shutdown().await
} else {
Ok(())
}
}
};

// We ignore all errors during shutdown.
let _ = tokio::join!(
// Close the endpoint.
@@ -337,8 +354,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
self.endpoint
.clone()
.close(error_code.into(), error_code.reason()),
// Shutdown sync engine.
self.docs.shutdown(),
// Shutdown docs engine.
docs_shutdown,
// Shutdown blobs store engine.
self.db.shutdown(),
// Shutdown protocol handlers.
@@ -353,7 +370,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
) {
tracing::info!("Starting GC task with interval {:?}", gc_period);
let db = &self.db;
let docs = &self.docs;
let mut live = BTreeSet::new();
'outer: loop {
if let Err(cause) = db.gc_start().await {
@@ -367,22 +383,24 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
tracing::debug!("Starting GC");
live.clear();

let doc_hashes = match docs.sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::warn!("Error getting doc hashes: {}", err);
continue 'outer;
}
};
for hash in doc_hashes {
match hash {
Ok(hash) => {
live.insert(hash);
}
if let Some(docs) = &self.docs {
let doc_hashes = match docs.sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
tracing::warn!("Error getting doc hashes: {}", err);
continue 'outer;
}
};
for hash in doc_hashes {
match hash {
Ok(hash) => {
live.insert(hash);
}
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
continue 'outer;
}
}
}
}

@@ -447,17 +465,6 @@ async fn handle_connection(
}
}

/// Wrapper around [`Engine`] so that we can implement our RPC methods directly.
#[derive(Debug, Clone)]
pub(crate) struct DocsEngine(Engine);

impl std::ops::Deref for DocsEngine {
type Target = Engine;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
@@ -666,7 +673,6 @@ mod tests {
}

#[cfg(feature = "fs-store")]
#[ignore = "flaky"]
#[tokio::test]
async fn test_default_author_persist() -> Result<()> {
use crate::util::path::IrohPaths;
75 changes: 44 additions & 31 deletions iroh/src/node/builder.rs
@@ -12,7 +12,7 @@ use iroh_blobs::{
downloader::Downloader,
store::{Map, Store as BaoStore},
};
use iroh_docs::engine::{DefaultAuthorStorage, Engine};
use iroh_docs::engine::DefaultAuthorStorage;
use iroh_docs::net::DOCS_ALPN;
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_net::{
@@ -41,7 +41,7 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc_status::RpcStatus, DocsEngine, Node, NodeInner};
use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner};

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
@@ -56,6 +56,17 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5);
const MAX_CONNECTIONS: u32 = 1024;
const MAX_STREAMS: u64 = 10;

/// Storage backend for documents.
#[derive(Debug, Clone)]
pub enum DocsStorage {
/// Disable docs completely.
Disabled,
/// In-memory storage.
Memory,
/// File-based persistent storage.
Persistent(PathBuf),
}

/// Builder for the [`Node`].
///
/// You must supply a blob store and a document store.
@@ -85,7 +96,7 @@
gc_policy: GcPolicy,
dns_resolver: Option<DnsResolver>,
node_discovery: DiscoveryConfig,
docs_store: iroh_docs::store::Store,
docs_storage: DocsStorage,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: bool,
/// Callback to register when a gc loop is done
@@ -146,7 +157,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
dns_resolver: None,
rpc_endpoint: Default::default(),
gc_policy: GcPolicy::Disabled,
docs_store: iroh_docs::store::Store::memory(),
docs_storage: DocsStorage::Memory,
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
@@ -159,7 +170,7 @@ impl<D: Map> Builder<D> {
/// Creates a new builder for [`Node`] using the given databases.
pub fn with_db_and_store(
blobs_store: D,
docs_store: iroh_docs::store::Store,
docs_storage: DocsStorage,
storage: StorageConfig,
) -> Self {
Self {
@@ -172,7 +183,7 @@
dns_resolver: None,
rpc_endpoint: Default::default(),
gc_policy: GcPolicy::Disabled,
docs_store,
docs_storage,
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
@@ -200,8 +211,7 @@ where
.with_context(|| {
format!("Failed to load blobs database from {}", blob_dir.display())
})?;
let docs_store =
iroh_docs::store::fs::Store::persistent(IrohPaths::DocsDatabase.with_root(root))?;
let docs_storage = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root));

let v0 = blobs_store
.import_flat_store(iroh_blobs::store::fs::FlatStorePaths {
@@ -237,7 +247,7 @@ where
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_store,
docs_storage,
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
@@ -258,7 +268,7 @@ where
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_store: self.docs_store,
docs_storage: self.docs_storage,
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
@@ -284,7 +294,7 @@ where
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_store: self.docs_store,
docs_storage: self.docs_storage,
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
@@ -300,6 +310,12 @@ where
self
}

/// Disables documents support on this node completely.
pub fn disable_docs(mut self) -> Self {
self.docs_storage = DocsStorage::Disabled;
self
}

/// Sets the relay servers to assist in establishing connectivity.
///
/// Relay servers are used to discover other nodes by `PublicKey` and also help
@@ -405,7 +421,6 @@ where
async fn build_inner(self) -> Result<ProtocolBuilder<D, E>> {
trace!("building node");
let lp = LocalPoolHandle::new(num_cpus::get());

let endpoint = {
let mut transport_config = quinn::TransportConfig::default();
transport_config
@@ -461,25 +476,22 @@ where
let addr = endpoint.node_addr().await?;
trace!("endpoint address: {addr:?}");

// initialize the gossip protocol
// Initialize the gossip protocol.
let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info);

// initialize the downloader
// Initialize the downloader.
let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone());

// load or create the default author for documents
// spawn the docs engine
let docs = DocsEngine(
Engine::spawn(
endpoint.clone(),
gossip.clone(),
self.docs_store,
self.blobs_store.clone(),
downloader.clone(),
self.storage.default_author_storage(),
)
.await?,
);
// Spawn the docs engine, if enabled.
// This returns None for DocsStorage::Disabled, otherwise Some(DocsEngine).
let docs = DocsEngine::spawn(
self.docs_storage,
self.blobs_store.clone(),
self.storage.default_author_storage(),
endpoint.clone(),
gossip.clone(),
downloader.clone(),
)
.await?;

// Initialize the internal RPC connection.
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
@@ -637,9 +649,10 @@ impl<D: iroh_blobs::store::Store, E: ServiceEndpoint<RpcService>> ProtocolBuilder
let gossip = self.gossip().clone();
self = self.accept(GOSSIP_ALPN, Arc::new(gossip));

// Register docs.
let docs = self.inner.docs.clone();
self = self.accept(DOCS_ALPN, Arc::new(docs));
// Register docs, if enabled.
if let Some(docs) = self.inner.docs.clone() {
self = self.accept(DOCS_ALPN, Arc::new(docs));
}

self
}
55 changes: 55 additions & 0 deletions iroh/src/node/docs.rs
@@ -0,0 +1,55 @@
use std::{ops::Deref, sync::Arc};

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh_blobs::downloader::Downloader;
use iroh_gossip::net::Gossip;

use iroh_docs::engine::{DefaultAuthorStorage, Engine};
use iroh_net::{endpoint::Connecting, Endpoint};

use crate::node::{DocsStorage, ProtocolHandler};

/// Wrapper around [`Engine`] so that we can implement our RPC methods directly.
#[derive(Debug, Clone)]
pub(crate) struct DocsEngine(Engine);

impl DocsEngine {
pub async fn spawn<S: iroh_blobs::store::Store>(
storage: DocsStorage,
blobs_store: S,
default_author_storage: DefaultAuthorStorage,
endpoint: Endpoint,
gossip: Gossip,
downloader: Downloader,
) -> anyhow::Result<Option<Self>> {
let docs_store = match storage {
DocsStorage::Disabled => return Ok(None),
DocsStorage::Memory => iroh_docs::store::fs::Store::memory(),
DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?,
};
let engine = Engine::spawn(
endpoint,
gossip,
docs_store,
blobs_store,
downloader,
default_author_storage,
)
.await?;
Ok(Some(DocsEngine(engine)))
}
}

impl Deref for DocsEngine {
type Target = Engine;
fn deref(&self) -> &Self::Target {
&self.0
}
}

impl ProtocolHandler for DocsEngine {
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
Box::pin(async move { self.handle_connection(conn).await })
}
}