Skip to content

Commit

Permalink
fixes and add example
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 11, 2024
1 parent 79071f4 commit aa1d78a
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 7 deletions.
4 changes: 4 additions & 0 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,7 @@ required-features = ["examples"]
[[example]]
name = "client"
required-features = ["examples"]

[[example]]
name = "custom-protocol"
required-features = ["examples"]
134 changes: 134 additions & 0 deletions iroh/examples/custom-protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
use std::{any::Any, fmt, sync::Arc};

use anyhow::Result;
use clap::Parser;
use futures_lite::future::Boxed;
use iroh::{
blobs::store::Store,
net::{
endpoint::{get_remote_node_id, Connection},
NodeId,
},
node::{Node, Protocol},
};
use tracing_subscriber::{prelude::*, EnvFilter};

const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0";

#[derive(Debug, Parser)]
pub struct Cli {
#[clap(subcommand)]
command: Command,
}

#[derive(Debug, Parser)]
pub enum Command {
Accept,
Connect { node: NodeId },
}

#[tokio::main]
async fn main() -> Result<()> {
setup_logging();
let args = Cli::parse();
// create a new node
let node = iroh::node::Node::memory()
.accept(EXAMPLE_ALPN, |node| ExampleProtocol::build(node))
.spawn()
.await?;

// print the ticket if this is the accepting side
match args.command {
Command::Accept => {
let node_id = node.node_id();
println!("node id: {node_id}");
// wait until ctrl-c
tokio::signal::ctrl_c().await?;
}
Command::Connect { node: node_id } => {
let proto = ExampleProtocol::from_node(&node, EXAMPLE_ALPN).expect("it is registered");
proto.connect(node_id).await?;
}
}

node.shutdown().await?;

Ok(())
}

#[derive(Debug)]
struct ExampleProtocol<S> {
node: Node<S>,
}

impl<S: Store + fmt::Debug> Protocol for ExampleProtocol<S> {
fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> {
self
}

fn accept(self: Arc<Self>, conn: quinn::Connection) -> Boxed<Result<()>> {
Box::pin(async move { self.handle_connection(conn).await })
}
}

impl<S: Store + fmt::Debug> ExampleProtocol<S> {
fn build(node: Node<S>) -> Arc<Self> {
Arc::new(Self { node })
}

fn from_node(node: &Node<S>, alpn: &'static [u8]) -> Option<Arc<Self>> {
node.get_protocol::<ExampleProtocol<S>>(alpn)
}

async fn handle_connection(&self, conn: Connection) -> Result<()> {
let remote_node_id = get_remote_node_id(&conn)?;
println!("accepting new connection from {remote_node_id}");
let mut send_stream = conn.open_uni().await?;
println!("stream open!");
// not that this is something that you wanted to do, but let's create a new blob for each
// incoming connection. this could be any mechanism, but we want to demonstrate how to use a
// custom protocol together with built-in iroh functionality
let content = format!("this blob is created for my beloved peer {remote_node_id} ♥");
let hash = self
.node
.blobs()
.add_bytes(content.as_bytes().to_vec())
.await?;
// send the hash over our custom proto
send_stream.write_all(hash.hash.as_bytes()).await?;
send_stream.finish().await?;
Ok(())
}

pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> {
println!("connecting to {remote_node_id}");
let conn = self
.node
.endpoint()
.connect_by_node_id(&remote_node_id, EXAMPLE_ALPN)
.await?;
let mut recv_stream = conn.accept_uni().await?;
let hash_bytes = recv_stream.read_to_end(32).await?;
let hash = iroh::blobs::Hash::from_bytes(*(&hash_bytes.try_into().unwrap()));
println!("received hash: {hash}");
self.node
.blobs()
.download(hash, remote_node_id.into())
.await?
.await?;
println!("blob downloaded");
let content = self.node.blobs().read_to_bytes(hash).await?;
let message = String::from_utf8(content.to_vec())?;
println!("blob content: {message}");
Ok(())
}
}

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.with(EnvFilter::from_default_env())
.try_init()
.ok();
}
8 changes: 7 additions & 1 deletion iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,16 @@ where
}
};

let alpns = PROTOCOLS
.iter()
.chain(self.protocols.iter().map(|(alpn, _)| alpn))
.map(|p| p.to_vec())
.collect();

let endpoint = Endpoint::builder()
.secret_key(self.secret_key.clone())
.proxy_from_env()
.alpns(PROTOCOLS.iter().map(|p| p.to_vec()).collect())
.alpns(alpns)
.keylog(self.keylog)
.transport_config(transport_config)
.concurrent_connections(MAX_CONNECTIONS)
Expand Down
11 changes: 5 additions & 6 deletions iroh/src/node/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{any::Any, fmt, future::Future, pin::Pin, sync::Arc};
use std::{any::Any, fmt, sync::Arc};

use anyhow::Result;
use futures_lite::future::Boxed;
use iroh_net::endpoint::Connection;

/// Trait for iroh protocol handlers.
pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static {
pub trait Protocol: Send + Sync + Any + fmt::Debug + 'static {
/// Return `self` as `dyn Any`.
///
/// Implementations can simply return `self` here.
Expand All @@ -12,8 +14,5 @@ pub trait Protocol: Sync + Send + Any + fmt::Debug + 'static {
/// Accept an incoming connection.
///
/// This runs on a freshly spawned tokio task so this can be long-running.
fn accept(
&self,
conn: Connection,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + 'static + Send + Sync>>;
fn accept(self: Arc<Self>, conn: Connection) -> Boxed<Result<()>>;
}

0 comments on commit aa1d78a

Please sign in to comment.