Skip to content

Commit

Permalink
feat: set default author
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 17, 2024
1 parent ca139d7 commit c74b6c5
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 55 deletions.
20 changes: 18 additions & 2 deletions iroh/src/client/authors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use iroh_docs::{Author, AuthorId};
use quic_rpc::{RpcClient, ServiceConnection};

use crate::rpc_protocol::{
AuthorCreateRequest, AuthorGetDefaultRequest, AuthorDeleteRequest, AuthorExportRequest,
AuthorImportRequest, AuthorListRequest, RpcService,
AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest, AuthorGetDefaultRequest,
AuthorImportRequest, AuthorListRequest, AuthorSetDefaultRequest, RpcService,
};

use super::flatten;
Expand Down Expand Up @@ -45,6 +45,19 @@ where
Ok(res.author_id)
}

/// Set the node-wide default author.
///
/// If the author does not exist, an error is returned.
///
/// This is a noop on memory nodes. On peristent node, the author id will be saved to a file in
/// the data directory, and reloaded after a node restart.
pub async fn set_default(&self, author_id: AuthorId) -> Result<()> {
self.rpc
.rpc(AuthorSetDefaultRequest { author_id })
.await??;
Ok(())
}

/// List document authors for which we have a secret key.
pub async fn list(&self) -> Result<impl Stream<Item = Result<AuthorId>>> {
let stream = self.rpc.server_streaming(AuthorListRequest {}).await?;
Expand Down Expand Up @@ -113,6 +126,9 @@ mod tests {
let authors: Vec<_> = node.authors.list().await?.try_collect().await?;
assert_eq!(authors.len(), 2);

node.authors.set_default(author.id()).await?;
assert_eq!(node.authors.default().await?, author);

Ok(())
}
}
75 changes: 67 additions & 8 deletions iroh/src/docs_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
//!
//! [`iroh_docs::Replica`] is also called documents here.
use std::{io, sync::Arc};
use std::path::PathBuf;
use std::{io, str::FromStr, sync::Arc};

use anyhow::Result;
use anyhow::{bail, Result};
use futures_lite::{Stream, StreamExt};
use iroh_blobs::downloader::Downloader;
use iroh_blobs::{store::EntryStatus, Hash};
use iroh_docs::AuthorId;
use iroh_docs::{actor::SyncHandle, ContentStatus, ContentStatusCallback, Entry, NamespaceId};
use iroh_docs::{Author, AuthorId};
use iroh_gossip::net::Gossip;
use iroh_net::util::SharedAbortingJoinHandle;
use iroh_net::{key::PublicKey, Endpoint, NodeAddr};
Expand Down Expand Up @@ -48,21 +49,76 @@ pub struct Engine {
#[debug("ContentStatusCallback")]
content_status_cb: ContentStatusCallback,
default_author: AuthorId,
default_author_storage: Arc<DefaultAuthorStorage>,
}

/// Where to persist the default author.
///
/// If set to `Mem`, a new author will be created in the docs store before spawning the sync
/// engine. Changing the default author will not be persisted.
///
/// If set to `Persistent`, the default author will be loaded from and persisted to the specified
/// path (as base32 encoded string of the author's public key).
#[derive(Debug)]
pub enum DefaultAuthorStorage {
Mem,
Persistent(PathBuf),
}

impl DefaultAuthorStorage {
pub async fn load(&self, docs_store: &SyncHandle) -> anyhow::Result<AuthorId> {
match self {
Self::Mem => {
let author = Author::new(&mut rand::thread_rng());
let author_id = author.id();
docs_store.import_author(author).await?;
Ok(author_id)
}
Self::Persistent(ref path) => {
if path.exists() {
let data = tokio::fs::read_to_string(path).await?;
let author_id = AuthorId::from_str(&data)?;
if docs_store.export_author(author_id).await?.is_none() {
bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
}
Ok(author_id)
} else {
let author = Author::new(&mut rand::thread_rng());
let author_id = author.id();
docs_store.import_author(author).await?;
tokio::fs::write(path, author_id.to_string()).await?;
Ok(author_id)
}
}
}
}
pub async fn save(&self, docs_store: &SyncHandle, author_id: AuthorId) -> anyhow::Result<()> {
if docs_store.export_author(author_id).await?.is_none() {
bail!("The author does not exist");
}
match self {
Self::Mem => {}
Self::Persistent(ref path) => {
tokio::fs::write(path, author_id.to_string()).await?;
}
}
Ok(())
}
}

