PR review: move around the BlobStatus, use async lock
rklaehn committed Jun 6, 2024
1 parent a28cfd5 commit 9cefcad
Showing 5 changed files with 59 additions and 57 deletions.
3 changes: 3 additions & 0 deletions iroh-blobs/src/store/traits.rs
@@ -357,6 +357,9 @@ pub trait Store: ReadableStore + MapMut {
fn temp_tag(&self, value: HashAndFormat) -> TempTag;

/// Handle to use to drop tags
///
/// Return None for stores that don't keep track of tags, such as read-only
/// stores.
fn tag_drop(&self) -> Option<&dyn TagDrop>;

/// Notify the store that a new gc phase is about to start.
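The expanded doc comment makes the contract explicit: tag_drop returns None for stores that do not track tags. A rough caller-side sketch of handling both cases (this assumes TagDrop exposes an on_drop(&HashAndFormat) hook; the store and content names are placeholders, not part of this diff):

// Only attempt to drop a tag when the store actually tracks tags;
// read-only stores return None from tag_drop().
if let Some(tag_drop) = store.tag_drop() {
    tag_drop.on_drop(&content);
}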
23 changes: 21 additions & 2 deletions iroh/src/client/blobs.rs
@@ -18,7 +18,7 @@ use iroh_blobs::{
export::ExportProgress as BytesExportProgress,
format::collection::Collection,
get::db::DownloadProgress as BytesDownloadProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
BlobFormat, Hash, Tag,
};
use iroh_net::NodeAddr;
@@ -42,7 +42,6 @@ use super::{flatten, Iroh};
mod batch;
pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch};

pub use crate::rpc_protocol::BlobStatus;
pub use iroh_blobs::store::ImportMode;
pub use iroh_blobs::TempTag;

@@ -875,6 +874,26 @@ pub enum DownloadMode {
Queued,
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The blob is not stored on the node.
NotFound,
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
///
/// This can be either a verified size if the last chunk was received,
/// or an unverified size if the last chunk was not yet received.
size: BaoBlobSize,
},
/// The blob is stored completely.
Complete {
/// The size of the blob. For a complete blob the size is always known.
size: u64,
},
}

#[cfg(test)]
mod tests {
use super::*;
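With BlobStatus now defined and re-exported from iroh::client::blobs, client code can match on it directly instead of reaching into rpc_protocol. A small illustrative helper (the value() accessor on BaoBlobSize is an assumption here; everything else follows the enum as defined above):

use iroh::client::blobs::BlobStatus;

// Illustrative only: summarize a BlobStatus for logging.
fn describe(status: &BlobStatus) -> String {
    match status {
        BlobStatus::NotFound => "not stored on this node".to_string(),
        BlobStatus::Partial { size } => {
            // BaoBlobSize is verified once the last chunk has been received,
            // otherwise it is an unverified size.
            format!("partial, currently {} bytes", size.value())
        }
        BlobStatus::Complete { size } => format!("complete, {size} bytes"),
    }
}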
4 changes: 2 additions & 2 deletions iroh/src/node.rs
@@ -7,7 +7,7 @@ use std::collections::BTreeMap;
use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use futures_lite::StreamExt;
@@ -66,7 +66,7 @@ struct NodeInner<D> {
rt: LocalPoolHandle,
pub(crate) sync: DocsEngine,
downloader: Downloader,
blob_batches: Mutex<BlobBatches>,
blob_batches: tokio::sync::Mutex<BlobBatches>,
}

/// Keeps track of all the currently active batch operations of the blobs api.
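The blob_batches field moves from std::sync::Mutex to tokio::sync::Mutex because the RPC handlers below take the lock inside async functions: with the async mutex, lock() is awaited and the guard can live across await points without blocking an executor thread. A minimal standalone sketch of the pattern (the Batches struct is a stand-in, not the real BlobBatches type):

use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Default)]
struct Batches {
    // stand-in for the real BlobBatches bookkeeping
    next_id: u64,
}

async fn create_batch(batches: Arc<Mutex<Batches>>) -> u64 {
    // lock() returns a future; awaiting it yields to the runtime instead of
    // blocking the thread, and the guard may be held across further awaits.
    let mut guard = batches.lock().await;
    guard.next_id += 1;
    guard.next_id
}

#[tokio::main]
async fn main() {
    let batches = Arc::new(Mutex::new(Batches::default()));
    println!("created batch {}", create_batch(batches).await);
}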
64 changes: 32 additions & 32 deletions iroh/src/node/rpc.rs
@@ -6,6 +6,7 @@ use std::time::Duration;
use anyhow::{anyhow, ensure, Result};
use futures_buffered::BufferedStreamExt;
use futures_lite::{Stream, StreamExt};
use futures_util::FutureExt;
use genawaiter::sync::{Co, Gen};
use iroh_base::rpc::RpcResult;
use iroh_blobs::downloader::{DownloadRequest, Downloader};
@@ -35,7 +36,7 @@ use tokio_util::task::LocalPoolHandle;
use tracing::{debug, info};

use crate::client::blobs::{
BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption,
BlobInfo, BlobStatus, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption,
};
use crate::client::tags::TagInfo;
use crate::client::NodeStatus;
@@ -46,7 +47,7 @@ use crate::rpc_protocol::{
BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest,
BlobDownloadRequest, BlobDownloadResponse, BlobExportRequest, BlobExportResponse,
BlobGetCollectionRequest, BlobGetCollectionResponse, BlobListCollectionsRequest,
BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobStatus,
BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse,
BlobStatusRequest, BlobStatusResponse, BlobValidateRequest, CreateCollectionRequest,
CreateCollectionResponse, CreateTagRequest, DocExportFileRequest, DocExportFileResponse,
DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, ListTagsRequest,
@@ -458,7 +459,7 @@ impl<D: BaoStore> Handler<D> {
async fn tags_set_tag(self, msg: SetTagRequest) -> RpcResult<()> {
if let Some(batch) = msg.batch {
if let Some(content) = msg.value.as_ref() {
self.inner.blob_batches.lock().unwrap().remove_one(
self.inner.blob_batches.lock().await.remove_one(
batch,
content,
self.inner.db.tag_drop(),
@@ -471,7 +472,7 @@ impl<D: BaoStore> Handler<D> {

async fn tags_create_tag(self, msg: CreateTagRequest) -> RpcResult<Tag> {
if let Some(batch) = msg.batch {
self.inner.blob_batches.lock().unwrap().remove_one(
self.inner.blob_batches.lock().await.remove_one(
batch,
&msg.value,
self.inner.db.tag_drop(),
@@ -867,7 +868,7 @@ impl<D: BaoStore> Handler<D> {
.import_file(root, import_mode, format, import_progress)
.await?;
let hash = *tag.hash();
self.inner.blob_batches.lock().unwrap().store(batch, tag);
self.inner.blob_batches.lock().await.store(batch, tag);

progress.send(BatchAddPathProgress::Done { hash }).await?;
Ok(())
@@ -942,39 +943,38 @@ impl<D: BaoStore> Handler<D> {
_: BatchCreateRequest,
mut updates: impl Stream<Item = BatchUpdate> + Send + Unpin + 'static,
) -> impl Stream<Item = BatchCreateResponse> {
let batch = self.inner.blob_batches.lock().unwrap().create();
tokio::spawn(async move {
while let Some(item) = updates.next().await {
match item {
BatchUpdate::Drop(content) => {
// this can not fail, since we keep the batch alive.
// therefore it is safe to ignore the result.
let _ = self.inner.blob_batches.lock().unwrap().remove_one(
batch,
&content,
self.inner.db.tag_drop(),
);
async move {
let batch = self.inner.blob_batches.lock().await.create();
tokio::spawn(async move {
while let Some(item) = updates.next().await {
match item {
BatchUpdate::Drop(content) => {
// this can not fail, since we keep the batch alive.
// therefore it is safe to ignore the result.
let _ = self.inner.blob_batches.lock().await.remove_one(
batch,
&content,
self.inner.db.tag_drop(),
);
}
BatchUpdate::Ping => {}
}
BatchUpdate::Ping => {}
}
}
self.inner
.blob_batches
.lock()
.unwrap()
.remove(batch, self.inner.db.tag_drop());
});
futures_lite::stream::once(BatchCreateResponse::Id(batch))
self.inner
.blob_batches
.lock()
.await
.remove(batch, self.inner.db.tag_drop());
});
BatchCreateResponse::Id(batch)
}
.into_stream()
}

#[allow(clippy::unused_async)]
async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> {
let tag = self.inner.db.temp_tag(msg.content);
self.inner
.blob_batches
.lock()
.unwrap()
.store(msg.batch, tag);
self.inner.blob_batches.lock().await.store(msg.batch, tag);
Ok(())
}

@@ -1043,7 +1043,7 @@ impl<D: BaoStore> Handler<D> {
self.inner
.blob_batches
.lock()
.unwrap()
.await
.store(msg.batch, temp_tag);
progress
.send(BatchAddStreamResponse::Result { hash })
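The batch_create handler now builds its response inside an async move block and turns the resulting future into a single-item stream via FutureExt::into_stream (hence the new futures_util import), so the batch id is only produced after the async lock has been acquired. A reduced sketch of that shape, independent of the iroh types:

use futures_lite::{Stream, StreamExt};
use futures_util::FutureExt;
use std::sync::Arc;
use tokio::sync::Mutex;

// Reduced sketch: a stream whose single item needs async work (an async
// mutex lock) before it can be yielded.
fn create(counter: Arc<Mutex<u64>>) -> impl Stream<Item = u64> {
    async move {
        let mut guard = counter.lock().await;
        *guard += 1;
        *guard
    }
    .into_stream()
}

#[tokio::main]
async fn main() {
    let mut ids = create(Arc::new(Mutex::new(0)));
    while let Some(id) = ids.next().await {
        println!("batch id: {id}");
    }
}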
22 changes: 1 addition & 21 deletions iroh/src/rpc_protocol.rs
@@ -46,7 +46,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress};
use iroh_docs::engine::LiveEvent;

use crate::client::{
blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
blobs::{BlobInfo, BlobStatus, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
docs::{ImportProgress, ShareMode},
tags::TagInfo,
NodeStatus,
@@ -219,26 +219,6 @@ pub struct BlobStatusRequest {
#[derive(Debug, Serialize, Deserialize, derive_more::From, derive_more::Into)]
pub struct BlobStatusResponse(pub BlobStatus);

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobStatus {
/// The blob is not stored on the node.
NotFound,
/// The blob is only stored partially.
Partial {
/// The size of the currently stored partial blob.
///
/// This can be either a verified size if the last chunk was received,
/// or an unverified size if the last chunk was not yet received.
size: BaoBlobSize,
},
/// The blob is stored completely.
Complete {
/// The size of the blob. For a complete blob the size is always known.
size: u64,
},
}

impl RpcMsg<RpcService> for BlobStatusRequest {
type Response = RpcResult<BlobStatusResponse>;
}
