# refactor(iroh-bytes): use even newer bao-tree (#2168)
## Description

refactor(iroh-bytes): use even newer bao-tree

There is a new version of bao-tree that fixes some long-standing API
consistency issues and drops the tokio dependency. This updates iroh to
use it.

We still use tokio, of course, but as far as bao-tree is concerned,
switching to a different async runtime would now be easier.
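
As a concrete illustration of that boundary, the diff below wraps the tokio-based quinn::RecvStream in `iroh_io::TokioStreamReader`, which adapts a `tokio::io::AsyncRead` to the runtime-agnostic `AsyncStreamReader` trait that the new bao-tree consumes. A minimal sketch (the `read_size_prefix` helper and the in-memory reader are hypothetical, for illustration only):

```rust
use iroh_io::{AsyncStreamReader, TokioStreamReader};

// Hypothetical helper: adapt a tokio AsyncRead (here an in-memory byte
// slice) to AsyncStreamReader, then read the 8-byte size prefix the way
// the updated get.rs does.
async fn read_size_prefix() -> std::io::Result<u64> {
    let data = 4096u64.to_le_bytes();
    let mut reader = TokioStreamReader::new(&data[..]);
    // read::<N>() yields a [u8; N]; this replaces tokio's read_u64_le.
    let size = reader.read::<8>().await?;
    Ok(u64::from_le_bytes(size))
}
```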

The other change is that `encode_ranges_validated` no longer writes the
length prefix, so we have to write it ourselves. This was done to
simplify the bao-tree API and make it more flexible.
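
A minimal sketch of the encode side after this change, mirroring the sync helpers in the test support code below (the `encode_with_prefix` helper is hypothetical; it assumes a `BlockSize` value such as iroh's `IROH_BLOCK_SIZE`):

```rust
use bao_tree::io::outboard::PostOrderMemOutboard;
use bao_tree::{BlockSize, ChunkRanges};

// Hypothetical helper: the wire format still starts with an 8-byte
// little-endian size prefix, but encode_ranges_validated now appends only
// the chunk and proof data, so the caller writes the prefix itself.
fn encode_with_prefix(data: &[u8], block_size: BlockSize) -> Vec<u8> {
    let outboard = PostOrderMemOutboard::create(data, block_size).flip();
    let size = data.len() as u64;
    let mut encoded = size.to_le_bytes().to_vec();
    bao_tree::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded)
        .unwrap();
    encoded
}
```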

## Notes & open questions


## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
rklaehn authored Apr 10, 2024
1 parent 5d4ac52 commit fe6dcac
Showing 13 changed files with 92 additions and 68 deletions.
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion iroh-base/Cargo.toml
@@ -16,7 +16,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.12", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
data-encoding = { version = "2.3.3", optional = true }
hex = "0.4.3"
multibase = { version = "0.9.1", optional = true }
4 changes: 2 additions & 2 deletions iroh-bytes/Cargo.toml
@@ -17,7 +17,7 @@ workspace = true

[dependencies]
anyhow = { version = "1" }
bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false }
bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false }
bytes = { version = "1.4", features = ["serde"] }
chrono = "0.4.31"
data-encoding = "2.3.3"
@@ -28,7 +28,7 @@ futures-buffered = "0.2.4"
genawaiter = { version = "0.99.1", features = ["futures03"] }
hex = "0.4.3"
iroh-base = { version = "0.13.0", features = ["redb"], path = "../iroh-base" }
iroh-io = { version = "0.4.0", features = ["stats"] }
iroh-io = { version = "0.6.0", features = ["stats"] }
iroh-metrics = { version = "0.13.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.13.0", path = "../iroh-net", optional = true }
num_cpus = "1.15.0"
34 changes: 17 additions & 17 deletions iroh-bytes/src/get.rs
@@ -70,8 +70,10 @@ pub mod fsm {
BaoTree, ChunkRanges, TreeNode,
};
use derive_more::From;
-use iroh_io::AsyncSliceWriter;
-use tokio::io::{AsyncReadExt, AsyncWriteExt};
+use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
+use tokio::io::AsyncWriteExt;
+
+type WrappedRecvStream = TrackingReader<TokioStreamReader<RecvStream>>;

self_cell::self_cell! {
struct RangesIterInner {
@@ -142,7 +144,7 @@ pub mod fsm {
pub async fn next(self) -> Result<AtConnected, quinn::ConnectionError> {
let start = Instant::now();
let (writer, reader) = self.connection.open_bi().await?;
-let reader = TrackingReader::new(reader);
+let reader = TrackingReader::new(TokioStreamReader::new(reader));
let writer = TrackingWriter::new(writer);
Ok(AtConnected {
start,
@@ -157,7 +159,7 @@
#[derive(Debug)]
pub struct AtConnected {
start: Instant,
-reader: TrackingReader<quinn::RecvStream>,
+reader: WrappedRecvStream,
writer: TrackingWriter<quinn::SendStream>,
request: GetRequest,
}
@@ -292,7 +294,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtStartRoot {
ranges: ChunkRanges,
-reader: TrackingReader<quinn::RecvStream>,
+reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
@@ -301,7 +303,7 @@
#[derive(Debug)]
pub struct AtStartChild {
ranges: ChunkRanges,
-reader: TrackingReader<quinn::RecvStream>,
+reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
misc: Box<Misc>,
child_offset: u64,
}
@@ -376,7 +378,7 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtBlobHeader {
ranges: ChunkRanges,
-reader: TrackingReader<quinn::RecvStream>,
+reader: TrackingReader<TokioStreamReader<quinn::RecvStream>>,
misc: Box<Misc>,
hash: Hash,
}
@@ -412,7 +414,7 @@ pub mod fsm {
impl AtBlobHeader {
/// Read the size header, returning it and going into the `Content` state.
pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
-let size = self.reader.read_u64_le().await.map_err(|cause| {
+let size = self.reader.read::<8>().await.map_err(|cause| {
if cause.kind() == io::ErrorKind::UnexpectedEof {
AtBlobHeaderNextError::NotFound
} else if let Some(e) = cause
@@ -424,6 +426,7 @@ pub mod fsm {
AtBlobHeaderNextError::Io(cause)
}
})?;
+let size = u64::from_le_bytes(size);
let stream = ResponseDecoder::new(
self.hash.into(),
self.ranges,
@@ -513,7 +516,7 @@ pub mod fsm {
/// State while we are reading content
#[derive(Debug)]
pub struct AtBlobContent {
-stream: ResponseDecoder<TrackingReader<RecvStream>>,
+stream: ResponseDecoder<WrappedRecvStream>,
misc: Box<Misc>,
}

@@ -792,7 +795,7 @@ pub mod fsm {
/// State after we have read all the content for a blob
#[derive(Debug)]
pub struct AtEndBlob {
-stream: TrackingReader<RecvStream>,
+stream: WrappedRecvStream,
misc: Box<Misc>,
}

@@ -826,16 +829,12 @@ pub mod fsm {
#[derive(Debug)]
pub struct AtClosing {
misc: Box<Misc>,
-reader: TrackingReader<RecvStream>,
+reader: WrappedRecvStream,
check_extra_data: bool,
}

impl AtClosing {
-fn new(
-misc: Box<Misc>,
-reader: TrackingReader<RecvStream>,
-check_extra_data: bool,
-) -> Self {
+fn new(misc: Box<Misc>, reader: WrappedRecvStream, check_extra_data: bool) -> Self {
Self {
misc,
reader,
@@ -846,7 +845,8 @@ pub mod fsm {
/// Finish the get response, returning statistics
pub async fn next(self) -> result::Result<Stats, quinn::ReadError> {
// Shut down the stream
-let (mut reader, bytes_read) = self.reader.into_parts();
+let (reader, bytes_read) = self.reader.into_parts();
+let mut reader = reader.into_inner();
if self.check_extra_data {
if let Some(chunk) = reader.read_chunk(8, false).await? {
reader.stop(0u8.into()).ok();
2 changes: 1 addition & 1 deletion iroh-bytes/src/get/db.rs
@@ -145,7 +145,7 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<Chun
use tracing::trace as log;
// compute the valid range from just looking at the data file
let mut data_reader = entry.data_reader().await?;
-let data_size = data_reader.len().await?;
+let data_size = data_reader.size().await?;
let valid_from_data = ChunkRanges::from(..ChunkNum::full_chunks(data_size));
// compute the valid range from just looking at the outboard file
let mut outboard = entry.outboard().await?;
5 changes: 4 additions & 1 deletion iroh-bytes/src/provider.rs
@@ -217,6 +217,8 @@ pub async fn transfer_collection<D: Map, E: EventSender>(
// wrap the data reader in a tracking reader so we can get some stats for reading
let mut tracking_reader = TrackingSliceReader::new(&mut data);
// send the root
+tw.write(outboard.tree().size().to_le_bytes().as_slice())
+.await?;
encode_ranges_validated(
&mut tracking_reader,
&mut outboard,
@@ -490,13 +492,14 @@ pub async fn send_blob<D: Map, W: AsyncStreamWriter>(
db: &D,
name: Hash,
ranges: &RangeSpec,
-writer: W,
+mut writer: W,
) -> Result<(SentStatus, u64, SliceReaderStats)> {
match db.get(&name).await? {
Some(entry) => {
let outboard = entry.outboard().await?;
let size = outboard.tree().size();
let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?);
+writer.write(size.to_le_bytes().as_slice()).await?;
let res = encode_ranges_validated(
&mut file_reader,
outboard,
52 changes: 35 additions & 17 deletions iroh-bytes/src/store/bao_file.rs
@@ -426,7 +426,7 @@ impl AsyncSliceReader for DataReader {
.await
}

-async fn len(&mut self) -> io::Result<u64> {
+async fn size(&mut self) -> io::Result<u64> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
@@ -458,7 +458,7 @@ impl AsyncSliceReader for OutboardReader {
.await
}

-async fn len(&mut self) -> io::Result<u64> {
+async fn size(&mut self) -> io::Result<u64> {
with_storage(
&mut self.0,
BaoFileStorage::is_mem,
@@ -732,9 +732,9 @@ pub mod test_support {
BlockSize, ChunkRanges,
};
use futures::{Future, Stream, StreamExt};
+use iroh_io::AsyncStreamReader;
use rand::RngCore;
use range_collections::RangeSet2;
-use tokio::io::{AsyncRead, AsyncReadExt};

use crate::util::limited_range;

@@ -751,10 +751,11 @@ pub mod test_support {
mut target: W,
) -> io::Result<()>
where
-R: AsyncRead + Unpin,
+R: AsyncStreamReader,
W: BaoBatchWriter,
{
-let size = encoded.read_u64_le().await?;
+let size = encoded.read::<8>().await?;
+let size = u64::from_le_bytes(size);
let mut reading =
ResponseDecoder::new(root.into(), ranges, BaoTree::new(size, block_size), encoded);
let mut stack = Vec::new();
@@ -792,7 +793,8 @@ pub mod test_support {
/// Take some data and encode it
pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
-let mut encoded = Vec::new();
+let size = data.len() as u64;
+let mut encoded = size.to_le_bytes().to_vec();
bao_tree::io::sync::encode_ranges_validated(
data,
&outboard,
@@ -823,7 +825,8 @@ pub mod test_support {
let chunk_ranges = round_up_to_chunks(&range_set);
// compute the outboard
let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip();
-let mut encoded = Vec::new();
+let size = data.len() as u64;
+let mut encoded = size.to_le_bytes().to_vec();
encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap();
(outboard.root.into(), chunk_ranges, encoded)
}
@@ -866,8 +869,11 @@ pub mod test_support {

#[cfg(test)]
mod tests {
+use std::io::Write;

use bao_tree::{blake3, ChunkNum, ChunkRanges};
use futures::StreamExt;
+use iroh_io::TokioStreamReader;
use tests::test_support::{
decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
};
@@ -900,7 +906,7 @@ mod tests {
let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
.map(io::Result::Ok)
.boxed();
-let trickle = tokio_util::io::StreamReader::new(trickle);
+let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
let _task = tasks.spawn_local(async move {
decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file)
.await
@@ -912,18 +918,22 @@ mod tests {
println!(
"len {:?} {:?}",
handle,
-handle.data_reader().len().await.unwrap()
+handle.data_reader().size().await.unwrap()
);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [1024 * 16..1024 * 48];
validate(&handle, &test_data, &ranges).await;

// let ranges =
// let full_chunks = bao_tree::io::full_chunk_groups();
-let encoded = Vec::new();
+let mut encoded = Vec::new();
+let ob = handle.outboard().unwrap();
+encoded
+.write_all(ob.tree.size().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
-handle.outboard().unwrap(),
+ob,
&ChunkRanges::from(ChunkNum(16)..ChunkNum(48)),
encoded,
)
@@ -957,7 +967,7 @@ mod tests {
let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
.map(io::Result::Ok)
.boxed();
-let trickle = tokio_util::io::StreamReader::new(trickle);
+let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
let task = local.spawn_pinned(move || async move {
decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await
});
@@ -969,16 +979,20 @@ mod tests {
println!(
"len {:?} {:?}",
handle,
-handle.data_reader().len().await.unwrap()
+handle.data_reader().size().await.unwrap()
);
#[allow(clippy::single_range_in_vec_init)]
let ranges = [0..n];
validate(&handle, &test_data, &ranges).await;

-let encoded = Vec::new();
+let mut encoded = Vec::new();
+let ob = handle.outboard().unwrap();
+encoded
+.write_all(ob.tree.size().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
-handle.outboard().unwrap(),
+ob,
&ChunkRanges::all(),
encoded,
)
@@ -1013,10 +1027,14 @@ mod tests {
.unwrap();
validate(&handle, &test_data, &ranges).await;

-let encoded = Vec::new();
+let mut encoded = Vec::new();
+let ob = handle.outboard().unwrap();
+encoded
+.write_all(ob.tree.size().to_le_bytes().as_slice())
+.unwrap();
bao_tree::io::fsm::encode_ranges_validated(
handle.data_reader(),
-handle.outboard().unwrap(),
+ob,
&ChunkRanges::all(),
encoded,
)
2 changes: 1 addition & 1 deletion iroh-bytes/src/store/fs/tests.rs
@@ -793,7 +793,7 @@ async fn actor_store_smoke() {
hash,
IROH_BLOCK_SIZE,
chunk_ranges.clone(),
-Cursor::new(wire_data),
+Cursor::new(wire_data.as_slice()),
handle.batch_writer().await.unwrap(),
)
.await