diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index e9af9f6336..73732e4a0e 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -88,9 +88,9 @@ where /// unless a permanent tag is created for it. pub async fn batch(&self) -> Result> { let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?; - let BatchCreateResponse::Id(scope) = stream.next().await.context("expected scope id")??; + let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??; let rpc = self.rpc.clone(); - Ok(Batch::new(scope, rpc, updates)) + Ok(Batch::new(batch, rpc, updates)) } /// Stream the contents of a a single blob. diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 08a51d7bb9..470ec75b03 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -22,7 +22,7 @@ use crate::{ client::RpcService, rpc_protocol::{ BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate, - BatchCreateTempTagRequest, BatchUpdate, + BatchCreateTempTagRequest, BatchId, BatchUpdate, }, }; @@ -32,7 +32,7 @@ use super::WrapOption; #[derive(derive_more::Debug)] struct BatchInner> { /// The id of the scope. - batch: u64, + batch: BatchId, /// The rpc client. rpc: RpcClient, /// The stream to send drop @@ -106,7 +106,7 @@ impl Default for AddReaderOpts { impl> Batch { pub(super) fn new( - batch: u64, + batch: BatchId, rpc: RpcClient, updates: UpdateSink, ) -> Self { diff --git a/iroh/src/node.rs b/iroh/src/node.rs index aa19a6e460..7e5bd12406 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -27,6 +27,7 @@ use tokio_util::task::LocalPoolHandle; use tracing::debug; use crate::client::RpcService; +use crate::rpc_protocol::BatchId; mod builder; mod rpc; @@ -65,16 +66,14 @@ struct NodeInner { rt: LocalPoolHandle, pub(crate) sync: DocsEngine, downloader: Downloader, - blob_scopes: Mutex, + blob_batches: Mutex, } /// Keeps track of all the currently active batch operations of the blobs api. -/// -/// #[derive(Debug, Default)] struct BlobBatches { /// Currently active batches - batches: BTreeMap, + batches: BTreeMap, /// Used to generate new batch ids. max: u64, } @@ -89,14 +88,14 @@ struct BlobBatch { impl BlobBatches { /// Create a new unique batch id. - fn create(&mut self) -> u64 { + fn create(&mut self) -> BatchId { let id = self.max; self.max += 1; - id + BatchId(id) } /// Store a temp tag in a batch identified by a batch id. - fn store(&mut self, batch: u64, tt: TempTag) { + fn store(&mut self, batch: BatchId, tt: TempTag) { let entry = self.batches.entry(batch).or_default(); let count = entry.tags.entry(tt.hash_and_format()).or_default(); tt.leak(); @@ -104,7 +103,7 @@ impl BlobBatches { } /// Remove a tag from a batch. - fn remove_one(&mut self, batch: u64, content: &HashAndFormat, u: Option<&dyn TagDrop>) { + fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat, u: Option<&dyn TagDrop>) { if let Some(scope) = self.batches.get_mut(&batch) { if let Some(counter) = scope.tags.get_mut(content) { *counter -= 1; @@ -119,7 +118,7 @@ impl BlobBatches { } /// Remove an entire batch. - fn remove(&mut self, batch: u64, u: Option<&dyn TagDrop>) { + fn remove(&mut self, batch: BatchId, u: Option<&dyn TagDrop>) { if let Some(scope) = self.batches.remove(&batch) { for (content, count) in scope.tags { if let Some(u) = u { diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index fce7496b6d..ad929cc934 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -490,7 +490,7 @@ where rt: lp.clone(), sync, downloader, - blob_scopes: Default::default(), + blob_batches: Default::default(), }); let task = { let gossip = gossip.clone(); diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index f9a29b347d..d4042dd86e 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -852,7 +852,7 @@ impl Handler { .import_file(root, import_mode, format, import_progress) .await?; let hash = *tag.hash(); - self.inner.blob_scopes.lock().unwrap().store(batch, tag); + self.inner.blob_batches.lock().unwrap().store(batch, tag); progress.send(BatchAddPathProgress::Done { hash }).await?; Ok(()) @@ -927,13 +927,13 @@ impl Handler { _: BatchCreateRequest, mut updates: impl Stream + Send + Unpin + 'static, ) -> impl Stream { - let scope_id = self.inner.blob_scopes.lock().unwrap().create(); + let batch = self.inner.blob_batches.lock().unwrap().create(); tokio::spawn(async move { while let Some(item) = updates.next().await { match item { BatchUpdate::Drop(content) => { - self.inner.blob_scopes.lock().unwrap().remove_one( - scope_id, + self.inner.blob_batches.lock().unwrap().remove_one( + batch, &content, self.inner.db.tag_drop(), ); @@ -941,18 +941,22 @@ impl Handler { } } self.inner - .blob_scopes + .blob_batches .lock() .unwrap() - .remove(scope_id, self.inner.db.tag_drop()); + .remove(batch, self.inner.db.tag_drop()); }); - futures_lite::stream::once(BatchCreateResponse::Id(scope_id)) + futures_lite::stream::once(BatchCreateResponse::Id(batch)) } #[allow(clippy::unused_async)] async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> { let tag = self.inner.db.temp_tag(msg.content); - self.inner.blob_scopes.lock().unwrap().store(msg.batch, tag); + self.inner + .blob_batches + .lock() + .unwrap() + .store(msg.batch, tag); Ok(()) } @@ -1019,7 +1023,7 @@ impl Handler { .await?; let hash = temp_tag.inner().hash; self.inner - .blob_scopes + .blob_batches .lock() .unwrap() .store(msg.batch, temp_tag); diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index cd87e09b6b..3894bb2d2c 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -68,7 +68,7 @@ pub enum BatchUpdate { #[derive(Debug, Serialize, Deserialize)] pub enum BatchCreateResponse { /// We got the id of the scope - Id(u64), + Id(BatchId), } impl Msg for BatchCreateRequest { @@ -1090,13 +1090,16 @@ impl BidiStreamingMsg for BlobAddStreamRequest { #[derive(Debug, Serialize, Deserialize, derive_more::Into)] pub struct BlobAddStreamResponse(pub AddProgress); +#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] +pub struct BatchId(pub(crate) u64); + /// Create a temp tag with a given hash and format #[derive(Debug, Serialize, Deserialize)] pub struct BatchCreateTempTagRequest { /// Content to protect pub content: HashAndFormat, /// Batch to create the temp tag in - pub batch: u64, + pub batch: BatchId, } impl RpcMsg for BatchCreateTempTagRequest { @@ -1109,7 +1112,7 @@ pub struct BatchAddStreamRequest { /// What format to use for the blob pub format: BlobFormat, /// Batch to create the temp tag in - pub batch: u64, + pub batch: BatchId, } /// Write a blob from a byte stream @@ -1148,7 +1151,7 @@ pub struct BatchAddPathRequest { /// What format to use for the blob pub format: BlobFormat, /// Batch to create the temp tag in - pub batch: u64, + pub batch: BatchId, } /// Response to a batch add path request