Skip to content

Commit

Permalink
move some examples down to iroh-bytes (n0-computer#1981)
Browse files Browse the repository at this point in the history
## Description

closes n0-computer#1929 

### `provide-bytes`
This example adds either a single blob or collection to the database and
provides it. It gives instructions on how to run the fetch side of the
example for `fetch-fsm` and `fetch-stream`.

We could make this example more "advanced" by having the user provide a
path & serve the blob or collection of that content, but I wanted to
keep its simple at first.

### `fetch-fsm`
This example fetches a blob or collection using the `get` finite state
machine directly, printing the contents to the terminal.

### `fetch-stream`
This example fetches a blob or collection using a helper function to
turn the `get` finite state machine into a stream. It prints the
contents to the terminal.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] Tests if relevant.
  • Loading branch information
ramfox authored and fubuloubu committed Feb 21, 2024
1 parent 3f7f688 commit 518c07a
Show file tree
Hide file tree
Showing 8 changed files with 512 additions and 157 deletions.
21 changes: 18 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions iroh-bytes/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,20 @@ proptest = "1.0.0"
serde_json = "1.0.107"
serde_test = "1.0.176"
tokio = { version = "1", features = ["macros", "test-util"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
rcgen = "0.12.0"
rustls = { version = "0.21.0", default-features = false, features = ["quic"] }

[features]
default = ["flat-db"]
flat-db = ["reflink-copy"]

[[example]]
name = "provide-bytes"

[[example]]
name = "fetch-fsm"

[[example]]
name = "fetch-stream"

88 changes: 88 additions & 0 deletions iroh-bytes/examples/connect/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
//! Common code used to created quinn connections in the examples
use anyhow::{bail, Context, Result};
use std::{path::PathBuf, sync::Arc};
use tokio::fs;

pub const EXAMPLE_ALPN: &[u8] = b"n0/iroh/examples/bytes/0";

// Path where the tls certificates are saved. This example expects that you have run the `provide-bytes` example first, which generates the certificates.
pub const CERT_PATH: &str = "./certs";

// derived from `quinn/examples/client.rs`
// load the certificates from CERT_PATH
// Assumes that you have already run the `provide-bytes` example, that generates the certificates
#[allow(unused)]
pub async fn load_certs() -> Result<rustls::RootCertStore> {
let mut roots = rustls::RootCertStore::empty();
let path = PathBuf::from(CERT_PATH).join("cert.der");
match fs::read(path).await {
Ok(cert) => {
roots.add(&rustls::Certificate(cert))?;
}
Err(e) => {
bail!("failed to open local server certificate: {}\nYou must run the `provide-bytes` example to create the certificate.\n\tcargo run --example provide-bytes", e);
}
}
Ok(roots)
}

// derived from `quinn/examples/server.rs`
// creates a self signed certificate and saves it to "./certs"
#[allow(unused)]
pub async fn make_and_write_certs() -> Result<(rustls::PrivateKey, rustls::Certificate)> {
let path = std::path::PathBuf::from(CERT_PATH);
let cert = rcgen::generate_simple_self_signed(vec!["localhost".into()]).unwrap();
let key_path = path.join("key.der");
let cert_path = path.join("cert.der");

let key = cert.serialize_private_key_der();
let cert = cert.serialize_der().unwrap();
tokio::fs::create_dir_all(path)
.await
.context("failed to create certificate directory")?;
tokio::fs::write(cert_path, &cert)
.await
.context("failed to write certificate")?;
tokio::fs::write(key_path, &key)
.await
.context("failed to write private key")?;

Ok((rustls::PrivateKey(key), rustls::Certificate(cert)))
}

// derived from `quinn/examples/client.rs`
// Creates a client quinn::Endpoint
#[allow(unused)]
pub fn make_client_endpoint(roots: rustls::RootCertStore) -> Result<quinn::Endpoint> {
let mut client_crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();

client_crypto.alpn_protocols = vec![EXAMPLE_ALPN.to_vec()];

let client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?;
endpoint.set_default_client_config(client_config);
Ok(endpoint)
}

// derived from `quinn/examples/server.rs`
// makes a quinn server endpoint
#[allow(unused)]
pub fn make_server_endpoint(
key: rustls::PrivateKey,
cert: rustls::Certificate,
) -> Result<quinn::Endpoint> {
let mut server_crypto = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![cert], key)?;
server_crypto.alpn_protocols = vec![EXAMPLE_ALPN.to_vec()];
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));
let transport_config = Arc::get_mut(&mut server_config.transport).unwrap();
transport_config.max_concurrent_uni_streams(0_u8.into());

let endpoint = quinn::Endpoint::server(server_config, "[::1]:4433".parse()?)?;
Ok(endpoint)
}
174 changes: 174 additions & 0 deletions iroh-bytes/examples/fetch-fsm.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
//! An example how to download a single blob or collection from a node and write it to stdout using the `get` finite state machine directly.
//!
//! Since this example does not use `iroh-net::MagicEndpoint`, it does not do any holepunching, and so will only work locally or between two processes that have public IP addresses.
//!
//! Run the provide-bytes example first. It will give instructions on how to run this example properly.
use std::net::SocketAddr;

