
Commit

move export code
dignifiedquire committed May 7, 2024
1 parent 4fd1b01 commit 3d1654e
Showing 7 changed files with 69 additions and 105 deletions.
53 changes: 14 additions & 39 deletions iroh-blobs/examples/provide-bytes.rs
@@ -13,7 +13,7 @@ use anyhow::Result;
 use tokio_util::task::LocalPoolHandle;
 use tracing_subscriber::{prelude::*, EnvFilter};
 
-use iroh_blobs::{format::collection::Collection, Hash};
+use iroh_blobs::Hash;
 
 mod connect;
 use connect::{make_and_write_certs, make_server_endpoint, CERT_PATH};
@@ -30,43 +30,18 @@ pub fn setup_logging() {
 #[tokio::main]
 async fn main() -> Result<()> {
     let args: Vec<_> = std::env::args().collect();
-    if args.len() != 2 {
-        anyhow::bail!(
-            "usage: provide-bytes [FORMAT], where [FORMAT] is either 'blob' or 'collection'\n\nThe 'blob' example demonstrates sending a single blob of bytes. The 'collection' example demonstrates sending multiple blobs of bytes, grouped together in a 'collection'."
-        );
+    if args.len() != 1 {
+        anyhow::bail!("usage: provide-bytes demonstrates sending a single blob of bytes.");
     }
-    let format = {
-        if args[1] != "blob" && args[1] != "collection" {
-            anyhow::bail!(
-                "expected either 'blob' or 'collection' for FORMAT argument, got {}",
-                args[1]
-            );
-        }
-        args[1].clone()
-    };
-    println!("\nprovide bytes {format} example!");
-
-    let (db, hash) = if format == "collection" {
-        let (mut db, names) = iroh_blobs::store::readonly_mem::Store::new([
-            ("blob1", b"the first blob of bytes".to_vec()),
-            ("blob2", b"the second blob of bytes".to_vec()),
-        ]); // create a collection
-        let collection: Collection = names
-            .into_iter()
-            .map(|(name, hash)| (name, Hash::from(hash)))
-            .collect();
-        // add it to the db
-        let hash = db.insert_many(collection.to_blobs()).unwrap();
-        (db, hash)
-    } else {
-        // create a new database and add a blob
-        let (db, names) =
-            iroh_blobs::store::readonly_mem::Store::new([("hello", b"Hello World!".to_vec())]);
-
-        // get the hash of the content
-        let hash = names.get("hello").unwrap();
-        (db, Hash::from(hash.as_bytes()))
-    };
+    println!("\nprovide bytes blob example!");
+
+    // create a new database and add a blob
+    let (db, names) =
+        iroh_blobs::store::readonly_mem::Store::new([("hello", b"Hello World!".to_vec())]);
+
+    // get the hash of the content
+    let hash = names.get("hello").unwrap();
+    let hash = Hash::from(hash.as_bytes());
 
     // create tls certs and save to CERT_PATH
     let (key, cert) = make_and_write_certs().await?;
@@ -77,8 +77,8 @@ async fn main() -> Result<()> {
     println!("\nlistening on {addr}");
     println!("providing hash {hash}");
 
-    println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --example fetch-fsm {hash} \"{addr}\" {format}");
-    println!("\nfetch the content using a stream by running the following example:\n\ncargo run --example fetch-stream {hash} \"{addr}\" {format}\n");
+    println!("\nfetch the content using a finite state machine by running the following example:\n\ncargo run --example fetch-fsm {hash} \"{addr}\"");
+    println!("\nfetch the content using a stream by running the following example:\n\ncargo run --example fetch-stream {hash} \"{addr}\"\n");
 
     // create a new local pool handle with 1 worker thread
     let lp = LocalPoolHandle::new(1);
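With the [FORMAT] argument gone, the example takes no arguments. Going by the usage string and the printed hints above, a full round trip now looks roughly like this (the hash and address are placeholders that the provider prints at startup):

    cargo run --example provide-bytes
    cargo run --example fetch-fsm <HASH> "<ADDR>"
    cargo run --example fetch-stream <HASH> "<ADDR>"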
24 changes: 1 addition & 23 deletions iroh-blobs/src/export.rs
@@ -9,33 +9,11 @@ use serde::{Deserialize, Serialize};
 use tracing::trace;
 
 use crate::{
-    store::{BaoBlobSize, ExportFormat, ExportMode, MapEntry, Store as BaoStore},
+    store::{BaoBlobSize, ExportMode, MapEntry, Store as BaoStore},
     util::progress::{IdGenerator, ProgressSender},
     Hash,
 };
 
-/// Export a hash to the local file system.
-///
-/// This exports a single hash, or a collection `recursive` is true, from the `db` store to the
-/// local filesystem. Depending on `mode` the data is either copied or reflinked (if possible).
-///
-/// Progress is reported as [`ExportProgress`] through a [`ProgressSender`]. Note that the
-/// [`ExportProgress::AllDone`] event is not emitted from here, but left to an upper layer to send,
-/// if desired.
-pub async fn export<D: BaoStore>(
-    db: &D,
-    hash: Hash,
-    outpath: PathBuf,
-    format: ExportFormat,
-    mode: ExportMode,
-    progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
-) -> anyhow::Result<()> {
-    match format {
-        ExportFormat::Blob => export_blob(db, hash, outpath, mode, progress).await,
-        ExportFormat::Collection => todo!(), //export_collection(db, hash, outpath, mode, progress).await,
-    }
-}
-
 /// Export a single blob to a file on the local fileystem.
 pub async fn export_blob<D: BaoStore>(
     db: &D,
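With the format-dispatching `export` wrapper removed, callers choose the concrete export function themselves (the RPC handler change further down does exactly that). A minimal, hypothetical call of the remaining blob path, where `db`, `hash`, `outpath`, and `progress` stand in for values from the surrounding code and `ExportMode::Copy` is an arbitrary choice:

    // hypothetical call site: export a single blob by copying it to `outpath`
    export_blob(&db, hash, outpath, ExportMode::Copy, progress).await?;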
18 changes: 0 additions & 18 deletions iroh-blobs/src/store/traits.rs
@@ -742,24 +742,6 @@ pub enum ExportMode {
     TryReference,
 }
 
-/// The expected format of a hash being exported.
-#[derive(Debug, Clone, Serialize, Deserialize, Default)]
-pub enum ExportFormat {
-    /// The hash refers to any blob and will be exported to a single file.
-    #[default]
-    Blob,
-    /// The hash refers to a [`crate::format::collection::Collection`] blob
-    /// and all children of the collection shall be exported to one file per child.
-    ///
-    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
-    /// collection metadata, all other children of the collection will be exported to
-    /// a file each, with their collection name treated as a relative path to the export
-    /// destination path.
-    ///
-    /// If the blob cannot be parsed as a collection, the operation will fail.
-    Collection,
-}
-
 #[allow(missing_docs)]
 #[derive(Debug)]
 pub enum ExportProgress {
9 changes: 3 additions & 6 deletions iroh-cli/src/commands/blob.rs
@@ -14,20 +14,17 @@ use indicatif::{
     ProgressStyle,
 };
 use iroh::{
-    base::node_addr::AddrInfoOptions,
-    base::ticket::BlobTicket,
+    base::{node_addr::AddrInfoOptions, ticket::BlobTicket},
     blobs::{
         get::{db::DownloadProgress, progress::BlobProgress, Stats},
         provider::AddProgress,
-        store::{
-            ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress,
-        },
+        store::{ConsistencyCheckProgress, ExportMode, ReportLevel, ValidateProgress},
         util::SetTagOption,
         BlobFormat, Hash, HashAndFormat, Tag,
     },
     client::{
         blobs::{
-            BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions,
+            BlobInfo, BlobStatus, CollectionInfo, DownloadMode, DownloadOptions, ExportFormat,
             IncompleteBlobInfo, WrapOption,
         },
         Iroh, RpcService,
25 changes: 22 additions & 3 deletions iroh/src/client/blobs.rs
@@ -19,7 +19,7 @@ use iroh_blobs::{
     export::ExportProgress as BytesExportProgress,
     get::db::DownloadProgress as BytesDownloadProgress,
     hashseq::parse_hash_seq_tokio,
-    store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
+    store::{ConsistencyCheckProgress, ExportMode, ValidateProgress},
     BlobFormat, Hash, Tag,
 };
 use iroh_net::NodeAddr;
@@ -134,7 +134,7 @@
         tag: SetTagOption,
         tags_to_delete: Vec<Tag>,
     ) -> anyhow::Result<(Hash, Tag)> {
-        let (hash, tag) = collection.store_iroh(&self, tag).await?;
+        let (hash, tag) = collection.store_iroh(self, tag).await?;
 
         let tags = super::tags::Client {
             rpc: self.rpc.clone(),
@@ -345,11 +345,12 @@
 
     /// Read the content of a collection.
     pub async fn get_collection(&self, hash: Hash) -> Result<Collection> {
-        let collection = Collection::load_iroh(&self, hash).await?;
+        let collection = Collection::load_iroh(self, hash).await?;
         Ok(collection)
     }
 
     /// List all collections.
+    #[allow(clippy::unused_async)]
     pub async fn list_collections(&self) -> Result<impl Stream<Item = Result<CollectionInfo>>> {
         let this = self.clone();
         let stream = Gen::new(move |co| async move {
@@ -918,6 +919,24 @@ pub enum DownloadMode {
     Queued,
 }
 
+/// The expected format of a hash being exported.
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub enum ExportFormat {
+    /// The hash refers to any blob and will be exported to a single file.
+    #[default]
+    Blob,
+    /// The hash refers to a [`crate::format::collection::Collection`] blob
+    /// and all children of the collection shall be exported to one file per child.
+    ///
+    /// If the blob can be parsed as a [`BlobFormat::HashSeq`], and the first child contains
+    /// collection metadata, all other children of the collection will be exported to
+    /// a file each, with their collection name treated as a relative path to the export
+    /// destination path.
+    ///
+    /// If the blob cannot be parsed as a collection, the operation will fail.
+    Collection,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
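Since `ExportFormat` now lives in the client blobs module rather than in `iroh_blobs::store`, downstream code imports it from there, as the iroh-cli change above already does. A small illustrative snippet, with the path taken from the updated imports in this commit:

    use iroh::client::blobs::ExportFormat;

    // `Blob` carries the `#[default]` attribute, so this is `ExportFormat::Blob`.
    let format = ExportFormat::default();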
41 changes: 27 additions & 14 deletions iroh/src/node/rpc.rs
@@ -12,7 +12,7 @@ use iroh_blobs::downloader::{DownloadRequest, Downloader};
 use iroh_blobs::export::ExportProgress;
 use iroh_blobs::get::db::DownloadProgress;
 use iroh_blobs::get::Stats;
-use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry};
+use iroh_blobs::store::{ConsistencyCheckProgress, ImportProgress, MapEntry};
 use iroh_blobs::util::progress::ProgressSender;
 use iroh_blobs::BlobFormat;
 use iroh_blobs::{
@@ -30,7 +30,7 @@ use quic_rpc::{
 use tokio_util::task::LocalPoolHandle;
 use tracing::{debug, info};
 
-use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption};
+use crate::client::blobs::{BlobInfo, DownloadMode, ExportFormat, IncompleteBlobInfo, WrapOption};
 use crate::client::node::NodeStatus;
 use crate::client::tags::TagInfo;
 use crate::rpc_protocol::{
@@ -44,7 +44,7 @@
     NodeShutdownRequest, NodeStatsRequest, NodeStatsResponse, NodeStatusRequest, NodeWatchRequest,
     NodeWatchResponse, Request, RpcService, SetTagOption,
 };
-use crate::util::collection::Collection;
+use crate::util::collection::{export_collection, Collection};
 
 use super::{Event, NodeInner};
 
@@ -538,11 +538,10 @@ impl<D: BaoStore> Handler<D> {
             }
             x
         });
-        iroh_blobs::export::export(
+        iroh_blobs::export::export_blob(
             &self.inner.db,
             entry.content_hash(),
             path,
-            ExportFormat::Blob,
             mode,
             export_progress,
         )
@@ -573,15 +572,29 @@ impl<D: BaoStore> Handler<D> {
         let (tx, rx) = flume::bounded(1024);
         let progress = FlumeProgressSender::new(tx);
         self.rt().spawn_pinned(move || async move {
-            let res = iroh_blobs::export::export(
-                &self.inner.db,
-                msg.hash,
-                msg.path,
-                msg.format,
-                msg.mode,
-                progress.clone(),
-            )
-            .await;
+            let res = match msg.format {
+                ExportFormat::Blob => {
+                    iroh_blobs::export::export_blob(
+                        &self.inner.db,
+                        msg.hash,
+                        msg.path,
+                        msg.mode,
+                        progress.clone(),
+                    )
+                    .await
+                }
+                ExportFormat::Collection => {
+                    export_collection(
+                        &self.inner.db,
+                        msg.hash,
+                        msg.path,
+                        msg.mode,
+                        progress.clone(),
+                    )
+                    .await
+                }
+            };
+
             match res {
                 Ok(()) => progress.send(ExportProgress::AllDone).await.ok(),
                 Err(err) => progress.send(ExportProgress::Abort(err.into())).await.ok(),
4 changes: 2 additions & 2 deletions iroh/src/rpc_protocol.rs
@@ -36,12 +36,12 @@ use quic_rpc::{
 use serde::{Deserialize, Serialize};
 
 pub use iroh_base::rpc::{RpcError, RpcResult};
-use iroh_blobs::store::{ExportFormat, ExportMode};
+use iroh_blobs::store::ExportMode;
 pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress};
 
 use crate::{
     client::{
-        blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption},
+        blobs::{BlobInfo, DownloadMode, ExportFormat, IncompleteBlobInfo, WrapOption},
         docs::ShareMode,
         node::NodeStatus,
         tags::TagInfo,

0 comments on commit 3d1654e
