Skip to content

Commit

Permalink
Merge branch 'main' into batch-blob-api-2
Browse files Browse the repository at this point in the history
# Conflicts:
#	iroh/src/client/blobs.rs
#	iroh/src/node/rpc.rs
#	iroh/src/rpc_protocol.rs
  • Loading branch information
rklaehn committed Jun 6, 2024
1 parent 9cefcad commit f6a8d15
Show file tree
Hide file tree
Showing 19 changed files with 398 additions and 324 deletions.
2 changes: 1 addition & 1 deletion iroh-blobs/src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub async fn export_collection<D: BaoStore>(
progress: impl ProgressSender<Msg = ExportProgress> + IdGenerator,
) -> anyhow::Result<()> {
tokio::fs::create_dir_all(&outpath).await?;
let collection = Collection::load(db, &hash).await?;
let collection = Collection::load_db(db, &hash).await?;
for (name, hash) in collection.into_iter() {
#[allow(clippy::needless_borrow)]
let path = outpath.join(pathbuf_from_name(&name));
Expand Down
24 changes: 22 additions & 2 deletions iroh-blobs/src/format/collection.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! The collection type used by iroh
use std::collections::BTreeMap;
use std::{collections::BTreeMap, future::Future};

use anyhow::Context;
use bao_tree::blake3;
Expand Down Expand Up @@ -64,6 +64,12 @@ impl IntoIterator for Collection {
}
}

/// A simple store trait for loading blobs
pub trait SimpleStore {
/// Load a blob from the store
fn load(&self, hash: Hash) -> impl Future<Output = anyhow::Result<Bytes>> + Send + '_;
}

/// Metadata for a collection
///
/// This is the wire format for the metadata blob.
Expand Down Expand Up @@ -160,11 +166,25 @@ impl Collection {
Ok((collection, res, stats))
}

/// Create a new collection from a hash sequence and metadata.
pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result<Self> {
let hs = store.load(root).await?;
let hs = HashSeq::try_from(hs)?;
let meta_hash = hs.iter().next().context("empty hash seq")?;
let meta = store.load(meta_hash).await?;
let meta: CollectionMeta = postcard::from_bytes(&meta)?;
anyhow::ensure!(
meta.names.len() + 1 == hs.len(),
"names and links length mismatch"
);
Ok(Self::from_parts(hs.into_iter(), meta))
}

/// Load a collection from a store given a root hash
///
/// This assumes that both the links and the metadata of the collection is stored in the store.
/// It does not require that all child blobs are stored in the store.
pub async fn load<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
pub async fn load_db<D>(db: &D, root: &Hash) -> anyhow::Result<Self>
where
D: crate::store::Map,
{
Expand Down
2 changes: 1 addition & 1 deletion iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ impl ListCommands {
}
}
Self::Collections => {
let mut response = iroh.blobs.list_collections().await?;
let mut response = iroh.blobs.list_collections()?;
while let Some(item) = response.next().await {
let CollectionInfo {
tag,
Expand Down
6 changes: 3 additions & 3 deletions iroh-docs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25", optional = true }
futures-util = { version = "0.3.25" }
hex = "0.4"
iroh-base = { version = "0.17.0", path = "../iroh-base" }
iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] }
Expand All @@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = { version = "3.4" }
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tracing = "0.1"
Expand All @@ -57,7 +57,7 @@ test-strategy = "0.3.1"

[features]
default = ["net", "metrics", "engine"]
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"]
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"]
metrics = ["dep:iroh-metrics"]
engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"]

Expand Down
69 changes: 51 additions & 18 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::{

use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use futures_util::FutureExt;
use iroh_base::hash::Hash;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::{sync::oneshot, task::JoinSet};
use tracing::{debug, error, error_span, trace, warn};

use crate::{
Expand Down Expand Up @@ -253,6 +254,7 @@ impl SyncHandle {
states: Default::default(),
action_rx,
content_status_callback,
tasks: Default::default(),
};
let join_handle = std::thread::Builder::new()
.name("sync-actor".to_string())
Expand Down Expand Up @@ -570,22 +572,37 @@ struct Actor {
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}

impl Actor {
fn run(mut self) -> Result<()> {
fn run(self) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()?;
let local_set = tokio::task::LocalSet::new();
local_set.block_on(&rt, async move { self.run_async().await })
}
async fn run_async(mut self) -> Result<()> {
loop {
let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) {
Ok(action) => action,
Err(flume::RecvTimeoutError::Timeout) => {
let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
tokio::pin!(timeout);
let action = tokio::select! {
_ = &mut timeout => {
if let Err(cause) = self.store.flush() {
error!(?cause, "failed to flush store");
}
continue;
}
Err(flume::RecvTimeoutError::Disconnected) => {
debug!("action channel disconnected");
break;
action = self.action_rx.recv_async() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
debug!("action channel disconnected");
break;
}

}
}
};
trace!(%action, "tick");
Expand All @@ -607,6 +624,7 @@ impl Actor {
}
}
}
self.tasks.abort_all();
debug!("shutdown");
Ok(())
}
Expand Down Expand Up @@ -636,13 +654,21 @@ impl Actor {
}
Ok(id)
}),
Action::ListAuthors { reply } => iter_to_channel(
reply,
self.store
Action::ListAuthors { reply } => {
let iter = self
.store
.list_authors()
.map(|a| a.map(|a| a.map(|a| a.id()))),
),
Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()),
.map(|a| a.map(|a| a.map(|a| a.id())));
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
Action::ListReplicas { reply } => {
let iter = self.store.list_namespaces();
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
Action::ContentHashes { reply } => {
send_reply_with(reply, self, |this| this.store.content_hashes())
}
Expand All @@ -657,7 +683,9 @@ impl Actor {
) -> Result<(), SendReplyError> {
match action {
ReplicaAction::Open { reply, opts } => {
tracing::trace!("open in");
let res = self.open(namespace, opts);
tracing::trace!("open out");
send_reply(reply, res)
}
ReplicaAction::Close { reply } => {
Expand Down Expand Up @@ -759,7 +787,9 @@ impl Actor {
.states
.ensure_open(&namespace)
.and_then(|_| self.store.get_many(namespace, query));
iter_to_channel(reply, iter)
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
this.close(namespace);
Expand Down Expand Up @@ -921,15 +951,18 @@ impl OpenReplicas {
}
}

fn iter_to_channel<T: Send + 'static>(
async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel.send(Err(err)).map_err(send_reply_error)?,
Err(err) => channel
.send_async(Err(err))
.await
.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send(item).map_err(send_reply_error)?;
channel.send_async(item).await.map_err(send_reply_error)?;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
match details
.outcome
.heads_received
.encode(Some(iroh_gossip::net::MAX_MESSAGE_SIZE))
.encode(Some(self.gossip.max_message_size()))
{
Err(err) => warn!(?err, "Failed to encode author heads for sync report"),
Ok(heads) => {
Expand Down
Loading

0 comments on commit f6a8d15

Please sign in to comment.