diff --git a/Cargo.lock b/Cargo.lock index 2c2ffd4b8b..0c680d19b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4041,10 +4041,11 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.10.1" +version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0d69b05325e19956f123fce85ebc4d99226552a0bb24bba4c886106297e708b" +checksum = "5e56dc58272a3f9c151b1c3a6df0e3caca083fd843b337e60f706fae2d974b6b" dependencies = [ + "anyhow", "bincode", "derive_more", "educe", diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml index f08cc7b784..360bf6c76d 100644 --- a/iroh-cli/Cargo.toml +++ b/iroh-cli/Cargo.toml @@ -45,7 +45,7 @@ parking_lot = "0.12.1" pkarr = { version = "1.1.5", default-features = false } portable-atomic = "1" postcard = "1.0.8" -quic-rpc = { version = "0.10.0", features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] } rand = "0.8.5" ratatui = "0.26.2" reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] } diff --git a/iroh-cli/src/commands.rs b/iroh-cli/src/commands.rs index 7e3502c275..bd1bc0d18a 100644 --- a/iroh-cli/src/commands.rs +++ b/iroh-cli/src/commands.rs @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf}; use anyhow::{ensure, Context, Result}; use clap::Parser; use derive_more::FromStr; -use iroh::client::QuicIroh; +use iroh::client::Iroh; use crate::config::{ConsoleEnv, NodeConfig}; @@ -130,7 +130,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_console(data_dir_owned, &iroh).await?; console::run(&iroh, &env).await } @@ -151,7 +151,7 @@ impl Cli { .await } else { crate::logging::init_terminal_logging()?; - let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?; + let iroh = Iroh::connect(data_dir).await.context("rpc connect")?; let env = ConsoleEnv::for_cli(data_dir_owned, &iroh).await?; command.run(&iroh, &env).await } diff --git a/iroh-cli/src/commands/author.rs b/iroh-cli/src/commands/author.rs index 2ad98a48b6..8493e4fd2d 100644 --- a/iroh-cli/src/commands/author.rs +++ b/iroh-cli/src/commands/author.rs @@ -4,9 +4,8 @@ use derive_more::FromStr; use futures_lite::StreamExt; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; +use iroh::client::Iroh; use iroh::docs::{Author, AuthorId}; -use quic_rpc::ServiceConnection; use crate::config::ConsoleEnv; @@ -38,10 +37,7 @@ pub enum AuthorCommands { } impl AuthorCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { author } => { env.set_author(author)?; diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 9e2c7208c6..6e4c0aeba2 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -30,11 +30,10 @@ use iroh::{ BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions, IncompleteBlobInfo, WrapOption, }, - Iroh, RpcService, + Iroh, }, net::{key::PublicKey, relay::RelayUrl, NodeAddr}, }; -use quic_rpc::ServiceConnection; use tokio::io::AsyncWriteExt; #[allow(clippy::large_enum_variant)] @@ -182,10 +181,7 @@ impl std::str::FromStr for TicketOrHash { } impl BlobCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Get { ticket, @@ -447,10 +443,7 @@ pub enum ListCommands { } impl ListCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blobs => { let mut response = iroh.blobs().list().await?; @@ -507,10 +500,7 @@ pub enum DeleteCommands { } impl DeleteCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Blob { hash } => { let response = iroh.blobs().delete_blob(hash).await; @@ -540,10 +530,7 @@ fn apply_report_level(text: String, level: ReportLevel) -> console::StyledObject } } -pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn consistency_check(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut response = iroh.blobs().consistency_check(repair).await?; let verbosity = get_report_level(verbose); let print = |level: ReportLevel, entry: Option, message: String| { @@ -584,10 +571,7 @@ where Ok(()) } -pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn validate(iroh: &Iroh, verbose: u8, repair: bool) -> Result<()> { let mut state = ValidateProgressState::new(); let mut response = iroh.blobs().validate(repair).await?; let verbosity = get_report_level(verbose); @@ -807,8 +791,8 @@ pub enum TicketOption { Print, } -pub async fn add_with_opts>( - client: &iroh::client::Iroh, +pub async fn add_with_opts( + client: &iroh::client::Iroh, source: BlobSource, opts: BlobAddOptions, ) -> Result<()> { @@ -840,8 +824,8 @@ pub async fn add_with_opts>( } /// Add data to iroh, either from a path or, if path is `None`, from STDIN. -pub async fn add>( - client: &iroh::client::Iroh, +pub async fn add( + client: &iroh::client::Iroh, source: BlobSourceIroh, tag: SetTagOption, ticket: TicketOption, diff --git a/iroh-cli/src/commands/console.rs b/iroh-cli/src/commands/console.rs index df10126d75..5f2d8bc383 100644 --- a/iroh-cli/src/commands/console.rs +++ b/iroh-cli/src/commands/console.rs @@ -2,8 +2,7 @@ use anyhow::Result; use clap::{Parser, Subcommand}; use colored::Colorize; use iroh::base::base32::fmt_short; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use rustyline::{error::ReadlineError, Config, DefaultEditor}; use tokio::sync::{mpsc, oneshot}; @@ -12,10 +11,7 @@ use crate::{ config::{ConsoleEnv, ConsolePaths}, }; -pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> -where - C: ServiceConnection, -{ +pub async fn run(iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { println!("{}", "Welcome to the Iroh console!".purple().bold()); println!("Type `{}` for a list of commands.", "help".bold()); let mut from_repl = Repl::spawn(env.clone()); diff --git a/iroh-cli/src/commands/doc.rs b/iroh-cli/src/commands/doc.rs index b2a13b3596..d26f785912 100644 --- a/iroh-cli/src/commands/doc.rs +++ b/iroh-cli/src/commands/doc.rs @@ -13,7 +13,6 @@ use dialoguer::Confirm; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle}; -use quic_rpc::ServiceConnection; use tokio::io::AsyncReadExt; use iroh::{ @@ -22,7 +21,7 @@ use iroh::{ client::{ blobs::WrapOption, docs::{Doc, Entry, LiveEvent, Origin, ShareMode}, - Iroh, RpcService, + Iroh, }, docs::{ store::{DownloadPolicy, FilterKind, Query, SortDirection}, @@ -303,10 +302,7 @@ impl From for iroh::docs::store::SortBy { } impl DocCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Switch { id: doc } => { env.set_doc(doc)?; @@ -673,14 +669,7 @@ impl DocCommands { } } -async fn get_doc( - iroh: &Iroh, - env: &ConsoleEnv, - id: Option, -) -> anyhow::Result> -where - C: ServiceConnection, -{ +async fn get_doc(iroh: &Iroh, env: &ConsoleEnv, id: Option) -> anyhow::Result { iroh.docs() .open(env.doc(id)?) .await? @@ -688,14 +677,7 @@ where } /// Format the content. If an error occurs it's returned in a formatted, friendly way. -async fn fmt_content( - doc: &Doc, - entry: &Entry, - mode: DisplayContentMode, -) -> Result -where - C: ServiceConnection, -{ +async fn fmt_content(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> Result { let read_failed = |err: anyhow::Error| format!(""); let encode_hex = |err: std::string::FromUtf8Error| format!("0x{}", hex::encode(err.as_bytes())); let as_utf8 = |buf: Vec| String::from_utf8(buf).map(|repr| format!("\"{repr}\"")); @@ -743,10 +725,7 @@ fn human_len(entry: &Entry) -> HumanBytes { } #[must_use = "this won't be printed, you need to print it yourself"] -async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String -where - C: ServiceConnection, -{ +async fn fmt_entry(doc: &Doc, entry: &Entry, mode: DisplayContentMode) -> String { let key = std::str::from_utf8(entry.key()) .unwrap_or("") .bold(); @@ -776,18 +755,15 @@ fn tag_from_file_name(path: &Path) -> anyhow::Result { /// document via the hash of the blob. /// It also creates and powers the `ImportProgressBar`. #[tracing::instrument(skip_all)] -async fn import_coordinator( - doc: Doc, +async fn import_coordinator( + doc: Doc, author_id: AuthorId, root: PathBuf, prefix: String, blob_add_progress: impl Stream> + Send + Unpin + 'static, expected_size: u64, expected_entries: u64, -) -> Result<()> -where - C: ServiceConnection, -{ +) -> Result<()> { let imp = ImportProgressBar::new( &root.display().to_string(), doc.id(), @@ -982,7 +958,7 @@ mod tests { let cli = ConsoleEnv::for_console(data_dir.path().to_owned(), &node) .await .context("ConsoleEnv")?; - let iroh = iroh::client::QuicIroh::connect(data_dir.path()) + let iroh = iroh::client::Iroh::connect(data_dir.path()) .await .context("rpc connect")?; diff --git a/iroh-cli/src/commands/node.rs b/iroh-cli/src/commands/node.rs index 4d2b8ad1bf..b85422069e 100644 --- a/iroh-cli/src/commands/node.rs +++ b/iroh-cli/src/commands/node.rs @@ -8,12 +8,10 @@ use comfy_table::{presets::NOTHING, Cell}; use futures_lite::{Stream, StreamExt}; use human_time::ToHumanTimeString; use iroh::client::Iroh; -use iroh::client::RpcService; use iroh::net::{ endpoint::{ConnectionInfo, DirectAddrInfo}, key::PublicKey, }; -use quic_rpc::ServiceConnection; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -38,10 +36,7 @@ pub enum NodeCommands { } impl NodeCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::Connections => { let connections = iroh.connections().await?; diff --git a/iroh-cli/src/commands/rpc.rs b/iroh-cli/src/commands/rpc.rs index 1d26121681..414a894ddb 100644 --- a/iroh-cli/src/commands/rpc.rs +++ b/iroh-cli/src/commands/rpc.rs @@ -1,7 +1,6 @@ use anyhow::Result; use clap::Subcommand; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; use crate::config::ConsoleEnv; @@ -58,10 +57,7 @@ pub enum RpcCommands { } impl RpcCommands { - pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh, env: &ConsoleEnv) -> Result<()> { match self { Self::Node { command } => command.run(iroh).await, Self::Blob { command } => command.run(iroh).await, diff --git a/iroh-cli/src/commands/start.rs b/iroh-cli/src/commands/start.rs index ec22c26de8..694e484424 100644 --- a/iroh-cli/src/commands/start.rs +++ b/iroh-cli/src/commands/start.rs @@ -33,7 +33,7 @@ pub async fn run_with_command( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let _guard = crate::logging::init_terminal_and_file_logging(&config.file_logs, iroh_data_root)?; @@ -68,7 +68,7 @@ async fn run_with_command_inner( command: F, ) -> Result<()> where - F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static, + F: FnOnce(iroh::client::Iroh) -> T + Send + 'static, T: Future> + 'static, { let relay_map = config.relay_map()?; diff --git a/iroh-cli/src/commands/tag.rs b/iroh-cli/src/commands/tag.rs index 42c228266b..fa4a298429 100644 --- a/iroh-cli/src/commands/tag.rs +++ b/iroh-cli/src/commands/tag.rs @@ -3,8 +3,7 @@ use bytes::Bytes; use clap::Subcommand; use futures_lite::StreamExt; use iroh::blobs::Tag; -use iroh::client::{Iroh, RpcService}; -use quic_rpc::ServiceConnection; +use iroh::client::Iroh; #[derive(Subcommand, Debug, Clone)] #[allow(clippy::large_enum_variant)] @@ -20,10 +19,7 @@ pub enum TagCommands { } impl TagCommands { - pub async fn run(self, iroh: &Iroh) -> Result<()> - where - C: ServiceConnection, - { + pub async fn run(self, iroh: &Iroh) -> Result<()> { match self { Self::List => { let mut response = iroh.tags().list().await?; diff --git a/iroh-cli/src/config.rs b/iroh-cli/src/config.rs index 249b91af10..e7b7d1ac08 100644 --- a/iroh-cli/src/config.rs +++ b/iroh-cli/src/config.rs @@ -15,11 +15,10 @@ use iroh::net::{ }; use iroh::node::GcPolicy; use iroh::{ - client::{Iroh, RpcService}, + client::Iroh, docs::{AuthorId, NamespaceId}, }; use parking_lot::RwLock; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use tracing::warn; @@ -133,10 +132,7 @@ struct ConsoleEnvInner { impl ConsoleEnv { /// Read from environment variables and the console config file. - pub(crate) async fn for_console>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_console(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let console_data_dir = ConsolePaths::root(&iroh_data_dir); tokio::fs::create_dir_all(&console_data_dir) .await @@ -161,10 +157,7 @@ impl ConsoleEnv { } /// Read only from environment variables. - pub(crate) async fn for_cli>( - iroh_data_dir: PathBuf, - iroh: &Iroh, - ) -> Result { + pub(crate) async fn for_cli(iroh_data_dir: PathBuf, iroh: &Iroh) -> Result { let author = env_author(None, iroh).await?; let env = ConsoleEnvInner { author, @@ -278,10 +271,7 @@ impl ConsoleEnv { } } -async fn env_author>( - from_config: Option, - iroh: &Iroh, -) -> Result { +async fn env_author(from_config: Option, iroh: &Iroh) -> Result { if let Some(author) = env::var(ENV_AUTHOR) .ok() .map(|s| { diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 15404847da..307ec354ba 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" } iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" } parking_lot = "0.12.1" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.10.0", default-features = false, features = ["flume-transport", "quinn-transport"] } +quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] } quinn = { package = "iroh-quinn", version = "0.10" } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs index 4a12687725..6f76f87636 100644 --- a/iroh/examples/custom-protocol.rs +++ b/iroh/examples/custom-protocol.rs @@ -4,7 +4,7 @@ use anyhow::Result; use clap::Parser; use futures_lite::future::Boxed as BoxedFuture; use iroh::{ - client::MemIroh, + client::Iroh, net::{ endpoint::{get_remote_node_id, Connecting}, Endpoint, NodeId, @@ -59,7 +59,7 @@ const EXAMPLE_ALPN: &[u8] = b"example-proto/0"; #[derive(Debug, Clone)] struct ExampleProto { - client: MemIroh, + client: Iroh, endpoint: Endpoint, } @@ -89,7 +89,7 @@ impl ProtocolHandler for ExampleProto { } impl ExampleProto { - pub fn new(client: MemIroh, endpoint: Endpoint) -> Self { + pub fn new(client: Iroh, endpoint: Endpoint) -> Self { Self { client, endpoint } } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index e158bfb355..ce9647f5fd 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -1,18 +1,23 @@ //! Client to an Iroh node. use futures_lite::{Stream, StreamExt}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; #[doc(inline)] pub use crate::rpc_protocol::RpcService; -mod mem; mod quic; -pub use self::mem::{Doc as MemDoc, Iroh as MemIroh, RpcClient as MemRpcClient}; +#[deprecated] +pub use self::docs::Doc as MemDoc; +#[deprecated] +pub use self::docs::Doc as QuicDoc; +pub use self::docs::Doc; pub use self::node::NodeStatus; -pub use self::quic::{Doc as QuicDoc, Iroh as QuicIroh, RpcClient as QuicRpcClient}; +#[deprecated] +pub use self::Iroh as MemIroh; +#[deprecated] +pub use self::Iroh as QuicIroh; pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN}; @@ -23,38 +28,39 @@ pub mod tags; mod node; +/// Iroh rpc client - boxed so that we can have a concrete type. +pub(crate) type RpcClient = + quic_rpc::RpcClient>; + /// Iroh client. #[derive(Debug, Clone)] -pub struct Iroh { - rpc: RpcClient, +pub struct Iroh { + rpc: RpcClient, } -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Create a new high-level client to a Iroh node from the low-level RPC client. - pub fn new(rpc: RpcClient) -> Self { + pub fn new(rpc: RpcClient) -> Self { Self { rpc } } /// Blobs client - pub fn blobs(&self) -> &blobs::Client { + pub fn blobs(&self) -> &blobs::Client { blobs::Client::ref_cast(&self.rpc) } /// Docs client - pub fn docs(&self) -> &docs::Client { + pub fn docs(&self) -> &docs::Client { docs::Client::ref_cast(&self.rpc) } /// Authors client - pub fn authors(&self) -> &authors::Client { + pub fn authors(&self) -> &authors::Client { authors::Client::ref_cast(&self.rpc) } /// Tags client - pub fn tags(&self) -> &tags::Client { + pub fn tags(&self) -> &tags::Client { tags::Client::ref_cast(&self.rpc) } } diff --git a/iroh/src/client/authors.rs b/iroh/src/client/authors.rs index bf642fc3d9..eadd3026c1 100644 --- a/iroh/src/client/authors.rs +++ b/iroh/src/client/authors.rs @@ -3,27 +3,23 @@ use anyhow::Result; use futures_lite::{stream::StreamExt, Stream}; use iroh_docs::{Author, AuthorId}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use crate::rpc_protocol::{ AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest, - AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService, + AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, }; -use super::flatten; +use super::{flatten, RpcClient}; /// Iroh authors client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document author. /// /// You likely want to save the returned [`AuthorId`] somewhere so that you can use this author diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 53245acd3d..861060b751 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -24,7 +24,7 @@ use iroh_blobs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicU64, Ordering}; -use quic_rpc::{client::BoxStreamSync, RpcClient, ServiceConnection}; +use quic_rpc::client::BoxStreamSync; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; @@ -35,28 +35,25 @@ use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, - CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, + CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, SetTagOption, }; -use super::{flatten, tags, Iroh}; +use super::{flatten, tags, Iroh, RpcClient}; /// Iroh blobs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl<'a, C: ServiceConnection> From<&'a Iroh> for &'a RpcClient { - fn from(client: &'a Iroh) -> &'a RpcClient { +impl<'a> From<&'a Iroh> for &'a RpcClient { + fn from(client: &'a Iroh) -> &'a RpcClient { &client.blobs().rpc } } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Stream the contents of a a single blob. /// /// Returns a [`Reader`], which can report the size of the blob before reading it. @@ -386,17 +383,14 @@ where } } - fn tags_client(&self) -> tags::Client { + fn tags_client(&self) -> tags::Client { tags::Client { rpc: self.rpc.clone(), } } } -impl SimpleStore for Client -where - C: ServiceConnection, -{ +impl SimpleStore for Client { async fn load(&self, hash: Hash) -> anyhow::Result { self.read_to_bytes(hash).await } @@ -786,15 +780,12 @@ impl Reader { } } - pub(crate) async fn from_rpc_read>( - rpc: &RpcClient, - hash: Hash, - ) -> anyhow::Result { + pub(crate) async fn from_rpc_read(rpc: &RpcClient, hash: Hash) -> anyhow::Result { Self::from_rpc_read_at(rpc, hash, 0, None).await } - async fn from_rpc_read_at>( - rpc: &RpcClient, + async fn from_rpc_read_at( + rpc: &RpcClient, hash: Hash, offset: u64, len: Option, diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index cdc15614fc..95d184a045 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -21,7 +21,7 @@ use iroh_docs::{ }; use iroh_net::NodeAddr; use portable_atomic::{AtomicBool, Ordering}; -use quic_rpc::{message::RpcMsg, RpcClient, ServiceConnection}; +use quic_rpc::message::RpcMsg; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; @@ -36,21 +36,18 @@ use crate::rpc_protocol::{ #[doc(inline)] pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; -use super::{blobs, flatten}; +use super::{blobs, flatten, RpcClient}; /// Iroh docs client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// Create a new document. - pub async fn create(&self) -> Result> { + pub async fn create(&self) -> Result { let res = self.rpc.rpc(DocCreateRequest {}).await??; let doc = Doc::new(self.rpc.clone(), res.id); Ok(doc) @@ -69,14 +66,14 @@ where /// Import a document from a namespace capability. /// /// This does not start sync automatically. Use [`Doc::start_sync`] to start sync. - pub async fn import_namespace(&self, capability: Capability) -> Result> { + pub async fn import_namespace(&self, capability: Capability) -> Result { let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); Ok(doc) } /// Import a document from a ticket and join all peers in the ticket. - pub async fn import(&self, ticket: DocTicket) -> Result> { + pub async fn import(&self, ticket: DocTicket) -> Result { let DocTicket { capability, nodes } = ticket; let doc = self.import_namespace(capability).await?; doc.start_sync(nodes).await?; @@ -92,7 +89,7 @@ where pub async fn import_and_subscribe( &self, ticket: DocTicket, - ) -> Result<(Doc, impl Stream>)> { + ) -> Result<(Doc, impl Stream>)> { let DocTicket { capability, nodes } = ticket; let res = self.rpc.rpc(DocImportRequest { capability }).await??; let doc = Doc::new(self.rpc.clone(), res.doc_id); @@ -108,7 +105,7 @@ where } /// Get a [`Doc`] client for a single document. Return None if the document cannot be found. - pub async fn open(&self, id: NamespaceId) -> Result>> { + pub async fn open(&self, id: NamespaceId) -> Result> { self.rpc.rpc(DocOpenRequest { doc_id: id }).await??; let doc = Doc::new(self.rpc.clone(), id); Ok(Some(doc)) @@ -117,28 +114,25 @@ where /// Document handle #[derive(Debug, Clone)] -pub struct Doc>(Arc>); +pub struct Doc(Arc); -impl> PartialEq for Doc { +impl PartialEq for Doc { fn eq(&self, other: &Self) -> bool { self.0.id == other.0.id } } -impl> Eq for Doc {} +impl Eq for Doc {} #[derive(Debug)] -struct DocInner> { +struct DocInner { id: NamespaceId, - rpc: RpcClient, + rpc: RpcClient, closed: AtomicBool, rt: tokio::runtime::Handle, } -impl Drop for DocInner -where - C: ServiceConnection, -{ +impl Drop for DocInner { fn drop(&mut self) { let doc_id = self.id; let rpc = self.rpc.clone(); @@ -150,11 +144,8 @@ where } } -impl Doc -where - C: ServiceConnection, -{ - fn new(rpc: RpcClient, id: NamespaceId) -> Self { +impl Doc { + fn new(rpc: RpcClient, id: NamespaceId) -> Self { Self(Arc::new(DocInner { rpc, id, @@ -420,8 +411,8 @@ where } } -impl<'a, C: ServiceConnection> From<&'a Doc> for &'a RpcClient { - fn from(doc: &'a Doc) -> &'a RpcClient { +impl<'a> From<&'a Doc> for &'a RpcClient { + fn from(doc: &'a Doc) -> &'a RpcClient { &doc.0.rpc } } @@ -476,26 +467,14 @@ impl Entry { /// Read the content of an [`Entry`] as a streaming [`blobs::Reader`]. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_reader( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_reader(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await } /// Read all content of an [`Entry`] into a buffer. /// /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`. - pub async fn content_bytes( - &self, - client: impl Into<&RpcClient>, - ) -> Result - where - C: ServiceConnection, - { + pub async fn content_bytes(&self, client: impl Into<&RpcClient>) -> Result { blobs::Reader::from_rpc_read(client.into(), self.content_hash()) .await? .read_to_bytes() diff --git a/iroh/src/client/mem.rs b/iroh/src/client/mem.rs deleted file mode 100644 index 85f65a5d93..0000000000 --- a/iroh/src/client/mem.rs +++ /dev/null @@ -1,19 +0,0 @@ -//! Type declarations for an in-memory client to an iroh node running in the same process. -//! -//! The in-memory client is obtained directly from a running node through -//! [`crate::node::Node::client`] - -use quic_rpc::transport::flume::FlumeConnection; - -use crate::rpc_protocol::RpcService; - -/// RPC client to an iroh node running in the same process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// In-memory client to an iroh node running in the same process. -/// -/// This is obtained from [`crate::node::Node::client`]. -pub type Iroh = super::Iroh>; - -/// In-memory document client to an iroh node running in the same process. -pub type Doc = super::docs::Doc>; diff --git a/iroh/src/client/node.rs b/iroh/src/client/node.rs index 6f8460b376..96ce9d87c2 100644 --- a/iroh/src/client/node.rs +++ b/iroh/src/client/node.rs @@ -6,21 +6,17 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_base::key::PublicKey; use iroh_net::{endpoint::ConnectionInfo, relay::RelayUrl, NodeAddr, NodeId}; -use quic_rpc::ServiceConnection; use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ CounterStats, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest, NodeIdRequest, NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, - NodeStatusRequest, RpcService, + NodeStatusRequest, }; use super::{flatten, Iroh}; -impl Iroh -where - C: ServiceConnection, -{ +impl Iroh { /// Get statistics of the running node. pub async fn stats(&self) -> Result> { let res = self.rpc.rpc(NodeStatsRequest {}).await??; diff --git a/iroh/src/client/quic.rs b/iroh/src/client/quic.rs index 7f7333810f..6351c9632d 100644 --- a/iroh/src/client/quic.rs +++ b/iroh/src/client/quic.rs @@ -8,8 +8,9 @@ use std::{ }; use anyhow::{bail, Context}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection}; +use super::Iroh; use crate::{ node::RpcStatus, rpc_protocol::{NodeStatusRequest, RpcService}, @@ -20,15 +21,7 @@ use crate::{ pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1"; /// RPC client to an iroh node running in a separate process. -pub type RpcClient = quic_rpc::RpcClient>; - -/// Client to an iroh node running in a separate process. -/// -/// This is obtained from [`Iroh::connect`]. -pub type Iroh = super::Iroh>; - -/// RPC document client to an iroh node running in a separate process. -pub type Doc = super::docs::Doc>; +pub type RpcClient = quic_rpc::RpcClient>; impl Iroh { /// Connect to an iroh node running on the same computer, but in a different process. @@ -51,6 +44,7 @@ pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result { let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port); let server_name = "localhost".to_string(); let connection = QuinnConnection::::new(endpoint, addr, server_name); + let connection = BoxedConnection::new(connection); let client = RpcClient::new(connection); // Do a status request to check if the server is running. let _version = tokio::time::timeout(Duration::from_secs(1), client.rpc(NodeStatusRequest)) diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index 9c3ef34f12..66166a396c 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -3,23 +3,20 @@ use anyhow::Result; use futures_lite::{Stream, StreamExt}; use iroh_blobs::{BlobFormat, Hash, Tag}; -use quic_rpc::{RpcClient, ServiceConnection}; use ref_cast::RefCast; use serde::{Deserialize, Serialize}; -use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest, RpcService}; +use super::RpcClient; +use crate::rpc_protocol::{DeleteTagRequest, ListTagsRequest}; /// Iroh tags client. #[derive(Debug, Clone, RefCast)] #[repr(transparent)] -pub struct Client { - pub(super) rpc: RpcClient, +pub struct Client { + pub(super) rpc: RpcClient, } -impl Client -where - C: ServiceConnection, -{ +impl Client { /// List all tags. pub async fn list(&self) -> Result>> { let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 85df39cc22..eeadfc1778 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -60,7 +60,7 @@ struct NodeInner { gossip: Gossip, secret_key: SecretKey, cancel_token: CancellationToken, - client: crate::client::MemIroh, + client: crate::client::Iroh, #[debug("rt")] rt: LocalPoolHandle, downloader: Downloader, @@ -133,7 +133,7 @@ impl Node { } /// Return a client to control this node over an in-memory channel. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } @@ -180,7 +180,7 @@ impl Node { } impl std::ops::Deref for Node { - type Target = crate::client::MemIroh; + type Target = crate::client::Iroh; fn deref(&self) -> &Self::Target { &self.inner.client diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index f64297da98..e720acaaf2 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -483,6 +483,9 @@ where // Initialize the internal RPC connection. let (internal_rpc, controller) = quic_rpc::transport::flume::connection(1); + // box the controller. Boxing has a special case for the flume channel that avoids allocations, + // so this has zero overhead. + let controller = quic_rpc::transport::boxed::Connection::new(controller); let client = crate::client::Iroh::new(quic_rpc::RpcClient::new(controller.clone())); let inner = Arc::new(NodeInner { @@ -545,7 +548,7 @@ impl> ProtocolBuilde /// # use std::sync::Arc; /// # use anyhow::Result; /// # use futures_lite::future::Boxed as BoxedFuture; - /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::MemIroh}; + /// # use iroh::{node::{Node, ProtocolHandler}, net::endpoint::Connecting, client::Iroh}; /// # /// # #[tokio::main] /// # async fn main() -> Result<()> { @@ -554,7 +557,7 @@ impl> ProtocolBuilde /// /// #[derive(Debug)] /// struct MyProtocol { - /// client: MemIroh + /// client: Iroh /// } /// /// impl ProtocolHandler for MyProtocol { @@ -589,7 +592,7 @@ impl> ProtocolBuilde /// /// Note that RPC calls performed with the client will not complete until the node is /// spawned. - pub fn client(&self) -> &crate::client::MemIroh { + pub fn client(&self) -> &crate::client::Iroh { &self.inner.client } diff --git a/iroh/src/node/rpc_status.rs b/iroh/src/node/rpc_status.rs index 00ad0c8ab2..29f633eb81 100644 --- a/iroh/src/node/rpc_status.rs +++ b/iroh/src/node/rpc_status.rs @@ -16,7 +16,7 @@ pub enum RpcStatus { /// The port we are connected on. port: u16, /// Actual connected RPC client. - client: crate::client::QuicRpcClient, + client: crate::client::RpcClient, }, } diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index a5e9b8a463..b2ff42fedd 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -13,7 +13,7 @@ use iroh::{ base::node_addr::AddrInfoOptions, client::{ docs::{Entry, LiveEvent, ShareMode}, - MemDoc, + Doc, }, net::key::{PublicKey, SecretKey}, node::{Builder, Node}, @@ -1012,14 +1012,14 @@ async fn test_list_docs_stream() -> Result<()> { } /// Get all entries of a document. -async fn get_all(doc: &MemDoc) -> anyhow::Result> { +async fn get_all(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.collect::>().await; entries.into_iter().collect() } /// Get all entries of a document with the blob content. -async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result> { +async fn get_all_with_content(doc: &Doc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?; let entries = entries.and_then(|entry| async { let content = entry.content_bytes(doc).await; @@ -1031,7 +1031,7 @@ async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result, n: usize, cb: impl Fn(usize, usize) -> (AuthorId, String, String), @@ -1090,7 +1090,7 @@ async fn wait_for_events( } async fn assert_all_docs( - docs: &[MemDoc], + docs: &[Doc], node_ids: &[PublicKey], expected: &Vec, label: &str, @@ -1203,12 +1203,12 @@ async fn sync_drop_doc() -> Result<()> { Ok(()) } -async fn assert_latest(doc: &MemDoc, key: &[u8], value: &[u8]) { +async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) { let content = get_latest(doc, key).await.unwrap(); assert_eq!(content, value.to_vec()); } -async fn get_latest(doc: &MemDoc, key: &[u8]) -> anyhow::Result> { +async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result> { let query = Query::single_latest_per_key().key_exact(key); let entry = doc .get_many(query)