Skip to content

Commit

Permalink
better structure for coroutines
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 9, 2024
1 parent d13b26c commit 645e134
Show file tree
Hide file tree
Showing 5 changed files with 208 additions and 243 deletions.
12 changes: 6 additions & 6 deletions iroh-willow/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,19 @@ pub async fn run(
};
let state = SessionState::new(our_role, our_nonce, received_commitment, max_payload_size);

let (reply, reply_rx) = oneshot::channel();
let (on_done, on_done_rx) = oneshot::channel();
store
.send(ToActor::InitSession {
peer,
state,
channels,
init,
reply,
on_done,
})
.await?;

join_set.spawn(async move {
reply_rx.await??;
on_done_rx.await??;
Ok(())
});

Expand Down Expand Up @@ -151,7 +151,7 @@ async fn recv_loop<T: Encoder>(
channel_tx: Sender<T>,
) -> anyhow::Result<()> {
while let Some(buf) = recv_stream.read_chunk(CHANNEL_CAP, true).await? {
channel_tx.write_slice_async(&buf.bytes[..]).await?;
channel_tx.write_all_async(&buf.bytes[..]).await?;
trace!(len = buf.bytes.len(), "recv");
}
recv_stream.stop(ERROR_CODE_CLOSE_GRACEFUL.into()).ok();
Expand Down Expand Up @@ -212,7 +212,7 @@ mod tests {
use crate::{
net::run,
proto::{
grouping::AreaOfInterest,
grouping::{AreaOfInterest, ThreeDRange},
keys::{NamespaceId, NamespaceKind, NamespaceSecretKey, UserPublicKey, UserSecretKey},
meadowcap::{AccessMode, McCapability, OwnedCapability},
wgps::ReadCapability,
Expand Down Expand Up @@ -360,6 +360,7 @@ mod tests {
.send(ToActor::GetEntries {
namespace,
reply: tx,
range: ThreeDRange::full()
})
.await?;
let entries: HashSet<_> = rx.into_stream().collect::<HashSet<_>>().await;
Expand Down Expand Up @@ -390,7 +391,6 @@ mod tests {
};
track_entries.extend([entry.clone()]);
let entry = entry.attach_authorisation(write_cap.clone(), &user_secret)?;
info!("INGEST {entry:?}");
store.ingest_entry(entry).await?;
}
let init = SessionInit::with_interest(user_secret, read_cap, AreaOfInterest::full());
Expand Down
53 changes: 25 additions & 28 deletions iroh-willow/src/session/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::{

use anyhow::anyhow;
use genawaiter::sync::Co;
use iroh_net::NodeId;

use tracing::{debug, trace};

Expand Down Expand Up @@ -39,12 +38,10 @@ pub enum Readyness {

#[derive(derive_more::Debug)]
pub struct Coroutine<S: ReadonlyStore, W: Store> {
pub peer: NodeId,
pub store_snapshot: Rc<S>,
pub store_writer: Rc<RefCell<W>>,
pub channels: Channels,
pub state: SharedSessionState,
// pub waker: CoroutineWaker,
#[debug(skip)]
pub co: Co<Yield, ()>,
}
Expand Down Expand Up @@ -87,16 +84,29 @@ impl Channels {
impl<S: ReadonlyStore, W: Store> Coroutine<S, W> {
pub async fn run_reconciliation(
mut self,
start: Option<(AreaOfInterestHandle, AreaOfInterestHandle)>,
start_with_aoi: Option<(AreaOfInterestHandle, AreaOfInterestHandle)>,
) -> Result<(), Error> {
debug!(init = start.is_some(), "start reconciliation");
if let Some((our_handle, their_handle)) = start {
self.init_reconciliation(our_handle, their_handle).await?;
debug!(start = start_with_aoi.is_some(), "start reconciliation");

// optionally initiate reconciliation with a first fingerprint. only alfie may do this.
if let Some((our_handle, their_handle)) = start_with_aoi {
self.start_reconciliation(our_handle, their_handle).await?;
}

while let Some(message) = self.recv(LogicalChannel::Reconciliation).await {
let message = message?;
self.on_reconciliation_message(message).await?;
trace!(%message, "recv");
match message {
Message::ReconciliationSendFingerprint(message) => {
self.on_send_fingerprint(message).await?
}
Message::ReconciliationAnnounceEntries(message) => {
self.on_announce_entries(message).await?
}
Message::ReconciliationSendEntry(message) => self.on_send_entry(message).await?,
_ => return Err(Error::UnsupportedMessage),
};

if self.state_mut().reconciliation_is_complete() {
self.channels.close_send();
}
Expand Down Expand Up @@ -141,21 +151,6 @@ impl<S: ReadonlyStore, W: Store> Coroutine<S, W> {
Ok(())
}

async fn on_reconciliation_message(&mut self, message: Message) -> Result<(), Error> {
trace!(%message, "recv");
match message {
Message::ReconciliationSendFingerprint(message) => {
self.on_send_fingerprint(message).await?
}
Message::ReconciliationAnnounceEntries(message) => {
self.on_announce_entries(message).await?
}
Message::ReconciliationSendEntry(message) => self.on_send_entry(message).await?,
_ => return Err(Error::UnsupportedMessage),
};
Ok(())
}

async fn setup(&mut self, init: SessionInit) -> Result<(), Error> {
debug!(?init, "init");
for (capability, aois) in init.interests.into_iter() {
Expand Down Expand Up @@ -194,7 +189,7 @@ impl<S: ReadonlyStore, W: Store> Coroutine<S, W> {
Ok(())
}

async fn init_reconciliation(
async fn start_reconciliation(
&mut self,
our_handle: AreaOfInterestHandle,
their_handle: AreaOfInterestHandle,
Expand Down Expand Up @@ -339,9 +334,11 @@ impl<S: ReadonlyStore, W: Store> Coroutine<S, W> {
static_token,
message.dynamic_token,
)?;

self.store_writer
.borrow_mut()
.ingest_entry(&authorised_entry)?;

Ok(())
}

Expand Down Expand Up @@ -490,14 +487,14 @@ impl<S: ReadonlyStore, W: Store> Coroutine<S, W> {
async fn recv(&self, channel: LogicalChannel) -> Option<anyhow::Result<Message>> {
let receiver = self.channels.receiver(channel);
loop {
match receiver.read_message() {
match receiver.recv_message() {
Err(err) => return Some(Err(err)),
Ok(outcome) => match outcome {
ReadOutcome::Closed => {
debug!("recv: closed");
return None;
}
ReadOutcome::ReadBufferEmpty => {
ReadOutcome::BufferEmpty => {
self.co
.yield_(Yield::Pending(Readyness::Channel(channel, Interest::Recv)))
.await;
Expand Down Expand Up @@ -526,13 +523,13 @@ impl<S: ReadonlyStore, W: Store> Coroutine<S, W> {
let sender = self.channels.sender(channel);

loop {
match sender.send(&message)? {
match sender.send_message(&message)? {
WriteOutcome::Closed => {
debug!("send: closed");
return Err(anyhow!("channel closed"));
}
WriteOutcome::Ok => {
debug!(msg=%message, ch=%channel.fmt_short(), "sent");
debug!(ch=%channel.fmt_short(), msg=%message, "sent");
break Ok(());
}
WriteOutcome::BufferFull => {
Expand Down
26 changes: 12 additions & 14 deletions iroh-willow/src/session/resource.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::{HashMap, VecDeque};

use crate::{
proto::wgps::{
AreaOfInterestHandle, CapabilityHandle, IsHandle, ReadCapability, ResourceHandle,
SetupBindAreaOfInterest, StaticToken, StaticTokenHandle,
},
store::actor::AssignedWaker,
use std::{
collections::{HashMap, VecDeque},
task::Waker,
};

use crate::proto::wgps::{
AreaOfInterestHandle, CapabilityHandle, IsHandle, ReadCapability, ResourceHandle,
SetupBindAreaOfInterest, StaticToken, StaticTokenHandle,
};

use super::Error;
Expand All @@ -17,7 +17,7 @@ pub struct ScopedResources {
pub static_tokens: ResourceMap<StaticTokenHandle, StaticToken>,
}
impl ScopedResources {
pub fn register_waker(&mut self, handle: ResourceHandle, waker: AssignedWaker) {
pub fn register_waker(&mut self, handle: ResourceHandle, waker: Waker) {
tracing::trace!(?handle, "register_notify");
match handle {
ResourceHandle::AreaOfInterest(h) => self.areas_of_interest.register_waker(h, waker),
Expand All @@ -41,7 +41,7 @@ impl ScopedResources {
pub struct ResourceMap<H, R> {
next_handle: u64,
map: HashMap<H, Resource<R>>,
wakers: HashMap<H, VecDeque<AssignedWaker>>,
wakers: HashMap<H, VecDeque<Waker>>,
}

impl<H, R> Default for ResourceMap<H, R> {
Expand Down Expand Up @@ -72,15 +72,13 @@ where
if let Some(mut wakers) = self.wakers.remove(&handle) {
tracing::trace!(?handle, "notify {}", wakers.len());
for waker in wakers.drain(..) {
if let Err(err) = waker.wake() {
tracing::warn!(?err, "notify failed for {handle:?}");
}
waker.wake();
}
}
handle
}

pub fn register_waker(&mut self, handle: H, notifier: AssignedWaker) {
pub fn register_waker(&mut self, handle: H, notifier: Waker) {
self.wakers.entry(handle).or_default().push_back(notifier)
}

Expand Down
Loading

0 comments on commit 645e134

Please sign in to comment.