diff --git a/iroh-blobs/src/store/traits.rs b/iroh-blobs/src/store/traits.rs index 9d1e42fc33..62791cbd48 100644 --- a/iroh-blobs/src/store/traits.rs +++ b/iroh-blobs/src/store/traits.rs @@ -357,6 +357,9 @@ pub trait Store: ReadableStore + MapMut { fn temp_tag(&self, value: HashAndFormat) -> TempTag; /// Handle to use to drop tags + /// + /// Return None for stores that don't keep track of tags, such as read-only + /// stores. fn tag_drop(&self) -> Option<&dyn TagDrop>; /// Notify the store that a new gc phase is about to start. diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 7ff196163e..1cabcdb008 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -18,7 +18,7 @@ use iroh_blobs::{ export::ExportProgress as BytesExportProgress, format::collection::Collection, get::db::DownloadProgress as BytesDownloadProgress, - store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, + store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, BlobFormat, Hash, Tag, }; use iroh_net::NodeAddr; @@ -42,7 +42,6 @@ use super::{flatten, Iroh}; mod batch; pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch}; -pub use crate::rpc_protocol::BlobStatus; pub use iroh_blobs::store::ImportMode; pub use iroh_blobs::TempTag; @@ -875,6 +874,26 @@ pub enum DownloadMode { Queued, } +/// Status information about a blob. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum BlobStatus { + /// The blob is not stored on the node. + NotFound, + /// The blob is only stored partially. + Partial { + /// The size of the currently stored partial blob. + /// + /// This can be either a verified size if the last chunk was received, + /// or an unverified size if the last chunk was not yet received. + size: BaoBlobSize, + }, + /// The blob is stored completely. + Complete { + /// The size of the blob. For a complete blob the size is always known. + size: u64, + }, +} + #[cfg(test)] mod tests { use super::*; diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 5d34323c34..0785018d0b 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -7,7 +7,7 @@ use std::collections::BTreeMap; use std::fmt::Debug; use std::net::SocketAddr; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use anyhow::{anyhow, Result}; use futures_lite::StreamExt; @@ -66,7 +66,7 @@ struct NodeInner { rt: LocalPoolHandle, pub(crate) sync: DocsEngine, downloader: Downloader, - blob_batches: Mutex, + blob_batches: tokio::sync::Mutex, } /// Keeps track of all the currently active batch operations of the blobs api. diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 2d5e08d1c1..94df3831b1 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -6,6 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, ensure, Result}; use futures_buffered::BufferedStreamExt; use futures_lite::{Stream, StreamExt}; +use futures_util::FutureExt; use genawaiter::sync::{Co, Gen}; use iroh_base::rpc::RpcResult; use iroh_blobs::downloader::{DownloadRequest, Downloader}; @@ -35,7 +36,7 @@ use tokio_util::task::LocalPoolHandle; use tracing::{debug, info}; use crate::client::blobs::{ - BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption, + BlobInfo, BlobStatus, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption, }; use crate::client::tags::TagInfo; use crate::client::NodeStatus; @@ -46,7 +47,7 @@ use crate::rpc_protocol::{ BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest, - BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobStatus, + BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobStatusRequest, BlobStatusResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, CreateTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, ListTagsRequest, @@ -458,7 +459,7 @@ impl Handler { async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> { if let Some(batch) = msg.batch { if let Some(content) = msg.value.as_ref() { - self.inner.blob_batches.lock().unwrap().remove_one( + self.inner.blob_batches.lock().await.remove_one( batch, content, self.inner.db.tag_drop(), @@ -471,7 +472,7 @@ impl Handler { async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult { if let Some(batch) = msg.batch { - self.inner.blob_batches.lock().unwrap().remove_one( + self.inner.blob_batches.lock().await.remove_one( batch, &msg.value, self.inner.db.tag_drop(), @@ -867,7 +868,7 @@ impl Handler { .import_file(root, import_mode, format, import_progress) .await?; let hash = *tag.hash(); - self.inner.blob_batches.lock().unwrap().store(batch, tag); + self.inner.blob_batches.lock().await.store(batch, tag); progress.send(BatchAddPathProgress::Done { hash }).await?; Ok(()) @@ -942,39 +943,38 @@ impl Handler { _: BatchCreateRequest, mut updates: impl Stream + Send + Unpin + 'static, ) -> impl Stream { - let batch = self.inner.blob_batches.lock().unwrap().create(); - tokio::spawn(async move { - while let Some(item) = updates.next().await { - match item { - BatchUpdate::Drop(content) => { - // this can not fail, since we keep the batch alive. - // therefore it is safe to ignore the result. - let _ = self.inner.blob_batches.lock().unwrap().remove_one( - batch, - &content, - self.inner.db.tag_drop(), - ); + async move { + let batch = self.inner.blob_batches.lock().await.create(); + tokio::spawn(async move { + while let Some(item) = updates.next().await { + match item { + BatchUpdate::Drop(content) => { + // this can not fail, since we keep the batch alive. + // therefore it is safe to ignore the result. + let _ = self.inner.blob_batches.lock().await.remove_one( + batch, + &content, + self.inner.db.tag_drop(), + ); + } + BatchUpdate::Ping => {} } - BatchUpdate::Ping => {} } - } - self.inner - .blob_batches - .lock() - .unwrap() - .remove(batch, self.inner.db.tag_drop()); - }); - futures_lite::stream::once(BatchCreateResponse::Id(batch)) + self.inner + .blob_batches + .lock() + .await + .remove(batch, self.inner.db.tag_drop()); + }); + BatchCreateResponse::Id(batch) + } + .into_stream() } #[allow(clippy::unused_async)] async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { let tag = self.inner.db.temp_tag(msg.content); - self.inner - .blob_batches - .lock() - .unwrap() - .store(msg.batch, tag); + self.inner.blob_batches.lock().await.store(msg.batch, tag); Ok(()) } @@ -1043,7 +1043,7 @@ impl Handler { self.inner .blob_batches .lock() - .unwrap() + .await .store(msg.batch, temp_tag); progress .send(BatchAddStreamResponse::Result { hash }) diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index ec70024fd8..2777e99cd3 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -46,7 +46,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress}; use iroh_docs::engine::LiveEvent; use crate::client::{ - blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + blobs::{BlobInfo, BlobStatus, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, docs::{ImportProgress, ShareMode}, tags::TagInfo, NodeStatus, @@ -219,26 +219,6 @@ pub struct BlobStatusRequest { #[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)] pub struct BlobStatusResponse(pub BlobStatus); -/// Status information about a blob. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -pub enum BlobStatus { - /// The blob is not stored on the node. - NotFound, - /// The blob is only stored partially. - Partial { - /// The size of the currently stored partial blob. - /// - /// This can be either a verified size if the last chunk was received, - /// or an unverified size if the last chunk was not yet received. - size: BaoBlobSize, - }, - /// The blob is stored completely. - Complete { - /// The size of the blob. For a complete blob the size is always known. - size: u64, - }, -} - impl RpcMsg for BlobStatusRequest { type Response = RpcResult; }