diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index aac6f9a645..5130f336c2 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -101,3 +101,7 @@ required-features = ["examples"] [[example]] name = "client" required-features = ["examples"] + +[[example]] +name = "custom-protocol" +required-features = ["examples"] diff --git a/iroh/examples/custom-protocol.rs b/iroh/examples/custom-protocol.rs new file mode 100644 index 0000000000..e67896f76b --- /dev/null +++ b/iroh/examples/custom-protocol.rs @@ -0,0 +1,134 @@ +use std::{any::Any, fmt, sync::Arc}; + +use anyhow::Result; +use clap::Parser; +use futures_lite::future::Boxed; +use iroh::{ + blobs::store::Store, + net::{ + endpoint::{get_remote_node_id, Connection}, + NodeId, + }, + node::{Node, Protocol}, +}; +use tracing_subscriber::{prelude::*, EnvFilter}; + +const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0"; + +#[derive(Debug, Parser)] +pub struct Cli { + #[clap(subcommand)] + command: Command, +} + +#[derive(Debug, Parser)] +pub enum Command { + Accept, + Connect { node: NodeId }, +} + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + let args = Cli::parse(); + // create a new node + let node = iroh::node::Node::memory() + .accept(EXAMPLE_ALPN, |node| ExampleProtocol::build(node)) + .spawn() + .await?; + + // print the ticket if this is the accepting side + match args.command { + Command::Accept => { + let node_id = node.node_id(); + println!("node id: {node_id}"); + // wait until ctrl-c + tokio::signal::ctrl_c().await?; + } + Command::Connect { node: node_id } => { + let proto = ExampleProtocol::from_node(&node, EXAMPLE_ALPN).expect("it is registered"); + proto.connect(node_id).await?; + } + } + + node.shutdown().await?; + + Ok(()) +} + +#[derive(Debug)] +struct ExampleProtocol { + node: Node, +} + +impl Protocol for ExampleProtocol { + fn as_arc_any(self: Arc) -> Arc { + self + } + + fn accept(self: Arc, conn: quinn::Connection) -> Boxed> { + Box::pin(async move { self.handle_connection(conn).await }) + } +} + +impl ExampleProtocol { + fn build(node: Node) -> Arc { + Arc::new(Self { node }) + } + + fn from_node(node: &Node, alpn: &'static [u8]) -> Option> { + node.get_protocol::>(alpn) + } + + async fn handle_connection(&self, conn: Connection) -> Result<()> { + let remote_node_id = get_remote_node_id(&conn)?; + println!("accepting new connection from {remote_node_id}"); + let mut send_stream = conn.open_uni().await?; + println!("stream open!"); + // not that this is something that you wanted to do, but let's create a new blob for each + // incoming connection. this could be any mechanism, but we want to demonstrate how to use a + // custom protocol together with built-in iroh functionality + let content = format!("this blob is created for my beloved peer {remote_node_id} ♥"); + let hash = self + .node + .blobs() + .add_bytes(content.as_bytes().to_vec()) + .await?; + // send the hash over our custom proto + send_stream.write_all(hash.hash.as_bytes()).await?; + send_stream.finish().await?; + Ok(()) + } + + pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> { + println!("connecting to {remote_node_id}"); + let conn = self + .node + .endpoint() + .connect_by_node_id(&remote_node_id, EXAMPLE_ALPN) + .await?; + let mut recv_stream = conn.accept_uni().await?; + let hash_bytes = recv_stream.read_to_end(32).await?; + let hash = iroh::blobs::Hash::from_bytes(*(&hash_bytes.try_into().unwrap())); + println!("received hash: {hash}"); + self.node + .blobs() + .download(hash, remote_node_id.into()) + .await? + .await?; + println!("blob downloaded"); + let content = self.node.blobs().read_to_bytes(hash).await?; + let message = String::from_utf8(content.to_vec())?; + println!("blob content: {message}"); + Ok(()) + } +} + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 2e1f38ed25..c0668c7843 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -422,10 +422,16 @@ where } }; + let alpns = PROTOCOLS + .iter() + .chain(self.protocols.iter().map(|(alpn, _)| alpn)) + .map(|p| p.to_vec()) + .collect(); + let endpoint = Endpoint::builder() .secret_key(self.secret_key.clone()) .proxy_from_env() - .alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect()) + .alpns(alpns) .keylog(self.keylog) .transport_config(transport_config) .concurrent_connections(MAX_CONNECTIONS) diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 4dc7dbb29d..55046952e6 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -1,9 +1,11 @@ -use std::{any::Any, fmt, future::Future, pin::Pin, sync::Arc}; +use std::{any::Any, fmt, sync::Arc}; +use anyhow::Result; +use futures_lite::future::Boxed; use iroh_net::endpoint::Connection; /// Trait for iroh protocol handlers. -pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static { +pub trait Protocol: Send + Sync + Any + fmt::Debug + 'static { /// Return `self` as `dyn Any`. /// /// Implementations can simply return `self` here. @@ -12,8 +14,5 @@ pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static { /// Accept an incoming connection. /// /// This runs on a freshly spawned tokio task so this can be long-running. - fn accept( - &self, - conn: Connection, - ) -> Pin> + 'static + Send + Sync>>; + fn accept(self: Arc, conn: Connection) -> Boxed>; }