-
Notifications
You must be signed in to change notification settings - Fork 171
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(iroh): Accept custom protocols #2357
Closed
+266
−11
Closed
Changes from 6 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
79071f4
feat: custom protocols
Frando aa1d78a
fixes and add example
Frando 62dda48
improve example
Frando 2b14986
improve example
Frando 9261259
make builder send again
Frando 85e40de
fix & clippy
Frando ee043e5
cleanups and PR review
Frando db35136
improve code structure
Frando c7517ce
fixup
Frando 11a609f
rename back to accept
Frando File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
use std::{any::Any, fmt, sync::Arc}; | ||
|
||
use anyhow::Result; | ||
use clap::Parser; | ||
use futures_lite::future::Boxed; | ||
use iroh::{ | ||
blobs::store::Store, | ||
net::{ | ||
endpoint::{get_remote_node_id, Connection}, | ||
NodeId, | ||
}, | ||
node::{Node, Protocol}, | ||
}; | ||
use tracing_subscriber::{prelude::*, EnvFilter}; | ||
|
||
#[derive(Debug, Parser)] | ||
pub struct Cli { | ||
#[clap(subcommand)] | ||
command: Command, | ||
} | ||
|
||
#[derive(Debug, Parser)] | ||
pub enum Command { | ||
Accept, | ||
Connect { node: NodeId }, | ||
} | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<()> { | ||
setup_logging(); | ||
let args = Cli::parse(); | ||
// create a new node | ||
let node = iroh::node::Node::memory() | ||
.accept(EXAMPLE_ALPN, |node| ExampleProto::build(node)) | ||
.spawn() | ||
.await?; | ||
|
||
// print the ticket if this is the accepting side | ||
match args.command { | ||
Command::Accept => { | ||
let node_id = node.node_id(); | ||
println!("node id: {node_id}"); | ||
// wait until ctrl-c | ||
tokio::signal::ctrl_c().await?; | ||
} | ||
Command::Connect { node: node_id } => { | ||
let proto = ExampleProto::get_from_node(&node, EXAMPLE_ALPN).expect("it is registered"); | ||
proto.connect(node_id).await?; | ||
} | ||
} | ||
|
||
node.shutdown().await?; | ||
|
||
Ok(()) | ||
} | ||
|
||
const EXAMPLE_ALPN: &'static [u8] = b"example-proto/0"; | ||
|
||
#[derive(Debug)] | ||
struct ExampleProto<S> { | ||
node: Node<S>, | ||
} | ||
|
||
impl<S: Store + fmt::Debug> Protocol for ExampleProto<S> { | ||
fn as_arc_any(self: Arc<Self>) -> Arc<dyn Any + Send + Sync> { | ||
self | ||
} | ||
|
||
fn accept(self: Arc<Self>, conn: quinn::Connection) -> Boxed<Result<()>> { | ||
Frando marked this conversation as resolved.
Show resolved
Hide resolved
Frando marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Box::pin(async move { self.handle_connection(conn).await }) | ||
} | ||
} | ||
|
||
impl<S: Store + fmt::Debug> ExampleProto<S> { | ||
fn build(node: Node<S>) -> Arc<Self> { | ||
Arc::new(Self { node }) | ||
} | ||
|
||
fn get_from_node(node: &Node<S>, alpn: &'static [u8]) -> Option<Arc<Self>> { | ||
node.get_protocol::<ExampleProto<S>>(alpn) | ||
} | ||
|
||
async fn handle_connection(&self, conn: Connection) -> Result<()> { | ||
let remote_node_id = get_remote_node_id(&conn)?; | ||
println!("accepted connection from {remote_node_id}"); | ||
let mut send_stream = conn.open_uni().await?; | ||
// not that this is something that you wanted to do, but let's create a new blob for each | ||
// incoming connection. this could be any mechanism, but we want to demonstrate how to use a | ||
// custom protocol together with built-in iroh functionality | ||
let content = format!("this blob is created for my beloved peer {remote_node_id} ♥"); | ||
let hash = self | ||
.node | ||
.blobs() | ||
.add_bytes(content.as_bytes().to_vec()) | ||
.await?; | ||
// send the hash over our custom proto | ||
send_stream.write_all(hash.hash.as_bytes()).await?; | ||
send_stream.finish().await?; | ||
println!("closing connection from {remote_node_id}"); | ||
Ok(()) | ||
} | ||
|
||
async fn connect(&self, remote_node_id: NodeId) -> Result<()> { | ||
println!("our node id: {}", self.node.node_id()); | ||
println!("connecting to {remote_node_id}"); | ||
let conn = self | ||
.node | ||
.endpoint() | ||
.connect_by_node_id(&remote_node_id, EXAMPLE_ALPN) | ||
.await?; | ||
let mut recv_stream = conn.accept_uni().await?; | ||
let hash_bytes = recv_stream.read_to_end(32).await?; | ||
let hash = iroh::blobs::Hash::from_bytes(hash_bytes.try_into().unwrap()); | ||
println!("received hash: {hash}"); | ||
self.node | ||
.blobs() | ||
.download(hash, remote_node_id.into()) | ||
.await? | ||
.await?; | ||
println!("blob downloaded"); | ||
let content = self.node.blobs().read_to_bytes(hash).await?; | ||
let message = String::from_utf8(content.to_vec())?; | ||
println!("blob content: {message}"); | ||
Ok(()) | ||
} | ||
} | ||
|
||
// set the RUST_LOG env var to one of {debug,info,warn} to see logging info | ||
pub fn setup_logging() { | ||
tracing_subscriber::registry() | ||
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) | ||
.with(EnvFilter::from_default_env()) | ||
.try_init() | ||
.ok(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,8 +5,8 @@ | |
//! To shut down the node, call [`Node::shutdown`]. | ||
use std::fmt::Debug; | ||
use std::net::SocketAddr; | ||
use std::path::Path; | ||
use std::sync::Arc; | ||
use std::{any::Any, path::Path}; | ||
|
||
use anyhow::{anyhow, Result}; | ||
use futures_lite::StreamExt; | ||
|
@@ -23,14 +23,16 @@ use tokio_util::sync::CancellationToken; | |
use tokio_util::task::LocalPoolHandle; | ||
use tracing::debug; | ||
|
||
use crate::client::RpcService; | ||
use crate::{client::RpcService, node::builder::ProtocolMap}; | ||
|
||
mod builder; | ||
mod protocol; | ||
mod rpc; | ||
mod rpc_status; | ||
|
||
pub use self::builder::{Builder, DiscoveryConfig, GcPolicy, StorageConfig}; | ||
pub use self::rpc_status::RpcStatus; | ||
pub use protocol::Protocol; | ||
|
||
/// A server which implements the iroh node. | ||
/// | ||
|
@@ -47,6 +49,7 @@ pub struct Node<D> { | |
inner: Arc<NodeInner<D>>, | ||
task: Arc<JoinHandle<()>>, | ||
client: crate::client::MemIroh, | ||
protocols: ProtocolMap, | ||
} | ||
|
||
#[derive(derive_more::Debug)] | ||
|
@@ -150,6 +153,15 @@ impl<D: BaoStore> Node<D> { | |
self.inner.endpoint.my_relay() | ||
} | ||
|
||
/// Returns the protocol handler for a alpn. | ||
pub fn get_protocol<P: Protocol>(&self, alpn: &[u8]) -> Option<Arc<P>> { | ||
let protocols = self.protocols.read().unwrap(); | ||
let protocol: Arc<dyn Protocol> = protocols.get(alpn)?.clone(); | ||
let protocol_any: Arc<dyn Any + Send + Sync> = protocol.as_arc_any(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why stored as |
||
let protocol_ref = Arc::downcast(protocol_any).ok()?; | ||
Some(protocol_ref) | ||
} | ||
|
||
/// Aborts the node. | ||
/// | ||
/// This does not gracefully terminate currently: all connections are closed and | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
into_arc_any
is more correct i guess?as
seems like a naming violationThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this have to exist at all? why not do this directly on the argument the node builder method accepts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is a way to cast from
dyn Protocol
todyn Any
apart from having this as a trait method implemented on the concrete type.However what we can do, and what I did in the most recent commit, is move this into a separate trait which is auto-impled for all
T: Send + Sync + 'static
. I like this more, now users don't have to care about this at all and it is purely an implementation detail.