From fe6dcaccad54e7d72ae6aa122721ccd33736edc4 Mon Sep 17 00:00:00 2001
From: Rüdiger Klaehn
Date: Wed, 10 Apr 2024 18:51:33 +0300
Subject: [PATCH] refactor(iroh-bytes): use even newer bao-tree (#2168)

## Description

There is a new version of bao-tree that fixes some long-standing API consistency issues and drops the tokio dependency. This updates iroh to use it. We still use tokio, of course, but switching to a different runtime would now be easier, at least as far as bao-tree is concerned.

The other change is that `encode_ranges_validated` no longer writes the length prefix, so we have to write it ourselves (see the sketches after the diff). This was done to simplify the bao-tree API and make it more flexible.

## Notes & open questions

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
---
 Cargo.lock                       | 11 +++----
 iroh-base/Cargo.toml             |  2 +-
 iroh-bytes/Cargo.toml            |  4 +--
 iroh-bytes/src/get.rs            | 34 ++++++++++-----------
 iroh-bytes/src/get/db.rs         |  2 +-
 iroh-bytes/src/provider.rs       |  5 ++-
 iroh-bytes/src/store/bao_file.rs | 52 +++++++++++++++++++++-----------
 iroh-bytes/src/store/fs/tests.rs |  2 +-
 iroh-bytes/src/store/mem.rs      |  4 +--
 iroh-bytes/src/util/io.rs        | 30 +++++++++---------
 iroh-cli/Cargo.toml              |  2 +-
 iroh/Cargo.toml                  |  4 +--
 iroh/tests/gc.rs                 |  8 ++++-
 13 files changed, 92 insertions(+), 68 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c3b2ec4fa8..20fd76df40 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -313,9 +313,9 @@ dependencies = [
 
 [[package]]
 name = "bao-tree"
-version = "0.12.1"
+version = "0.13.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "95ae2f5c25ce9df1d21b6d2cfe8e1517ff78bd65476bfc47a1ac5b657fa0e1df"
+checksum = "f1f7a89a8ee5889d2593ae422ce6e1bb03e48a0e8a16e4fa0882dfcbe7e182ef"
 dependencies = [
  "bytes",
  "futures-lite",
@@ -326,7 +326,6 @@ dependencies = [
  "range-collections",
  "self_cell",
  "smallvec",
- "tokio",
 ]
 
 [[package]]
@@ -2516,12 +2515,12 @@ dependencies = [
 
 [[package]]
 name = "iroh-io"
-version = "0.4.0"
+version = "0.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fd67e386f948a6f09e71057b48fff51b6414f0080997495b5bdf2d1bdcdbe46"
+checksum = "74d1047ad5ca29ab4ff316b6830d86e7ea52cea54325e4d4a849692e1274b498"
 dependencies = [
  "bytes",
- "futures",
+ "futures-lite",
  "pin-project",
  "smallvec",
  "tokio",
diff --git a/iroh-base/Cargo.toml b/iroh-base/Cargo.toml
index 24351b4948..b618acf2b7 100644
--- a/iroh-base/Cargo.toml
+++ b/iroh-base/Cargo.toml
@@ -16,7 +16,7 @@ workspace = true
 
 [dependencies]
 anyhow = { version = "1" }
-bao-tree = { version = "0.12", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
+bao-tree = { version = "0.13", features = ["tokio_fsm", "validate"], default-features = false, optional = true }
 data-encoding = { version = "2.3.3", optional = true }
 hex = "0.4.3"
 multibase = { version = "0.9.1", optional = true }
diff --git a/iroh-bytes/Cargo.toml b/iroh-bytes/Cargo.toml
index b76aa570a9..dd87ff71da 100644
--- a/iroh-bytes/Cargo.toml
+++ b/iroh-bytes/Cargo.toml
@@ -17,7 +17,7 @@ workspace = true
 
 [dependencies]
 anyhow = { version = "1" }
-bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false }
+bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false }
 bytes = { version = "1.4", features = ["serde"] }
 chrono = "0.4.31"
 data-encoding = "2.3.3"
@@ -28,7 +28,7 @@ futures-buffered = "0.2.4"
 genawaiter = { version = "0.99.1", features = ["futures03"] }
 hex = "0.4.3"
 iroh-base = { version = "0.13.0", features = ["redb"], path = "../iroh-base" }
-iroh-io = { version = "0.4.0", features = ["stats"] }
+iroh-io = { version = "0.6.0", features = ["stats"] }
 iroh-metrics = { version = "0.13.0", path = "../iroh-metrics", optional = true }
 iroh-net = { version = "0.13.0", path = "../iroh-net", optional = true }
 num_cpus = "1.15.0"
diff --git a/iroh-bytes/src/get.rs b/iroh-bytes/src/get.rs
index b2cbcf05d1..47cd017932 100644
--- a/iroh-bytes/src/get.rs
+++ b/iroh-bytes/src/get.rs
@@ -70,8 +70,10 @@ pub mod fsm {
         BaoTree, ChunkRanges, TreeNode,
     };
     use derive_more::From;
-    use iroh_io::AsyncSliceWriter;
-    use tokio::io::{AsyncReadExt, AsyncWriteExt};
+    use iroh_io::{AsyncSliceWriter, AsyncStreamReader, TokioStreamReader};
+    use tokio::io::AsyncWriteExt;
+
+    type WrappedRecvStream = TrackingReader<TokioStreamReader<RecvStream>>;
 
     self_cell::self_cell! {
         struct RangesIterInner {
@@ -142,7 +144,7 @@ pub mod fsm {
         pub async fn next(self) -> Result<AtConnected, quinn::ConnectionError> {
             let start = Instant::now();
             let (writer, reader) = self.connection.open_bi().await?;
-            let reader = TrackingReader::new(reader);
+            let reader = TrackingReader::new(TokioStreamReader::new(reader));
             let writer = TrackingWriter::new(writer);
             Ok(AtConnected {
                 start,
@@ -157,7 +159,7 @@ pub mod fsm {
     #[derive(Debug)]
     pub struct AtConnected {
         start: Instant,
-        reader: TrackingReader<RecvStream>,
+        reader: WrappedRecvStream,
         writer: TrackingWriter<SendStream>,
         request: GetRequest,
     }
@@ -292,7 +294,7 @@ pub mod fsm {
     #[derive(Debug)]
     pub struct AtStartRoot {
         ranges: ChunkRanges,
-        reader: TrackingReader<RecvStream>,
+        reader: TrackingReader<TokioStreamReader<RecvStream>>,
         misc: Box<Misc>,
         hash: Hash,
     }
@@ -301,7 +303,7 @@ pub mod fsm {
     #[derive(Debug)]
     pub struct AtStartChild {
         ranges: ChunkRanges,
-        reader: TrackingReader<RecvStream>,
+        reader: TrackingReader<TokioStreamReader<RecvStream>>,
         misc: Box<Misc>,
         child_offset: u64,
     }
@@ -376,7 +378,7 @@ pub mod fsm {
     #[derive(Debug)]
     pub struct AtBlobHeader {
         ranges: ChunkRanges,
-        reader: TrackingReader<RecvStream>,
+        reader: TrackingReader<TokioStreamReader<RecvStream>>,
         misc: Box<Misc>,
         hash: Hash,
     }
@@ -412,7 +414,7 @@ pub mod fsm {
     impl AtBlobHeader {
         /// Read the size header, returning it and going into the `Content` state.
        pub async fn next(mut self) -> Result<(AtBlobContent, u64), AtBlobHeaderNextError> {
-            let size = self.reader.read_u64_le().await.map_err(|cause| {
+            let size = self.reader.read::<8>().await.map_err(|cause| {
                 if cause.kind() == io::ErrorKind::UnexpectedEof {
                     AtBlobHeaderNextError::NotFound
                 } else if let Some(e) = cause
@@ -424,6 +426,7 @@ pub mod fsm {
                     AtBlobHeaderNextError::Io(cause)
                 }
             })?;
+            let size = u64::from_le_bytes(size);
             let stream = ResponseDecoder::new(
                 self.hash.into(),
                 self.ranges,
@@ -513,7 +516,7 @@ pub mod fsm {
     /// State while we are reading content
     #[derive(Debug)]
     pub struct AtBlobContent {
-        stream: ResponseDecoder<TrackingReader<RecvStream>>,
+        stream: ResponseDecoder<WrappedRecvStream>,
         misc: Box<Misc>,
     }
 
@@ -792,7 +795,7 @@ pub mod fsm {
     /// State after we have read all the content for a blob
     #[derive(Debug)]
     pub struct AtEndBlob {
-        stream: TrackingReader<RecvStream>,
+        stream: WrappedRecvStream,
         misc: Box<Misc>,
     }
 
@@ -826,16 +829,12 @@ pub mod fsm {
     #[derive(Debug)]
     pub struct AtClosing {
         misc: Box<Misc>,
-        reader: TrackingReader<RecvStream>,
+        reader: WrappedRecvStream,
         check_extra_data: bool,
     }
 
     impl AtClosing {
-        fn new(
-            misc: Box<Misc>,
-            reader: TrackingReader<RecvStream>,
-            check_extra_data: bool,
-        ) -> Self {
+        fn new(misc: Box<Misc>, reader: WrappedRecvStream, check_extra_data: bool) -> Self {
             Self {
                 misc,
                 reader,
@@ -846,7 +845,8 @@ pub mod fsm {
         /// Finish the get response, returning statistics
        pub async fn next(self) -> result::Result<Stats, io::Error> {
             // Shut down the stream
-            let (mut reader, bytes_read) = self.reader.into_parts();
+            let (reader, bytes_read) = self.reader.into_parts();
+            let mut reader = reader.into_inner();
             if self.check_extra_data {
                 if let Some(chunk) = reader.read_chunk(8, false).await? {
                     reader.stop(0u8.into()).ok();
diff --git a/iroh-bytes/src/get/db.rs b/iroh-bytes/src/get/db.rs
index 0596775912..f8dbd39ca1 100644
--- a/iroh-bytes/src/get/db.rs
+++ b/iroh-bytes/src/get/db.rs
@@ -145,7 +145,7 @@ pub async fn valid_ranges<D: MapMut>(entry: &D::EntryMut) -> anyhow::Result<ChunkRanges>
diff --git a/iroh-bytes/src/provider.rs b/iroh-bytes/src/provider.rs
--- a/iroh-bytes/src/provider.rs
+++ b/iroh-bytes/src/provider.rs
     // wrap the data reader in a tracking reader so we can get some stats for reading
     let mut tracking_reader = TrackingSliceReader::new(&mut data);
     // send the root
+    tw.write(outboard.tree().size().to_le_bytes().as_slice())
+        .await?;
     encode_ranges_validated(
         &mut tracking_reader,
         &mut outboard,
@@ -490,13 +492,14 @@ pub async fn send_blob(
     db: &D,
     name: Hash,
     ranges: &RangeSpec,
-    writer: W,
+    mut writer: W,
 ) -> Result<(SentStatus, u64, SliceReaderStats)> {
     match db.get(&name).await? {
         Some(entry) => {
             let outboard = entry.outboard().await?;
             let size = outboard.tree().size();
             let mut file_reader = TrackingSliceReader::new(entry.data_reader().await?);
+            writer.write(size.to_le_bytes().as_slice()).await?;
             let res = encode_ranges_validated(
                 &mut file_reader,
                 outboard,
diff --git a/iroh-bytes/src/store/bao_file.rs b/iroh-bytes/src/store/bao_file.rs
index e1fa62e8a6..64adc6adc0 100644
--- a/iroh-bytes/src/store/bao_file.rs
+++ b/iroh-bytes/src/store/bao_file.rs
@@ -426,7 +426,7 @@ impl AsyncSliceReader for DataReader {
             .await
     }
 
-    async fn len(&mut self) -> io::Result<u64> {
+    async fn size(&mut self) -> io::Result<u64> {
         with_storage(
             &mut self.0,
             BaoFileStorage::is_mem,
@@ -458,7 +458,7 @@ impl AsyncSliceReader for OutboardReader {
             .await
     }
 
-    async fn len(&mut self) -> io::Result<u64> {
+    async fn size(&mut self) -> io::Result<u64> {
         with_storage(
             &mut self.0,
             BaoFileStorage::is_mem,
@@ -732,9 +732,9 @@ pub mod test_support {
         BlockSize, ChunkRanges,
     };
     use futures::{Future, Stream, StreamExt};
+    use iroh_io::AsyncStreamReader;
     use rand::RngCore;
     use range_collections::RangeSet2;
-    use tokio::io::{AsyncRead, AsyncReadExt};
 
     use crate::util::limited_range;
 
@@ -751,10 +751,11 @@ pub mod test_support {
         mut target: W,
     ) -> io::Result<()>
     where
-        R: AsyncRead + Unpin,
+        R: AsyncStreamReader,
         W: BaoBatchWriter,
     {
-        let size = encoded.read_u64_le().await?;
+        let size = encoded.read::<8>().await?;
+        let size = u64::from_le_bytes(size);
         let mut reading =
             ResponseDecoder::new(root.into(), ranges, BaoTree::new(size, block_size), encoded);
         let mut stack = Vec::new();
@@ -792,7 +793,8 @@ pub mod test_support {
     /// Take some data and encode it
     pub fn simulate_remote(data: &[u8]) -> (Hash, Cursor<Bytes>) {
         let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
-        let mut encoded = Vec::new();
+        let size = data.len() as u64;
+        let mut encoded = size.to_le_bytes().to_vec();
         bao_tree::io::sync::encode_ranges_validated(
             data,
             &outboard,
@@ -823,7 +825,8 @@ pub mod test_support {
         let chunk_ranges = round_up_to_chunks(&range_set);
         // compute the outboard
         let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE).flip();
-        let mut encoded = Vec::new();
+        let size = data.len() as u64;
+        let mut encoded = size.to_le_bytes().to_vec();
         encode_ranges_validated(data, &outboard, &chunk_ranges, &mut encoded).unwrap();
         (outboard.root.into(), chunk_ranges, encoded)
     }
@@ -866,8 +869,11 @@ pub mod test_support {
 
 #[cfg(test)]
 mod tests {
+    use std::io::Write;
+
     use bao_tree::{blake3, ChunkNum, ChunkRanges};
     use futures::StreamExt;
+    use iroh_io::TokioStreamReader;
     use tests::test_support::{
         decode_response_into_batch, local, make_wire_data, random_test_data, trickle, validate,
     };
@@ -900,7 +906,7 @@ mod tests {
         let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
             .map(io::Result::Ok)
             .boxed();
-        let trickle = tokio_util::io::StreamReader::new(trickle);
+        let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
         let _task = tasks.spawn_local(async move {
             decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file)
                 .await
@@ -912,7 +918,7 @@ mod tests {
         println!(
             "len {:?} {:?}",
             handle,
-            handle.data_reader().len().await.unwrap()
+            handle.data_reader().size().await.unwrap()
         );
         #[allow(clippy::single_range_in_vec_init)]
         let ranges = [1024 * 16..1024 * 48];
@@ -920,10 +926,14 @@ mod tests {
         validate(&handle, &test_data, &ranges).await;
         // let ranges =
         // let full_chunks = bao_tree::io::full_chunk_groups();
-        let encoded = Vec::new();
+        let mut encoded = Vec::new();
+        let ob = handle.outboard().unwrap();
+        encoded
+            .write_all(ob.tree.size().to_le_bytes().as_slice())
+            .unwrap();
         bao_tree::io::fsm::encode_ranges_validated(
             handle.data_reader(),
-            handle.outboard().unwrap(),
+            ob,
             &ChunkRanges::from(ChunkNum(16)..ChunkNum(48)),
             encoded,
         )
@@ -957,7 +967,7 @@ mod tests {
         let trickle = trickle(&wire_data, 1200, std::time::Duration::from_millis(10))
             .map(io::Result::Ok)
             .boxed();
-        let trickle = tokio_util::io::StreamReader::new(trickle);
+        let trickle = TokioStreamReader::new(tokio_util::io::StreamReader::new(trickle));
         let task = local.spawn_pinned(move || async move {
             decode_response_into_batch(hash, IROH_BLOCK_SIZE, chunk_ranges, trickle, file).await
         });
@@ -969,16 +979,20 @@ mod tests {
         println!(
             "len {:?} {:?}",
             handle,
-            handle.data_reader().len().await.unwrap()
+            handle.data_reader().size().await.unwrap()
         );
         #[allow(clippy::single_range_in_vec_init)]
         let ranges = [0..n];
         validate(&handle, &test_data, &ranges).await;
 
-        let encoded = Vec::new();
+        let mut encoded = Vec::new();
+        let ob = handle.outboard().unwrap();
+        encoded
+            .write_all(ob.tree.size().to_le_bytes().as_slice())
+            .unwrap();
         bao_tree::io::fsm::encode_ranges_validated(
             handle.data_reader(),
-            handle.outboard().unwrap(),
+            ob,
             &ChunkRanges::all(),
             encoded,
         )
@@ -1013,10 +1027,14 @@ mod tests {
             .unwrap();
         validate(&handle, &test_data, &ranges).await;
 
-        let encoded = Vec::new();
+        let mut encoded = Vec::new();
+        let ob = handle.outboard().unwrap();
+        encoded
+            .write_all(ob.tree.size().to_le_bytes().as_slice())
+            .unwrap();
         bao_tree::io::fsm::encode_ranges_validated(
             handle.data_reader(),
-            handle.outboard().unwrap(),
+            ob,
             &ChunkRanges::all(),
             encoded,
         )
diff --git a/iroh-bytes/src/store/fs/tests.rs b/iroh-bytes/src/store/fs/tests.rs
index 7aaff9be7d..5844b78738 100644
--- a/iroh-bytes/src/store/fs/tests.rs
+++ b/iroh-bytes/src/store/fs/tests.rs
@@ -793,7 +793,7 @@ async fn actor_store_smoke() {
             hash,
             IROH_BLOCK_SIZE,
             chunk_ranges.clone(),
-            Cursor::new(wire_data),
+            Cursor::new(wire_data.as_slice()),
             handle.batch_writer().await.unwrap(),
         )
         .await
diff --git a/iroh-bytes/src/store/mem.rs b/iroh-bytes/src/store/mem.rs
index 0b65510b75..395485d606 100644
--- a/iroh-bytes/src/store/mem.rs
+++ b/iroh-bytes/src/store/mem.rs
@@ -298,7 +298,7 @@ impl AsyncSliceReader for DataReader {
         Ok(self.0.data.read().unwrap().read_data_at(offset, len))
     }
 
-    async fn len(&mut self) -> std::io::Result<u64> {
+    async fn size(&mut self) -> std::io::Result<u64> {
         Ok(self.0.data.read().unwrap().data_len())
     }
 }
@@ -310,7 +310,7 @@ impl AsyncSliceReader for OutboardReader {
         Ok(self.0.data.read().unwrap().read_outboard_at(offset, len))
     }
 
-    async fn len(&mut self) -> std::io::Result<u64> {
+    async fn size(&mut self) -> std::io::Result<u64> {
         Ok(self.0.data.read().unwrap().outboard_len())
     }
 }
diff --git a/iroh-bytes/src/util/io.rs b/iroh-bytes/src/util/io.rs
index 85d9aa408d..2a4b24018d 100644
--- a/iroh-bytes/src/util/io.rs
+++ b/iroh-bytes/src/util/io.rs
@@ -1,7 +1,8 @@
 //! Utilities for working with tokio io
+use iroh_io::AsyncStreamReader;
 use std::{io, pin::Pin, task::Poll};
-use tokio::io::{AsyncRead, AsyncWrite};
+use tokio::io::AsyncWrite;
 
 /// A reader that tracks the number of bytes read
 #[derive(Debug)]
@@ -28,23 +29,20 @@ impl<R> TrackingReader<R> {
     }
 }
 
-impl<R> AsyncRead for TrackingReader<R>
+impl<R> AsyncStreamReader for TrackingReader<R>
 where
-    R: AsyncRead + Unpin,
+    R: AsyncStreamReader,
 {
-    fn poll_read(
-        mut self: Pin<&mut Self>,
-        cx: &mut std::task::Context<'_>,
-        buf: &mut tokio::io::ReadBuf<'_>,
-    ) -> Poll<io::Result<()>> {
-        let this = &mut *self;
-        let filled0 = buf.filled().len();
-        let res = Pin::new(&mut this.inner).poll_read(cx, buf);
-        if let Poll::Ready(Ok(())) = res {
-            let size = buf.filled().len().saturating_sub(filled0);
-            this.read = this.read.saturating_add(size as u64);
-        }
-        res
+    async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
+        let bytes = self.inner.read_bytes(len).await?;
+        self.read = self.read.saturating_add(bytes.len() as u64);
+        Ok(bytes)
+    }
+
+    async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
+        let res = self.inner.read::<L>().await?;
+        self.read = self.read.saturating_add(L as u64);
+        Ok(res)
+    }
 }
diff --git a/iroh-cli/Cargo.toml b/iroh-cli/Cargo.toml
index 4c4d706969..9a4bf2f31b 100644
--- a/iroh-cli/Cargo.toml
+++ b/iroh-cli/Cargo.toml
@@ -23,7 +23,7 @@ doc = false
 
 [dependencies]
 anyhow = "1.0.81"
-bao-tree = { version = "0.12" }
+bao-tree = { version = "0.13" }
 bytes = "1.5.0"
 clap = { version = "4", features = ["derive"] }
 colored = { version = "2.0.4" }
diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml
index af65ecd65c..83a35d920c 100644
--- a/iroh/Cargo.toml
+++ b/iroh/Cargo.toml
@@ -17,7 +17,7 @@ workspace = true
 
 [dependencies]
 anyhow = { version = "1" }
-bao-tree = { version = "0.12", features = ["tokio_fsm"], default-features = false }
+bao-tree = { version = "0.13", features = ["tokio_fsm"], default-features = false }
 bytes = "1"
 data-encoding = "2.4.0"
 derive_more = { version = "1.0.0-beta.1", features = ["debug", "display", "from", "try_into", "from_str"] }
@@ -28,7 +28,7 @@ hashlink = "0.8.4"
 hex = { version = "0.4.3" }
 iroh-bytes = { version = "0.13.0", path = "../iroh-bytes", features = ["downloader"] }
 iroh-base = { version = "0.13.0", path = "../iroh-base", features = ["key"] }
-iroh-io = { version = "0.4.0", features = ["stats"] }
+iroh-io = { version = "0.6.0", features = ["stats"] }
 iroh-metrics = { version = "0.13.0", path = "../iroh-metrics", optional = true }
 iroh-net = { version = "0.13.0", path = "../iroh-net" }
 num_cpus = { version = "1.15.0" }
diff --git a/iroh/tests/gc.rs b/iroh/tests/gc.rs
index e461695b5f..a63f03d0c5 100644
--- a/iroh/tests/gc.rs
+++ b/iroh/tests/gc.rs
@@ -1,4 +1,7 @@
-use std::{io::Cursor, time::Duration};
+use std::{
+    io::{Cursor, Write},
+    time::Duration,
+};
 
 use anyhow::Result;
 use bao_tree::{blake3, io::sync::Outboard, ChunkRanges};
@@ -25,6 +28,9 @@ pub fn create_test_data(size: usize) -> Bytes {
 pub fn simulate_remote(data: &[u8]) -> (blake3::Hash, Cursor<Vec<u8>>) {
     let outboard = bao_tree::io::outboard::PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
     let mut encoded = Vec::new();
+    encoded
+        .write_all(outboard.tree.size().to_le_bytes().as_ref())
+        .unwrap();
     bao_tree::io::sync::encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded)
         .unwrap();
     let hash = outboard.root();
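For reference, here is the new wire convention end to end, as a minimal sketch using the bao-tree 0.13 sync API that appears in the test changes above. `encode_with_prefix` and `split_prefix` are illustrative names, not crate APIs, and the sketch assumes iroh-bytes' `IROH_BLOCK_SIZE` constant is in scope:

```rust
use std::io::Write;

use bao_tree::{
    io::{outboard::PostOrderMemOutboard, sync::encode_ranges_validated},
    ChunkRanges,
};

/// Hypothetical sender-side helper: encode `data` with the 8-byte
/// little-endian size prefix that bao-tree 0.13 no longer writes for us.
fn encode_with_prefix(data: &[u8]) -> Vec<u8> {
    let outboard = PostOrderMemOutboard::create(data, IROH_BLOCK_SIZE);
    let mut encoded = Vec::new();
    // the caller now writes the length prefix...
    encoded
        .write_all(outboard.tree.size().to_le_bytes().as_slice())
        .unwrap();
    // ...and encode_ranges_validated writes only the encoded ranges
    encode_ranges_validated(data, &outboard, &ChunkRanges::all(), &mut encoded).unwrap();
    encoded
}

/// Hypothetical receiver-side helper: split the prefix back off, as
/// `AtBlobHeader::next` now does with `reader.read::<8>().await` before
/// constructing the decoder. Assumes `encoded` is at least 8 bytes.
fn split_prefix(encoded: &[u8]) -> (u64, &[u8]) {
    let (prefix, rest) = encoded.split_at(8);
    let size = u64::from_le_bytes(prefix.try_into().unwrap());
    (size, rest)
}
```

The fsm path does the same thing with the async API: the provider writes `size.to_le_bytes()` before calling `encode_ranges_validated`, and the getter reads the prefix with `read::<8>()` before building the `ResponseDecoder` with `BaoTree::new(size, block_size)`.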
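The other shift worth calling out is that `TrackingReader` now counts bytes as an `iroh_io::AsyncStreamReader` instead of a tokio `AsyncRead`, so the get fsm stacks it on top of a `TokioStreamReader` that adapts the QUIC stream. A sketch of how that stack is built and torn down; `read_size_prefix` is a hypothetical helper, assuming in-crate access to `TrackingReader` (it is internal to iroh-bytes) and a `quinn::RecvStream` on a tokio runtime:

```rust
use iroh_io::{AsyncStreamReader, TokioStreamReader};

use crate::util::io::TrackingReader;

/// Mirrors what AtInitial::next and AtClosing::next do: wrap the stream,
/// read the size header, then unwind the stack to recover the raw stream
/// and the number of bytes read.
async fn read_size_prefix(
    recv: quinn::RecvStream,
) -> std::io::Result<(quinn::RecvStream, u64, u64)> {
    // TokioStreamReader adapts AsyncRead to AsyncStreamReader;
    // TrackingReader counts bytes at the AsyncStreamReader level.
    let mut reader = TrackingReader::new(TokioStreamReader::new(recv));
    // the 8-byte little-endian size header, via the const-generic read
    let size = u64::from_le_bytes(reader.read::<8>().await?);
    // into_parts yields the wrapped reader plus the byte count,
    // into_inner the underlying quinn stream
    let (reader, bytes_read) = reader.into_parts();
    Ok((reader.into_inner(), size, bytes_read))
}
```

Counting per call (`read_bytes` and `read`) is what lets the old poll-based bookkeeping in util/io.rs go away.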