Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 13, 2024
1 parent d69eb9e commit 9f0533d
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 198 deletions.
82 changes: 0 additions & 82 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 9 additions & 29 deletions iroh-willow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,27 @@ workspace = true

[dependencies]
anyhow = "1"
bytes = { version = "1.4", features = ["serde"] }
derive_more = { version = "1.0.0-beta.1", features = ["debug", "deref", "display", "from", "try_into", "into", "as_ref", "try_from"] }
ed25519-dalek = { version = "2.0.0", features = ["serde", "rand_core"] }
flume = "0.11"
futures-concurrency = "7.6.0"
futures-lite = "2.3.0"
futures-util = "0.3.30"
genawaiter = "0.99.1"
iroh-base = { version = "0.15.0", path = "../iroh-base" }
iroh-metrics = { version = "0.15.0", path = "../iroh-metrics", optional = true }
num_enum = "0.7"
iroh-net = { version = "0.15.0", path = "../iroh-net" }
postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] }
rand = "0.8.5"
rand_core = "0.6.4"
redb = { version = "2.0.0" }
serde = { version = "1.0.164", features = ["derive"] }
strum = { version = "0.25", features = ["derive"] }
bytes = { version = "1.4", features = ["serde"] }
hex = "0.4"
thiserror = "1"
tracing = "0.1"
tokio = { version = "1", features = ["sync"] }

# fs-store
redb = { version = "2.0.0" }
tempfile = { version = "3.4" }

# net
iroh-net = { version = "0.15.0", optional = true, path = "../iroh-net" }
tokio-util = { version = "0.7", optional = true, features = ["codec", "io-util", "io"] }
tokio-stream = { version = "0.1", optional = true, features = ["sync"]}
quinn = { version = "0.10", optional = true }
futures = { version = "0.3", optional = true }
self_cell = "1.0.3"
tracing = "0.1"
zerocopy = { version = "0.8.0-alpha.7", features = ["derive"] }
genawaiter = "0.99.1"
rtrb = "0.3.0"
parking_lot = "0.12.2"
once_cell = "1.19.0"
rayon = "1.10.0"
smallvec = "1.13.2"
itertools = "0.12.1"
futures-lite = "2.3.0"
futures-concurrency = "7.6.0"
futures-util = "0.3.30"

[dev-dependencies]
iroh-test = { path = "../iroh-test" }
Expand All @@ -63,10 +45,8 @@ tokio = { version = "1", features = ["sync", "macros"] }
proptest = "1.2.0"
tempfile = "3.4"
test-strategy = "0.3.1"
tracing-chrome = "0.7.2"
tracing-subscriber = "0.3.18"

[features]
default = ["net", "metrics"]
net = ["iroh-net", "tokio/io-util", "tokio-stream", "tokio-util", "quinn", "futures"]
default = ["metrics"]
metrics = ["iroh-metrics"]
98 changes: 46 additions & 52 deletions iroh-willow/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,56 +42,13 @@ pub const INBOX_CAP: usize = 1024;
pub type SessionId = NodeId;

#[derive(Debug, Clone)]
pub struct WillowHandle {
pub struct ActorHandle {
tx: flume::Sender<ToActor>,
join_handle: Arc<Option<JoinHandle<()>>>,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
pub enum Interest {
Send,
Recv,
}

#[derive(Debug, Clone)]
pub struct AssignedWaker {
waker: Notifier,
coro_id: CoroId,
}

impl AssignedWaker {
pub fn wake(&self) {
self.waker.wake(self.coro_id)
}
}

impl Wake for AssignedWaker {
fn wake(self: Arc<Self>) {
self.waker.wake(self.coro_id)
}
}

#[derive(Debug, Clone)]
pub struct Notifier {
tx: flume::Sender<CoroId>,
}

impl Notifier {
pub fn wake(&self, coro_id: CoroId) {
self.tx.send(coro_id).ok();
}

pub fn create_waker(&self, coro_id: CoroId) -> std::task::Waker {
Arc::new(AssignedWaker {
waker: self.clone(),
coro_id,
})
.into()
}
}

impl WillowHandle {
pub fn spawn<S: Store>(store: S, me: NodeId) -> WillowHandle {
impl ActorHandle {
pub fn spawn<S: Store>(store: S, me: NodeId) -> ActorHandle {
let (tx, rx) = flume::bounded(INBOX_CAP);
// This channel only tracks wake to resume messages to coroutines, which are a sinlge u64
// per wakeup. We want to issue wake calls synchronosuly without blocking, so we use an
Expand Down Expand Up @@ -121,7 +78,7 @@ impl WillowHandle {
})
.expect("failed to spawn thread");
let join_handle = Arc::new(Some(join_handle));
WillowHandle { tx, join_handle }
ActorHandle { tx, join_handle }
}
pub async fn send(&self, action: ToActor) -> anyhow::Result<()> {
self.tx.send_async(action).await?;
Expand Down Expand Up @@ -186,7 +143,7 @@ impl WillowHandle {
}
}

impl Drop for WillowHandle {
impl Drop for ActorHandle {
fn drop(&mut self) {
// this means we're dropping the last reference
if let Some(handle) = Arc::get_mut(&mut self.join_handle) {
Expand Down Expand Up @@ -410,7 +367,7 @@ impl<S: Store> StorageThread<S> {
fn start_coroutine(
&mut self,
session_id: SessionId,
create_fn: impl FnOnce(WakeableCo, &mut Session) -> CoroFut,
create_fn: impl FnOnce(WakeableCoro, &mut Session) -> CoroFut,
span_fn: impl FnOnce() -> Span,
) -> Result<(), Error> {
let session = self
Expand All @@ -432,7 +389,7 @@ impl<S: Store> StorageThread<S> {
drop(_guard);

let gen = Gen::new(move |co| {
let co = WakeableCo::new(co, waker);
let co = WakeableCoro::new(co, waker);
create_fn(co, session)
});
let state = CoroutineState {
Expand Down Expand Up @@ -491,13 +448,13 @@ pub enum Yield {
}

#[derive(derive_more::Debug)]
pub struct WakeableCo {
pub struct WakeableCoro {
pub waker: Waker,
#[debug(skip)]
pub co: Co<Yield, ()>,
}

impl WakeableCo {
impl WakeableCoro {
pub fn new(co: Co<Yield, ()>, waker: Waker) -> Self {
Self { co, waker }
}
Expand All @@ -524,3 +481,40 @@ impl WakeableCo {
Pin::new(&mut fut).poll(&mut ctx)
}
}

#[derive(Debug, Clone)]
pub struct CoroWaker {
waker: Notifier,
coro_id: CoroId,
}

impl CoroWaker {
pub fn wake(&self) {
self.waker.wake(self.coro_id)
}
}

impl Wake for CoroWaker {
fn wake(self: Arc<Self>) {
self.waker.wake(self.coro_id)
}
}

#[derive(Debug, Clone)]
pub struct Notifier {
tx: flume::Sender<CoroId>,
}

impl Notifier {
pub fn wake(&self, coro_id: CoroId) {
self.tx.send(coro_id).ok();
}

pub fn create_waker(&self, coro_id: CoroId) -> std::task::Waker {
Arc::new(CoroWaker {
waker: self.clone(),
coro_id,
})
.into()
}
}
Loading

0 comments on commit 9f0533d

Please sign in to comment.