diff --git a/iroh-willow/src/actor.rs b/iroh-willow/src/actor.rs index 071bb4d628..fc395cd47d 100644 --- a/iroh-willow/src/actor.rs +++ b/iroh-willow/src/actor.rs @@ -14,7 +14,7 @@ use crate::{ meadowcap, willow::{AuthorisedEntry, Entry}, }, - session::{coroutine::ControlRoutine, Channels, Error, Role, Session, SessionInit}, + session::{self, Channels, Error, Role, Session, SessionInit}, store::{KeyStore, Store}, util::task_set::{TaskKey, TaskMap}, }; @@ -249,7 +249,7 @@ impl StorageThread { let task_key = self.session_tasks.spawn_local( session_id, - ControlRoutine::run(session, recv, init) + session::run(session, recv, init) .instrument(error_span!("session", peer = %peer.fmt_short())), ); let active_session = ActiveSession { diff --git a/iroh-willow/src/session.rs b/iroh-willow/src/session.rs index 5ed69deea5..ee27ec4dc3 100644 --- a/iroh-willow/src/session.rs +++ b/iroh-willow/src/session.rs @@ -2,16 +2,17 @@ use std::collections::{HashMap, HashSet}; use crate::proto::{grouping::AreaOfInterest, wgps::ReadCapability}; -pub mod aoi_finder; pub mod channels; -pub mod coroutine; mod error; -pub mod resource; +mod reconciler; +mod resource; +mod run; mod state; mod util; pub use self::channels::Channels; pub use self::error::Error; +pub use self::run::run; pub use self::state::Session; /// To break symmetry, we refer to the peer that initiated the synchronisation session as Alfie, diff --git a/iroh-willow/src/session/aoi_finder.rs b/iroh-willow/src/session/aoi_finder.rs deleted file mode 100644 index 49169a90be..0000000000 --- a/iroh-willow/src/session/aoi_finder.rs +++ /dev/null @@ -1,16 +0,0 @@ -// use std::{cell::RefCell, collections::VecDeque, rc::Rc, task::Waker}; -// -// use crate::actor::AreaOfInterestPair; - -// pub struct AoiFinder { -// inner: Rc> -// } -// -// impl AoiFinder { -// fn push -// } -// -// struct Inner { -// queue: VecDeque, -// wakers: VecDeque -// } diff --git a/iroh-willow/src/session/coroutine.rs b/iroh-willow/src/session/reconciler.rs similarity index 60% rename from iroh-willow/src/session/coroutine.rs rename to iroh-willow/src/session/reconciler.rs index a6ddfa2f3a..0e8cb29569 100644 --- a/iroh-willow/src/session/coroutine.rs +++ b/iroh-willow/src/session/reconciler.rs @@ -1,200 +1,24 @@ use std::rc::Rc; use futures_lite::StreamExt; -use strum::IntoEnumIterator; -use tracing::{debug, error_span, trace}; +use tracing::{debug, trace}; use crate::{ proto::{ grouping::ThreeDRange, keys::NamespaceId, wgps::{ - AreaOfInterestHandle, ControlIssueGuarantee, Fingerprint, LengthyEntry, LogicalChannel, - Message, ReconciliationAnnounceEntries, ReconciliationMessage, ReconciliationSendEntry, - ReconciliationSendFingerprint, SetupBindAreaOfInterest, + AreaOfInterestHandle, Fingerprint, LengthyEntry, Message, + ReconciliationAnnounceEntries, ReconciliationMessage, ReconciliationSendEntry, + ReconciliationSendFingerprint, }, willow::AuthorisedEntry, }, - session::{channels::LogicalChannelReceivers, Error, Scope, Session, SessionInit}, + session::{channels::MessageReceiver, state::AreaOfInterestIntersection, Error, Session}, store::{ReadonlyStore, SplitAction, Store, SyncConfig}, - util::channel::{Receiver, WriteError}, + util::channel::WriteError, }; -use super::{ - channels::{ChannelReceivers, MessageReceiver}, - state::AreaOfInterestIntersection, -}; - -const INITIAL_GUARANTEES: u64 = u64::MAX; - -#[derive(derive_more::Debug)] -pub struct ControlRoutine { - control_recv: Receiver, - session: Session, - init: Option, -} - -impl ControlRoutine { - pub async fn run( - session: Session, - recv: ChannelReceivers, - init: SessionInit, - ) -> Result<(), Error> { - let ChannelReceivers { - control_recv, - logical_recv, - } = recv; - let LogicalChannelReceivers { - reconciliation_recv, - mut static_tokens_recv, - mut capability_recv, - mut aoi_recv, - } = logical_recv; - - // Spawn a task to handle incoming static tokens. - session.spawn(error_span!("stt"), move |session| async move { - while let Some(message) = static_tokens_recv.try_next().await? { - session.on_setup_bind_static_token(message); - } - Ok(()) - }); - - // Spawn a task to handle incoming capabilities. - session.spawn(error_span!("cap"), move |session| async move { - while let Some(message) = capability_recv.try_next().await? { - session.on_setup_bind_read_capability(message)?; - } - Ok(()) - }); - - // Spawn a task to handle incoming areas of interest. - session.spawn(error_span!("aoi"), move |session| async move { - while let Some(message) = aoi_recv.try_next().await? { - session.on_bind_area_of_interest(message).await?; - } - Ok(()) - }); - - // Spawn a task to handle reconciliation messages - session.spawn(error_span!("rec"), move |session| async move { - Reconciler::new(session, reconciliation_recv)?.run().await - }); - - // Spawn a task to handle control messages - session.spawn(tracing::Span::current(), move |session| async move { - ControlRoutine::new(session, control_recv, init) - .run_inner() - .await - }); - - // Loop over task completions, break on failure or if reconciliation completed - while let Some((span, result)) = session.join_next_task().await { - let guard = span.enter(); - debug!(?result, "task completed"); - result?; - // Is this the right place for this check? It would run after each task - // completion, so necessarily including the completion of the reconciliation - // task, which is the only condition in which reconciliation can complete at - // the moment. - // - // TODO: We'll want to emit the completion event back to the application and - // let it decide what to do (stop, keep open) - or pass relevant config in - // SessionInit. - if session.reconciliation_is_complete() { - tracing::debug!("stop session: reconciliation is complete"); - drop(guard); - break; - } - } - - // Close all our send streams. - // - // This makes the networking send loops stop. - session.close_senders(); - - Ok(()) - } - - pub fn new(session: Session, control_recv: Receiver, init: SessionInit) -> Self { - Self { - control_recv, - session, - init: Some(init), - } - } - - async fn run_inner(mut self) -> Result<(), Error> { - debug!(role = ?self.session.our_role(), "start session"); - - // Reveal our nonce. - let reveal_message = self.session.reveal_commitment()?; - self.session.send(reveal_message).await?; - - // Issue guarantees for all logical channels. - for channel in LogicalChannel::iter() { - let msg = ControlIssueGuarantee { - amount: INITIAL_GUARANTEES, - channel, - }; - self.session.send(msg).await?; - } - - while let Some(message) = self.control_recv.try_next().await? { - self.on_control_message(message)?; - } - - Ok(()) - } - - fn on_control_message(&mut self, message: Message) -> Result<(), Error> { - debug!(%message, "recv"); - match message { - Message::CommitmentReveal(msg) => { - self.session.on_commitment_reveal(msg)?; - let init = self - .init - .take() - .ok_or_else(|| Error::InvalidMessageInCurrentState)?; - self.session - .spawn(error_span!("setup"), |state| Self::setup(state, init)); - } - Message::ControlIssueGuarantee(msg) => { - let ControlIssueGuarantee { amount, channel } = msg; - debug!(?channel, %amount, "add guarantees"); - self.session.add_guarantees(channel, amount); - } - _ => return Err(Error::UnsupportedMessage), - } - - Ok(()) - } - - async fn setup(session: Session, init: SessionInit) -> Result<(), Error> { - debug!(interests = init.interests.len(), "start setup"); - for (capability, aois) in init.interests.into_iter() { - // TODO: implement private area intersection - let intersection_handle = 0.into(); - let (our_capability_handle, message) = - session.bind_and_sign_capability(intersection_handle, capability)?; - if let Some(message) = message { - session.send(message).await?; - } - - for area_of_interest in aois { - let msg = SetupBindAreaOfInterest { - area_of_interest, - authorisation: our_capability_handle, - }; - // TODO: We could skip the clone if we re-enabled sending by reference. - session.bind_area_of_interest(Scope::Ours, msg.clone())?; - session.send(msg).await?; - } - } - debug!("setup done"); - Ok(()) - } -} - #[derive(derive_more::Debug)] pub struct Reconciler { session: Session, diff --git a/iroh-willow/src/session/run.rs b/iroh-willow/src/session/run.rs new file mode 100644 index 0000000000..579fb05e7d --- /dev/null +++ b/iroh-willow/src/session/run.rs @@ -0,0 +1,159 @@ +use futures_lite::StreamExt; +use strum::IntoEnumIterator; +use tracing::{debug, error_span}; + +use crate::{ + proto::wgps::{ControlIssueGuarantee, LogicalChannel, Message, SetupBindAreaOfInterest}, + session::{channels::LogicalChannelReceivers, Error, Scope, Session, SessionInit}, + store::Store, + util::channel::Receiver, +}; + +use super::{channels::ChannelReceivers, reconciler::Reconciler}; + +const INITIAL_GUARANTEES: u64 = u64::MAX; + +pub async fn run( + session: Session, + recv: ChannelReceivers, + init: SessionInit, +) -> Result<(), Error> { + let ChannelReceivers { + control_recv, + logical_recv, + } = recv; + let LogicalChannelReceivers { + reconciliation_recv, + mut static_tokens_recv, + mut capability_recv, + mut aoi_recv, + } = logical_recv; + + // Spawn a task to handle incoming static tokens. + session.spawn(error_span!("stt"), move |session| async move { + while let Some(message) = static_tokens_recv.try_next().await? { + session.on_setup_bind_static_token(message); + } + Ok(()) + }); + + // Spawn a task to handle incoming capabilities. + session.spawn(error_span!("cap"), move |session| async move { + while let Some(message) = capability_recv.try_next().await? { + session.on_setup_bind_read_capability(message)?; + } + Ok(()) + }); + + // Spawn a task to handle incoming areas of interest. + session.spawn(error_span!("aoi"), move |session| async move { + while let Some(message) = aoi_recv.try_next().await? { + session.on_bind_area_of_interest(message).await?; + } + Ok(()) + }); + + // Spawn a task to handle reconciliation messages + session.spawn(error_span!("rec"), move |session| async move { + Reconciler::new(session, reconciliation_recv)?.run().await + }); + + // Spawn a task to handle control messages + session.spawn(tracing::Span::current(), move |session| async move { + control_loop(session, control_recv, init).await + }); + + // Loop over task completions, break on failure or if reconciliation completed + while let Some((span, result)) = session.join_next_task().await { + let guard = span.enter(); + debug!(?result, "task completed"); + result?; + // Is this the right place for this check? It would run after each task + // completion, so necessarily including the completion of the reconciliation + // task, which is the only condition in which reconciliation can complete at + // the moment. + // + // TODO: We'll want to emit the completion event back to the application and + // let it decide what to do (stop, keep open) - or pass relevant config in + // SessionInit. + if session.reconciliation_is_complete() { + tracing::debug!("stop session: reconciliation is complete"); + drop(guard); + break; + } + } + + // Close all our send streams. + // + // This makes the networking send loops stop. + session.close_senders(); + + Ok(()) +} + +async fn control_loop( + session: Session, + mut control_recv: Receiver, + init: SessionInit, +) -> Result<(), Error> { + debug!(role = ?session.our_role(), "start session"); + let mut init = Some(init); + + // Reveal our nonce. + let reveal_message = session.reveal_commitment()?; + session.send(reveal_message).await?; + + // Issue guarantees for all logical channels. + for channel in LogicalChannel::iter() { + let msg = ControlIssueGuarantee { + amount: INITIAL_GUARANTEES, + channel, + }; + session.send(msg).await?; + } + + while let Some(message) = control_recv.try_next().await? { + debug!(%message, "recv"); + match message { + Message::CommitmentReveal(msg) => { + session.on_commitment_reveal(msg)?; + let init = init.take().ok_or(Error::InvalidMessageInCurrentState)?; + // send setup messages, but in a separate task to not block incoming guarantees + session.spawn(error_span!("setup"), |session| setup(session, init)); + } + Message::ControlIssueGuarantee(msg) => { + let ControlIssueGuarantee { amount, channel } = msg; + debug!(?channel, %amount, "add guarantees"); + session.add_guarantees(channel, amount); + } + _ => return Err(Error::UnsupportedMessage), + } + } + + Ok(()) +} + +async fn setup(session: Session, init: SessionInit) -> Result<(), Error> { + debug!(interests = init.interests.len(), "start setup"); + for (capability, aois) in init.interests.into_iter() { + // TODO: implement private area intersection + let intersection_handle = 0.into(); + let (our_capability_handle, message) = + session.bind_and_sign_capability(intersection_handle, capability)?; + if let Some(message) = message { + session.send(message).await?; + } + + for area_of_interest in aois { + let msg = SetupBindAreaOfInterest { + area_of_interest, + authorisation: our_capability_handle, + }; + // TODO: We could skip the clone if we re-enabled sending by reference. + session.bind_area_of_interest(Scope::Ours, msg.clone())?; + session.send(msg).await?; + } + } + debug!("setup done"); + Ok(()) +}