From 9f10152d885aaf2884f8ada5eecdfc25f3938bbf Mon Sep 17 00:00:00 2001 From: Kasey Date: Mon, 29 Jan 2024 14:08:22 -0500 Subject: [PATCH] fix(examples): Adjust and add examples (#1968) ## Description Fixes incorrect information in our `iroh/examples` Also adds an example that shows the most basic way to use iroh to fetch content. related to #1929 ## Notes & open questions Unfortunately, we don't seem to have very simple APIs for fetching (that isn't just exporting the content to an external path). Working through this. Also adds: - `BlobDownloadProgress` - this takes a stream of `DownloadProgress` and puts it into a form that will poll the stream by calling `finish` - adjusts the `blob_download0` function to not eat certain errors ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. --- iroh-net/Cargo.toml | 13 ++ iroh-net/examples/connect-unreliable.rs | 84 +++++++++ iroh-net/examples/connect.rs | 87 ++++++++++ iroh-net/examples/listen-unreliable.rs | 86 +++++++++ iroh-net/examples/listen.rs | 90 ++++++++++ iroh-net/examples/magic.rs | 164 ------------------ iroh/Cargo.toml | 14 +- iroh/examples/collection-fetch.rs | 130 ++++++++++++++ .../{collection.rs => collection-provide.rs} | 17 +- iroh/examples/hello-world-fetch.rs | 116 +++++++++++++ ...{hello-world.rs => hello-world-provide.rs} | 22 ++- iroh/src/client.rs | 71 ++++++++ iroh/src/node.rs | 127 ++++++++------ 13 files changed, 789 insertions(+), 232 deletions(-) create mode 100644 iroh-net/examples/connect-unreliable.rs create mode 100644 iroh-net/examples/connect.rs create mode 100644 iroh-net/examples/listen-unreliable.rs create mode 100644 iroh-net/examples/listen.rs delete mode 100644 iroh-net/examples/magic.rs create mode 100644 iroh/examples/collection-fetch.rs rename iroh/examples/{collection.rs => collection-provide.rs} (83%) create mode 100644 iroh/examples/hello-world-fetch.rs rename iroh/examples/{hello-world.rs => hello-world-provide.rs} (74%) diff --git a/iroh-net/Cargo.toml b/iroh-net/Cargo.toml index 4c84800a6c..c5f4de007e 100644 --- a/iroh-net/Cargo.toml +++ b/iroh-net/Cargo.toml @@ -126,3 +126,16 @@ metrics = ["iroh-metrics/metrics"] [[bin]] name = "derper" required-features = ["derper"] + +[[example]] +name = "listen" + +[[example]] +name = "connect" + +[[example]] +name = "listen-unreliable" + +[[example]] +name = "connect-unreliable" + diff --git a/iroh-net/examples/connect-unreliable.rs b/iroh-net/examples/connect-unreliable.rs new file mode 100644 index 0000000000..efac2c5db8 --- /dev/null +++ b/iroh-net/examples/connect-unreliable.rs @@ -0,0 +1,84 @@ +//! The smallest example showing how to use iroh-net and `MagicEndpoint` to connect to a remote node and pass bytes using unreliable datagrams. +//! +//! We use the node ID (the PublicKey of the remote node), the direct UDP addresses, and the DERP url to achieve a connection. +//! +//! This example uses the default DERP servers to attempt to holepunch, and will use that DERP server to relay packets if the two devices cannot establish a direct UDP connection. +//! +//! Run the `listen-unreliable` example first (`iroh-net/examples/listen-unreliable.rs`), which will give you instructions on how to run this example to watch two nodes connect and exchange bytes. +use std::net::SocketAddr; + +use clap::Parser; +use iroh_base::base32; +use iroh_net::{derp::DerpMode, key::SecretKey, MagicEndpoint, NodeAddr}; +use tracing::info; +use url::Url; + +// An example ALPN that we are using to communicate over the `MagicEndpoint` +const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/magic/0"; + +#[derive(Debug, Parser)] +struct Cli { + /// The id of the remote node. + #[clap(long)] + node_id: iroh_net::NodeId, + /// The list of direct UDP addresses for the remote node. + #[clap(long, value_parser, num_args = 1.., value_delimiter = ' ')] + addrs: Vec, + /// The url of the DERP server the remote node can also be reached at. + #[clap(long)] + derp_url: Url, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + println!("\nconnect (unreliable) example!\n"); + let args = Cli::parse(); + let secret_key = SecretKey::generate(); + println!("secret key: {}", base32::fmt(secret_key.to_bytes())); + + // Build a `MagicEndpoint`, which uses PublicKeys as node identifiers, uses QUIC for directly connecting to other nodes, and uses the DERP protocol and DERP servers to holepunch direct connections between nodes when there are NATs or firewalls preventing direct connections. If no direct connection can be made, packets are relayed over the DERP servers. + let endpoint = MagicEndpoint::builder() + // The secret key is used to authenticate with other nodes. The PublicKey portion of this secret key is how we identify nodes, often referred to as the `node_id` in our codebase. + .secret_key(secret_key) + // Set the ALPN protocols this endpoint will accept on incoming connections + .alpns(vec![EXAMPLE_ALPN.to_vec()]) + // `DerpMode::Default` means that we will use the default DERP servers to holepunch and relay. + // Use `DerpMode::Custom` to pass in a `DerpMap` with custom DERP urls. + // Use `DerpMode::Disable` to disable holepunching and relaying over HTTPS + // If you want to experiment with relaying using your own DERP server, you must pass in the same custom DERP url to both the `listen` code AND the `connect` code + .derp_mode(DerpMode::Default) + // You can choose a port to bind to, but passing in `0` will bind the socket to a random available port + .bind(0) + .await?; + + let me = endpoint.node_id(); + println!("node id: {me}"); + println!("node listening addresses:"); + for local_endpoint in endpoint.local_endpoints().await? { + println!("\t{}", local_endpoint.addr) + } + + let derp_url = endpoint + .my_derp() + .expect("should be connected to a DERP server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected DERP server"); + println!("node DERP server url: {derp_url}\n"); + // Build a `NodeAddr` from the node_id, DERP url, and UDP addresses. + let addr = NodeAddr::from_parts(args.node_id, Some(args.derp_url), args.addrs); + + // Attempt to connect, over the given ALPN. + // Returns a QUIC connection. + let conn = endpoint.connect(addr, EXAMPLE_ALPN).await?; + info!("connected"); + + // Send a datagram over the connection. + let message = format!("{me} is saying 'hello!'"); + conn.send_datagram(message.as_bytes().to_vec().into())?; + + // Read a datagram over the connection. + let message = conn.read_datagram().await?; + let message = String::from_utf8(message.into())?; + println!("received: {message}"); + + Ok(()) +} diff --git a/iroh-net/examples/connect.rs b/iroh-net/examples/connect.rs new file mode 100644 index 0000000000..262a6384e3 --- /dev/null +++ b/iroh-net/examples/connect.rs @@ -0,0 +1,87 @@ +//! The smallest example showing how to use iroh-net and `MagicEndpoint` to connect to a remote node. +//! +//! We use the node ID (the PublicKey of the remote node), the direct UDP addresses, and the DERP url to achieve a connection. +//! +//! This example uses the default DERP servers to attempt to holepunch, and will use that DERP server to relay packets if the two devices cannot establish a direct UDP connection. +//! +//! Run the `listen` example first (`iroh-net/examples/listen.rs`), which will give you instructions on how to run this example to watch two nodes connect and exchange bytes. +use std::net::SocketAddr; + +use clap::Parser; +use iroh_base::base32; +use iroh_net::{derp::DerpMode, key::SecretKey, MagicEndpoint, NodeAddr}; +use tracing::info; +use url::Url; + +// An example ALPN that we are using to communicate over the `MagicEndpoint` +const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/magic/0"; + +#[derive(Debug, Parser)] +struct Cli { + /// The id of the remote node. + #[clap(long)] + node_id: iroh_net::NodeId, + /// The list of direct UDP addresses for the remote node. + #[clap(long, value_parser, num_args = 1.., value_delimiter = ' ')] + addrs: Vec, + /// The url of the DERP server the remote node can also be reached at. + #[clap(long)] + derp_url: Url, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + println!("\nconnect example!\n"); + let args = Cli::parse(); + let secret_key = SecretKey::generate(); + println!("secret key: {}", base32::fmt(secret_key.to_bytes())); + + // Build a `MagicEndpoint`, which uses PublicKeys as node identifiers, uses QUIC for directly connecting to other nodes, and uses the DERP protocol and DERP servers to holepunch direct connections between nodes when there are NATs or firewalls preventing direct connections. If no direct connection can be made, packets are relayed over the DERP servers. + let endpoint = MagicEndpoint::builder() + // The secret key is used to authenticate with other nodes. The PublicKey portion of this secret key is how we identify nodes, often referred to as the `node_id` in our codebase. + .secret_key(secret_key) + // Set the ALPN protocols this endpoint will accept on incoming connections + .alpns(vec![EXAMPLE_ALPN.to_vec()]) + // `DerpMode::Default` means that we will use the default DERP servers to holepunch and relay. + // Use `DerpMode::Custom` to pass in a `DerpMap` with custom DERP urls. + // Use `DerpMode::Disable` to disable holepunching and relaying over HTTPS + // If you want to experiment with relaying using your own DERP server, you must pass in the same custom DERP url to both the `listen` code AND the `connect` code + .derp_mode(DerpMode::Default) + // You can choose a port to bind to, but passing in `0` will bind the socket to a random available port + .bind(0) + .await?; + + let me = endpoint.node_id(); + println!("node id: {me}"); + println!("node listening addresses:"); + for local_endpoint in endpoint.local_endpoints().await? { + println!("\t{}", local_endpoint.addr) + } + + let derp_url = endpoint + .my_derp() + .expect("should be connected to a DERP server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected DERP server"); + println!("node DERP server url: {derp_url}\n"); + // Build a `NodeAddr` from the node_id, DERP url, and UDP addresses. + let addr = NodeAddr::from_parts(args.node_id, Some(args.derp_url), args.addrs); + + // Attempt to connect, over the given ALPN. + // Returns a Quinn connection. + let conn = endpoint.connect(addr, EXAMPLE_ALPN).await?; + info!("connected"); + + // Use the Quinn API to send and recv content. + let (mut send, mut recv) = conn.open_bi().await?; + + let message = format!("{me} is saying 'hello!'"); + send.write_all(message.as_bytes()).await?; + + // Call `finish` to close the send side of the connection gracefully. + send.finish().await?; + let message = recv.read_to_end(100).await?; + let message = String::from_utf8(message)?; + println!("received: {message}"); + + Ok(()) +} diff --git a/iroh-net/examples/listen-unreliable.rs b/iroh-net/examples/listen-unreliable.rs new file mode 100644 index 0000000000..93d885fd25 --- /dev/null +++ b/iroh-net/examples/listen-unreliable.rs @@ -0,0 +1,86 @@ +//! The smallest example showing how to use iroh-net and `MagicEndpoint` to connect two devices and pass bytes using unreliable datagrams. +//! +//! This example uses the default DERP servers to attempt to holepunch, and will use that DERP server to relay packets if the two devices cannot establish a direct UDP connection. +//! run this example from the project root: +//! $ cargo run --example listen-unreliable +use iroh_base::base32; +use iroh_net::{derp::DerpMode, key::SecretKey, MagicEndpoint}; +use tracing::info; + +// An example ALPN that we are using to communicate over the `MagicEndpoint` +const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/magic/0"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + println!("\nlisten (unreliable) example!\n"); + let secret_key = SecretKey::generate(); + println!("secret key: {}", base32::fmt(secret_key.to_bytes())); + + // Build a `MagicEndpoint`, which uses PublicKeys as node identifiers, uses QUIC for directly connecting to other nodes, and uses the DERP protocol and DERP servers to holepunch direct connections between nodes when there are NATs or firewalls preventing direct connections. If no direct connection can be made, packets are relayed over the DERP servers. + let endpoint = MagicEndpoint::builder() + // The secret key is used to authenticate with other nodes. The PublicKey portion of this secret key is how we identify nodes, often referred to as the `node_id` in our codebase. + .secret_key(secret_key) + // set the ALPN protocols this endpoint will accept on incoming connections + .alpns(vec![EXAMPLE_ALPN.to_vec()]) + // `DerpMode::Default` means that we will use the default DERP servers to holepunch and relay. + // Use `DerpMode::Custom` to pass in a `DerpMap` with custom DERP urls. + // Use `DerpMode::Disable` to disable holepunching and relaying over HTTPS + // If you want to experiment with relaying using your own DERP server, you must pass in the same custom DERP url to both the `listen` code AND the `connect` code + .derp_mode(DerpMode::Default) + // you can choose a port to bind to, but passing in `0` will bind the socket to a random available port + .bind(0) + .await?; + + let me = endpoint.node_id(); + println!("node id: {me}"); + println!("node listening addresses:"); + + let local_addrs = endpoint + .local_endpoints() + .await? + .into_iter() + .map(|endpoint| { + let addr = endpoint.addr.to_string(); + println!("\t{addr}"); + addr + }) + .collect::>() + .join(" "); + + let derp_url = endpoint + .my_derp() + .expect("should be connected to a DERP server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected DERP server"); + println!("node DERP server url: {derp_url}"); + println!("\nin a separate terminal run:"); + + println!( + "\tcargo run --example connect-unreliable -- --node-id {me} --addrs \"{local_addrs}\" --derp-url {derp_url}\n" + ); + // accept incoming connections, returns a normal QUIC connection + + while let Some(conn) = endpoint.accept().await { + // accept the connection and extract the `node_id` and ALPN + let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?; + info!( + "new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})", + conn.remote_address() + ); + // spawn a task to handle reading and writing off of the connection + tokio::spawn(async move { + // use the `quinn` API to read a datagram off the connection, and send a datagra, in return + while let Ok(message) = conn.read_datagram().await { + let message = String::from_utf8(message.into())?; + println!("received: {message}"); + + let message = format!("hi! you connected to {me}. bye bye"); + conn.send_datagram(message.as_bytes().to_vec().into())?; + } + + Ok::<_, anyhow::Error>(()) + }); + } + // stop with SIGINT (ctrl-c) + + Ok(()) +} diff --git a/iroh-net/examples/listen.rs b/iroh-net/examples/listen.rs new file mode 100644 index 0000000000..252571b5ba --- /dev/null +++ b/iroh-net/examples/listen.rs @@ -0,0 +1,90 @@ +//! The smallest example showing how to use iroh-net and `MagicEndpoint` to connect two devices. +//! +//! This example uses the default DERP servers to attempt to holepunch, and will use that DERP server to relay packets if the two devices cannot establish a direct UDP connection. +//! run this example from the project root: +//! $ cargo run --example listen +use iroh_base::base32; +use iroh_net::{derp::DerpMode, key::SecretKey, MagicEndpoint}; +use tracing::{debug, info}; + +// An example ALPN that we are using to communicate over the `MagicEndpoint` +const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/magic/0"; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + println!("\nlisten example!\n"); + let secret_key = SecretKey::generate(); + println!("secret key: {}", base32::fmt(secret_key.to_bytes())); + + // Build a `MagicEndpoint`, which uses PublicKeys as node identifiers, uses QUIC for directly connecting to other nodes, and uses the DERP protocol and DERP servers to holepunch direct connections between nodes when there are NATs or firewalls preventing direct connections. If no direct connection can be made, packets are relayed over the DERP servers. + let endpoint = MagicEndpoint::builder() + // The secret key is used to authenticate with other nodes. The PublicKey portion of this secret key is how we identify nodes, often referred to as the `node_id` in our codebase. + .secret_key(secret_key) + // set the ALPN protocols this endpoint will accept on incoming connections + .alpns(vec![EXAMPLE_ALPN.to_vec()]) + // `DerpMode::Default` means that we will use the default DERP servers to holepunch and relay. + // Use `DerpMode::Custom` to pass in a `DerpMap` with custom DERP urls. + // Use `DerpMode::Disable` to disable holepunching and relaying over HTTPS + // If you want to experiment with relaying using your own DERP server, you must pass in the same custom DERP url to both the `listen` code AND the `connect` code + .derp_mode(DerpMode::Default) + // you can choose a port to bind to, but passing in `0` will bind the socket to a random available port + .bind(0) + .await?; + + let me = endpoint.node_id(); + println!("node id: {me}"); + println!("node listening addresses:"); + + let local_addrs = endpoint + .local_endpoints() + .await? + .into_iter() + .map(|endpoint| { + let addr = endpoint.addr.to_string(); + println!("\t{addr}"); + addr + }) + .collect::>() + .join(" "); + + let derp_url = endpoint + .my_derp() + .expect("should be connected to a DERP server, try calling `endpoint.local_endpoints()` or `endpoint.connect()` first, to ensure the endpoint has actually attempted a connection before checking for the connected DERP server"); + println!("node DERP server url: {derp_url}"); + println!("\nin a separate terminal run:"); + + println!( + "\tcargo run --example connect -- --node-id {me} --addrs \"{local_addrs}\" --derp-url {derp_url}\n" + ); + // accept incoming connections, returns a normal QUIC connection + while let Some(conn) = endpoint.accept().await { + // accept the connection and extract the `node_id` and ALPN + let (node_id, alpn, conn) = iroh_net::magic_endpoint::accept_conn(conn).await?; + info!( + "new connection from {node_id} with ALPN {alpn} (coming from {})", + conn.remote_address() + ); + + // spawn a task to handle reading and writing off of the connection + tokio::spawn(async move { + // accept a bi-directional QUIC connection + // use the `quinn` APIs to send and recv content + let (mut send, mut recv) = conn.accept_bi().await?; + debug!("accepted bi stream, waiting for data..."); + let message = recv.read_to_end(100).await?; + let message = String::from_utf8(message)?; + println!("received: {message}"); + + let message = format!("hi! you connected to {me}. bye bye"); + send.write_all(message.as_bytes()).await?; + // call `finish` to close the connection gracefully + send.finish().await?; + + Ok::<_, anyhow::Error>(()) + }); + } + // stop with SIGINT (ctrl-c) + + Ok(()) +} diff --git a/iroh-net/examples/magic.rs b/iroh-net/examples/magic.rs deleted file mode 100644 index 2aa6f7985f..0000000000 --- a/iroh-net/examples/magic.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::net::SocketAddr; - -use clap::Parser; -use iroh_base::base32; -use iroh_net::{ - derp::{DerpMap, DerpMode}, - key::SecretKey, - magic_endpoint::accept_conn, - MagicEndpoint, NodeAddr, -}; -use tracing::{debug, info}; -use url::Url; - -const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/magic/0"; - -#[derive(Debug, Parser)] -struct Cli { - #[clap(short, long)] - secret: Option, - #[clap(short, long, default_value = "n0/iroh/examples/magic/0")] - alpn: String, - #[clap(short, long, default_value = "0")] - bind_port: u16, - #[clap(short, long)] - derp_url: Option, - #[clap(subcommand)] - command: Command, -} - -#[derive(Debug, Parser)] -enum Command { - Listen, - ListenUnreliable, - Connect { - node_id: String, - #[clap(long)] - addrs: Option>, - #[clap(long)] - derp_url: Option, - }, - ConnectUnreliable { - node_id: String, - #[clap(long)] - addrs: Option>, - #[clap(long)] - derp_url: Option, - }, -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); - let args = Cli::parse(); - let secret_key = match args.secret { - None => { - let secret_key = SecretKey::generate(); - println!("our secret key: {}", base32::fmt(secret_key.to_bytes())); - secret_key - } - Some(key) => parse_secret(&key)?, - }; - - let derp_mode = match args.derp_url { - None => DerpMode::Default, - Some(url) => DerpMode::Custom(DerpMap::from_url(url)), - }; - - let endpoint = MagicEndpoint::builder() - .secret_key(secret_key) - .alpns(vec![args.alpn.to_string().into_bytes()]) - .derp_mode(derp_mode) - .bind(args.bind_port) - .await?; - - let me = endpoint.node_id(); - let local_addr = endpoint.local_endpoints().await?; - println!("magic socket listening on {local_addr:?}"); - println!("our node id: {me}"); - - match args.command { - Command::Listen => { - while let Some(conn) = endpoint.accept().await { - let (node_id, alpn, conn) = accept_conn(conn).await?; - info!( - "new connection from {node_id} with ALPN {alpn} (coming from {})", - conn.remote_address() - ); - tokio::spawn(async move { - let (mut send, mut recv) = conn.accept_bi().await?; - debug!("accepted bi stream, waiting for data..."); - let message = recv.read_to_end(1000).await?; - let message = String::from_utf8(message)?; - println!("received: {message}"); - - let message = format!("hi! you connected to {me}. bye bye"); - send.write_all(message.as_bytes()).await?; - send.finish().await?; - - Ok::<_, anyhow::Error>(()) - }); - } - } - Command::ListenUnreliable => { - while let Some(conn) = endpoint.accept().await { - let (node_id, alpn, conn) = accept_conn(conn).await?; - info!( - "new (unreliable) connection from {node_id} with ALPN {alpn} (coming from {})", - conn.remote_address() - ); - tokio::spawn(async move { - while let Ok(message) = conn.read_datagram().await { - let message = String::from_utf8(message.into())?; - println!("received: {message}"); - - let message = format!("hi! you connected to {me}. bye bye"); - conn.send_datagram(message.as_bytes().to_vec().into())?; - } - - Ok::<_, anyhow::Error>(()) - }); - } - } - Command::Connect { - node_id, - addrs, - derp_url, - } => { - let addr = NodeAddr::from_parts(node_id.parse()?, derp_url, addrs.unwrap_or_default()); - let conn = endpoint.connect(addr, EXAMPLE_ALPN).await?; - info!("connected"); - - let (mut send, mut recv) = conn.open_bi().await?; - - let message = format!("hello here's {me}"); - send.write_all(message.as_bytes()).await?; - send.finish().await?; - let message = recv.read_to_end(100).await?; - let message = String::from_utf8(message)?; - println!("received: {message}"); - } - Command::ConnectUnreliable { - node_id, - addrs, - derp_url, - } => { - let addr = NodeAddr::from_parts(node_id.parse()?, derp_url, addrs.unwrap_or_default()); - let conn = endpoint.connect(addr, EXAMPLE_ALPN).await?; - info!("connected"); - - let message = format!("hello here's {me}"); - conn.send_datagram(message.as_bytes().to_vec().into())?; - let message = conn.read_datagram().await?; - let message = String::from_utf8(message.into())?; - println!("received: {message}"); - } - } - Ok(()) -} - -fn parse_secret(secret: &str) -> anyhow::Result { - let bytes: [u8; 32] = base32::parse_array(secret)?; - let key = SecretKey::from(bytes); - Ok(key) -} diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index 8053b6a37b..c1ed084098 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -103,15 +103,23 @@ name = "iroh" required-features = ["cli"] [[example]] -name = "collection" +name = "hello-world-provide" required-features = [] [[example]] -name = "dump-blob-stream" +name = "hello-world-fetch" +required-features = [] + +[[example]] +name = "collection-provide" required-features = [] [[example]] -name = "hello-world" +name = "collection-fetch" +required-features = [] + +[[example]] +name = "dump-blob-stream" required-features = [] [[example]] diff --git a/iroh/examples/collection-fetch.rs b/iroh/examples/collection-fetch.rs new file mode 100644 index 0000000000..99e11b3f57 --- /dev/null +++ b/iroh/examples/collection-fetch.rs @@ -0,0 +1,130 @@ +//! An example that fetches an iroh collection and prints the contents. +//! Will only work with collections that contain text, and is meant as a companion to the and `collection-provide` example. +//! +//! This is using an in memory database and a random node id. +//! Run the `collection-provide` example, which will give you instructions on how to run this example. +use anyhow::{bail, Context, Result}; +use iroh::{client::BlobDownloadProgress, rpc_protocol::BlobDownloadRequest}; +use iroh_bytes::BlobFormat; +use std::env; +use std::str::FromStr; +use tokio_util::task::LocalPoolHandle; +use tracing_subscriber::{prelude::*, EnvFilter}; + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + println!("\ncollection fetch example!"); + // get the ticket + let args: Vec = env::args().collect(); + + if args.len() != 2 { + bail!("expected one argument [BLOB_TICKET]\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example collection-provide`"); + } + + // deserialize ticket string into a ticket + let ticket = + iroh::ticket::BlobTicket::from_str(&args[1]).context("failed parsing blob ticket\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example collection-provide`")?; + + // create a new, empty in memory database + let db = iroh_bytes::store::mem::Store::default(); + // create an in-memory doc store (not used in the example) + let doc_store = iroh_sync::store::memory::Store::default(); + // create a new iroh runtime with 1 worker thread + let lp = LocalPoolHandle::new(1); + // create a new node + let node = iroh::node::Node::builder(db, doc_store) + .local_pool(&lp) + .spawn() + .await?; + // create a client that allows us to interact with the running node + let client = node.client(); + + println!("fetching hash: {}", ticket.hash()); + println!("node id: {}", node.node_id()); + println!("node listening addresses:"); + let addrs = node.my_addr().await?; + for addr in addrs.direct_addresses() { + println!("\t{:?}", addr); + } + println!( + "node DERP server url: {:?}", + node.my_derp() + .expect("a default DERP url should be provided") + .to_string() + ); + let req = BlobDownloadRequest { + // The hash of the content we are trying to download. Provided in the ticket. + hash: ticket.hash(), + + // The format here is referring to the `BlobFormat`. We can request to download a single blob (which you can think of as a single file) or a `HashSeq` ("hash sequence"), which is a list of blobs you want to download. + // Iroh has a special kind of `HashSeq` called a "collection". A collection is just a `HashSeq` that reserves the first blob in the sequence for metadata about the `HashSeq` + // The metadata primarily contains the names of the blobs, which allows us, for example, to preserve filenames. + // When interacting with the iroh API, you will most likely be using blobs and collections. + format: ticket.format(), + + // The `peer` field is a `NodeAddr`, which combines all of the known address information we have for the remote node. + // This includes the `node_id` (or `PublicKey` of the node), any direct UDP addresses we know about for that node, as well as the DERP url of that node. The DERP url is the url of the DERP server that that node is connected to. + // If the direct UDP addresses to that node do not work, than we can use the DERP node to attempt to holepunch between your current node and the remote node. + // If holepunching fails, iroh will use the DERP node to proxy a connection to the remote node over HTTPS. + // Thankfully, the ticket contains all of this information + peer: ticket.node_addr().clone(), + + // You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp. + tag: iroh::rpc_protocol::SetTagOption::Auto, + + // The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well). + out: iroh::rpc_protocol::DownloadLocation::Internal, + }; + + // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download. + let download_stream = client.blobs.download(req).await?; + + // You can also use the `BlobDownloadProgress` struct, that has the method `finish` that will poll the `DownloadProgress` stream for you. + let outcome = BlobDownloadProgress::new(download_stream) + .finish() + .await + .context("unable to download hash")?; + + println!( + "\ndownloaded {} bytes from node {}", + outcome.downloaded_size, + ticket.node_addr().node_id + ); + + // Get the content we have just fetched from the iroh database. + if ticket.format() == BlobFormat::HashSeq { + // If the `BlobFormat` is `HashSeq`, then we can assume for the example (and for any `HashSeq` that is derived from any iroh API), that it can be parsed as a `Collection` + // A `Collection` is a special `HashSeq`, where we preserve the names of any blobs added to the collection. (We do this by designating the first entry in the `Collection` as meta data.) + // To get the content of the collection, we first get the collection from the database using the `blobs` API + let collection = client + .blobs + .get_collection(ticket.hash()) + .await + .context("expect hash with `BlobFormat::HashSeq` to be a collection")?; + // Then we iterate through the collection, which gives us the name and hash of each entry in the collection. + for (name, hash) in collection.iter() { + println!("\nname: {name}, hash: {hash}"); + // Use the hash of the blob to get the content. + let content = client.blobs.read_to_bytes(*hash).await?; + println!( + "{}", + String::from_utf8(content.to_vec()) + .context("unable to parse blob as as utf-8 string")? + ); + } + } else { + bail!("'collection' example expects to fetch a collection, but the ticket indicates a single blob."); + } + + Ok(()) +} diff --git a/iroh/examples/collection.rs b/iroh/examples/collection-provide.rs similarity index 83% rename from iroh/examples/collection.rs rename to iroh/examples/collection-provide.rs index 01826abe59..f59d054fb6 100644 --- a/iroh/examples/collection.rs +++ b/iroh/examples/collection-provide.rs @@ -5,7 +5,7 @@ //! //! This is using an in memory database and a random node id. //! run this example from the project root: -//! $ cargo run -p collection +//! $ cargo run --example collection-provide use iroh_bytes::{format::collection::Collection, BlobFormat, Hash}; use tokio_util::task::LocalPoolHandle; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -22,6 +22,7 @@ pub fn setup_logging() { #[tokio::main] async fn main() -> anyhow::Result<()> { setup_logging(); + println!("\ncollection provide example!"); // create a new database and add two blobs let (mut db, names) = iroh_bytes::store::readonly_mem::Store::new([ ("blob1", b"the first blob of bytes".to_vec()), @@ -51,14 +52,22 @@ async fn main() -> anyhow::Result<()> { let ticket = node.ticket(hash, BlobFormat::HashSeq).await?; // print some info about the node println!("serving hash: {}", ticket.hash()); - println!("node NodeId: {}", ticket.node_addr().node_id); + println!("node id: {}", ticket.node_addr().node_id); println!("node listening addresses:"); for addr in ticket.node_addr().direct_addresses() { println!("\t{:?}", addr); } + println!( + "node DERP server url: {:?}", + ticket + .node_addr() + .derp_url() + .expect("a default DERP url should be provided") + .to_string() + ); // print the ticket, containing all the above information - println!("in another terminal, run:"); - println!("\t$ cargo run -- get --ticket {}", ticket); + println!("\nin another terminal, run:"); + println!("\tcargo run --example collection-fetch {}", ticket); // wait for the node to finish, this will block indefinitely // stop with SIGINT (ctrl+c) node.await?; diff --git a/iroh/examples/hello-world-fetch.rs b/iroh/examples/hello-world-fetch.rs new file mode 100644 index 0000000000..30cdb916ee --- /dev/null +++ b/iroh/examples/hello-world-fetch.rs @@ -0,0 +1,116 @@ +//! An example that fetches an iroh blob and prints the contents. +//! Will only work with blobs and collections that contain text, and is meant as a companion to the `hello-world-get` examples. +//! +//! This is using an in memory database and a random node id. +//! Run the `provide` example, which will give you instructions on how to run this example. +use anyhow::{bail, Context, Result}; +use iroh::{client::BlobDownloadProgress, rpc_protocol::BlobDownloadRequest}; +use iroh_bytes::BlobFormat; +use std::env; +use std::str::FromStr; +use tokio_util::task::LocalPoolHandle; +use tracing_subscriber::{prelude::*, EnvFilter}; + +// set the RUST_LOG env var to one of {debug,info,warn} to see logging info +pub fn setup_logging() { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); +} + +#[tokio::main] +async fn main() -> Result<()> { + setup_logging(); + println!("\n'Hello World' fetch example!"); + // get the ticket + let args: Vec = env::args().collect(); + + if args.len() != 2 { + bail!("expected one argument [BLOB_TICKET]\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example hello-world-provide`"); + } + + // deserialize ticket string into a ticket + let ticket = + iroh::ticket::BlobTicket::from_str(&args[1]).context("failed parsing blob ticket\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example hello-world-provide`")?; + + // create a new, empty in memory database + let db = iroh_bytes::store::mem::Store::default(); + // create an in-memory doc store (not used in the example) + let doc_store = iroh_sync::store::memory::Store::default(); + // create a new iroh runtime with 1 worker thread + let lp = LocalPoolHandle::new(1); + // create a new node + let node = iroh::node::Node::builder(db, doc_store) + .local_pool(&lp) + .spawn() + .await?; + // create a client that allows us to interact with the running node + let client = node.client(); + + println!("fetching hash: {}", ticket.hash()); + println!("node id: {}", node.node_id()); + println!("node listening addresses:"); + let addrs = node.my_addr().await?; + for addr in addrs.direct_addresses() { + println!("\t{:?}", addr); + } + println!( + "node DERP server url: {:?}", + node.my_derp() + .expect("a default DERP url should be provided") + .to_string() + ); + let req = BlobDownloadRequest { + // The hash of the content we are trying to download. Provided in the ticket. + hash: ticket.hash(), + + // The format here is referring to the `BlobFormat`. We can request to download a single blob (which you can think of as a single file) or a `HashSeq` ("hash sequence"), which is a list of blobs you want to download. + // Iroh has a special kind of `HashSeq` called a "collection". A collection is just a `HashSeq` that reserves the first blob in the sequence for metadata about the `HashSeq` + // The metadata primarily contains the names of the blobs, which allows us, for example, to preserve filenames. + // When interacting with the iroh API, you will most likely be using blobs and collections. + format: ticket.format(), + + // The `peer` field is a `NodeAddr`, which combines all of the known address information we have for the remote node. + // This includes the `node_id` (or `PublicKey` of the node), any direct UDP addresses we know about for that node, as well as the DERP url of that node. The DERP url is the url of the DERP server that that node is connected to. + // If the direct UDP addresses to that node do not work, than we can use the DERP node to attempt to holepunch between your current node and the remote node. + // If holepunching fails, iroh will use the DERP node to proxy a connection to the remote node over HTTPS. + // Thankfully, the ticket contains all of this information + peer: ticket.node_addr().clone(), + + // You can create a special tag name (`SetTagOption::Named`), or create an automatic tag that is derived from the timestamp. + tag: iroh::rpc_protocol::SetTagOption::Auto, + + // The `DownloadLocation` can be `Internal`, which saves the blob in the internal data store, or `External`, which saves the data to the provided path (and optionally also inside the iroh internal data store as well). + out: iroh::rpc_protocol::DownloadLocation::Internal, + }; + + // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress on the state of your download. + let download_stream = client.blobs.download(req).await?; + + // You can also use the `BlobDownloadProgress` struct, that has the method `finish` that will poll the `DownloadProgress` stream for you. + let outcome = BlobDownloadProgress::new(download_stream) + .finish() + .await + .context("unable to download hash")?; + + println!( + "\ndownloaded {} bytes from node {}", + outcome.downloaded_size, + ticket.node_addr().node_id + ); + + // Get the content we have just fetched from the iroh database. + // If the `BlobFormat` is `Raw`, we have the hash for a single blob, and simply need to read the blob using the `blobs` API on the client to get the content. + if ticket.format() == BlobFormat::Raw { + let bytes = client.blobs.read_to_bytes(ticket.hash()).await?; + let s = + String::from_utf8(bytes.to_vec()).context("unable to parse blob as as utf-8 string")?; + println!("{s}"); + } else { + bail!("'Hello World' example expects to fetch a single blob, but the ticket indicates a collection."); + } + + Ok(()) +} diff --git a/iroh/examples/hello-world.rs b/iroh/examples/hello-world-provide.rs similarity index 74% rename from iroh/examples/hello-world.rs rename to iroh/examples/hello-world-provide.rs index 296ea008c1..525aab2817 100644 --- a/iroh/examples/hello-world.rs +++ b/iroh/examples/hello-world-provide.rs @@ -1,10 +1,8 @@ //! The smallest possible example to spin up a node and serve a single blob. //! -//! This can be downloaded using the iroh CLI. -//! //! This is using an in memory database and a random node id. //! run this example from the project root: -//! $ cargo run --example hello-world +//! $ cargo run --example hello-world-provide use iroh_bytes::BlobFormat; use tokio_util::task::LocalPoolHandle; use tracing_subscriber::{prelude::*, EnvFilter}; @@ -21,6 +19,7 @@ pub fn setup_logging() { #[tokio::main] async fn main() -> anyhow::Result<()> { setup_logging(); + println!("'Hello World' provide example!"); // create a new, empty in memory database let mut db = iroh_bytes::store::readonly_mem::Store::default(); // create an in-memory doc store (not used in the example) @@ -38,15 +37,24 @@ async fn main() -> anyhow::Result<()> { let ticket = node.ticket(hash, BlobFormat::Raw).await?; // print some info about the node println!("serving hash: {}", ticket.hash()); - println!("node NodeId: {}", ticket.node_addr().node_id); + println!("node id: {}", ticket.node_addr().node_id); println!("node listening addresses:"); for addr in ticket.node_addr().direct_addresses() { println!("\t{:?}", addr); } + println!( + "node DERP server url: {:?}", + ticket + .node_addr() + .derp_url() + .expect("a default DERP url should be provided") + .to_string() + ); // print the ticket, containing all the above information - println!("in another terminal, run:"); - println!("\t$ cargo run -- get --ticket {}", ticket); - // wait for the node to finish + println!("\nin another terminal, run:"); + println!("\t cargo run --example hello-world-fetch {}", ticket); + // wait for the node to finish, this will block indefinitely + // stop with SIGINT (ctrl+c) node.await?; Ok(()) } diff --git a/iroh/src/client.rs b/iroh/src/client.rs index abf52592c8..81d417785e 100644 --- a/iroh/src/client.rs +++ b/iroh/src/client.rs @@ -509,6 +509,77 @@ impl Stream for BlobAddProgress { } } +/// Outcome of a blob download operation. +#[derive(Debug, Clone)] +pub struct BlobDownloadOutcome { + /// The size of the data we already had locally + pub local_size: u64, + /// The size of the data we downloaded from the network + pub downloaded_size: u64, +} + +/// Progress stream for blob download operations. +#[derive(derive_more::Debug)] +pub struct BlobDownloadProgress { + #[debug(skip)] + stream: Pin> + Send + Unpin + 'static>>, +} + +impl BlobDownloadProgress { + /// Create a `BlobDownloadProgress` that can help you easily poll the `DownloadProgress` stream from your download until it is finished or errors. + pub fn new( + stream: (impl Stream, impl Into>> + + Send + + Unpin + + 'static), + ) -> Self { + let stream = stream.map(|item| match item { + Ok(item) => Ok(item.into()), + Err(err) => Err(err.into()), + }); + Self { + stream: Box::pin(stream), + } + } + /// Finish writing the stream, ignoring all intermediate progress events. + /// + /// Returns a [`BlobDownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally. + /// When importing a single blob, this is the size of that blob. + /// When importing a collection, this is the total size of all imported blobs (but excluding the size of the collection blob itself). + pub async fn finish(mut self) -> Result { + let mut local_size = 0; + let mut network_size = 0; + while let Some(msg) = self.next().await { + match msg? { + DownloadProgress::Found { size, .. } => { + network_size += size; + } + + DownloadProgress::FoundLocal { size, .. } => { + local_size += size; + } + DownloadProgress::AllDone => { + let outcome = BlobDownloadOutcome { + local_size, + downloaded_size: network_size, + }; + return Ok(outcome); + } + DownloadProgress::Abort(err) => return Err(err.into()), + _ => {} + } + } + Err(anyhow!("Response stream ended prematurely")) + } +} + +impl Stream for BlobDownloadProgress { + type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.poll_next_unpin(cx) + } +} + /// Data reader for a single blob. /// /// Implements [`AsyncRead`]. diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 41e9ba4747..b767ae11f5 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -1111,69 +1111,32 @@ impl RpcHandler { msg: BlobDownloadRequest, progress: impl ProgressSender + IdGenerator, ) -> anyhow::Result<()> { - let local = self.inner.rt.clone(); - let hash = msg.hash; - let format = msg.format; let db = self.inner.db.clone(); - let haf = HashAndFormat { hash, format }; + let peer = msg.peer.clone(); - let temp_pin = db.temp_tag(haf); + let hash_and_format = HashAndFormat { + hash: msg.hash, + format: msg.format, + }; + let temp_pin = db.temp_tag(hash_and_format); let ep = self.inner.endpoint.clone(); - let get_conn = - move || async move { ep.connect(msg.peer, iroh_bytes::protocol::ALPN).await }; + let get_conn = move || async move { ep.connect(peer, iroh_bytes::protocol::ALPN).await }; progress.send(DownloadProgress::Connected).await?; let db = self.inner.db.clone(); let this = self.clone(); - let _export = local.spawn_pinned(move || async move { - let stats = iroh_bytes::get::db::get_to_db( - &db, - get_conn, - &HashAndFormat { - hash: msg.hash, - format: msg.format, - }, - progress.clone(), - ) - .await?; - - progress - .send(DownloadProgress::NetworkDone { - bytes_written: stats.bytes_written, - bytes_read: stats.bytes_read, - elapsed: stats.elapsed, - }) - .await?; - match msg.out { - DownloadLocation::External { path, in_place } => { - if let Err(cause) = this - .blob_export( - path, - hash, - msg.format.is_hash_seq(), - in_place, - progress.clone(), - ) - .await - { - progress.send(DownloadProgress::Abort(cause.into())).await?; - } - } - DownloadLocation::Internal => { - // nothing to do - } - } - match msg.tag { - SetTagOption::Named(tag) => { - db.set_tag(tag, Some(haf)).await?; - } - SetTagOption::Auto => { - db.create_tag(haf).await?; - } + self.inner.rt.spawn_pinned(move || async move { + if let Err(err) = download_progress(db, get_conn, msg, progress.clone(), this).await { + progress + .send(DownloadProgress::Abort(err.into())) + .await + .ok(); + drop(temp_pin); + return; } + drop(temp_pin); - progress.send(DownloadProgress::AllDone).await?; - anyhow::Ok(()) + progress.send(DownloadProgress::AllDone).await.ok(); }); Ok(()) } @@ -1588,6 +1551,62 @@ impl RpcHandler { } } +async fn download_progress( + db: D, + get_conn: C, + msg: BlobDownloadRequest, + progress: impl ProgressSender + IdGenerator, + rpc: RpcHandler, +) -> Result<()> +where + D: BaoStore, + C: FnOnce() -> F, + F: Future>, +{ + let hash_and_format = HashAndFormat { + hash: msg.hash, + format: msg.format, + }; + + let stats = + iroh_bytes::get::db::get_to_db(&db, get_conn, &hash_and_format, progress.clone()).await?; + + progress + .send(DownloadProgress::NetworkDone { + bytes_written: stats.bytes_written, + bytes_read: stats.bytes_read, + elapsed: stats.elapsed, + }) + .await + .ok(); + + match msg.out { + DownloadLocation::External { path, in_place } => { + rpc.blob_export( + path, + msg.hash, + msg.format.is_hash_seq(), + in_place, + progress.clone(), + ) + .await?; + } + DownloadLocation::Internal => { + // nothing to do + } + } + + match msg.tag { + SetTagOption::Named(tag) => { + db.set_tag(tag, Some(hash_and_format)).await?; + } + SetTagOption::Auto => { + db.create_tag(hash_and_format).await?; + } + } + Ok(()) +} + fn handle_rpc_request>( msg: ProviderRequest, chan: RpcChannel,