Skip to content

Commit

Permalink
Get rid of the service type parameter in the rpc handler
Browse files Browse the repository at this point in the history
we can now box this with acceptable cost.
  • Loading branch information
rklaehn committed Jul 5, 2024
1 parent c85fca4 commit 409d720
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 53 deletions.
5 changes: 1 addition & 4 deletions iroh/src/client/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use anyhow::{bail, Context};
use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection};

use super::Iroh;
use super::{Iroh, RpcClient};
use crate::{
node::RpcStatus,
rpc_protocol::{node::StatusRequest, RpcService},
Expand All @@ -20,9 +20,6 @@ use crate::{
// TODO: Change to "/iroh-rpc/1"
pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";

/// RPC client to an iroh node running in a separate process.
pub type RpcClient = quic_rpc::RpcClient<RpcService, BoxedConnection<RpcService>>;

impl Iroh {
/// Connect to an iroh node running on the same computer, but in a different process.
pub async fn connect_path(root: impl AsRef<Path>) -> anyhow::Result<Self> {
Expand Down
17 changes: 10 additions & 7 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,14 @@ use iroh_gossip::net::Gossip;
use iroh_net::key::SecretKey;
use iroh_net::Endpoint;
use iroh_net::{endpoint::DirectAddrsStream, util::SharedAbortingJoinHandle};
use quic_rpc::{RpcServer, ServiceEndpoint};
use quic_rpc::transport::ServerEndpoint as _;
use quic_rpc::RpcServer;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, error, info, warn};

use crate::{
client::RpcService,
node::{docs::DocsEngine, protocol::ProtocolMap},
};
use crate::node::{docs::DocsEngine, protocol::ProtocolMap};

mod builder;
mod docs;
Expand All @@ -39,6 +37,11 @@ pub use self::builder::{Builder, DiscoveryConfig, DocsStorage, GcPolicy, Storage
pub use self::rpc_status::RpcStatus;
pub use protocol::ProtocolHandler;

type IrohServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>;

/// A server which implements the iroh node.
///
/// Clients can connect to this server and requests hashes from it.
Expand Down Expand Up @@ -211,8 +214,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {

async fn run(
self: Arc<Self>,
external_rpc: impl ServiceEndpoint<RpcService>,
internal_rpc: impl ServiceEndpoint<RpcService>,
external_rpc: IrohServerEndpoint,
internal_rpc: IrohServerEndpoint,
protocols: Arc<ProtocolMap>,
gc_policy: GcPolicy,
gc_done_callback: Option<Box<dyn Fn() + Send>>,
Expand Down
24 changes: 9 additions & 15 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use iroh_net::{
relay::RelayMode,
Endpoint,
};
use quic_rpc::transport::{
boxed::BoxableServerEndpoint, flume::FlumeServerEndpoint, quinn::QuinnServerEndpoint,
};
use quic_rpc::transport::{boxed::BoxableServerEndpoint, quinn::QuinnServerEndpoint};
use serde::{Deserialize, Serialize};
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
use tracing::{debug, error_span, trace, Instrument};
Expand All @@ -41,7 +39,7 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner};
use super::{docs::DocsEngine, rpc_status::RpcStatus, IrohServerEndpoint, Node, NodeInner};

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
Expand All @@ -56,11 +54,6 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5);
const MAX_CONNECTIONS: u32 = 1024;
const MAX_STREAMS: u64 = 10;

type BoxedServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>;

/// Storage backend for documents.
#[derive(Debug, Clone)]
pub enum DocsStorage {
Expand Down Expand Up @@ -93,7 +86,7 @@ where
storage: StorageConfig,
bind_port: Option<u16>,
secret_key: SecretKey,
rpc_endpoint: BoxedServerEndpoint,
rpc_endpoint: IrohServerEndpoint,
rpc_port: Option<u16>,
blobs_store: D,
keylog: bool,
Expand Down Expand Up @@ -180,7 +173,7 @@ impl BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Re
}
}

fn mk_external_rpc() -> BoxedServerEndpoint {
fn mk_external_rpc() -> IrohServerEndpoint {
quic_rpc::transport::boxed::ServerEndpoint::new(DummyServerEndpoint)
}

Expand Down Expand Up @@ -309,7 +302,7 @@ where
}

/// Configure rpc endpoint, changing the type of the builder to the new endpoint type.
pub fn rpc_endpoint(self, value: BoxedServerEndpoint, port: Option<u16>) -> Builder<D> {
pub fn rpc_endpoint(self, value: IrohServerEndpoint, port: Option<u16>) -> Builder<D> {
// we can't use ..self here because the return type is different
Builder {
storage: self.storage,
Expand Down Expand Up @@ -551,7 +544,8 @@ where
let gossip_dispatcher = GossipDispatcher::new(gossip.clone());

// Initialize the internal RPC connection.
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(32);
let (internal_rpc, controller) = quic_rpc::transport::flume::connection::<RpcService>(32);
let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc);
// box the controller. Boxing has a special case for the flume channel that avoids allocations,
// so this has zero overhead.
let controller = quic_rpc::transport::boxed::Connection::new(controller);
Expand Down Expand Up @@ -597,9 +591,9 @@ where
#[derive(derive_more::Debug)]
pub struct ProtocolBuilder<D> {
inner: Arc<NodeInner<D>>,
internal_rpc: FlumeServerEndpoint<RpcService>,
internal_rpc: IrohServerEndpoint,
#[debug("external rpc")]
external_rpc: BoxedServerEndpoint,
external_rpc: IrohServerEndpoint,
protocols: ProtocolMap,
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
Expand Down
53 changes: 26 additions & 27 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use iroh_blobs::{
use iroh_io::AsyncSliceReader;
use iroh_net::relay::RelayUrl;
use iroh_net::{Endpoint, NodeAddr, NodeId};
use quic_rpc::{
server::{RpcChannel, RpcServerError},
ServiceEndpoint,
};
use quic_rpc::server::{RpcChannel, RpcServerError};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -64,6 +61,8 @@ use crate::rpc_protocol::{
Request, RpcService,
};

use super::IrohServerEndpoint;

mod docs;

const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -118,10 +117,10 @@ impl<D: BaoStore> Handler<D> {
}
}

pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
pub(crate) fn spawn_rpc_request(
inner: Arc<NodeInner<D>>,
join_set: &mut JoinSet<anyhow::Result<()>>,
accepting: quic_rpc::server::Accepting<RpcService, E>,
accepting: quic_rpc::server::Accepting<RpcService, IrohServerEndpoint>,
) {
let handler = Self::new(inner);
join_set.spawn(async move {
Expand All @@ -133,11 +132,11 @@ impl<D: BaoStore> Handler<D> {
});
}

async fn handle_node_request<E: ServiceEndpoint<RpcService>>(
async fn handle_node_request(
self,
msg: node::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use node::Request::*;
debug!("handling node request: {msg}");
match msg {
Expand All @@ -157,11 +156,11 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_blobs_request<E: ServiceEndpoint<RpcService>>(
async fn handle_blobs_request(
self,
msg: blobs::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use blobs::Request::*;
debug!("handling blob request: {msg}");
match msg {
Expand Down Expand Up @@ -189,23 +188,23 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_tags_request<E: ServiceEndpoint<RpcService>>(
async fn handle_tags_request(
self,
msg: tags::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use tags::Request::*;
match msg {
ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await,
DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await,
}
}

async fn handle_gossip_request<E: ServiceEndpoint<RpcService>>(
async fn handle_gossip_request(
self,
msg: gossip::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use gossip::Request::*;
match msg {
Subscribe(msg) => {
Expand All @@ -225,11 +224,11 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_authors_request<E: ServiceEndpoint<RpcService>>(
async fn handle_authors_request(
self,
msg: authors::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use authors::Request::*;
match msg {
List(msg) => {
Expand Down Expand Up @@ -277,11 +276,11 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_docs_request<E: ServiceEndpoint<RpcService>>(
async fn handle_docs_request(
self,
msg: DocsRequest,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use DocsRequest::*;
match msg {
Open(msg) => {
Expand Down Expand Up @@ -412,11 +411,11 @@ impl<D: BaoStore> Handler<D> {
}
}

pub(crate) async fn handle_rpc_request<E: ServiceEndpoint<RpcService>>(
pub(crate) async fn handle_rpc_request(
self,
msg: Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use Request::*;
debug!("handling rpc request: {msg}");
match msg {
Expand Down

0 comments on commit 409d720

Please sign in to comment.