diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index bbe91181cb..cba2c2bec8 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -10,9 +10,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use futures_util::FutureExt; use iroh_base::hash::Hash; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinSet}; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -253,6 +254,7 @@ impl SyncHandle { states: Default::default(), action_rx, content_status_callback, + tasks: Default::default(), }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) @@ -275,8 +277,12 @@ impl SyncHandle { pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> { let (reply, rx) = oneshot::channel(); let action = ReplicaAction::Open { reply, opts }; + tracing::debug!("SyncHandle::open IN"); self.send_replica(namespace, action).await?; - rx.await? + tracing::debug!("SyncHandle::open MID"); + let res = rx.await?; + tracing::debug!("SyncHandle::open OUT"); + res } pub async fn close(&self, namespace: NamespaceId) -> Result { @@ -570,22 +576,37 @@ struct Actor { states: OpenReplicas, action_rx: flume::Receiver, content_status_callback: Option, + tasks: JoinSet<()>, } impl Actor { - fn run(mut self) -> Result<()> { + fn run(self) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build()?; + let local_set = tokio::task::LocalSet::new(); + local_set.block_on(&rt, async move { self.run_async().await }) + } + async fn run_async(mut self) -> Result<()> { loop { - let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) { - Ok(action) => action, - Err(flume::RecvTimeoutError::Timeout) => { + let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + tokio::pin!(timeout); + let action = tokio::select! { + _ = &mut timeout => { if let Err(cause) = self.store.flush() { error!(?cause, "failed to flush store"); } continue; } - Err(flume::RecvTimeoutError::Disconnected) => { - debug!("action channel disconnected"); - break; + action = self.action_rx.recv_async() => { + match action { + Ok(action) => action, + Err(flume::RecvError::Disconnected) => { + debug!("action channel disconnected"); + break; + } + + } } }; trace!(%action, "tick"); @@ -607,6 +628,7 @@ impl Actor { } } } + self.tasks.abort_all(); debug!("shutdown"); Ok(()) } @@ -642,7 +664,12 @@ impl Actor { .list_authors() .map(|a| a.map(|a| a.map(|a| a.id()))), ), - Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()), + Action::ListReplicas { reply } => { + let iter = self.store.list_namespaces(); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ContentHashes { reply } => { send_reply_with(reply, self, |this| this.store.content_hashes()) } @@ -657,7 +684,9 @@ impl Actor { ) -> Result<(), SendReplyError> { match action { ReplicaAction::Open { reply, opts } => { + tracing::trace!("open in"); let res = self.open(namespace, opts); + tracing::trace!("open out"); send_reply(reply, res) } ReplicaAction::Close { reply } => { @@ -936,6 +965,24 @@ fn iter_to_channel( Ok(()) } +async fn iter_to_channel_async( + channel: flume::Sender>, + iter: Result>>, +) -> Result<(), SendReplyError> { + match iter { + Err(err) => channel + .send_async(Err(err)) + .await + .map_err(send_reply_error)?, + Ok(iter) => { + for item in iter { + channel.send_async(item).await.map_err(send_reply_error)?; + } + } + } + Ok(()) +} + fn get_author(store: &mut Store, id: &AuthorId) -> Result { store.get_author(id)?.context("author not found") } diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index ab1171b756..a81b38a779 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -224,7 +224,6 @@ impl Store { } type AuthorsIter = std::vec::IntoIter>; -type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -297,18 +296,26 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let namespaces: Vec<_> = tables - .namespaces - .iter()? - .map(|res| { - let capability = parse_capability(res?.1.value())?; - Ok((capability.id(), capability.kind())) - }) - .collect(); - Ok(namespaces.into_iter()) + pub fn list_namespaces( + &mut self, + ) -> Result>> { + let snapshot = self.snapshot()?; + let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?; + let iter = iter.map(|res| { + let capability = parse_capability(res?.1.value())?; + Ok((capability.id(), capability.kind())) + }); + Ok(iter) + // let tables = self.tables()?; + // let namespaces: Vec<_> = tables + // .namespaces + // .iter()? + // .map(|res| { + // let capability = parse_capability(res?.1.value())?; + // Ok((capability.id(), capability.kind())) + // }) + // .collect(); + // Ok(namespaces.into_iter()) } /// Get an author key from the store. diff --git a/iroh/src/node/rpc/docs.rs b/iroh/src/node/rpc/docs.rs index a0433a803e..2b64bf0b3c 100644 --- a/iroh/src/node/rpc/docs.rs +++ b/iroh/src/node/rpc/docs.rs @@ -126,7 +126,9 @@ impl DocsEngine { } pub async fn doc_open(&self, req: DocOpenRequest) -> RpcResult { + tracing::debug!("doc_open IN"); self.sync.open(req.doc_id, Default::default()).await?; + tracing::debug!("doc_open OUT"); Ok(DocOpenResponse {}) }