Skip to content

Commit

Permalink
refactor: move docs engine into iroh-docs (#2343)
Browse files Browse the repository at this point in the history
## Description

Moves `iroh::docs_engine` to `iroh_docs::engine`. Most changes are
changed imports, dependencies, and visibility changes (plus docs where
they are now required). Everything is moved over, only the RPC
implementation stays in iroh for now.

The `iroh_docs::engine::Engine` is wrapped in a newtype `DocsEngine` in
iroh that derefs to the `Engine` so that we can implement the RPC
methods directly.

## Breaking Changes

* `iroh::client::docs::{Origin, SyncReason, SyncEvent}` are now
reexports of `iroh_docs::engine::{Origin, SyncReason, SyncEvent}`. There
is no need to change anything in your code that uses these structs via
`iroh::client::docs`.

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- ~~[ ] Tests if relevant.~~
- [x] All breaking changes documented.
  • Loading branch information
Frando authored Jun 4, 2024
1 parent 4dd69f4 commit 3772889
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 111 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 17 additions & 16 deletions iroh-docs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,35 @@ workspace = true
[dependencies]
anyhow = "1"
blake3 = { package = "iroh-blake3", version = "1.4.5"}
bytes = { version = "1.4", features = ["serde"] }
derive_more = { version = "1.0.0-beta.6", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-buffered = "0.2.4"
futures-lite = "2.3.0"
futures-util = { version = "0.3.25", optional = true }
hex = "0.4"
iroh-base = { version = "0.17.0", path = "../iroh-base" }
iroh-blobs = { version = "0.17.0", path = "../iroh-blobs", optional = true, features = ["downloader"] }
iroh-gossip = { version = "0.17.0", path = "../iroh-gossip", optional = true }
iroh-metrics = { version = "0.17.0", path = "../iroh-metrics", optional = true }
iroh-net = { version = "0.17.0", optional = true, path = "../iroh-net" }
lru = "0.12"
num_enum = "0.7"
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
rand_core = "0.6.4"
redb = { version = "2.0.0" }
redb_v1 = { package = "redb", version = "1.5.1" }
self_cell = "1.0.3"
serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
bytes = { version = "1.4", features = ["serde"] }
hex = "0.4"
tempfile = { version = "3.4" }
thiserror = "1"
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }

# fs-store
redb = { version = "2.0.0" }
redb_v1 = { package = "redb", version = "1.5.1" }
tempfile = { version = "3.4" }

# net
iroh-net = { version = "0.17.0", optional = true, path = "../iroh-net" }
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
futures-util = { version = "0.3.25", optional = true }
lru = "0.12"
self_cell = "1.0.3"
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tracing = "0.1"

[dev-dependencies]
iroh-test = { path = "../iroh-test" }
Expand All @@ -56,9 +56,10 @@ tempfile = "3.4"
test-strategy = "0.3.1"

[features]
default = ["net", "metrics"]
default = ["net", "metrics", "engine"]
net = ["dep:iroh-net", "tokio/io-util", "dep:tokio-stream", "dep:tokio-util", "dep:futures-util"]
metrics = ["dep:iroh-metrics"]
engine = ["net", "dep:iroh-gossip", "dep:iroh-blobs"]

[package.metadata.docs.rs]
all-features = true
84 changes: 52 additions & 32 deletions iroh/src/docs_engine.rs → iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Handlers and actors to for live syncing [`iroh_docs`] replicas.
//! Handlers and actors for live syncing replicas.
//!
//! [`iroh_docs::Replica`] is also called documents here.
//! [`crate::Replica`] is also called documents here.
use std::path::PathBuf;
use std::{
Expand All @@ -13,57 +13,57 @@ use anyhow::{bail, Context, Result};
use futures_lite::{Stream, StreamExt};
use iroh_blobs::downloader::Downloader;
use iroh_blobs::{store::EntryStatus, Hash};
use iroh_docs::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId};
use iroh_docs::{Author, AuthorId};
use iroh_gossip::net::Gossip;
use iroh_net::util::SharedAbortingJoinHandle;
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, oneshot};
use tracing::{error, error_span, Instrument};

mod gossip;
mod live;
pub mod rpc;
mod state;
use crate::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId};
use crate::{Author, AuthorId};

use gossip::GossipActor;
use live::{LiveActor, ToLiveActor};
use self::gossip::GossipActor;
use self::live::{LiveActor, ToLiveActor};

pub use self::live::SyncEvent;
pub use self::state::{Origin, SyncReason};

mod gossip;
mod live;
mod state;

/// Capacity of the channel for the [`ToLiveActor`] messages.
const ACTOR_CHANNEL_CAP: usize = 64;
/// Capacity for the channels for [`Engine::subscribe`].
const SUBSCRIBE_CHANNEL_CAP: usize = 256;

