Commit
fix(docs): prevent deadlocks with streams returned from docs actor (#2346)

## Description

Fixes #2345 

The iroh-docs actor loop can easily be deadlocked from the client side:
If you call any RPC method that returns a stream, and the stream is
longer than what the RPC layer buffers, and you call and await any other
docs method *while consuming the stream*, the docs actor will deadlock.
(Since the deadlock requires the stream to outgrow the capacity of the
intermediate channel that goes from the actor to the RPC layer, it does
not *always* happen.)
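
As an illustration, here is a minimal, runnable model of that failure mode built from plain bounded flume channels. This is not the iroh-docs client API, only the same blocking pattern; the program intentionally hangs on the last `recv()`:

```rust
use std::thread;

fn main() {
    // Requests carry the bounded channel the actor should stream results into.
    let (req_tx, req_rx) = flume::bounded::<flume::Sender<u32>>(1);

    // A single actor thread, like the docs actor: one request at a time.
    thread::spawn(move || {
        while let Ok(reply) = req_rx.recv() {
            for i in 0..100u32 {
                // Blocks once `reply` (capacity 2) is full. While stuck here,
                // the actor cannot pick up any other request.
                if reply.send(i).is_err() {
                    break;
                }
            }
        }
    });

    // First request: a "stream" longer than the channel capacity.
    let (stream_tx, stream_rx) = flume::bounded(2);
    req_tx.send(stream_tx).unwrap();
    let _first = stream_rx.recv().unwrap(); // consume only one item

    // Second request while the stream is not drained: the actor never sees
    // it, so this recv() blocks forever -- the deadlock described above.
    let (reply_tx, reply_rx) = flume::bounded::<u32>(2);
    req_tx.send(reply_tx).unwrap();
    let _ = reply_rx.recv().unwrap();
}
```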

This is the case for all methods that return iterators. The solution is
twofold:

* Run a single-threaded executor in the iroh-docs actor loop
* For actions returning iterators/streams, spawn a task on that executor
to forward the store iterator into the stream, yielding when the
receiver is not consuming fast enough

To be able to spawn the iterators onto a task, they have to be
`'static`, which they can be, but only when operating on snapshots.
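
In sketch form, the forwarding pattern looks like this (simplified from the `actor.rs` changes below; `forward` is an illustrative name for what the PR calls `iter_to_channel_async`):

```rust
use anyhow::Result;

/// Forward a 'static store iterator into a bounded flume channel.
async fn forward<T: Send + 'static>(
    tx: flume::Sender<Result<T>>,
    iter: impl Iterator<Item = Result<T>> + 'static,
) {
    for item in iter {
        // `send_async` suspends this task instead of blocking the actor
        // thread when the receiver is not consuming fast enough.
        if tx.send_async(item).await.is_err() {
            break; // receiver hung up; stop forwarding
        }
    }
}
```

The actor spawns this onto its `JoinSet` via `spawn_local` inside the `LocalSet`, so a slow consumer suspends only the forwarding task, never the actor loop itself.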

So this PR fixes the potential for deadlock. It has the downside,
however, that whenever a docs client function that returns an iterator
is called, the current write transaction is committed first, which has
a performance penalty. That is preferable to deadlocks, IMO.

## Breaking Changes


## Notes & open questions

This will need tests and likely documentation of the performance
implications.

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
Frando authored Jun 6, 2024
1 parent 7153a38 commit 98914ee
Showing 6 changed files with 174 additions and 94 deletions.
6 changes: 3 additions & 3 deletions iroh-docs/Cargo.toml
@@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25", optional = true }
futures-util = { version = "0.3.25" }
hex = "0.4"
iroh-base = { version = "0.17.0", path = "../iroh-base" }
iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] }
@@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
tempfile = { version = "3.4" }
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tokio = { version = "1", features = ["sync", "rt", "time", "macros"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tracing = "0.1"
@@ -57,7 +57,7 @@ test-strategy = "0.3.1"

[features]
default = ["net", "metrics", "engine"]
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"]
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"]
metrics = ["dep:iroh-metrics"]
engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"]

69 changes: 51 additions & 18 deletions iroh-docs/src/actor.rs
@@ -10,9 +10,10 @@ use std::{

use anyhow::{anyhow, Context, Result};
use bytes::Bytes;
use futures_util::FutureExt;
use iroh_base::hash::Hash;
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::{sync::oneshot, task::JoinSet};
use tracing::{debug, error, error_span, trace, warn};

use crate::{
@@ -253,6 +254,7 @@ impl SyncHandle {
states: Default::default(),
action_rx,
content_status_callback,
tasks: Default::default(),
};
let join_handle = std::thread::Builder::new()
.name("sync-actor".to_string())
@@ -570,22 +572,37 @@ struct Actor {
states: OpenReplicas,
action_rx: flume::Receiver<Action>,
content_status_callback: Option<ContentStatusCallback>,
tasks: JoinSet<()>,
}

impl Actor {
fn run(mut self) -> Result<()> {
fn run(self) -> Result<()> {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()?;
let local_set = tokio::task::LocalSet::new();
local_set.block_on(&rt, async move { self.run_async().await })
}
async fn run_async(mut self) -> Result<()> {
loop {
let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) {
Ok(action) => action,
Err(flume::RecvTimeoutError::Timeout) => {
let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
tokio::pin!(timeout);
let action = tokio::select! {
_ = &mut timeout => {
if let Err(cause) = self.store.flush() {
error!(?cause, "failed to flush store");
}
continue;
}
Err(flume::RecvTimeoutError::Disconnected) => {
debug!("action channel disconnected");
break;
action = self.action_rx.recv_async() => {
match action {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
debug!("action channel disconnected");
break;
}

}
}
};
trace!(%action, "tick");
@@ -607,6 +624,7 @@ impl Actor {
}
}
}
self.tasks.abort_all();
debug!("shutdown");
Ok(())
}
@@ -636,13 +654,21 @@ impl Actor {
}
Ok(id)
}),
Action::ListAuthors { reply } => iter_to_channel(
reply,
self.store
Action::ListAuthors { reply } => {
let iter = self
.store
.list_authors()
.map(|a| a.map(|a| a.map(|a| a.id()))),
),
Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()),
.map(|a| a.map(|a| a.map(|a| a.id())));
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
Action::ListReplicas { reply } => {
let iter = self.store.list_namespaces();
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
Action::ContentHashes { reply } => {
send_reply_with(reply, self, |this| this.store.content_hashes())
}
@@ -657,7 +683,9 @@
) -> Result<(), SendReplyError> {
match action {
ReplicaAction::Open { reply, opts } => {
tracing::trace!("open in");
let res = self.open(namespace, opts);
tracing::trace!("open out");
send_reply(reply, res)
}
ReplicaAction::Close { reply } => {
@@ -759,7 +787,9 @@ impl Actor {
.states
.ensure_open(&namespace)
.and_then(|_| self.store.get_many(namespace, query));
iter_to_channel(reply, iter)
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
this.close(namespace);
@@ -921,15 +951,18 @@ impl OpenReplicas {
}
}

fn iter_to_channel<T: Send + 'static>(
async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel.send(Err(err)).map_err(send_reply_error)?,
Err(err) => channel
.send_async(Err(err))
.await
.map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send(item).map_err(send_reply_error)?;
channel.send_async(item).await.map_err(send_reply_error)?;
}
}
}
88 changes: 44 additions & 44 deletions iroh-docs/src/store/fs.rs
@@ -154,6 +154,22 @@ impl Store {
}
}

/// Get an owned read-only snapshot of the database.
///
/// This will open a new read transaction. The read transaction won't be reused for other
/// reads.
///
/// This has the side effect of committing any open write transaction,
/// so it can be used as a way to ensure that the data is persisted.
pub fn snapshot_owned(&mut self) -> Result<ReadOnlyTables> {
// make sure the current transaction is committed
self.flush()?;
assert!(matches!(self.transaction, CurrentTransaction::None));
let tx = self.db.begin_read()?;
let tables = ReadOnlyTables::new(tx)?;
Ok(tables)
}

/// Get access to the tables to read from them.
///
/// The underlying transaction is a write transaction, but with a non-mut
@@ -223,8 +239,6 @@ impl Store {
}
}

type AuthorsIter = std::vec::IntoIter<Result<Author>>;
type NamespaceIter = std::vec::IntoIter<Result<(NamespaceId, CapabilityKind)>>;
type PeersIter = std::vec::IntoIter<PeerIdBytes>;

impl Store {
@@ -297,18 +311,16 @@ impl Store {
}

/// List all replica namespaces in this store.
pub fn list_namespaces(&mut self) -> Result<NamespaceIter> {
// TODO: avoid collect
let tables = self.tables()?;
let namespaces: Vec<_> = tables
.namespaces
.iter()?
.map(|res| {
let capability = parse_capability(res?.1.value())?;
Ok((capability.id(), capability.kind()))
})
.collect();
Ok(namespaces.into_iter())
pub fn list_namespaces(
&mut self,
) -> Result<impl Iterator<Item = Result<(NamespaceId, CapabilityKind)>>> {
let snapshot = self.snapshot()?;
let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?;
let iter = iter.map(|res| {
let capability = parse_capability(res?.1.value())?;
Ok((capability.id(), capability.kind()))
});
Ok(iter)
}

/// Get an author key from the store.
@@ -340,19 +352,16 @@ impl Store {
}

/// List all author keys in this store.
pub fn list_authors(&mut self) -> Result<AuthorsIter> {
// TODO: avoid collect
let tables = self.tables()?;
let authors: Vec<_> = tables
pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
let tables = self.snapshot()?;
let iter = tables
.authors
.iter()?
.range::<&'static [u8; 32]>(..)?
.map(|res| match res {
Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
Err(err) => Err(err.into()),
})
.collect();

Ok(authors.into_iter())
});
Ok(iter)
}

/// Import a new replica namespace.
@@ -413,7 +422,8 @@ impl Store {
namespace: NamespaceId,
query: impl Into<Query>,
) -> Result<QueryIterator> {
QueryIterator::new(self.tables()?, namespace, query.into())
let tables = self.snapshot_owned()?;
QueryIterator::new(tables, namespace, query.into())
}

/// Get an entry by key and author.
@@ -435,13 +445,8 @@ impl Store {

/// Get all content hashes of all replicas in the store.
pub fn content_hashes(&mut self) -> Result<ContentHashesIterator> {
// make sure the current transaction is committed
self.flush()?;
assert!(matches!(self.transaction, CurrentTransaction::None));
let tx = self.db.begin_read()?;
let tables = ReadOnlyTables::new(tx)?;
let records = tables.records;
ContentHashesIterator::all(records)
let tables = self.snapshot_owned()?;
ContentHashesIterator::all(&tables.records)
}

/// Get the latest entry for each author in a namespace.
@@ -870,14 +875,6 @@ impl Iterator for ParentIterator {
}
}

self_cell::self_cell!(
struct ContentHashesIteratorInner {
owner: RecordsTable,
#[covariant]
dependent: RecordsRange,
}
);

/// Iterator for all content hashes
///
/// Note that you might get duplicate hashes. Also, the iterator will keep
@@ -886,21 +883,24 @@ self_cell::self_cell!(
/// Also, this represents a snapshot of the database at the time of creation.
/// It needs a copy of a redb::ReadOnlyTable to be self-contained.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner);
pub struct ContentHashesIterator {
#[debug(skip)]
range: RecordsRange<'static>,
}

impl ContentHashesIterator {
/// Create a new iterator over all content hashes.
pub fn all(owner: RecordsTable) -> anyhow::Result<Self> {
let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?;
Ok(Self(inner))
pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
let range = RecordsRange::all_static(table)?;
Ok(Self { range })
}
}

impl Iterator for ContentHashesIterator {
type Item = Result<Hash>;

fn next(&mut self) -> Option<Self::Item> {
let v = self.0.with_dependent_mut(|_, d| d.next())?;
let v = self.range.next()?;
Some(v.map(|e| e.content_hash()))
}
}