use anyhow::{Context, Result};
use iroh_io::ConcatenateSliceWriter;
use tracing_subscriber::{prelude::*, EnvFilter};

use iroh_bytes::{
get::fsm::{AtInitial, ConnectedNext, EndBlobNext},
hashseq::HashSeq,
protocol::GetRequest,
Hash,
};

mod connect;
use connect::{load_certs, make_client_endpoint};

// set the RUST_LOG env var to one of {debug,info,warn} to see logging info
pub fn setup_logging() {
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr))
.with(EnvFilter::from_default_env())
.try_init()
.ok();
}

#[tokio::main]
async fn main() -> Result<()> {
println!("\nfetch bytes example!");
setup_logging();
let args: Vec<_> = std::env::args().collect();
if args.len() != 4 {
anyhow::bail!("usage: fetch-bytes [HASH] [SOCKET_ADDR] [FORMAT]");
}
let hash: Hash = args[1].parse().context("unable to parse [HASH]")?;
let addr: SocketAddr = args[2].parse().context("unable to parse [SOCKET_ADDR]")?;
let format = {
if args[3] != "blob" && args[3] != "collection" {
anyhow::bail!(
"expected either 'blob' or 'collection' for FORMAT argument, got {}",
args[3]
);
}
args[3].clone()
};

// load tls certificates
// This will error if you have not run the `provide-bytes` example
let roots = load_certs().await?;

// create an endpoint to listen for incoming connections
let endpoint = make_client_endpoint(roots)?;
println!("\nlistening on {}", endpoint.local_addr()?);
println!("fetching hash {hash} from {addr}");

// connect
let connection = endpoint.connect(addr, "localhost")?.await?;

if format == "collection" {
// create a request for a collection
let request = GetRequest::all(hash);
// create the initial state of the finite state machine
let initial = iroh_bytes::get::fsm::start(connection, request);

write_collection(initial).await
} else {
// create a request for a single blob
let request = GetRequest::single(hash);
// create the initial state of the finite state machine
let initial = iroh_bytes::get::fsm::start(connection, request);

write_blob(initial).await
}
}

async fn write_blob(initial: AtInitial) -> Result<()> {
// connect (create a stream pair)
let connected = initial.next().await?;

// we expect a start root message, since we requested a single blob
let ConnectedNext::StartRoot(start_root) = connected.next().await? else {
panic!("expected start root")
};
// we can just call next to proceed to the header, since we know the root hash
let header = start_root.next();

// we need to wrap stdout in a struct that implements AsyncSliceWriter. Since we can not
// seek in stdout we use ConcatenateSliceWriter which just concatenates all the writes.
let writer = ConcatenateSliceWriter::new(tokio::io::stdout());

// make the spacing nicer in the terminal
println!();
// use the utility function write_all to write the entire blob
let end = header.write_all(writer).await?;

// we requested a single blob, so we expect to enter the closing state
let EndBlobNext::Closing(closing) = end.next() else {
panic!("expected closing")
};

// close the connection and get the stats
let _stats = closing.next().await?;
Ok(())
}

async fn write_collection(initial: AtInitial) -> Result<()> {
// connect
let connected = initial.next().await?;
// read the first bytes
let ConnectedNext::StartRoot(start_root) = connected.next().await? else {
anyhow::bail!("failed to parse collection");
};
// check that we requested the whole collection
if !start_root.ranges().is_all() {
anyhow::bail!("collection was not requested completely");
}

// move to the header
let header: iroh_bytes::get::fsm::AtBlobHeader = start_root.next();
let (root_end, hashes_bytes) = header.concatenate_into_vec().await?;
let next = root_end.next();
let EndBlobNext::MoreChildren(at_meta) = next else {
anyhow::bail!("missing meta blob, got {next:?}");
};
// parse the hashes from the hash sequence bytes
let hashes = HashSeq::try_from(bytes::Bytes::from(hashes_bytes))
.context("failed to parse hashes")?
.into_iter()
.collect::<Vec<_>>();
let meta_hash = hashes.first().context("missing meta hash")?;

let (meta_end, _meta_bytes) = at_meta.next(*meta_hash).concatenate_into_vec().await?;
let mut curr = meta_end.next();
let closing = loop {
match curr {
EndBlobNext::MoreChildren(more) => {
let Some(hash) = hashes.get(more.child_offset() as usize) else {
break more.finish();
};
let header = more.next(*hash);

// we need to wrap stdout in a struct that implements AsyncSliceWriter. Since we can not
// seek in stdout we use ConcatenateSliceWriter which just concatenates all the writes.
let writer = ConcatenateSliceWriter::new(tokio::io::stdout());

// use the utility function write_all to write the entire blob
let end = header.write_all(writer).await?;
println!();
curr = end.next();
}
EndBlobNext::Closing(closing) => {
break closing;
}
}
};
// close the connection
let _stats = closing.next().await?;
Ok(())
}

#[derive(Clone)]
struct MockEventSender;

use futures::future::FutureExt;

impl iroh_bytes::provider::EventSender for MockEventSender {
fn send(&self, _event: iroh_bytes::provider::Event) -> futures::future::BoxFuture<()> {
async move {}.boxed()
}
}
Loading

0 comments on commit 518c07a

Please sign in to comment.