Skip to content

Commit

Permalink
refactor(iroh)!: Eliminate the type parameter for the rpc service typ…
Browse files Browse the repository at this point in the history
…e in the builder

needs new minor version of quic-rpc to make boxed(...) public
  • Loading branch information
rklaehn committed Jun 25, 2024
1 parent 38e8ce0 commit 9faf9be
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 28 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ parking_lot = "0.12.1"
pkarr = { version = "1.1.5", default-features = false }
portable-atomic = "1"
postcard = "1.0.8"
quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"] }
quic-rpc = { version = "0.10.2", features = ["flume-transport", "quinn-transport"], git = "https://github.com/n0-computer/quic-rpc", branch = "boxed-futures-public-constructor" }
rand = "0.8.5"
ratatui = "0.26.2"
reqwest = { version = "0.12.4", default-features = false, features = ["json", "rustls-tls"] }
Expand Down
2 changes: 1 addition & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ iroh-docs = { version = "0.18.0", path = "../iroh-docs" }
iroh-gossip = { version = "0.18.0", path = "../iroh-gossip" }
parking_lot = "0.12.1"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"] }
quic-rpc = { version = "0.10.2", default-features = false, features = ["flume-transport", "quinn-transport"], git = "https://github.com/n0-computer/quic-rpc", branch = "boxed-futures-public-constructor" }
quinn = { package = "iroh-quinn", version = "0.10" }
rand = "0.8"
serde = { version = "1", features = ["derive"] }
Expand Down
83 changes: 63 additions & 20 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ use iroh_net::{
relay::RelayMode,
Endpoint,
};
use quic_rpc::{
transport::{
flume::FlumeServerEndpoint, misc::DummyServerEndpoint, quinn::QuinnServerEndpoint,
},
ServiceEndpoint,
use quic_rpc::transport::{
boxed::BoxableServerEndpoint, flume::FlumeServerEndpoint, quinn::QuinnServerEndpoint,
};
use serde::{Deserialize, Serialize};
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
Expand Down Expand Up @@ -81,15 +78,17 @@ pub enum DocsStorage {
/// The returned [`Node`] is awaitable to know when it finishes. It can be terminated
/// using [`Node::shutdown`].
#[derive(derive_more::Debug)]
pub struct Builder<D, E = DummyServerEndpoint>
pub struct Builder<D>
where
D: Map,
E: ServiceEndpoint<RpcService>,
{
storage: StorageConfig,
bind_port: Option<u16>,
secret_key: SecretKey,
rpc_endpoint: E,
rpc_endpoint: quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>,
blobs_store: D,
keylog: bool,
relay_mode: RelayMode,
Expand Down Expand Up @@ -145,6 +144,40 @@ impl From<Box<ConcurrentDiscovery>> for DiscoveryConfig {
}
}

#[derive(Debug, Default)]
pub struct DummyServerEndpoint;

impl BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Response>
for DummyServerEndpoint
{
fn clone_box(
&self,
) -> Box<dyn BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Response>>
{
Box::new(DummyServerEndpoint)
}

fn accept_bi_boxed(
&self,
) -> quic_rpc::transport::boxed::AcceptFuture<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
> {
quic_rpc::transport::boxed::AcceptFuture::boxed(futures_lite::future::pending())
}

fn local_addr(&self) -> &[quic_rpc::transport::LocalAddr] {
&[]
}
}

fn mk_external_rpc() -> quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
> {
quic_rpc::transport::boxed::ServerEndpoint::new(DummyServerEndpoint::default())
}

impl Default for Builder<iroh_blobs::store::mem::Store> {
fn default() -> Self {
Self {
Expand All @@ -155,7 +188,7 @@ impl Default for Builder<iroh_blobs::store::mem::Store> {
keylog: false,
relay_mode: RelayMode::Default,
dns_resolver: None,
rpc_endpoint: Default::default(),
rpc_endpoint: mk_external_rpc(),
gc_policy: GcPolicy::Disabled,
docs_storage: DocsStorage::Memory,
node_discovery: Default::default(),
Expand All @@ -181,7 +214,7 @@ impl<D: Map> Builder<D> {
keylog: false,
relay_mode: RelayMode::Default,
dns_resolver: None,
rpc_endpoint: Default::default(),
rpc_endpoint: mk_external_rpc(),
gc_policy: GcPolicy::Disabled,
docs_storage,
node_discovery: Default::default(),
Expand All @@ -192,16 +225,15 @@ impl<D: Map> Builder<D> {
}
}

impl<D, E> Builder<D, E>
impl<D> Builder<D>
where
D: BaoStore,
E: ServiceEndpoint<RpcService>,
{
/// Persist all node data in the provided directory.
pub async fn persist(
self,
root: impl AsRef<Path>,
) -> Result<Builder<iroh_blobs::store::fs::Store, E>> {
) -> Result<Builder<iroh_blobs::store::fs::Store>> {
let root = root.as_ref();
let blob_dir = IrohPaths::BaoStoreDir.with_root(root);

Expand Down Expand Up @@ -256,7 +288,13 @@ where
}

/// Configure rpc endpoint, changing the type of the builder to the new endpoint type.
pub fn rpc_endpoint<E2: ServiceEndpoint<RpcService>>(self, value: E2) -> Builder<D, E2> {
pub fn rpc_endpoint(
self,
value: quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>,
) -> Builder<D> {
// we can't use ..self here because the return type is different
Builder {
storage: self.storage,
Expand All @@ -277,8 +315,9 @@ where
}

/// Configure the default iroh rpc endpoint.
pub async fn enable_rpc(self) -> Result<Builder<D, QuinnServerEndpoint<RpcService>>> {
pub async fn enable_rpc(self) -> Result<Builder<D>> {
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?;
let ep = quic_rpc::transport::boxed::ServerEndpoint::new(ep);
if let StorageConfig::Persistent(ref root) = self.storage {
// store rpc endpoint
RpcStatus::store(root, actual_rpc_port).await?;
Expand Down Expand Up @@ -406,7 +445,7 @@ where
///
/// Returns an [`ProtocolBuilder`], on which custom protocols can be registered with
/// [`ProtocolBuilder::accept`]. To spawn the node, call [`ProtocolBuilder::spawn`].
pub async fn build(self) -> Result<ProtocolBuilder<D, E>> {
pub async fn build(self) -> Result<ProtocolBuilder<D>> {
// Clone the blob store to shutdown in case of error.
let blobs_store = self.blobs_store.clone();
match self.build_inner().await {
Expand All @@ -418,7 +457,7 @@ where
}
}

async fn build_inner(self) -> Result<ProtocolBuilder<D, E>> {
async fn build_inner(self) -> Result<ProtocolBuilder<D>> {
trace!("building node");
let lp = LocalPoolHandle::new(num_cpus::get());
let endpoint = {
Expand Down Expand Up @@ -536,17 +575,21 @@ where
/// Note that RPC calls performed with client returned from [`Self::client`] will not complete
/// until the node is spawned.
#[derive(derive_more::Debug)]
pub struct ProtocolBuilder<D, E> {
pub struct ProtocolBuilder<D> {
inner: Arc<NodeInner<D>>,
internal_rpc: FlumeServerEndpoint<RpcService>,
external_rpc: E,
#[debug("external rpc")]
external_rpc: quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>,
protocols: ProtocolMap,
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
gc_policy: GcPolicy,
}

impl<D: iroh_blobs::store::Store, E: ServiceEndpoint<RpcService>> ProtocolBuilder<D, E> {
impl<D: iroh_blobs::store::Store> ProtocolBuilder<D> {
/// Registers a protocol handler for incoming connections.
///
/// Use this to register custom protocols onto the iroh node. Whenever a new connection for
Expand Down
3 changes: 1 addition & 2 deletions iroh/tests/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use futures_lite::FutureExt;
use iroh::node::{Builder, DocsStorage};
use iroh_base::node_addr::AddrInfoOptions;
use iroh_net::{defaults::default_relay_map, key::SecretKey, NodeAddr, NodeId};
use quic_rpc::transport::misc::DummyServerEndpoint;
use rand::RngCore;

use bao_tree::{blake3, ChunkNum, ChunkRanges};
Expand Down Expand Up @@ -39,7 +38,7 @@ async fn dial(secret_key: SecretKey, peer: NodeAddr) -> anyhow::Result<quinn::Co
.context("failed to connect to provider")
}

fn test_node<D: Store>(db: D) -> Builder<D, DummyServerEndpoint> {
fn test_node<D: Store>(db: D) -> Builder<D> {
iroh::node::Builder::with_db_and_store(db, DocsStorage::Memory, iroh::node::StorageConfig::Mem)
.bind_port(0)
}
Expand Down
3 changes: 1 addition & 2 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use iroh::{
net::key::{PublicKey, SecretKey},
node::{Builder, Node},
};
use quic_rpc::transport::misc::DummyServerEndpoint;
use rand::{CryptoRng, Rng, SeedableRng};
use tracing::{debug, error_span, info, Instrument};
use tracing_subscriber::{prelude::*, EnvFilter};
Expand All @@ -32,7 +31,7 @@ use iroh_net::relay::RelayMode;

const TIMEOUT: Duration = Duration::from_secs(60);

fn test_node(secret_key: SecretKey) -> Builder<iroh_blobs::store::mem::Store, DummyServerEndpoint> {
fn test_node(secret_key: SecretKey) -> Builder<iroh_blobs::store::mem::Store> {
Node::memory()
.secret_key(secret_key)
.relay_mode(RelayMode::Disabled)
Expand Down

0 comments on commit 9faf9be

Please sign in to comment.