fix(iroh): make client::docs::ImportProgress public
dignifiedquire committed May 13, 2024
1 parent 531829d commit 87c75f7
Showing 3 changed files with 63 additions and 62 deletions.
iroh/src/client/docs.rs: 66 changes (53 additions, 13 deletions)
@@ -11,7 +11,7 @@ use anyhow::{anyhow, Context as _, Result};
 use bytes::Bytes;
 use derive_more::{Display, FromStr};
 use futures_lite::{Stream, StreamExt};
-use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions};
+use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions, rpc::RpcError};
 use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash};
 use iroh_docs::{
     actor::OpenState,
@@ -26,10 +26,9 @@ use serde::{Deserialize, Serialize};
 use crate::rpc_protocol::{
     DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest,
     DocExportFileRequest, DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest,
-    DocGetSyncPeersRequest, DocImportFileRequest, DocImportProgress, DocImportRequest,
-    DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest,
-    DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest,
-    DocSubscribeRequest, RpcService,
+    DocGetSyncPeersRequest, DocImportFileRequest, DocImportRequest, DocLeaveRequest,
+    DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest,
+    DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, RpcService,
 };
 
 #[doc(inline)]
@@ -470,6 +469,47 @@ impl Entry {
     }
 }
 
+/// Progress messages for a doc import operation
+///
+/// An import operation involves computing the outboard of a file, and then
+/// either copying or moving the file into the database, then setting the author, hash, size, and tag of that
+/// file as an entry in the doc.
+#[derive(Debug, Serialize, Deserialize)]
+pub enum ImportProgress {
+    /// An item was found with name `name`, from now on referred to via `id`
+    Found {
+        /// A new unique id for this entry.
+        id: u64,
+        /// The name of the entry.
+        name: String,
+        /// The size of the entry in bytes.
+        size: u64,
+    },
+    /// We got progress ingesting item `id`.
+    Progress {
+        /// The unique id of the entry.
+        id: u64,
+        /// The offset of the progress, in bytes.
+        offset: u64,
+    },
+    /// We are done adding `id` to the data store and the hash is `hash`.
+    IngestDone {
+        /// The unique id of the entry.
+        id: u64,
+        /// The hash of the entry.
+        hash: Hash,
+    },
+    /// We are done setting the entry to the doc
+    AllDone {
+        /// The key of the entry
+        key: Bytes,
+    },
+    /// We got an error and need to abort.
+    ///
+    /// This will be the last message in the stream.
+    Abort(RpcError),
+}
+
 /// Intended capability for document share tickets
 #[derive(Serialize, Deserialize, Debug, Clone, Display, FromStr)]
 pub enum ShareMode {
Expand Down Expand Up @@ -537,12 +577,12 @@ impl From<crate::docs_engine::LiveEvent> for LiveEvent {
#[must_use = "streams do nothing unless polled"]
pub struct ImportFileProgress {
#[debug(skip)]
stream: Pin<Box<dyn Stream<Item = Result<DocImportProgress>> + Send + Unpin + 'static>>,
stream: Pin<Box<dyn Stream<Item = Result<ImportProgress>> + Send + Unpin + 'static>>,
}

impl ImportFileProgress {
fn new(
stream: (impl Stream<Item = Result<impl Into<DocImportProgress>, impl Into<anyhow::Error>>>
stream: (impl Stream<Item = Result<impl Into<ImportProgress>, impl Into<anyhow::Error>>>
+ Send
+ Unpin
+ 'static),
@@ -565,10 +605,10 @@ impl ImportFileProgress {
         let mut entry_hash = None;
         while let Some(msg) = self.next().await {
             match msg? {
-                DocImportProgress::Found { size, .. } => {
+                ImportProgress::Found { size, .. } => {
                     entry_size = size;
                 }
-                DocImportProgress::AllDone { key } => {
+                ImportProgress::AllDone { key } => {
                     let hash = entry_hash
                         .context("expected DocImportProgress::IngestDone event to occur")?;
                     let outcome = ImportFileOutcome {
@@ -578,9 +618,9 @@ };
                     };
                     return Ok(outcome);
                 }
-                DocImportProgress::Abort(err) => return Err(err.into()),
-                DocImportProgress::Progress { .. } => {}
-                DocImportProgress::IngestDone { hash, .. } => {
+                ImportProgress::Abort(err) => return Err(err.into()),
+                ImportProgress::Progress { .. } => {}
+                ImportProgress::IngestDone { hash, .. } => {
                     entry_hash = Some(hash);
                 }
             }
@@ -601,7 +641,7 @@ pub struct ImportFileOutcome {
 }
 
 impl Stream for ImportFileProgress {
-    type Item = Result<DocImportProgress>;
+    type Item = Result<ImportProgress>;
     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         Pin::new(&mut self.stream).poll_next(cx)
     }
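For reference, here is a minimal sketch of how a downstream caller might consume the now-public `ImportProgress` by polling an `ImportFileProgress` stream. It mirrors the `finish()` logic above but reports each message as it arrives; the call that produces the stream (for example a `doc.import_file(...)` method) is assumed rather than shown in this diff, and the `iroh::client::docs` import path follows the commit title.

```rust
use anyhow::Result;
use futures_lite::StreamExt;
use iroh::client::docs::{ImportFileProgress, ImportProgress};

/// Poll the import stream and report each progress message until the
/// import finishes (`AllDone`) or fails (`Abort`).
async fn watch_import(mut progress: ImportFileProgress) -> Result<()> {
    while let Some(msg) = progress.next().await {
        match msg? {
            ImportProgress::Found { id, name, size } => {
                println!("found {name} (id {id}), {size} bytes");
            }
            ImportProgress::Progress { id, offset } => {
                println!("item {id}: {offset} bytes ingested");
            }
            ImportProgress::IngestDone { id, hash } => {
                println!("item {id} stored with hash {hash}");
            }
            ImportProgress::AllDone { key } => {
                println!("entry written under key {key:?}");
                return Ok(());
            }
            ImportProgress::Abort(err) => return Err(err.into()),
        }
    }
    Ok(())
}
```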
iroh/src/node/rpc.rs: 15 changes (8 additions, 7 deletions)
@@ -45,11 +45,11 @@ use crate::rpc_protocol::{
     BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest,
     BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest,
     CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest,
-    DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocImportProgress,
-    DocSetHashRequest, ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest,
-    NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest,
-    NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest,
-    NodeWatchRequest, NodeWatchResponse, Request, RpcService, SetTagOption,
+    DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest,
+    ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse,
+    NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest,
+    NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest,
+    NodeWatchResponse, Request, RpcService, SetTagOption,
 };
 
 use super::{Event, NodeInner};
@@ -488,7 +488,7 @@ impl<D: BaoStore> Handler<D> {
         let tx2 = tx.clone();
         self.rt().spawn_pinned(|| async move {
             if let Err(e) = self.doc_import_file0(msg, tx).await {
-                tx2.send_async(DocImportProgress::Abort(e.into()))
+                tx2.send_async(crate::client::docs::ImportProgress::Abort(e.into()))
                     .await
                     .ok();
             }
@@ -499,8 +499,9 @@ impl<D: BaoStore> Handler<D> {
     async fn doc_import_file0(
         self,
         msg: DocImportFileRequest,
-        progress: flume::Sender<DocImportProgress>,
+        progress: flume::Sender<crate::client::docs::ImportProgress>,
     ) -> anyhow::Result<()> {
+        use crate::client::docs::ImportProgress as DocImportProgress;
         use iroh_blobs::store::ImportMode;
         use std::collections::BTreeMap;
 
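The handler change above follows a simple producer pattern: progress messages are pushed through a flume channel typed with the public client enum, and a failure ends the stream with a final `Abort`. The sketch below illustrates that pattern under stated assumptions; `run_import` and `do_import` are placeholder names, not the actual handler functions.

```rust
use iroh::client::docs::ImportProgress;

/// Spawned wrapper: run the import and, on error, emit a terminal `Abort`
/// message, mirroring the `spawn_pinned` block in the handler above.
async fn run_import(tx: flume::Sender<ImportProgress>) {
    let tx2 = tx.clone();
    if let Err(e) = do_import(tx).await {
        tx2.send_async(ImportProgress::Abort(e.into())).await.ok();
    }
}

/// Placeholder for the real ingest logic: announce the item, then emit
/// `Progress`, `IngestDone` and `AllDone` as the import advances.
async fn do_import(tx: flume::Sender<ImportProgress>) -> anyhow::Result<()> {
    tx.send_async(ImportProgress::Found {
        id: 0,
        name: "example.bin".to_string(),
        size: 1024,
    })
    .await
    .ok();
    // ... copy or move the file into the store, set the entry on the doc ...
    Ok(())
}
```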
iroh/src/rpc_protocol.rs: 44 changes (2 additions, 42 deletions)
@@ -44,7 +44,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress};
 use crate::{
     client::{
         blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
-        docs::ShareMode,
+        docs::{ImportProgress, ShareMode},
         tags::TagInfo,
         NodeStatus,
     },
@@ -736,47 +736,7 @@ impl ServerStreamingMsg<RpcService> for DocImportFileRequest {
 
 /// Wrapper around [`DocImportProgress`].
 #[derive(Debug, Serialize, Deserialize, derive_more::Into)]
-pub struct DocImportFileResponse(pub DocImportProgress);
-
-/// Progress messages for a doc import operation
-///
-/// An import operation involves computing the outboard of a file, and then
-/// either copying or moving the file into the database, then setting the author, hash, size, and tag of that file as an entry in the doc
-#[derive(Debug, Serialize, Deserialize)]
-pub enum DocImportProgress {
-    /// An item was found with name `name`, from now on referred to via `id`
-    Found {
-        /// A new unique id for this entry.
-        id: u64,
-        /// The name of the entry.
-        name: String,
-        /// The size of the entry in bytes.
-        size: u64,
-    },
-    /// We got progress ingesting item `id`.
-    Progress {
-        /// The unique id of the entry.
-        id: u64,
-        /// The offset of the progress, in bytes.
-        offset: u64,
-    },
-    /// We are done adding `id` to the data store and the hash is `hash`.
-    IngestDone {
-        /// The unique id of the entry.
-        id: u64,
-        /// The hash of the entry.
-        hash: Hash,
-    },
-    /// We are done setting the entry to the doc
-    AllDone {
-        /// The key of the entry
-        key: Bytes,
-    },
-    /// We got an error and need to abort.
-    ///
-    /// This will be the last message in the stream.
-    Abort(RpcError),
-}
+pub struct DocImportFileResponse(pub ImportProgress);
 
 /// A request to the node to save the data of the entry to the given filepath
 ///
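The net effect of this file's change, assuming `rpc_protocol` was previously reachable by downstream crates: the progress type is consumed through the public client module instead of the RPC protocol module.

```rust
// Hypothetical downstream import before this commit:
// use iroh::rpc_protocol::DocImportProgress;

// After this commit, the same messages come from the public client module:
use iroh::client::docs::ImportProgress;
```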
