From 98914ee4dcdb78f7477311f933d84f4f2478e168 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Thu, 6 Jun 2024 09:02:00 +0200 Subject: [PATCH] fix(docs): prevent deadlocks with streams returned from docs actor (#2346) ## Description Fixes #2345 The iroh-docs actor loop can easily be deadlocked from the client side: If you call any RPC method that returns a stream, and the stream is longer than what the RPC layer buffers, and you call and await any other docs method *while consuming the stream*, the docs actor will deadlock. (It will only happen though if the stream is longer than the capacity of the intermediate channel that goes from the actor to the RPC layer, which is why this does not *always* happen) This is the case for all methods that return iterators. The solution is twofold: * Run single-threaded executor in iroh-docs actor loop * For actions returning iterators/streams, spawn a task on that executor to forward the store iterator into the stream, yielding when the receiver is not consuming fast enough To be able to spawn the iterators onto a task, they have to be `'static`. Which they can be - but only when operating on snapshots. So this PR fixes the potential for deadlock. It has the downside, however, that whenever calling a docs client function that returns an iterator, the current write transaction will be committed first, which has a perfomance penalty. However this is preferable to deadlocks, IMO. ## Breaking Changes ## Notes & open questions This will need tests and likely documentation of the perfomance implications. ## Change checklist - [ ] Self-review. - [ ] Documentation updates if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --- iroh-docs/Cargo.toml | 6 +-- iroh-docs/src/actor.rs | 69 ++++++++++++++++++------- iroh-docs/src/store/fs.rs | 88 ++++++++++++++++---------------- iroh-docs/src/store/fs/query.rs | 27 +++++----- iroh-docs/src/store/fs/ranges.rs | 40 ++++++++++----- iroh/tests/sync.rs | 38 ++++++++++++++ 6 files changed, 174 insertions(+), 94 deletions(-) diff --git a/iroh-docs/Cargo.toml b/iroh-docs/Cargo.toml index 005d2e1ea3..f08c97fee0 100644 --- a/iroh-docs/Cargo.toml +++ b/iroh-docs/Cargo.toml @@ -23,7 +23,7 @@ ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] } flume = "0.11" futures-buffered = "0.2.4" futures-lite = "2.3.0" -futures-util = { version = "0.3.25", optional = true } +futures-util = { version = "0.3.25" } hex = "0.4" iroh-base = { version = "0.17.0", path = "../iroh-base" } iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] } @@ -42,7 +42,7 @@ serde = { version = "1.0.164", features = ["derive"] } strum = { version = "0.25", features = ["derive"] } tempfile = { version = "3.4" } thiserror = "1" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "rt", "time", "macros"] } tokio-stream = { version = "0.1", optional = true, features = ["sync"]} tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] } tracing = "0.1" @@ -57,7 +57,7 @@ test-strategy = "0.3.1" [features] default = ["net", "metrics", "engine"] -net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"] +net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util"] metrics = ["dep:iroh-metrics"] engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"] diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index bbe91181cb..a48e8f55b3 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -10,9 +10,10 @@ use std::{ use anyhow::{anyhow, Context, Result}; use bytes::Bytes; +use futures_util::FutureExt; use iroh_base::hash::Hash; use serde::{Deserialize, Serialize}; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, task::JoinSet}; use tracing::{debug, error, error_span, trace, warn}; use crate::{ @@ -253,6 +254,7 @@ impl SyncHandle { states: Default::default(), action_rx, content_status_callback, + tasks: Default::default(), }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) @@ -570,22 +572,37 @@ struct Actor { states: OpenReplicas, action_rx: flume::Receiver, content_status_callback: Option, + tasks: JoinSet<()>, } impl Actor { - fn run(mut self) -> Result<()> { + fn run(self) -> Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .build()?; + let local_set = tokio::task::LocalSet::new(); + local_set.block_on(&rt, async move { self.run_async().await }) + } + async fn run_async(mut self) -> Result<()> { loop { - let action = match self.action_rx.recv_timeout(MAX_COMMIT_DELAY) { - Ok(action) => action, - Err(flume::RecvTimeoutError::Timeout) => { + let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); + tokio::pin!(timeout); + let action = tokio::select! { + _ = &mut timeout => { if let Err(cause) = self.store.flush() { error!(?cause, "failed to flush store"); } continue; } - Err(flume::RecvTimeoutError::Disconnected) => { - debug!("action channel disconnected"); - break; + action = self.action_rx.recv_async() => { + match action { + Ok(action) => action, + Err(flume::RecvError::Disconnected) => { + debug!("action channel disconnected"); + break; + } + + } } }; trace!(%action, "tick"); @@ -607,6 +624,7 @@ impl Actor { } } } + self.tasks.abort_all(); debug!("shutdown"); Ok(()) } @@ -636,13 +654,21 @@ impl Actor { } Ok(id) }), - Action::ListAuthors { reply } => iter_to_channel( - reply, - self.store + Action::ListAuthors { reply } => { + let iter = self + .store .list_authors() - .map(|a| a.map(|a| a.map(|a| a.id()))), - ), - Action::ListReplicas { reply } => iter_to_channel(reply, self.store.list_namespaces()), + .map(|a| a.map(|a| a.map(|a| a.id()))); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } + Action::ListReplicas { reply } => { + let iter = self.store.list_namespaces(); + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) + } Action::ContentHashes { reply } => { send_reply_with(reply, self, |this| this.store.content_hashes()) } @@ -657,7 +683,9 @@ impl Actor { ) -> Result<(), SendReplyError> { match action { ReplicaAction::Open { reply, opts } => { + tracing::trace!("open in"); let res = self.open(namespace, opts); + tracing::trace!("open out"); send_reply(reply, res) } ReplicaAction::Close { reply } => { @@ -759,7 +787,9 @@ impl Actor { .states .ensure_open(&namespace) .and_then(|_| self.store.get_many(namespace, query)); - iter_to_channel(reply, iter) + self.tasks + .spawn_local(iter_to_channel_async(reply, iter).map(|_| ())); + Ok(()) } ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| { this.close(namespace); @@ -921,15 +951,18 @@ impl OpenReplicas { } } -fn iter_to_channel( +async fn iter_to_channel_async( channel: flume::Sender>, iter: Result>>, ) -> Result<(), SendReplyError> { match iter { - Err(err) => channel.send(Err(err)).map_err(send_reply_error)?, + Err(err) => channel + .send_async(Err(err)) + .await + .map_err(send_reply_error)?, Ok(iter) => { for item in iter { - channel.send(item).map_err(send_reply_error)?; + channel.send_async(item).await.map_err(send_reply_error)?; } } } diff --git a/iroh-docs/src/store/fs.rs b/iroh-docs/src/store/fs.rs index ab1171b756..981143ca86 100644 --- a/iroh-docs/src/store/fs.rs +++ b/iroh-docs/src/store/fs.rs @@ -154,6 +154,22 @@ impl Store { } } + /// Get an owned read-only snapshot of the database. + /// + /// This will open a new read transaction. The read transaction won't be reused for other + /// reads. + /// + /// This has the side effect of committing any open write transaction, + /// so it can be used as a way to ensure that the data is persisted. + pub fn snapshot_owned(&mut self) -> Result { + // make sure the current transaction is committed + self.flush()?; + assert!(matches!(self.transaction, CurrentTransaction::None)); + let tx = self.db.begin_read()?; + let tables = ReadOnlyTables::new(tx)?; + Ok(tables) + } + /// Get access to the tables to read from them. /// /// The underlying transaction is a write transaction, but with a non-mut @@ -223,8 +239,6 @@ impl Store { } } -type AuthorsIter = std::vec::IntoIter>; -type NamespaceIter = std::vec::IntoIter>; type PeersIter = std::vec::IntoIter; impl Store { @@ -297,18 +311,16 @@ impl Store { } /// List all replica namespaces in this store. - pub fn list_namespaces(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let namespaces: Vec<_> = tables - .namespaces - .iter()? - .map(|res| { - let capability = parse_capability(res?.1.value())?; - Ok((capability.id(), capability.kind())) - }) - .collect(); - Ok(namespaces.into_iter()) + pub fn list_namespaces( + &mut self, + ) -> Result>> { + let snapshot = self.snapshot()?; + let iter = snapshot.namespaces.range::<&'static [u8; 32]>(..)?; + let iter = iter.map(|res| { + let capability = parse_capability(res?.1.value())?; + Ok((capability.id(), capability.kind())) + }); + Ok(iter) } /// Get an author key from the store. @@ -340,19 +352,16 @@ impl Store { } /// List all author keys in this store. - pub fn list_authors(&mut self) -> Result { - // TODO: avoid collect - let tables = self.tables()?; - let authors: Vec<_> = tables + pub fn list_authors(&mut self) -> Result>> { + let tables = self.snapshot()?; + let iter = tables .authors - .iter()? + .range::<&'static [u8; 32]>(..)? .map(|res| match res { Ok((_key, value)) => Ok(Author::from_bytes(value.value())), Err(err) => Err(err.into()), - }) - .collect(); - - Ok(authors.into_iter()) + }); + Ok(iter) } /// Import a new replica namespace. @@ -413,7 +422,8 @@ impl Store { namespace: NamespaceId, query: impl Into, ) -> Result { - QueryIterator::new(self.tables()?, namespace, query.into()) + let tables = self.snapshot_owned()?; + QueryIterator::new(tables, namespace, query.into()) } /// Get an entry by key and author. @@ -435,13 +445,8 @@ impl Store { /// Get all content hashes of all replicas in the store. pub fn content_hashes(&mut self) -> Result { - // make sure the current transaction is committed - self.flush()?; - assert!(matches!(self.transaction, CurrentTransaction::None)); - let tx = self.db.begin_read()?; - let tables = ReadOnlyTables::new(tx)?; - let records = tables.records; - ContentHashesIterator::all(records) + let tables = self.snapshot_owned()?; + ContentHashesIterator::all(&tables.records) } /// Get the latest entry for each author in a namespace. @@ -870,14 +875,6 @@ impl Iterator for ParentIterator { } } -self_cell::self_cell!( - struct ContentHashesIteratorInner { - owner: RecordsTable, - #[covariant] - dependent: RecordsRange, - } -); - /// Iterator for all content hashes /// /// Note that you might get duplicate hashes. Also, the iterator will keep @@ -886,13 +883,16 @@ self_cell::self_cell!( /// Also, this represents a snapshot of the database at the time of creation. /// It nees a copy of a redb::ReadOnlyTable to be self-contained. #[derive(derive_more::Debug)] -pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner); +pub struct ContentHashesIterator { + #[debug(skip)] + range: RecordsRange<'static>, +} impl ContentHashesIterator { /// Create a new iterator over all content hashes. - pub fn all(owner: RecordsTable) -> anyhow::Result { - let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?; - Ok(Self(inner)) + pub fn all(table: &RecordsTable) -> anyhow::Result { + let range = RecordsRange::all_static(table)?; + Ok(Self { range }) } } @@ -900,7 +900,7 @@ impl Iterator for ContentHashesIterator { type Item = Result; fn next(&mut self) -> Option { - let v = self.0.with_dependent_mut(|_, d| d.next())?; + let v = self.range.next()?; Some(v.map(|e| e.content_hash())) } } diff --git a/iroh-docs/src/store/fs/query.rs b/iroh-docs/src/store/fs/query.rs index a73dbcd8e7..f05b4ecfb3 100644 --- a/iroh-docs/src/store/fs/query.rs +++ b/iroh-docs/src/store/fs/query.rs @@ -3,6 +3,7 @@ use iroh_base::hash::Hash; use crate::{ store::{ + fs::tables::ReadOnlyTables, util::{IndexKind, LatestPerKeySelector, SelectorRes}, AuthorFilter, KeyFilter, Query, }, @@ -12,34 +13,33 @@ use crate::{ use super::{ bounds::{ByKeyBounds, RecordsBounds}, ranges::{RecordsByKeyRange, RecordsRange}, - tables::Tables, RecordsValue, }; /// A query iterator for entry queries. #[derive(Debug)] -pub struct QueryIterator<'a> { - range: QueryRange<'a>, +pub struct QueryIterator { + range: QueryRange, query: Query, offset: u64, count: u64, } #[derive(Debug)] -enum QueryRange<'a> { +enum QueryRange { AuthorKey { - range: RecordsRange<'a>, + range: RecordsRange<'static>, key_filter: KeyFilter, }, KeyAuthor { - range: RecordsByKeyRange<'a>, + range: RecordsByKeyRange, author_filter: AuthorFilter, selector: Option, }, } -impl<'a> QueryIterator<'a> { - pub fn new(tables: &'a Tables<'a>, namespace: NamespaceId, query: Query) -> Result { +impl QueryIterator { + pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result { let index_kind = IndexKind::from(&query); let range = match index_kind { IndexKind::AuthorKey { range, key_filter } => { @@ -53,7 +53,7 @@ impl<'a> QueryIterator<'a> { // no author set => full table scan with the provided key filter AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter), }; - let range = RecordsRange::with_bounds(&tables.records, bounds)?; + let range = RecordsRange::with_bounds_static(&tables.records, bounds)?; QueryRange::AuthorKey { range, key_filter: filter, @@ -65,11 +65,8 @@ impl<'a> QueryIterator<'a> { latest_per_key, } => { let bounds = ByKeyBounds::new(namespace, &range); - let range = RecordsByKeyRange::with_bounds( - &tables.records_by_key, - &tables.records, - bounds, - )?; + let range = + RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?; let selector = latest_per_key.then(LatestPerKeySelector::default); QueryRange::KeyAuthor { author_filter, @@ -88,7 +85,7 @@ impl<'a> QueryIterator<'a> { } } -impl<'a> Iterator for QueryIterator<'a> { +impl Iterator for QueryIterator { type Item = Result; fn next(&mut self) -> Option> { diff --git a/iroh-docs/src/store/fs/ranges.rs b/iroh-docs/src/store/fs/ranges.rs index 9219c620ac..f28d95ae63 100644 --- a/iroh-docs/src/store/fs/ranges.rs +++ b/iroh-docs/src/store/fs/ranges.rs @@ -1,6 +1,6 @@ //! Ranges and helpers for working with [`redb`] tables -use redb::{Key, Range, ReadableTable, Table, Value}; +use redb::{Key, Range, ReadOnlyTable, ReadableTable, Value}; use crate::{store::SortDirection, SignedEntry}; @@ -74,14 +74,9 @@ impl<'a, K: Key + 'static, V: Value + 'static> RangeExt for Range<'a, K, V #[debug("RecordsRange")] pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>); -impl<'a> RecordsRange<'a> { - pub(super) fn all( - records: &'a impl ReadableTable, RecordsValue<'static>>, - ) -> anyhow::Result { - let range = records.range::>(..)?; - Ok(Self(range)) - } +// pub type RecordsRange<'a> = Range<'a, RecordsId<'static>, RecordsValue<'static>>; +impl<'a> RecordsRange<'a> { pub(super) fn with_bounds( records: &'a impl ReadableTable, RecordsValue<'static>>, bounds: RecordsBounds, @@ -90,6 +85,7 @@ impl<'a> RecordsRange<'a> { Ok(Self(range)) } + // /// Get the next item in the range. /// /// Omit items for which the `matcher` function returns false. @@ -103,6 +99,22 @@ impl<'a> RecordsRange<'a> { } } +impl RecordsRange<'static> { + pub(super) fn all_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + ) -> anyhow::Result { + let range = records.range::>(..)?; + Ok(Self(range)) + } + pub(super) fn with_bounds_static( + records: &ReadOnlyTable, RecordsValue<'static>>, + bounds: RecordsBounds, + ) -> anyhow::Result { + let range = records.range(bounds.as_ref())?; + Ok(Self(range)) + } +} + impl<'a> Iterator for RecordsRange<'a> { type Item = anyhow::Result; fn next(&mut self) -> Option { @@ -112,15 +124,15 @@ impl<'a> Iterator for RecordsRange<'a> { #[derive(derive_more::Debug)] #[debug("RecordsByKeyRange")] -pub struct RecordsByKeyRange<'a> { - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, - by_key_range: Range<'a, RecordsByKeyId<'static>, ()>, +pub struct RecordsByKeyRange { + records_table: ReadOnlyTable, RecordsValue<'static>>, + by_key_range: Range<'static, RecordsByKeyId<'static>, ()>, } -impl<'a> RecordsByKeyRange<'a> { +impl RecordsByKeyRange { pub fn with_bounds( - records_by_key_table: &'a impl ReadableTable, ()>, - records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>, + records_by_key_table: ReadOnlyTable, ()>, + records_table: ReadOnlyTable, RecordsValue<'static>>, bounds: ByKeyBounds, ) -> anyhow::Result { let by_key_range = records_by_key_table.range(bounds.as_ref())?; diff --git a/iroh/tests/sync.rs b/iroh/tests/sync.rs index 556f5829a7..afa2591588 100644 --- a/iroh/tests/sync.rs +++ b/iroh/tests/sync.rs @@ -973,6 +973,44 @@ async fn sync_big() -> Result<()> { Ok(()) } +#[tokio::test] +#[cfg(feature = "test-utils")] +async fn test_list_docs_stream() -> Result<()> { + let node = Node::memory() + .node_discovery(iroh::node::DiscoveryConfig::None) + .relay_mode(iroh::net::relay::RelayMode::Disabled) + .spawn() + .await?; + let count = 200; + + // create docs + for _i in 0..count { + let doc = node.docs.create().await?; + doc.close().await?; + } + + // create doc stream + let mut stream = node.docs.list().await?; + + // process each doc and call into the docs actor. + // this makes sure that we don't deadlock the docs actor. + let mut i = 0; + let fut = async { + while let Some((id, _)) = stream.try_next().await.unwrap() { + let _doc = node.docs.open(id).await.unwrap().unwrap(); + i += 1; + } + }; + + tokio::time::timeout(Duration::from_secs(2), fut) + .await + .expect("not to timeout"); + + assert_eq!(i, count); + + Ok(()) +} + /// Get all entries of a document. async fn get_all(doc: &MemDoc) -> anyhow::Result> { let entries = doc.get_many(Query::all()).await?;