/// The sync engine coordinates actors that manage open documents, set-reconciliation syncs with
/// peers and a gossip swarm for each syncing document.
///
/// The RPC methods dealing with documents and sync operate on the `Engine`, with method
/// implementations in [rpc].
#[derive(derive_more::Debug, Clone)]
pub struct Engine {
pub(crate) endpoint: Endpoint,
pub(crate) sync: SyncHandle,
/// [`Endpoint`] used by the engine.
pub endpoint: Endpoint,
/// Handle to the actor thread.
pub sync: SyncHandle,
/// The persistent default author for this engine.
pub default_author: Arc<DefaultAuthor>,
to_live_actor: mpsc::Sender<ToLiveActor>,
#[allow(dead_code)]
actor_handle: SharedAbortingJoinHandle<()>,
#[debug("ContentStatusCallback")]
content_status_cb: ContentStatusCallback,
default_author: Arc<DefaultAuthor>,
}

impl Engine {
/// Start the sync engine.
///
/// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
/// thread for the [`iroh_docs::actor::SyncHandle`].
pub(crate) async fn spawn<B: iroh_blobs::store::Store>(
/// thread for the [`crate::actor::SyncHandle`].
pub async fn spawn<B: iroh_blobs::store::Store>(
endpoint: Endpoint,
gossip: Gossip,
replica_store: iroh_docs::store::Store,
replica_store: crate::store::Store,
bao_store: B,
downloader: Downloader,
default_author_storage: DefaultAuthorStorage,
Expand Down Expand Up @@ -127,7 +127,7 @@ impl Engine {
///
/// If `peers` is non-empty, it will both do an initial set-reconciliation sync with each peer,
/// and join an iroh-gossip swarm with these peers to receive and broadcast document updates.
async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
pub async fn start_sync(&self, namespace: NamespaceId, peers: Vec<NodeAddr>) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.to_live_actor
.send(ToLiveActor::StartSync {
Expand All @@ -144,7 +144,7 @@ impl Engine {
///
/// If `kill_subscribers` is true, all existing event subscribers will be dropped. This means
/// they will receive `None` and no further events in case of rejoining the document.
async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
pub async fn leave(&self, namespace: NamespaceId, kill_subscribers: bool) -> Result<()> {
let (reply, reply_rx) = oneshot::channel();
self.to_live_actor
.send(ToLiveActor::Leave {
Expand All @@ -158,7 +158,7 @@ impl Engine {
}

/// Subscribe to replica and sync progress events.
async fn subscribe(
pub async fn subscribe(
&self,
namespace: NamespaceId,
) -> Result<impl Stream<Item = Result<LiveEvent>> + Unpin + 'static> {
Expand Down Expand Up @@ -195,7 +195,7 @@ impl Engine {
}

/// Handle an incoming iroh-docs connection.
pub(super) async fn handle_connection(
pub async fn handle_connection(
&self,
conn: iroh_net::endpoint::Connecting,
) -> anyhow::Result<()> {
Expand All @@ -205,13 +205,15 @@ impl Engine {
Ok(())
}

pub(crate) async fn start_shutdown(&self) -> Result<()> {
/// Shutdown the engine.
pub async fn shutdown(&self) -> Result<()> {
self.to_live_actor.send(ToLiveActor::Shutdown).await?;
Ok(())
}
}

pub(crate) fn entry_to_content_status(entry: io::Result<EntryStatus>) -> ContentStatus {
/// Converts an [`EntryStatus`] into a [`ContentStatus`].
pub fn entry_to_content_status(entry: io::Result<EntryStatus>) -> ContentStatus {
match entry {
Ok(EntryStatus::Complete) => ContentStatus::Complete,
Ok(EntryStatus::Partial) => ContentStatus::Incomplete,
Expand Down Expand Up @@ -277,14 +279,14 @@ impl From<live::Event> for LiveEvent {

impl LiveEvent {
fn from_replica_event(
ev: iroh_docs::Event,
ev: crate::Event,
content_status_cb: &ContentStatusCallback,
) -> Result<Self> {
Ok(match ev {
iroh_docs::Event::LocalInsert { entry, .. } => Self::InsertLocal {
crate::Event::LocalInsert { entry, .. } => Self::InsertLocal {
entry: entry.into(),
},
iroh_docs::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
crate::Event::RemoteInsert { entry, from, .. } => Self::InsertRemote {
content_status: content_status_cb(entry.content_hash()),
entry: entry.into(),
from: PublicKey::from_bytes(&from)?,
Expand All @@ -302,11 +304,19 @@ impl LiveEvent {
/// path (as base32 encoded string of the author's public key).
#[derive(Debug)]
pub enum DefaultAuthorStorage {
/// Memory storage.
Mem,
/// File based persistent storage.
Persistent(PathBuf),
}

impl DefaultAuthorStorage {
/// Load the default author from the storage.
///
/// Will create and save a new author if the storage is empty.
///
/// Returns an error if the author can't be parsed or if the author does not exist in the docs
/// store.
pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
match self {
Self::Mem => {
Expand Down Expand Up @@ -343,6 +353,8 @@ impl DefaultAuthorStorage {
}
}
}

/// Save a new default author.
pub async fn persist(&self, author_id: AuthorId) -> anyhow::Result<()> {
match self {
Self::Mem => {
Expand All @@ -363,24 +375,32 @@ impl DefaultAuthorStorage {
}
}

/// Persistent default author for a docs engine.
#[derive(Debug)]
struct DefaultAuthor {
pub struct DefaultAuthor {
value: RwLock<AuthorId>,
storage: DefaultAuthorStorage,
}

impl DefaultAuthor {
async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
/// Load the default author from storage.
///
/// If the storage is empty creates a new author and persists it.
pub async fn load(storage: DefaultAuthorStorage, docs_store: &SyncHandle) -> Result<Self> {
let value = storage.load(docs_store).await?;
Ok(Self {
value: RwLock::new(value),
storage,
})
}
fn get(&self) -> AuthorId {

/// Get the current default author.
pub fn get(&self) -> AuthorId {
*self.value.read().unwrap()
}
async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {

/// Set the default author.
pub async fn set(&self, author_id: AuthorId, docs_store: &SyncHandle) -> Result<()> {
if docs_store.export_author(author_id).await?.is_none() {
bail!("The author does not exist");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::HashSet;
use anyhow::{Context, Result};
use futures_lite::StreamExt;
use futures_util::FutureExt;
use iroh_docs::{actor::SyncHandle, ContentStatus, NamespaceId};
use iroh_gossip::net::{Event, Gossip};
use iroh_net::key::PublicKey;
use tokio::{
Expand All @@ -16,6 +15,8 @@ use tokio_stream::{
};
use tracing::{debug, error, trace, warn};

use crate::{actor::SyncHandle, ContentStatus, NamespaceId};

use super::live::{Op, ToLiveActor};

#[derive(strum::Display, Debug)]
Expand Down
27 changes: 14 additions & 13 deletions iroh/src/docs_engine/live.rs → iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ use iroh_blobs::downloader::{DownloadError, DownloadRequest, Downloader};
use iroh_blobs::get::Stats;
use iroh_blobs::HashAndFormat;
use iroh_blobs::{store::EntryStatus, Hash};
use iroh_docs::{
actor::{OpenOpts, SyncHandle},
net::{
connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
SyncFinished,
},
AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};
use iroh_gossip::{net::Gossip, proto::TopicId};
use iroh_net::NodeId;
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
Expand All @@ -27,6 +19,15 @@ use tokio::{
};
use tracing::{debug, error, error_span, info, instrument, trace, warn, Instrument, Span};

use crate::{
actor::{OpenOpts, SyncHandle},
net::{
connect_and_sync, handle_connection, AbortReason, AcceptError, AcceptOutcome, ConnectError,
SyncFinished,
},
AuthorHeads, ContentStatus, NamespaceId, SignedEntry,
};

use super::gossip::{GossipActor, ToGossipActor};
use super::state::{NamespaceStates, Origin, SyncReason};

Expand Down Expand Up @@ -145,8 +146,8 @@ pub struct LiveActor<B: iroh_blobs::store::Store> {
gossip: Gossip,
bao_store: B,
downloader: Downloader,
replica_events_tx: flume::Sender<iroh_docs::Event>,
replica_events_rx: flume::Receiver<iroh_docs::Event>,
replica_events_tx: flume::Sender<crate::Event>,
replica_events_rx: flume::Receiver<crate::Event>,

/// Send messages to self.
/// Note: Must not be used in methods called from `Self::run` directly to prevent deadlocks.
Expand Down Expand Up @@ -684,9 +685,9 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
}

async fn on_replica_event(&mut self, event: iroh_docs::Event) -> Result<()> {
async fn on_replica_event(&mut self, event: crate::Event) -> Result<()> {
match event {
iroh_docs::Event::LocalInsert { namespace, entry } => {
crate::Event::LocalInsert { namespace, entry } => {
debug!(namespace=%namespace.fmt_short(), "replica event: LocalInsert");
let topic = TopicId::from_bytes(*namespace.as_bytes());
// A new entry was inserted locally. Broadcast a gossip message.
Expand All @@ -696,7 +697,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
self.gossip.broadcast(topic, message).await?;
}
}
iroh_docs::Event::RemoteInsert {
crate::Event::RemoteInsert {
namespace,
entry,
from,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use anyhow::Result;
use iroh_docs::{
use crate::{
net::{AbortReason, AcceptOutcome, SyncFinished},
NamespaceId,
};
use anyhow::Result;
use iroh_net::NodeId;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
Expand Down
Loading

0 comments on commit 3772889

Please sign in to comment.