Skip to content

Commit

Permalink
add doc.import_file and doc.export_file
Browse files Browse the repository at this point in the history
  • Loading branch information
ramfox committed Nov 10, 2023
1 parent 53f1b61 commit 49d7f56
Show file tree
Hide file tree
Showing 4 changed files with 454 additions and 9 deletions.
195 changes: 187 additions & 8 deletions iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@
use std::collections::HashMap;
use std::io::{self, Cursor};
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, Context as AnyhowContext, Result};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_bytes::provider::AddProgress;
use iroh_bytes::store::ValidateProgress;
// use iroh_bytes::util::progress::FlumeProgressSender;
use iroh_bytes::util::runtime;
use iroh_bytes::Hash;
use iroh_bytes::{BlobFormat, Tag};
Expand All @@ -35,12 +36,13 @@ use crate::rpc_protocol::{
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
CounterStats, DeleteTagRequest, DocCloseRequest, DocCreateRequest, DocDelRequest,
DocDelResponse, DocDropRequest, DocGetExactRequest, DocGetManyRequest, DocImportRequest,
DocLeaveRequest, DocListRequest, DocOpenRequest, DocSetHashRequest, DocSetRequest,
DocShareRequest, DocStartSyncRequest, DocStatusRequest, DocSubscribeRequest, DocTicket,
DownloadProgress, ListTagsRequest, ListTagsResponse, NodeConnectionInfoRequest,
NodeConnectionInfoResponse, NodeConnectionsRequest, NodeShutdownRequest, NodeStatsRequest,
NodeStatusRequest, NodeStatusResponse, ProviderService, SetTagOption, ShareMode, WrapOption,
DocDelResponse, DocDropRequest, DocGetExactRequest, DocGetManyRequest, DocImportFileRequest,
DocImportProgress, DocImportRequest, DocLeaveRequest, DocListRequest, DocOpenRequest,
DocSetHashRequest, DocSetRequest, DocShareRequest, DocStartSyncRequest, DocStatusRequest,
DocSubscribeRequest, DocTicket, DownloadProgress, ListTagsRequest, ListTagsResponse,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeShutdownRequest, NodeStatsRequest, NodeStatusRequest, NodeStatusResponse, ProviderService,
SetTagOption, ShareMode, WrapOption,
};
use crate::sync_engine::LiveEvent;

Expand Down Expand Up @@ -624,6 +626,47 @@ where
Ok(())
}

/// Import the file at `path` into this document as a new entry.
///
/// The entry key is obtained by calling `path_to_key` with an owned copy of `path`. Path and
/// key are sent to the node in a [`DocImportFileRequest`], together with this document's id
/// and the given `author`.
///
/// Returns a [`DocImportFileProgress`] stream of progress events. Call
/// [`DocImportFileProgress::finish`] to drive it to completion and obtain the outcome.
///
/// # Errors
///
/// Returns an error if `ensure_open` fails or if the streaming RPC cannot be started.
pub async fn import_file(
    &self,
    author: AuthorId,
    path: impl AsRef<Path>,
    path_to_key: impl Fn(PathBuf) -> Bytes + Send + Sync + 'static,
) -> Result<DocImportFileProgress> {
    self.ensure_open()?;
    let file_path = path.as_ref().to_owned();
    // The key callback takes ownership, so hand it a copy and keep ours for the request.
    let entry_key = path_to_key(file_path.clone());
    let request = DocImportFileRequest {
        doc_id: self.id(),
        author_id: author,
        path: file_path,
        key: entry_key,
    };
    let events = self.0.rpc.server_streaming(request).await?;
    Ok(DocImportFileProgress::new(events))
}

