Skip to content

Commit

Permalink
fix: deadlock for list_docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 4, 2024
1 parent 3772889 commit 0addb83
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 23 deletions.
67 changes: 57 additions & 10 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ use std::{

use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use futures_util::FutureExt;
use iroh_base::hash::Hash;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::{sync::oneshot, task::JoinSet};
use tracing::{debug, error, error_span, trace, warn};

use crate::{
Expand Down Expand Up @@ -253,6 +254,7 @@ impl SyncHandle {
states: Default::default(),
action_rx,
content_status_callback,
tasks: Default::default(),
};
let join_handle = std::thread::Builder::new()
.name("sync-actor".to_string())
Expand All @@ -275,8 +277,12 @@ impl SyncHandle {
/// Requests that the replica for `namespace` be opened with `opts`.
///
/// Builds a `ReplicaAction::Open` carrying the sending half of a oneshot
/// channel, hands it to `send_replica`, and then awaits the receiving half
/// for the outcome. The `tracing::debug!` calls mark the points before the
/// send, after the send, and after the reply has arrived.
pub async fn open(&self, namespace: NamespaceId, opts: OpenOpts) -> Result<()> {
let (reply, rx) = oneshot::channel();
let action = ReplicaAction::Open { reply, opts };
tracing::debug!("SyncHandle::open IN");
self.send_replica(namespace, action).await?;
rx.await?
tracing::debug!("SyncHandle::open MID");
// `?` propagates a failure to receive on the oneshot channel; the value
// that was sent back is itself the `Result<()>` returned below.
let res = rx.await?;
tracing::debug!("SyncHandle::open OUT");
res
}

pub async fn close(&self, namespace: NamespaceId) -> Result<bool> {
Expand Down Expand Up @@ -570,22 +576,37 @@ struct Actor {
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}

impl Actor {
fn run(mut self) -> Result<()> {
fn run(self) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()?;
let local_set = tokio::task::LocalSet::new();
local_set.block_on(&rt, async move { self.run_async().await })
}
async fn run_async(mut self) -> Result<()> {
loop {
let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) {
Ok(action) => action,
Err(flume::RecvTimeoutError::Timeout) => {
let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
tokio::pin!(timeout);
let action = tokio::select! {
_ = &mut timeout => {
if let Err(cause) = self.store.flush() {
error!(?cause, "failed to flush store");
}
continue;
}
Err(flume::RecvTimeoutError::Disconnected) => {
debug!("action channel disconnected");
break;
action = self.action_rx.recv_async() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
debug!("action channel disconnected");
break;
}

}
}
};
trace!(%action, "tick");
Expand All @@ -607,6 +628,7 @@ impl Actor {
}
}
}
self.tasks.abort_all();
debug!("shutdown");
Ok(())
}
Expand Down Expand Up @@ -642,7 +664,12 @@ impl Actor {
.list_authors()
.map(|a| a.map(|a| a.map(|a| a.id()))),
),
Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()),
Action::ListReplicas { reply } => {
let iter = self.store.list_namespaces();
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
Action::ContentHashes { reply } => {
send_reply_with(reply, self, |this| this.store.content_hashes())
}
Expand All @@ -657,7 +684,9 @@ impl Actor {
) -> Result<(), SendReplyError> {
match action {
ReplicaAction::Open { reply, opts } => {
tracing::trace!("open in");
let res = self.open(namespace, opts);
tracing::trace!("open out");
send_reply(reply, res)
}
ReplicaAction::Close { reply } => {
Expand Down Expand Up @@ -936,6 +965,24 @@ fn iter_to_channel<T: Send + 'static>(
Ok(())
}

/// Forwards the items of a fallible iterator into `channel`, awaiting each send.
///
/// * If `iter` is itself an `Err`, that single error is sent and the function
///   finishes.
/// * Otherwise every item is sent in iteration order.
///
/// # Errors
///
/// Returns the `SendReplyError` produced by `send_reply_error` as soon as a
/// send on `channel` fails; remaining items are not sent.
async fn iter_to_channel_async<T: Send + 'static>(
    channel: flume::Sender<Result<T>>,
    iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
    // Guard clause: building the iterator failed, so report just that error.
    let items = match iter {
        Ok(items) => items,
        Err(err) => {
            channel
                .send_async(Err(err))
                .await
                .map_err(send_reply_error)?;
            return Ok(());
        }
    };

    for item in items {
        channel.send_async(item).await.map_err(send_reply_error)?;
    }
    Ok(())
}

/// Looks up the author `id` in `store`.
///
/// # Errors
///
/// Propagates any error from `Store::get_author`, and fails with
/// "author not found" when the lookup succeeds but yields no author.
fn get_author(store: &mut Store, id: &AuthorId) -> Result<Author> {
    let maybe_author = store.get_author(id)?;
    maybe_author.context("author not found")
}
Expand Down
33 changes: 20 additions & 13 deletions iroh-docs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ impl Store {
}

type AuthorsIter = std::vec::IntoIter<Result<Author>>;
type NamespaceIter = std::vec::IntoIter<Result<(NamespaceId, CapabilityKind)>>;
type PeersIter = std::vec::IntoIter<PeerIdBytes>;

impl Store {
Expand Down Expand Up @@ -297,18 +296,26 @@ impl Store {
}

/// List all replica namespaces in this store.
pub fn list_namespaces(&mut self) -> Result<NamespaceIter> {
// TODO: avoid collect
let tables = self.tables()?;
let namespaces: Vec<_> = tables
.namespaces
.iter()?
.map(|res| {
let capability = parse_capability(res?.1.value())?;
Ok((capability.id(), capability.kind()))
})
.collect();
Ok(namespaces.into_iter())
/// Returns an iterator over `(NamespaceId, CapabilityKind)` for every stored
/// replica namespace.
///
/// The namespaces table is read through `self.snapshot()` and entries are
/// decoded lazily while the returned iterator is advanced, so nothing is
/// collected up front.
///
/// # Errors
///
/// Fails if the snapshot cannot be taken or the range cannot be opened.
/// Failures while reading an individual entry or parsing its capability are
/// yielded as `Err` items of the iterator instead.
pub fn list_namespaces(
    &mut self,
) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
    let snapshot = self.snapshot()?;
    // An unbounded range visits every key in the namespaces table.
    let entries = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
    Ok(entries.map(|entry| {
        let capability = parse_capability(entry?.1.value())?;
        Ok((capability.id(), capability.kind()))
    }))
}

/// Get an author key from the store.
Expand Down
2 changes: 2 additions & 0 deletions iroh/src/node/rpc/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,9 @@ impl DocsEngine {
}

/// Handles a `DocOpenRequest` by opening the requested document.
///
/// Calls `self.sync.open` with `req.doc_id` and default options, logging at
/// debug level before and after the call.
///
/// # Errors
///
/// Propagates any error returned by `self.sync.open`.
pub async fn doc_open(&self, req: DocOpenRequest) -> RpcResult<DocOpenResponse> {
    tracing::debug!("doc_open IN");
    let namespace = req.doc_id;
    let opts = Default::default();
    self.sync.open(namespace, opts).await?;
    tracing::debug!("doc_open OUT");
    let response = DocOpenResponse {};
    Ok(response)
}

Expand Down

0 comments on commit 0addb83

Please sign in to comment.