Skip to content

Commit

Permalink
cleanup module structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 15, 2024
1 parent 7c327bf commit 797ff2b
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 203 deletions.
4 changes: 2 additions & 2 deletions iroh-willow/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
meadowcap,
willow::{AuthorisedEntry, Entry},
},
session::{coroutine::ControlRoutine, Channels, Error, Role, Session, SessionInit},
session::{self, Channels, Error, Role, Session, SessionInit},
store::{KeyStore, Store},
util::task_set::{TaskKey, TaskMap},
};
Expand Down Expand Up @@ -249,7 +249,7 @@ impl<S: Store> StorageThread<S> {

let task_key = self.session_tasks.spawn_local(
session_id,
ControlRoutine::run(session, recv, init)
session::run(session, recv, init)
.instrument(error_span!("session", peer = %peer.fmt_short())),
);
let active_session = ActiveSession {
Expand Down
7 changes: 4 additions & 3 deletions iroh-willow/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@ use std::collections::{HashMap, HashSet};

use crate::proto::{grouping::AreaOfInterest, wgps::ReadCapability};

pub mod aoi_finder;
pub mod channels;
pub mod coroutine;
mod error;
pub mod resource;
mod reconciler;
mod resource;
mod run;
mod state;
mod util;

pub use self::channels::Channels;
pub use self::error::Error;
pub use self::run::run;
pub use self::state::Session;

/// To break symmetry, we refer to the peer that initiated the synchronisation session as Alfie,
Expand Down
16 changes: 0 additions & 16 deletions iroh-willow/src/session/aoi_finder.rs

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,200 +1,24 @@
use std::rc::Rc;

use futures_lite::StreamExt;
use strum::IntoEnumIterator;
use tracing::{debug, error_span, trace};
use tracing::{debug, trace};

use crate::{
proto::{
grouping::ThreeDRange,
keys::NamespaceId,
wgps::{
AreaOfInterestHandle, ControlIssueGuarantee, Fingerprint, LengthyEntry, LogicalChannel,
Message, ReconciliationAnnounceEntries, ReconciliationMessage, ReconciliationSendEntry,
ReconciliationSendFingerprint, SetupBindAreaOfInterest,
AreaOfInterestHandle, Fingerprint, LengthyEntry, Message,
ReconciliationAnnounceEntries, ReconciliationMessage, ReconciliationSendEntry,
ReconciliationSendFingerprint,
},
willow::AuthorisedEntry,
},
session::{channels::LogicalChannelReceivers, Error, Scope, Session, SessionInit},
session::{channels::MessageReceiver, state::AreaOfInterestIntersection, Error, Session},
store::{ReadonlyStore, SplitAction, Store, SyncConfig},
util::channel::{Receiver, WriteError},
util::channel::WriteError,
};

use super::{
channels::{ChannelReceivers, MessageReceiver},
state::AreaOfInterestIntersection,
};

const INITIAL_GUARANTEES: u64 = u64::MAX;

#[derive(derive_more::Debug)]
pub struct ControlRoutine<S> {
control_recv: Receiver<Message>,
session: Session<S>,
init: Option<SessionInit>,
}

impl<S: Store> ControlRoutine<S> {
pub async fn run(
session: Session<S>,
recv: ChannelReceivers,
init: SessionInit,
) -> Result<(), Error> {
let ChannelReceivers {
control_recv,
logical_recv,
} = recv;
let LogicalChannelReceivers {
reconciliation_recv,
mut static_tokens_recv,
mut capability_recv,
mut aoi_recv,
} = logical_recv;

// Spawn a task to handle incoming static tokens.
session.spawn(error_span!("stt"), move |session| async move {
while let Some(message) = static_tokens_recv.try_next().await? {
session.on_setup_bind_static_token(message);
}
Ok(())
});

// Spawn a task to handle incoming capabilities.
session.spawn(error_span!("cap"), move |session| async move {
while let Some(message) = capability_recv.try_next().await? {
session.on_setup_bind_read_capability(message)?;
}
Ok(())
});

// Spawn a task to handle incoming areas of interest.
session.spawn(error_span!("aoi"), move |session| async move {
while let Some(message) = aoi_recv.try_next().await? {
session.on_bind_area_of_interest(message).await?;
}
Ok(())
});

// Spawn a task to handle reconciliation messages
session.spawn(error_span!("rec"), move |session| async move {
Reconciler::new(session, reconciliation_recv)?.run().await
});

// Spawn a task to handle control messages
session.spawn(tracing::Span::current(), move |session| async move {
ControlRoutine::new(session, control_recv, init)
.run_inner()
.await
});

// Loop over task completions, break on failure or if reconciliation completed
while let Some((span, result)) = session.join_next_task().await {
let guard = span.enter();
debug!(?result, "task completed");
result?;
// Is this the right place for this check? It would run after each task
// completion, so necessarily including the completion of the reconciliation
// task, which is the only condition in which reconciliation can complete at
// the moment.
//
// TODO: We'll want to emit the completion event back to the application and
// let it decide what to do (stop, keep open) - or pass relevant config in
// SessionInit.
if session.reconciliation_is_complete() {
tracing::debug!("stop session: reconciliation is complete");
drop(guard);
break;
}
}

// Close all our send streams.
//
// This makes the networking send loops stop.
session.close_senders();

Ok(())
}

pub fn new(session: Session<S>, control_recv: Receiver<Message>, init: SessionInit) -> Self {
Self {
control_recv,
session,
init: Some(init),
}
}

async fn run_inner(mut self) -> Result<(), Error> {
debug!(role = ?self.session.our_role(), "start session");

// Reveal our nonce.
let reveal_message = self.session.reveal_commitment()?;
self.session.send(reveal_message).await?;

// Issue guarantees for all logical channels.
for channel in LogicalChannel::iter() {
let msg = ControlIssueGuarantee {
amount: INITIAL_GUARANTEES,
channel,
};
self.session.send(msg).await?;
}

while let Some(message) = self.control_recv.try_next().await? {
self.on_control_message(message)?;
}

Ok(())
}

fn on_control_message(&mut self, message: Message) -> Result<(), Error> {
debug!(%message, "recv");
match message {
Message::CommitmentReveal(msg) => {
self.session.on_commitment_reveal(msg)?;
let init = self
.init
.take()
.ok_or_else(|| Error::InvalidMessageInCurrentState)?;
self.session
.spawn(error_span!("setup"), |state| Self::setup(state, init));
}
Message::ControlIssueGuarantee(msg) => {
let ControlIssueGuarantee { amount, channel } = msg;
debug!(?channel, %amount, "add guarantees");
self.session.add_guarantees(channel, amount);
}
_ => return Err(Error::UnsupportedMessage),
}

Ok(())
}

async fn setup(session: Session<S>, init: SessionInit) -> Result<(), Error> {
debug!(interests = init.interests.len(), "start setup");
for (capability, aois) in init.interests.into_iter() {
// TODO: implement private area intersection
let intersection_handle = 0.into();
let (our_capability_handle, message) =
session.bind_and_sign_capability(intersection_handle, capability)?;
if let Some(message) = message {
session.send(message).await?;
}

for area_of_interest in aois {
let msg = SetupBindAreaOfInterest {
area_of_interest,
authorisation: our_capability_handle,
};
// TODO: We could skip the clone if we re-enabled sending by reference.
session.bind_area_of_interest(Scope::Ours, msg.clone())?;
session.send(msg).await?;
}
}
debug!("setup done");
Ok(())
}
}

#[derive(derive_more::Debug)]
pub struct Reconciler<S: Store> {
session: Session<S>,
Expand Down
Loading

0 comments on commit 797ff2b

Please sign in to comment.