/// Export the content of a single entry to a file.
///
/// Looks up one entry matching `query`, maps its key to a destination path with
/// `key_to_path`, creates any missing parent directories, and streams the entry's content
/// into the file at that path.
///
/// The content reader is opened *before* the destination file is created. If opening the
/// reader fails, an already existing file at the destination is therefore left untouched and
/// no empty file is left behind.
///
/// # Errors
///
/// Returns an error if `ensure_open` fails, if no entry matches `query`
/// (`"Entry not found"`), if the content reader cannot be opened, or if creating the
/// directories, creating the file, or copying the content fails.
pub async fn export_file(
    &self,
    query: impl Into<Query>,
    key_to_path: impl Fn(&[u8]) -> PathBuf + Send + Sync + 'static,
) -> Result<()> {
    self.ensure_open()?;
    let entry = self.get_one(query).await?.context("Entry not found")?;
    let path = key_to_path(entry.key());

    // Open the source first: `File::create` truncates, so touching the destination before we
    // know the content can be read would clobber an existing file on failure.
    let mut reader = self.read(&entry).await?;

    if let Some(dir) = path.parent() {
        tokio::fs::create_dir_all(dir)
            .await
            .with_context(|| format!("failed to create directory {}", dir.display()))?;
    }
    let mut file = tokio::fs::File::create(&path)
        .await
        .with_context(|| format!("failed to create file {}", path.display()))?;
    tokio::io::copy(&mut reader, &mut file)
        .await
        .with_context(|| format!("failed to write entry content to {}", path.display()))?;
    Ok(())
}

/// Read the content of an [`Entry`] as a streaming [`BlobReader`].
pub async fn read(&self, entry: &Entry) -> Result<BlobReader> {
self.ensure_open()?;
Expand Down Expand Up @@ -751,6 +794,142 @@ where
}
}

/// Progress stream for doc import operations.
///
/// Yields `Result<DocImportProgress>` items. It can be consumed event by event through its
/// `Stream` implementation, or driven to completion with `finish`, which ignores the
/// intermediate events and returns a `DocImportFileOutcome`.
#[derive(derive_more::Debug)]
pub struct DocImportFileProgress {
// The boxed, type-erased event stream. It is left out of the `Debug` output because of
// the `#[debug(skip)]` attribute.
#[debug(skip)]
stream: Pin<Box<dyn Stream<Item = Result<DocImportProgress>> + Send + Unpin + 'static>>,
}

impl DocImportFileProgress {
    /// Wrap a raw response stream.
    ///
    /// Every `Ok` item is converted into a [`DocImportProgress`] and every `Err` item into an
    /// [`anyhow::Error`]; the converted stream is then boxed and pinned.
    fn new(
        stream: (impl Stream<Item = Result<impl Into<DocImportProgress>, impl Into<anyhow::Error>>>
             + Send
             + Unpin
             + 'static),
    ) -> Self {
        let events = stream.map(|item| -> Result<DocImportProgress> {
            item.map(Into::into).map_err(Into::into)
        });
        Self {
            stream: Box::pin(events),
        }
    }

    /// Drive the stream to completion, discarding intermediate progress events.
    ///
    /// Remembers the size reported by `Found` and the hash reported by `IngestDone`, and
    /// returns a [`DocImportFileOutcome`] (hash, size, key and tag) once `AllDone` arrives.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream yields an error item, if an `Abort` event is received,
    /// if `AllDone` arrives before any `IngestDone`, or if the stream ends without `AllDone`.
    pub async fn finish(mut self) -> Result<DocImportFileOutcome> {
        let mut found_size = 0;
        let mut ingested_hash = None;
        while let Some(event) = self.try_next().await? {
            match event {
                DocImportProgress::Progress { .. } => {}
                DocImportProgress::Found { size, .. } => {
                    found_size = size;
                }
                DocImportProgress::IngestDone { hash, .. } => {
                    ingested_hash = Some(hash);
                }
                DocImportProgress::Abort(err) => return Err(err.into()),
                DocImportProgress::AllDone { key, tag } => {
                    let hash = ingested_hash
                        .context("expected DocImportProgress::IngestDone event to occur")?;
                    return Ok(DocImportFileOutcome {
                        hash,
                        tag,
                        key,
                        size: found_size,
                    });
                }
            }
        }
        Err(anyhow!("Response stream ended prematurely"))
    }
}

/// Outcome of a [`Doc::import_file`] operation
///
/// Returned by [`DocImportFileProgress::finish`]. The fields are public so that callers
/// receiving the outcome can read the values it carries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocImportFileOutcome {
    /// The hash of the entry's content
    pub hash: Hash,
    /// The size of the entry
    pub size: u64,
    /// The key of the entry
    pub key: Bytes,
    /// The tag of the entry
    pub tag: Tag,
}

