From 645e134c4ad7bd29894ac78e824d7422e1946180 Mon Sep 17 00:00:00 2001 From: "Franz Heinzmann (Frando)" Date: Thu, 9 May 2024 22:03:27 +0200 Subject: [PATCH] better structure for coroutines --- iroh-willow/src/net.rs | 12 +- iroh-willow/src/session/coroutine.rs | 53 +++--- iroh-willow/src/session/resource.rs | 26 ++- iroh-willow/src/store/actor.rs | 268 ++++++++++++++------------- iroh-willow/src/util/channel.rs | 92 +++------ 5 files changed, 208 insertions(+), 243 deletions(-) diff --git a/iroh-willow/src/net.rs b/iroh-willow/src/net.rs index 8751e477cf5..273ab7e38ff 100644 --- a/iroh-willow/src/net.rs +++ b/iroh-willow/src/net.rs @@ -84,19 +84,19 @@ pub async fn run( }; let state = SessionState::new(our_role, our_nonce, received_commitment, max_payload_size); - let (reply, reply_rx) = oneshot::channel(); + let (on_done, on_done_rx) = oneshot::channel(); store .send(ToActor::InitSession { peer, state, channels, init, - reply, + on_done, }) .await?; join_set.spawn(async move { - reply_rx.await??; + on_done_rx.await??; Ok(()) }); @@ -151,7 +151,7 @@ async fn recv_loop( channel_tx: Sender, ) -> anyhow::Result<()> { while let Some(buf) = recv_stream.read_chunk(CHANNEL_CAP, true).await? { - channel_tx.write_slice_async(&buf.bytes[..]).await?; + channel_tx.write_all_async(&buf.bytes[..]).await?; trace!(len = buf.bytes.len(), "recv"); } recv_stream.stop(ERROR_CODE_CLOSE_GRACEFUL.into()).ok(); @@ -212,7 +212,7 @@ mod tests { use crate::{ net::run, proto::{ - grouping::AreaOfInterest, + grouping::{AreaOfInterest, ThreeDRange}, keys::{NamespaceId, NamespaceKind, NamespaceSecretKey, UserPublicKey, UserSecretKey}, meadowcap::{AccessMode, McCapability, OwnedCapability}, wgps::ReadCapability, @@ -360,6 +360,7 @@ mod tests { .send(ToActor::GetEntries { namespace, reply: tx, + range: ThreeDRange::full() }) .await?; let entries: HashSet<_> = rx.into_stream().collect::>().await; @@ -390,7 +391,6 @@ mod tests { }; track_entries.extend([entry.clone()]); let entry = entry.attach_authorisation(write_cap.clone(), &user_secret)?; - info!("INGEST {entry:?}"); store.ingest_entry(entry).await?; } let init = SessionInit::with_interest(user_secret, read_cap, AreaOfInterest::full()); diff --git a/iroh-willow/src/session/coroutine.rs b/iroh-willow/src/session/coroutine.rs index 6afbd53b745..3b2b0a1ceb2 100644 --- a/iroh-willow/src/session/coroutine.rs +++ b/iroh-willow/src/session/coroutine.rs @@ -5,7 +5,6 @@ use std::{ use anyhow::anyhow; use genawaiter::sync::Co; -use iroh_net::NodeId; use tracing::{debug, trace}; @@ -39,12 +38,10 @@ pub enum Readyness { #[derive(derive_more::Debug)] pub struct Coroutine { - pub peer: NodeId, pub store_snapshot: Rc, pub store_writer: Rc>, pub channels: Channels, pub state: SharedSessionState, - // pub waker: CoroutineWaker, #[debug(skip)] pub co: Co, } @@ -87,16 +84,29 @@ impl Channels { impl Coroutine { pub async fn run_reconciliation( mut self, - start: Option<(AreaOfInterestHandle, AreaOfInterestHandle)>, + start_with_aoi: Option<(AreaOfInterestHandle, AreaOfInterestHandle)>, ) -> Result<(), Error> { - debug!(init = start.is_some(), "start reconciliation"); - if let Some((our_handle, their_handle)) = start { - self.init_reconciliation(our_handle, their_handle).await?; + debug!(start = start_with_aoi.is_some(), "start reconciliation"); + + // optionally initiate reconciliation with a first fingerprint. only alfie may do this. + if let Some((our_handle, their_handle)) = start_with_aoi { + self.start_reconciliation(our_handle, their_handle).await?; } while let Some(message) = self.recv(LogicalChannel::Reconciliation).await { let message = message?; - self.on_reconciliation_message(message).await?; + trace!(%message, "recv"); + match message { + Message::ReconciliationSendFingerprint(message) => { + self.on_send_fingerprint(message).await? + } + Message::ReconciliationAnnounceEntries(message) => { + self.on_announce_entries(message).await? + } + Message::ReconciliationSendEntry(message) => self.on_send_entry(message).await?, + _ => return Err(Error::UnsupportedMessage), + }; + if self.state_mut().reconciliation_is_complete() { self.channels.close_send(); } @@ -141,21 +151,6 @@ impl Coroutine { Ok(()) } - async fn on_reconciliation_message(&mut self, message: Message) -> Result<(), Error> { - trace!(%message, "recv"); - match message { - Message::ReconciliationSendFingerprint(message) => { - self.on_send_fingerprint(message).await? - } - Message::ReconciliationAnnounceEntries(message) => { - self.on_announce_entries(message).await? - } - Message::ReconciliationSendEntry(message) => self.on_send_entry(message).await?, - _ => return Err(Error::UnsupportedMessage), - }; - Ok(()) - } - async fn setup(&mut self, init: SessionInit) -> Result<(), Error> { debug!(?init, "init"); for (capability, aois) in init.interests.into_iter() { @@ -194,7 +189,7 @@ impl Coroutine { Ok(()) } - async fn init_reconciliation( + async fn start_reconciliation( &mut self, our_handle: AreaOfInterestHandle, their_handle: AreaOfInterestHandle, @@ -339,9 +334,11 @@ impl Coroutine { static_token, message.dynamic_token, )?; + self.store_writer .borrow_mut() .ingest_entry(&authorised_entry)?; + Ok(()) } @@ -490,14 +487,14 @@ impl Coroutine { async fn recv(&self, channel: LogicalChannel) -> Option> { let receiver = self.channels.receiver(channel); loop { - match receiver.read_message() { + match receiver.recv_message() { Err(err) => return Some(Err(err)), Ok(outcome) => match outcome { ReadOutcome::Closed => { debug!("recv: closed"); return None; } - ReadOutcome::ReadBufferEmpty => { + ReadOutcome::BufferEmpty => { self.co .yield_(Yield::Pending(Readyness::Channel(channel, Interest::Recv))) .await; @@ -526,13 +523,13 @@ impl Coroutine { let sender = self.channels.sender(channel); loop { - match sender.send(&message)? { + match sender.send_message(&message)? { WriteOutcome::Closed => { debug!("send: closed"); return Err(anyhow!("channel closed")); } WriteOutcome::Ok => { - debug!(msg=%message, ch=%channel.fmt_short(), "sent"); + debug!(ch=%channel.fmt_short(), msg=%message, "sent"); break Ok(()); } WriteOutcome::BufferFull => { diff --git a/iroh-willow/src/session/resource.rs b/iroh-willow/src/session/resource.rs index 06db355f1ef..7f424d87890 100644 --- a/iroh-willow/src/session/resource.rs +++ b/iroh-willow/src/session/resource.rs @@ -1,11 +1,11 @@ -use std::collections::{HashMap, VecDeque}; - -use crate::{ - proto::wgps::{ - AreaOfInterestHandle, CapabilityHandle, IsHandle, ReadCapability, ResourceHandle, - SetupBindAreaOfInterest, StaticToken, StaticTokenHandle, - }, - store::actor::AssignedWaker, +use std::{ + collections::{HashMap, VecDeque}, + task::Waker, +}; + +use crate::proto::wgps::{ + AreaOfInterestHandle, CapabilityHandle, IsHandle, ReadCapability, ResourceHandle, + SetupBindAreaOfInterest, StaticToken, StaticTokenHandle, }; use super::Error; @@ -17,7 +17,7 @@ pub struct ScopedResources { pub static_tokens: ResourceMap, } impl ScopedResources { - pub fn register_waker(&mut self, handle: ResourceHandle, waker: AssignedWaker) { + pub fn register_waker(&mut self, handle: ResourceHandle, waker: Waker) { tracing::trace!(?handle, "register_notify"); match handle { ResourceHandle::AreaOfInterest(h) => self.areas_of_interest.register_waker(h, waker), @@ -41,7 +41,7 @@ impl ScopedResources { pub struct ResourceMap { next_handle: u64, map: HashMap>, - wakers: HashMap>, + wakers: HashMap>, } impl Default for ResourceMap { @@ -72,15 +72,13 @@ where if let Some(mut wakers) = self.wakers.remove(&handle) { tracing::trace!(?handle, "notify {}", wakers.len()); for waker in wakers.drain(..) { - if let Err(err) = waker.wake() { - tracing::warn!(?err, "notify failed for {handle:?}"); - } + waker.wake(); } } handle } - pub fn register_waker(&mut self, handle: H, notifier: AssignedWaker) { + pub fn register_waker(&mut self, handle: H, notifier: Waker) { self.wakers.entry(handle).or_default().push_back(notifier) } diff --git a/iroh-willow/src/store/actor.rs b/iroh-willow/src/store/actor.rs index 70797c40d63..b420cb07168 100644 --- a/iroh-willow/src/store/actor.rs +++ b/iroh-willow/src/store/actor.rs @@ -1,16 +1,16 @@ use std::{ cell::RefCell, - collections::{HashMap, VecDeque}, + collections::{HashMap, HashSet}, rc::Rc, sync::Arc, + task::Wake, thread::JoinHandle, }; use futures::{future::LocalBoxFuture, FutureExt}; use genawaiter::{sync::Gen, GeneratorState}; use tokio::sync::oneshot; -use tracing::{debug, error, error_span, instrument, trace, warn, Span}; -// use iroh_net::NodeId; +use tracing::{debug, error, error_span, trace, warn, Span}; use super::Store; use crate::{ @@ -28,6 +28,8 @@ use iroh_base::key::NodeId; pub const CHANNEL_CAP: usize = 1024; +pub type SessionId = NodeId; + #[derive(Debug, Clone)] pub struct StoreHandle { tx: flume::Sender, @@ -64,44 +66,51 @@ pub enum Interest { #[derive(Debug, Clone)] pub struct AssignedWaker { - waker: CoroutineWaker, - peer: NodeId, - notify: Readyness, + waker: Notifier, + coro_id: CoroId, } impl AssignedWaker { - pub fn wake(&self) -> anyhow::Result<()> { - self.waker.wake(self.peer, self.notify) + pub fn wake(&self) { + self.waker.wake(self.coro_id) + } +} + +impl Wake for AssignedWaker { + fn wake(self: Arc) { + self.waker.wake(self.coro_id) } } #[derive(Debug, Clone)] -pub struct CoroutineWaker { - tx: flume::Sender, +pub struct Notifier { + tx: flume::Sender, } -impl CoroutineWaker { - pub fn wake(&self, peer: NodeId, notify: Readyness) -> anyhow::Result<()> { - let msg = ToActor::Resume { peer, notify }; - // TODO: deadlock - self.tx.send(msg)?; - Ok(()) +impl Notifier { + pub fn wake(&self, coro_id: CoroId) { + self.tx.send(coro_id).ok(); } - pub fn with_notify(&self, peer: NodeId, notify: Readyness) -> AssignedWaker { - AssignedWaker { + pub fn create_waker(&self, coro_id: CoroId) -> std::task::Waker { + Arc::new(AssignedWaker { waker: self.clone(), - peer, - notify, - } + coro_id, + }) + .into() } } impl StoreHandle { pub fn spawn(store: S, me: NodeId) -> StoreHandle { let (tx, rx) = flume::bounded(CHANNEL_CAP); + // This channel only tracks wake to resume messages to coroutines, which are a sinlge u64 + // per wakeup. We want to issue wake calls synchronosuly without blocking, so we use an + // unbounded channel here. The actual capacity is bounded by the number of sessions times + // the number of coroutines per session (which is fixed, currently at 2). + let (notify_tx, notify_rx) = flume::unbounded(); // let actor_tx = tx.clone(); - let waker = CoroutineWaker { tx: tx.clone() }; + let waker = Notifier { tx: notify_tx }; let join_handle = std::thread::Builder::new() .name("sync-actor".to_string()) .spawn(move || { @@ -111,8 +120,12 @@ impl StoreHandle { let mut actor = StorageThread { store: Rc::new(RefCell::new(store)), sessions: Default::default(), - actor_rx: rx, - waker, + coroutines: Default::default(), + + next_coro_id: Default::default(), + inbox_rx: rx, + notify_rx, + notifier: waker, }; if let Err(error) = actor.run() { error!(?error, "storage thread failed"); @@ -130,11 +143,6 @@ impl StoreHandle { self.tx.send(action)?; Ok(()) } - pub fn waker(&self) -> CoroutineWaker { - CoroutineWaker { - tx: self.tx.clone(), - } - } pub async fn ingest_entry(&self, entry: AuthorisedEntry) -> anyhow::Result<()> { let (reply, reply_rx) = oneshot::channel(); self.send(ToActor::IngestEntry { entry, reply }).await?; @@ -169,17 +177,18 @@ pub enum ToActor { #[debug(skip)] channels: Channels, init: SessionInit, - reply: oneshot::Sender>, + on_done: oneshot::Sender>, }, // DropSession { // peer: NodeId, // }, - Resume { - peer: NodeId, - notify: Readyness, - }, + // Resume { + // session_id: SessionId, + // coro_id: CoroId, + // }, GetEntries { namespace: NamespaceId, + range: ThreeDRange, #[debug(skip)] reply: flume::Sender, }, @@ -197,69 +206,78 @@ pub enum ToActor { struct Session { state: SharedSessionState, channels: Channels, - pending: PendingCoroutines, + coroutines: HashSet, on_done: oneshot::Sender>, } -#[derive(derive_more::Debug, Default)] -struct PendingCoroutines { - #[debug(skip)] - inner: HashMap>, -} - -impl PendingCoroutines { - fn get_mut(&mut self, pending_on: Readyness) -> &mut VecDeque { - self.inner.entry(pending_on).or_default() - } - fn push_back(&mut self, pending_on: Readyness, generator: CoroutineState) { - self.get_mut(pending_on).push_back(generator); - } - fn pop_front(&mut self, pending_on: Readyness) -> Option { - self.get_mut(pending_on).pop_front() - } - // fn push_front(&mut self, pending_on: Readyness, generator: ReconcileGen) { - // self.get_mut(pending_on).push_front(generator); - // } - // fn len(&self, pending_on: &Readyness) -> usize { - // self.inner.get(pending_on).map(|v| v.len()).unwrap_or(0) - // } - // - // fn is_empty(&self) -> bool { - // self.inner.values().any(|v| !v.is_empty()) - // } -} +type CoroId = u64; #[derive(Debug)] pub struct StorageThread { + inbox_rx: flume::Receiver, + notify_rx: flume::Receiver, store: Rc>, - sessions: HashMap, - actor_rx: flume::Receiver, - waker: CoroutineWaker, // actor_tx: flume::Sender, + sessions: HashMap, + coroutines: HashMap, + next_coro_id: u64, + notifier: Notifier, // actor_tx: flume::Sender, } type ReconcileFut = LocalBoxFuture<'static, Result<(), Error>>; type ReconcileGen = Gen; +#[derive(derive_more::Debug)] +struct CoroutineState { + id: CoroId, + session_id: SessionId, + #[debug("Generator")] + gen: ReconcileGen, + span: Span, + finalizes_session: bool, +} + impl StorageThread { pub fn run(&mut self) -> anyhow::Result<()> { + enum Op { + Inbox(ToActor), + Notify(CoroId), + } loop { - let message = match self.actor_rx.recv() { - Err(_) => break, - Ok(message) => message, + let op = flume::Selector::new() + .recv(&self.inbox_rx, |r| r.map(Op::Inbox)) + .recv(&self.notify_rx, |r| r.map(Op::Notify)) + .wait(); + + let Ok(op) = op else { + break; }; - match message { - ToActor::Shutdown { reply } => { + + match op { + Op::Inbox(ToActor::Shutdown { reply }) => { if let Some(reply) = reply { reply.send(()).ok(); } break; } - message => self.handle_message(message)?, + Op::Inbox(message) => self.handle_message(message)?, + Op::Notify(coro_id) => self.handle_resume(coro_id), } } Ok(()) } + fn handle_resume(&mut self, coro_id: CoroId) { + if let Some(coro) = self.coroutines.remove(&coro_id) { + let session_id = coro.session_id; + if let Err(error) = self.resume_coroutine(coro) { + warn!(?error, session=%session_id.fmt_short(), "abort session: coroutine failed"); + self.remove_session(&session_id, Err(error)); + } + } else { + debug!(%coro_id, "received wakeup for dropped coroutine"); + } + } + fn handle_message(&mut self, message: ToActor) -> Result<(), Error> { trace!(%message, "tick: handle_message"); match message { @@ -268,14 +286,14 @@ impl StorageThread { peer, state, channels, - init, - reply, + init: setup, + on_done, } => { let session = Session { state: Rc::new(RefCell::new(state)), channels, - pending: Default::default(), - on_done: reply, + coroutines: Default::default(), + on_done, }; self.sessions.insert(peer, session); @@ -283,29 +301,18 @@ impl StorageThread { if let Err(error) = self.start_coroutine( peer, - |routine| routine.run_control(init).boxed_local(), - error_span!("control", peer=%peer.fmt_short()), + |routine| routine.run_control(setup).boxed_local(), + error_span!("session", peer=%peer.fmt_short()), true, ) { warn!(?error, peer=%peer.fmt_short(), "abort session: starting failed"); self.remove_session(&peer, Err(error)); } } - ToActor::Resume { peer, notify } => { - if self.sessions.contains_key(&peer) { - if let Err(error) = self.resume_next(peer, notify) { - warn!(?error, peer=%peer.fmt_short(), "abort session: coroutine failed"); - self.remove_session(&peer, Err(error)); - } - } - } - // ToActor::DropSession { peer } => { - // self.remove_session(&peer, Ok(())); - // } - ToActor::GetEntries { namespace, reply } => { + ToActor::GetEntries { namespace, range, reply } => { let store = self.store.borrow(); let entries = store - .get_entries(namespace, &ThreeDRange::full()) + .get_entries(namespace, &range) .filter_map(|r| r.ok()); for entry in entries { reply.send(entry).ok(); @@ -324,6 +331,9 @@ impl StorageThread { if let Some(session) = session { session.channels.close_all(); session.on_done.send(result).ok(); + for coro_id in session.coroutines { + self.coroutines.remove(&coro_id); + } } else { warn!("remove_session called for unknown session"); } @@ -331,69 +341,64 @@ impl StorageThread { fn start_coroutine( &mut self, - peer: NodeId, + session_id: SessionId, producer: impl FnOnce(Coroutine) -> ReconcileFut, span: Span, finalizes_session: bool, ) -> Result<(), Error> { - let session = self.sessions.get_mut(&peer).ok_or(Error::SessionNotFound)?; + let session = self + .sessions + .get_mut(&session_id) + .ok_or(Error::SessionNotFound)?; let store_snapshot = Rc::new(self.store.borrow_mut().snapshot()?); let channels = session.channels.clone(); let state = session.state.clone(); let store_writer = Rc::clone(&self.store); - // let waker = self.waker.clone(); let gen = Gen::new(move |co| { let routine = Coroutine { - peer, store_snapshot, store_writer, - // waker, channels, state, co, }; (producer)(routine) }); + let id = { + let next_id = self.next_coro_id; + self.next_coro_id += 1; + next_id + }; + session.coroutines.insert(id); let state = CoroutineState { + id, + session_id, gen, span, finalizes_session, }; - self.resume_coroutine(peer, state) - } - - #[instrument(skip_all, fields(session=%peer.fmt_short()))] - fn resume_next(&mut self, peer: NodeId, notify: Readyness) -> Result<(), Error> { - let session = self.sessions.get_mut(&peer).ok_or(Error::SessionNotFound)?; - let generator = session.pending.pop_front(notify); - match generator { - Some(generator) => self.resume_coroutine(peer, generator), - None => { - debug!("nothing to resume"); - Ok(()) - } - } + self.resume_coroutine(state) } - fn resume_coroutine(&mut self, peer: NodeId, mut state: CoroutineState) -> Result<(), Error> { - let _guard = state.span.enter(); - trace!(peer=%peer.fmt_short(), "resume"); + fn resume_coroutine(&mut self, mut coro: CoroutineState) -> Result<(), Error> { + let _guard = coro.span.enter(); + trace!("resume"); loop { - match state.gen.resume() { + match coro.gen.resume() { GeneratorState::Yielded(yielded) => { trace!(?yielded, "yield"); match yielded { - Yield::Pending(resume_on) => { - let session = - self.sessions.get_mut(&peer).ok_or(Error::SessionNotFound)?; + Yield::Pending(waiting_for) => { + let session = self + .sessions + .get_mut(&coro.session_id) + .ok_or(Error::SessionNotFound)?; drop(_guard); - match resume_on { + match waiting_for { Readyness::Channel(ch, interest) => { - let waker = self - .waker - .with_notify(peer, Readyness::Channel(ch, interest)); + let waker = self.notifier.create_waker(coro.id); match interest { Interest::Send => { session.channels.sender(ch).register_waker(waker) @@ -404,20 +409,19 @@ impl StorageThread { }; } Readyness::Resource(handle) => { - let waker = - self.waker.with_notify(peer, Readyness::Resource(handle)); + let waker = self.notifier.create_waker(coro.id); let mut state = session.state.borrow_mut(); state.their_resources.register_waker(handle, waker); } } - session.pending.push_back(resume_on, state); + self.coroutines.insert(coro.id, coro); break Ok(()); } Yield::StartReconciliation(start) => { debug!("start coroutine reconciliation"); self.start_coroutine( - peer, - |routine| routine.run_reconciliation(start).boxed_local(), + coro.session_id, + |state| state.run_reconciliation(start).boxed_local(), error_span!("reconcile"), false, )?; @@ -426,18 +430,18 @@ impl StorageThread { } GeneratorState::Complete(res) => { debug!(?res, "complete"); - if res.is_err() || state.finalizes_session { - self.remove_session(&peer, res) + if res.is_err() || coro.finalizes_session { + self.remove_session(&coro.session_id, res) } break Ok(()); } } } } -} -struct CoroutineState { - gen: ReconcileGen, - span: Span, - finalizes_session: bool, + // fn next_coro_id(&mut self) -> u64 { + // let next_id = self.next_coro_id; + // self.next_coro_id += 1; + // next_id + // } } diff --git a/iroh-willow/src/util/channel.rs b/iroh-willow/src/util/channel.rs index 6a25a4eb6db..2d9f4d146a8 100644 --- a/iroh-willow/src/util/channel.rs +++ b/iroh-willow/src/util/channel.rs @@ -2,6 +2,7 @@ use std::{ io, marker::PhantomData, sync::{Arc, Mutex}, + task::Waker, }; use anyhow::anyhow; @@ -9,8 +10,6 @@ use bytes::{Buf, Bytes, BytesMut}; use tokio::sync::Notify; use tracing::trace; -use crate::store::actor::AssignedWaker; - use super::{DecodeOutcome, Decoder, Encoder}; pub fn channel(cap: usize) -> (Sender, Receiver) { @@ -29,7 +28,7 @@ pub fn channel(cap: usize) -> (Sender, Receiver) { #[derive(Debug)] pub enum ReadOutcome { - ReadBufferEmpty, + BufferEmpty, Closed, Item(T), } @@ -47,8 +46,8 @@ struct Shared { max_buffer_size: usize, notify_readable: Arc, notify_writable: Arc, - wakers_on_writable: Vec, - wakers_on_readable: Vec, + wakers_on_writable: Vec, + wakers_on_readable: Vec, closed: bool, } @@ -78,7 +77,7 @@ impl Shared { &self.buf[..] } - fn read_is_empty(&self) -> bool { + fn recv_buf_is_empty(&self) -> bool { self.buf.is_empty() } @@ -89,7 +88,7 @@ impl Shared { } } - fn read_bytes(&mut self) -> Bytes { + fn recv_bytes(&mut self) -> Bytes { let len = self.buf.len(); if len > 0 { self.notify_writable(); @@ -97,7 +96,7 @@ impl Shared { self.buf.split_to(len).freeze() } - fn write_slice(&mut self, len: usize) -> Option<&mut [u8]> { + fn writable_mut(&mut self, len: usize) -> Option<&mut [u8]> { if self.remaining_write_capacity() < len { None } else { @@ -109,12 +108,12 @@ impl Shared { } } - fn write_message(&mut self, item: &T) -> anyhow::Result { + fn send_message(&mut self, item: &T) -> anyhow::Result { let len = item.encoded_len(); if self.closed() { return Ok(WriteOutcome::Closed); } - if let Some(slice) = self.write_slice(len) { + if let Some(slice) = self.writable_mut(len) { let mut cursor = io::Cursor::new(slice); item.encode_into(&mut cursor)?; self.notify_readable(); @@ -124,7 +123,7 @@ impl Shared { } } - fn read_message(&mut self) -> anyhow::Result> { + fn recv_message(&mut self) -> anyhow::Result> { let data = self.peek_read(); trace!("read, remaining {}", data.len()); let res = match T::decode_from(data)? { @@ -132,7 +131,7 @@ impl Shared { if self.closed() { ReadOutcome::Closed } else { - ReadOutcome::ReadBufferEmpty + ReadOutcome::BufferEmpty } } DecodeOutcome::Decoded { item, consumed } => { @@ -150,13 +149,13 @@ impl Shared { fn notify_readable(&mut self) { self.notify_readable.notify_waiters(); for waker in self.wakers_on_readable.drain(..) { - waker.wake().ok(); + waker.wake(); } } fn notify_writable(&mut self) { self.notify_writable.notify_waiters(); for waker in self.wakers_on_writable.drain(..) { - waker.wake().ok(); + waker.wake(); } } } @@ -181,16 +180,16 @@ impl Receiver { self.shared.lock().unwrap().close() } - pub fn read_bytes(&self) -> Bytes { - self.shared.lock().unwrap().read_bytes() + pub fn register_waker(&self, waker: Waker) { + self.shared.lock().unwrap().wakers_on_readable.push(waker); } pub async fn read_bytes_async(&self) -> Option { loop { let notify = { let mut shared = self.shared.lock().unwrap(); - if !shared.read_is_empty() { - return Some(shared.read_bytes()); + if !shared.recv_buf_is_empty() { + return Some(shared.recv_bytes()); } if shared.closed() { return None; @@ -201,24 +200,20 @@ impl Receiver { } } - pub fn read_message(&self) -> anyhow::Result> { + pub fn recv_message(&self) -> anyhow::Result> { let mut shared = self.shared.lock().unwrap(); - let outcome = shared.read_message()?; + let outcome = shared.recv_message()?; Ok(outcome) } - pub fn register_waker(&self, waker: AssignedWaker) { - self.shared.lock().unwrap().wakers_on_readable.push(waker); - } - - pub async fn recv_async(&self) -> Option> { + pub async fn recv_message_async(&self) -> Option> { loop { let notify = { let mut shared = self.shared.lock().unwrap(); - match shared.read_message() { + match shared.recv_message() { Err(err) => return Some(Err(err)), Ok(outcome) => match outcome { - ReadOutcome::ReadBufferEmpty => shared.notify_readable.clone(), + ReadOutcome::BufferEmpty => shared.notify_readable.clone(), ReadOutcome::Closed => return None, ReadOutcome::Item(item) => { return Some(Ok(item)); @@ -251,29 +246,11 @@ impl Sender { self.shared.lock().unwrap().close() } - pub fn register_waker(&self, waker: AssignedWaker) { + pub fn register_waker(&self, waker: Waker) { self.shared.lock().unwrap().wakers_on_writable.push(waker); } - pub async fn notify_closed(&self) { - tracing::info!("notify_close IN"); - loop { - let notify = { - let shared = self.shared.lock().unwrap(); - if shared.closed() { - tracing::info!("notify_close closed!"); - return; - } else { - tracing::info!("notify_close not closed - wait"); - - } - shared.notify_writable.clone() - }; - notify.notified().await; - } - } - - pub async fn write_slice_async(&self, data: &[u8]) -> anyhow::Result<()> { + pub async fn write_all_async(&self, data: &[u8]) -> anyhow::Result<()> { loop { let notify = { let mut shared = self.shared.lock().unwrap(); @@ -284,7 +261,7 @@ impl Sender { let notify = shared.notify_writable.clone(); notify.clone() } else { - let out = shared.write_slice(data.len()).expect("just checked"); + let out = shared.writable_mut(data.len()).expect("just checked"); out.copy_from_slice(data); shared.notify_readable(); break Ok(()); @@ -294,15 +271,15 @@ impl Sender { } } - pub fn send(&self, message: &T) -> anyhow::Result { - self.shared.lock().unwrap().write_message(message) + pub fn send_message(&self, message: &T) -> anyhow::Result { + self.shared.lock().unwrap().send_message(message) } - pub async fn send_async(&self, message: &T) -> anyhow::Result<()> { + pub async fn send_message_async(&self, message: &T) -> anyhow::Result<()> { loop { let notify = { let mut shared = self.shared.lock().unwrap(); - match shared.write_message(message)? { + match shared.send_message(message)? { WriteOutcome::Ok => return Ok(()), WriteOutcome::BufferFull => shared.notify_writable.clone(), WriteOutcome::Closed => return Err(anyhow!("channel is closed")), @@ -312,14 +289,3 @@ impl Sender { } } } - -// pub async fn notify_readable(&self) { -// let shared = self.shared.lock().unwrap(); -// if !shared.peek_read().is_empty() { -// return; -// } -// let notify = shared.notify_readable.clone(); -// drop(shared); -// notify.notified().await -// } -//