Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iroh): Accept custom protocols #2357

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ required-features = ["examples"]
[[example]]
name = "client"
required-features = ["examples"]

[[example]]
name = "custom-protocol"
required-features = ["examples"]
135 changes: 135 additions & 0 deletions iroh/examples/custom-protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use std::{any::Any, fmt, sync::Arc};

use anyhow::Result;
use clap::Parser;
use futures_lite::future::Boxed;
use iroh::{
blobs::store::Store,
net::{
endpoint::{get_remote_node_id, Connection},
NodeId,
},
node::{Node, Protocol},
};
use tracing_subscriber::{prelude::*, EnvFilter};

#[derive(Debug, Parser)]
pub struct Cli {
#[clap(subcommand)]
command: Command,
}

#[derive(Debug, Parser)]
pub enum Command {
Accept,
Connect { node: NodeId },
}

#[tokio::main]
async fn main() -> Result<()> {
setup_logging();
let args = Cli::parse();
// create a new node
let node = iroh::node::Node::memory()
.accept(EXAMPLE_ALPN, |node| ExampleProto::build(node))
.spawn()
.await?;

// print the ticket if this is the accepting side
match args.command {
Command::Accept => {
let node_id = node.node_id();
println!("node id: {node_id}");
// wait until ctrl-c
tokio::signal::ctrl_c().await?;
}
Command::Connect { node: node_id } => {
let proto = ExampleProto::get_from_node(&node, EXAMPLE_ALPN).expect("it is registered");
proto.connect(node_id).await?;
}
}

node.shutdown().await?;

Ok(())
}

const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0";

#[derive(Debug)]
struct ExampleProto<S> {
node: Node<S>,
}

impl<S: Store + fmt::Debug> Protocol for ExampleProto<S> {
fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

into_arc_any is more correct i guess? as seems like a naming violation

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this have to exist at all? why not do this directly on the argument the node builder method accepts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is a way to cast from dyn Protocol to dyn Any apart from having this as a trait method implemented on the concrete type.

let x: Arc<dyn Protocol> = protocols.get(alpn);
// this does not compile even if Protocol: Any
let x: Arc<dyn Any> = x;

However what we can do, and what I did in the most recent commit, is move this into a separate trait which is auto-impled for all T: Send + Sync + 'static. I like this more, now users don't have to care about this at all and it is purely an implementation detail.

self
}

fn accept(self: Arc<Self>, conn: quinn::Connection) -> Boxed<Result<()>> {
Frando marked this conversation as resolved.
Show resolved Hide resolved
Frando marked this conversation as resolved.
Show resolved Hide resolved
Box::pin(async move { self.handle_connection(conn).await })
}
}

impl<S: Store + fmt::Debug> ExampleProto<S> {
fn build(node: Node<S>) -> Arc<Self> {
Arc::new(Self { node })
}

fn get_from_node(node: &Node<S>, alpn: &'static [u8]) -> Option<Arc<Self>> {
node.get_protocol::<ExampleProto<S>>(alpn)
}

async fn handle_connection(&self, conn: Connection) -> Result<()> {
let remote_node_id = get_remote_node_id(&conn)?;
println!("accepted connection from {remote_node_id}");
let mut send_stream = conn.open_uni().await?;
// not that this is something that you wanted to do, but let's create a new blob for each
// incoming connection. this could be any mechanism, but we want to demonstrate how to use a
// custom protocol together with built-in iroh functionality
let content = format!("this blob is created for my beloved peer {remote_node_id} ♥");
let hash = self
.node
.blobs()
.add_bytes(content.as_bytes().to_vec())
.await?;
// send the hash over our custom proto
send_stream.write_all(hash.hash.as_bytes()).await?;
send_stream.finish().await?;
println!("closing connection from {remote_node_id}");
Ok(())
}

async fn connect(&self, remote_node_id: NodeId) -> Result<()> {
println!("our node id: {}", self.node.node_id());
println!("connecting to {remote_node_id}");
let conn = self
.node
.endpoint()
.connect_by_node_id(&remote_node_id, EXAMPLE_ALPN)
.await?;
let mut recv_stream = conn.accept_uni().await?;
let hash_bytes = recv_stream.read_to_end(32).await?;
let hash = iroh::blobs::Hash::from_bytes(hash_bytes.try_into().unwrap());
println!("received hash: {hash}");
self.node
.blobs()
.download(hash, remote_node_id.into())
.await?
.await?;
println!("blob downloaded");
let content = self.node.blobs().read_to_bytes(hash).await?;
let message = String::from_utf8(content.to_vec())?;
println!("blob content: {message}");
Ok(())
}
}

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.with(EnvFilter::from_default_env())
.try_init()
.ok();
}
16 changes: 14 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
//! To shut down the node, call [`Node::shutdown`].
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{any::Any, path::Path};

use anyhow::{anyhow, Result};
use futures_lite::StreamExt;
Expand All @@ -23,14 +23,16 @@ use tokio_util::sync::CancellationToken;
use tokio_util::task::LocalPoolHandle;
use tracing::debug;

use crate::client::RpcService;
use crate::{client::RpcService, node::builder::ProtocolMap};

mod builder;
mod protocol;
mod rpc;
mod rpc_status;

pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig};
pub use self::rpc_status::RpcStatus;
pub use protocol::Protocol;

/// A server which implements the iroh node.
///
Expand All @@ -47,6 +49,7 @@ pub struct Node<D> {
inner: Arc<NodeInner<D>>,
task: Arc<JoinHandle<()>>,
client: crate::client::MemIroh,
protocols: ProtocolMap,
}

#[derive(derive_more::Debug)]
Expand Down Expand Up @@ -150,6 +153,15 @@ impl<D: BaoStore> Node<D> {
self.inner.endpoint.my_relay()
}

/// Returns the protocol handler for a alpn.
pub fn get_protocol<P: Protocol>(&self, alpn: &[u8]) -> Option<Arc<P>> {
let protocols = self.protocols.read().unwrap();
let protocol: Arc<dyn Protocol> = protocols.get(alpn)?.clone();
let protocol_any: Arc<dyn Any + Send + Sync> = protocol.as_arc_any();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why stored as Any if it is then downcast anyway?

let protocol_ref = Arc::downcast(protocol_any).ok()?;
Some(protocol_ref)
}

/// Aborts the node.
///
/// This does not gracefully terminate currently: all connections are closed and
Expand Down
Loading
Loading