Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(iroh): Allow to register custom protocols #2358

Merged
merged 35 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
79071f4
feat: custom protocols
Frando Jun 11, 2024
aa1d78a
fixes and add example
Frando Jun 11, 2024
62dda48
improve example
Frando Jun 11, 2024
2b14986
improve example
Frando Jun 11, 2024
9261259
make builder send again
Frando Jun 11, 2024
85e40de
fix & clippy
Frando Jun 11, 2024
ee043e5
cleanups and PR review
Frando Jun 12, 2024
db35136
improve code structure
Frando Jun 12, 2024
c7517ce
fixup
Frando Jun 12, 2024
11a609f
rename back to accept
Frando Jun 12, 2024
a369048
refactor: use new protocols api and allow to disable docs
Frando Jun 12, 2024
63fbcc0
improvements
Frando Jun 12, 2024
13d3bf6
simplify
Frando Jun 12, 2024
f56bb8c
fixup
Frando Jun 12, 2024
97ddd64
fixup
Frando Jun 12, 2024
f97826e
use JoinSet
Frando Jun 13, 2024
1f10806
improve shutdown
Frando Jun 13, 2024
41bb962
fix shutdown
Frando Jun 13, 2024
5703d6e
fix doctest
Frando Jun 13, 2024
f4abba5
cleanup
Frando Jun 13, 2024
1d54b14
use OnceCell not Mutex
Frando Jun 13, 2024
1c20c7b
docs
Frando Jun 13, 2024
0da4375
further cleanup
Frando Jun 13, 2024
f2f43e0
feat(iroh-net): allow to change the accepted ALPNs
Frando Jun 18, 2024
b8165dc
refactor: use two-stage build for node spawning and optional protocols
Frando Jun 18, 2024
4799ea1
revert making docs optional, move to separate PR
Frando Jun 18, 2024
766c8e5
refactor: concurrent shutdown
Frando Jun 18, 2024
d1c75e3
fix: no need for empty alpn call in endpoint builder
Frando Jun 18, 2024
78f9e2e
Merge remote-tracking branch 'origin/main' into optional-protocols
Frando Jun 18, 2024
c1d0e5d
fix: doctest
Frando Jun 18, 2024
88a77fa
Merge branch 'main' into optional-protocols
Frando Jun 18, 2024
34ea612
address PR review
Frando Jun 19, 2024
823005f
rename UnspawnedNode to ProtocolBuilder
Frando Jun 19, 2024
b58d79e
expand docs
Frando Jun 19, 2024
e3b042d
rename trait Protocol to ProtocolHandler
Frando Jun 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions iroh-blobs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1486,6 +1486,8 @@ impl Actor {
let mut msgs = PeekableFlumeReceiver::new(self.state.msgs.clone());
while let Some(msg) = msgs.recv() {
if let ActorMessage::Shutdown { tx } = msg {
// Make sure the database is dropped before we send the reply.
drop(self);
if let Some(tx) = tx {
tx.send(()).ok();
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ pub trait ReadableStore: Map {
}

/// The mutable part of a Bao store.
pub trait Store: ReadableStore + MapMut {
pub trait Store: ReadableStore + MapMut + std::fmt::Debug {
Frando marked this conversation as resolved.
Show resolved Hide resolved
/// This trait method imports a file from a local path.
///
/// `data` is the path to the file.
Expand Down
82 changes: 56 additions & 26 deletions iroh-net/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,12 @@ impl Builder {
}
};
let secret_key = self.secret_key.unwrap_or_else(SecretKey::generate);
let mut server_config = make_server_config(
&secret_key,
self.alpn_protocols,
self.transport_config,
self.keylog,
)?;
if let Some(c) = self.concurrent_connections {
server_config.concurrent_connections(c);
}
let static_config = StaticConfig {
transport_config: Arc::new(self.transport_config.unwrap_or_default()),
keylog: self.keylog,
concurrent_connections: self.concurrent_connections,
secret_key: secret_key.clone(),
};
let dns_resolver = self
.dns_resolver
.unwrap_or_else(|| default_resolver().clone());
Expand All @@ -149,7 +146,7 @@ impl Builder {
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
};
Endpoint::bind(Some(server_config), msock_opts, self.keylog).await
Endpoint::bind(static_config, msock_opts, self.alpn_protocols).await
}

// # The very common methods everyone basically needs.
Expand Down Expand Up @@ -296,17 +293,41 @@ impl Builder {
}
}

/// Configuration for a [`quinn::Endpoint`] that cannot be changed at runtime.
#[derive(Debug)]
struct StaticConfig {
secret_key: SecretKey,
transport_config: Arc<quinn::TransportConfig>,
keylog: bool,
concurrent_connections: Option<u32>,
}

impl StaticConfig {
/// Create a [`quinn::ServerConfig`] with the specified ALPN protocols.
fn create_server_config(&self, alpn_protocols: Vec<Vec<u8>>) -> Result<quinn::ServerConfig> {
let mut server_config = make_server_config(
&self.secret_key,
alpn_protocols,
self.transport_config.clone(),
self.keylog,
)?;
if let Some(c) = self.concurrent_connections {
server_config.concurrent_connections(c);
}
Ok(server_config)
}
}

/// Creates a [`quinn::ServerConfig`] with the given secret key and limits.
pub fn make_server_config(
secret_key: &SecretKey,
alpn_protocols: Vec<Vec<u8>>,
transport_config: Option<quinn::TransportConfig>,
transport_config: Arc<quinn::TransportConfig>,
keylog: bool,
) -> Result<quinn::ServerConfig> {
let tls_server_config = tls::make_server_config(secret_key, alpn_protocols, keylog)?;
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_server_config));
server_config.transport_config(Arc::new(transport_config.unwrap_or_default()));

server_config.transport_config(transport_config);
Ok(server_config)
}

Expand Down Expand Up @@ -334,12 +355,11 @@ pub fn make_server_config(
/// [QUIC]: https://quicwg.org
#[derive(Clone, Debug)]
pub struct Endpoint {
secret_key: Arc<SecretKey>,
msock: Handle,
endpoint: quinn::Endpoint,
rtt_actor: Arc<rtt_actor::RttHandle>,
keylog: bool,
cancel_token: CancellationToken,
static_config: Arc<StaticConfig>,
}

impl Endpoint {
Expand All @@ -359,16 +379,17 @@ impl Endpoint {
/// This is for internal use, the public interface is the [`Builder`] obtained from
/// [Self::builder]. See the methods on the builder for documentation of the parameters.
async fn bind(
server_config: Option<quinn::ServerConfig>,
static_config: StaticConfig,
msock_opts: magicsock::Options,
keylog: bool,
initial_alpns: Vec<Vec<u8>>,
) -> Result<Self> {
let secret_key = msock_opts.secret_key.clone();
let span = info_span!("magic_ep", me = %secret_key.public().fmt_short());
let span = info_span!("magic_ep", me = %static_config.secret_key.public().fmt_short());
let _guard = span.enter();
let msock = magicsock::MagicSock::spawn(msock_opts).await?;
trace!("created magicsock");

let server_config = static_config.create_server_config(initial_alpns)?;

let mut endpoint_config = quinn::EndpointConfig::default();
// Setting this to false means that quinn will ignore packets that have the QUIC fixed bit
// set to 0. The fixed bit is the 3rd bit of the first byte of a packet.
Expand All @@ -379,22 +400,31 @@ impl Endpoint {

let endpoint = quinn::Endpoint::new_with_abstract_socket(
endpoint_config,
server_config,
Some(server_config),
msock.clone(),
Arc::new(quinn::TokioRuntime),
)?;
trace!("created quinn endpoint");

Ok(Self {
secret_key: Arc::new(secret_key),
msock,
endpoint,
rtt_actor: Arc::new(rtt_actor::RttHandle::new()),
keylog,
cancel_token: CancellationToken::new(),
static_config: Arc::new(static_config),
})
}

/// Set the list of accepted ALPN protocols.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Set the list of accepted ALPN protocols.
/// Sets the list of accepted ALPN protocols.

https://github.com/rust-lang/rfcs/blob/master/text/1574-more-api-documentation-conventions.md#appendix-a-full-conventions-text

///
/// This will only affect new incoming connections.
/// Note that this *overrides* the current list of ALPNs.
pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) -> Result<()> {
let server_config = self.static_config.create_server_config(alpns)?;
self.endpoint.set_server_config(Some(server_config));
Ok(())
}

// # Methods for establishing connectivity.

/// Connects to a remote [`Endpoint`].
Expand Down Expand Up @@ -480,10 +510,10 @@ impl Endpoint {
let client_config = {
let alpn_protocols = vec![alpn.to_vec()];
let tls_client_config = tls::make_client_config(
&self.secret_key,
&self.static_config.secret_key,
Some(*node_id),
alpn_protocols,
self.keylog,
self.static_config.keylog,
)?;
let mut client_config = quinn::ClientConfig::new(Arc::new(tls_client_config));
let mut transport_config = quinn::TransportConfig::default();
Expand Down Expand Up @@ -579,15 +609,15 @@ impl Endpoint {

/// Returns the secret_key of this endpoint.
pub fn secret_key(&self) -> &SecretKey {
&self.secret_key
&self.static_config.secret_key
}

/// Returns the node id of this endpoint.
///
/// This ID is the unique addressing information of this node and other peers must know
/// it to be able to connect to this node.
pub fn node_id(&self) -> NodeId {
self.secret_key.public()
self.static_config.secret_key.public()
}

/// Returns the current [`NodeAddr`] for this endpoint.
Expand Down
4 changes: 4 additions & 0 deletions iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,7 @@ required-features = ["examples"]
[[example]]
name = "client"
required-features = ["examples"]

[[example]]
name = "custom-protocol"
required-features = ["examples"]
127 changes: 127 additions & 0 deletions iroh/examples/custom-protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::sync::Arc;

use anyhow::Result;
use clap::Parser;
use futures_lite::future::Boxed as BoxedFuture;
use iroh::{
client::MemIroh,
net::{
endpoint::{get_remote_node_id, Connecting},
Endpoint, NodeId,
},
node::ProtocolHandler,
};
use tracing_subscriber::{prelude::*, EnvFilter};

#[derive(Debug, Parser)]
pub struct Cli {
#[clap(subcommand)]
command: Command,
}

#[derive(Debug, Parser)]
pub enum Command {
Accept,
Connect { node: NodeId },
}

#[tokio::main]
async fn main() -> Result<()> {
setup_logging();
let args = Cli::parse();
// create a new node
let builder = iroh::node::Node::memory().build().await?;
let proto = ExampleProto::new(builder.client().clone(), builder.endpoint().clone());
let node = builder
.accept(EXAMPLE_ALPN, Arc::new(proto.clone()))
.spawn()
.await?;

// print the ticket if this is the accepting side
match args.command {
Command::Accept => {
let node_id = node.node_id();
println!("node id: {node_id}");
// wait until ctrl-c
tokio::signal::ctrl_c().await?;
}
Command::Connect { node: node_id } => {
proto.connect(node_id).await?;
}
}

node.shutdown().await?;

Ok(())
}

const EXAMPLE_ALPN: &[u8] = b"example-proto/0";

#[derive(Debug, Clone)]
struct ExampleProto {
client: MemIroh,
endpoint: Endpoint,
}

impl ProtocolHandler for ExampleProto {
fn accept(self: Arc<Self>, connecting: Connecting) -> BoxedFuture<Result<()>> {
Box::pin(async move {
let connection = connecting.await?;
let peer = get_remote_node_id(&connection)?;
println!("accepted connection from {peer}");
let mut send_stream = connection.open_uni().await?;
// Let's create a new blob for each incoming connection.
// This functions as an example of using existing iroh functionality within a protocol
// (you likely don't want to create a new blob for each connection for real)
let content = format!("this blob is created for my beloved peer {peer} ♥");
let hash = self
.client
.blobs()
.add_bytes(content.as_bytes().to_vec())
.await?;
// Send the hash over our custom protocol.
send_stream.write_all(hash.hash.as_bytes()).await?;
send_stream.finish().await?;
println!("closing connection from {peer}");
Ok(())
})
}
}

impl ExampleProto {
pub fn new(client: MemIroh, endpoint: Endpoint) -> Self {
Self { client, endpoint }
}

pub async fn connect(&self, remote_node_id: NodeId) -> Result<()> {
println!("our node id: {}", self.endpoint.node_id());
println!("connecting to {remote_node_id}");
let conn = self
.endpoint
.connect_by_node_id(&remote_node_id, EXAMPLE_ALPN)
.await?;
let mut recv_stream = conn.accept_uni().await?;
let hash_bytes = recv_stream.read_to_end(32).await?;
let hash = iroh::blobs::Hash::from_bytes(hash_bytes.try_into().unwrap());
println!("received hash: {hash}");
self.client
.blobs()
.download(hash, remote_node_id.into())
.await?
.await?;
println!("blob downloaded");
let content = self.client.blobs().read_to_bytes(hash).await?;
let message = String::from_utf8(content.to_vec())?;
println!("blob content: {message}");
Ok(())
}
}

/// Set the RUST_LOG env var to one of {debug,info,warn} to see logging.
fn setup_logging() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.with(EnvFilter::from_default_env())
.try_init()
.ok();
}
Loading
Loading