Skip to content

Commit

Permalink
feat(iroh-net): Export many more quinn types
Browse files Browse the repository at this point in the history
This export a lot of quinn types directly from
iroh-net::magic_endpoint.  These are all the types we need to interact
with iroh-net in our own code.  The goal is that users should not need
to figure out how to add their own (iroh-)quinn dependency to use
iroh-net, instead all types should be provided by iroh-net.

Not all APIs are re-exported however, to avoid exporting too much.
Hopefully what iroh itself needs is a reasonable indication of what is
needed, we can always add more.
  • Loading branch information
flub committed May 10, 2024
1 parent ec48b0d commit 1d2686c
Show file tree
Hide file tree
Showing 19 changed files with 120 additions and 94 deletions.
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion iroh-blobs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ iroh-net = { version = "0.15.0", path = "../iroh-net" }
num_cpus = "1.15.0"
parking_lot = { version = "0.12.1", optional = true }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
quinn = { package = "iroh-quinn", version = "0.10" }
rand = "0.8"
range-collections = "0.4.0"
redb = { version = "2.0.0", optional = true }
Expand Down Expand Up @@ -65,6 +64,13 @@ rustls = { version = "0.21.11", default-features = false, features = ["quic"] }
tempfile = "3.10.0"
futures-util = "0.3.30"

[dev-dependencies.quinn]
# This allows writing the examples without relying on iroh-net.
# Though as they still depend on iroh-quinn this is perhaps not very
# useful right now. Changing them is a bit more work however.
package = "iroh-quinn"
version = "0.10"

[features]
default = ["fs-store"]
downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"]
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ impl Queue {
}

