Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh): allow to disable docs engine completely #2390

Merged
merged 4 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh/src/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
///
/// The default author can be set with [`Self::set_default`].
pub async fn default(&self) -> Result<AuthorId> {
let res = self.rpc.rpc(AuthorGetDefaultRequest).await?;
let res = self.rpc.rpc(AuthorGetDefaultRequest).await??;
Ok(res.author_id)
}

Expand Down
71 changes: 38 additions & 33 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use futures_lite::StreamExt;
use iroh_base::key::PublicKey;
use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore};
use iroh_blobs::{downloader::Downloader, protocol::Closed};
use iroh_docs::engine::Engine;
use iroh_gossip::net::Gossip;
use iroh_net::key::SecretKey;
use iroh_net::Endpoint;
Expand All @@ -24,14 +23,18 @@ use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, error, info, warn};

use crate::{client::RpcService, node::protocol::ProtocolMap};
use crate::{
client::RpcService,
node::{docs::DocsEngine, protocol::ProtocolMap},
};

mod builder;
mod docs;
mod protocol;
mod rpc;
mod rpc_status;

pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;
pub use protocol::ProtocolHandler;

Expand All @@ -55,7 +58,7 @@ pub struct Node<D> {
#[derive(derive_more::Debug)]
struct NodeInner<D> {
db: D,
docs: DocsEngine,
docs: Option<DocsEngine>,
endpoint: Endpoint,
gossip: Gossip,
secret_key: SecretKey,
Expand Down Expand Up @@ -314,9 +317,22 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
join_set.shutdown().await;
}

/// Shutdown the different parts of the node concurrently.
async fn shutdown(&self, protocols: Arc<ProtocolMap>) {
// Shutdown the different parts of the node concurrently.
let error_code = Closed::ProviderTerminating;

// Shutdown future for the docs engine, if enabled.
let docs_shutdown = {
let docs = self.docs.clone();
async move {
if let Some(docs) = docs {
docs.shutdown().await
} else {
Ok(())
}
}
};

// We ignore all errors during shutdown.
let _ = tokio::join!(
// Close the endpoint.
Expand All @@ -326,8 +342,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
self.endpoint
.clone()
.close(error_code.into(), error_code.reason()),
// Shutdown sync engine.
self.docs.shutdown(),
// Shutdown docs engine.
docs_shutdown,
// Shutdown blobs store engine.
self.db.shutdown(),
// Shutdown protocol handlers.
Expand All @@ -342,7 +358,6 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
) {
tracing::info!("Starting GC task with interval {:?}", gc_period);
let db = &self.db;
let docs = &self.docs;
let mut live = BTreeSet::new();
'outer: loop {
if let Err(cause) = db.gc_start().await {
Expand All @@ -356,22 +371,24 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {
tracing::debug!("Starting GC");
live.clear();

let doc_hashes = match docs.sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::warn!("Error getting doc hashes: {}", err);
continue 'outer;
}
};
for hash in doc_hashes {
match hash {
Ok(hash) => {
live.insert(hash);
}
if let Some(docs) = &self.docs {
let doc_hashes = match docs.sync.content_hashes().await {
Ok(hashes) => hashes,
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
tracing::warn!("Error getting doc hashes: {}", err);
continue 'outer;
}
};
for hash in doc_hashes {
match hash {
Ok(hash) => {
live.insert(hash);
}
Err(err) => {
tracing::error!("Error getting doc hash: {}", err);
continue 'outer;
}
}
}
}

Expand Down Expand Up @@ -436,17 +453,6 @@ async fn handle_connection(
}
}

/// Wrapper around [`Engine`] so that we can implement our RPC methods directly.
#[derive(Debug, Clone)]
pub(crate) struct DocsEngine(Engine);

impl std::ops::Deref for DocsEngine {
type Target = Engine;
fn deref(&self) -> &Self::Target {
&self.0
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand Down Expand Up @@ -655,7 +661,6 @@ mod tests {
}

#[cfg(feature = "fs-store")]
#[ignore = "flaky"]
#[tokio::test]
async fn test_default_author_persist() -> Result<()> {
use crate::util::path::IrohPaths;
Expand Down
65 changes: 40 additions & 25 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use iroh_blobs::{
downloader::Downloader,
store::{Map, Store as BaoStore},
};
use iroh_docs::engine::{DefaultAuthorStorage, Engine};
use iroh_docs::engine::DefaultAuthorStorage;
use iroh_docs::net::DOCS_ALPN;
use iroh_gossip::net::{Gossip, GOSSIP_ALPN};
use iroh_net::{
Expand Down Expand Up @@ -41,7 +41,7 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc_status::RpcStatus, DocsEngine, Node, NodeInner};
use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner};

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
Expand All @@ -56,6 +56,15 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5);
const MAX_CONNECTIONS: u32 = 1024;
const MAX_STREAMS: u64 = 10;

/// Storage backend for documents.
#[derive(Debug, Clone)]
pub enum DocsStorage {
/// In-memory storage.
Memory,
/// File-based persistent storage.
Persistent(PathBuf),
}

/// Builder for the [`Node`].
///
/// You must supply a blob store and a document store.
Expand Down Expand Up @@ -85,7 +94,7 @@ where
gc_policy: GcPolicy,
dns_resolver: Option<DnsResolver>,
node_discovery: DiscoveryConfig,
docs_store: iroh_docs::store::Store,
docs_store: Option<DocsStorage>,
Frando marked this conversation as resolved.
Show resolved Hide resolved
Frando marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: bool,
/// Callback to register when a gc loop is done
Expand Down Expand Up @@ -146,7 +155,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
dns_resolver: None,
rpc_endpoint: Default::default(),
gc_policy: GcPolicy::Disabled,
docs_store: iroh_docs::store::Store::memory(),
docs_store: Some(DocsStorage::Memory),
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
Expand All @@ -159,7 +168,7 @@ impl<D: Map> Builder<D> {
/// Creates a new builder for [`Node`] using the given databases.
pub fn with_db_and_store(
blobs_store: D,
docs_store: iroh_docs::store::Store,
docs_store: DocsStorage,
storage: StorageConfig,
) -> Self {
Self {
Expand All @@ -172,7 +181,7 @@ impl<D: Map> Builder<D> {
dns_resolver: None,
rpc_endpoint: Default::default(),
gc_policy: GcPolicy::Disabled,
docs_store,
docs_store: Some(docs_store),
node_discovery: Default::default(),
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
Expand Down Expand Up @@ -200,8 +209,7 @@ where
.with_context(|| {
format!("Failed to load blobs database from {}", blob_dir.display())
})?;
let docs_store =
iroh_docs::store::fs::Store::persistent(IrohPaths::DocsDatabase.with_root(root))?;
let docs_store = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root));

let v0 = blobs_store
.import_flat_store(iroh_blobs::store::fs::FlatStorePaths {
Expand Down Expand Up @@ -237,7 +245,7 @@ where
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_store,
docs_store: Some(docs_store),
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
Expand Down Expand Up @@ -300,6 +308,12 @@ where
self
}

/// Disables documents support on this node completely.
pub fn disable_docs(mut self) -> Self {
self.docs_store = None;
self
}

/// Sets the relay servers to assist in establishing connectivity.
///
/// Relay servers are used to discover other nodes by `PublicKey` and also help
Expand Down Expand Up @@ -405,7 +419,6 @@ where
async fn build_inner(self) -> Result<ProtocolBuilder<D, E>> {
trace!("building node");
let lp = LocalPoolHandle::new(num_cpus::get());

let endpoint = {
let mut transport_config = quinn::TransportConfig::default();
transport_config
Expand Down Expand Up @@ -461,25 +474,26 @@ where
let addr = endpoint.node_addr().await?;
trace!("endpoint address: {addr:?}");

// initialize the gossip protocol
// Initialize the gossip protocol.
let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info);

// initialize the downloader
// Initialize the downloader.
let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone());

// load or create the default author for documents
// spawn the docs engine
let docs = DocsEngine(
Engine::spawn(
// Spawn the docs engine, if enabled.
let docs = if let Some(docs_storage) = &self.docs_store {
let docs = DocsEngine::spawn(
docs_storage,
self.blobs_store.clone(),
self.storage.default_author_storage(),
endpoint.clone(),
gossip.clone(),
self.docs_store,
self.blobs_store.clone(),
downloader.clone(),
self.storage.default_author_storage(),
)
.await?,
);
.await?;
Some(docs)
} else {
None
};

// Initialize the internal RPC connection.
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1);
Expand Down Expand Up @@ -637,9 +651,10 @@ impl<D: iroh_blobs::store::Store, E: ServiceEndpoint<RpcService>> ProtocolBuilde
let gossip = self.gossip().clone();
self = self.accept(GOSSIP_ALPN, Arc::new(gossip));

// Register docs.
let docs = self.inner.docs.clone();
self = self.accept(DOCS_ALPN, Arc::new(docs));
// Register docs, if enabled.
if let Some(docs) = self.inner.docs.clone() {
self = self.accept(DOCS_ALPN, Arc::new(docs));
}

self
}
Expand Down
54 changes: 54 additions & 0 deletions iroh/src/node/docs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::{ops::Deref, sync::Arc};

use anyhow::Result;
use futures_lite::future::Boxed as BoxedFuture;
use iroh_blobs::downloader::Downloader;
use iroh_gossip::net::Gossip;

use iroh_docs::engine::{DefaultAuthorStorage, Engine};
use iroh_net::{endpoint::Connecting, Endpoint};

use crate::node::{DocsStorage, ProtocolHandler};

/// Wrapper around [`Engine`] so that we can implement our RPC methods directly.
///
/// Newtype over the docs [`Engine`]; the inner engine's methods are reachable
/// through the [`Deref`] impl, and the wrapper lets this crate implement its
/// own traits (e.g. `ProtocolHandler`) on a local type.
#[derive(Debug, Clone)]
pub(crate) struct DocsEngine(Engine);

impl DocsEngine {
    /// Spawns a docs engine backed by the storage selected in `storage`.
    ///
    /// Opens either an in-memory or a file-based persistent docs store,
    /// then starts the underlying [`Engine`] with the given endpoint,
    /// gossip handle, blob store, downloader, and default-author storage.
    ///
    /// # Errors
    ///
    /// Returns an error if the persistent store at the given path cannot
    /// be opened, or if spawning the engine fails.
    pub async fn spawn<S: iroh_blobs::store::Store>(
        storage: &DocsStorage,
        blobs_store: S,
        default_author_storage: DefaultAuthorStorage,
        endpoint: Endpoint,
        gossip: Gossip,
        downloader: Downloader,
    ) -> Result<Self> {
        // Resolve the requested backend into a concrete docs store.
        let docs_store = match storage {
            DocsStorage::Memory => iroh_docs::store::fs::Store::memory(),
            DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?,
        };
        let engine = Engine::spawn(
            endpoint,
            gossip,
            docs_store,
            blobs_store,
            downloader,
            default_author_storage,
        )
        .await?;
        Ok(DocsEngine(engine))
    }
}

// Deref to the inner [`Engine`] so callers can invoke engine methods
// directly on a `DocsEngine` without unwrapping the newtype.
impl Deref for DocsEngine {
    type Target = Engine;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl ProtocolHandler for DocsEngine {
    /// Accepts an incoming connection for the docs ALPN.
    fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
        // Delegates to the inner engine's connection handler; the call
        // resolves through the `Deref` impl to `Engine`.
        Box::pin(async move { self.handle_connection(conn).await })
    }
}
8 changes: 0 additions & 8 deletions iroh/src/node/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use futures_lite::future::Boxed as BoxedFuture;
use futures_util::future::join_all;
use iroh_net::endpoint::Connecting;

use crate::node::DocsEngine;

/// Handler for incoming connections.
///
/// An iroh node can accept connections for arbitrary ALPN protocols. By default, the iroh node
Expand Down Expand Up @@ -119,9 +117,3 @@ impl ProtocolHandler for iroh_gossip::net::Gossip {
Box::pin(async move { self.handle_connection(conn.await?).await })
}
}

impl ProtocolHandler for DocsEngine {
fn accept(self: Arc<Self>, conn: Connecting) -> BoxedFuture<Result<()>> {
Box::pin(async move { self.handle_connection(conn).await })
}
}
Loading
Loading