Skip to content

Commit

Permalink
Merge branch 'main' into feat-simplify-iroh-node
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored May 13, 2024
2 parents e8bf9a6 + b62e904 commit 77ccd5b
Show file tree
Hide file tree
Showing 34 changed files with 771 additions and 387 deletions.
338 changes: 206 additions & 132 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ hex = "0.4.3"
iroh-base = { version = "0.15.0", features = ["redb"], path = "../iroh-base" }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.15.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.15.0", path = "../iroh-net", optional = true }
iroh-net = { version = "0.15.0", path = "../iroh-net" }
num_cpus = "1.15.0"
parking_lot = { version = "0.12.1", optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = "0.10"
rand = "0.8"
range-collections = "0.4.0"
redb = { version = "2.0.0", optional = true }
Expand Down Expand Up @@ -65,9 +64,16 @@ rustls = { version = "0.21.11", default-features = false, features = ["quic"] }
tempfile = "3.10.0"
futures-util = "0.3.30"

[dev-dependencies.quinn]
# This allows writing the examples without relying on iroh-net.
# Though as they still depend on iroh-quinn this is perhaps not very
# useful right now. Changing them is a bit more work however.
package = "iroh-quinn"
version = "0.10"

[features]
default = ["fs-store"]
downloader = ["dep:iroh-net", "dep:parking_lot", "tokio-util/time", "dep:hashlink"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"]
metrics = ["dep:iroh-metrics"]
redb = ["dep:redb"]
Expand Down
11 changes: 10 additions & 1 deletion iroh-blobs/examples/provide-bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! To provide a collection (multiple blobs)
use anyhow::Result;
use tokio_util::task::LocalPoolHandle;
use tracing::warn;
use tracing_subscriber::{prelude::*, EnvFilter};

use iroh_blobs::{format::collection::Collection, Hash};
Expand Down Expand Up @@ -84,14 +85,22 @@ async fn main() -> Result<()> {
let lp = LocalPoolHandle::new(1);

let accept_task = tokio::spawn(async move {
while let Some(conn) = endpoint.accept().await {
while let Some(incoming) = endpoint.accept().await {
println!("connection incoming");

let db = db.clone();
let lp = lp.clone();

// spawn a task to handle the connection
tokio::spawn(async move {
let remote_addr = incoming.remote_address();
let conn = match incoming.await {
Ok(conn) => conn,
Err(err) => {
warn!(%remote_addr, "Error connecting: {err:#}");
return;
}
};
iroh_blobs::provider::handle_connection(conn, db, MockEventSender, lp).await
});
}
Expand Down
4 changes: 2 additions & 2 deletions iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::{
use futures_lite::{future::BoxedLocal, Stream, StreamExt};
use hashlink::LinkedHashSet;
use iroh_base::hash::{BlobFormat, Hash, HashAndFormat};
use iroh_net::{MagicEndpoint, NodeAddr, NodeId};
use iroh_net::{magic_endpoint, MagicEndpoint, NodeAddr, NodeId};
use tokio::{
sync::{mpsc, oneshot},
task::JoinSet,
Expand Down Expand Up @@ -1452,7 +1452,7 @@ impl Queue {
}

impl Dialer for iroh_net::dialer::Dialer {
type Connection = quinn::Connection;
type Connection = magic_endpoint::Connection;

fn queue_dial(&mut self, node_id: NodeId) {
self.queue_dial(node_id, crate::protocol::ALPN)
Expand Down
11 changes: 8 additions & 3 deletions iroh-blobs/src/downloader/get.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! [`Getter`] implementation that performs requests over [`quinn::Connection`]s.
//! [`Getter`] implementation that performs requests over [`Connection`]s.
//!
//! [`Connection`]: iroh_net::magic_endpoint::Connection
use crate::{
get::{db::get_to_db, error::GetError},
Expand All @@ -7,6 +9,7 @@ use crate::{
use futures_lite::FutureExt;
#[cfg(feature = "metrics")]
use iroh_metrics::{inc, inc_by};
use iroh_net::magic_endpoint;

#[cfg(feature = "metrics")]
use crate::metrics::Metrics;
Expand All @@ -27,13 +30,15 @@ impl From<GetError> for FailureAction {
}
}

/// [`Getter`] implementation that performs requests over [`quinn::Connection`]s.
/// [`Getter`] implementation that performs requests over [`Connection`]s.
///
/// [`Connection`]: iroh_net::magic_endpoint::Connection
pub(crate) struct IoGetter<S: Store> {
pub store: S,
}

impl<S: Store> Getter for IoGetter<S> {
type Connection = quinn::Connection;
type Connection = magic_endpoint::Connection;

fn get(
&mut self,
Expand Down
55 changes: 29 additions & 26 deletions iroh-blobs/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::Hash;
use anyhow::Result;
use bao_tree::io::fsm::BaoContentItem;
use bao_tree::ChunkNum;
use quinn::RecvStream;
use iroh_net::magic_endpoint::{self, RecvStream, SendStream};
use serde::{Deserialize, Serialize};
use tracing::{debug, error};

Expand Down Expand Up @@ -72,6 +72,7 @@ pub mod fsm {
};
use derive_more::From;
use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
use iroh_net::magic_endpoint::Connection;
use tokio::io::AsyncWriteExt;

type WrappedRecvStream = TrackingReader<TokioStreamReader<RecvStream>>;
Expand All @@ -85,7 +86,7 @@ pub mod fsm {
}

/// The entry point of the get response machine
pub fn start(connection: quinn::Connection, request: GetRequest) -> AtInitial {
pub fn start(connection: Connection, request: GetRequest) -> AtInitial {
AtInitial::new(connection, request)
}

Expand Down Expand Up @@ -125,7 +126,7 @@ pub mod fsm {
/// Initial state of the get response machine
#[derive(Debug)]
pub struct AtInitial {
connection: quinn::Connection,
connection: Connection,
request: GetRequest,
}

Expand All @@ -134,15 +135,15 @@ pub mod fsm {
///
/// `connection` is an existing connection
/// `request` is the request to be sent
pub fn new(connection: quinn::Connection, request: GetRequest) -> Self {
pub fn new(connection: Connection, request: GetRequest) -> Self {
Self {
connection,
request,
}
}

/// Initiate a new bidi stream to use for the get response
pub async fn next(self) -> Result<AtConnected, quinn::ConnectionError> {
pub async fn next(self) -> Result<AtConnected, magic_endpoint::ConnectionError> {
let start = Instant::now();
let (writer, reader) = self.connection.open_bi().await?;
let reader = TrackingReader::new(TokioStreamReader::new(reader));
Expand All @@ -161,7 +162,7 @@ pub mod fsm {
pub struct AtConnected {
start: Instant,
reader: WrappedRecvStream,
writer: TrackingWriter<quinn::SendStream>,
writer: TrackingWriter<SendStream>,
request: GetRequest,
}

Expand All @@ -185,9 +186,9 @@ pub mod fsm {
/// The serialized request is too long to be sent
#[error("request too big")]
RequestTooBig,
/// Error when writing the request to the [`quinn::SendStream`]
/// Error when writing the request to the [`SendStream`].
#[error("write: {0}")]
Write(#[from] quinn::WriteError),
Write(#[from] magic_endpoint::WriteError),
/// A generic io error
#[error("io {0}")]
Io(io::Error),
Expand All @@ -196,7 +197,7 @@ pub mod fsm {
impl ConnectedNextError {
fn from_io(cause: io::Error) -> Self {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<quinn::WriteError>() {
if let Some(e) = inner.downcast_ref::<magic_endpoint::WriteError>() {
Self::Write(e.clone())
} else {
Self::Io(cause)
Expand Down Expand Up @@ -295,7 +296,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartRoot {
ranges: ChunkRanges,
reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
reader: TrackingReader<TokioStreamReader<RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
Expand All @@ -304,7 +305,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartChild {
ranges: ChunkRanges,
reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
reader: TrackingReader<TokioStreamReader<RecvStream>>,
misc: Box<Misc>,
child_offset: u64,
}
Expand Down Expand Up @@ -379,7 +380,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtBlobHeader {
ranges: ChunkRanges,
reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
reader: TrackingReader<TokioStreamReader<RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
Expand All @@ -394,7 +395,7 @@ pub mod fsm {
NotFound,
/// Quinn read error when reading the size header
#[error("read: {0}")]
Read(quinn::ReadError),
Read(magic_endpoint::ReadError),
/// Generic io error
#[error("io: {0}")]
Io(io::Error),
Expand All @@ -420,7 +421,7 @@ pub mod fsm {
AtBlobHeaderNextError::NotFound
} else if let Some(e) = cause
.get_ref()
.and_then(|x| x.downcast_ref::<quinn::ReadError>())
.and_then(|x| x.downcast_ref::<magic_endpoint::ReadError>())
{
AtBlobHeaderNextError::Read(e.clone())
} else {
Expand Down Expand Up @@ -525,8 +526,8 @@ pub mod fsm {
/// decoding the response, e.g. from [`AtBlobContent::next`].
///
/// This is similar to [`bao_tree::io::DecodeError`], but takes into account
/// that we are reading from a [`quinn::RecvStream`], so read errors will be
/// propagated as [`DecodeError::Read`], containing a [`quinn::ReadError`].
/// that we are reading from a [`RecvStream`], so read errors will be
/// propagated as [`DecodeError::Read`], containing a [`ReadError`].
/// This carries more concrete information about the error than an [`io::Error`].
///
/// When the provider finds that it does not have a chunk that we requested,
Expand All @@ -541,7 +542,9 @@ pub mod fsm {
/// not behaving correctly.
///
/// The [`DecodeError::Io`] variant is just a fallback for any other io error that
/// is not actually a [`quinn::ReadError`].
/// is not actually a [`ReadError`].
///
/// [`ReadError`]: magic_endpoint::ReadError
#[derive(Debug, thiserror::Error)]
pub enum DecodeError {
/// A chunk was not found or invalid, so the provider stopped sending data
Expand All @@ -561,7 +564,7 @@ pub mod fsm {
LeafHashMismatch(ChunkNum),
/// Error when reading from the stream
#[error("read: {0}")]
Read(quinn::ReadError),
Read(magic_endpoint::ReadError),
/// A generic io error
#[error("io: {0}")]
Io(#[from] io::Error),
Expand Down Expand Up @@ -602,7 +605,7 @@ pub mod fsm {
bao_tree::io::DecodeError::LeafHashMismatch(chunk) => Self::LeafHashMismatch(chunk),
bao_tree::io::DecodeError::Io(cause) => {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<quinn::ReadError>() {
if let Some(e) = inner.downcast_ref::<magic_endpoint::ReadError>() {
Self::Read(e.clone())
} else {
Self::Io(cause)
Expand Down Expand Up @@ -844,7 +847,7 @@ pub mod fsm {
}

/// Finish the get response, returning statistics
pub async fn next(self) -> result::Result<Stats, quinn::ReadError> {
pub async fn next(self) -> result::Result<Stats, magic_endpoint::ReadError> {
// Shut down the stream
let (reader, bytes_read) = self.reader.into_parts();
let mut reader = reader.into_inner();
Expand Down Expand Up @@ -881,13 +884,13 @@ pub mod fsm {
pub enum GetResponseError {
/// Error when opening a stream
#[error("connection: {0}")]
Connection(#[from] quinn::ConnectionError),
Connection(#[from] magic_endpoint::ConnectionError),
/// Error when writing the handshake or request to the stream
#[error("write: {0}")]
Write(#[from] quinn::WriteError),
Write(#[from] magic_endpoint::WriteError),
/// Error when reading from the stream
#[error("read: {0}")]
Read(#[from] quinn::ReadError),
Read(#[from] magic_endpoint::ReadError),
/// Error when decoding, e.g. hash mismatch
#[error("decode: {0}")]
Decode(bao_tree::io::DecodeError),
Expand All @@ -908,13 +911,13 @@ impl From<bao_tree::io::DecodeError> for GetResponseError {
bao_tree::io::DecodeError::Io(cause) => {
// try to downcast to specific quinn errors
if let Some(source) = cause.source() {
if let Some(error) = source.downcast_ref::<quinn::ConnectionError>() {
if let Some(error) = source.downcast_ref::<magic_endpoint::ConnectionError>() {
return Self::Connection(error.clone());
}
if let Some(error) = source.downcast_ref::<quinn::ReadError>() {
if let Some(error) = source.downcast_ref::<magic_endpoint::ReadError>() {
return Self::Read(error.clone());
}
if let Some(error) = source.downcast_ref::<quinn::WriteError>() {
if let Some(error) = source.downcast_ref::<magic_endpoint::WriteError>() {
return Self::Write(error.clone());
}
}
Expand Down
11 changes: 4 additions & 7 deletions iroh-blobs/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::num::NonZeroU64;
use futures_lite::StreamExt;
use iroh_base::hash::Hash;
use iroh_base::rpc::RpcError;
use iroh_net::magic_endpoint::Connection;
use serde::{Deserialize, Serialize};

use crate::hashseq::parse_hash_seq;
Expand Down Expand Up @@ -44,7 +45,7 @@ use tracing::trace;
pub async fn get_to_db<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
F: Future<Output = anyhow::Result<Connection>>,
>(
db: &D,
get_conn: C,
Expand All @@ -62,11 +63,7 @@ pub async fn get_to_db<
///
/// We need to create our own files and handle the case where an outboard
/// is not needed.
async fn get_blob<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
>(
async fn get_blob<D: BaoStore, C: FnOnce() -> F, F: Future<Output = anyhow::Result<Connection>>>(
db: &D,
get_conn: C,
hash: &Hash,
Expand Down Expand Up @@ -305,7 +302,7 @@ async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<Bl
async fn get_hash_seq<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
F: Future<Output = anyhow::Result<Connection>>,
>(
db: &D,
get_conn: C,
Expand Down
Loading

0 comments on commit 77ccd5b

Please sign in to comment.