# Remove custom get and custom collections (#1575)
## Description

As discussed, this removes the custom get request. Fixes
#1554

A custom get request is going to be replaced by the following pattern (sketched in code below):

- create a connection under another ALPN (not the iroh-bytes one)
- do whatever custom exchange is needed to figure out what should be transferred
- use the existing quinn stream for a bao sync
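
A minimal sketch of that pattern, assuming a `quinn::Endpoint` whose client TLS config already advertises the custom ALPN. The ALPN string, the request bytes, and the shape of the reply are made up for illustration, and the exact stream API depends on the quinn version in use:

```rust
// Illustrative only: the ALPN, the request payload, and the reply format are
// assumptions, not part of iroh-bytes.
const CUSTOM_ALPN: &[u8] = b"my-app/custom-get/0";

async fn custom_get(
    endpoint: &quinn::Endpoint, // configured with CUSTOM_ALPN, not the iroh-bytes ALPN
    addr: std::net::SocketAddr,
) -> anyhow::Result<()> {
    // 1. connect under the custom ALPN
    let connection = endpoint.connect(addr, "example.com")?.await?;

    // 2. custom exchange: ask the remote side what should be transferred
    let (mut send, mut recv) = connection.open_bi().await?;
    send.write_all(b"which blob should I fetch?").await?;
    send.finish().await?;
    let reply = recv.read_to_end(64).await?;
    anyhow::ensure!(reply.len() >= 32, "reply too short");

    // 3. the reply names a blake3 hash; the same connection (or stream) can now
    //    carry a plain bao verified transfer of that hash
    let _hash: [u8; 32] = reply[..32].try_into()?;
    Ok(())
}
```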

This also removes the custom collection parsing. Collections are now just
sequences of blake3 hashes (see the sketch below). Fixes #1553
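
For reference, a hash sequence in this sense is nothing more than concatenated 32-byte blake3 hashes, so encoding and decoding is plain slicing. The helpers below are illustrative only, not the iroh-bytes API:

```rust
// A hash sequence is just the concatenation of 32-byte blake3 hashes.
fn encode_hash_seq(hashes: &[[u8; 32]]) -> Vec<u8> {
    hashes.concat()
}

fn decode_hash_seq(bytes: &[u8]) -> Option<Vec<[u8; 32]>> {
    if bytes.len() % 32 != 0 {
        return None; // not a whole number of hashes
    }
    Some(
        bytes
            .chunks_exact(32)
            .map(|chunk| chunk.try_into().expect("chunk is exactly 32 bytes"))
            .collect(),
    )
}
```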

Custom collections are going to be replaced by two things: for the network
protocol, an exchange under a custom ALPN as described above; for GC, a way to
enumerate live nodes (see the sketch below).
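
Based on the new `gc_mark` signature in `baomap.rs` below, here is a rough sketch of the GC side from the caller's perspective; the import paths, the way `HashAndFormat` values are obtained, and the `Debug` impl on the event type are assumptions:

```rust
use std::io;

use futures::StreamExt;

// Assumed import paths, mirroring the file layout in this PR.
use iroh_bytes::baomap::Store;
use iroh_bytes::util::HashAndFormat;

async fn run_gc_mark(store: &impl Store, live_roots: Vec<HashAndFormat>) {
    // GC no longer parses custom collections; the caller hands it the roots it
    // knows to be alive, as an iterator of io::Result<HashAndFormat>.
    let extra_roots = live_roots.into_iter().map(io::Result::Ok);
    let mut events = store.gc_mark(extra_roots);
    while let Some(event) = events.next().await {
        // GcMarkEvent::Error and friends would be handled here
        println!("gc mark event: {event:?}");
    }
}
```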

## Notes & open questions

How detailed will the examples have to be? It is hard to think of a
small example that does not seem arbitrary, but a big example would be a
lot of work.

Here is a possible big example: We define a protocol that allows exchanging
arbitrary CIDs, with arbitrary hash functions; we could call it swapbits. We
could even implement a more complex protocol to exchange DAGs, which we could
call syncgraph.

Then we implement a node that supports this custom protocol and does the
actual exchange using bao.

But as you can imagine, this would be a rather large example.
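
For a flavor only, here is what the swapbits wire types could look like; everything in this block is hypothetical and none of it exists in the codebase:

```rust
use serde::{Deserialize, Serialize};

/// Hypothetical request: "I know this CID under some arbitrary hash function,
/// tell me the blake3 hash under which I can fetch the same bytes."
#[derive(Debug, Serialize, Deserialize)]
struct SwapbitsRequest {
    /// raw CID/multihash bytes, whatever the application uses
    cid: Vec<u8>,
}

/// Hypothetical response.
#[derive(Debug, Serialize, Deserialize)]
enum SwapbitsResponse {
    /// the data is available under this blake3 hash; follow up with a normal
    /// bao transfer on the same connection
    Found { blake3: [u8; 32] },
    /// the remote side does not have the data
    NotFound,
}
```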

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
rklaehn authored Oct 6, 2023
1 parent e2ee678 commit 89377a9
Showing 26 changed files with 331 additions and 955 deletions.
10 changes: 4 additions & 6 deletions iroh-bytes/src/baomap.rs
@@ -2,7 +2,7 @@
use std::{collections::BTreeSet, io, path::PathBuf, sync::Arc};

use crate::{
collection::CollectionParser,
hashseq::parse_hash_seq,
util::{
progress::{IdGenerator, ProgressSender},
BlobFormat, HashAndFormat, RpcError, Tag,
@@ -234,11 +234,10 @@ pub trait Store: ReadableStore + PartialMap {
/// in the gc_sweep phase.
fn gc_mark<'a>(
&'a self,
cp: impl CollectionParser + 'a,
extra_roots: impl IntoIterator<Item = io::Result<HashAndFormat>> + 'a,
) -> LocalBoxStream<'a, GcMarkEvent> {
Gen::new(|co| async move {
if let Err(e) = gc_mark_task(self, cp, extra_roots, &co).await {
if let Err(e) = gc_mark_task(self, extra_roots, &co).await {
co.yield_(GcMarkEvent::Error(e)).await;
}
})
@@ -367,7 +366,6 @@ impl Drop for TempTag {
/// Implementation of the gc method.
async fn gc_mark_task<'a>(
store: &'a impl Store,
cp: impl CollectionParser + 'a,
extra_roots: impl IntoIterator<Item = io::Result<HashAndFormat>> + 'a,
co: &Co<GcMarkEvent>,
) -> anyhow::Result<()> {
@@ -418,11 +416,11 @@ async fn gc_mark_task<'a>(
warn!("gc: {} creating data reader failed", hash);
continue;
};
let Ok((mut iter, stats)) = cp.parse(reader).await else {
let Ok((mut iter, count)) = parse_hash_seq(reader).await else {
warn!("gc: {} parse failed", hash);
continue;
};
info!("parsed collection {} {:?}", hash, stats);
info!("parsed collection {} {:?}", hash, count);
loop {
let item = match iter.next().await {
Ok(Some(item)) => item,
199 changes: 0 additions & 199 deletions iroh-bytes/src/collection.rs

This file was deleted.

72 changes: 12 additions & 60 deletions iroh-bytes/src/get.rs
@@ -68,7 +68,7 @@ pub mod fsm {
};
use derive_more::From;
use iroh_io::AsyncSliceWriter;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::io::AsyncWriteExt;

self_cell::self_cell! {
struct RangesIterInner {
@@ -79,7 +79,7 @@
}

/// The entry point of the get response machine
pub fn start(connection: quinn::Connection, request: Request) -> AtInitial {
pub fn start(connection: quinn::Connection, request: GetRequest) -> AtInitial {
AtInitial::new(connection, request)
}

@@ -116,15 +116,15 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtInitial {
connection: quinn::Connection,
request: Request,
request: GetRequest,
}

impl AtInitial {
/// Create a new get response
///
/// `connection` is an existing connection
/// `request` is the request to be sent
pub fn new(connection: quinn::Connection, request: Request) -> Self {
pub fn new(connection: quinn::Connection, request: GetRequest) -> Self {
Self {
connection,
request,
@@ -152,7 +152,7 @@ pub mod fsm {
start: Instant,
reader: TrackingReader<quinn::RecvStream>,
writer: TrackingWriter<quinn::SendStream>,
request: Request,
request: GetRequest,
}

/// Possible next states after the handshake has been sent
@@ -178,32 +178,16 @@
/// Error when writing the request to the [`quinn::SendStream`]
#[error("write: {0}")]
Write(#[from] quinn::WriteError),
/// Error when reading a custom request from the [`quinn::RecvStream`]
#[error("read: {0}")]
Read(quinn::ReadError),
/// The remote side sent a custom request that is too big
#[error("custom request too big")]
CustomRequestTooBig,
/// Response terminated early when reading a custom request
#[error("eof")]
Eof,
/// Error when deserializing a received custom request
#[error("postcard: {0}")]
PostcardDe(postcard::Error),
/// A generic io error
#[error("io {0}")]
Io(io::Error),
}

impl ConnectedNextError {
fn from_io(cause: io::Error) -> Self {
if cause.kind() == io::ErrorKind::UnexpectedEof {
Self::Eof
} else if let Some(inner) = cause.get_ref() {
if let Some(inner) = cause.get_ref() {
if let Some(e) = inner.downcast_ref::<quinn::WriteError>() {
Self::Write(e.clone())
} else if let Some(e) = inner.downcast_ref::<quinn::ReadError>() {
Self::Read(e.clone())
} else {
Self::Io(cause)
}
@@ -217,15 +201,10 @@
fn from(cause: ConnectedNextError) -> Self {
match cause {
ConnectedNextError::Write(cause) => cause.into(),
ConnectedNextError::Read(cause) => cause.into(),
ConnectedNextError::Eof => io::ErrorKind::UnexpectedEof.into(),
ConnectedNextError::Io(cause) => cause,
ConnectedNextError::PostcardSer(cause) => {
io::Error::new(io::ErrorKind::Other, cause)
}
ConnectedNextError::PostcardDe(cause) => {
io::Error::new(io::ErrorKind::Other, cause)
}
_ => io::Error::new(io::ErrorKind::Other, cause),
}
}
@@ -241,15 +220,18 @@
pub async fn next(self) -> Result<ConnectedNext, ConnectedNextError> {
let Self {
start,
mut reader,
reader,
mut writer,
request,
mut request,
} = self;
// 1. Send Request
{
debug!("sending request");
let wrapped = Request::Get(request);
let request_bytes =
postcard::to_stdvec(&request).map_err(ConnectedNextError::PostcardSer)?;
postcard::to_stdvec(&wrapped).map_err(ConnectedNextError::PostcardSer)?;
let Request::Get(x) = wrapped;
request = x;

if request_bytes.len() > MAX_MESSAGE_SIZE {
return Err(ConnectedNextError::RequestTooBig);
@@ -266,36 +248,6 @@
let (mut writer, bytes_written) = writer.into_parts();
writer.finish().await?;

// 3. Turn a possible custom request into a get request
let request = match request {
Request::Get(get_request) => {
// we already have a get request, just return it
get_request
}
Request::CustomGet(_) => {
// we sent a custom request, so we need the actual GetRequest from the response
let response_len = reader
.read_u64_le()
.await
.map_err(ConnectedNextError::from_io)?;

let mut response = if response_len < (MAX_MESSAGE_SIZE as u64) {
Vec::with_capacity(response_len as usize)
} else {
return Err(ConnectedNextError::CustomRequestTooBig);
};
(&mut reader)
.take(response_len)
.read_to_end(&mut response)
.await
.map_err(ConnectedNextError::from_io)?;
if response.len() != response_len as usize {
return Err(ConnectedNextError::Eof);
}
postcard::from_bytes::<GetRequest>(&response)
.map_err(ConnectedNextError::PostcardDe)?
}
};
let hash = request.hash;
let ranges_iter = RangesIter::new(request.ranges);
// this is in a box so we don't have to memcpy it on every state transition
(diff for the remaining changed files not shown)
