From cd43f18e0ae9a7755aab0a402ee115e69fee60a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=BCdiger=20Klaehn?= Date: Thu, 20 Jun 2024 13:36:46 +0300 Subject: [PATCH] refactor(iroh): use boxed client to get rid of the C type parameter (#2353) ## Description I implemented a boxed connection and a boxed service endpoint in quic-rpc. With this we can get rid of the `` type parameter and make the quinn and mem client/server side the same type. The nice thing about this approach is that it will not lead to additonal boxing on the mem path, and for the quinn or whatever io path the boxing will probably not matter that much compared to all the other things going on. ## Breaking Changes A lot. Iroh and all the clients no longer have ``. ## Notes & open questions Note: I marked the old type aliases MemIroh, QuicIroh etc as deprecated. That does not seem to actually do anything, but just serves as a reminder to remove them in the near future. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. ~~- [x] Tests if relevant.~~ - [x] All breaking changes documented. --- Cargo.lock | 5 ++- iroh-cli/Cargo.toml | 2 +- iroh-cli/src/commands.rs | 6 +-- iroh-cli/src/commands/author.rs | 8 +--- iroh-cli/src/commands/blob.rs | 36 +++++------------- iroh-cli/src/commands/console.rs | 8 +--- iroh-cli/src/commands/doc.rs | 42 +++++---------------- iroh-cli/src/commands/node.rs | 7 +--- iroh-cli/src/commands/rpc.rs | 8 +--- iroh-cli/src/commands/start.rs | 4 +- iroh-cli/src/commands/tag.rs | 8 +--- iroh-cli/src/config.rs | 18 ++------- iroh/Cargo.toml | 2 +- iroh/examples/custom-protocol.rs | 6 +-- iroh/src/client.rs | 36 ++++++++++-------- iroh/src/client/authors.rs | 14 +++---- iroh/src/client/blobs.rs | 35 +++++++---------- iroh/src/client/docs.rs | 65 +++++++++++--------------------- iroh/src/client/mem.rs | 19 ---------- iroh/src/client/node.rs | 8 +--- iroh/src/client/quic.rs | 14 ++----- iroh/src/client/tags.rs | 13 +++---- iroh/src/node.rs | 6 +-- iroh/src/node/builder.rs | 9 +++-- iroh/src/node/rpc_status.rs | 2 +- iroh/tests/sync.rs | 14 +++---- 26 files changed, 134 insertions(+), 261 deletions(-) delete mode 100644 iroh/src/client/mem.rs diff --git a/Cargo.lock b/Cargo.lock index 2c2ffd4b8ba..0c680d19b77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4041,10 +4041,11 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d69b05325e19956f123fce85ebc4d99226552a0bb24bba4c886106297e708b" +checksum = "5e56dc58272a3f9c151b1c3a6df0e3caca083fd843b337e60f706fae2d974b6b" dependencies = [ + "anyhow", "bincode", "derive_more", "educe", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index f08cc7b7842..360bf6c76d4 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -45,7 +45,7 @@ parking_lot = "0.12.1" pkarr = { version = "1.1.5", default-features = false } portable-atomic = "1" postcard = "1.0.8" -quic-rpc = { version = "0.10.0", features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] } rand = "0.8.5" ratatui = "0.26.2" reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] } diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 7e3502c275b..bd1bc0d18af 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::{ensure, Context, Result}; use clap::Parser; use derive_more::FromStr; -use iroh::client::QuicIroh; +use iroh::client::Iroh; use crate::config::{ConsoleEnv, NodeConfig}; @@ -130,7 +130,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_console(data_dir_owned, &iroh).await?; console::run(&iroh, &env).await } @@ -151,7 +151,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_cli(data_dir_owned, &iroh).await?; command.run(&iroh, &env).await } diff --git a/iroh-cli/src/commands/author.rs b/iroh-cli/src/commands/author.rs index 2ad98a48b6b..8493e4fd2dd 100644 --- a/iroh-cli/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -4,9 +4,8 @@ use derive_more::FromStr; use futures_lite::StreamExt; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; +use iroh::client::Iroh; use iroh::docs::{Author, AuthorId}; -use quic_rpc::ServiceConnection; use crate::config::ConsoleEnv; @@ -38,10 +37,7 @@ pub enum AuthorCommands { } impl AuthorCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { author } => { env.set_author(author)?; diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 9e2c7208c62..6e4c0aeba2e 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -30,11 +30,10 @@ use iroh::{ BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions, IncompleteBlobInfo, WrapOption, }, - Iroh, RpcService, + Iroh, }, net::{key::PublicKey, relay::RelayUrl, NodeAddr}, }; -use quic_rpc::ServiceConnection; use tokio::io::AsyncWriteExt; #[allow(clippy::large_enum_variant)] @@ -182,10 +181,7 @@ impl std::str::FromStr for TicketOrHash { } impl BlobCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Get { ticket, @@ -447,10 +443,7 @@ pub enum ListCommands { } impl ListCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blobs => { let mut response = iroh.blobs().list().await?; @@ -507,10 +500,7 @@ pub enum DeleteCommands { } impl DeleteCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blob { hash } => { let response = iroh.blobs().delete_blob(hash).await; @@ -540,10 +530,7 @@ fn apply_report_level(text: String, level: ReportLevel) -> console::StyledObject } } -pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut response = iroh.blobs().consistency_check(repair).await?; let verbosity = get_report_level(verbose); let print = |level: ReportLevel, entry: Option, message: String| { @@ -584,10 +571,7 @@ where Ok(()) } -pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut state = ValidateProgressState::new(); let mut response = iroh.blobs().validate(repair).await?; let verbosity = get_report_level(verbose); @@ -807,8 +791,8 @@ pub enum TicketOption { Print, } -pub async fn add_with_opts>( - client: &iroh::client::Iroh, +pub async fn add_with_opts( + client: &iroh::client::Iroh, source: BlobSource, opts: BlobAddOptions, ) -> Result<()> { @@ -840,8 +824,8 @@ pub async fn add_with_opts>( } /// Add data to iroh, either from a path or, if path is `None`, from STDIN. -pub async fn add>( - client: &iroh::client::Iroh, +pub async fn add( + client: &iroh::client::Iroh, source: BlobSourceIroh, tag: SetTagOption, ticket: TicketOption, diff --git a/iroh-cli/src/commands/console.rs b/iroh-cli/src/commands/console.rs index df10126d75f..5f2d8bc3832 100644 --- a/iroh-cli/src/commands/console.rs +++ b/iroh-cli/src/commands/console.rs @@ -2,8 +2,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use colored::Colorize; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use rustyline::{error::ReadlineError, Config, DefaultEditor}; use tokio::sync::{mpsc, oneshot}; @@ -12,10 +11,7 @@ use crate::{ config::{ConsoleEnv, ConsolePaths}, }; -pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { println!("{}", "Welcome to the Iroh console!".purple().bold()); println!("Type `{}` for a list of commands.", "help".bold()); let mut from_repl = Repl::spawn(env.clone()); diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index b2a13b3596d..d26f7859127 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -13,7 +13,6 @@ use dialoguer::Confirm; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; -use quic_rpc::ServiceConnection; use tokio::io::AsyncReadExt; use iroh::{ @@ -22,7 +21,7 @@ use iroh::{ client::{ blobs::WrapOption, docs::{Doc, Entry, LiveEvent, Origin, ShareMode}, - Iroh, RpcService, + Iroh, }, docs::{ store::{DownloadPolicy, FilterKind, Query, SortDirection}, @@ -303,10 +302,7 @@ impl From for iroh::docs::store::SortBy { } impl DocCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { id: doc } => { env.set_doc(doc)?; @@ -673,14 +669,7 @@ impl DocCommands { } } -async fn get_doc( - iroh: &Iroh, - env: &ConsoleEnv, - id: Option, -) -> anyhow::Result> -where - C: ServiceConnection, -{ +async fn get_doc(iroh: &Iroh, env: &ConsoleEnv, id: Option) -> anyhow::Result { iroh.docs() .open(env.doc(id)?) .await? @@ -688,14 +677,7 @@ where } /// Format the content. If an error occurs it's returned in a formatted, friendly way. -async fn fmt_content( - doc: &Doc, - entry: &Entry, - mode: DisplayContentMode, -) -> Result -where - C: ServiceConnection, -{ +async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Result { let read_failed = |err: anyhow::Error| format!(""); let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes())); let as_utf8 = |buf: Vec| String::from_utf8(buf).map(|repr| format!("\"{repr}\"")); @@ -743,10 +725,7 @@ fn human_len(entry: &Entry) -> HumanBytes { } #[must_use = "this won't be printed, you need to print it yourself"] -async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String -where - C: ServiceConnection, -{ +async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String { let key = std::str::from_utf8(entry.key()) .unwrap_or("") .bold(); @@ -776,18 +755,15 @@ fn tag_from_file_name(path: &Path) -> anyhow::Result { /// document via the hash of the blob. /// It also creates and powers the `ImportProgressBar`. #[tracing::instrument(skip_all)] -async fn import_coordinator( - doc: Doc, +async fn import_coordinator( + doc: Doc, author_id: AuthorId, root: PathBuf, prefix: String, blob_add_progress: impl Stream> + Send + Unpin + 'static, expected_size: u64, expected_entries: u64, -) -> Result<()> -where - C: ServiceConnection, -{ +) -> Result<()> { let imp = ImportProgressBar::new( &root.display().to_string(), doc.id(), @@ -982,7 +958,7 @@ mod tests { let cli = ConsoleEnv::for_console(data_dir.path().to_owned(), &node) .await .context("ConsoleEnv")?; - let iroh = iroh::client::QuicIroh::connect(data_dir.path()) + let iroh = iroh::client::Iroh::connect(data_dir.path()) .await .context("rpc connect")?; diff --git a/iroh-cli/src/commands/node.rs b/iroh-cli/src/commands/node.rs index 4d2b8ad1bf8..b85422069e0 100644 --- a/iroh-cli/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -8,12 +8,10 @@ use comfy_table::{presets::NOTHING, Cell}; use futures_lite::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; -use iroh::client::RpcService; use iroh::net::{ endpoint::{ConnectionInfo, DirectAddrInfo}, key::PublicKey, }; -use quic_rpc::ServiceConnection; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -38,10 +36,7 @@ pub enum NodeCommands { } impl NodeCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Connections => { let connections = iroh.connections().await?; diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index 1d261216816..414a894ddb8 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -1,7 +1,6 @@ use anyhow::Result; use clap::Subcommand; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use crate::config::ConsoleEnv; @@ -58,10 +57,7 @@ pub enum RpcCommands { } impl RpcCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Node { command } => command.run(iroh).await, Self::Blob { command } => command.run(iroh).await, diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index ec22c26de87..694e4844248 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -33,7 +33,7 @@ pub async fn run_with_command( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let _guard = crate::logging::init_terminal_and_file_logging(&config.file_logs, iroh_data_root)?; @@ -68,7 +68,7 @@ async fn run_with_command_inner( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let relay_map = config.relay_map()?; diff --git a/iroh-cli/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs index 42c228266b2..fa4a2984299 100644 --- a/iroh-cli/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -3,8 +3,7 @@ use bytes::Bytes; use clap::Subcommand; use futures_lite::StreamExt; use iroh::blobs::Tag; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -20,10 +19,7 @@ pub enum TagCommands { } impl TagCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::List => { let mut response = iroh.tags().list().await?; diff --git a/iroh-cli/src/config.rs b/iroh-cli/src/config.rs index 249b91af103..e7b7d1ac083 100644 --- a/iroh-cli/src/config.rs +++ b/iroh-cli/src/config.rs @@ -15,11 +15,10 @@ use iroh::net::{ }; use iroh::node::GcPolicy; use iroh::{ - client::{Iroh, RpcService}, + client::Iroh, docs::{AuthorId, NamespaceId}, }; use parking_lot::RwLock; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use tracing::warn; @@ -133,10 +132,7 @@ struct ConsoleEnvInner { impl ConsoleEnv { /// Read from environment variables and the console config file. - pub(crate) async fn for_console>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_console(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let console_data_dir = ConsolePaths::root(&iroh_data_dir); tokio::fs::create_dir_all(&console_data_dir) .await @@ -161,10 +157,7 @@ impl ConsoleEnv { } /// Read only from environment variables. - pub(crate) async fn for_cli>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_cli(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let author = env_author(None, iroh).await?; let env = ConsoleEnvInner { author, @@ -278,10 +271,7 @@ impl ConsoleEnv { } } -async fn env_author>( - from_config: Option, - iroh: &Iroh, -) -> Result { +async fn env_author(from_config: Option, iroh: &Iroh) -> Result { if let Some(author) = env::var(ENV_AUTHOR) .ok() .map(|s| { diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 15404847da2..307ec354bae 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" } iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" } parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.10.0", default-features = false, features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] } quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 4a126877251..6f76f876363 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -4,7 +4,7 @@ use anyhow::Result; use clap::Parser; use futures_lite::future::Boxed as BoxedFuture; use iroh::{ - client::MemIroh, + client::Iroh, net::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeId, @@ -59,7 +59,7 @@ const EXAMPLE_ALPN: &[u8] = b"example-proto/0"; #[derive(Debug, Clone)] struct ExampleProto { - client: MemIroh, + client: Iroh, endpoint: Endpoint, } @@ -89,7 +89,7 @@ impl ProtocolHandler for ExampleProto { } impl ExampleProto { - pub fn new(client: MemIroh, endpoint: Endpoint) -> Self { + pub fn new(client: Iroh, endpoint: Endpoint) -> Self { Self { client, endpoint } } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index e158bfb3553..ce9647f5fd2 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -1,18 +1,23 @@ //! Client to an Iroh node. use futures_lite::{Stream, StreamExt}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; #[doc(inline)] pub use crate::rpc_protocol::RpcService; -mod mem; mod quic; -pub use self::mem::{Doc as MemDoc, Iroh as MemIroh, RpcClient as MemRpcClient}; +#[deprecated] +pub use self::docs::Doc as MemDoc; +#[deprecated] +pub use self::docs::Doc as QuicDoc; +pub use self::docs::Doc; pub use self::node::NodeStatus; -pub use self::quic::{Doc as QuicDoc, Iroh as QuicIroh, RpcClient as QuicRpcClient}; +#[deprecated] +pub use self::Iroh as MemIroh; +#[deprecated] +pub use self::Iroh as QuicIroh; pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; @@ -23,38 +28,39 @@ pub mod tags; mod node; +/// Iroh rpc client - boxed so that we can have a concrete type. +pub(crate) type RpcClient = + quic_rpc::RpcClient>; + /// Iroh client. #[derive(Debug, Clone)] -pub struct Iroh { - rpc: RpcClient, +pub struct Iroh { + rpc: RpcClient, } -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Create a new high-level client to a Iroh node from the low-level RPC client. - pub fn new(rpc: RpcClient) -> Self { + pub fn new(rpc: RpcClient) -> Self { Self { rpc } } /// Blobs client - pub fn blobs(&self) -> &blobs::Client { + pub fn blobs(&self) -> &blobs::Client { blobs::Client::ref_cast(&self.rpc) } /// Docs client - pub fn docs(&self) -> &docs::Client { + pub fn docs(&self) -> &docs::Client { docs::Client::ref_cast(&self.rpc) } /// Authors client - pub fn authors(&self) -> &authors::Client { + pub fn authors(&self) -> &authors::Client { authors::Client::ref_cast(&self.rpc) } /// Tags client - pub fn tags(&self) -> &tags::Client { + pub fn tags(&self) -> &tags::Client { tags::Client::ref_cast(&self.rpc) } } diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index bf642fc3d9c..eadd3026c1a 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -3,27 +3,23 @@ use anyhow::Result; use futures_lite::{stream::StreamExt, Stream}; use iroh_docs::{Author, AuthorId}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use crate::rpc_protocol::{ AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest, - AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService, + AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, }; -use super::flatten; +use super::{flatten, RpcClient}; /// Iroh authors client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document author. /// /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 53245acd3dc..861060b751f 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -24,7 +24,7 @@ use iroh_blobs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicU64, Ordering}; -use quic_rpc::{client::BoxStreamSync, RpcClient, ServiceConnection}; +use quic_rpc::client::BoxStreamSync; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -35,28 +35,25 @@ use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, + CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption, }; -use super::{flatten, tags, Iroh}; +use super::{flatten, tags, Iroh, RpcClient}; /// Iroh blobs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl<'a, C: ServiceConnection> From<&'a Iroh> for &'a RpcClient { - fn from(client: &'a Iroh) -> &'a RpcClient { +impl<'a> From<&'a Iroh> for &'a RpcClient { + fn from(client: &'a Iroh) -> &'a RpcClient { &client.blobs().rpc } } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Stream the contents of a a single blob. /// /// Returns a [`Reader`], which can report the size of the blob before reading it. @@ -386,17 +383,14 @@ where } } - fn tags_client(&self) -> tags::Client { + fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), } } } -impl SimpleStore for Client -where - C: ServiceConnection, -{ +impl SimpleStore for Client { async fn load(&self, hash: Hash) -> anyhow::Result { self.read_to_bytes(hash).await } @@ -786,15 +780,12 @@ impl Reader { } } - pub(crate) async fn from_rpc_read>( - rpc: &RpcClient, - hash: Hash, - ) -> anyhow::Result { + pub(crate) async fn from_rpc_read(rpc: &RpcClient, hash: Hash) -> anyhow::Result { Self::from_rpc_read_at(rpc, hash, 0, None).await } - async fn from_rpc_read_at>( - rpc: &RpcClient, + async fn from_rpc_read_at( + rpc: &RpcClient, hash: Hash, offset: u64, len: Option, diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index cdc15614fce..95d184a0458 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -21,7 +21,7 @@ use iroh_docs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicBool, Ordering}; -use quic_rpc::{message::RpcMsg, RpcClient, ServiceConnection}; +use quic_rpc::message::RpcMsg; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; @@ -36,21 +36,18 @@ use crate::rpc_protocol::{ #[doc(inline)] pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; -use super::{blobs, flatten}; +use super::{blobs, flatten, RpcClient}; /// Iroh docs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document. - pub async fn create(&self) -> Result> { + pub async fn create(&self) -> Result { let res = self.rpc.rpc(DocCreateRequest {}).await??; let doc = Doc::new(self.rpc.clone(), res.id); Ok(doc) @@ -69,14 +66,14 @@ where /// Import a document from a namespace capability. /// /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync. - pub async fn import_namespace(&self, capability: Capability) -> Result> { + pub async fn import_namespace(&self, capability: Capability) -> Result { let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); Ok(doc) } /// Import a document from a ticket and join all peers in the ticket. - pub async fn import(&self, ticket: DocTicket) -> Result> { + pub async fn import(&self, ticket: DocTicket) -> Result { let DocTicket { capability, nodes } = ticket; let doc = self.import_namespace(capability).await?; doc.start_sync(nodes).await?; @@ -92,7 +89,7 @@ where pub async fn import_and_subscribe( &self, ticket: DocTicket, - ) -> Result<(Doc, impl Stream>)> { + ) -> Result<(Doc, impl Stream>)> { let DocTicket { capability, nodes } = ticket; let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); @@ -108,7 +105,7 @@ where } /// Get a [`Doc`] client for a single document. Return None if the document cannot be found. - pub async fn open(&self, id: NamespaceId) -> Result>> { + pub async fn open(&self, id: NamespaceId) -> Result> { self.rpc.rpc(DocOpenRequest { doc_id: id }).await??; let doc = Doc::new(self.rpc.clone(), id); Ok(Some(doc)) @@ -117,28 +114,25 @@ where /// Document handle #[derive(Debug, Clone)] -pub struct Doc>(Arc>); +pub struct Doc(Arc); -impl> PartialEq for Doc { +impl PartialEq for Doc { fn eq(&self, other: &Self) -> bool { self.0.id == other.0.id } } -impl> Eq for Doc {} +impl Eq for Doc {} #[derive(Debug)] -struct DocInner> { +struct DocInner { id: NamespaceId, - rpc: RpcClient, + rpc: RpcClient, closed: AtomicBool, rt: tokio::runtime::Handle, } -impl Drop for DocInner -where - C: ServiceConnection, -{ +impl Drop for DocInner { fn drop(&mut self) { let doc_id = self.id; let rpc = self.rpc.clone(); @@ -150,11 +144,8 @@ where } } -impl Doc -where - C: ServiceConnection, -{ - fn new(rpc: RpcClient, id: NamespaceId) -> Self { +impl Doc { + fn new(rpc: RpcClient, id: NamespaceId) -> Self { Self(Arc::new(DocInner { rpc, id, @@ -420,8 +411,8 @@ where } } -impl<'a, C: ServiceConnection> From<&'a Doc> for &'a RpcClient { - fn from(doc: &'a Doc) -> &'a RpcClient { +impl<'a> From<&'a Doc> for &'a RpcClient { + fn from(doc: &'a Doc) -> &'a RpcClient { &doc.0.rpc } } @@ -476,26 +467,14 @@ impl Entry { /// Read the content of an [`Entry`] as a streaming [`blobs::Reader`]. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_reader( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_reader(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await } /// Read all content of an [`Entry`] into a buffer. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_bytes( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_bytes(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()) .await? .read_to_bytes() diff --git a/iroh/src/client/mem.rs b/iroh/src/client/mem.rs deleted file mode 100644 index 85f65a5d93a..00000000000 --- a/iroh/src/client/mem.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Type declarations for an in-memory client to an iroh node running in the same process. -//! -//! The in-memory client is obtained directly from a running node through -//! [`crate::node::Node::client`] - -use quic_rpc::transport::flume::FlumeConnection; - -use crate::rpc_protocol::RpcService; - -/// RPC client to an iroh node running in the same process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// In-memory client to an iroh node running in the same process. -/// -/// This is obtained from [`crate::node::Node::client`]. -pub type Iroh = super::Iroh>; - -/// In-memory document client to an iroh node running in the same process. -pub type Doc = super::docs::Doc>; diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 6f8460b3764..96ce9d87c22 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -6,21 +6,17 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_base::key::PublicKey; use iroh_net::{endpoint::ConnectionInfo, relay::RelayUrl, NodeAddr, NodeId}; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ CounterStats, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, - NodeStatusRequest, RpcService, + NodeStatusRequest, }; use super::{flatten, Iroh}; -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Get statistics of the running node. pub async fn stats(&self) -> Result> { let res = self.rpc.rpc(NodeStatsRequest {}).await??; diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index 7f7333810f5..6351c9632d0 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -8,8 +8,9 @@ use std::{ }; use anyhow::{bail, Context}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection}; +use super::Iroh; use crate::{ node::RpcStatus, rpc_protocol::{NodeStatusRequest, RpcService}, @@ -20,15 +21,7 @@ use crate::{ pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; /// RPC client to an iroh node running in a separate process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// Client to an iroh node running in a separate process. -/// -/// This is obtained from [`Iroh::connect`]. -pub type Iroh = super::Iroh>; - -/// RPC document client to an iroh node running in a separate process. -pub type Doc = super::docs::Doc>; +pub type RpcClient = quic_rpc::RpcClient>; impl Iroh { /// Connect to an iroh node running on the same computer, but in a different process. @@ -51,6 +44,7 @@ pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port); let server_name = "localhost".to_string(); let connection = QuinnConnection::::new(endpoint, addr, server_name); + let connection = BoxedConnection::new(connection); let client = RpcClient::new(connection); // Do a status request to check if the server is running. let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(NodeStatusRequest)) diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index 9c3ef34f127..66166a396cd 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -3,23 +3,20 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_blobs::{BlobFormat, Hash, Tag}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; -use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest, RpcService}; +use super::RpcClient; +use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest}; /// Iroh tags client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// List all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 85df39cc22e..eeadfc17780 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -60,7 +60,7 @@ struct NodeInner { gossip: Gossip, secret_key: SecretKey, cancel_token: CancellationToken, - client: crate::client::MemIroh, + client: crate::client::Iroh, #[debug("rt")] rt: LocalPoolHandle, downloader: Downloader, @@ -133,7 +133,7 @@ impl Node { } /// Return a client to control this node over an in-memory channel. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } @@ -180,7 +180,7 @@ impl Node { } impl std::ops::Deref for Node { - type Target = crate::client::MemIroh; + type Target = crate::client::Iroh; fn deref(&self) -> &Self::Target { &self.inner.client diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f64297da98f..e720acaaf25 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -483,6 +483,9 @@ where // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); + // box the controller. Boxing has a special case for the flume channel that avoids allocations, + // so this has zero overhead. + let controller = quic_rpc::transport::boxed::Connection::new(controller); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); let inner = Arc::new(NodeInner { @@ -545,7 +548,7 @@ impl> ProtocolBuilde /// # use std::sync::Arc; /// # use anyhow::Result; /// # use futures_lite::future::Boxed as BoxedFuture; - /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::MemIroh}; + /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::Iroh}; /// # /// # #[tokio::main] /// # async fn main() -> Result<()> { @@ -554,7 +557,7 @@ impl> ProtocolBuilde /// /// #[derive(Debug)] /// struct MyProtocol { - /// client: MemIroh + /// client: Iroh /// } /// /// impl ProtocolHandler for MyProtocol { @@ -589,7 +592,7 @@ impl> ProtocolBuilde /// /// Note that RPC calls performed with the client will not complete until the node is /// spawned. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } diff --git a/iroh/src/node/rpc_status.rs b/iroh/src/node/rpc_status.rs index 00ad0c8ab2d..29f633eb81a 100644 --- a/iroh/src/node/rpc_status.rs +++ b/iroh/src/node/rpc_status.rs @@ -16,7 +16,7 @@ pub enum RpcStatus { /// The port we are connected on. port: u16, /// Actual connected RPC client. - client: crate::client::QuicRpcClient, + client: crate::client::RpcClient, }, } diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index a5e9b8a463d..b2ff42fedd6 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -13,7 +13,7 @@ use iroh::{ base::node_addr::AddrInfoOptions, client::{ docs::{Entry, LiveEvent, ShareMode}, - MemDoc, + Doc, }, net::key::{PublicKey, SecretKey}, node::{Builder, Node}, @@ -1012,14 +1012,14 @@ async fn test_list_docs_stream() -> Result<()> { } /// Get all entries of a document. -async fn get_all(doc: &MemDoc) -> anyhow::Result> { +async fn get_all(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.collect::>().await; entries.into_iter().collect() } /// Get all entries of a document with the blob content. -async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result> { +async fn get_all_with_content(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.and_then(|entry| async { let content = entry.content_bytes(doc).await; @@ -1031,7 +1031,7 @@ async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result, n: usize, cb: impl Fn(usize, usize) -> (AuthorId, String, String), @@ -1090,7 +1090,7 @@ async fn wait_for_events( } async fn assert_all_docs( - docs: &[MemDoc], + docs: &[Doc], node_ids: &[PublicKey], expected: &Vec, label: &str, @@ -1203,12 +1203,12 @@ async fn sync_drop_doc() -> Result<()> { Ok(()) } -async fn assert_latest(doc: &MemDoc, key: &[u8], value: &[u8]) { +async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { let content = get_latest(doc, key).await.unwrap(); assert_eq!(content, value.to_vec()); } -async fn get_latest(doc: &MemDoc, key: &[u8]) -> anyhow::Result> { +async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { let query = Query::single_latest_per_key().key_exact(key); let entry = doc .get_many(query)