impl Engine {
/// Start the sync engine.
///
/// This will spawn two tokio tasks for the live sync coordination and gossip actors, and a
/// thread for the [`iroh_docs::actor::SyncHandle`].
pub(crate) fn spawn<B: iroh_blobs::store::Store>(
pub(crate) async fn spawn<B: iroh_blobs::store::Store>(
endpoint: Endpoint,
gossip: Gossip,
replica_store: iroh_docs::store::Store,
bao_store: B,
downloader: Downloader,
default_author: AuthorId,
) -> Self {
default_author_storage: DefaultAuthorStorage,
) -> anyhow::Result<Self> {
let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
let (to_gossip_actor, to_gossip_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
let me = endpoint.node_id().fmt_short();
Expand Down Expand Up @@ -98,14 +154,17 @@ impl Engine {
.instrument(error_span!("sync", %me)),
);

Self {
let default_author = default_author_storage.load(&sync).await?;

Ok(Self {
endpoint,
sync,
to_live_actor: live_actor_tx,
actor_handle: actor_handle.into(),
content_status_cb,
default_author,
}
default_author_storage: Arc::new(default_author_storage),
})
}

/// Start to sync a document.
Expand Down
17 changes: 14 additions & 3 deletions iroh/src/docs_engine/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use tokio_stream::StreamExt;

use crate::client::docs::ShareMode;
use crate::rpc_protocol::{
AuthorGetDefaultRequest, AuthorGetDefaultResponse, AuthorDeleteRequest, AuthorDeleteResponse,
AuthorExportRequest, AuthorExportResponse, AuthorImportRequest, AuthorImportResponse,
DocGetSyncPeersRequest, DocGetSyncPeersResponse,
AuthorDeleteRequest, AuthorDeleteResponse, AuthorExportRequest, AuthorExportResponse,
AuthorGetDefaultRequest, AuthorGetDefaultResponse, AuthorImportRequest, AuthorImportResponse,
AuthorSetDefaultRequest, AuthorSetDefaultResponse, DocGetSyncPeersRequest,
DocGetSyncPeersResponse,
};
use crate::{
docs_engine::Engine,
Expand Down Expand Up @@ -51,6 +52,16 @@ impl Engine {
}
}

pub async fn author_set_default(
&self,
req: AuthorSetDefaultRequest,
) -> RpcResult<AuthorSetDefaultResponse> {
self.default_author_storage
.save(&self.sync, req.author_id)
.await?;
Ok(AuthorSetDefaultResponse)
}

pub fn author_list(
&self,
_req: AuthorListRequest,
Expand Down
15 changes: 15 additions & 0 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,21 @@ mod tests {
assert!(iroh.is_ok());
}

// check that the default author can be set manually and is persisted.
let default_author = {
let iroh = Node::persistent(iroh_root).await?.spawn().await?;
let author = iroh.authors.create().await?;
iroh.authors.set_default(author).await?;
iroh.shutdown().await?;
author
};
{
let iroh = Node::persistent(iroh_root).await?.spawn().await?;
let author = iroh.authors.default().await?;
assert_eq!(author, default_author);
iroh.shutdown().await?;
}

Ok(())
}
}
20 changes: 9 additions & 11 deletions iroh/src/node/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,10 @@ use tracing::{debug, error, error_span, info, trace, warn, Instrument};

use crate::{
client::RPC_ALPN,
docs_engine::Engine,
docs_engine::{DefaultAuthorStorage, Engine},
node::{Event, NodeInner},
rpc_protocol::{Request, Response, RpcService},
util::{
fs::{load_default_author, load_secret_key},
path::IrohPaths,
},
util::{fs::load_secret_key, path::IrohPaths},
};

use super::{rpc, rpc_status::RpcStatus, Callbacks, EventCallback, Node};
Expand Down Expand Up @@ -355,7 +352,7 @@ where
/// This will create the underlying network server and spawn a tokio task accepting
/// connections. The returned [`Node`] can be used to control the task as well as
/// get information about it.
pub async fn spawn(mut self) -> Result<Node<D>> {
pub async fn spawn(self) -> Result<Node<D>> {
trace!("spawning node");
let lp = LocalPoolHandle::new(num_cpus::get());

Expand Down Expand Up @@ -423,12 +420,12 @@ where
let downloader = Downloader::new(self.blobs_store.clone(), endpoint.clone(), lp.clone());

// load or create the default author for documents
let default_author = match self.storage {
let default_author_storage = match self.storage {
StorageConfig::Persistent(ref root) => {
let path = IrohPaths::DefaultAuthor.with_root(root);
load_default_author(path, &mut self.docs_store).await?
DefaultAuthorStorage::Persistent(path)
}
StorageConfig::Mem => self.docs_store.new_author(&mut rand::thread_rng())?.id(),
StorageConfig::Mem => DefaultAuthorStorage::Mem,
};

// spawn the docs engine
Expand All @@ -438,8 +435,9 @@ where
self.docs_store,
self.blobs_store.clone(),
downloader.clone(),
default_author,
);
default_author_storage,
)
.await?;
let sync_db = sync.sync.clone();

let callbacks = Callbacks::default();
Expand Down
6 changes: 6 additions & 0 deletions iroh/src/node/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ impl<D: BaoStore> Handler<D> {
})
.await
}
AuthorSetDefault(msg) => {
chan.rpc(msg, handler, |handler, req| async move {
handler.inner.sync.author_set_default(req).await
})
.await
}
DocOpen(msg) => {
chan.rpc(msg, handler, |handler, req| async move {
handler.inner.sync.doc_open(req).await
Expand Down
16 changes: 16 additions & 0 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,20 @@ pub struct AuthorGetDefaultResponse {
pub author_id: AuthorId,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct AuthorSetDefaultRequest {
/// The id of the author
pub author_id: AuthorId,
}

impl RpcMsg<RpcService> for AuthorSetDefaultRequest {
type Response = RpcResult<AuthorSetDefaultResponse>;
}

/// Response for [`AuthorGetDefaultRequest`]
#[derive(Serialize, Deserialize, Debug)]
pub struct AuthorSetDefaultResponse;

/// Delete an author
#[derive(Serialize, Deserialize, Debug)]
pub struct AuthorDeleteRequest {
Expand Down Expand Up @@ -1082,6 +1096,7 @@ pub enum Request {
AuthorList(AuthorListRequest),
AuthorCreate(AuthorCreateRequest),
AuthorGetDefault(AuthorGetDefaultRequest),
AuthorSetDefault(AuthorSetDefaultRequest),
AuthorImport(AuthorImportRequest),
AuthorExport(AuthorExportRequest),
AuthorDelete(AuthorDeleteRequest),
Expand Down Expand Up @@ -1143,6 +1158,7 @@ pub enum Response {
AuthorList(RpcResult<AuthorListResponse>),
AuthorCreate(RpcResult<AuthorCreateResponse>),
AuthorGetDefault(AuthorGetDefaultResponse),
AuthorSetDefault(RpcResult<AuthorSetDefaultResponse>),
AuthorImport(RpcResult<AuthorImportResponse>),
AuthorExport(RpcResult<AuthorExportResponse>),
AuthorDelete(RpcResult<AuthorDeleteResponse>),
Expand Down
31 changes: 0 additions & 31 deletions iroh/src/util/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use std::{
borrow::Cow,
fs::read_dir,
path::{Component, Path, PathBuf},
str::FromStr,
};

use anyhow::{bail, Context};
use bytes::Bytes;
use iroh_docs::AuthorId;
use iroh_net::key::SecretKey;
use tokio::io::AsyncWriteExt;
use walkdir::WalkDir;
Expand Down Expand Up @@ -121,35 +119,6 @@ pub fn relative_canonicalized_path_to_string(path: impl AsRef<Path>) -> anyhow::
canonicalized_path_to_string(path, true)
}

/// Load the default author public key from a path, and check that it is present in the `docs_store`.
///
/// If `path` does not exist, a new author keypair is created and persisted in the docs store, and
/// the public key is written to `path`, in base32 encoding.
///
/// If `path` does exist, but does not contain an ed25519 public key in base32 encoding, an error
/// is returned.
///
/// If `path` exists and is a valid author public key, but its secret key does not exist in the
/// docs store, an error is returned.
pub async fn load_default_author(
path: PathBuf,
docs_store: &mut iroh_docs::store::fs::Store,
) -> anyhow::Result<AuthorId> {
if path.exists() {
let data = tokio::fs::read_to_string(&path).await?;
let author_id = AuthorId::from_str(&data)?;
if docs_store.get_author(&author_id)?.is_none() {
bail!("The default author is missing from the docs store. To recover, delete the file `{}`. Then iroh will create a new default author.", path.to_string_lossy())
}
Ok(author_id)
} else {
let author_id = docs_store.new_author(&mut rand::thread_rng())?.id();
docs_store.flush()?;
tokio::fs::write(path, author_id.to_string()).await?;
Ok(author_id)
}
}

/// Loads a [`SecretKey`] from the provided file, or stores a newly generated one
/// at the given location.
pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result<SecretKey> {
Expand Down

0 comments on commit c74b6c5

Please sign in to comment.