diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index 15ccfaa2faf..f1da48dfcfd 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -11,7 +11,7 @@ use anyhow::{anyhow, Context as _, Result}; use bytes::Bytes; use derive_more::{Display, FromStr}; use futures_lite::{Stream, StreamExt}; -use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions}; +use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions, rpc::RpcError}; use iroh_blobs::{export::ExportProgress, store::ExportMode, Hash}; use iroh_docs::{ actor::OpenState, @@ -26,10 +26,9 @@ use serde::{Deserialize, Serialize}; use crate::rpc_protocol::{ DocCloseRequest, DocCreateRequest, DocDelRequest, DocDelResponse, DocDropRequest, DocExportFileRequest, DocGetDownloadPolicyRequest, DocGetExactRequest, DocGetManyRequest, - DocGetSyncPeersRequest, DocImportFileRequest, DocImportProgress, DocImportRequest, - DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, - DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest, - DocSubscribeRequest, RpcService, + DocGetSyncPeersRequest, DocImportFileRequest, DocImportRequest, DocLeaveRequest, + DocListRequest, DocOpenRequest, DocSetDownloadPolicyRequest, DocSetHashRequest, DocSetRequest, + DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, RpcService, }; #[doc(inline)] @@ -470,6 +469,47 @@ impl Entry { } } +/// Progress messages for an doc import operation +/// +/// An import operation involves computing the outboard of a file, and then +/// either copying or moving the file into the database, then setting the author, hash, size, and tag of that +/// file as an entry in the doc. +#[derive(Debug, Serialize, Deserialize)] +pub enum ImportProgress { + /// An item was found with name `name`, from now on referred to via `id` + Found { + /// A new unique id for this entry. + id: u64, + /// The name of the entry. + name: String, + /// The size of the entry in bytes. + size: u64, + }, + /// We got progress ingesting item `id`. + Progress { + /// The unique id of the entry. + id: u64, + /// The offset of the progress, in bytes. + offset: u64, + }, + /// We are done adding `id` to the data store and the hash is `hash`. + IngestDone { + /// The unique id of the entry. + id: u64, + /// The hash of the entry. + hash: Hash, + }, + /// We are done setting the entry to the doc + AllDone { + /// The key of the entry + key: Bytes, + }, + /// We got an error and need to abort. + /// + /// This will be the last message in the stream. + Abort(RpcError), +} + /// Intended capability for document share tickets #[derive(Serialize, Deserialize, Debug, Clone, Display, FromStr)] pub enum ShareMode { @@ -537,12 +577,12 @@ impl From for LiveEvent { #[must_use = "streams do nothing unless polled"] pub struct ImportFileProgress { #[debug(skip)] - stream: Pin> + Send + Unpin + 'static>>, + stream: Pin> + Send + Unpin + 'static>>, } impl ImportFileProgress { fn new( - stream: (impl Stream, impl Into>> + stream: (impl Stream, impl Into>> + Send + Unpin + 'static), @@ -565,10 +605,10 @@ impl ImportFileProgress { let mut entry_hash = None; while let Some(msg) = self.next().await { match msg? { - DocImportProgress::Found { size, .. } => { + ImportProgress::Found { size, .. } => { entry_size = size; } - DocImportProgress::AllDone { key } => { + ImportProgress::AllDone { key } => { let hash = entry_hash .context("expected DocImportProgress::IngestDone event to occur")?; let outcome = ImportFileOutcome { @@ -578,9 +618,9 @@ impl ImportFileProgress { }; return Ok(outcome); } - DocImportProgress::Abort(err) => return Err(err.into()), - DocImportProgress::Progress { .. } => {} - DocImportProgress::IngestDone { hash, .. } => { + ImportProgress::Abort(err) => return Err(err.into()), + ImportProgress::Progress { .. } => {} + ImportProgress::IngestDone { hash, .. } => { entry_hash = Some(hash); } } @@ -601,7 +641,7 @@ pub struct ImportFileOutcome { } impl Stream for ImportFileProgress { - type Item = Result; + type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(&mut self.stream).poll_next(cx) } diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index 2ae74099882..bb68f3ec34a 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -45,11 +45,11 @@ use crate::rpc_protocol::{ BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, - DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocImportProgress, - DocSetHashRequest, ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, - NodeConnectionInfoResponse, NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, - NodeRelayRequest, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, - NodeWatchRequest, NodeWatchResponse, Request, RpcService, SetTagOption, + DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, + ListTagsRequest, NodeAddrRequest, NodeConnectionInfoRequest, NodeConnectionInfoResponse, + NodeConnectionsRequest, NodeConnectionsResponse, NodeIdRequest, NodeRelayRequest, + NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest, + NodeWatchResponse, Request, RpcService, SetTagOption, }; use super::{Event, NodeInner}; @@ -488,7 +488,7 @@ impl Handler { let tx2 = tx.clone(); self.rt().spawn_pinned(|| async move { if let Err(e) = self.doc_import_file0(msg, tx).await { - tx2.send_async(DocImportProgress::Abort(e.into())) + tx2.send_async(crate::client::docs::ImportProgress::Abort(e.into())) .await .ok(); } @@ -499,8 +499,9 @@ impl Handler { async fn doc_import_file0( self, msg: DocImportFileRequest, - progress: flume::Sender, + progress: flume::Sender, ) -> anyhow::Result<()> { + use crate::client::docs::ImportProgress as DocImportProgress; use iroh_blobs::store::ImportMode; use std::collections::BTreeMap; diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index c0cd6c9b261..fa51f131983 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -44,7 +44,7 @@ pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress}; use crate::{ client::{ blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, - docs::ShareMode, + docs::{ImportProgress, ShareMode}, tags::TagInfo, NodeStatus, }, @@ -736,47 +736,7 @@ impl ServerStreamingMsg for DocImportFileRequest { /// Wrapper around [`DocImportProgress`]. #[derive(Debug, Serialize, Deserialize, derive_more::Into)] -pub struct DocImportFileResponse(pub DocImportProgress); - -/// Progress messages for an doc import operation -/// -/// An import operation involves computing the outboard of a file, and then -/// either copying or moving the file into the database, then setting the author, hash, size, and tag of that file as an entry in the doc -#[derive(Debug, Serialize, Deserialize)] -pub enum DocImportProgress { - /// An item was found with name `name`, from now on referred to via `id` - Found { - /// A new unique id for this entry. - id: u64, - /// The name of the entry. - name: String, - /// The size of the entry in bytes. - size: u64, - }, - /// We got progress ingesting item `id`. - Progress { - /// The unique id of the entry. - id: u64, - /// The offset of the progress, in bytes. - offset: u64, - }, - /// We are done adding `id` to the data store and the hash is `hash`. - IngestDone { - /// The unique id of the entry. - id: u64, - /// The hash of the entry. - hash: Hash, - }, - /// We are done setting the entry to the doc - AllDone { - /// The key of the entry - key: Bytes, - }, - /// We got an error and need to abort. - /// - /// This will be the last message in the stream. - Abort(RpcError), -} +pub struct DocImportFileResponse(pub ImportProgress); /// A request to the node to save the data of the entry to the given filepath ///