Skip to content

Commit

Permalink
fix: make all iterators static and spawn them on tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 4, 2024
1 parent 0addb83 commit 04137da
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 78 deletions.
32 changes: 11 additions & 21 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,15 @@ impl Actor {
}
Ok(id)
}),
Action::ListAuthors { reply } => iter_to_channel(
reply,
self.store
Action::ListAuthors { reply } => {
let iter = self
.store
.list_authors()
.map(|a| a.map(|a| a.map(|a| a.id()))),
),
.map(|a| a.map(|a| a.map(|a| a.id())));
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
Action::ListReplicas { reply } => {
let iter = self.store.list_namespaces();
self.tasks
Expand Down Expand Up @@ -788,7 +791,9 @@ impl Actor {
.states
.ensure_open(&namespace)
.and_then(|_| self.store.get_many(namespace, query));
iter_to_channel(reply, iter)
self.tasks
.spawn_local(iter_to_channel_async(reply, iter).map(|_| ()));
Ok(())
}
ReplicaAction::DropReplica { reply } => send_reply_with(reply, self, |this| {
this.close(namespace);
Expand Down Expand Up @@ -950,21 +955,6 @@ impl OpenReplicas {
}
}

fn iter_to_channel<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
) -> Result<(), SendReplyError> {
match iter {
Err(err) => channel.send(Err(err)).map_err(send_reply_error)?,
Ok(iter) => {
for item in iter {
channel.send(item).map_err(send_reply_error)?;
}
}
}
Ok(())
}

async fn iter_to_channel_async<T: Send + 'static>(
channel: flume::Sender<Result<T>>,
iter: Result<impl Iterator<Item = Result<T>>>,
Expand Down
50 changes: 22 additions & 28 deletions iroh-docs/src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub(crate) mod tables;

use self::{
bounds::{ByKeyBounds, RecordsBounds},
ranges::RangeExt,
ranges::{RangeExt, RecordsRange},
tables::{RecordsTable, TransactionAndTables},
};
use self::{
Expand All @@ -48,8 +48,6 @@ use self::{
},
};

pub use self::ranges::RecordsRange;

/// Manages the replicas and authors for an instance.
#[derive(Debug)]
pub struct Store {
Expand Down Expand Up @@ -223,7 +221,6 @@ impl Store {
}
}

type AuthorsIter = std::vec::IntoIter<Result<Author>>;
type PeersIter = std::vec::IntoIter<PeerIdBytes>;

impl Store {
Expand Down Expand Up @@ -347,19 +344,16 @@ impl Store {
}

/// List all author keys in this store.
pub fn list_authors(&mut self) -> Result<AuthorsIter> {
// TODO: avoid collect
let tables = self.tables()?;
let authors: Vec<_> = tables
pub fn list_authors(&mut self) -> Result<impl Iterator<Item = Result<Author>>> {
let tables = self.snapshot()?;
let iter = tables
.authors
.iter()?
.range::<&'static [u8; 32]>(..)?
.map(|res| match res {
Ok((_key, value)) => Ok(Author::from_bytes(value.value())),
Err(err) => Err(err.into()),
})
.collect();

Ok(authors.into_iter())
});
Ok(iter)
}

/// Import a new replica namespace.
Expand Down Expand Up @@ -420,7 +414,12 @@ impl Store {
namespace: NamespaceId,
query: impl Into<Query>,
) -> Result<QueryIterator> {
QueryIterator::new(self.tables()?, namespace, query.into())
// make sure the current transaction is committed
self.flush()?;
assert!(matches!(self.transaction, CurrentTransaction::None));
let tx = self.db.begin_read()?;
let tables = ReadOnlyTables::new(tx)?;
QueryIterator::new(tables, namespace, query.into())
}

/// Get an entry by key and author.
Expand Down Expand Up @@ -448,7 +447,7 @@ impl Store {
let tx = self.db.begin_read()?;
let tables = ReadOnlyTables::new(tx)?;
let records = tables.records;
ContentHashesIterator::all(records)
ContentHashesIterator::all(&records)
}

/// Get the latest entry for each author in a namespace.
Expand Down Expand Up @@ -877,14 +876,6 @@ impl Iterator for ParentIterator {
}
}

self_cell::self_cell!(
struct ContentHashesIteratorInner {
owner: RecordsTable,
#[covariant]
dependent: RecordsRange,
}
);

/// Iterator for all content hashes
///
/// Note that you might get duplicate hashes. Also, the iterator will keep
Expand All @@ -893,21 +884,24 @@ self_cell::self_cell!(
/// Also, this represents a snapshot of the database at the time of creation.
/// It nees a copy of a redb::ReadOnlyTable to be self-contained.
#[derive(derive_more::Debug)]
pub struct ContentHashesIterator(#[debug(skip)] ContentHashesIteratorInner);
pub struct ContentHashesIterator {
#[debug(skip)]
range: RecordsRange<'static>,
}

impl ContentHashesIterator {
/// Create a new iterator over all content hashes.
pub fn all(owner: RecordsTable) -> anyhow::Result<Self> {
let inner = ContentHashesIteratorInner::try_new(owner, |owner| RecordsRange::all(owner))?;
Ok(Self(inner))
pub fn all(table: &RecordsTable) -> anyhow::Result<Self> {
let range = RecordsRange::all_static(&table)?;
Ok(Self { range })
}
}

impl Iterator for ContentHashesIterator {
type Item = Result<Hash>;

fn next(&mut self) -> Option<Self::Item> {
let v = self.0.with_dependent_mut(|_, d| d.next())?;
let v = self.range.next()?;
Some(v.map(|e| e.content_hash()))
}
}
Expand Down
27 changes: 12 additions & 15 deletions iroh-docs/src/store/fs/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use iroh_base::hash::Hash;

use crate::{
store::{
fs::tables::ReadOnlyTables,
util::{IndexKind, LatestPerKeySelector, SelectorRes},
AuthorFilter, KeyFilter, Query,
},
Expand All @@ -12,34 +13,33 @@ use crate::{
use super::{
bounds::{ByKeyBounds, RecordsBounds},
ranges::{RecordsByKeyRange, RecordsRange},
tables::Tables,
RecordsValue,
};

/// A query iterator for entry queries.
#[derive(Debug)]
pub struct QueryIterator<'a> {
range: QueryRange<'a>,
pub struct QueryIterator {
range: QueryRange,
query: Query,
offset: u64,
count: u64,
}

#[derive(Debug)]
enum QueryRange<'a> {
enum QueryRange {
AuthorKey {
range: RecordsRange<'a>,
range: RecordsRange<'static>,
key_filter: KeyFilter,
},
KeyAuthor {
range: RecordsByKeyRange<'a>,
range: RecordsByKeyRange,
author_filter: AuthorFilter,
selector: Option<LatestPerKeySelector>,
},
}

impl<'a> QueryIterator<'a> {
pub fn new(tables: &'a Tables<'a>, namespace: NamespaceId, query: Query) -> Result<Self> {
impl QueryIterator {
pub fn new(tables: ReadOnlyTables, namespace: NamespaceId, query: Query) -> Result<Self> {
let index_kind = IndexKind::from(&query);
let range = match index_kind {
IndexKind::AuthorKey { range, key_filter } => {
Expand All @@ -53,7 +53,7 @@ impl<'a> QueryIterator<'a> {
// no author set => full table scan with the provided key filter
AuthorFilter::Any => (RecordsBounds::namespace(namespace), key_filter),
};
let range = RecordsRange::with_bounds(&tables.records, bounds)?;
let range = RecordsRange::with_bounds_static(&tables.records, bounds)?;
QueryRange::AuthorKey {
range,
key_filter: filter,
Expand All @@ -65,11 +65,8 @@ impl<'a> QueryIterator<'a> {
latest_per_key,
} => {
let bounds = ByKeyBounds::new(namespace, &range);
let range = RecordsByKeyRange::with_bounds(
&tables.records_by_key,
&tables.records,
bounds,
)?;
let range =
RecordsByKeyRange::with_bounds(tables.records_by_key, tables.records, bounds)?;
let selector = latest_per_key.then(LatestPerKeySelector::default);
QueryRange::KeyAuthor {
author_filter,
Expand All @@ -88,7 +85,7 @@ impl<'a> QueryIterator<'a> {
}
}

impl<'a> Iterator for QueryIterator<'a> {
impl Iterator for QueryIterator {
type Item = Result<SignedEntry>;

fn next(&mut self) -> Option<Result<SignedEntry>> {
Expand Down
40 changes: 26 additions & 14 deletions iroh-docs/src/store/fs/ranges.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Ranges and helpers for working with [`redb`] tables
use redb::{Key, Range, ReadableTable, Table, Value};
use redb::{Key, Range, ReadOnlyTable, ReadableTable, Value};

use crate::{store::SortDirection, SignedEntry};

Expand Down Expand Up @@ -74,14 +74,9 @@ impl<'a, K: Key + 'static, V: Value + 'static> RangeExt<K, V> for Range<'a, K, V
#[debug("RecordsRange")]
pub struct RecordsRange<'a>(Range<'a, RecordsId<'static>, RecordsValue<'static>>);

impl<'a> RecordsRange<'a> {
pub(super) fn all(
records: &'a impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
) -> anyhow::Result<Self> {
let range = records.range::<RecordsId<'static>>(..)?;
Ok(Self(range))
}
// pub type RecordsRange<'a> = Range<'a, RecordsId<'static>, RecordsValue<'static>>;

impl<'a> RecordsRange<'a> {
pub(super) fn with_bounds(
records: &'a impl ReadableTable<RecordsId<'static>, RecordsValue<'static>>,
bounds: RecordsBounds,
Expand All @@ -90,6 +85,7 @@ impl<'a> RecordsRange<'a> {
Ok(Self(range))
}

//
/// Get the next item in the range.
///
/// Omit items for which the `matcher` function returns false.
Expand All @@ -103,6 +99,22 @@ impl<'a> RecordsRange<'a> {
}
}

impl RecordsRange<'static> {
pub(super) fn all_static(
records: &ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
) -> anyhow::Result<Self> {
let range = records.range::<RecordsId<'static>>(..)?;
Ok(Self(range))
}
pub(super) fn with_bounds_static(
records: &ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
bounds: RecordsBounds,
) -> anyhow::Result<Self> {
let range = records.range(bounds.as_ref())?;
Ok(Self(range))
}
}

impl<'a> Iterator for RecordsRange<'a> {
type Item = anyhow::Result<SignedEntry>;
fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -112,15 +124,15 @@ impl<'a> Iterator for RecordsRange<'a> {

#[derive(derive_more::Debug)]
#[debug("RecordsByKeyRange")]
pub struct RecordsByKeyRange<'a> {
records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>,
by_key_range: Range<'a, RecordsByKeyId<'static>, ()>,
pub struct RecordsByKeyRange {
records_table: ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
by_key_range: Range<'static, RecordsByKeyId<'static>, ()>,
}

impl<'a> RecordsByKeyRange<'a> {
impl RecordsByKeyRange {
pub fn with_bounds(
records_by_key_table: &'a impl ReadableTable<RecordsByKeyId<'static>, ()>,
records_table: &'a Table<'a, RecordsId<'static>, RecordsValue<'static>>,
records_by_key_table: ReadOnlyTable<RecordsByKeyId<'static>, ()>,
records_table: ReadOnlyTable<RecordsId<'static>, RecordsValue<'static>>,
bounds: ByKeyBounds,
) -> anyhow::Result<Self> {
let by_key_range = records_by_key_table.range(bounds.as_ref())?;
Expand Down

0 comments on commit 04137da

Please sign in to comment.