Skip to content

Commit

Permalink
rename scope to batch everywhere and add a newtype
Browse files Browse the repository at this point in the history
  • Loading branch information
rklaehn committed Jun 4, 2024
1 parent 28439d5 commit be8209c
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 28 deletions.
4 changes: 2 additions & 2 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ where
/// unless a permanent tag is created for it.
pub async fn batch(&self) -> Result<Batch<C>> {
let (updates, mut stream) = self.rpc.bidi(BatchCreateRequest).await?;
let BatchCreateResponse::Id(scope) = stream.next().await.context("expected scope id")??;
let BatchCreateResponse::Id(batch) = stream.next().await.context("expected scope id")??;
let rpc = self.rpc.clone();
Ok(Batch::new(scope, rpc, updates))
Ok(Batch::new(batch, rpc, updates))
}

/// Stream the contents of a a single blob.
Expand Down
6 changes: 3 additions & 3 deletions iroh/src/client/blobs/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
client::RpcService,
rpc_protocol::{
BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, BatchAddStreamUpdate,
BatchCreateTempTagRequest, BatchUpdate,
BatchCreateTempTagRequest, BatchId, BatchUpdate,
},
};

Expand All @@ -32,7 +32,7 @@ use super::WrapOption;
#[derive(derive_more::Debug)]
struct BatchInner<C: ServiceConnection<RpcService>> {
/// The id of the scope.
batch: u64,
batch: BatchId,
/// The rpc client.
rpc: RpcClient<RpcService, C>,
/// The stream to send drop
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Default for AddReaderOpts {

impl<C: ServiceConnection<RpcService>> Batch<C> {
pub(super) fn new(
batch: u64,
batch: BatchId,
rpc: RpcClient<RpcService, C>,
updates: UpdateSink<RpcService, C, BatchUpdate>,
) -> Self {
Expand Down
17 changes: 8 additions & 9 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio_util::task::LocalPoolHandle;
use tracing::debug;

use crate::client::RpcService;
use crate::rpc_protocol::BatchId;

mod builder;
mod rpc;
Expand Down Expand Up @@ -65,16 +66,14 @@ struct NodeInner<D> {
rt: LocalPoolHandle,
pub(crate) sync: DocsEngine,
downloader: Downloader,
blob_scopes: Mutex<BlobBatches>,
blob_batches: Mutex<BlobBatches>,
}

/// Keeps track of all the currently active batch operations of the blobs api.
///
///
#[derive(Debug, Default)]
struct BlobBatches {
/// Currently active batches
batches: BTreeMap<u64, BlobBatch>,
batches: BTreeMap<BatchId, BlobBatch>,
/// Used to generate new batch ids.
max: u64,
}
Expand All @@ -89,22 +88,22 @@ struct BlobBatch {

impl BlobBatches {
/// Create a new unique batch id.
fn create(&mut self) -> u64 {
fn create(&mut self) -> BatchId {
let id = self.max;
self.max += 1;
id
BatchId(id)
}

/// Store a temp tag in a batch identified by a batch id.
fn store(&mut self, batch: u64, tt: TempTag) {
fn store(&mut self, batch: BatchId, tt: TempTag) {
let entry = self.batches.entry(batch).or_default();
let count = entry.tags.entry(tt.hash_and_format()).or_default();
tt.leak();
*count += 1;
}

/// Remove a tag from a batch.
fn remove_one(&mut self, batch: u64, content: &HashAndFormat, u: Option<&dyn TagDrop>) {
fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat, u: Option<&dyn TagDrop>) {
if let Some(scope) = self.batches.get_mut(&batch) {
if let Some(counter) = scope.tags.get_mut(content) {
*counter -= 1;
Expand All @@ -119,7 +118,7 @@ impl BlobBatches {
}

/// Remove an entire batch.
fn remove(&mut self, batch: u64, u: Option<&dyn TagDrop>) {
fn remove(&mut self, batch: BatchId, u: Option<&dyn TagDrop>) {
if let Some(scope) = self.batches.remove(&batch) {
for (content, count) in scope.tags {
if let Some(u) = u {
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ where
rt: lp.clone(),
sync,
downloader,
blob_scopes: Default::default(),
blob_batches: Default::default(),
});
let task = {
let gossip = gossip.clone();
Expand Down
22 changes: 13 additions & 9 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ impl<D: BaoStore> Handler<D> {
.import_file(root, import_mode, format, import_progress)
.await?;
let hash = *tag.hash();
self.inner.blob_scopes.lock().unwrap().store(batch, tag);
self.inner.blob_batches.lock().unwrap().store(batch, tag);

progress.send(BatchAddPathProgress::Done { hash }).await?;
Ok(())
Expand Down Expand Up @@ -927,32 +927,36 @@ impl<D: BaoStore> Handler<D> {
_: BatchCreateRequest,
mut updates: impl Stream<Item = BatchUpdate> + Send + Unpin + 'static,
) -> impl Stream<Item = BatchCreateResponse> {
let scope_id = self.inner.blob_scopes.lock().unwrap().create();
let batch = self.inner.blob_batches.lock().unwrap().create();
tokio::spawn(async move {
while let Some(item) = updates.next().await {
match item {
BatchUpdate::Drop(content) => {
self.inner.blob_scopes.lock().unwrap().remove_one(
scope_id,
self.inner.blob_batches.lock().unwrap().remove_one(
batch,
&content,
self.inner.db.tag_drop(),
);
}
}
}
self.inner
.blob_scopes
.blob_batches
.lock()
.unwrap()
.remove(scope_id, self.inner.db.tag_drop());
.remove(batch, self.inner.db.tag_drop());
});
futures_lite::stream::once(BatchCreateResponse::Id(scope_id))
futures_lite::stream::once(BatchCreateResponse::Id(batch))
}

#[allow(clippy::unused_async)]
async fn batch_create_temp_tag(self, msg: BatchCreateTempTagRequest) -> RpcResult<()> {
let tag = self.inner.db.temp_tag(msg.content);
self.inner.blob_scopes.lock().unwrap().store(msg.batch, tag);
self.inner
.blob_batches
.lock()
.unwrap()
.store(msg.batch, tag);
Ok(())
}

Expand Down Expand Up @@ -1019,7 +1023,7 @@ impl<D: BaoStore> Handler<D> {
.await?;
let hash = temp_tag.inner().hash;
self.inner
.blob_scopes
.blob_batches
.lock()
.unwrap()
.store(msg.batch, temp_tag);
Expand Down
11 changes: 7 additions & 4 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub enum BatchUpdate {
#[derive(Debug, Serialize, Deserialize)]
pub enum BatchCreateResponse {
/// We got the id of the scope
Id(u64),
Id(BatchId),
}

impl Msg<RpcService> for BatchCreateRequest {
Expand Down Expand Up @@ -1090,13 +1090,16 @@ impl BidiStreamingMsg<RpcService> for BlobAddStreamRequest {
#[derive(Debug, Serialize, Deserialize, derive_more::Into)]
pub struct BlobAddStreamResponse(pub AddProgress);

#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)]
pub struct BatchId(pub(crate) u64);

/// Create a temp tag with a given hash and format
#[derive(Debug, Serialize, Deserialize)]
pub struct BatchCreateTempTagRequest {
/// Content to protect
pub content: HashAndFormat,
/// Batch to create the temp tag in
pub batch: u64,
pub batch: BatchId,
}

impl RpcMsg<RpcService> for BatchCreateTempTagRequest {
Expand All @@ -1109,7 +1112,7 @@ pub struct BatchAddStreamRequest {
/// What format to use for the blob
pub format: BlobFormat,
/// Batch to create the temp tag in
pub batch: u64,
pub batch: BatchId,
}

/// Write a blob from a byte stream
Expand Down Expand Up @@ -1148,7 +1151,7 @@ pub struct BatchAddPathRequest {
/// What format to use for the blob
pub format: BlobFormat,
/// Batch to create the temp tag in
pub batch: u64,
pub batch: BatchId,
}

/// Response to a batch add path request
Expand Down

0 comments on commit be8209c

Please sign in to comment.