diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 074a3bea4a..d027f93ccf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -197,6 +197,8 @@ jobs: steps: - uses: actions/checkout@v4 - uses: dtolnay/rust-toolchain@stable + with: + components: clippy - name: Install sccache uses: mozilla-actions/sccache-action@v0.0.4 diff --git a/Cargo.lock b/Cargo.lock index 0a06fd14bf..8cd20fe5bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2662,10 +2662,14 @@ dependencies = [ "derive_more", "ed25519-dalek", "flume", + "futures-buffered", + "futures-lite 2.3.0", "futures-util", "hex", "iroh-base", "iroh-blake3", + "iroh-blobs", + "iroh-gossip", "iroh-metrics", "iroh-net", "iroh-test", diff --git a/iroh-blobs/src/export.rs b/iroh-blobs/src/export.rs index 75b282fd6c..cdbda28881 100644 --- a/iroh-blobs/src/export.rs +++ b/iroh-blobs/src/export.rs @@ -46,7 +46,7 @@ pub async fn export_collection( progress: impl ProgressSender + IdGenerator, ) -> anyhow::Result<()> { tokio::fs::create_dir_all(&outpath).await?; - let collection = Collection::load(db, &hash).await?; + let collection = Collection::load_db(db, &hash).await?; for (name, hash) in collection.into_iter() { #[allow(clippy::needless_borrow)] let path = outpath.join(pathbuf_from_name(&name)); diff --git a/iroh-blobs/src/format/collection.rs b/iroh-blobs/src/format/collection.rs index ab13572cc1..cdf4448e98 100644 --- a/iroh-blobs/src/format/collection.rs +++ b/iroh-blobs/src/format/collection.rs @@ -1,5 +1,5 @@ //! The collection type used by iroh -use std::collections::BTreeMap; +use std::{collections::BTreeMap, future::Future}; use anyhow::Context; use bao_tree::blake3; @@ -64,6 +64,12 @@ impl IntoIterator for Collection { } } +/// A simple store trait for loading blobs +pub trait SimpleStore { + /// Load a blob from the store + fn load(&self, hash: Hash) -> impl Future> + Send + '_; +} + /// Metadata for a collection /// /// This is the wire format for the metadata blob. @@ -84,7 +90,7 @@ impl Collection { /// /// To persist the collection, write all the blobs to storage, and use the /// hash of the last blob as the collection hash. - pub fn to_blobs(&self) -> impl Iterator { + pub fn to_blobs(&self) -> impl DoubleEndedIterator { let meta = CollectionMeta { header: *Self::HEADER, names: self.names(), @@ -160,11 +166,25 @@ impl Collection { Ok((collection, res, stats)) } + /// Create a new collection from a hash sequence and metadata. + pub async fn load(root: Hash, store: &impl SimpleStore) -> anyhow::Result { + let hs = store.load(root).await?; + let hs = HashSeq::try_from(hs)?; + let meta_hash = hs.iter().next().context("empty hash seq")?; + let meta = store.load(meta_hash).await?; + let meta: CollectionMeta = postcard::from_bytes(&meta)?; + anyhow::ensure!( + meta.names.len() + 1 == hs.len(), + "names and links length mismatch" + ); + Ok(Self::from_parts(hs.into_iter(), meta)) + } + /// Load a collection from a store given a root hash /// /// This assumes that both the links and the metadata of the collection is stored in the store. /// It does not require that all child blobs are stored in the store. 
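Aside (not part of the patch): the new `SimpleStore` trait above lets `Collection::load` resolve blobs through any read-only lookup, while the previous store-backed loader is renamed to `load_db` just below. A minimal sketch of implementing and using it; the `MemStore` type and the exact import paths are assumptions, not part of this change:

```rust
use std::collections::HashMap;

use bytes::Bytes;
use iroh_blobs::{
    format::collection::{Collection, SimpleStore},
    Hash,
};

/// Hypothetical read-only blob lookup backed by a map.
#[derive(Debug, Default)]
struct MemStore(HashMap<Hash, Bytes>);

impl SimpleStore for MemStore {
    // An `async fn` satisfies the trait's `-> impl Future<...> + Send + '_` method,
    // matching how the iroh client implements the trait later in this diff.
    async fn load(&self, hash: Hash) -> anyhow::Result<Bytes> {
        self.0
            .get(&hash)
            .cloned()
            .ok_or_else(|| anyhow::anyhow!("blob not found"))
    }
}

/// Loads the root hash sequence plus the metadata blob for `root` from the store.
async fn read_collection(store: &MemStore, root: Hash) -> anyhow::Result<Collection> {
    Collection::load(root, store).await
}
```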
- pub async fn load(db: &D, root: &Hash) -> anyhow::Result + pub async fn load_db(db: &D, root: &Hash) -> anyhow::Result where D: crate::store::Map, { diff --git a/iroh-blobs/src/store.rs b/iroh-blobs/src/store.rs index 0e8f35d301..3030d55b3f 100644 --- a/iroh-blobs/src/store.rs +++ b/iroh-blobs/src/store.rs @@ -11,6 +11,7 @@ pub mod readonly_mem; pub mod fs; mod traits; +use tracing::warn; pub use traits::*; /// Create a 16 byte unique ID. @@ -66,7 +67,10 @@ impl TempCounterMap { fn dec(&mut self, value: &HashAndFormat) { let HashAndFormat { hash, format } = value; - let counters = self.0.get_mut(hash).unwrap(); + let Some(counters) = self.0.get_mut(hash) else { + warn!("Decrementing non-existent temp tag"); + return; + }; counters.dec(*format); if counters.is_empty() { self.0.remove(hash); diff --git a/iroh-blobs/src/store/fs.rs b/iroh-blobs/src/store/fs.rs index e0a4d192f0..5febe54457 100644 --- a/iroh-blobs/src/store/fs.rs +++ b/iroh-blobs/src/store/fs.rs @@ -111,7 +111,7 @@ use crate::{ BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSendError, ProgressSender, }, - raw_outboard_size, LivenessTracker, MemOrFile, + raw_outboard_size, MemOrFile, TagCounter, TagDrop, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -779,16 +779,18 @@ struct StoreInner { path_options: Arc, } -impl LivenessTracker for RwLock { - fn on_clone(&self, content: &HashAndFormat) { - self.write().unwrap().inc(content); - } - +impl TagDrop for RwLock { fn on_drop(&self, content: &HashAndFormat) { self.write().unwrap().dec(content); } } +impl TagCounter for RwLock { + fn on_create(&self, content: &HashAndFormat) { + self.write().unwrap().inc(content); + } +} + impl StoreInner { fn new_sync(path: PathBuf, options: Options, rt: tokio::runtime::Handle) -> io::Result { tracing::trace!( @@ -981,7 +983,7 @@ impl StoreInner { )) })?; std::fs::create_dir_all(parent)?; - let temp_tag = self.temp_tag(HashAndFormat::raw(hash)); + let temp_tag = self.temp.temp_tag(HashAndFormat::raw(hash)); let (tx, rx) = oneshot::channel(); self.tx .send_async(ActorMessage::Export { @@ -1048,10 +1050,6 @@ impl StoreInner { Ok(rx.await?) 
} - fn temp_tag(&self, content: HashAndFormat) -> TempTag { - TempTag::new(content, Some(self.temp.clone())) - } - fn import_file_sync( &self, path: PathBuf, @@ -1141,7 +1139,7 @@ impl StoreInner { }; progress.blocking_send(ImportProgress::OutboardDone { id, hash })?; // from here on, everything related to the hash is protected by the temp tag - let tag = self.temp_tag(HashAndFormat { hash, format }); + let tag = self.temp.temp_tag(HashAndFormat { hash, format }); let hash = *tag.hash(); // blocking send for the import let (tx, rx) = flume::bounded(1); @@ -1423,7 +1421,7 @@ impl super::Store for Store { } fn temp_tag(&self, value: HashAndFormat) -> TempTag { - self.0.temp_tag(value) + self.0.temp.temp_tag(value) } async fn shutdown(&self) { @@ -1717,7 +1715,7 @@ impl ActorState { let inline_outboard = outboard_size <= self.options.inline.max_outboard_inlined && outboard_size != 0; // from here on, everything related to the hash is protected by the temp tag - let tag = TempTag::new(content_id, Some(self.temp.clone())); + let tag = self.temp.temp_tag(content_id); let hash = *tag.hash(); self.protected.insert(hash); // move the data file into place, or create a reference to it diff --git a/iroh-blobs/src/store/mem.rs b/iroh-blobs/src/store/mem.rs index 7b14b2a14b..e10849e2b7 100644 --- a/iroh-blobs/src/store/mem.rs +++ b/iroh-blobs/src/store/mem.rs @@ -23,7 +23,7 @@ use crate::{ }, util::{ progress::{BoxedProgressSender, IdGenerator, IgnoreProgressSender, ProgressSender}, - LivenessTracker, + TagCounter, TagDrop, }, Tag, TempTag, IROH_BLOCK_SIZE, }; @@ -43,13 +43,7 @@ pub struct Store { #[derive(Debug, Default)] struct StoreInner(RwLock); -impl LivenessTracker for StoreInner { - fn on_clone(&self, inner: &HashAndFormat) { - tracing::trace!("temp tagging: {:?}", inner); - let mut state = self.0.write().unwrap(); - state.temp.inc(inner); - } - +impl TagDrop for StoreInner { fn on_drop(&self, inner: &HashAndFormat) { tracing::trace!("temp tag drop: {:?}", inner); let mut state = self.0.write().unwrap(); @@ -57,6 +51,14 @@ impl LivenessTracker for StoreInner { } } +impl TagCounter for StoreInner { + fn on_create(&self, inner: &HashAndFormat) { + tracing::trace!("temp tagging: {:?}", inner); + let mut state = self.0.write().unwrap(); + state.temp.inc(inner); + } +} + impl Store { /// Create a new in memory store pub fn new() -> Self { @@ -217,7 +219,7 @@ impl super::Store for Store { } fn temp_tag(&self, tag: HashAndFormat) -> TempTag { - TempTag::new(tag, Some(self.inner.clone())) + self.inner.temp_tag(tag) } async fn gc_start(&self) -> io::Result<()> { diff --git a/iroh-blobs/src/util.rs b/iroh-blobs/src/util.rs index b540b88562..751886492c 100644 --- a/iroh-blobs/src/util.rs +++ b/iroh-blobs/src/util.rs @@ -4,7 +4,12 @@ use bytes::Bytes; use derive_more::{Debug, Display, From, Into}; use range_collections::range_set::RangeSetRange; use serde::{Deserialize, Serialize}; -use std::{borrow::Borrow, fmt, sync::Arc, time::SystemTime}; +use std::{ + borrow::Borrow, + fmt, + sync::{Arc, Weak}, + time::SystemTime, +}; use crate::{store::Store, BlobFormat, Hash, HashAndFormat, IROH_BLOCK_SIZE}; @@ -179,6 +184,13 @@ pub enum SetTagOption { Named(Tag), } +/// Trait used from temp tags to notify an abstract store that a temp tag is +/// being dropped. +pub trait TagDrop: std::fmt::Debug + Send + Sync + 'static { + /// Called on drop + fn on_drop(&self, inner: &HashAndFormat); +} + /// A trait for things that can track liveness of blobs and collections. 
/// /// This trait works together with [TempTag] to keep track of the liveness of a @@ -187,11 +199,21 @@ pub enum SetTagOption { /// It is important to include the format in the liveness tracking, since /// protecting a collection means protecting the blob and all its children, /// whereas protecting a raw blob only protects the blob itself. -pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static { - /// Called on clone - fn on_clone(&self, inner: &HashAndFormat); - /// Called on drop - fn on_drop(&self, inner: &HashAndFormat); +pub trait TagCounter: TagDrop + Sized { + /// Called on creation of a temp tag + fn on_create(&self, inner: &HashAndFormat); + + /// Get this as a weak reference for use in temp tags + fn as_weak(self: &Arc) -> Weak { + let on_drop: Arc = self.clone(); + Arc::downgrade(&on_drop) + } + + /// Create a new temp tag for the given hash and format + fn temp_tag(self: &Arc, inner: HashAndFormat) -> TempTag { + self.on_create(&inner); + TempTag::new(inner, Some(self.as_weak())) + } } /// A hash and format pair that is protected from garbage collection. @@ -202,8 +224,8 @@ pub trait LivenessTracker: std::fmt::Debug + Send + Sync + 'static { pub struct TempTag { /// The hash and format we are pinning inner: HashAndFormat, - /// liveness tracker - liveness: Option>, + /// optional callback to call on drop + on_drop: Option>, } impl TempTag { @@ -214,11 +236,8 @@ impl TempTag { /// The caller is responsible for increasing the refcount on creation and to /// make sure that temp tags that are created between a mark phase and a sweep /// phase are protected. - pub fn new(inner: HashAndFormat, liveness: Option>) -> Self { - if let Some(liveness) = liveness.as_ref() { - liveness.on_clone(&inner); - } - Self { inner, liveness } + pub fn new(inner: HashAndFormat, on_drop: Option>) -> Self { + Self { inner, on_drop } } /// The hash of the pinned item @@ -241,20 +260,16 @@ impl TempTag { // set the liveness tracker to None, so that the refcount is not decreased // during drop. This means that the refcount will never reach 0 and the // item will not be gced until the end of the process. 
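Aside (not part of the patch): `LivenessTracker` is being split into `TagCounter` (bump a refcount when a tag is created) and `TagDrop` (decrement when it is dropped), and `TempTag` now holds only a `Weak<dyn TagDrop>`, so a tag that outlives its store silently becomes a no-op. A minimal sketch of the protocol, using a hypothetical `Counter` type and assumed import paths:

```rust
use std::sync::{Arc, Mutex};

use iroh_blobs::{
    util::{TagCounter, TagDrop},
    HashAndFormat,
};

/// Hypothetical liveness counter; the real stores keep a per-hash/format map instead.
#[derive(Debug, Default)]
struct Counter(Mutex<i64>);

impl TagDrop for Counter {
    fn on_drop(&self, _content: &HashAndFormat) {
        *self.0.lock().unwrap() -= 1;
    }
}

impl TagCounter for Counter {
    fn on_create(&self, _content: &HashAndFormat) {
        *self.0.lock().unwrap() += 1;
    }
}

fn demo(content: HashAndFormat) {
    let counter = Arc::new(Counter::default());
    // The provided `TagCounter::temp_tag` calls `on_create` and hands the tag a
    // weak reference to the counter as a `dyn TagDrop`.
    let tag = counter.temp_tag(content);
    // Dropping the tag upgrades the weak reference (if the counter still exists)
    // and calls `on_drop`; if the store is already gone, nothing happens.
    drop(tag);
}
```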
- self.liveness = None; - } -} - -impl Clone for TempTag { - fn clone(&self) -> Self { - Self::new(self.inner, self.liveness.clone()) + self.on_drop = None; } } impl Drop for TempTag { fn drop(&mut self) { - if let Some(liveness) = self.liveness.as_ref() { - liveness.on_drop(&self.inner); + if let Some(on_drop) = self.on_drop.take() { + if let Some(on_drop) = on_drop.upgrade() { + on_drop.on_drop(&self.inner); + } } } } diff --git a/iroh-cli/src/commands/blob.rs b/iroh-cli/src/commands/blob.rs index 82ea5bd4e9..cb1a9fb2e6 100644 --- a/iroh-cli/src/commands/blob.rs +++ b/iroh-cli/src/commands/blob.rs @@ -467,7 +467,7 @@ impl ListCommands { } } Self::Collections => { - let mut response = iroh.blobs.list_collections().await?; + let mut response = iroh.blobs.list_collections()?; while let Some(item) = response.next().await { let CollectionInfo { tag, diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index d1e39d56df..f08c97fee0 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -17,35 +17,35 @@ workspace = true [dependencies] anyhow = "1" blake3 = { package = "iroh-blake3", version = "1.4.5"} +bytes = { version = "1.4", features = ["serde"] } derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] } ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } flume = "0.11" +futures-buffered = "0.2.4" +futures-lite = "2.3.0" +futures-util = { version = "0.3.25" } +hex = "0.4" iroh-base = { version = "0.17.0", path = "../iroh-base" } +iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] } +iroh-gossip = { version = "0.17.0", path = "../iroh-gossip", optional = true } iroh-metrics = { version = "0.17.0", path = "../iroh-metrics", optional = true } +iroh-net = { version = "0.17.0", optional = true, path = "../iroh-net" } +lru = "0.12" num_enum = "0.7" postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } rand = "0.8.5" rand_core = "0.6.4" -serde = { version = "1.0.164", features = ["derive"] } -strum = { version = "0.25", features = ["derive"] } -bytes = { version = "1.4", features = ["serde"] } -hex = "0.4" -thiserror = "1" -tracing = "0.1" -tokio = { version = "1", features = ["sync"] } - -# fs-store redb = { version = "2.0.0" } redb_v1 = { package = "redb", version = "1.5.1" } +self_cell = "1.0.3" +serde = { version = "1.0.164", features = ["derive"] } +strum = { version = "0.25", features = ["derive"] } tempfile = { version = "3.4" } - -# net -iroh-net = { version = "0.17.0", optional = true, path = "../iroh-net" } -tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } +thiserror = "1" +tokio = { version = "1", features = ["sync", "rt", "time", "macros"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} -futures-util = { version = "0.3.25", optional = true } -lru = "0.12" -self_cell = "1.0.3" +tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } +tracing = "0.1" [dev-dependencies] iroh-test = { path = "../iroh-test" } @@ -56,9 +56,10 @@ tempfile = "3.4" test-strategy = "0.3.1" [features] -default = ["net", "metrics"] -net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"] +default = ["net", "metrics", "engine"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"] metrics = ["dep:iroh-metrics"] +engine = ["net", "dep:iroh-gossip", 
"dep:iroh-blobs"] [package.metadata.docs.rs] all-features = true diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index bbe91181cb..a48e8f55b3 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -10,9 +10,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use futures_util::FutureExt; use iroh_base::hash::Hash; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinSet}; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -253,6 +254,7 @@ impl SyncHandle { states: Default::default(), action_rx, content_status_callback, + tasks: Default::default(), }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) @@ -570,22 +572,37 @@ struct Actor { states: OpenReplicas, action_rx: flume::Receiver, content_status_callback: Option, + tasks: JoinSet<()>, } impl Actor { - fn run(mut self) -> Result<()> { + fn run(self) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build()?; + let local_set = tokio::task::LocalSet::new(); + local_set.block_on(&rt, async move { self.run_async().await }) + } + async fn run_async(mut self) -> Result<()> { loop { - let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) { - Ok(action) => action, - Err(flume::RecvTimeoutError::Timeout) => { + let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + tokio::pin!(timeout); + let action = tokio::select! { + _ = &mut timeout => { if let Err(cause) = self.store.flush() { error!(?cause, "failed to flush store"); } continue; } - Err(flume::RecvTimeoutError::Disconnected) => { - debug!("action channel disconnected"); - break; + action = self.action_rx.recv_async() => { + match action { + Ok(action) => action, + Err(flume::RecvError::Disconnected) => { + debug!("action channel disconnected"); + break; + } + + } } }; trace!(%action, "tick"); @@ -607,6 +624,7 @@ impl Actor { } } } + self.tasks.abort_all(); debug!("shutdown"); Ok(()) } @@ -636,13 +654,21 @@ impl Actor { } Ok(id) }), - Action::ListAuthors { reply } => iter_to_channel( - reply, - self.store + Action::ListAuthors { reply } => { + let iter = self + .store .list_authors() - .map(|a| a.map(|a| a.map(|a| a.id()))), - ), - Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()), + .map(|a| a.map(|a| a.map(|a| a.id()))); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } + Action::ListReplicas { reply } => { + let iter = self.store.list_namespaces(); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ContentHashes { reply } => { send_reply_with(reply, self, |this| this.store.content_hashes()) } @@ -657,7 +683,9 @@ impl Actor { ) -> Result<(), SendReplyError> { match action { ReplicaAction::Open { reply, opts } => { + tracing::trace!("open in"); let res = self.open(namespace, opts); + tracing::trace!("open out"); send_reply(reply, res) } ReplicaAction::Close { reply } => { @@ -759,7 +787,9 @@ impl Actor { .states .ensure_open(&namespace) .and_then(|_| self.store.get_many(namespace, query)); - iter_to_channel(reply, iter) + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { this.close(namespace); @@ -921,15 +951,18 @@ impl OpenReplicas { } } -fn iter_to_channel( +async fn iter_to_channel_async( channel: flume::Sender>, iter: Result>>, ) -> Result<(), SendReplyError> 
{ match iter { - Err(err) => channel.send(Err(err)).map_err(send_reply_error)?, + Err(err) => channel + .send_async(Err(err)) + .await + .map_err(send_reply_error)?, Ok(iter) => { for item in iter { - channel.send(item).map_err(send_reply_error)?; + channel.send_async(item).await.map_err(send_reply_error)?; } } } diff --git a/iroh/src/docs_engine.rs b/iroh-docs/src/engine.rs similarity index 84% rename from iroh/src/docs_engine.rs rename to iroh-docs/src/engine.rs index b64870fda3..b5345b0bea 100644 --- a/iroh/src/docs_engine.rs +++ b/iroh-docs/src/engine.rs @@ -1,6 +1,6 @@ -//! Handlers and actors to for live syncing [`iroh_docs`] replicas. +//! Handlers and actors to for live syncing replicas. //! -//! [`iroh_docs::Replica`] is also called documents here. +//! [`crate::Replica`] is also called documents here. use std::path::PathBuf; use std::{ @@ -13,8 +13,6 @@ use anyhow::{bail, Context, Result}; use futures_lite::{Stream, StreamExt}; use iroh_blobs::downloader::Downloader; use iroh_blobs::{store::EntryStatus, Hash}; -use iroh_docs::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId}; -use iroh_docs::{Author, AuthorId}; use iroh_gossip::net::Gossip; use iroh_net::util::SharedAbortingJoinHandle; use iroh_net::{key::PublicKey, Endpoint, NodeAddr}; @@ -22,17 +20,19 @@ use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, oneshot}; use tracing::{error, error_span, Instrument}; -mod gossip; -mod live; -pub mod rpc; -mod state; +use crate::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId}; +use crate::{Author, AuthorId}; -use gossip::GossipActor; -use live::{LiveActor, ToLiveActor}; +use self::gossip::GossipActor; +use self::live::{LiveActor, ToLiveActor}; pub use self::live::SyncEvent; pub use self::state::{Origin, SyncReason}; +mod gossip; +mod live; +mod state; + /// Capacity of the channel for the [`ToLiveActor`] messages. const ACTOR_CHANNEL_CAP: usize = 64; /// Capacity for the channels for [`Engine::subscribe`]. @@ -40,30 +40,30 @@ const SUBSCRIBE_CHANNEL_CAP: usize = 256; /// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with /// peers and a gossip swarm for each syncing document. -/// -/// The RPC methods dealing with documents and sync operate on the `Engine`, with method -/// implementations in [rpc]. #[derive(derive_more::Debug, Clone)] pub struct Engine { - pub(crate) endpoint: Endpoint, - pub(crate) sync: SyncHandle, + /// [`Endpoint`] used by the engine. + pub endpoint: Endpoint, + /// Handle to the actor thread. + pub sync: SyncHandle, + /// The persistent default author for this engine. + pub default_author: Arc, to_live_actor: mpsc::Sender, #[allow(dead_code)] actor_handle: SharedAbortingJoinHandle<()>, #[debug("ContentStatusCallback")] content_status_cb: ContentStatusCallback, - default_author: Arc, } impl Engine { /// Start the sync engine. /// /// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a - /// thread for the [`iroh_docs::actor::SyncHandle`]. - pub(crate) async fn spawn( + /// thread for the [`crate::actor::SyncHandle`]. 
+ pub async fn spawn( endpoint: Endpoint, gossip: Gossip, - replica_store: iroh_docs::store::Store, + replica_store: crate::store::Store, bao_store: B, downloader: Downloader, default_author_storage: DefaultAuthorStorage, @@ -127,7 +127,7 @@ impl Engine { /// /// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer, /// and join an iroh-gossip swarm with these peers to receive and broadcast document updates. - async fn start_sync(&self, namespace: NamespaceId, peers: Vec) -> Result<()> { + pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec) -> Result<()> { let (reply, reply_rx) = oneshot::channel(); self.to_live_actor .send(ToLiveActor::StartSync { @@ -144,7 +144,7 @@ impl Engine { /// /// If `kill_subscribers` is true, all existing event subscribers will be dropped. This means /// they will receive `None` and no further events in case of rejoining the document. - async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> { + pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> { let (reply, reply_rx) = oneshot::channel(); self.to_live_actor .send(ToLiveActor::Leave { @@ -158,7 +158,7 @@ impl Engine { } /// Subscribe to replica and sync progress events. - async fn subscribe( + pub async fn subscribe( &self, namespace: NamespaceId, ) -> Result> + Unpin + 'static> { @@ -195,7 +195,7 @@ impl Engine { } /// Handle an incoming iroh-docs connection. - pub(super) async fn handle_connection( + pub async fn handle_connection( &self, conn: iroh_net::endpoint::Connecting, ) -> anyhow::Result<()> { @@ -205,13 +205,15 @@ impl Engine { Ok(()) } - pub(crate) async fn start_shutdown(&self) -> Result<()> { + /// Shutdown the engine. + pub async fn shutdown(&self) -> Result<()> { self.to_live_actor.send(ToLiveActor::Shutdown).await?; Ok(()) } } -pub(crate) fn entry_to_content_status(entry: io::Result) -> ContentStatus { +/// Converts an [`EntryStatus`] into a ['ContentStatus']. +pub fn entry_to_content_status(entry: io::Result) -> ContentStatus { match entry { Ok(EntryStatus::Complete) => ContentStatus::Complete, Ok(EntryStatus::Partial) => ContentStatus::Incomplete, @@ -277,14 +279,14 @@ impl From for LiveEvent { impl LiveEvent { fn from_replica_event( - ev: iroh_docs::Event, + ev: crate::Event, content_status_cb: &ContentStatusCallback, ) -> Result { Ok(match ev { - iroh_docs::Event::LocalInsert { entry, .. } => Self::InsertLocal { + crate::Event::LocalInsert { entry, .. } => Self::InsertLocal { entry: entry.into(), }, - iroh_docs::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote { + crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote { content_status: content_status_cb(entry.content_hash()), entry: entry.into(), from: PublicKey::from_bytes(&from)?, @@ -302,11 +304,19 @@ impl LiveEvent { /// path (as base32 encoded string of the author's public key). #[derive(Debug)] pub enum DefaultAuthorStorage { + /// Memory storage. Mem, + /// File based persistent storage. Persistent(PathBuf), } impl DefaultAuthorStorage { + /// Load the default author from the storage. + /// + /// Will create and save a new author if the storage is empty. + /// + /// Returns an error if the author can't be parsed or if the uathor does not exist in the docs + /// store. pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result { match self { Self::Mem => { @@ -343,6 +353,8 @@ impl DefaultAuthorStorage { } } } + + /// Save a new default author. 
pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> { match self { Self::Mem => { @@ -363,24 +375,32 @@ impl DefaultAuthorStorage { } } +/// Peristent default author for a docs engine. #[derive(Debug)] -struct DefaultAuthor { +pub struct DefaultAuthor { value: RwLock, storage: DefaultAuthorStorage, } impl DefaultAuthor { - async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result { + /// Load the default author from storage. + /// + /// If the storage is empty creates a new author and perists it. + pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result { let value = storage.load(docs_store).await?; Ok(Self { value: RwLock::new(value), storage, }) } - fn get(&self) -> AuthorId { + + /// Get the current default author. + pub fn get(&self) -> AuthorId { *self.value.read().unwrap() } - async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> { + + /// Set the default author. + pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> { if docs_store.export_author(author_id).await?.is_none() { bail!("The author does not exist"); } diff --git a/iroh/src/docs_engine/gossip.rs b/iroh-docs/src/engine/gossip.rs similarity index 99% rename from iroh/src/docs_engine/gossip.rs rename to iroh-docs/src/engine/gossip.rs index 373bd20ec6..17077ac802 100644 --- a/iroh/src/docs_engine/gossip.rs +++ b/iroh-docs/src/engine/gossip.rs @@ -3,7 +3,6 @@ use std::collections::HashSet; use anyhow::{Context, Result}; use futures_lite::StreamExt; use futures_util::FutureExt; -use iroh_docs::{actor::SyncHandle, ContentStatus, NamespaceId}; use iroh_gossip::net::{Event, Gossip}; use iroh_net::key::PublicKey; use tokio::{ @@ -16,6 +15,8 @@ use tokio_stream::{ }; use tracing::{debug, error, trace, warn}; +use crate::{actor::SyncHandle, ContentStatus, NamespaceId}; + use super::live::{Op, ToLiveActor}; #[derive(strum::Display, Debug)] diff --git a/iroh/src/docs_engine/live.rs b/iroh-docs/src/engine/live.rs similarity index 98% rename from iroh/src/docs_engine/live.rs rename to iroh-docs/src/engine/live.rs index 8dd3d5843a..88f4b39e22 100644 --- a/iroh/src/docs_engine/live.rs +++ b/iroh-docs/src/engine/live.rs @@ -9,14 +9,6 @@ use iroh_blobs::downloader::{DownloadError, DownloadRequest, Downloader}; use iroh_blobs::get::Stats; use iroh_blobs::HashAndFormat; use iroh_blobs::{store::EntryStatus, Hash}; -use iroh_docs::{ - actor::{OpenOpts, SyncHandle}, - net::{ - connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError, - SyncFinished, - }, - AuthorHeads, ContentStatus, NamespaceId, SignedEntry, -}; use iroh_gossip::{net::Gossip, proto::TopicId}; use iroh_net::NodeId; use iroh_net::{key::PublicKey, Endpoint, NodeAddr}; @@ -27,6 +19,15 @@ use tokio::{ }; use tracing::{debug, error, error_span, info, instrument, trace, warn, Instrument, Span}; +use crate::{ + actor::{OpenOpts, SyncHandle}, + net::{ + connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError, + SyncFinished, + }, + AuthorHeads, ContentStatus, NamespaceId, SignedEntry, +}; + use super::gossip::{GossipActor, ToGossipActor}; use super::state::{NamespaceStates, Origin, SyncReason}; @@ -145,8 +146,8 @@ pub struct LiveActor { gossip: Gossip, bao_store: B, downloader: Downloader, - replica_events_tx: flume::Sender, - replica_events_rx: flume::Receiver, + replica_events_tx: flume::Sender, + replica_events_rx: flume::Receiver, /// Send messages to self. 
/// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks. @@ -542,7 +543,7 @@ impl LiveActor { match details .outcome .heads_received - .encode(Some(iroh_gossip::net::MAX_MESSAGE_SIZE)) + .encode(Some(self.gossip.max_message_size())) { Err(err) => warn!(?err, "Failed to encode author heads for sync report"), Ok(heads) => { @@ -684,9 +685,9 @@ impl LiveActor { } } - async fn on_replica_event(&mut self, event: iroh_docs::Event) -> Result<()> { + async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> { match event { - iroh_docs::Event::LocalInsert { namespace, entry } => { + crate::Event::LocalInsert { namespace, entry } => { debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert"); let topic = TopicId::from_bytes(*namespace.as_bytes()); // A new entry was inserted locally. Broadcast a gossip message. @@ -696,7 +697,7 @@ impl LiveActor { self.gossip.broadcast(topic, message).await?; } } - iroh_docs::Event::RemoteInsert { + crate::Event::RemoteInsert { namespace, entry, from, diff --git a/iroh/src/docs_engine/state.rs b/iroh-docs/src/engine/state.rs similarity index 99% rename from iroh/src/docs_engine/state.rs rename to iroh-docs/src/engine/state.rs index 91e28a721e..c9d4a1d0e0 100644 --- a/iroh/src/docs_engine/state.rs +++ b/iroh-docs/src/engine/state.rs @@ -1,8 +1,8 @@ -use anyhow::Result; -use iroh_docs::{ +use crate::{ net::{AbortReason, AcceptOutcome, SyncFinished}, NamespaceId, }; +use anyhow::Result; use iroh_net::NodeId; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; diff --git a/iroh-docs/src/lib.rs b/iroh-docs/src/lib.rs index 7bee08d712..b1347fe7be 100644 --- a/iroh-docs/src/lib.rs +++ b/iroh-docs/src/lib.rs @@ -40,6 +40,9 @@ pub mod net; #[cfg(feature = "net")] mod ticket; +#[cfg(feature = "engine")] +pub mod engine; + pub mod actor; pub mod store; pub mod sync; diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index ab1171b756..981143ca86 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -154,6 +154,22 @@ impl Store { } } + /// Get an owned read-only snapshot of the database. + /// + /// This will open a new read transaction. The read transaction won't be reused for other + /// reads. + /// + /// This has the side effect of committing any open write transaction, + /// so it can be used as a way to ensure that the data is persisted. + pub fn snapshot_owned(&mut self) -> Result { + // make sure the current transaction is committed + self.flush()?; + assert!(matches!(self.transaction, CurrentTransaction::None)); + let tx = self.db.begin_read()?; + let tables = ReadOnlyTables::new(tx)?; + Ok(tables) + } + /// Get access to the tables to read from them. /// /// The underlying transaction is a write transaction, but with a non-mut @@ -223,8 +239,6 @@ impl Store { } } -type AuthorsIter = std::vec::IntoIter>; -type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -297,18 +311,16 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let namespaces: Vec<_> = tables - .namespaces - .iter()? 
- .map(|res| { - let capability = parse_capability(res?.1.value())?; - Ok((capability.id(), capability.kind())) - }) - .collect(); - Ok(namespaces.into_iter()) + pub fn list_namespaces( + &mut self, + ) -> Result>> { + let snapshot = self.snapshot()?; + let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?; + let iter = iter.map(|res| { + let capability = parse_capability(res?.1.value())?; + Ok((capability.id(), capability.kind())) + }); + Ok(iter) } /// Get an author key from the store. @@ -340,19 +352,16 @@ impl Store { } /// List all author keys in this store. - pub fn list_authors(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let authors: Vec<_> = tables + pub fn list_authors(&mut self) -> Result>> { + let tables = self.snapshot()?; + let iter = tables .authors - .iter()? + .range::<&'static [u8; 32]>(..)? .map(|res| match res { Ok((_key, value)) => Ok(Author::from_bytes(value.value())), Err(err) => Err(err.into()), - }) - .collect(); - - Ok(authors.into_iter()) + }); + Ok(iter) } /// Import a new replica namespace. @@ -413,7 +422,8 @@ impl Store { namespace: NamespaceId, query: impl Into, ) -> Result { - QueryIterator::new(self.tables()?, namespace, query.into()) + let tables = self.snapshot_owned()?; + QueryIterator::new(tables, namespace, query.into()) } /// Get an entry by key and author. @@ -435,13 +445,8 @@ impl Store { /// Get all content hashes of all replicas in the store. pub fn content_hashes(&mut self) -> Result { - // make sure the current transaction is committed - self.flush()?; - assert!(matches!(self.transaction, CurrentTransaction::None)); - let tx = self.db.begin_read()?; - let tables = ReadOnlyTables::new(tx)?; - let records = tables.records; - ContentHashesIterator::all(records) + let tables = self.snapshot_owned()?; + ContentHashesIterator::all(&tables.records) } /// Get the latest entry for each author in a namespace. @@ -870,14 +875,6 @@ impl Iterator for ParentIterator { } } -self_cell::self_cell!( - struct ContentHashesIteratorInner { - owner: RecordsTable, - #[covariant] - dependent: RecordsRange, - } -); - /// Iterator for all content hashes /// /// Note that you might get duplicate hashes. Also, the iterator will keep @@ -886,13 +883,16 @@ self_cell::self_cell!( /// Also, this represents a snapshot of the database at the time of creation. /// It nees a copy of a redb::ReadOnlyTable to be self-contained. #[derive(derive_more::Debug)] -pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner); +pub struct ContentHashesIterator { + #[debug(skip)] + range: RecordsRange<'static>, +} impl ContentHashesIterator { /// Create a new iterator over all content hashes. 
- pub fn all(owner: RecordsTable) -> anyhow::Result { - let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?; - Ok(Self(inner)) + pub fn all(table: &RecordsTable) -> anyhow::Result { + let range = RecordsRange::all_static(table)?; + Ok(Self { range }) } } @@ -900,7 +900,7 @@ impl Iterator for ContentHashesIterator { type Item = Result; fn next(&mut self) -> Option { - let v = self.0.with_dependent_mut(|_, d| d.next())?; + let v = self.range.next()?; Some(v.map(|e| e.content_hash())) } } diff --git a/iroh-docs/src/store/fs/query.rs b/iroh-docs/src/store/fs/query.rs index a73dbcd8e7..f05b4ecfb3 100644 --- a/iroh-docs/src/store/fs/query.rs +++ b/iroh-docs/src/store/fs/query.rs @@ -3,6 +3,7 @@ use iroh_base::hash::Hash; use crate::{ store::{ + fs::tables::ReadOnlyTables, util::{IndexKind, LatestPerKeySelector, SelectorRes}, AuthorFilter, KeyFilter, Query, }, @@ -12,34 +13,33 @@ use crate::{ use super::{ bounds::{ByKeyBounds, RecordsBounds}, ranges::{RecordsByKeyRange, RecordsRange}, - tables::Tables, RecordsValue, }; /// A query iterator for entry queries. #[derive(Debug)] -pub struct QueryIterator<'a> { - range: QueryRange<'a>, +pub struct QueryIterator { + range: QueryRange, query: Query, offset: u64, count: u64, } #[derive(Debug)] -enum QueryRange<'a> { +enum QueryRange { AuthorKey { - range: RecordsRange<'a>, + range: RecordsRange<'static>, key_filter: KeyFilter, }, KeyAuthor { - range: RecordsByKeyRange<'a>, + range: RecordsByKeyRange, author_filter: AuthorFilter, selector: Option, }, } -impl<'a> QueryIterator<'a> { - pub fn new(tables: &'a Tables<'a>, namespace: NamespaceId, query: Query) -> Result { +impl QueryIterator { + pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result { let index_kind = IndexKind::from(&query); let range = match index_kind { IndexKind::AuthorKey { range, key_filter } => { @@ -53,7 +53,7 @@ impl<'a> QueryIterator<'a> { // no author set => full table scan with the provided key filter AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter), }; - let range = RecordsRange::with_bounds(&tables.records, bounds)?; + let range = RecordsRange::with_bounds_static(&tables.records, bounds)?; QueryRange::AuthorKey { range, key_filter: filter, @@ -65,11 +65,8 @@ impl<'a> QueryIterator<'a> { latest_per_key, } => { let bounds = ByKeyBounds::new(namespace, &range); - let range = RecordsByKeyRange::with_bounds( - &tables.records_by_key, - &tables.records, - bounds, - )?; + let range = + RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?; let selector = latest_per_key.then(LatestPerKeySelector::default); QueryRange::KeyAuthor { author_filter, @@ -88,7 +85,7 @@ impl<'a> QueryIterator<'a> { } } -impl<'a> Iterator for QueryIterator<'a> { +impl Iterator for QueryIterator { type Item = Result; fn next(&mut self) -> Option> { diff --git a/iroh-docs/src/store/fs/ranges.rs b/iroh-docs/src/store/fs/ranges.rs index 9219c620ac..f28d95ae63 100644 --- a/iroh-docs/src/store/fs/ranges.rs +++ b/iroh-docs/src/store/fs/ranges.rs @@ -1,6 +1,6 @@ //! 
Ranges and helpers for working with [`redb`] tables -use redb::{Key, Range, ReadableTable, Table, Value}; +use redb::{Key, Range, ReadOnlyTable, ReadableTable, Value}; use crate::{store::SortDirection, SignedEntry}; @@ -74,14 +74,9 @@ impl<'a, K: Key + 'static, V: Value + 'static> RangeExt for Range<'a, K, V #[debug("RecordsRange")] pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>); -impl<'a> RecordsRange<'a> { - pub(super) fn all( - records: &'a impl ReadableTable, RecordsValue<'static>>, - ) -> anyhow::Result { - let range = records.range::>(..)?; - Ok(Self(range)) - } +// pub type RecordsRange<'a> = Range<'a, RecordsId<'static>, RecordsValue<'static>>; +impl<'a> RecordsRange<'a> { pub(super) fn with_bounds( records: &'a impl ReadableTable, RecordsValue<'static>>, bounds: RecordsBounds, @@ -90,6 +85,7 @@ impl<'a> RecordsRange<'a> { Ok(Self(range)) } + // /// Get the next item in the range. /// /// Omit items for which the `matcher` function returns false. @@ -103,6 +99,22 @@ impl<'a> RecordsRange<'a> { } } +impl RecordsRange<'static> { + pub(super) fn all_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + ) -> anyhow::Result { + let range = records.range::>(..)?; + Ok(Self(range)) + } + pub(super) fn with_bounds_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + bounds: RecordsBounds, + ) -> anyhow::Result { + let range = records.range(bounds.as_ref())?; + Ok(Self(range)) + } +} + impl<'a> Iterator for RecordsRange<'a> { type Item = anyhow::Result; fn next(&mut self) -> Option { @@ -112,15 +124,15 @@ impl<'a> Iterator for RecordsRange<'a> { #[derive(derive_more::Debug)] #[debug("RecordsByKeyRange")] -pub struct RecordsByKeyRange<'a> { - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, - by_key_range: Range<'a, RecordsByKeyId<'static>, ()>, +pub struct RecordsByKeyRange { + records_table: ReadOnlyTable, RecordsValue<'static>>, + by_key_range: Range<'static, RecordsByKeyId<'static>, ()>, } -impl<'a> RecordsByKeyRange<'a> { +impl RecordsByKeyRange { pub fn with_bounds( - records_by_key_table: &'a impl ReadableTable, ()>, - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, + records_by_key_table: ReadOnlyTable, ()>, + records_table: ReadOnlyTable, RecordsValue<'static>>, bounds: ByKeyBounds, ) -> anyhow::Result { let by_key_range = records_by_key_table.range(bounds.as_ref())?; diff --git a/iroh-gossip/src/net.rs b/iroh-gossip/src/net.rs index 4083e3a113..756ccfee68 100644 --- a/iroh-gossip/src/net.rs +++ b/iroh-gossip/src/net.rs @@ -26,10 +26,6 @@ pub mod util; /// ALPN protocol name pub const GOSSIP_ALPN: &[u8] = b"/iroh-gossip/0"; -/// Maximum message size is limited currently. The limit is more-or-less arbitrary. -// TODO: Make the limit configurable. 
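Aside (not part of the patch): the TODO above is resolved by the rest of this diff; the hard-coded `MAX_MESSAGE_SIZE` constant is removed, the limit moves into the gossip protocol `Config` (see `iroh-gossip/src/proto/topic.rs` below), and it is exposed at runtime via `Gossip::max_message_size()`. A minimal sketch of overriding the default; the module path is an assumption:

```rust
use iroh_gossip::proto::topic::Config;

/// Build a gossip config with a larger per-message limit.
///
/// Every node in a swarm should use the same value, otherwise peers may reject
/// frames that exceed their own limit.
fn gossip_config() -> Config {
    Config {
        // The default is DEFAULT_MAX_MESSAGE_SIZE (4096 bytes).
        max_message_size: 64 * 1024,
        ..Default::default()
    }
}
```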
-pub const MAX_MESSAGE_SIZE: usize = 4096; - /// Channel capacity for all subscription broadcast channels (single) const SUBSCRIBE_ALL_CAP: usize = 2048; /// Channel capacity for topic subscription broadcast channels (one per topic) @@ -76,6 +72,7 @@ pub struct Gossip { to_actor_tx: mpsc::Sender, on_endpoints_tx: mpsc::Sender>, _actor_handle: Arc>>, + max_message_size: usize, } impl Gossip { @@ -94,6 +91,7 @@ impl Gossip { let (on_endpoints_tx, on_endpoints_rx) = mpsc::channel(ON_ENDPOINTS_CAP); let me = endpoint.node_id().fmt_short(); + let max_message_size = state.max_message_size(); let actor = Actor { endpoint, state, @@ -125,9 +123,15 @@ impl Gossip { to_actor_tx, on_endpoints_tx, _actor_handle: Arc::new(actor_handle), + max_message_size, } } + /// Get the maximum message size configured for this gossip actor. + pub fn max_message_size(&self) -> usize { + self.max_message_size + } + /// Join a topic and connect to peers. /// /// @@ -427,12 +431,23 @@ impl Actor { let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP); self.conn_send_tx.insert(peer_id, send_tx.clone()); + let max_message_size = self.state.max_message_size(); + // Spawn a task for this connection let in_event_tx = self.in_event_tx.clone(); tokio::spawn( async move { debug!("connection established"); - match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await { + match connection_loop( + peer_id, + conn, + origin, + send_rx, + &in_event_tx, + max_message_size, + ) + .await + { Ok(()) => { debug!("connection closed without error") } @@ -605,6 +620,7 @@ async fn connection_loop( origin: ConnOrigin, mut send_rx: mpsc::Receiver, in_event_tx: &mpsc::Sender, + max_message_size: usize, ) -> anyhow::Result<()> { let (mut send, mut recv) = match origin { ConnOrigin::Accept => conn.accept_bi().await?, @@ -621,10 +637,10 @@ async fn connection_loop( // but the other side may still want to use it to // send data to us. Some(msg) = send_rx.recv(), if !send_rx.is_closed() => { - write_message(&mut send, &mut send_buf, &msg).await? + write_message(&mut send, &mut send_buf, &msg, max_message_size).await? } - msg = read_message(&mut recv, &mut recv_buf) => { + msg = read_message(&mut recv, &mut recv_buf, max_message_size) => { let msg = msg?; match msg { None => break, diff --git a/iroh-gossip/src/net/util.rs b/iroh-gossip/src/net/util.rs index 1101300292..2a45fa4961 100644 --- a/iroh-gossip/src/net/util.rs +++ b/iroh-gossip/src/net/util.rs @@ -11,16 +11,17 @@ use tokio::{ use crate::proto::util::TimerMap; -use super::{ProtoMessage, MAX_MESSAGE_SIZE}; +use super::ProtoMessage; /// Write a `ProtoMessage` as a length-prefixed, postcard-encoded message. pub async fn write_message( writer: &mut W, buffer: &mut BytesMut, frame: &ProtoMessage, + max_message_size: usize, ) -> Result<()> { let len = postcard::experimental::serialized_size(&frame)?; - ensure!(len < MAX_MESSAGE_SIZE); + ensure!(len < max_message_size); buffer.clear(); buffer.resize(len, 0u8); let slice = postcard::to_slice(&frame, buffer)?; @@ -33,8 +34,9 @@ pub async fn write_message( pub async fn read_message( reader: impl AsyncRead + Unpin, buffer: &mut BytesMut, + max_message_size: usize, ) -> Result> { - match read_lp(reader, buffer).await? { + match read_lp(reader, buffer, max_message_size).await? 
{ None => Ok(None), Some(data) => { let message = postcard::from_bytes(&data)?; @@ -52,6 +54,7 @@ pub async fn read_message( pub async fn read_lp( mut reader: impl AsyncRead + Unpin, buffer: &mut BytesMut, + max_message_size: usize, ) -> Result> { let size = match reader.read_u32().await { Ok(size) => size, @@ -60,8 +63,8 @@ pub async fn read_lp( }; let mut reader = reader.take(size as u64); let size = usize::try_from(size).context("frame larger than usize")?; - if size > MAX_MESSAGE_SIZE { - bail!("Incoming message exceeds MAX_MESSAGE_SIZE"); + if size > max_message_size { + bail!("Incoming message exceeds the maximum message size of {max_message_size} bytes"); } buffer.reserve(size); loop { diff --git a/iroh-gossip/src/proto/state.rs b/iroh-gossip/src/proto/state.rs index f8b1ebd1e3..a841342014 100644 --- a/iroh-gossip/src/proto/state.rs +++ b/iroh-gossip/src/proto/state.rs @@ -196,6 +196,11 @@ impl State { .unwrap_or(false) } + /// Returns the maximum message size configured in the gossip protocol. + pub fn max_message_size(&self) -> usize { + self.config.max_message_size + } + /// Handle an [`InEvent`] /// /// This returns an iterator of [`OutEvent`]s that must be processed. diff --git a/iroh-gossip/src/proto/topic.rs b/iroh-gossip/src/proto/topic.rs index df36578dbb..0ac50d4f1f 100644 --- a/iroh-gossip/src/proto/topic.rs +++ b/iroh-gossip/src/proto/topic.rs @@ -18,6 +18,10 @@ use super::{ }; use super::{PeerData, PeerIdentity}; +/// The default maximum size in bytes for a gossip message. +/// This is a sane but arbitrary default and can be changed in the [`Config`]. +pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096; + /// Input event to the topic state handler. #[derive(Clone, Debug)] pub enum InEvent { @@ -170,13 +174,32 @@ impl IO for VecDeque> { self.push_back(event.into()) } } + /// Protocol configuration -#[derive(Clone, Default, Debug)] +#[derive(Clone, Debug)] pub struct Config { /// Configuration for the swarm membership layer pub membership: hyparview::Config, /// Configuration for the gossip broadcast layer pub broadcast: plumtree::Config, + /// Max message size in bytes. + /// + /// This size should be the same across a network to ensure all nodes can transmit and read large messages. + /// + /// At minimum, this size should be large enough to send gossip control messages. This can vary, depending on the size of the [`PeerIdentity`] you use and the size of the [`PeerData`] you transmit in your messages. + /// + /// The default is [`DEFAULT_MAX_MESSAGE_SIZE`]. + pub max_message_size: usize, +} + +impl Default for Config { + fn default() -> Self { + Self { + membership: Default::default(), + broadcast: Default::default(), + max_message_size: DEFAULT_MAX_MESSAGE_SIZE, + } + } } /// The topic state maintains the swarm membership and broadcast tree for a particular topic. diff --git a/iroh-net/src/magicsock.rs b/iroh-net/src/magicsock.rs index 080e99b985..7437c87694 100644 --- a/iroh-net/src/magicsock.rs +++ b/iroh-net/src/magicsock.rs @@ -531,12 +531,20 @@ impl MagicSock { } if udp_addr.is_none() && relay_url.is_none() { - // Handle no addresses being available - warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); - return Poll::Ready(Err(io::Error::new( - io::ErrorKind::NotConnected, - "no UDP or relay address available for node", - ))); + // Returning an error here would lock up the entire `Endpoint`. + // + // If we returned `Poll::Pending`, the waker driving the `poll_send` will never get woken up. 
+ // + // Our best bet here is to log an error and return `Poll::Ready(Ok(n))`. + // + // `n` is the number of consecutive transmits in this batch that are meant for the same destination (a destination that we have no addresses for, and so we can never actually send). + // + // When we return `Poll::Ready(Ok(n))`, we are effectively dropping those n messages, by lying to QUIC and saying they were sent. + // (If we returned `Poll::Ready(Ok(0))` instead, QUIC would loop to attempt to re-send those messages, blocking other traffic.) + // + // When `QUIC` gets no `ACK`s for those messages, the connection will eventually timeout. + error!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); + return Poll::Ready(Ok(n)); } if (udp_addr.is_none() || udp_pending) && (relay_url.is_none() || relay_pending) { @@ -549,14 +557,16 @@ impl MagicSock { } if !relay_sent && !udp_sent && !pings_sent { - warn!(node = %public_key.fmt_short(), "failed to send: no UDP or relay addr"); + // Returning an error here would lock up the entire `Endpoint`. + // Instead, log an error and return `Poll::Pending`, the connection will timeout. let err = udp_error.unwrap_or_else(|| { io::Error::new( io::ErrorKind::NotConnected, "no UDP or relay address available for node", ) }); - return Poll::Ready(Err(err)); + error!(node = %public_key.fmt_short(), "{err:?}"); + return Poll::Pending; } trace!( diff --git a/iroh-net/src/net/interfaces/bsd.rs b/iroh-net/src/net/interfaces/bsd.rs index dd6ca7e3ca..7ef0cd1eb0 100644 --- a/iroh-net/src/net/interfaces/bsd.rs +++ b/iroh-net/src/net/interfaces/bsd.rs @@ -300,7 +300,7 @@ impl WireFormat { Ok(Some(WireMessage::Route(m))) } - #[cfg(any(target_os = "openbsd",))] + #[cfg(target_os = "openbsd")] MessageType::Route => { if data.len() < self.body_off { return Err(RouteError::MessageTooShort); diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 61d075e7fc..e1e98cae2e 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -13,10 +13,11 @@ use anyhow::{anyhow, Result}; use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; +use genawaiter::sync::{Co, Gen}; use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use iroh_blobs::{ export::ExportProgress as BytesExportProgress, - format::collection::Collection, + format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, BlobFormat, Hash, Tag, @@ -31,13 +32,12 @@ use tracing::warn; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddStreamRequest, BlobAddStreamUpdate, BlobConsistencyCheckRequest, - BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, + BlobDeleteBlobRequest, BlobDownloadRequest, BlobExportRequest, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, NodeStatusRequest, RpcService, SetTagOption, }; -use super::{flatten, Iroh}; +use super::{flatten, tags, Iroh}; /// Iroh blobs client. #[derive(Debug, Clone)] @@ -322,18 +322,35 @@ where /// Read the content of a collection. 
pub async fn get_collection(&self, hash: Hash) -> Result { - let BlobGetCollectionResponse { collection } = - self.rpc.rpc(BlobGetCollectionRequest { hash }).await??; - Ok(collection) + Collection::load(hash, self).await } /// List all collections. - pub async fn list_collections(&self) -> Result>> { - let stream = self - .rpc - .server_streaming(BlobListCollectionsRequest) - .await?; - Ok(flatten(stream)) + pub fn list_collections(&self) -> Result>> { + let this = self.clone(); + Ok(Gen::new(|co| async move { + if let Err(cause) = this.list_collections_impl(&co).await { + co.yield_(Err(cause)).await; + } + })) + } + + async fn list_collections_impl(&self, co: &Co>) -> Result<()> { + let tags = self.tags_client(); + let mut tags = tags.list_hash_seq().await?; + while let Some(tag) = tags.next().await { + let tag = tag?; + if let Ok(collection) = self.get_collection(tag.hash).await { + let info = CollectionInfo { + tag: tag.name, + hash: tag.hash, + total_blobs_count: Some(collection.len() as u64 + 1), + total_blobs_size: Some(0), + }; + co.yield_(Ok(info)).await; + } + } + Ok(()) } /// Delete a blob. @@ -366,6 +383,21 @@ where Ok(BlobStatus::Partial { size: reader.size }) } } + + fn tags_client(&self) -> tags::Client { + tags::Client { + rpc: self.rpc.clone(), + } + } +} + +impl SimpleStore for Client +where + C: ServiceConnection, +{ + async fn load(&self, hash: Hash) -> anyhow::Result { + self.read_to_bytes(hash).await + } } /// Whether to wrap the added data in a collection. @@ -929,7 +961,7 @@ mod tests { .create_collection(collection, SetTagOption::Auto, tags) .await?; - let collections: Vec<_> = client.blobs.list_collections().await?.try_collect().await?; + let collections: Vec<_> = client.blobs.list_collections()?.try_collect().await?; assert_eq!(collections.len(), 1); { diff --git a/iroh/src/client/docs.rs b/iroh/src/client/docs.rs index c335d082e2..2a35233eba 100644 --- a/iroh/src/client/docs.rs +++ b/iroh/src/client/docs.rs @@ -33,7 +33,7 @@ use crate::rpc_protocol::{ }; #[doc(inline)] -pub use crate::docs_engine::{Origin, SyncEvent, SyncReason}; +pub use iroh_docs::engine::{Origin, SyncEvent, SyncReason}; use super::{blobs, flatten}; @@ -588,13 +588,13 @@ pub enum LiveEvent { PendingContentReady, } -impl From for LiveEvent { - fn from(event: crate::docs_engine::LiveEvent) -> LiveEvent { +impl From for LiveEvent { + fn from(event: crate::docs::engine::LiveEvent) -> LiveEvent { match event { - crate::docs_engine::LiveEvent::InsertLocal { entry } => Self::InsertLocal { + crate::docs::engine::LiveEvent::InsertLocal { entry } => Self::InsertLocal { entry: entry.into(), }, - crate::docs_engine::LiveEvent::InsertRemote { + crate::docs::engine::LiveEvent::InsertRemote { from, entry, content_status, @@ -603,11 +603,11 @@ impl From for LiveEvent { content_status, entry: entry.into(), }, - crate::docs_engine::LiveEvent::ContentReady { hash } => Self::ContentReady { hash }, - crate::docs_engine::LiveEvent::NeighborUp(node) => Self::NeighborUp(node), - crate::docs_engine::LiveEvent::NeighborDown(node) => Self::NeighborDown(node), - crate::docs_engine::LiveEvent::SyncFinished(details) => Self::SyncFinished(details), - crate::docs_engine::LiveEvent::PendingContentReady => Self::PendingContentReady, + crate::docs::engine::LiveEvent::ContentReady { hash } => Self::ContentReady { hash }, + crate::docs::engine::LiveEvent::NeighborUp(node) => Self::NeighborUp(node), + crate::docs::engine::LiveEvent::NeighborDown(node) => Self::NeighborDown(node), + 
crate::docs::engine::LiveEvent::SyncFinished(details) => Self::SyncFinished(details), + crate::docs::engine::LiveEvent::PendingContentReady => Self::PendingContentReady, } } } diff --git a/iroh/src/client/tags.rs b/iroh/src/client/tags.rs index c2d4309977..c25111e3e3 100644 --- a/iroh/src/client/tags.rs +++ b/iroh/src/client/tags.rs @@ -20,7 +20,16 @@ where { /// List all tags. pub async fn list(&self) -> Result>> { - let stream = self.rpc.server_streaming(ListTagsRequest).await?; + let stream = self.rpc.server_streaming(ListTagsRequest::all()).await?; + Ok(stream.map(|res| res.map_err(anyhow::Error::from))) + } + + /// List all tags with a hash_seq format. + pub async fn list_hash_seq(&self) -> Result>> { + let stream = self + .rpc + .server_streaming(ListTagsRequest::hash_seq()) + .await?; Ok(stream.map(|res| res.map_err(anyhow::Error::from))) } diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 275c23459e..335b962582 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -22,7 +22,6 @@ pub mod client; pub mod node; pub mod util; -mod docs_engine; mod rpc_protocol; /// Expose metrics module diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 058363276f..7e0c6c2975 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -13,6 +13,7 @@ use futures_lite::StreamExt; use iroh_base::key::PublicKey; use iroh_blobs::downloader::Downloader; use iroh_blobs::store::Store as BaoStore; +use iroh_docs::engine::Engine; use iroh_net::util::AbortingJoinHandle; use iroh_net::{endpoint::LocalEndpointsStream, key::SecretKey, Endpoint}; use quic_rpc::transport::flume::FlumeConnection; @@ -23,7 +24,6 @@ use tokio_util::task::LocalPoolHandle; use tracing::debug; use crate::client::RpcService; -use crate::docs_engine::Engine; mod builder; mod rpc; @@ -60,7 +60,7 @@ struct NodeInner { gc_task: Option>, #[debug("rt")] rt: LocalPoolHandle, - pub(crate) sync: Engine, + pub(crate) sync: DocsEngine, downloader: Downloader, } @@ -193,6 +193,17 @@ impl NodeInner { } } +/// Wrapper around [`Engine`] so that we can implement our RPC methods directly. 
+#[derive(Debug, Clone)] +pub(crate) struct DocsEngine(Engine); + +impl std::ops::Deref for DocsEngine { + type Target = Engine; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 7c9875f3c1..db935479f2 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -14,6 +14,7 @@ use iroh_blobs::{ protocol::Closed, store::{GcMarkEvent, GcSweepEvent, Map, Store as BaoStore}, }; +use iroh_docs::engine::{DefaultAuthorStorage, Engine}; use iroh_docs::net::DOCS_ALPN; use iroh_gossip::net::{Gossip, GOSSIP_ALPN}; use iroh_net::{ @@ -32,13 +33,11 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument}; use crate::{ client::RPC_ALPN, - docs_engine::{DefaultAuthorStorage, Engine}, - node::NodeInner, rpc_protocol::RpcService, util::{fs::load_secret_key, path::IrohPaths}, }; -use super::{rpc, rpc_status::RpcStatus, Node}; +use super::{rpc, rpc_status::RpcStatus, DocsEngine, Node, NodeInner}; pub const PROTOCOLS: [&[u8]; 3] = [iroh_blobs::protocol::ALPN, GOSSIP_ALPN, DOCS_ALPN]; @@ -466,6 +465,7 @@ where ) .await?; let sync_db = sync.sync.clone(); + let sync = DocsEngine(sync); let gc_task = if let GcPolicy::Interval(gc_period) = self.gc_policy { tracing::info!("Starting GC task with interval {:?}", gc_period); @@ -575,7 +575,7 @@ where // clean shutdown of the blobs db to close the write transaction handler.inner.db.shutdown().await; - if let Err(err) = handler.inner.sync.start_shutdown().await { + if let Err(err) = handler.inner.sync.shutdown().await { warn!("sync shutdown error: {:?}", err); } break @@ -737,7 +737,7 @@ async fn handle_connection( alpn: String, node: Arc>, gossip: Gossip, - sync: Engine, + sync: DocsEngine, ) -> Result<()> { match alpn.as_bytes() { GOSSIP_ALPN => gossip.handle_connection(connecting.await?).await?, diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index ba03e10486..56219110f1 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -17,7 +17,6 @@ use iroh_blobs::store::{ConsistencyCheckProgress, ExportFormat, ImportProgress, use iroh_blobs::util::progress::ProgressSender; use iroh_blobs::BlobFormat; use iroh_blobs::{ - hashseq::parse_hash_seq, provider::AddProgress, store::{Store as BaoStore, ValidateProgress}, util::progress::FlumeProgressSender, @@ -33,16 +32,13 @@ use quic_rpc::{ use tokio_util::task::LocalPoolHandle; use tracing::{debug, info}; -use crate::client::blobs::{ - BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption, -}; +use crate::client::blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}; use crate::client::tags::TagInfo; use crate::client::NodeStatus; use crate::rpc_protocol::{ BlobAddPathRequest, BlobAddPathResponse, BlobAddStreamRequest, BlobAddStreamResponse, BlobAddStreamUpdate, BlobConsistencyCheckRequest, BlobDeleteBlobRequest, BlobDownloadRequest, - BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobGetCollectionRequest, - BlobGetCollectionResponse, BlobListCollectionsRequest, BlobListIncompleteRequest, + BlobDownloadResponse, BlobExportRequest, BlobExportResponse, BlobListIncompleteRequest, BlobListRequest, BlobReadAtRequest, BlobReadAtResponse, BlobValidateRequest, CreateCollectionRequest, CreateCollectionResponse, DeleteTagRequest, DocExportFileRequest, DocExportFileResponse, DocImportFileRequest, DocImportFileResponse, DocSetHashRequest, @@ -54,6 +50,8 @@ use crate::rpc_protocol::{ use super::NodeInner; +mod docs; + const 
HEALTH_POLL_WAIT: Duration = Duration::from_secs(1); /// Chunk size for getting blobs over RPC const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; @@ -93,12 +91,7 @@ impl Handler { chan.server_streaming(msg, handler, Self::blob_list_incomplete) .await } - BlobListCollections(msg) => { - chan.server_streaming(msg, handler, Self::blob_list_collections) - .await - } CreateCollection(msg) => chan.rpc(msg, handler, Self::create_collection).await, - BlobGetCollection(msg) => chan.rpc(msg, handler, Self::blob_get_collection).await, ListTags(msg) => { chan.server_streaming(msg, handler, Self::blob_list_tags) .await @@ -346,39 +339,6 @@ impl Handler { Ok(()) } - async fn blob_list_collections_impl( - self, - co: &Co<RpcResult<CollectionInfo>>, - ) -> anyhow::Result<()> { - let db = self.inner.db.clone(); - let local = self.inner.rt.clone(); - let tags = db.tags().await.unwrap(); - for item in tags { - let (name, HashAndFormat { hash, format }) = item?; - if !format.is_hash_seq() { - continue; - } - let Some(entry) = db.get(&hash).await? else { - continue; - }; - let count = local - .spawn_pinned(|| async move { - let reader = entry.data_reader().await?; - let (_collection, count) = parse_hash_seq(reader).await?; - anyhow::Ok(count) - }) - .await??; - co.yield_(Ok(CollectionInfo { - tag: name, - hash, - total_blobs_count: Some(count), - total_blobs_size: None, - })) - .await; - } - Ok(()) - } - fn blob_list( self, _msg: BlobListRequest, @@ -401,17 +361,6 @@ impl Handler { }) } - fn blob_list_collections( - self, - _msg: BlobListCollectionsRequest, - ) -> impl Stream<Item = RpcResult<CollectionInfo>> + Send + 'static { - Gen::new(move |co| async move { - if let Err(e) = self.blob_list_collections_impl(&co).await { - co.yield_(Err(e.into())).await; - } - }) - } - async fn blob_delete_tag(self, msg: DeleteTagRequest) -> RpcResult<()> { self.inner.db.set_tag(msg.name, None).await?; Ok(()) } @@ -422,15 +371,16 @@ impl Handler { Ok(()) } - fn blob_list_tags(self, _msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static { + fn blob_list_tags(self, msg: ListTagsRequest) -> impl Stream<Item = TagInfo> + Send + 'static { tracing::info!("blob_list_tags"); Gen::new(|co| async move { let tags = self.inner.db.tags().await.unwrap(); #[allow(clippy::manual_flatten)] for item in tags { if let Ok((name, HashAndFormat { hash, format })) = item { - tracing::info!("{:?} {} {:?}", name, hash, format); - co.yield_(TagInfo { name, hash, format }).await; + if (format.is_raw() && msg.raw) || (format.is_hash_seq() && msg.hash_seq) { + co.yield_(TagInfo { name, hash, format }).await; + } } } }) @@ -1042,21 +992,6 @@ impl Handler { Ok(CreateCollectionResponse { hash, tag }) } - - async fn blob_get_collection( - self, - req: BlobGetCollectionRequest, - ) -> RpcResult<BlobGetCollectionResponse> { - let hash = req.hash; - let db = self.inner.db.clone(); - let collection = self - .rt() - .spawn_pinned(move || async move { Collection::load(&db, &hash).await }) - .await - .map_err(|_| anyhow!("join failed"))??; - - Ok(BlobGetCollectionResponse { collection }) - } } async fn download( diff --git a/iroh/src/docs_engine/rpc.rs b/iroh/src/node/rpc/docs.rs similarity index 88% rename from iroh/src/docs_engine/rpc.rs rename to iroh/src/node/rpc/docs.rs index 76f2afd761..a0433a803e 100644 --- a/iroh/src/docs_engine/rpc.rs +++ b/iroh/src/node/rpc/docs.rs @@ -1,4 +1,4 @@ -//! This module contains an impl block on [`Engine`] with handlers for RPC requests +//!
This module contains an impl block on [`DocsEngine`] with handlers for RPC requests use anyhow::anyhow; use futures_lite::Stream; @@ -7,33 +7,28 @@ use iroh_docs::{Author, DocTicket, NamespaceSecret}; use tokio_stream::StreamExt; use crate::client::docs::ShareMode; +use crate::node::DocsEngine; use crate::rpc_protocol::{ - AuthorDeleteRequest, AuthorDeleteResponse, AuthorExportRequest, AuthorExportResponse, - AuthorGetDefaultRequest, AuthorGetDefaultResponse, AuthorImportRequest, AuthorImportResponse, - AuthorSetDefaultRequest, AuthorSetDefaultResponse, DocGetSyncPeersRequest, - DocGetSyncPeersResponse, -}; -use crate::{ - docs_engine::Engine, - rpc_protocol::{ - AuthorCreateRequest, AuthorCreateResponse, AuthorListRequest, AuthorListResponse, - DocCloseRequest, DocCloseResponse, DocCreateRequest, DocCreateResponse, DocDelRequest, - DocDelResponse, DocDropRequest, DocDropResponse, DocGetDownloadPolicyRequest, - DocGetDownloadPolicyResponse, DocGetExactRequest, DocGetExactResponse, DocGetManyRequest, - DocGetManyResponse, DocImportRequest, DocImportResponse, DocLeaveRequest, DocLeaveResponse, - DocListRequest, DocListResponse, DocOpenRequest, DocOpenResponse, - DocSetDownloadPolicyRequest, DocSetDownloadPolicyResponse, DocSetHashRequest, - DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, - DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, DocStatusResponse, - DocSubscribeRequest, DocSubscribeResponse, RpcResult, - }, + AuthorCreateRequest, AuthorCreateResponse, AuthorDeleteRequest, AuthorDeleteResponse, + AuthorExportRequest, AuthorExportResponse, AuthorGetDefaultRequest, AuthorGetDefaultResponse, + AuthorImportRequest, AuthorImportResponse, AuthorListRequest, AuthorListResponse, + AuthorSetDefaultRequest, AuthorSetDefaultResponse, DocCloseRequest, DocCloseResponse, + DocCreateRequest, DocCreateResponse, DocDelRequest, DocDelResponse, DocDropRequest, + DocDropResponse, DocGetDownloadPolicyRequest, DocGetDownloadPolicyResponse, DocGetExactRequest, + DocGetExactResponse, DocGetManyRequest, DocGetManyResponse, DocGetSyncPeersRequest, + DocGetSyncPeersResponse, DocImportRequest, DocImportResponse, DocLeaveRequest, + DocLeaveResponse, DocListRequest, DocListResponse, DocOpenRequest, DocOpenResponse, + DocSetDownloadPolicyRequest, DocSetDownloadPolicyResponse, DocSetHashRequest, + DocSetHashResponse, DocSetRequest, DocSetResponse, DocShareRequest, DocShareResponse, + DocStartSyncRequest, DocStartSyncResponse, DocStatusRequest, DocStatusResponse, + DocSubscribeRequest, DocSubscribeResponse, RpcResult, }; /// Capacity for the flume channels to forward sync store iterators to async RPC streams. 
const ITER_CHANNEL_CAP: usize = 64; #[allow(missing_docs)] -impl Engine { +impl DocsEngine { pub async fn author_create( &self, _req: AuthorCreateRequest, diff --git a/iroh/src/rpc_protocol.rs b/iroh/src/rpc_protocol.rs index 7bfb5d60b3..8fe71e7d6a 100644 --- a/iroh/src/rpc_protocol.rs +++ b/iroh/src/rpc_protocol.rs @@ -41,15 +41,13 @@ use serde::{Deserialize, Serialize}; pub use iroh_base::rpc::{RpcError, RpcResult}; use iroh_blobs::store::{ExportFormat, ExportMode}; pub use iroh_blobs::{provider::AddProgress, store::ValidateProgress}; +use iroh_docs::engine::LiveEvent; -use crate::{ - client::{ - blobs::{BlobInfo, CollectionInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, - docs::{ImportProgress, ShareMode}, - tags::TagInfo, - NodeStatus, - }, - docs_engine::LiveEvent, +use crate::client::{ + blobs::{BlobInfo, DownloadMode, IncompleteBlobInfo, WrapOption}, + docs::{ImportProgress, ShareMode}, + tags::TagInfo, + NodeStatus, }; pub use iroh_blobs::util::SetTagOption; @@ -207,22 +205,39 @@ impl ServerStreamingMsg for BlobListIncompleteRequest { /// /// Lists all collections that have been explicitly added to the database. #[derive(Debug, Serialize, Deserialize)] -pub struct BlobListCollectionsRequest; - -impl Msg for BlobListCollectionsRequest { -    type Pattern = ServerStreaming; +pub struct ListTagsRequest { + /// List raw tags + pub raw: bool, + /// List hash seq tags + pub hash_seq: bool, +} + +impl ListTagsRequest { + /// List all tags + pub fn all() -> Self { + Self { + raw: true, + hash_seq: true, + } + } + + /// List raw tags + pub fn raw() -> Self { + Self { + raw: true, + hash_seq: false, + } + } + + /// List hash seq tags + pub fn hash_seq() -> Self { + Self { + raw: false, + hash_seq: true, + } + } } -impl ServerStreamingMsg for BlobListCollectionsRequest { - type Response = RpcResult<CollectionInfo>; -} - -/// List all collections -/// -/// Lists all collections that have been explicitly added to the database. -#[derive(Debug, Serialize, Deserialize)] -pub struct ListTagsRequest; - impl Msg for ListTagsRequest { type Pattern = ServerStreaming; } @@ -252,25 +267,6 @@ pub struct DeleteTagRequest { impl RpcMsg for DeleteTagRequest { type Response = RpcResult<()>; } - -/// Get a collection -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobGetCollectionRequest { - /// Hash of the collection - pub hash: Hash, -} - -impl RpcMsg for BlobGetCollectionRequest { - type Response = RpcResult<BlobGetCollectionResponse>; -} - -/// The response for a `BlobGetCollectionRequest`. -#[derive(Debug, Serialize, Deserialize)] -pub struct BlobGetCollectionResponse { - /// The collection. - pub collection: Collection, -} - /// Create a collection.
#[derive(Debug, Serialize, Deserialize)] pub struct CreateCollectionRequest { @@ -1065,12 +1061,10 @@ pub enum Request { BlobExport(BlobExportRequest), BlobList(BlobListRequest), BlobListIncomplete(BlobListIncompleteRequest), - BlobListCollections(BlobListCollectionsRequest), BlobDeleteBlob(BlobDeleteBlobRequest), BlobValidate(BlobValidateRequest), BlobFsck(BlobConsistencyCheckRequest), CreateCollection(CreateCollectionRequest), - BlobGetCollection(BlobGetCollectionRequest), DeleteTag(DeleteTagRequest), ListTags(ListTagsRequest), @@ -1125,13 +1119,11 @@ pub enum Response { BlobAddPath(BlobAddPathResponse), BlobList(RpcResult<BlobInfo>), BlobListIncomplete(RpcResult<IncompleteBlobInfo>), - BlobListCollections(RpcResult<CollectionInfo>), BlobDownload(BlobDownloadResponse), BlobFsck(ConsistencyCheckProgress), BlobExport(BlobExportResponse), BlobValidate(ValidateProgress), CreateCollection(RpcResult<CreateCollectionResponse>), - BlobGetCollection(RpcResult<BlobGetCollectionResponse>), ListTags(TagInfo), DeleteTag(RpcResult<()>), diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 556f5829a7..afa2591588 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -973,6 +973,44 @@ async fn sync_big() -> Result<()> { Ok(()) } +#[tokio::test] +#[cfg(feature = "test-utils")] +async fn test_list_docs_stream() -> Result<()> { + let node = Node::memory() + .node_discovery(iroh::node::DiscoveryConfig::None) + .relay_mode(iroh::net::relay::RelayMode::Disabled) + .spawn() + .await?; + let count = 200; + + // create docs + for _i in 0..count { + let doc = node.docs.create().await?; + doc.close().await?; + } + + // create doc stream + let mut stream = node.docs.list().await?; + + // process each doc and call into the docs actor. + // this makes sure that we don't deadlock the docs actor. + let mut i = 0; + let fut = async { + while let Some((id, _)) = stream.try_next().await.unwrap() { + let _doc = node.docs.open(id).await.unwrap().unwrap(); + i += 1; + } + }; + + tokio::time::timeout(Duration::from_secs(2), fut) + .await + .expect("not to timeout"); + + assert_eq!(i, count); + + Ok(()) +} + /// Get all entries of a document. async fn get_all(doc: &MemDoc) -> anyhow::Result<Vec<Entry>> { let entries = doc.get_many(Query::all()).await?;