diff --git a/Cargo.lock b/Cargo.lock index 2c2ffd4b8b..0c680d19b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4041,10 +4041,11 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d69b05325e19956f123fce85ebc4d99226552a0bb24bba4c886106297e708b" +checksum = "5e56dc58272a3f9c151b1c3a6df0e3caca083fd843b337e60f706fae2d974b6b" dependencies = [ + "anyhow", "bincode", "derive_more", "educe", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index f08cc7b784..360bf6c76d 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -45,7 +45,7 @@ parking_lot = "0.12.1" pkarr = { version = "1.1.5", default-features = false } portable-atomic = "1" postcard = "1.0.8" -quic-rpc = { version = "0.10.0", features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] } rand = "0.8.5" ratatui = "0.26.2" reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] } diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 7e3502c275..bd1bc0d18a 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::{ensure, Context, Result}; use clap::Parser; use derive_more::FromStr; -use iroh::client::QuicIroh; +use iroh::client::Iroh; use crate::config::{ConsoleEnv, NodeConfig}; @@ -130,7 +130,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_console(data_dir_owned, &iroh).await?; console::run(&iroh, &env).await } @@ -151,7 +151,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_cli(data_dir_owned, &iroh).await?; command.run(&iroh, &env).await } diff --git a/iroh-cli/src/commands/author.rs b/iroh-cli/src/commands/author.rs index 2ad98a48b6..8493e4fd2d 100644 --- a/iroh-cli/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -4,9 +4,8 @@ use derive_more::FromStr; use futures_lite::StreamExt; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; +use iroh::client::Iroh; use iroh::docs::{Author, AuthorId}; -use quic_rpc::ServiceConnection; use crate::config::ConsoleEnv; @@ -38,10 +37,7 @@ pub enum AuthorCommands { } impl AuthorCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { author } => { env.set_author(author)?; diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 9e2c7208c6..6e4c0aeba2 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -30,11 +30,10 @@ use iroh::{ BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions, IncompleteBlobInfo, WrapOption, }, - Iroh, RpcService, + Iroh, }, net::{key::PublicKey, relay::RelayUrl, NodeAddr}, }; -use quic_rpc::ServiceConnection; use tokio::io::AsyncWriteExt; #[allow(clippy::large_enum_variant)] @@ -182,10 +181,7 @@ impl std::str::FromStr for TicketOrHash { } impl BlobCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Get { ticket, @@ -447,10 +443,7 @@ pub enum ListCommands { } impl ListCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blobs => { let mut response = iroh.blobs().list().await?; @@ -507,10 +500,7 @@ pub enum DeleteCommands { } impl DeleteCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blob { hash } => { let response = iroh.blobs().delete_blob(hash).await; @@ -540,10 +530,7 @@ fn apply_report_level(text: String, level: ReportLevel) -> console::StyledObject } } -pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut response = iroh.blobs().consistency_check(repair).await?; let verbosity = get_report_level(verbose); let print = |level: ReportLevel, entry: Option, message: String| { @@ -584,10 +571,7 @@ where Ok(()) } -pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut state = ValidateProgressState::new(); let mut response = iroh.blobs().validate(repair).await?; let verbosity = get_report_level(verbose); @@ -807,8 +791,8 @@ pub enum TicketOption { Print, } -pub async fn add_with_opts>( - client: &iroh::client::Iroh, +pub async fn add_with_opts( + client: &iroh::client::Iroh, source: BlobSource, opts: BlobAddOptions, ) -> Result<()> { @@ -840,8 +824,8 @@ pub async fn add_with_opts>( } /// Add data to iroh, either from a path or, if path is `None`, from STDIN. -pub async fn add>( - client: &iroh::client::Iroh, +pub async fn add( + client: &iroh::client::Iroh, source: BlobSourceIroh, tag: SetTagOption, ticket: TicketOption, diff --git a/iroh-cli/src/commands/console.rs b/iroh-cli/src/commands/console.rs index df10126d75..5f2d8bc383 100644 --- a/iroh-cli/src/commands/console.rs +++ b/iroh-cli/src/commands/console.rs @@ -2,8 +2,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use colored::Colorize; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use rustyline::{error::ReadlineError, Config, DefaultEditor}; use tokio::sync::{mpsc, oneshot}; @@ -12,10 +11,7 @@ use crate::{ config::{ConsoleEnv, ConsolePaths}, }; -pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { println!("{}", "Welcome to the Iroh console!".purple().bold()); println!("Type `{}` for a list of commands.", "help".bold()); let mut from_repl = Repl::spawn(env.clone()); diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index b2a13b3596..d26f785912 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -13,7 +13,6 @@ use dialoguer::Confirm; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; -use quic_rpc::ServiceConnection; use tokio::io::AsyncReadExt; use iroh::{ @@ -22,7 +21,7 @@ use iroh::{ client::{ blobs::WrapOption, docs::{Doc, Entry, LiveEvent, Origin, ShareMode}, - Iroh, RpcService, + Iroh, }, docs::{ store::{DownloadPolicy, FilterKind, Query, SortDirection}, @@ -303,10 +302,7 @@ impl From for iroh::docs::store::SortBy { } impl DocCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { id: doc } => { env.set_doc(doc)?; @@ -673,14 +669,7 @@ impl DocCommands { } } -async fn get_doc( - iroh: &Iroh, - env: &ConsoleEnv, - id: Option, -) -> anyhow::Result> -where - C: ServiceConnection, -{ +async fn get_doc(iroh: &Iroh, env: &ConsoleEnv, id: Option) -> anyhow::Result { iroh.docs() .open(env.doc(id)?) .await? @@ -688,14 +677,7 @@ where } /// Format the content. If an error occurs it's returned in a formatted, friendly way. -async fn fmt_content( - doc: &Doc, - entry: &Entry, - mode: DisplayContentMode, -) -> Result -where - C: ServiceConnection, -{ +async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Result { let read_failed = |err: anyhow::Error| format!(""); let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes())); let as_utf8 = |buf: Vec| String::from_utf8(buf).map(|repr| format!("\"{repr}\"")); @@ -743,10 +725,7 @@ fn human_len(entry: &Entry) -> HumanBytes { } #[must_use = "this won't be printed, you need to print it yourself"] -async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String -where - C: ServiceConnection, -{ +async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String { let key = std::str::from_utf8(entry.key()) .unwrap_or("") .bold(); @@ -776,18 +755,15 @@ fn tag_from_file_name(path: &Path) -> anyhow::Result { /// document via the hash of the blob. /// It also creates and powers the `ImportProgressBar`. #[tracing::instrument(skip_all)] -async fn import_coordinator( - doc: Doc, +async fn import_coordinator( + doc: Doc, author_id: AuthorId, root: PathBuf, prefix: String, blob_add_progress: impl Stream> + Send + Unpin + 'static, expected_size: u64, expected_entries: u64, -) -> Result<()> -where - C: ServiceConnection, -{ +) -> Result<()> { let imp = ImportProgressBar::new( &root.display().to_string(), doc.id(), @@ -982,7 +958,7 @@ mod tests { let cli = ConsoleEnv::for_console(data_dir.path().to_owned(), &node) .await .context("ConsoleEnv")?; - let iroh = iroh::client::QuicIroh::connect(data_dir.path()) + let iroh = iroh::client::Iroh::connect(data_dir.path()) .await .context("rpc connect")?; diff --git a/iroh-cli/src/commands/node.rs b/iroh-cli/src/commands/node.rs index 4d2b8ad1bf..b85422069e 100644 --- a/iroh-cli/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -8,12 +8,10 @@ use comfy_table::{presets::NOTHING, Cell}; use futures_lite::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; -use iroh::client::RpcService; use iroh::net::{ endpoint::{ConnectionInfo, DirectAddrInfo}, key::PublicKey, }; -use quic_rpc::ServiceConnection; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -38,10 +36,7 @@ pub enum NodeCommands { } impl NodeCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Connections => { let connections = iroh.connections().await?; diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index 1d26121681..414a894ddb 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -1,7 +1,6 @@ use anyhow::Result; use clap::Subcommand; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use crate::config::ConsoleEnv; @@ -58,10 +57,7 @@ pub enum RpcCommands { } impl RpcCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Node { command } => command.run(iroh).await, Self::Blob { command } => command.run(iroh).await, diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index ec22c26de8..694e484424 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -33,7 +33,7 @@ pub async fn run_with_command( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let _guard = crate::logging::init_terminal_and_file_logging(&config.file_logs, iroh_data_root)?; @@ -68,7 +68,7 @@ async fn run_with_command_inner( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let relay_map = config.relay_map()?; diff --git a/iroh-cli/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs index 42c228266b..fa4a298429 100644 --- a/iroh-cli/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -3,8 +3,7 @@ use bytes::Bytes; use clap::Subcommand; use futures_lite::StreamExt; use iroh::blobs::Tag; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -20,10 +19,7 @@ pub enum TagCommands { } impl TagCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::List => { let mut response = iroh.tags().list().await?; diff --git a/iroh-cli/src/config.rs b/iroh-cli/src/config.rs index 249b91af10..e7b7d1ac08 100644 --- a/iroh-cli/src/config.rs +++ b/iroh-cli/src/config.rs @@ -15,11 +15,10 @@ use iroh::net::{ }; use iroh::node::GcPolicy; use iroh::{ - client::{Iroh, RpcService}, + client::Iroh, docs::{AuthorId, NamespaceId}, }; use parking_lot::RwLock; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use tracing::warn; @@ -133,10 +132,7 @@ struct ConsoleEnvInner { impl ConsoleEnv { /// Read from environment variables and the console config file. - pub(crate) async fn for_console>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_console(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let console_data_dir = ConsolePaths::root(&iroh_data_dir); tokio::fs::create_dir_all(&console_data_dir) .await @@ -161,10 +157,7 @@ impl ConsoleEnv { } /// Read only from environment variables. - pub(crate) async fn for_cli>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_cli(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let author = env_author(None, iroh).await?; let env = ConsoleEnvInner { author, @@ -278,10 +271,7 @@ impl ConsoleEnv { } } -async fn env_author>( - from_config: Option, - iroh: &Iroh, -) -> Result { +async fn env_author(from_config: Option, iroh: &Iroh) -> Result { if let Some(author) = env::var(ENV_AUTHOR) .ok() .map(|s| { diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 15404847da..307ec354ba 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" } iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" } parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.10.0", default-features = false, features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] } quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 4a12687725..6f76f87636 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -4,7 +4,7 @@ use anyhow::Result; use clap::Parser; use futures_lite::future::Boxed as BoxedFuture; use iroh::{ - client::MemIroh, + client::Iroh, net::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeId, @@ -59,7 +59,7 @@ const EXAMPLE_ALPN: &[u8] = b"example-proto/0"; #[derive(Debug, Clone)] struct ExampleProto { - client: MemIroh, + client: Iroh, endpoint: Endpoint, } @@ -89,7 +89,7 @@ impl ProtocolHandler for ExampleProto { } impl ExampleProto { - pub fn new(client: MemIroh, endpoint: Endpoint) -> Self { + pub fn new(client: Iroh, endpoint: Endpoint) -> Self { Self { client, endpoint } } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index e158bfb355..ce9647f5fd 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -1,18 +1,23 @@ //! Client to an Iroh node. use futures_lite::{Stream, StreamExt}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; #[doc(inline)] pub use crate::rpc_protocol::RpcService; -mod mem; mod quic; -pub use self::mem::{Doc as MemDoc, Iroh as MemIroh, RpcClient as MemRpcClient}; +#[deprecated] +pub use self::docs::Doc as MemDoc; +#[deprecated] +pub use self::docs::Doc as QuicDoc; +pub use self::docs::Doc; pub use self::node::NodeStatus; -pub use self::quic::{Doc as QuicDoc, Iroh as QuicIroh, RpcClient as QuicRpcClient}; +#[deprecated] +pub use self::Iroh as MemIroh; +#[deprecated] +pub use self::Iroh as QuicIroh; pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; @@ -23,38 +28,39 @@ pub mod tags; mod node; +/// Iroh rpc client - boxed so that we can have a concrete type. +pub(crate) type RpcClient = + quic_rpc::RpcClient>; + /// Iroh client. #[derive(Debug, Clone)] -pub struct Iroh { - rpc: RpcClient, +pub struct Iroh { + rpc: RpcClient, } -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Create a new high-level client to a Iroh node from the low-level RPC client. - pub fn new(rpc: RpcClient) -> Self { + pub fn new(rpc: RpcClient) -> Self { Self { rpc } } /// Blobs client - pub fn blobs(&self) -> &blobs::Client { + pub fn blobs(&self) -> &blobs::Client { blobs::Client::ref_cast(&self.rpc) } /// Docs client - pub fn docs(&self) -> &docs::Client { + pub fn docs(&self) -> &docs::Client { docs::Client::ref_cast(&self.rpc) } /// Authors client - pub fn authors(&self) -> &authors::Client { + pub fn authors(&self) -> &authors::Client { authors::Client::ref_cast(&self.rpc) } /// Tags client - pub fn tags(&self) -> &tags::Client { + pub fn tags(&self) -> &tags::Client { tags::Client::ref_cast(&self.rpc) } } diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index bf642fc3d9..7c87c0a4fa 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -3,27 +3,23 @@ use anyhow::Result; use futures_lite::{stream::StreamExt, Stream}; use iroh_docs::{Author, AuthorId}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use crate::rpc_protocol::{ AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest, - AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService, + AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, }; -use super::flatten; +use super::{flatten, RpcClient}; /// Iroh authors client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document author. /// /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author @@ -42,7 +38,7 @@ where /// /// The default author can be set with [`Self::set_default`]. pub async fn default(&self) -> Result { - let res = self.rpc.rpc(AuthorGetDefaultRequest).await?; + let res = self.rpc.rpc(AuthorGetDefaultRequest).await??; Ok(res.author_id) } diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 53245acd3d..861060b751 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -24,7 +24,7 @@ use iroh_blobs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicU64, Ordering}; -use quic_rpc::{client::BoxStreamSync, RpcClient, ServiceConnection}; +use quic_rpc::client::BoxStreamSync; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -35,28 +35,25 @@ use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, + CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption, }; -use super::{flatten, tags, Iroh}; +use super::{flatten, tags, Iroh, RpcClient}; /// Iroh blobs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl<'a, C: ServiceConnection> From<&'a Iroh> for &'a RpcClient { - fn from(client: &'a Iroh) -> &'a RpcClient { +impl<'a> From<&'a Iroh> for &'a RpcClient { + fn from(client: &'a Iroh) -> &'a RpcClient { &client.blobs().rpc } } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Stream the contents of a a single blob. /// /// Returns a [`Reader`], which can report the size of the blob before reading it. @@ -386,17 +383,14 @@ where } } - fn tags_client(&self) -> tags::Client { + fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), } } } -impl SimpleStore for Client -where - C: ServiceConnection, -{ +impl SimpleStore for Client { async fn load(&self, hash: Hash) -> anyhow::Result { self.read_to_bytes(hash).await } @@ -786,15 +780,12 @@ impl Reader { } } - pub(crate) async fn from_rpc_read>( - rpc: &RpcClient, - hash: Hash, - ) -> anyhow::Result { + pub(crate) async fn from_rpc_read(rpc: &RpcClient, hash: Hash) -> anyhow::Result { Self::from_rpc_read_at(rpc, hash, 0, None).await } - async fn from_rpc_read_at>( - rpc: &RpcClient, + async fn from_rpc_read_at( + rpc: &RpcClient, hash: Hash, offset: u64, len: Option, diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index cdc15614fc..95d184a045 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -21,7 +21,7 @@ use iroh_docs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicBool, Ordering}; -use quic_rpc::{message::RpcMsg, RpcClient, ServiceConnection}; +use quic_rpc::message::RpcMsg; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; @@ -36,21 +36,18 @@ use crate::rpc_protocol::{ #[doc(inline)] pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; -use super::{blobs, flatten}; +use super::{blobs, flatten, RpcClient}; /// Iroh docs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document. - pub async fn create(&self) -> Result> { + pub async fn create(&self) -> Result { let res = self.rpc.rpc(DocCreateRequest {}).await??; let doc = Doc::new(self.rpc.clone(), res.id); Ok(doc) @@ -69,14 +66,14 @@ where /// Import a document from a namespace capability. /// /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync. - pub async fn import_namespace(&self, capability: Capability) -> Result> { + pub async fn import_namespace(&self, capability: Capability) -> Result { let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); Ok(doc) } /// Import a document from a ticket and join all peers in the ticket. - pub async fn import(&self, ticket: DocTicket) -> Result> { + pub async fn import(&self, ticket: DocTicket) -> Result { let DocTicket { capability, nodes } = ticket; let doc = self.import_namespace(capability).await?; doc.start_sync(nodes).await?; @@ -92,7 +89,7 @@ where pub async fn import_and_subscribe( &self, ticket: DocTicket, - ) -> Result<(Doc, impl Stream>)> { + ) -> Result<(Doc, impl Stream>)> { let DocTicket { capability, nodes } = ticket; let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); @@ -108,7 +105,7 @@ where } /// Get a [`Doc`] client for a single document. Return None if the document cannot be found. - pub async fn open(&self, id: NamespaceId) -> Result>> { + pub async fn open(&self, id: NamespaceId) -> Result> { self.rpc.rpc(DocOpenRequest { doc_id: id }).await??; let doc = Doc::new(self.rpc.clone(), id); Ok(Some(doc)) @@ -117,28 +114,25 @@ where /// Document handle #[derive(Debug, Clone)] -pub struct Doc>(Arc>); +pub struct Doc(Arc); -impl> PartialEq for Doc { +impl PartialEq for Doc { fn eq(&self, other: &Self) -> bool { self.0.id == other.0.id } } -impl> Eq for Doc {} +impl Eq for Doc {} #[derive(Debug)] -struct DocInner> { +struct DocInner { id: NamespaceId, - rpc: RpcClient, + rpc: RpcClient, closed: AtomicBool, rt: tokio::runtime::Handle, } -impl Drop for DocInner -where - C: ServiceConnection, -{ +impl Drop for DocInner { fn drop(&mut self) { let doc_id = self.id; let rpc = self.rpc.clone(); @@ -150,11 +144,8 @@ where } } -impl Doc -where - C: ServiceConnection, -{ - fn new(rpc: RpcClient, id: NamespaceId) -> Self { +impl Doc { + fn new(rpc: RpcClient, id: NamespaceId) -> Self { Self(Arc::new(DocInner { rpc, id, @@ -420,8 +411,8 @@ where } } -impl<'a, C: ServiceConnection> From<&'a Doc> for &'a RpcClient { - fn from(doc: &'a Doc) -> &'a RpcClient { +impl<'a> From<&'a Doc> for &'a RpcClient { + fn from(doc: &'a Doc) -> &'a RpcClient { &doc.0.rpc } } @@ -476,26 +467,14 @@ impl Entry { /// Read the content of an [`Entry`] as a streaming [`blobs::Reader`]. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_reader( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_reader(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await } /// Read all content of an [`Entry`] into a buffer. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_bytes( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_bytes(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()) .await? .read_to_bytes() diff --git a/iroh/src/client/mem.rs b/iroh/src/client/mem.rs deleted file mode 100644 index 85f65a5d93..0000000000 --- a/iroh/src/client/mem.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Type declarations for an in-memory client to an iroh node running in the same process. -//! -//! The in-memory client is obtained directly from a running node through -//! [`crate::node::Node::client`] - -use quic_rpc::transport::flume::FlumeConnection; - -use crate::rpc_protocol::RpcService; - -/// RPC client to an iroh node running in the same process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// In-memory client to an iroh node running in the same process. -/// -/// This is obtained from [`crate::node::Node::client`]. -pub type Iroh = super::Iroh>; - -/// In-memory document client to an iroh node running in the same process. -pub type Doc = super::docs::Doc>; diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 6f8460b376..96ce9d87c2 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -6,21 +6,17 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_base::key::PublicKey; use iroh_net::{endpoint::ConnectionInfo, relay::RelayUrl, NodeAddr, NodeId}; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ CounterStats, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, - NodeStatusRequest, RpcService, + NodeStatusRequest, }; use super::{flatten, Iroh}; -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Get statistics of the running node. pub async fn stats(&self) -> Result> { let res = self.rpc.rpc(NodeStatsRequest {}).await??; diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index 7f7333810f..6351c9632d 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -8,8 +8,9 @@ use std::{ }; use anyhow::{bail, Context}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection}; +use super::Iroh; use crate::{ node::RpcStatus, rpc_protocol::{NodeStatusRequest, RpcService}, @@ -20,15 +21,7 @@ use crate::{ pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; /// RPC client to an iroh node running in a separate process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// Client to an iroh node running in a separate process. -/// -/// This is obtained from [`Iroh::connect`]. -pub type Iroh = super::Iroh>; - -/// RPC document client to an iroh node running in a separate process. -pub type Doc = super::docs::Doc>; +pub type RpcClient = quic_rpc::RpcClient>; impl Iroh { /// Connect to an iroh node running on the same computer, but in a different process. @@ -51,6 +44,7 @@ pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port); let server_name = "localhost".to_string(); let connection = QuinnConnection::::new(endpoint, addr, server_name); + let connection = BoxedConnection::new(connection); let client = RpcClient::new(connection); // Do a status request to check if the server is running. let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(NodeStatusRequest)) diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index 9c3ef34f12..66166a396c 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -3,23 +3,20 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_blobs::{BlobFormat, Hash, Tag}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; -use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest, RpcService}; +use super::RpcClient; +use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest}; /// Iroh tags client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// List all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 1ad2457a06..91e6febc61 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -13,7 +13,6 @@ use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::store::{GcMarkEvent, GcSweepEvent, Store as BaoStore}; use iroh_blobs::{downloader::Downloader, protocol::Closed}; -use iroh_docs::engine::Engine; use iroh_gossip::net::Gossip; use iroh_net::key::SecretKey; use iroh_net::Endpoint; @@ -24,14 +23,18 @@ use tokio_util::sync::CancellationToken; use tokio_util::task::LocalPoolHandle; use tracing::{debug, error, info, warn}; -use crate::{client::RpcService, node::protocol::ProtocolMap}; +use crate::{ + client::RpcService, + node::{docs::DocsEngine, protocol::ProtocolMap}, +}; mod builder; +mod docs; mod protocol; mod rpc; mod rpc_status; -pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; +pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, StorageConfig}; pub use self::rpc_status::RpcStatus; pub use protocol::ProtocolHandler; @@ -55,12 +58,12 @@ pub struct Node { #[derive(derive_more::Debug)] struct NodeInner { db: D, - docs: DocsEngine, + docs: Option, endpoint: Endpoint, gossip: Gossip, secret_key: SecretKey, cancel_token: CancellationToken, - client: crate::client::MemIroh, + client: crate::client::Iroh, #[debug("rt")] rt: LocalPoolHandle, downloader: Downloader, @@ -133,7 +136,7 @@ impl Node { } /// Return a client to control this node over an in-memory channel. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } @@ -180,7 +183,7 @@ impl Node { } impl std::ops::Deref for Node { - type Target = crate::client::MemIroh; + type Target = crate::client::Iroh; fn deref(&self) -> &Self::Target { &self.inner.client @@ -314,9 +317,22 @@ impl NodeInner { join_set.shutdown().await; } + /// Shutdown the different parts of the node concurrently. async fn shutdown(&self, protocols: Arc) { - // Shutdown the different parts of the node concurrently. let error_code = Closed::ProviderTerminating; + + // Shutdown future for the docs engine, if enabled. + let docs_shutdown = { + let docs = self.docs.clone(); + async move { + if let Some(docs) = docs { + docs.shutdown().await + } else { + Ok(()) + } + } + }; + // We ignore all errors during shutdown. let _ = tokio::join!( // Close the endpoint. @@ -326,8 +342,8 @@ impl NodeInner { self.endpoint .clone() .close(error_code.into(), error_code.reason()), - // Shutdown sync engine. - self.docs.shutdown(), + // Shutdown docs engine. + docs_shutdown, // Shutdown blobs store engine. self.db.shutdown(), // Shutdown protocol handlers. @@ -342,7 +358,6 @@ impl NodeInner { ) { tracing::info!("Starting GC task with interval {:?}", gc_period); let db = &self.db; - let docs = &self.docs; let mut live = BTreeSet::new(); 'outer: loop { if let Err(cause) = db.gc_start().await { @@ -356,22 +371,24 @@ impl NodeInner { tracing::debug!("Starting GC"); live.clear(); - let doc_hashes = match docs.sync.content_hashes().await { - Ok(hashes) => hashes, - Err(err) => { - tracing::warn!("Error getting doc hashes: {}", err); - continue 'outer; - } - }; - for hash in doc_hashes { - match hash { - Ok(hash) => { - live.insert(hash); - } + if let Some(docs) = &self.docs { + let doc_hashes = match docs.sync.content_hashes().await { + Ok(hashes) => hashes, Err(err) => { - tracing::error!("Error getting doc hash: {}", err); + tracing::warn!("Error getting doc hashes: {}", err); continue 'outer; } + }; + for hash in doc_hashes { + match hash { + Ok(hash) => { + live.insert(hash); + } + Err(err) => { + tracing::error!("Error getting doc hash: {}", err); + continue 'outer; + } + } } } @@ -436,17 +453,6 @@ async fn handle_connection( } } -/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. -#[derive(Debug, Clone)] -pub(crate) struct DocsEngine(Engine); - -impl std::ops::Deref for DocsEngine { - type Target = Engine; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - #[cfg(test)] mod tests { use std::time::Duration; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f64297da98..d2917eb2ca 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -12,7 +12,7 @@ use iroh_blobs::{ downloader::Downloader, store::{Map, Store as BaoStore}, }; -use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_docs::engine::DefaultAuthorStorage; use iroh_docs::net::DOCS_ALPN; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ @@ -41,7 +41,7 @@ use crate::{ util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc_status::RpcStatus, DocsEngine, Node, NodeInner}; +use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner}; /// Default bind address for the node. /// 11204 is "iroh" in leetspeak @@ -56,6 +56,17 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5); const MAX_CONNECTIONS: u32 = 1024; const MAX_STREAMS: u64 = 10; +/// Storage backend for documents. +#[derive(Debug, Clone)] +pub enum DocsStorage { + /// Disable docs completely. + Disabled, + /// In-memory storage. + Memory, + /// File-based persistent storage. + Persistent(PathBuf), +} + /// Builder for the [`Node`]. /// /// You must supply a blob store and a document store. @@ -85,7 +96,7 @@ where gc_policy: GcPolicy, dns_resolver: Option, node_discovery: DiscoveryConfig, - docs_store: iroh_docs::store::Store, + docs_storage: DocsStorage, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: bool, /// Callback to register when a gc loop is done @@ -146,7 +157,7 @@ impl Default for Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store: iroh_docs::store::Store::memory(), + docs_storage: DocsStorage::Memory, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -159,7 +170,7 @@ impl Builder { /// Creates a new builder for [`Node`] using the given databases. pub fn with_db_and_store( blobs_store: D, - docs_store: iroh_docs::store::Store, + docs_storage: DocsStorage, storage: StorageConfig, ) -> Self { Self { @@ -172,7 +183,7 @@ impl Builder { dns_resolver: None, rpc_endpoint: Default::default(), gc_policy: GcPolicy::Disabled, - docs_store, + docs_storage, node_discovery: Default::default(), #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -200,8 +211,7 @@ where .with_context(|| { format!("Failed to load blobs database from {}", blob_dir.display()) })?; - let docs_store = - iroh_docs::store::fs::Store::persistent(IrohPaths::DocsDatabase.with_root(root))?; + let docs_storage = DocsStorage::Persistent(IrohPaths::DocsDatabase.with_root(root)); let v0 = blobs_store .import_flat_store(iroh_blobs::store::fs::FlatStorePaths { @@ -237,7 +247,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store, + docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: false, @@ -258,7 +268,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -284,7 +294,7 @@ where relay_mode: self.relay_mode, dns_resolver: self.dns_resolver, gc_policy: self.gc_policy, - docs_store: self.docs_store, + docs_storage: self.docs_storage, node_discovery: self.node_discovery, #[cfg(any(test, feature = "test-utils"))] insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify, @@ -300,6 +310,12 @@ where self } + /// Disables documents support on this node completely. + pub fn disable_docs(mut self) -> Self { + self.docs_storage = DocsStorage::Disabled; + self + } + /// Sets the relay servers to assist in establishing connectivity. /// /// Relay servers are used to discover other nodes by `PublicKey` and also help @@ -405,7 +421,6 @@ where async fn build_inner(self) -> Result> { trace!("building node"); let lp = LocalPoolHandle::new(num_cpus::get()); - let endpoint = { let mut transport_config = quinn::TransportConfig::default(); transport_config @@ -461,28 +476,28 @@ where let addr = endpoint.node_addr().await?; trace!("endpoint address: {addr:?}"); - // initialize the gossip protocol + // Initialize the gossip protocol. let gossip = Gossip::from_endpoint(endpoint.clone(), Default::default(), &addr.info); - - // initialize the downloader + // Initialize the downloader. let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone()); - // load or create the default author for documents - // spawn the docs engine - let docs = DocsEngine( - Engine::spawn( - endpoint.clone(), - gossip.clone(), - self.docs_store, - self.blobs_store.clone(), - downloader.clone(), - self.storage.default_author_storage(), - ) - .await?, - ); + // Spawn the docs engine, if enabled. + // This returns None for DocsStorage::Disabled, otherwise Some(DocsEngine). + let docs = DocsEngine::spawn( + self.docs_storage, + self.blobs_store.clone(), + self.storage.default_author_storage(), + endpoint.clone(), + gossip.clone(), + downloader.clone(), + ) + .await?; // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); + // box the controller. Boxing has a special case for the flume channel that avoids allocations, + // so this has zero overhead. + let controller = quic_rpc::transport::boxed::Connection::new(controller); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); let inner = Arc::new(NodeInner { @@ -545,7 +560,7 @@ impl> ProtocolBuilde /// # use std::sync::Arc; /// # use anyhow::Result; /// # use futures_lite::future::Boxed as BoxedFuture; - /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::MemIroh}; + /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::Iroh}; /// # /// # #[tokio::main] /// # async fn main() -> Result<()> { @@ -554,7 +569,7 @@ impl> ProtocolBuilde /// /// #[derive(Debug)] /// struct MyProtocol { - /// client: MemIroh + /// client: Iroh /// } /// /// impl ProtocolHandler for MyProtocol { @@ -589,7 +604,7 @@ impl> ProtocolBuilde /// /// Note that RPC calls performed with the client will not complete until the node is /// spawned. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } @@ -637,9 +652,10 @@ impl> ProtocolBuilde let gossip = self.gossip().clone(); self = self.accept(GOSSIP_ALPN, Arc::new(gossip)); - // Register docs. - let docs = self.inner.docs.clone(); - self = self.accept(DOCS_ALPN, Arc::new(docs)); + // Register docs, if enabled. + if let Some(docs) = self.inner.docs.clone() { + self = self.accept(DOCS_ALPN, Arc::new(docs)); + } self } diff --git a/iroh/src/node/docs.rs b/iroh/src/node/docs.rs new file mode 100644 index 0000000000..7f3ee8b183 --- /dev/null +++ b/iroh/src/node/docs.rs @@ -0,0 +1,55 @@ +use std::{ops::Deref, sync::Arc}; + +use anyhow::Result; +use futures_lite::future::Boxed as BoxedFuture; +use iroh_blobs::downloader::Downloader; +use iroh_gossip::net::Gossip; + +use iroh_docs::engine::{DefaultAuthorStorage, Engine}; +use iroh_net::{endpoint::Connecting, Endpoint}; + +use crate::node::{DocsStorage, ProtocolHandler}; + +/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. +#[derive(Debug, Clone)] +pub(crate) struct DocsEngine(Engine); + +impl DocsEngine { + pub async fn spawn( + storage: DocsStorage, + blobs_store: S, + default_author_storage: DefaultAuthorStorage, + endpoint: Endpoint, + gossip: Gossip, + downloader: Downloader, + ) -> anyhow::Result> { + let docs_store = match storage { + DocsStorage::Disabled => return Ok(None), + DocsStorage::Memory => iroh_docs::store::fs::Store::memory(), + DocsStorage::Persistent(path) => iroh_docs::store::fs::Store::persistent(path)?, + }; + let engine = Engine::spawn( + endpoint, + gossip, + docs_store, + blobs_store, + downloader, + default_author_storage, + ) + .await?; + Ok(Some(DocsEngine(engine))) + } +} + +impl Deref for DocsEngine { + type Target = Engine; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl ProtocolHandler for DocsEngine { + fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { + Box::pin(async move { self.handle_connection(conn).await }) + } +} diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 3c50368b42..a0f5b53be5 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -5,8 +5,6 @@ use futures_lite::future::Boxed as BoxedFuture; use futures_util::future::join_all; use iroh_net::endpoint::Connecting; -use crate::node::DocsEngine; - /// Handler for incoming connections. /// /// An iroh node can accept connections for arbitrary ALPN protocols. By default, the iroh node @@ -119,9 +117,3 @@ impl ProtocolHandler for iroh_gossip::net::Gossip { Box::pin(async move { self.handle_connection(conn.await?).await }) } } - -impl ProtocolHandler for DocsEngine { - fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { - Box::pin(async move { self.handle_connection(conn).await }) - } -} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 697b6d63cd..cc79655e87 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -7,7 +7,7 @@ use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use genawaiter::sync::{Co, Gen}; -use iroh_base::rpc::RpcResult; +use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::downloader::{DownloadRequest, Downloader}; use iroh_blobs::export::ExportProgress; use iroh_blobs::format::collection::Collection; @@ -30,12 +30,15 @@ use quic_rpc::{ ServiceEndpoint, }; use tokio::task::JoinSet; -use tokio_util::task::LocalPoolHandle; +use tokio_util::{either::Either, task::LocalPoolHandle}; use tracing::{debug, info, warn}; -use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; -use crate::client::tags::TagInfo; -use crate::client::NodeStatus; +use crate::client::{ + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + tags::TagInfo, + NodeStatus, +}; +use crate::node::{docs::DocsEngine, NodeInner}; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, @@ -49,8 +52,6 @@ use crate::rpc_protocol::{ NodeWatchResponse, Request, RpcService, SetTagOption, }; -use super::NodeInner; - mod docs; const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); @@ -73,6 +74,38 @@ impl Handler { } impl Handler { + fn docs(&self) -> Option<&DocsEngine> { + self.inner.docs.as_ref() + } + + async fn with_docs(self, f: F) -> RpcResult + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> Fut, + Fut: std::future::Future>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + f(docs).await + } else { + Err(docs_disabled()) + } + } + + fn with_docs_stream(self, f: F) -> impl Stream> + where + T: Send + 'static, + F: FnOnce(DocsEngine) -> S, + S: Stream>, + { + if let Some(docs) = self.docs() { + let docs = docs.clone(); + Either::Left(f(docs)) + } else { + Either::Right(futures_lite::stream::once(Err(docs_disabled()))) + } + } + pub(crate) fn spawn_rpc_request>( inner: Arc>, join_set: &mut JoinSet>, @@ -131,92 +164,95 @@ impl Handler { BlobReadAt(msg) => chan.server_streaming(msg, self, Self::blob_read_at).await, BlobAddStream(msg) => chan.bidi_streaming(msg, self, Self::blob_add_stream).await, BlobAddStreamUpdate(_msg) => Err(RpcServerError::UnexpectedUpdateMessage), + AuthorList(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.author_list(req) + handler.with_docs_stream(|docs| docs.author_list(req)) }) .await } AuthorCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_create(req).await }) }) .await } AuthorImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_import(req).await }) }) .await } AuthorExport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_export(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_export(req).await }) }) .await } AuthorDelete(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_delete(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_delete(req).await }) }) .await } AuthorGetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_default(req) + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { Ok(docs.author_default(req)) }) }) .await } AuthorSetDefault(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.author_set_default(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.author_set_default(req).await }) }) .await } DocOpen(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_open(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_open(req).await }) }) .await } DocClose(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_close(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_close(req).await }) }) .await } DocStatus(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_status(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_status(req).await }) }) .await } DocList(msg) => { - chan.server_streaming(msg, self, |handler, req| handler.inner.docs.doc_list(req)) - .await + chan.server_streaming(msg, self, |handler, req| { + handler.with_docs_stream(|docs| docs.doc_list(req)) + }) + .await } DocCreate(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_create(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_create(req).await }) }) .await } DocDrop(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_drop(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_drop(req).await }) }) .await } DocImport(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_import(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_import(req).await }) }) .await } DocSet(msg) => { - let bao_store = self.inner.db.clone(); - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set(&bao_store, req).await + let blobs_store = self.inner.db.clone(); + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set(&blobs_store, req).await }) }) .await } @@ -229,68 +265,70 @@ impl Handler { .await } DocDel(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_del(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_del(req).await }) }) .await } DocSetHash(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_hash(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_hash(req).await }) }) .await } DocGet(msg) => { chan.server_streaming(msg, self, |handler, req| { - handler.inner.docs.doc_get_many(req) + handler.with_docs_stream(|docs| docs.doc_get_many(req)) }) .await } DocGetExact(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_exact(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_exact(req).await }) }) .await } DocStartSync(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_start_sync(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_start_sync(req).await }) }) .await } DocLeave(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_leave(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_leave(req).await }) }) .await } DocShare(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_share(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_share(req).await }) }) .await } DocSubscribe(msg) => { chan.try_server_streaming(msg, self, |handler, req| async move { - handler.inner.docs.doc_subscribe(req).await + handler + .with_docs(|docs| async move { docs.doc_subscribe(req).await }) + .await }) .await } DocSetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_set_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_set_download_policy(req).await }) }) .await } DocGetDownloadPolicy(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_download_policy(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_download_policy(req).await }) }) .await } DocGetSyncPeers(msg) => { - chan.rpc(msg, self, |handler, req| async move { - handler.inner.docs.doc_get_sync_peers(req).await + chan.rpc(msg, self, |handler, req| { + handler.with_docs(|docs| async move { docs.doc_get_sync_peers(req).await }) }) .await } @@ -467,6 +505,7 @@ impl Handler { msg: DocImportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; @@ -519,16 +558,14 @@ impl Handler { let hash_and_format = temp_tag.inner(); let HashAndFormat { hash, .. } = *hash_and_format; - self.inner - .docs - .doc_set_hash(DocSetHashRequest { - doc_id, - author_id, - key: key.clone(), - hash, - size, - }) - .await?; + docs.doc_set_hash(DocSetHashRequest { + doc_id, + author_id, + key: key.clone(), + hash, + size, + }) + .await?; drop(temp_tag); progress.send(DocImportProgress::AllDone { key }).await?; Ok(()) @@ -553,6 +590,7 @@ impl Handler { msg: DocExportFileRequest, progress: flume::Sender, ) -> anyhow::Result<()> { + let _docs = self.docs().ok_or_else(|| anyhow!("docs are disabled"))?; let progress = FlumeProgressSender::new(progress); let DocExportFileRequest { entry, path, mode } = msg; let key = bytes::Bytes::from(entry.key().to_vec()); @@ -1128,3 +1166,7 @@ where res.map_err(Into::into) } + +fn docs_disabled() -> RpcError { + anyhow!("docs are disabled").into() +} diff --git a/iroh/src/node/rpc_status.rs b/iroh/src/node/rpc_status.rs index 00ad0c8ab2..29f633eb81 100644 --- a/iroh/src/node/rpc_status.rs +++ b/iroh/src/node/rpc_status.rs @@ -16,7 +16,7 @@ pub enum RpcStatus { /// The port we are connected on. port: u16, /// Actual connected RPC client. - client: crate::client::QuicRpcClient, + client: crate::client::RpcClient, }, } diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 8fe71e7d6a..8334590a11 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -439,7 +439,7 @@ pub struct AuthorCreateResponse { pub struct AuthorGetDefaultRequest; impl RpcMsg for AuthorGetDefaultRequest { - type Response = AuthorGetDefaultResponse; + type Response = RpcResult; } /// Response for [`AuthorGetDefaultRequest`] @@ -1153,7 +1153,7 @@ pub enum Response { AuthorList(RpcResult), AuthorCreate(RpcResult), - AuthorGetDefault(AuthorGetDefaultResponse), + AuthorGetDefault(RpcResult), AuthorSetDefault(RpcResult), AuthorImport(RpcResult), AuthorExport(RpcResult), diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs index dcca0893b5..e032691df9 100644 --- a/iroh/tests/gc.rs +++ b/iroh/tests/gc.rs @@ -6,7 +6,7 @@ use std::{ use anyhow::Result; use bao_tree::{blake3, io::sync::Outboard, ChunkRanges}; use bytes::Bytes; -use iroh::node::{self, Node}; +use iroh::node::{self, DocsStorage, Node}; use rand::RngCore; use iroh_blobs::{ @@ -41,17 +41,19 @@ async fn wrap_in_node(bao_store: S, gc_period: Duration) -> (Node, flume:: where S: iroh_blobs::store::Store, { - let doc_store = iroh_docs::store::Store::memory(); let (gc_send, gc_recv) = flume::unbounded(); - let node = - node::Builder::with_db_and_store(bao_store, doc_store, iroh::node::StorageConfig::Mem) - .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) - .register_gc_done_cb(Box::new(move || { - gc_send.send(()).ok(); - })) - .spawn() - .await - .unwrap(); + let node = node::Builder::with_db_and_store( + bao_store, + DocsStorage::Memory, + iroh::node::StorageConfig::Mem, + ) + .gc_policy(iroh::node::GcPolicy::Interval(gc_period)) + .register_gc_done_cb(Box::new(move || { + gc_send.send(()).ok(); + })) + .spawn() + .await + .unwrap(); (node, gc_recv) } diff --git a/iroh/tests/provide.rs b/iroh/tests/provide.rs index 13376273dd..7b9abf9648 100644 --- a/iroh/tests/provide.rs +++ b/iroh/tests/provide.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::{Context, Result}; use bytes::Bytes; use futures_lite::FutureExt; -use iroh::node::Builder; +use iroh::node::{Builder, DocsStorage}; use iroh_base::node_addr::AddrInfoOptions; use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId}; use quic_rpc::transport::misc::DummyServerEndpoint; @@ -40,8 +40,8 @@ async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result(db: D) -> Builder { - let store = iroh_docs::store::Store::memory(); - iroh::node::Builder::with_db_and_store(db, store, iroh::node::StorageConfig::Mem).bind_port(0) + iroh::node::Builder::with_db_and_store(db, DocsStorage::Memory, iroh::node::StorageConfig::Mem) + .bind_port(0) } #[tokio::test] diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index a5e9b8a463..b2ff42fedd 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -13,7 +13,7 @@ use iroh::{ base::node_addr::AddrInfoOptions, client::{ docs::{Entry, LiveEvent, ShareMode}, - MemDoc, + Doc, }, net::key::{PublicKey, SecretKey}, node::{Builder, Node}, @@ -1012,14 +1012,14 @@ async fn test_list_docs_stream() -> Result<()> { } /// Get all entries of a document. -async fn get_all(doc: &MemDoc) -> anyhow::Result> { +async fn get_all(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.collect::>().await; entries.into_iter().collect() } /// Get all entries of a document with the blob content. -async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result> { +async fn get_all_with_content(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.and_then(|entry| async { let content = entry.content_bytes(doc).await; @@ -1031,7 +1031,7 @@ async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result, n: usize, cb: impl Fn(usize, usize) -> (AuthorId, String, String), @@ -1090,7 +1090,7 @@ async fn wait_for_events( } async fn assert_all_docs( - docs: &[MemDoc], + docs: &[Doc], node_ids: &[PublicKey], expected: &Vec, label: &str, @@ -1203,12 +1203,12 @@ async fn sync_drop_doc() -> Result<()> { Ok(()) } -async fn assert_latest(doc: &MemDoc, key: &[u8], value: &[u8]) { +async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { let content = get_latest(doc, key).await.unwrap(); assert_eq!(content, value.to_vec()); } -async fn get_latest(doc: &MemDoc, key: &[u8]) -> anyhow::Result> { +async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { let query = Query::single_latest_per_key().key_exact(key); let entry = doc .get_many(query)