diff --git a/iroh-willow/src/actor.rs b/iroh-willow/src/actor.rs index 13e13f0c19..071bb4d628 100644 --- a/iroh-willow/src/actor.rs +++ b/iroh-willow/src/actor.rs @@ -14,7 +14,7 @@ use crate::{ meadowcap, willow::{AuthorisedEntry, Entry}, }, - session::{coroutine::ControlRoutine, Channels, Error, Role, SessionInit, Session}, + session::{coroutine::ControlRoutine, Channels, Error, Role, Session, SessionInit}, store::{KeyStore, Store}, util::task_set::{TaskKey, TaskMap}, }; @@ -186,8 +186,7 @@ pub enum ToActor { #[derive(Debug)] struct ActiveSession { on_finish: oneshot::Sender>, - task_key: TaskKey - // state: SharedSessionState + task_key: TaskKey, // state: SharedSessionState } #[derive(Debug)] @@ -245,19 +244,18 @@ impl StorageThread { } => { let session_id = peer; let Channels { send, recv } = channels; - let session = Session::new( - self.store.clone(), - send, - our_role, - initial_transmission, - ); + let session = + Session::new(self.store.clone(), send, our_role, initial_transmission); let task_key = self.session_tasks.spawn_local( session_id, ControlRoutine::run(session, recv, init) .instrument(error_span!("session", peer = %peer.fmt_short())), ); - let active_session = ActiveSession { on_finish, task_key }; + let active_session = ActiveSession { + on_finish, + task_key, + }; self.sessions.insert(session_id, active_session); } ToActor::GetEntries { diff --git a/iroh-willow/src/session.rs b/iroh-willow/src/session.rs index 1c0ce37b11..5ed69deea5 100644 --- a/iroh-willow/src/session.rs +++ b/iroh-willow/src/session.rs @@ -12,7 +12,7 @@ mod util; pub use self::channels::Channels; pub use self::error::Error; -pub use self::state::{SessionState, Session}; +pub use self::state::Session; /// To break symmetry, we refer to the peer that initiated the synchronisation session as Alfie, /// and the other peer as Betty. diff --git a/iroh-willow/src/session/coroutine.rs b/iroh-willow/src/session/coroutine.rs index 7d575a718d..a6ddfa2f3a 100644 --- a/iroh-willow/src/session/coroutine.rs +++ b/iroh-willow/src/session/coroutine.rs @@ -1,4 +1,4 @@ -use std::{cell::RefMut, rc::Rc}; +use std::rc::Rc; use futures_lite::StreamExt; use strum::IntoEnumIterator; @@ -15,10 +15,7 @@ use crate::{ }, willow::AuthorisedEntry, }, - session::{ - channels::LogicalChannelReceivers, Error, Scope, SessionInit, SessionState, - Session, - }, + session::{channels::LogicalChannelReceivers, Error, Scope, Session, SessionInit}, store::{ReadonlyStore, SplitAction, Store, SyncConfig}, util::channel::{Receiver, WriteError}, }; @@ -33,7 +30,7 @@ const INITIAL_GUARANTEES: u64 = u64::MAX; #[derive(derive_more::Debug)] pub struct ControlRoutine { control_recv: Receiver, - state: Session, + session: Session, init: Option, } @@ -57,7 +54,7 @@ impl ControlRoutine { // Spawn a task to handle incoming static tokens. session.spawn(error_span!("stt"), move |session| async move { while let Some(message) = static_tokens_recv.try_next().await? { - session.state_mut().on_setup_bind_static_token(message); + session.on_setup_bind_static_token(message); } Ok(()) }); @@ -65,7 +62,7 @@ impl ControlRoutine { // Spawn a task to handle incoming capabilities. session.spawn(error_span!("cap"), move |session| async move { while let Some(message) = capability_recv.try_next().await? { - session.state_mut().on_setup_bind_read_capability(message)?; + session.on_setup_bind_read_capability(message)?; } Ok(()) }); @@ -73,7 +70,7 @@ impl ControlRoutine { // Spawn a task to handle incoming areas of interest. session.spawn(error_span!("aoi"), move |session| async move { while let Some(message) = aoi_recv.try_next().await? { - Self::on_bind_area_of_interest(session.clone(), message).await?; + session.on_bind_area_of_interest(message).await?; } Ok(()) }); @@ -103,7 +100,7 @@ impl ControlRoutine { // TODO: We'll want to emit the completion event back to the application and // let it decide what to do (stop, keep open) - or pass relevant config in // SessionInit. - if session.state_mut().reconciliation_is_complete() { + if session.reconciliation_is_complete() { tracing::debug!("stop session: reconciliation is complete"); drop(guard); break; @@ -113,29 +110,25 @@ impl ControlRoutine { // Close all our send streams. // // This makes the networking send loops stop. - session.send.close_all(); + session.close_senders(); Ok(()) } - pub fn new( - session: Session, - control_recv: Receiver, - init: SessionInit, - ) -> Self { + pub fn new(session: Session, control_recv: Receiver, init: SessionInit) -> Self { Self { control_recv, - state: session, + session, init: Some(init), } } async fn run_inner(mut self) -> Result<(), Error> { - debug!(role = ?self.state().our_role, "start session"); + debug!(role = ?self.session.our_role(), "start session"); // Reveal our nonce. - let reveal_message = self.state().commitment_reveal()?; - self.state.send(reveal_message).await?; + let reveal_message = self.session.reveal_commitment()?; + self.session.send(reveal_message).await?; // Issue guarantees for all logical channels. for channel in LogicalChannel::iter() { @@ -143,7 +136,7 @@ impl ControlRoutine { amount: INITIAL_GUARANTEES, channel, }; - self.state.send(msg).await?; + self.session.send(msg).await?; } while let Some(message) = self.control_recv.try_next().await? { @@ -157,19 +150,18 @@ impl ControlRoutine { debug!(%message, "recv"); match message { Message::CommitmentReveal(msg) => { - self.state().on_commitment_reveal(msg)?; + self.session.on_commitment_reveal(msg)?; let init = self .init .take() .ok_or_else(|| Error::InvalidMessageInCurrentState)?; - self.state + self.session .spawn(error_span!("setup"), |state| Self::setup(state, init)); } Message::ControlIssueGuarantee(msg) => { let ControlIssueGuarantee { amount, channel } = msg; - let sender = self.state.send.get_logical(channel); debug!(?channel, %amount, "add guarantees"); - sender.add_guarantees(amount); + self.session.add_guarantees(channel, amount); } _ => return Err(Error::UnsupportedMessage), } @@ -177,19 +169,6 @@ impl ControlRoutine { Ok(()) } - async fn on_bind_area_of_interest( - session: Session, - message: SetupBindAreaOfInterest, - ) -> Result<(), Error> { - session - .get_their_resource_eventually(|r| &mut r.capabilities, message.authorisation) - .await; - session - .state_mut() - .bind_area_of_interest(Scope::Theirs, message)?; - Ok(()) - } - async fn setup(session: Session, init: SessionInit) -> Result<(), Error> { debug!(interests = init.interests.len(), "start setup"); for (capability, aois) in init.interests.into_iter() { @@ -207,19 +186,13 @@ impl ControlRoutine { authorisation: our_capability_handle, }; // TODO: We could skip the clone if we re-enabled sending by reference. - session - .state_mut() - .bind_area_of_interest(Scope::Ours, msg.clone())?; + session.bind_area_of_interest(Scope::Ours, msg.clone())?; session.send(msg).await?; } } debug!("setup done"); Ok(()) } - - fn state(&mut self) -> RefMut { - self.state.state_mut() - } } #[derive(derive_more::Debug)] @@ -243,7 +216,7 @@ impl Reconciler { } pub async fn run(mut self) -> Result<(), Error> { - let our_role = self.state().our_role; + let our_role = self.session.our_role(); loop { tokio::select! { message = self.recv.try_next() => { @@ -258,7 +231,7 @@ impl Reconciler { } } } - if self.state().reconciliation_is_complete() { + if self.session.reconciliation_is_complete() { debug!("reconciliation complete, close session"); break; } @@ -288,7 +261,6 @@ impl Reconciler { } = intersection; let range = intersection.into_range(); let fingerprint = self.snapshot.fingerprint(namespace, &range)?; - self.session.state_mut().reconciliation_started = true; self.send_fingerprint(range, fingerprint, our_handle, their_handle, None) .await?; Ok(()) @@ -298,22 +270,16 @@ impl Reconciler { &mut self, message: ReconciliationSendFingerprint, ) -> Result<(), Error> { + let namespace = self.session.on_send_fingerprint(&message)?; trace!("on_send_fingerprint start"); let ReconciliationSendFingerprint { range, fingerprint: their_fingerprint, sender_handle: their_handle, receiver_handle: our_handle, - is_final_reply_for_range, + is_final_reply_for_range: _, } = message; - let namespace = { - let mut state = self.state(); - state.reconciliation_started = true; - state.clear_pending_range_if_some(our_handle, is_final_reply_for_range)?; - state.range_is_authorised(&range, &our_handle, &their_handle)? - }; - let our_fingerprint = self.snapshot.fingerprint(namespace, &range)?; // case 1: fingerprint match. @@ -356,28 +322,17 @@ impl Reconciler { message: ReconciliationAnnounceEntries, ) -> Result<(), Error> { trace!("on_announce_entries start"); + let namespace = self.session.on_announce_entries(&message)?; let ReconciliationAnnounceEntries { range, - count, + count: _, want_response, will_sort: _, sender_handle: their_handle, receiver_handle: our_handle, - is_final_reply_for_range, + is_final_reply_for_range: _, } = message; - let namespace = { - let mut state = self.state(); - state.clear_pending_range_if_some(our_handle, is_final_reply_for_range)?; - if state.pending_entries.is_some() { - return Err(Error::InvalidMessageInCurrentState); - } - let namespace = state.range_is_authorised(&range, &our_handle, &their_handle)?; - if count != 0 { - state.pending_entries = Some(count); - } - namespace - }; if want_response { self.announce_and_send_entries( namespace, @@ -400,7 +355,7 @@ impl Reconciler { .get_their_resource_eventually(|r| &mut r.static_tokens, message.static_token_handle) .await; - self.state().on_send_entry()?; + self.session.on_send_entry()?; let authorised_entry = AuthorisedEntry::try_from_parts( message.entry.entry, @@ -421,9 +376,7 @@ impl Reconciler { their_handle: AreaOfInterestHandle, is_final_reply_for_range: Option, ) -> anyhow::Result<()> { - self.state() - .pending_ranges - .insert((our_handle, range.clone())); + self.session.insert_pending_range(our_handle, range.clone()); let msg = ReconciliationSendFingerprint { range, fingerprint, @@ -446,8 +399,7 @@ impl Reconciler { our_count: Option, ) -> Result<(), Error> { if want_response { - let mut state = self.state(); - state.pending_ranges.insert((our_handle, range.clone())); + self.session.insert_pending_range(our_handle, range.clone()); } let our_count = match our_count { Some(count) => count, @@ -472,10 +424,8 @@ impl Reconciler { let (static_token, dynamic_token) = token.into_parts(); // TODO: partial payloads let available = entry.payload_length; - let (static_token_handle, static_token_bind_msg) = self - .session - .state_mut() - .bind_our_static_token(static_token)?; + let (static_token_handle, static_token_bind_msg) = + self.session.bind_our_static_token(static_token); if let Some(msg) = static_token_bind_msg { self.send(msg).await?; } @@ -535,10 +485,6 @@ impl Reconciler { Ok(()) } - fn state(&mut self) -> RefMut { - self.session.state_mut() - } - async fn send(&self, message: impl Into) -> Result<(), WriteError> { self.session.send(message).await } diff --git a/iroh-willow/src/session/state.rs b/iroh-willow/src/session/state.rs index 3e082ab993..1cdac5fe7e 100644 --- a/iroh-willow/src/session/state.rs +++ b/iroh-willow/src/session/state.rs @@ -17,8 +17,9 @@ use crate::{ grouping::{Area, ThreeDRange}, keys::NamespaceId, wgps::{ - AreaOfInterestHandle, CapabilityHandle, CommitmentReveal, IntersectionHandle, IsHandle, - Message, ReadCapability, SetupBindAreaOfInterest, SetupBindReadCapability, + AreaOfInterestHandle, CapabilityHandle, Channel, CommitmentReveal, IntersectionHandle, + IsHandle, LogicalChannel, Message, ReadCapability, ReconciliationAnnounceEntries, + ReconciliationSendFingerprint, SetupBindAreaOfInterest, SetupBindReadCapability, SetupBindStaticToken, StaticToken, StaticTokenHandle, }, }, @@ -32,25 +33,24 @@ use super::{ Error, Role, Scope, }; -#[derive(derive_more::Debug)] -pub struct Session { - pub state: Rc>, - pub send: ChannelSenders, - #[debug("Store")] - pub store: Rc>, - pub tasks: Rc>>>, -} +#[derive(Debug, derive_more::Deref)] +pub struct Session(Rc>); + impl Clone for Session { fn clone(&self) -> Self { - Self { - state: Rc::clone(&self.state), - send: self.send.clone(), - store: Rc::clone(&self.store), - tasks: Rc::clone(&self.tasks), - } + Self(Rc::clone(&self.0)) } } +#[derive(derive_more::Debug)] +pub struct SessionInner { + state: RefCell, + send: ChannelSenders, + #[debug("Store")] + store: Rc>, + tasks: RefCell>>, +} + impl Session { pub fn new( store: Rc>, @@ -59,12 +59,12 @@ impl Session { initial_transmission: InitialTransmission, ) -> Self { let state = SessionState::new(our_role, initial_transmission); - Self { - state: Rc::new(RefCell::new(state)), + Self(Rc::new(SessionInner { + state: RefCell::new(state), send, store, tasks: Default::default(), - } + })) } pub fn spawn(&self, span: Span, f: F) @@ -93,7 +93,22 @@ impl Session { } pub async fn send(&self, message: impl Into) -> Result<(), WriteError> { - self.send.send(message).await + self.0.send.send(message).await + } + + pub fn close_senders(&self) { + self.0.send.close_all(); + } + + pub fn add_guarantees(&self, channel: LogicalChannel, amount: u64) { + self.0 + .send + .get(Channel::Logical(channel)) + .add_guarantees(amount); + } + + pub fn our_role(&self) -> Role { + self.state.borrow().our_role } pub async fn next_aoi_intersection(&self) -> Option { @@ -112,9 +127,9 @@ impl Session { where F: for<'a> Fn(&'a mut ScopedResources) -> &'a mut ResourceMap, { - let inner = self.state.clone(); + let inner = Rc::clone(&self); poll_fn(move |cx| { - let mut inner = inner.borrow_mut(); + let mut inner = inner.state.borrow_mut(); let res = selector(&mut std::ops::DerefMut::deref_mut(&mut inner).their_resources); let r = std::task::ready!(res.poll_get_eventually(handle, cx)); Poll::Ready(r.clone()) @@ -147,57 +162,66 @@ impl Session { Ok((our_handle, maybe_message)) } - pub fn state_mut(&self) -> RefMut { - self.state.borrow_mut() + pub fn on_announce_entries( + &self, + message: &ReconciliationAnnounceEntries, + ) -> Result { + let mut state = self.state.borrow_mut(); + state.clear_pending_range_if_some( + message.receiver_handle, + message.is_final_reply_for_range.as_ref(), + )?; + if state.pending_entries.is_some() { + return Err(Error::InvalidMessageInCurrentState); + } + let namespace = state.range_is_authorised( + &message.range, + &message.receiver_handle, + &message.sender_handle, + )?; + if message.count != 0 { + state.pending_entries = Some(message.count); + } + Ok(namespace) } - pub fn store(&self) -> RefMut { - self.store.borrow_mut() + pub fn on_send_fingerprint( + &self, + message: &ReconciliationSendFingerprint, + ) -> Result { + let mut state = self.state.borrow_mut(); + state.reconciliation_started = true; + state.clear_pending_range_if_some( + message.receiver_handle, + message.is_final_reply_for_range.as_ref(), + )?; + let namespace = state.range_is_authorised( + &message.range, + &message.receiver_handle, + &message.sender_handle, + )?; + Ok(namespace) } -} -#[derive(Debug)] -pub struct SessionState { - pub our_role: Role, - pub our_resources: ScopedResources, - pub their_resources: ScopedResources, - pub reconciliation_started: bool, - pub pending_ranges: HashSet<(AreaOfInterestHandle, ThreeDRange)>, - pub pending_entries: Option, - pub challenge: ChallengeState, - pub aoi_queue: AoiQueue, -} - -impl SessionState { - pub fn new(our_role: Role, initial_transmission: InitialTransmission) -> Self { - let challenge_state = ChallengeState::Committed { - our_nonce: initial_transmission.our_nonce, - received_commitment: initial_transmission.received_commitment, - }; - // TODO: make use of initial_transmission.their_max_payload_size. - Self { - our_role, - challenge: challenge_state, - reconciliation_started: false, - our_resources: Default::default(), - their_resources: Default::default(), - pending_ranges: Default::default(), - pending_entries: Default::default(), - aoi_queue: Default::default(), - } + pub fn on_setup_bind_static_token(&self, msg: SetupBindStaticToken) { + self.state + .borrow_mut() + .their_resources + .static_tokens + .bind(msg.static_token); } - fn resources(&self, scope: Scope) -> &ScopedResources { - match scope { - Scope::Ours => &self.our_resources, - Scope::Theirs => &self.their_resources, - } + + pub fn on_setup_bind_read_capability(&self, msg: SetupBindReadCapability) -> Result<(), Error> { + // TODO: verify intersection handle + msg.capability.validate()?; + let mut state = self.state.borrow_mut(); + state + .challenge + .verify(msg.capability.receiver(), &msg.signature)?; + state.their_resources.capabilities.bind(msg.capability); + Ok(()) } - // fn resources_mut(&mut self, scope: Scope) -> &ScopedResources { - // match scope { - // Scope::Ours => &mut self.our_resources, - // Scope::Theirs => &mut self.their_resources, - // } - // } + pub fn reconciliation_is_complete(&self) -> bool { // tracing::debug!( // "reconciliation_is_complete started {} pending_ranges {}, pending_entries {}", @@ -205,13 +229,15 @@ impl SessionState { // self.pending_ranges.len(), // self.pending_entries.is_some() // ); - self.reconciliation_started - && self.pending_ranges.is_empty() - && self.pending_entries.is_none() + let state = self.state.borrow(); + state.reconciliation_started + && state.pending_ranges.is_empty() + && state.pending_entries.is_none() } - pub fn commitment_reveal(&mut self) -> Result { - match self.challenge { + pub fn reveal_commitment(&self) -> Result { + let state = self.state.borrow(); + match state.challenge { ChallengeState::Committed { our_nonce, .. } => { Ok(CommitmentReveal { nonce: our_nonce }) } @@ -219,24 +245,83 @@ impl SessionState { } } - pub fn on_commitment_reveal(&mut self, msg: CommitmentReveal) -> Result<(), Error> { - self.challenge.reveal(self.our_role, msg.nonce) + pub fn on_commitment_reveal(&self, msg: CommitmentReveal) -> Result<(), Error> { + let mut state = self.state.borrow_mut(); + let our_role = state.our_role; + state.challenge.reveal(our_role, msg.nonce) } - pub fn on_setup_bind_read_capability( - &mut self, - msg: SetupBindReadCapability, + pub fn bind_area_of_interest( + &self, + scope: Scope, + message: SetupBindAreaOfInterest, ) -> Result<(), Error> { - // TODO: verify intersection handle - msg.capability.validate()?; - self.challenge - .verify(msg.capability.receiver(), &msg.signature)?; - self.their_resources.capabilities.bind(msg.capability); + self.state + .borrow_mut() + .bind_area_of_interest(scope, message) + } + + pub async fn on_bind_area_of_interest( + &self, + message: SetupBindAreaOfInterest, + ) -> Result<(), Error> { + self.get_their_resource_eventually(|r| &mut r.capabilities, message.authorisation) + .await; + self.bind_area_of_interest(Scope::Theirs, message)?; Ok(()) } - pub fn on_setup_bind_static_token(&mut self, msg: SetupBindStaticToken) { - self.their_resources.static_tokens.bind(msg.static_token); + pub fn on_send_entry(&self) -> Result<(), Error> { + self.state.borrow_mut().on_send_entry() + } + + pub fn bind_our_static_token( + &self, + token: StaticToken, + ) -> (StaticTokenHandle, Option) { + self.state.borrow_mut().bind_our_static_token(token) + } + + pub fn insert_pending_range(&self, our_handle: AreaOfInterestHandle, range: ThreeDRange) { + let mut state = self.state.borrow_mut(); + state.reconciliation_started = true; + state.pending_ranges.insert((our_handle, range)); + } + + pub fn store(&self) -> RefMut { + self.store.borrow_mut() + } +} + +#[derive(Debug)] +struct SessionState { + our_role: Role, + our_resources: ScopedResources, + their_resources: ScopedResources, + reconciliation_started: bool, + pending_ranges: HashSet<(AreaOfInterestHandle, ThreeDRange)>, + pending_entries: Option, + challenge: ChallengeState, + aoi_queue: AoiQueue, +} + +impl SessionState { + fn new(our_role: Role, initial_transmission: InitialTransmission) -> Self { + let challenge_state = ChallengeState::Committed { + our_nonce: initial_transmission.our_nonce, + received_commitment: initial_transmission.received_commitment, + }; + // TODO: make use of initial_transmission.their_max_payload_size. + Self { + our_role, + challenge: challenge_state, + reconciliation_started: false, + our_resources: Default::default(), + their_resources: Default::default(), + pending_ranges: Default::default(), + pending_entries: Default::default(), + aoi_queue: Default::default(), + } } /// Bind a area of interest, and start reconciliation if this area of interest has an @@ -246,7 +331,7 @@ impl SessionState { /// [`Self::get_their_resource_eventually`] before calling this. /// /// Returns `true` if the capability was newly bound, and `false` if not. - pub fn bind_area_of_interest( + fn bind_area_of_interest( &mut self, scope: Scope, msg: SetupBindAreaOfInterest, @@ -304,7 +389,7 @@ impl SessionState { Ok(()) } - pub fn on_send_entry(&mut self) -> Result<(), Error> { + fn on_send_entry(&mut self) -> Result<(), Error> { let remaining = self .pending_entries .as_mut() @@ -316,10 +401,10 @@ impl SessionState { Ok(()) } - pub fn clear_pending_range_if_some( + fn clear_pending_range_if_some( &mut self, our_handle: AreaOfInterestHandle, - pending_range: Option, + pending_range: Option<&ThreeDRange>, ) -> Result<(), Error> { if let Some(range) = pending_range { // TODO: avoid clone @@ -334,33 +419,19 @@ impl SessionState { } } - pub fn bind_our_static_token( + fn bind_our_static_token( &mut self, static_token: StaticToken, - ) -> anyhow::Result<(StaticTokenHandle, Option)> { + ) -> (StaticTokenHandle, Option) { let (handle, is_new) = self .our_resources .static_tokens .bind_if_new(static_token.clone()); let msg = is_new.then(|| SetupBindStaticToken { static_token }); - Ok((handle, msg)) + (handle, msg) } - pub fn handle_to_namespace_id( - &self, - scope: Scope, - handle: &AreaOfInterestHandle, - ) -> Result { - let aoi = self.resources(scope).areas_of_interest.try_get(handle)?; - let capability = self - .resources(scope) - .capabilities - .try_get(&aoi.authorisation)?; - let namespace_id = capability.granted_namespace().into(); - Ok(namespace_id) - } - - pub fn range_is_authorised( + fn range_is_authorised( &self, range: &ThreeDRange, receiver_handle: &AreaOfInterestHandle, @@ -387,6 +458,27 @@ impl SessionState { ) -> Result<&SetupBindAreaOfInterest, Error> { self.resources(scope).areas_of_interest.try_get(handle) } + + fn handle_to_namespace_id( + &self, + scope: Scope, + handle: &AreaOfInterestHandle, + ) -> Result { + let aoi = self.resources(scope).areas_of_interest.try_get(handle)?; + let capability = self + .resources(scope) + .capabilities + .try_get(&aoi.authorisation)?; + let namespace_id = capability.granted_namespace().into(); + Ok(namespace_id) + } + + fn resources(&self, scope: Scope) -> &ScopedResources { + match scope { + Scope::Ours => &self.our_resources, + Scope::Theirs => &self.their_resources, + } + } } #[derive(Debug, Clone)] @@ -400,7 +492,7 @@ pub struct AreaOfInterestIntersection { #[derive(Default, Debug)] pub struct AoiQueue { found: VecDeque, - closed: bool, + // closed: bool, wakers: VecDeque, } @@ -409,10 +501,10 @@ impl AoiQueue { self.found.push_back(pair); self.wake(); } - pub fn close(&mut self) { - self.closed = true; - self.wake(); - } + // pub fn close(&mut self) { + // self.closed = true; + // self.wake(); + // } fn wake(&mut self) { for waker in self.wakers.drain(..) { waker.wake(); @@ -423,9 +515,9 @@ impl AoiQueue { &mut self, cx: &mut std::task::Context<'_>, ) -> Poll> { - if self.closed { - return Poll::Ready(None); - } + // if self.closed { + // return Poll::Ready(None); + // } if let Some(item) = self.found.pop_front() { Poll::Ready(Some(item)) } else {