Skip to content

Commit

Permalink
refactor(iroh)!: Remove server channel type parameter (#2461)
Browse files Browse the repository at this point in the history
## Description

This is a followup to the modularize services PR. I did not want to do
it in that one since it was big enough already.

## Breaking Changes

- A `#[must_use]` annotation was added to AbortingJoinHandle

```
     Checked [   0.095s] 75 checks; 74 passed, 1 failed, 0 unnecessary

--- failure struct_must_use_added: struct #[must_use] added ---

Description:
A struct is now #[must_use]. Downstream crates that did not use its value will get a compiler lint.
        ref: https://doc.rust-lang.org/reference/attributes/diagnostics.html#the-must_use-attribute
       impl: https://github.com/obi1kenobi/cargo-semver-checks/tree/v0.32.0/src/lints/struct_must_use_added.ron

Failed in:
  struct AbortingJoinHandle in /home/runner/work/iroh/iroh/iroh-net/src/util.rs:18
     Summary semver requires new minor version: 0 major and 1 minor checks failed
```

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] ~~Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.~~
- [x] ~~Tests if relevant.~~
- [x] All breaking changes documented.
  • Loading branch information
rklaehn authored Jul 5, 2024
1 parent 74e8a6a commit f4d1e71
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 85 deletions.
1 change: 1 addition & 0 deletions iroh-net/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod chain;

/// A join handle that owns the task it is running, and aborts it when dropped.
#[derive(Debug, derive_more::Deref)]
#[must_use = "Aborting join handles abort the task when dropped"]
pub struct AbortingJoinHandle<T> {
handle: tokio::task::JoinHandle<T>,
}
Expand Down
5 changes: 1 addition & 4 deletions iroh/src/client/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
use anyhow::{bail, Context};
use quic_rpc::transport::{boxed::Connection as BoxedConnection, quinn::QuinnConnection};

