Skip to content

Commit

Permalink
further cleanup around APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 13, 2024
1 parent 7e6d839 commit d69eb9e
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 117 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions iroh-willow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ smallvec = "1.13.2"
itertools = "0.12.1"
futures-lite = "2.3.0"
futures-concurrency = "7.6.0"
futures-util = "0.3.30"

[dev-dependencies]
iroh-test = { path = "../iroh-test" }
Expand Down
87 changes: 77 additions & 10 deletions iroh-willow/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ use std::{
thread::JoinHandle,
};

use futures::{future::LocalBoxFuture, FutureExt};
use futures_lite::{
future::{Boxed as BoxFuture, BoxedLocal as LocalBoxFuture},
stream::Stream,
};
use futures_util::future::{FutureExt, Shared};
use genawaiter::{
sync::{Co, Gen},
GeneratorState,
Expand All @@ -19,6 +23,7 @@ use tokio::sync::oneshot;
use tracing::{debug, error, error_span, trace, warn, Span};

use crate::{
net::InitialTransmission,
proto::{
grouping::ThreeDRange,
keys::NamespaceId,
Expand All @@ -27,7 +32,7 @@ use crate::{
},
session::{
coroutine::{ControlRoutine, ReconcileRoutine},
Channels, Error, SessionInit, SessionState, SharedSessionState,
Channels, Error, Role, SessionInit, SessionState, SharedSessionState,
},
store::Store,
};
Expand All @@ -37,7 +42,7 @@ pub const INBOX_CAP: usize = 1024;
pub type SessionId = NodeId;

#[derive(Debug, Clone)]
pub struct StoreHandle {
pub struct WillowHandle {
tx: flume::Sender<ToActor>,
join_handle: Arc<Option<JoinHandle<()>>>,
}
Expand Down Expand Up @@ -85,8 +90,8 @@ impl Notifier {
}
}

impl StoreHandle {
pub fn spawn<S: Store>(store: S, me: NodeId) -> StoreHandle {
impl WillowHandle {
pub fn spawn<S: Store>(store: S, me: NodeId) -> WillowHandle {
let (tx, rx) = flume::bounded(INBOX_CAP);
// This channel only tracks wake to resume messages to coroutines, which are a sinlge u64
// per wakeup. We want to issue wake calls synchronosuly without blocking, so we use an
Expand Down Expand Up @@ -116,7 +121,7 @@ impl StoreHandle {
})
.expect("failed to spawn thread");
let join_handle = Arc::new(Some(join_handle));
StoreHandle { tx, join_handle }
WillowHandle { tx, join_handle }
}
pub async fn send(&self, action: ToActor) -> anyhow::Result<()> {
self.tx.send_async(action).await?;
Expand All @@ -132,9 +137,56 @@ impl StoreHandle {
reply_rx.await??;
Ok(())
}

pub async fn get_entries(
&self,
namespace: NamespaceId,
range: ThreeDRange,
) -> anyhow::Result<impl Stream<Item = Entry>> {
let (tx, rx) = flume::bounded(1024);
self.send(ToActor::GetEntries {
namespace,
reply: tx,
range,
})
.await?;
Ok(rx.into_stream())
}

pub async fn init_session(
&self,
peer: NodeId,
our_role: Role,
initial_transmission: InitialTransmission,
channels: Channels,
init: SessionInit,
) -> anyhow::Result<SessionHandle> {
let state = SessionState::new(our_role, initial_transmission);

let (on_finish_tx, on_finish_rx) = oneshot::channel();
self.send(ToActor::InitSession {
peer,
state,
channels,
init,
on_finish: on_finish_tx,
})
.await?;

let on_finish = on_finish_rx
.map(|r| match r {
Ok(Ok(())) => Ok(()),
Ok(Err(err)) => Err(Arc::new(err.into())),
Err(_) => Err(Arc::new(Error::ActorFailed)),
})
.boxed();
let on_finish = on_finish.shared();
let handle = SessionHandle { on_finish };
Ok(handle)
}
}

impl Drop for StoreHandle {
impl Drop for WillowHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
Expand All @@ -146,6 +198,21 @@ impl Drop for StoreHandle {
}
}
}

#[derive(Debug)]
pub struct SessionHandle {
on_finish: Shared<BoxFuture<Result<(), Arc<Error>>>>,
}

impl SessionHandle {
/// Wait for the session to finish.
///
/// Returns an error if the session failed to complete.
pub async fn on_finish(self) -> Result<(), Arc<Error>> {
self.on_finish.await
}
}

#[derive(derive_more::Debug, strum::Display)]
pub enum ToActor {
InitSession {
Expand All @@ -155,7 +222,7 @@ pub enum ToActor {
#[debug(skip)]
channels: Channels,
init: SessionInit,
on_done: oneshot::Sender<Result<(), Error>>,
on_finish: oneshot::Sender<Result<(), Error>>,
},
GetEntries {
namespace: NamespaceId,
Expand Down Expand Up @@ -195,7 +262,7 @@ pub struct StorageThread<S> {
next_coro_id: u64,
}

type CoroFut = LocalBoxFuture<'static, Result<(), Error>>;
type CoroFut = LocalBoxFuture<Result<(), Error>>;

#[derive(derive_more::Debug)]
struct CoroutineState {
Expand Down Expand Up @@ -257,7 +324,7 @@ impl<S: Store> StorageThread<S> {
state,
channels,
init,
on_done,
on_finish: on_done,
} => {
let span = error_span!("session", peer=%peer.fmt_short());
let session = Session {
Expand Down
Loading

0 comments on commit d69eb9e

Please sign in to comment.