impl Dialer for iroh_net::dialer::Dialer {
type Connection = quinn::Connection;
type Connection = iroh_net::magic_endpoint::Connection;

fn queue_dial(&mut self, node_id: NodeId) {
self.queue_dial(node_id, crate::protocol::ALPN)
Expand Down
2 changes: 1 addition & 1 deletion iroh-blobs/src/downloader/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub(crate) struct IoGetter<S: Store> {
}

impl<S: Store> Getter for IoGetter<S> {
type Connection = quinn::Connection;
type Connection = iroh_net::magic_endpoint::Connection;

fn get(
&mut self,
Expand Down
54 changes: 32 additions & 22 deletions iroh-blobs/src/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::Hash;
use anyhow::Result;
use bao_tree::io::fsm::BaoContentItem;
use bao_tree::ChunkNum;
use quinn::RecvStream;
use iroh_net::magic_endpoint::RecvStream;
use serde::{Deserialize, Serialize};
use tracing::{debug, error};

Expand Down Expand Up @@ -85,7 +85,10 @@ pub mod fsm {
}

/// The entry point of the get response machine
pub fn start(connection: quinn::Connection, request: GetRequest) -> AtInitial {
pub fn start(
connection: iroh_net::magic_endpoint::Connection,
request: GetRequest,
) -> AtInitial {
AtInitial::new(connection, request)
}

Expand Down Expand Up @@ -125,7 +128,7 @@ pub mod fsm {
/// Initial state of the get response machine
#[derive(Debug)]
pub struct AtInitial {
connection: quinn::Connection,
connection: iroh_net::magic_endpoint::Connection,
request: GetRequest,
}

Expand All @@ -134,15 +137,15 @@ pub mod fsm {
///
/// `connection` is an existing connection
/// `request` is the request to be sent
pub fn new(connection: quinn::Connection, request: GetRequest) -> Self {
pub fn new(connection: iroh_net::magic_endpoint::Connection, request: GetRequest) -> Self {
Self {
connection,
request,
}
}

/// Initiate a new bidi stream to use for the get response
pub async fn next(self) -> Result<AtConnected, quinn::ConnectionError> {
pub async fn next(self) -> Result<AtConnected, iroh_net::magic_endpoint::ConnectionError> {
let start = Instant::now();
let (writer, reader) = self.connection.open_bi().await?;
let reader = TrackingReader::new(TokioStreamReader::new(reader));
Expand All @@ -161,7 +164,7 @@ pub mod fsm {
pub struct AtConnected {
start: Instant,
reader: WrappedRecvStream,
writer: TrackingWriter<quinn::SendStream>,
writer: TrackingWriter<iroh_net::magic_endpoint::SendStream>,
request: GetRequest,
}

Expand All @@ -187,7 +190,7 @@ pub mod fsm {
RequestTooBig,
/// Error when writing the request to the [`quinn::SendStream`]
#[error("write: {0}")]
Write(#[from] quinn::WriteError),
Write(#[from] iroh_net::magic_endpoint::WriteError),
/// A generic io error
#[error("io {0}")]
Io(io::Error),
Expand All @@ -196,7 +199,7 @@ pub mod fsm {
impl ConnectedNextError {
fn from_io(cause: io::Error) -> Self {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<quinn::WriteError>() {
if let Some(e) = inner.downcast_ref::<iroh_net::magic_endpoint::WriteError>() {
Self::Write(e.clone())
} else {
Self::Io(cause)
Expand Down Expand Up @@ -295,7 +298,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartRoot {
ranges: ChunkRanges,
reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
reader: TrackingReader<TokioStreamReader<iroh_net::magic_endpoint::RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
Expand All @@ -304,7 +307,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartChild {
ranges: ChunkRanges,
reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
reader: TrackingReader<TokioStreamReader<iroh_net::magic_endpoint::RecvStream>>,
misc: Box<Misc>,
child_offset: u64,
}
Expand Down Expand Up @@ -379,7 +382,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtBlobHeader {
ranges: ChunkRanges,
reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
reader: TrackingReader<TokioStreamReader<iroh_net::magic_endpoint::RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
Expand All @@ -394,7 +397,7 @@ pub mod fsm {
NotFound,
/// Quinn read error when reading the size header
#[error("read: {0}")]
Read(quinn::ReadError),
Read(iroh_net::magic_endpoint::ReadError),
/// Generic io error
#[error("io: {0}")]
Io(io::Error),
Expand All @@ -420,7 +423,7 @@ pub mod fsm {
AtBlobHeaderNextError::NotFound
} else if let Some(e) = cause
.get_ref()
.and_then(|x| x.downcast_ref::<quinn::ReadError>())
.and_then(|x| x.downcast_ref::<iroh_net::magic_endpoint::ReadError>())
{
AtBlobHeaderNextError::Read(e.clone())
} else {
Expand Down Expand Up @@ -561,7 +564,7 @@ pub mod fsm {
LeafHashMismatch(ChunkNum),
/// Error when reading from the stream
#[error("read: {0}")]
Read(quinn::ReadError),
Read(iroh_net::magic_endpoint::ReadError),
/// A generic io error
#[error("io: {0}")]
Io(#[from] io::Error),
Expand Down Expand Up @@ -602,7 +605,8 @@ pub mod fsm {
bao_tree::io::DecodeError::LeafHashMismatch(chunk) => Self::LeafHashMismatch(chunk),
bao_tree::io::DecodeError::Io(cause) => {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<quinn::ReadError>() {
if let Some(e) = inner.downcast_ref::<iroh_net::magic_endpoint::ReadError>()
{
Self::Read(e.clone())
} else {
Self::Io(cause)
Expand Down Expand Up @@ -844,7 +848,7 @@ pub mod fsm {
}

/// Finish the get response, returning statistics
pub async fn next(self) -> result::Result<Stats, quinn::ReadError> {
pub async fn next(self) -> result::Result<Stats, iroh_net::magic_endpoint::ReadError> {
// Shut down the stream
let (reader, bytes_read) = self.reader.into_parts();
let mut reader = reader.into_inner();
Expand Down Expand Up @@ -881,13 +885,13 @@ pub mod fsm {
pub enum GetResponseError {
/// Error when opening a stream
#[error("connection: {0}")]
Connection(#[from] quinn::ConnectionError),
Connection(#[from] iroh_net::magic_endpoint::ConnectionError),
/// Error when writing the handshake or request to the stream
#[error("write: {0}")]
Write(#[from] quinn::WriteError),
Write(#[from] iroh_net::magic_endpoint::WriteError),
/// Error when reading from the stream
#[error("read: {0}")]
Read(#[from] quinn::ReadError),
Read(#[from] iroh_net::magic_endpoint::ReadError),
/// Error when decoding, e.g. hash mismatch
#[error("decode: {0}")]
Decode(bao_tree::io::DecodeError),
Expand All @@ -908,13 +912,19 @@ impl From<bao_tree::io::DecodeError> for GetResponseError {
bao_tree::io::DecodeError::Io(cause) => {
// try to downcast to specific quinn errors
if let Some(source) = cause.source() {
if let Some(error) = source.downcast_ref::<quinn::ConnectionError>() {
if let Some(error) =
source.downcast_ref::<iroh_net::magic_endpoint::ConnectionError>()
{
return Self::Connection(error.clone());
}
if let Some(error) = source.downcast_ref::<quinn::ReadError>() {
if let Some(error) =
source.downcast_ref::<iroh_net::magic_endpoint::ReadError>()
{
return Self::Read(error.clone());
}
if let Some(error) = source.downcast_ref::<quinn::WriteError>() {
if let Some(error) =
source.downcast_ref::<iroh_net::magic_endpoint::WriteError>()
{
return Self::Write(error.clone());
}
}
Expand Down
11 changes: 4 additions & 7 deletions iroh-blobs/src/get/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::num::NonZeroU64;
use futures_lite::StreamExt;
use iroh_base::hash::Hash;
use iroh_base::rpc::RpcError;
use iroh_net::magic_endpoint::Connection;
use serde::{Deserialize, Serialize};

use crate::hashseq::parse_hash_seq;
Expand Down Expand Up @@ -44,7 +45,7 @@ use tracing::trace;
pub async fn get_to_db<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
F: Future<Output = anyhow::Result<Connection>>,
>(
db: &D,
get_conn: C,
Expand All @@ -62,11 +63,7 @@ pub async fn get_to_db<
///
/// We need to create our own files and handle the case where an outboard
/// is not needed.
async fn get_blob<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
>(
async fn get_blob<D: BaoStore, C: FnOnce() -> F, F: Future<Output = anyhow::Result<Connection>>>(
db: &D,
get_conn: C,
hash: &Hash,
Expand Down Expand Up @@ -305,7 +302,7 @@ async fn blob_infos<D: BaoStore>(db: &D, hash_seq: &[Hash]) -> io::Result<Vec<Bl
async fn get_hash_seq<
D: BaoStore,
C: FnOnce() -> F,
F: Future<Output = anyhow::Result<quinn::Connection>>,
F: Future<Output = anyhow::Result<Connection>>,
>(
db: &D,
get_conn: C,
Expand Down
45 changes: 24 additions & 21 deletions iroh-blobs/src/get/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,40 @@ impl From<ProgressSendError> for GetError {
}
}

impl From<quinn::ConnectionError> for GetError {
fn from(value: quinn::ConnectionError) -> Self {
impl From<iroh_net::magic_endpoint::ConnectionError> for GetError {
fn from(value: iroh_net::magic_endpoint::ConnectionError) -> Self {
// explicit match just to be sure we are taking everything into account
use iroh_net::magic_endpoint::ConnectionError;
match value {
e @ quinn::ConnectionError::VersionMismatch => {
e @ ConnectionError::VersionMismatch => {
// > The peer doesn't implement any supported version
// unsupported version is likely a long time error, so this peer is not usable
GetError::NoncompliantNode(e.into())
}
e @ quinn::ConnectionError::TransportError(_) => {
e @ ConnectionError::TransportError(_) => {
// > The peer violated the QUIC specification as understood by this implementation
// bad peer we don't want to keep around
GetError::NoncompliantNode(e.into())
}
e @ quinn::ConnectionError::ConnectionClosed(_) => {
e @ ConnectionError::ConnectionClosed(_) => {
// > The peer's QUIC stack aborted the connection automatically
// peer might be disconnecting or otherwise unavailable, drop it
GetError::Io(e.into())
}
e @ quinn::ConnectionError::ApplicationClosed(_) => {
e @ ConnectionError::ApplicationClosed(_) => {
// > The peer closed the connection
// peer might be disconnecting or otherwise unavailable, drop it
GetError::Io(e.into())
}
e @ quinn::ConnectionError::Reset => {
e @ ConnectionError::Reset => {
// > The peer is unable to continue processing this connection, usually due to having restarted
GetError::RemoteReset(e.into())
}
e @ quinn::ConnectionError::TimedOut => {
e @ ConnectionError::TimedOut => {
// > Communication with the peer has lapsed for longer than the negotiated idle timeout
GetError::Io(e.into())
}
e @ quinn::ConnectionError::LocallyClosed => {
e @ ConnectionError::LocallyClosed => {
// > The local application closed the connection
// TODO(@divma): don't see how this is reachable but let's just not use the peer
GetError::Io(e.into())
Expand All @@ -74,27 +75,29 @@ impl From<quinn::ConnectionError> for GetError {
}
}

impl From<quinn::ReadError> for GetError {
fn from(value: quinn::ReadError) -> Self {
impl From<iroh_net::magic_endpoint::ReadError> for GetError {
fn from(value: iroh_net::magic_endpoint::ReadError) -> Self {
use iroh_net::magic_endpoint::ReadError;
match value {
e @ quinn::ReadError::Reset(_) => GetError::RemoteReset(e.into()),
quinn::ReadError::ConnectionLost(conn_error) => conn_error.into(),
quinn::ReadError::UnknownStream
| quinn::ReadError::IllegalOrderedRead
| quinn::ReadError::ZeroRttRejected => {
e @ ReadError::Reset(_) => GetError::RemoteReset(e.into()),
ReadError::ConnectionLost(conn_error) => conn_error.into(),
ReadError::UnknownStream
| ReadError::IllegalOrderedRead
| ReadError::ZeroRttRejected => {
// all these errors indicate the peer is not usable at this moment
GetError::Io(value.into())
}
}
}
}

impl From<quinn::WriteError> for GetError {
fn from(value: quinn::WriteError) -> Self {
impl From<iroh_net::magic_endpoint::WriteError> for GetError {
fn from(value: iroh_net::magic_endpoint::WriteError) -> Self {
use iroh_net::magic_endpoint::WriteError;
match value {
e @ quinn::WriteError::Stopped(_) => GetError::RemoteReset(e.into()),
quinn::WriteError::ConnectionLost(conn_error) => conn_error.into(),
quinn::WriteError::UnknownStream | quinn::WriteError::ZeroRttRejected => {
e @ WriteError::Stopped(_) => GetError::RemoteReset(e.into()),
WriteError::ConnectionLost(conn_error) => conn_error.into(),
WriteError::UnknownStream | WriteError::ZeroRttRejected => {
// all these errors indicate the peer is not usable at this moment
GetError::Io(value.into())
}
Expand Down
Loading

0 comments on commit 1d2686c

Please sign in to comment.