Skip to content

Commit

Permalink
refactor(iroh): streamline client and node module api
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire committed May 3, 2024
1 parent 54ec489 commit 43d9db0
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 37 deletions.
6 changes: 3 additions & 3 deletions iroh-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::path::{Path, PathBuf};
use anyhow::{ensure, Context, Result};
use clap::Parser;
use derive_more::FromStr;
use iroh::client::quic::Iroh as IrohRpc;
use iroh::client::QuicIroh;

use crate::config::{ConsoleEnv, NodeConfig};

Expand Down Expand Up @@ -127,7 +127,7 @@ impl Cli {
.await
} else {
crate::logging::init_terminal_logging()?;
let iroh = IrohRpc::connect(data_dir).await.context("rpc connect")?;
let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?;
console::run(&iroh, &env).await
}
}
Expand All @@ -144,7 +144,7 @@ impl Cli {
.await
} else {
crate::logging::init_terminal_logging()?;
let iroh = IrohRpc::connect(data_dir).await.context("rpc connect")?;
let iroh = QuicIroh::connect(data_dir).await.context("rpc connect")?;
command.run(&iroh, &env).await
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ mod tests {

// set up command, getting iroh node
let cli = ConsoleEnv::for_console(data_dir.path()).context("ConsoleEnv")?;
let iroh = iroh::client::quic::Iroh::connect(data_dir.path())
let iroh = iroh::client::QuicIroh::connect(data_dir.path())
.await
.context("rpc connect")?;

Expand Down
4 changes: 2 additions & 2 deletions iroh-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn run_with_command<F, T>(
command: F,
) -> Result<()>
where
F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static,
F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static,
T: Future<Output = Result<()>> + 'static,
{
let _guard = crate::logging::init_terminal_and_file_logging(&config.file_logs, iroh_data_root)?;
Expand Down Expand Up @@ -68,7 +68,7 @@ async fn run_with_command_inner<F, T>(
command: F,
) -> Result<()>
where
F: FnOnce(iroh::client::mem::Iroh) -> T + Send + 'static,
F: FnOnce(iroh::client::MemIroh) -> T + Send + 'static,
T: Future<Output = Result<()>> + 'static,
{
let relay_map = config.relay_map()?;
Expand Down
9 changes: 7 additions & 2 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ use quic_rpc::{RpcClient, ServiceConnection};
#[doc(inline)]
pub use crate::rpc_protocol::ProviderService;

pub mod mem;
pub mod quic;
mod mem;
mod quic;

pub use self::mem::{Doc as MemDoc, Iroh as MemIroh, RpcClient as MemRpcClient};
pub use self::quic::{Doc as QuicDoc, Iroh as QuicIroh, RpcClient as QuicRpcClient};

pub(crate) use self::quic::{connect_raw as quic_connect_raw, RPC_ALPN};

pub mod authors;
pub mod blobs;
Expand Down
7 changes: 4 additions & 3 deletions iroh/src/client/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ use crate::{
rpc_protocol::{NodeStatusRequest, ProviderRequest, ProviderResponse, ProviderService},
};

/// TODO: Change to "/iroh-rpc/1"
pub const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";
/// ALPN used by irohs RPC mechanism.
// TODO: Change to "/iroh-rpc/1"
pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";

/// RPC client to an iroh node running in a separate process.
pub type RpcClient =
Expand Down Expand Up @@ -45,7 +46,7 @@ impl Iroh {

/// Create a raw RPC client to an iroh node running on the same computer, but in a different
/// process.
pub async fn connect_raw(rpc_port: u16) -> anyhow::Result<RpcClient> {
pub(crate) async fn connect_raw(rpc_port: u16) -> anyhow::Result<RpcClient> {
let bind_addr = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0).into();
let endpoint = create_quinn_client(bind_addr, vec![RPC_ALPN.to_vec()], false)?;
let addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), rpc_port);
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
//!
//! ## Feature Flags
//!
//! - `metrics`: Enable metrics collection
//! - `fs-store`: Enables the disk based storage backend for `iroh-bytes`.
//! - `metrics`: Enable metrics collection. Enabled by default.
//! - `fs-store`: Enables the disk based storage backend for `iroh-bytes`. Enabled by default.
//!
#![cfg_attr(docsrs, feature(doc_cfg))]
#![deny(missing_docs, rustdoc::broken_intra_doc_links)]
Expand Down
12 changes: 6 additions & 6 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ mod builder;
mod rpc;
mod rpc_status;

pub use builder::{Builder, GcPolicy, NodeDiscoveryConfig, StorageConfig};
pub use rpc_status::RpcStatus;
pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;

type EventCallback = Box<dyn Fn(Event) -> BoxFuture<()> + 'static + Sync + Send>;

Expand Down Expand Up @@ -88,7 +88,7 @@ impl iroh_bytes::provider::EventSender for Callbacks {
pub struct Node<D> {
inner: Arc<NodeInner<D>>,
task: Arc<JoinHandle<()>>,
client: crate::client::mem::Iroh,
client: crate::client::MemIroh,
}

#[derive(derive_more::Debug)]
Expand Down Expand Up @@ -197,12 +197,12 @@ impl<D: BaoStore> Node<D> {
}

/// Returns a handle that can be used to do RPC calls to the node internally.
pub fn controller(&self) -> crate::client::mem::RpcClient {
pub fn controller(&self) -> crate::client::MemRpcClient {
RpcClient::new(self.inner.controller.clone())
}

/// Return a client to control this node over an in-memory channel.
pub fn client(&self) -> &crate::client::mem::Iroh {
pub fn client(&self) -> &crate::client::MemIroh {
&self.client
}

Expand Down Expand Up @@ -254,7 +254,7 @@ impl<D: BaoStore> Node<D> {
}

impl<D> std::ops::Deref for Node<D> {
type Target = crate::client::mem::Iroh;
type Target = crate::client::MemIroh;

fn deref(&self) -> &Self::Target {
&self.client
Expand Down
18 changes: 9 additions & 9 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
use tracing::{debug, error, error_span, info, trace, warn, Instrument};

use crate::{
client::quic::RPC_ALPN,
client::RPC_ALPN,
node::{Event, NodeInner},
rpc_protocol::{ProviderRequest, ProviderResponse, ProviderService},
sync_engine::SyncEngine,
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc, Callbacks, EventCallback, Node, RpcStatus};
use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node};

pub const PROTOCOLS: [&[u8]; 3] = [iroh_bytes::protocol::ALPN, GOSSIP_ALPN, SYNC_ALPN];

Expand Down Expand Up @@ -84,8 +84,8 @@ where
keylog: bool,
relay_mode: RelayMode,
gc_policy: GcPolicy,
node_discovery: NodeDiscoveryConfig,
dns_resolver: Option<DnsResolver>,
node_discovery: DiscoveryConfig,
docs_store: iroh_sync::store::fs::Store,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: bool,
Expand All @@ -102,7 +102,7 @@ pub enum StorageConfig {

/// Configuration for node discovery.
#[derive(Debug, Default)]
pub enum NodeDiscoveryConfig {
pub enum DiscoveryConfig {
/// Use no node discovery mechanism.
None,
/// Use the default discovery mechanism.
Expand Down Expand Up @@ -297,9 +297,9 @@ where

/// Sets the node discovery mechanism.
///
/// The default is [`NodeDiscoveryConfig::Default`]. Use [`NodeDiscoveryConfig::Custom`] to pass a
/// The default is [`DiscoveryConfig::Default`]. Use [`DiscoveryConfig::Custom`] to pass a
/// custom [`Discovery`].
pub fn node_discovery(mut self, config: NodeDiscoveryConfig) -> Self {
pub fn node_discovery(mut self, config: DiscoveryConfig) -> Self {
self.node_discovery = config;
self
}
Expand Down Expand Up @@ -365,9 +365,9 @@ where
.max_concurrent_uni_streams(0u32.into());

let discovery: Option<Box<dyn Discovery>> = match self.node_discovery {
NodeDiscoveryConfig::None => None,
NodeDiscoveryConfig::Custom(discovery) => Some(discovery),
NodeDiscoveryConfig::Default => {
DiscoveryConfig::None => None,
DiscoveryConfig::Custom(discovery) => Some(discovery),
DiscoveryConfig::Default => {
let discovery = ConcurrentDiscovery::from_services(vec![
// Enable DNS discovery by default
Box::new(DnsDiscovery::n0_dns()),
Expand Down
4 changes: 2 additions & 2 deletions iroh/src/node/rpc_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub enum RpcStatus {
/// The port we are connected on.
port: u16,
/// Actual connected RPC client.
client: crate::client::quic::RpcClient,
client: crate::client::QuicRpcClient,
},
}

Expand All @@ -40,7 +40,7 @@ impl RpcStatus {
.await
.context("read rpc lock file")?;
let running_rpc_port = u16::from_le_bytes(buffer);
if let Ok(client) = crate::client::quic::connect_raw(running_rpc_port).await {
if let Ok(client) = crate::client::quic_connect_raw(running_rpc_port).await {
return Ok(RpcStatus::Running {
port: running_rpc_port,
client,
Expand Down
14 changes: 7 additions & 7 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use iroh::{
base::node_addr::AddrInfoOptions,
client::{
docs::{Entry, LiveEvent, ShareMode},
mem::Doc,
MemDoc,
},
net::key::{PublicKey, SecretKey},
node::{Builder, Node},
Expand Down Expand Up @@ -821,14 +821,14 @@ async fn sync_big() -> Result<()> {
}

/// Get all entries of a document.
async fn get_all(doc: &Doc) -> anyhow::Result<Vec<Entry>> {
async fn get_all(doc: &MemDoc) -> anyhow::Result<Vec<Entry>> {
let entries = doc.get_many(Query::all()).await?;
let entries = entries.collect::<Vec<_>>().await;
entries.into_iter().collect()
}

/// Get all entries of a document with the blob content.
async fn get_all_with_content(doc: &Doc) -> anyhow::Result<Vec<(Entry, Bytes)>> {
async fn get_all_with_content(doc: &MemDoc) -> anyhow::Result<Vec<(Entry, Bytes)>> {
let entries = doc.get_many(Query::all()).await?;
let entries = entries.and_then(|entry| async {
let content = entry.content_bytes(doc).await;
Expand All @@ -840,7 +840,7 @@ async fn get_all_with_content(doc: &Doc) -> anyhow::Result<Vec<(Entry, Bytes)>>
}

async fn publish(
docs: &[Doc],
docs: &[MemDoc],
expected: &mut Vec<ExpectedEntry>,
n: usize,
cb: impl Fn(usize, usize) -> (AuthorId, String, String),
Expand Down Expand Up @@ -899,7 +899,7 @@ async fn wait_for_events(
}

async fn assert_all_docs(
docs: &[Doc],
docs: &[MemDoc],
node_ids: &[PublicKey],
expected: &Vec<ExpectedEntry>,
label: &str,
Expand Down Expand Up @@ -1012,12 +1012,12 @@ async fn sync_drop_doc() -> Result<()> {
Ok(())
}

async fn assert_latest(doc: &Doc, key: &[u8], value: &[u8]) {
async fn assert_latest(doc: &MemDoc, key: &[u8], value: &[u8]) {
let content = get_latest(doc, key).await.unwrap();
assert_eq!(content, value.to_vec());
}

async fn get_latest(doc: &Doc, key: &[u8]) -> anyhow::Result<Vec<u8>> {
async fn get_latest(doc: &MemDoc, key: &[u8]) -> anyhow::Result<Vec<u8>> {
let query = Query::single_latest_per_key().key_exact(key);
let entry = doc
.get_many(query)
Expand Down

0 comments on commit 43d9db0

Please sign in to comment.