use super::Iroh;
use super::{Iroh, RpcClient};
use crate::{
node::RpcStatus,
rpc_protocol::{node::StatusRequest, RpcService},
Expand All @@ -20,9 +20,6 @@ use crate::{
// TODO: Change to "/iroh-rpc/1"
pub(crate) const RPC_ALPN: [u8; 17] = *b"n0/provider-rpc/1";

/// RPC client to an iroh node running in a separate process.
pub type RpcClient = quic_rpc::RpcClient<RpcService, BoxedConnection<RpcService>>;

impl Iroh {
/// Connect to an iroh node running on the same computer, but in a different process.
pub async fn connect_path(root: impl AsRef<Path>) -> anyhow::Result<Self> {
Expand Down
20 changes: 13 additions & 7 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,14 @@ use iroh_gossip::net::Gossip;
use iroh_net::key::SecretKey;
use iroh_net::Endpoint;
use iroh_net::{endpoint::DirectAddrsStream, util::SharedAbortingJoinHandle};
use quic_rpc::{RpcServer, ServiceEndpoint};
use quic_rpc::transport::ServerEndpoint as _;
use quic_rpc::RpcServer;
use tokio::task::JoinSet;
use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::{debug, error, info, warn};

use crate::{
client::RpcService,
node::{docs::DocsEngine, protocol::ProtocolMap},
};
use crate::node::{docs::DocsEngine, protocol::ProtocolMap};

mod builder;
mod docs;
Expand All @@ -73,6 +71,14 @@ pub use self::builder::{
pub use self::rpc_status::RpcStatus;
pub use protocol::ProtocolHandler;

/// The quic-rpc server endpoint for the iroh node.
///
/// We use a boxed endpoint here to allow having a concrete type for the server endpoint.
pub type IrohServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>;

/// A server which implements the iroh node.
///
/// Clients can connect to this server and requests hashes from it.
Expand Down Expand Up @@ -245,8 +251,8 @@ impl<D: iroh_blobs::store::Store> NodeInner<D> {

async fn run(
self: Arc<Self>,
external_rpc: impl ServiceEndpoint<RpcService>,
internal_rpc: impl ServiceEndpoint<RpcService>,
external_rpc: IrohServerEndpoint,
internal_rpc: IrohServerEndpoint,
protocols: Arc<ProtocolMap>,
gc_policy: GcPolicy,
gc_done_callback: Option<Box<dyn Fn() + Send>>,
Expand Down
62 changes: 15 additions & 47 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use iroh_net::{
relay::RelayMode,
Endpoint,
};
use quic_rpc::transport::{
boxed::BoxableServerEndpoint, flume::FlumeServerEndpoint, quinn::QuinnServerEndpoint,
};
use quic_rpc::transport::{boxed::BoxableServerEndpoint, quinn::QuinnServerEndpoint};
use serde::{Deserialize, Serialize};
use tokio_util::{sync::CancellationToken, task::LocalPoolHandle};
use tracing::{debug, error_span, trace, Instrument};
Expand All @@ -41,7 +39,7 @@ use crate::{
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{docs::DocsEngine, rpc_status::RpcStatus, Node, NodeInner};
use super::{docs::DocsEngine, rpc_status::RpcStatus, IrohServerEndpoint, Node, NodeInner};

/// Default bind address for the node.
/// 11204 is "iroh" in leetspeak <https://simple.wikipedia.org/wiki/Leet>
Expand All @@ -56,11 +54,6 @@ const DEFAULT_GC_INTERVAL: Duration = Duration::from_secs(60 * 5);
const MAX_CONNECTIONS: u32 = 1024;
const MAX_STREAMS: u64 = 10;

type BoxedServerEndpoint = quic_rpc::transport::boxed::ServerEndpoint<
crate::rpc_protocol::Request,
crate::rpc_protocol::Response,
>;

/// Storage backend for documents.
#[derive(Debug, Clone)]
pub enum DocsStorage {
Expand Down Expand Up @@ -93,7 +86,7 @@ where
storage: StorageConfig,
bind_port: Option<u16>,
secret_key: SecretKey,
rpc_endpoint: BoxedServerEndpoint,
rpc_endpoint: IrohServerEndpoint,
rpc_port: Option<u16>,
blobs_store: D,
keylog: bool,
Expand Down Expand Up @@ -180,7 +173,7 @@ impl BoxableServerEndpoint<crate::rpc_protocol::Request, crate::rpc_protocol::Re
}
}

fn mk_external_rpc() -> BoxedServerEndpoint {
fn mk_external_rpc() -> IrohServerEndpoint {
quic_rpc::transport::boxed::ServerEndpoint::new(DummyServerEndpoint)
}

Expand Down Expand Up @@ -308,53 +301,28 @@ where
})
}

/// Configure rpc endpoint, changing the type of the builder to the new endpoint type.
pub fn rpc_endpoint(self, value: BoxedServerEndpoint, port: Option<u16>) -> Builder<D> {
// we can't use ..self here because the return type is different
Builder {
storage: self.storage,
bind_port: self.bind_port,
secret_key: self.secret_key,
blobs_store: self.blobs_store,
keylog: self.keylog,
/// Configure rpc endpoint.
pub fn rpc_endpoint(self, value: IrohServerEndpoint, port: Option<u16>) -> Self {
Self {
rpc_endpoint: value,
rpc_port: port,
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_storage: self.docs_storage,
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
gc_done_callback: self.gc_done_callback,
..self
}
}

/// Configure the default iroh rpc endpoint.
pub async fn enable_rpc(self) -> Result<Builder<D>> {
pub async fn enable_rpc(self) -> Result<Self> {
let (ep, actual_rpc_port) = make_rpc_endpoint(&self.secret_key, DEFAULT_RPC_PORT)?;
let ep = quic_rpc::transport::boxed::ServerEndpoint::new(ep);
if let StorageConfig::Persistent(ref root) = self.storage {
// store rpc endpoint
RpcStatus::store(root, actual_rpc_port).await?;
}

Ok(Builder {
storage: self.storage,
bind_port: self.bind_port,
secret_key: self.secret_key,
blobs_store: self.blobs_store,
keylog: self.keylog,
Ok(Self {
rpc_endpoint: ep,
rpc_port: Some(actual_rpc_port),
relay_mode: self.relay_mode,
dns_resolver: self.dns_resolver,
gc_policy: self.gc_policy,
docs_storage: self.docs_storage,
node_discovery: self.node_discovery,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
gc_done_callback: self.gc_done_callback,
..self
})
}

Expand Down Expand Up @@ -551,7 +519,8 @@ where
let gossip_dispatcher = GossipDispatcher::new(gossip.clone());

// Initialize the internal RPC connection.
let (internal_rpc, controller) = quic_rpc::transport::flume::connection(32);
let (internal_rpc, controller) = quic_rpc::transport::flume::connection::<RpcService>(32);
let internal_rpc = quic_rpc::transport::boxed::ServerEndpoint::new(internal_rpc);
// box the controller. Boxing has a special case for the flume channel that avoids allocations,
// so this has zero overhead.
let controller = quic_rpc::transport::boxed::Connection::new(controller);
Expand Down Expand Up @@ -597,9 +566,8 @@ where
#[derive(derive_more::Debug)]
pub struct ProtocolBuilder<D> {
inner: Arc<NodeInner<D>>,
internal_rpc: FlumeServerEndpoint<RpcService>,
#[debug("external rpc")]
external_rpc: BoxedServerEndpoint,
internal_rpc: IrohServerEndpoint,
external_rpc: IrohServerEndpoint,
protocols: ProtocolMap,
#[debug("callback")]
gc_done_callback: Option<Box<dyn Fn() + Send>>,
Expand Down
53 changes: 26 additions & 27 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,7 @@ use iroh_blobs::{
use iroh_io::AsyncSliceReader;
use iroh_net::relay::RelayUrl;
use iroh_net::{Endpoint, NodeAddr, NodeId};
use quic_rpc::{
server::{RpcChannel, RpcServerError},
ServiceEndpoint,
};
use quic_rpc::server::{RpcChannel, RpcServerError};
use tokio::task::JoinSet;
use tokio_util::{either::Either, task::LocalPoolHandle};
use tracing::{debug, info, warn};
Expand Down Expand Up @@ -64,6 +61,8 @@ use crate::rpc_protocol::{
Request, RpcService,
};

use super::IrohServerEndpoint;

mod docs;

const HEALTH_POLL_WAIT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -118,10 +117,10 @@ impl<D: BaoStore> Handler<D> {
}
}

pub(crate) fn spawn_rpc_request<E: ServiceEndpoint<RpcService>>(
pub(crate) fn spawn_rpc_request(
inner: Arc<NodeInner<D>>,
join_set: &mut JoinSet<anyhow::Result<()>>,
accepting: quic_rpc::server::Accepting<RpcService, E>,
accepting: quic_rpc::server::Accepting<RpcService, IrohServerEndpoint>,
) {
let handler = Self::new(inner);
join_set.spawn(async move {
Expand All @@ -133,11 +132,11 @@ impl<D: BaoStore> Handler<D> {
});
}

async fn handle_node_request<E: ServiceEndpoint<RpcService>>(
async fn handle_node_request(
self,
msg: node::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use node::Request::*;
debug!("handling node request: {msg}");
match msg {
Expand All @@ -157,11 +156,11 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_blobs_request<E: ServiceEndpoint<RpcService>>(
async fn handle_blobs_request(
self,
msg: blobs::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use blobs::Request::*;
debug!("handling blob request: {msg}");
match msg {
Expand Down Expand Up @@ -189,23 +188,23 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_tags_request<E: ServiceEndpoint<RpcService>>(
async fn handle_tags_request(
self,
msg: tags::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use tags::Request::*;
match msg {
ListTags(msg) => chan.server_streaming(msg, self, Self::blob_list_tags).await,
DeleteTag(msg) => chan.rpc(msg, self, Self::blob_delete_tag).await,
}
}

async fn handle_gossip_request<E: ServiceEndpoint<RpcService>>(
async fn handle_gossip_request(
self,
msg: gossip::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use gossip::Request::*;
match msg {
Subscribe(msg) => {
Expand All @@ -225,11 +224,11 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_authors_request<E: ServiceEndpoint<RpcService>>(
async fn handle_authors_request(
self,
msg: authors::Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use authors::Request::*;
match msg {
List(msg) => {
Expand Down Expand Up @@ -277,11 +276,11 @@ impl<D: BaoStore> Handler<D> {
}
}

async fn handle_docs_request<E: ServiceEndpoint<RpcService>>(
async fn handle_docs_request(
self,
msg: DocsRequest,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use DocsRequest::*;
match msg {
Open(msg) => {
Expand Down Expand Up @@ -412,11 +411,11 @@ impl<D: BaoStore> Handler<D> {
}
}

pub(crate) async fn handle_rpc_request<E: ServiceEndpoint<RpcService>>(
pub(crate) async fn handle_rpc_request(
self,
msg: Request,
chan: RpcChannel<RpcService, E>,
) -> Result<(), RpcServerError<E>> {
chan: RpcChannel<RpcService, IrohServerEndpoint>,
) -> Result<(), RpcServerError<IrohServerEndpoint>> {
use Request::*;
debug!("handling rpc request: {msg}");
match msg {
Expand Down

0 comments on commit f4d1e71

Please sign in to comment.