/// Forwards polling to the boxed inner stream, so that a [`DocImportFileProgress`] can be
/// consumed as a stream of `Result<DocImportProgress>` items.
impl Stream for DocImportFileProgress {
    type Item = Result<DocImportProgress>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The only field is already a `Pin<Box<..>>`, so the wrapper can be unpinned freely.
        let this = self.get_mut();
        this.stream.as_mut().poll_next(cx)
    }
}

// /// Progress stream for doc export operations.
// #[derive(derive_more::Debug)]
// pub struct DocExportFileProgress {
// #[debug(skip)]
// stream: Pin<Box<dyn Stream<Item = DocExportProgress> + Send + Unpin + 'static>>,
// }

// impl DocExportFileProgress {
// fn new(stream: (impl Stream<Item = DocExportProgress> + Send + Unpin + 'static)) -> Self {
// Self {
// stream: Box::pin(stream),
// }
// }
// }

// /// Iterate through the export progress stream, returning when the stream has completed.
//
// /// Returns a [`DocExportFileOutcome`] which contains the file path the data was written to and the size of the content.
// pub async fn finish(mut self) -> Result<DocExportFileOutcome> {
// let mut total_size = 0;
// let mut path = None;
// while let Some(msg) = self.next().await {
// match msg? {
// DocExportProgress::Found { size, outpath, .. } => {
// total_size = size;
// path = Some(outpath);
// }
// DocExportProgress::AllDone => {
// let path = path.context("expected DocExportProgress::Found event to occur")?;
// let outcome = DocExportFileOutcome {
// size: total_size,
// path,
// };
// return Ok(outcome);
// }
// DocExportProgress::Abort(err) => return Err(err.into()),
// DocExportProgress::Progress { .. } => {}
// }
// }
// Err(anyhow!("Response stream ended prematurely"))
// }
// }

/// Outcome of a [`Doc::export_file`] operation
///
/// The fields are public so that code receiving an outcome can read the size and the
/// destination path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DocExportFileOutcome {
    /// The size of the entry
    pub size: u64,
    /// The path to which the entry was saved
    pub path: PathBuf,
}

// impl Stream for DocExportFileProgress {
// type Item = Result<DocExportProgress>;
// fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// self.stream.poll_next_unpin(cx)
// }
// }

