From aee4e0c2a28bf7d3f51c98b7a2a5224ed61e8420 Mon Sep 17 00:00:00 2001
From: dignifiedquire
Date: Fri, 3 May 2024 11:49:13 +0200
Subject: [PATCH] refactor(iroh): cleanup client::blobs naming

---
 iroh/src/client/blobs.rs | 162 +++++++++++++++++++--------------
 iroh/src/client/docs.rs  |  10 +--
 iroh/src/node.rs         |   4 +-
 3 files changed, 88 insertions(+), 88 deletions(-)

diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs
index ab36411f544..d82563f76cd 100644
--- a/iroh/src/client/blobs.rs
+++ b/iroh/src/client/blobs.rs
@@ -15,10 +15,9 @@ use futures_lite::{Stream, StreamExt};
 use futures_util::SinkExt;
 use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
 use iroh_bytes::{
-    export::ExportProgress,
+    export::ExportProgress as BytesExportProgress,
     format::collection::Collection,
-    get::db::DownloadProgress,
-    provider::AddProgress,
+    get::db::DownloadProgress as BytesDownloadProgress,
     store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
     BlobFormat, Hash, Tag,
 };
@@ -61,25 +60,25 @@ where
 {
     /// Stream the contents of a single blob.
     ///
-    /// Returns a [`BlobReader`], which can report the size of the blob before reading it.
-    pub async fn read(&self, hash: Hash) -> Result<BlobReader> {
-        BlobReader::from_rpc_read(&self.rpc, hash).await
+    /// Returns a [`Reader`], which can report the size of the blob before reading it.
+    pub async fn read(&self, hash: Hash) -> Result<Reader> {
+        Reader::from_rpc_read(&self.rpc, hash).await
     }

     /// Read offset + len from a single blob.
     ///
     /// If `len` is `None` it will read the full blob.
-    pub async fn read_at(&self, hash: Hash, offset: u64, len: Option<usize>) -> Result<BlobReader> {
-        BlobReader::from_rpc_read_at(&self.rpc, hash, offset, len).await
+    pub async fn read_at(&self, hash: Hash, offset: u64, len: Option<usize>) -> Result<Reader> {
+        Reader::from_rpc_read_at(&self.rpc, hash, offset, len).await
     }

     /// Read all bytes of a single blob.
     ///
     /// This allocates a buffer for the full blob. Use only if you know that the blob you're
     /// reading is small. If not sure, use [`Self::read`] and check the size with
-    /// [`BlobReader::size`] before calling [`BlobReader::read_to_bytes`].
+    /// [`Reader::size`] before calling [`Reader::read_to_bytes`].
     pub async fn read_to_bytes(&self, hash: Hash) -> Result<Bytes> {
-        BlobReader::from_rpc_read(&self.rpc, hash)
+        Reader::from_rpc_read(&self.rpc, hash)
             .await?
             .read_to_bytes()
             .await
@@ -94,7 +93,7 @@
         offset: u64,
         len: Option<usize>,
     ) -> Result<Bytes> {
-        BlobReader::from_rpc_read_at(&self.rpc, hash, offset, len)
+        Reader::from_rpc_read_at(&self.rpc, hash, offset, len)
             .await?
             .read_to_bytes()
             .await
@@ -112,7 +111,7 @@
         in_place: bool,
         tag: SetTagOption,
         wrap: WrapOption,
-    ) -> Result<BlobAddProgress> {
+    ) -> Result<AddProgress> {
         let stream = self
             .rpc
             .server_streaming(BlobAddPathRequest {
@@ -122,7 +121,7 @@
                 wrap,
             })
             .await?;
-        Ok(BlobAddProgress::new(stream))
+        Ok(AddProgress::new(stream))
     }

     /// Create a collection from already existing blobs.
@@ -151,7 +150,7 @@ where
         &self,
         reader: impl AsyncRead + Unpin + Send + 'static,
         tag: SetTagOption,
-    ) -> anyhow::Result<BlobAddProgress> {
+    ) -> anyhow::Result<AddProgress> {
         const CAP: usize = 1024 * 64; // send 64KB per request by default
         let input = ReaderStream::with_capacity(reader, CAP);
         self.add_stream(input, tag).await
@@ -162,7 +161,7 @@
         &self,
         input: impl Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static,
         tag: SetTagOption,
-    ) -> anyhow::Result<BlobAddProgress> {
+    ) -> anyhow::Result<AddProgress> {
         let (mut sink, progress) = self.rpc.bidi(BlobAddStreamRequest { tag }).await?;
         let mut input = input.map(|chunk| match chunk {
             Ok(chunk) => Ok(BlobAddStreamUpdate::Chunk(chunk)),
@@ -180,11 +179,11 @@
             }
         });

-        Ok(BlobAddProgress::new(progress))
+        Ok(AddProgress::new(progress))
     }

     /// Write a blob by passing bytes.
-    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<BlobAddOutcome> {
+    pub async fn add_bytes(&self, bytes: impl Into<Bytes>) -> anyhow::Result<AddOutcome> {
         let input = futures_lite::stream::once(Ok(bytes.into()));
         self.add_stream(input, SetTagOption::Auto).await?.await
     }
@@ -194,7 +193,7 @@
         &self,
         bytes: impl Into<Bytes>,
         name: impl Into<Tag>,
-    ) -> anyhow::Result<BlobAddOutcome> {
+    ) -> anyhow::Result<AddOutcome> {
         let input = futures_lite::stream::once(Ok(bytes.into()));
         self.add_stream(input, SetTagOption::Named(name.into()))
             .await?
@@ -230,7 +229,7 @@
     }

     /// Download a blob from another node and add it to the local database.
-    pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<BlobDownloadProgress> {
+    pub async fn download(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
         self.download_with_opts(
             hash,
             DownloadOptions {
@@ -244,11 +243,7 @@
     }

     /// Download a hash sequence from another node and add it to the local database.
-    pub async fn download_hash_seq(
-        &self,
-        hash: Hash,
-        node: NodeAddr,
-    ) -> Result<BlobDownloadProgress> {
+    pub async fn download_hash_seq(&self, hash: Hash, node: NodeAddr) -> Result<DownloadProgress> {
         self.download_with_opts(
             hash,
             DownloadOptions {
@@ -266,7 +261,7 @@
         &self,
         hash: Hash,
         opts: DownloadOptions,
-    ) -> Result<BlobDownloadProgress> {
+    ) -> Result<DownloadProgress> {
         let DownloadOptions {
             format,
             nodes,
@@ -283,7 +278,7 @@
                 mode,
             })
             .await?;
-        Ok(BlobDownloadProgress::new(
+        Ok(DownloadProgress::new(
             stream.map(|res| res.map_err(anyhow::Error::from)),
         ))
     }
@@ -303,7 +298,7 @@
         destination: PathBuf,
         format: ExportFormat,
         mode: ExportMode,
-    ) -> Result<BlobExportProgress> {
+    ) -> Result<ExportProgress> {
         let req = BlobExportRequest {
             hash,
             path: destination,
@@ -311,7 +306,7 @@
             mode,
         };
         let stream = self.rpc.server_streaming(req).await?;
-        Ok(BlobExportProgress::new(
+        Ok(ExportProgress::new(
             stream.map(|r| r.map_err(anyhow::Error::from)),
         ))
     }
@@ -405,7 +400,7 @@ pub enum BlobStatus {

 /// Outcome of a blob add operation.
 #[derive(Debug, Clone)]
-pub struct BlobAddOutcome {
+pub struct AddOutcome {
     /// The hash of the blob
     pub hash: Hash,
     /// The format of the blob
@@ -458,16 +453,19 @@ pub struct IncompleteBlobInfo {

 /// Progress stream for blob add operations.
 #[derive(derive_more::Debug)]
-pub struct BlobAddProgress {
+pub struct AddProgress {
     #[debug(skip)]
-    stream: Pin<Box<dyn Stream<Item = Result<AddProgress>> + Send + Unpin + 'static>>,
+    stream: Pin<
+        Box<dyn Stream<Item = Result<iroh_bytes::provider::AddProgress>> + Send + Unpin + 'static>,
+    >,
     current_total_size: Arc<AtomicU64>,
 }

-impl BlobAddProgress {
+impl AddProgress {
     fn new(
-        stream: (impl Stream<Item = Result<impl Into<AddProgress>, impl Into<anyhow::Error>>>
-            + Send
+        stream: (impl Stream<
+            Item = Result<impl Into<iroh_bytes::provider::AddProgress>, impl Into<anyhow::Error>>,
+        > + Send
             + Unpin
             + 'static),
     ) -> Self {
@@ -476,7 +474,7 @@ impl BlobAddProgress {
         let stream = stream.map(move |item| match item {
             Ok(item) => {
                 let item = item.into();
-                if let AddProgress::Found { size, .. } = &item {
+                if let iroh_bytes::provider::AddProgress::Found { size, .. } = &item {
                     total_size.fetch_add(*size, Ordering::Relaxed);
                 }
                 Ok(item)
@@ -490,25 +488,25 @@
     }
     /// Finish writing the stream, ignoring all intermediate progress events.
     ///
-    /// Returns a [`BlobAddOutcome`] which contains a tag, format, hash and a size.
+    /// Returns an [`AddOutcome`] which contains a tag, format, hash and a size.
     /// When importing a single blob, this is the hash and size of that blob.
     /// When importing a collection, the hash is the hash of the collection and the size
     /// is the total size of all imported blobs (but excluding the size of the collection blob
     /// itself).
-    pub async fn finish(self) -> Result<BlobAddOutcome> {
+    pub async fn finish(self) -> Result<AddOutcome> {
         self.await
     }
 }

-impl Stream for BlobAddProgress {
-    type Item = Result<AddProgress>;
+impl Stream for AddProgress {
+    type Item = Result<iroh_bytes::provider::AddProgress>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.stream).poll_next(cx)
     }
 }

-impl Future for BlobAddProgress {
-    type Output = Result<BlobAddOutcome>;
+impl Future for AddProgress {
+    type Output = Result<AddOutcome>;

     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         loop {
@@ -519,8 +517,8 @@
                 }
                 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                 Poll::Ready(Some(Ok(msg))) => match msg {
-                    AddProgress::AllDone { hash, format, tag } => {
-                        let outcome = BlobAddOutcome {
+                    iroh_bytes::provider::AddProgress::AllDone { hash, format, tag } => {
+                        let outcome = AddOutcome {
                             hash,
                             format,
                             tag,
@@ -528,7 +526,7 @@
                         };
                         return Poll::Ready(Ok(outcome));
                     }
-                    AddProgress::Abort(err) => {
+                    iroh_bytes::provider::AddProgress::Abort(err) => {
                         return Poll::Ready(Err(err.into()));
                     }
                     _ => {}
@@ -540,7 +538,7 @@

 /// Outcome of a blob download operation.
 #[derive(Debug, Clone)]
-pub struct BlobDownloadOutcome {
+pub struct DownloadOutcome {
     /// The size of the data we already had locally
     pub local_size: u64,
     /// The size of the data we downloaded from the network
@@ -551,17 +549,17 @@

 /// Progress stream for blob download operations.
 #[derive(derive_more::Debug)]
-pub struct BlobDownloadProgress {
+pub struct DownloadProgress {
     #[debug(skip)]
-    stream: Pin<Box<dyn Stream<Item = Result<DownloadProgress>> + Send + Unpin + 'static>>,
+    stream: Pin<Box<dyn Stream<Item = Result<BytesDownloadProgress>> + Send + Unpin + 'static>>,
     current_local_size: Arc<AtomicU64>,
     current_network_size: Arc<AtomicU64>,
 }

-impl BlobDownloadProgress {
-    /// Create a `BlobDownloadProgress` that can help you easily poll the `DownloadProgress` stream from your download until it is finished or errors.
+impl DownloadProgress {
+    /// Create a [`DownloadProgress`] that can help you easily poll the [`BytesDownloadProgress`] stream from your download until it is finished or errors.
     pub fn new(
-        stream: (impl Stream<Item = Result<impl Into<DownloadProgress>, impl Into<anyhow::Error>>>
+        stream: (impl Stream<Item = Result<impl Into<BytesDownloadProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
@@ -576,10 +574,10 @@
                 Ok(item) => {
                     let item = item.into();
                     match &item {
-                        DownloadProgress::FoundLocal { size, .. } => {
+                        BytesDownloadProgress::FoundLocal { size, .. } => {
                             local_size.fetch_add(size.value(), Ordering::Relaxed);
                         }
-                        DownloadProgress::Found { size, .. } => {
+                        BytesDownloadProgress::Found { size, .. } => {
                             network_size.fetch_add(*size, Ordering::Relaxed);
                         }
                         _ => {}
@@ -595,25 +593,26 @@
             current_network_size,
         }
     }
+    /// Finish writing the stream, ignoring all intermediate progress events.
     ///
-    /// Returns a [`BlobDownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally.
+    /// Returns a [`DownloadOutcome`] which contains the size of the content we downloaded and the size of the content we already had locally.
     /// When importing a single blob, this is the size of that blob.
     /// When importing a collection, this is the total size of all imported blobs (but excluding the size of the collection blob itself).
-    pub async fn finish(self) -> Result<BlobDownloadOutcome> {
+    pub async fn finish(self) -> Result<DownloadOutcome> {
         self.await
     }
 }

-impl Stream for BlobDownloadProgress {
-    type Item = Result<DownloadProgress>;
+impl Stream for DownloadProgress {
+    type Item = Result<BytesDownloadProgress>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.stream).poll_next(cx)
     }
 }

-impl Future for BlobDownloadProgress {
-    type Output = Result<BlobDownloadOutcome>;
+impl Future for DownloadProgress {
+    type Output = Result<DownloadOutcome>;

     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         loop {
@@ -624,15 +623,15 @@
                 }
                 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                 Poll::Ready(Some(Ok(msg))) => match msg {
-                    DownloadProgress::AllDone(stats) => {
-                        let outcome = BlobDownloadOutcome {
+                    BytesDownloadProgress::AllDone(stats) => {
+                        let outcome = DownloadOutcome {
                             local_size: self.current_local_size.load(Ordering::Relaxed),
                             downloaded_size: self.current_network_size.load(Ordering::Relaxed),
                             stats,
                         };
                         return Poll::Ready(Ok(outcome));
                     }
-                    DownloadProgress::Abort(err) => {
+                    BytesDownloadProgress::Abort(err) => {
                         return Poll::Ready(Err(err.into()));
                     }
                     _ => {}
@@ -644,23 +643,24 @@

 /// Outcome of a blob export operation.
 #[derive(Debug, Clone, PartialEq, Eq)]
-pub struct BlobExportOutcome {
+pub struct ExportOutcome {
     /// The total size of the exported data.
     total_size: u64,
 }

 /// Progress stream for blob export operations.
 #[derive(derive_more::Debug)]
-pub struct BlobExportProgress {
+pub struct ExportProgress {
     #[debug(skip)]
-    stream: Pin<Box<dyn Stream<Item = Result<ExportProgress>> + Send + Unpin + 'static>>,
+    stream: Pin<Box<dyn Stream<Item = Result<BytesExportProgress>> + Send + Unpin + 'static>>,
     current_total_size: Arc<AtomicU64>,
 }

-impl BlobExportProgress {
-    /// Create a `BlobExportProgress` that can help you easily poll the `ExportProgress` stream from your download until it is finished or errors.
+impl ExportProgress {
+    /// Create an [`ExportProgress`] that can help you easily poll the [`BytesExportProgress`] stream from your
+    /// download until it is finished or errors.
     pub fn new(
-        stream: (impl Stream<Item = Result<impl Into<ExportProgress>, impl Into<anyhow::Error>>>
+        stream: (impl Stream<Item = Result<impl Into<BytesExportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
@@ -670,7 +670,7 @@
         let stream = stream.map(move |item| match item {
             Ok(item) => {
                 let item = item.into();
-                if let ExportProgress::Found { size, .. } = &item {
+                if let BytesExportProgress::Found { size, .. } = &item {
                     let size = size.value();
                     total_size.fetch_add(size, Ordering::Relaxed);
                 }
@@ -687,21 +687,21 @@

     /// Finish writing the stream, ignoring all intermediate progress events.
     ///
-    /// Returns a [`BlobExportOutcome`] which contains the size of the content we exported.
-    pub async fn finish(self) -> Result<BlobExportOutcome> {
+    /// Returns an [`ExportOutcome`] which contains the size of the content we exported.
+    pub async fn finish(self) -> Result<ExportOutcome> {
         self.await
     }
 }

-impl Stream for BlobExportProgress {
-    type Item = Result<ExportProgress>;
+impl Stream for ExportProgress {
+    type Item = Result<BytesExportProgress>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.stream).poll_next(cx)
     }
 }

-impl Future for BlobExportProgress {
-    type Output = Result<BlobExportOutcome>;
+impl Future for ExportProgress {
+    type Output = Result<ExportOutcome>;

     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         loop {
@@ -712,13 +712,13 @@
                 }
                 Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
                 Poll::Ready(Some(Ok(msg))) => match msg {
-                    ExportProgress::AllDone => {
-                        let outcome = BlobExportOutcome {
+                    BytesExportProgress::AllDone => {
+                        let outcome = ExportOutcome {
                             total_size: self.current_total_size.load(Ordering::Relaxed),
                         };
                         return Poll::Ready(Ok(outcome));
                     }
-                    ExportProgress::Abort(err) => {
+                    BytesExportProgress::Abort(err) => {
                         return Poll::Ready(Err(err.into()));
                     }
                     _ => {}
@@ -732,7 +732,7 @@
 ///
 /// Implements [`AsyncRead`].
 #[derive(derive_more::Debug)]
-pub struct BlobReader {
+pub struct Reader {
     size: u64,
     response_size: u64,
     is_complete: bool,
@@ -740,7 +740,7 @@
     stream: tokio_util::io::StreamReader<Pin<Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin + 'static>>, Bytes>,
 }

-impl BlobReader {
+impl Reader {
     fn new(
         size: u64,
         response_size: u64,
@@ -810,7 +810,7 @@
     }
 }

-impl AsyncRead for BlobReader {
+impl AsyncRead for Reader {
     fn poll_read(
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
@@ -820,7 +820,7 @@
     }
 }

-impl Stream for BlobReader {
+impl Stream for Reader {
     type Item = io::Result<Bytes>;

     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs
index 0031fa23623..fef75498f83 100644
--- a/iroh/src/client/docs.rs
+++ b/iroh/src/client/docs.rs
@@ -35,7 +35,7 @@ use crate::rpc_protocol::{
 #[doc(inline)]
 pub use crate::sync_engine::{Origin, SyncEvent, SyncReason};

-use super::{blobs::BlobReader, flatten};
+use super::{blobs, flatten};

 /// Iroh docs client.
 #[derive(Debug, Clone)]
@@ -442,17 +442,17 @@ impl Entry {
         self.0.timestamp()
     }

-    /// Read the content of an [`Entry`] as a streaming [`BlobReader`].
+    /// Read the content of an [`Entry`] as a streaming [`blobs::Reader`].
     ///
     /// You can pass either a [`Doc`] or the `Iroh` client by reference as `client`.
     pub async fn content_reader<C>(
         &self,
         client: impl Into<&RpcClient<C>>,
-    ) -> Result<BlobReader>
+    ) -> Result<blobs::Reader>
     where
         C: ServiceConnection<ProviderService>,
     {
-        BlobReader::from_rpc_read(client.into(), self.content_hash()).await
+        blobs::Reader::from_rpc_read(client.into(), self.content_hash()).await
     }

     /// Read all content of an [`Entry`] into a buffer.
@@ -465,7 +465,7 @@
     where
         C: ServiceConnection<ProviderService>,
     {
-        BlobReader::from_rpc_read(client.into(), self.content_hash())
+        blobs::Reader::from_rpc_read(client.into(), self.content_hash())
             .await?
             .read_to_bytes()
             .await
diff --git a/iroh/src/node.rs b/iroh/src/node.rs
index 3fa4142d7c9..38e3721d55f 100644
--- a/iroh/src/node.rs
+++ b/iroh/src/node.rs
@@ -283,7 +283,7 @@ mod tests {
     use iroh_net::{relay::RelayMode, test_utils::DnsPkarrServer};

     use crate::{
-        client::blobs::{BlobAddOutcome, WrapOption},
+        client::blobs::{AddOutcome, WrapOption},
         rpc_protocol::{BlobAddPathRequest, BlobAddPathResponse, SetTagOption},
     };

@@ -422,7 +422,7 @@
         .insecure_skip_relay_cert_verify(true)
         .spawn()
         .await?;
-        let BlobAddOutcome { hash, .. } = node1.blobs.add_bytes(b"foo".to_vec()).await?;
+        let AddOutcome { hash, .. } = node1.blobs.add_bytes(b"foo".to_vec()).await?;

         // create a node addr with only a relay URL, no direct addresses
         let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
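
For orientation, this is how the renamed API reads from calling code. A minimal sketch, not part of the patch: `add_bytes`, `AddOutcome`, `read`, `Reader::size`, and `read_to_bytes` are the names touched above, while the in-memory node setup (`Node::memory()`, the tokio runtime) is an assumption taken from the surrounding iroh test code and may differ in detail.

    use anyhow::Result;
    use iroh::client::blobs::AddOutcome;
    use iroh::node::Node;

    #[tokio::main]
    async fn main() -> Result<()> {
        // Assumed setup: spawn an in-memory node, as the tests above do.
        let node = Node::memory().spawn().await?;

        // `add_bytes` drives the add to completion and resolves to an
        // `AddOutcome` (formerly `BlobAddOutcome`).
        let AddOutcome { hash, .. } = node.blobs.add_bytes(b"hello".to_vec()).await?;

        // `read` returns a `Reader` (formerly `BlobReader`), which reports
        // the blob size before any content is pulled over the stream.
        let reader = node.blobs.read(hash).await?;
        let size = reader.size();

        // `read_to_bytes` consumes the reader and buffers the whole blob.
        let bytes = reader.read_to_bytes().await?;
        assert_eq!(bytes.len() as u64, size);
        Ok(())
    }

Downloads and exports follow the same shape: `download` hands back a `DownloadProgress` that can be polled as a `Stream` of `BytesDownloadProgress` events or awaited (directly, or via `finish`) into a `DownloadOutcome`, and `export` likewise resolves an `ExportProgress` into an `ExportOutcome`.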