fn flatten<T, E1, E2>(
s: impl Stream<Item = StdResult<StdResult<T, E1>, E2>>,
) -> impl Stream<Item = Result<T>>
Expand Down
122 changes: 121 additions & 1 deletion iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ use crate::rpc_protocol::{
BlobAddStreamUpdate, BlobDeleteBlobRequest, BlobDownloadRequest, BlobListCollectionsRequest,
BlobListCollectionsResponse, BlobListIncompleteRequest, BlobListIncompleteResponse,
BlobListRequest, BlobListResponse, BlobReadRequest, BlobReadResponse, BlobValidateRequest,
DeleteTagRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
DeleteTagRequest, DocImportFileRequest, DocImportFileResponse, DocImportProgress,
DocSetHashRequest, DownloadLocation, ListTagsRequest, ListTagsResponse,
NodeConnectionInfoRequest, NodeConnectionInfoResponse, NodeConnectionsRequest,
NodeConnectionsResponse, NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse,
NodeStatusRequest, NodeStatusResponse, NodeWatchRequest, NodeWatchResponse, ProviderRequest,
Expand Down Expand Up @@ -954,6 +955,121 @@ impl<D: BaoStore> RpcHandler<D> {
rx.into_stream().map(BlobAddPathResponse)
}

/// Handle a [`DocImportFileRequest`] by running the import on the local task pool and
/// returning its progress events as a stream of [`DocImportFileResponse`]s.
///
/// If the import itself fails, the error is forwarded to the client as a final
/// [`DocImportProgress::Abort`] event. A failure to deliver that event is ignored.
fn doc_import_file(
    self,
    msg: DocImportFileRequest,
) -> impl Stream<Item = DocImportFileResponse> {
    // A small buffer keeps the importer from being slowed down by the consumer.
    let (progress_tx, progress_rx) = flume::bounded(32);
    // Second handle, kept aside so a failure can still be reported after the first one has
    // been moved into the import task.
    let abort_tx = progress_tx.clone();
    self.rt().local_pool().spawn_pinned(|| async move {
        let outcome = self.doc_import_file0(msg, progress_tx).await;
        if let Err(err) = outcome {
            let _ = abort_tx
                .send_async(DocImportProgress::Abort(err.into()))
                .await;
        }
    });
    progress_rx.into_stream().map(DocImportFileResponse)
}

/// Import a single file into the blob store and record it as an entry in a document.
///
/// Steps, in order:
/// 1. Check that the requested path is absolute and exists.
/// 2. Import the file into the store as a raw blob in copy mode, translating the store's
///    [`ImportProgress`] events into [`DocImportProgress`] events sent on `progress`.
/// 3. Create a tag for the imported content.
/// 4. Set the content hash and size for `key` in the document on behalf of `author_id`.
/// 5. Send [`DocImportProgress::AllDone`] and emit a `TaggedBlobAdded` event to the callbacks.
///
/// # Errors
///
/// Returns an error if the path check fails or if any of the store, sync, or progress
/// operations fails.
async fn doc_import_file0(
    self,
    msg: DocImportFileRequest,
    progress: flume::Sender<DocImportProgress>,
) -> anyhow::Result<()> {
    use iroh_bytes::store::ImportMode;
    use std::collections::BTreeMap;

    // TODO: figure out tags (the request carries neither `in_place` nor `tag` yet).
    let DocImportFileRequest {
        doc_id,
        author_id,
        key,
        path: root,
    } = msg;

    let progress = FlumeProgressSender::new(progress);
    // `Found` carries the name and `Size` the size; names are parked here until the matching
    // `Size` event arrives so both can be reported in one `DocImportProgress::Found`.
    let pending_names = Arc::new(Mutex::new(BTreeMap::new()));
    let import_progress = progress.clone().with_filter_map(move |event| match event {
        ImportProgress::Found { id, name } => {
            pending_names.lock().unwrap().insert(id, name);
            None
        }
        ImportProgress::Size { id, size } => pending_names
            .lock()
            .unwrap()
            .remove(&id)
            .map(|name| DocImportProgress::Found { id, name, size }),
        ImportProgress::OutboardProgress { id, offset } => {
            Some(DocImportProgress::Progress { id, offset })
        }
        ImportProgress::OutboardDone { hash, id } => {
            Some(DocImportProgress::IngestDone { hash, id })
        }
        _ => None,
    });

    // The path must be absolute and must exist.
    if !root.is_absolute() {
        anyhow::bail!("path must be absolute");
    }
    if !root.exists() {
        anyhow::bail!("trying to add missing path: {}", root.display());
    }

    // TODO: do we assume copy? An `in_place` option would select `ImportMode::TryReference`.
    let import_mode = ImportMode::Copy;

    let (temp_tag, size) = self
        .inner
        .db
        .import_file(root, import_mode, BlobFormat::Raw, import_progress)
        .await?;

    let content = *temp_tag.inner();
    let HashAndFormat { hash, format } = content;

    // TODO: figure out tags. For now a tag is always created automatically; a named tag would
    // go through `set_tag` instead.
    let tag = self.inner.db.create_tag(content).await?;

    let set_hash = DocSetHashRequest {
        doc_id,
        author_id,
        key: key.clone(),
        hash,
        size,
    };
    self.inner.sync.doc_set_hash(set_hash).await?;

    let all_done = DocImportProgress::AllDone {
        key,
        tag: tag.clone(),
    };
    progress.send(all_done).await?;

    let added = iroh_bytes::provider::Event::TaggedBlobAdded { hash, format, tag };
    self.inner.callbacks.send(Event::ByteProvide(added)).await;

    Ok(())
}

async fn blob_export(
self,
out: String,
Expand Down Expand Up @@ -1546,6 +1662,10 @@ fn handle_rpc_request<D: BaoStore, E: ServiceEndpoint<ProviderService>>(
})
.await
}
DocImportFile(msg) => {
chan.server_streaming(msg, handler, RpcHandler::doc_import_file)
.await
}
DocDel(msg) => {
chan.rpc(msg, handler, |handler, req| async move {
handler.inner.sync.doc_del(req).await
Expand Down
Loading

0 comments on commit 49d7f56

Please sign in to comment.