Skip to content

Commit

Permalink
refactor: improve session shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 26, 2024
1 parent ee4f865 commit b38487f
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 97 deletions.
4 changes: 2 additions & 2 deletions iroh-willow/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl<S: Storage> Auth<S> {
selector: &CapSelector,
) -> Result<Option<WriteCapability>, AuthError> {
let cap = self.caps.read().unwrap().get_write_cap(selector);
debug!(?selector, ?cap, "get write cap");
// debug!(?selector, ?cap, "get write cap");
Ok(cap)
}

Expand All @@ -174,7 +174,7 @@ impl<S: Storage> Auth<S> {
selector: &CapSelector,
) -> Result<Option<ReadAuthorisation>, AuthError> {
let cap = self.caps.read().unwrap().get_read_cap(selector);
debug!(?selector, ?cap, "get read cap");
// debug!(?selector, ?cap, "get read cap");
Ok(cap)
}

Expand Down
12 changes: 12 additions & 0 deletions iroh-willow/src/proto/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,18 @@ impl Message {
/// Returns `true` if `self` and `other` are the same enum variant.
///
/// Only the variant is compared; the data carried by the variants is ignored.
pub fn same_kind(&self, other: &Self) -> bool {
    use std::mem::discriminant;

    let ours = discriminant(self);
    let theirs = discriminant(other);
    ours == theirs
}

/// Returns the receiver handle together with the `covers` value of this message.
///
/// Only `ReconciliationSendFingerprint` and `ReconciliationAnnounceEntries`
/// are inspected. The result is `None` for every other message kind, and also
/// when the message's `covers` field is `None`.
pub fn covers_region(&self) -> Option<(AreaOfInterestHandle, u64)> {
    // Pull the two relevant fields out of whichever variant carries them.
    let (receiver_handle, covers) = match self {
        Message::ReconciliationSendFingerprint(msg) => (msg.receiver_handle, msg.covers),
        Message::ReconciliationAnnounceEntries(msg) => (msg.receiver_handle, msg.covers),
        _ => return None,
    };
    covers.map(|covers| (receiver_handle, covers))
}
}

impl Encoder for Message {
Expand Down
15 changes: 2 additions & 13 deletions iroh-willow/src/session/data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use futures_lite::StreamExt;

use tokio::sync::broadcast;

use crate::{
Expand All @@ -11,7 +9,6 @@ use crate::{
store::{traits::Storage, Origin, Store},
};

use super::channels::MessageReceiver;
use super::payload::{send_payload_chunked, CurrentPayload};
use super::Session;

Expand Down Expand Up @@ -81,26 +78,18 @@ pub struct DataReceiver<S: Storage> {
session: Session,
store: Store<S>,
current_payload: CurrentPayload,
recv: MessageReceiver<DataMessage>,
}

impl<S: Storage> DataReceiver<S> {
pub fn new(session: Session, store: Store<S>, recv: MessageReceiver<DataMessage>) -> Self {
pub fn new(session: Session, store: Store<S>) -> Self {
Self {
session,
store,
current_payload: Default::default(),
recv,
}
}
pub async fn run(mut self) -> Result<(), Error> {
while let Some(message) = self.recv.try_next().await? {
self.on_message(message).await?;
}
Ok(())
}

async fn on_message(&mut self, message: DataMessage) -> Result<(), Error> {
pub async fn on_message(&mut self, message: DataMessage) -> Result<(), Error> {
match message {
DataMessage::SendEntry(message) => self.on_send_entry(message).await?,
DataMessage::SendPayload(message) => self.on_send_payload(message).await?,
Expand Down
17 changes: 7 additions & 10 deletions iroh-willow/src/session/reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use crate::{
traits::{EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
Origin, Store,
},
util::channel::WriteError,
util::{channel::WriteError, stream::Cancelable},
};

#[derive(derive_more::Debug)]
pub struct Reconciler<S: Storage> {
session: Session,
store: Store<S>,
recv: MessageReceiver<ReconciliationMessage>,
recv: Cancelable<MessageReceiver<ReconciliationMessage>>,
snapshot: <S::Entries as EntryStorage>::Snapshot,
current_payload: CurrentPayload,
}
Expand All @@ -37,7 +37,7 @@ impl<S: Storage> Reconciler<S> {
pub fn new(
session: Session,
store: Store<S>,
recv: MessageReceiver<ReconciliationMessage>,
recv: Cancelable<MessageReceiver<ReconciliationMessage>>,
) -> Result<Self, Error> {
let snapshot = store.entries().snapshot()?;
Ok(Self {
Expand Down Expand Up @@ -68,11 +68,8 @@ impl<S: Storage> Reconciler<S> {
}
}
}
if self.session.reconciliation_is_complete()
&& !self.session.mode().is_live()
&& !self.current_payload.is_active()
{
debug!("reconciliation complete and not in live mode: close session");
if self.session.reconciliation_is_complete() && !self.current_payload.is_active() {
debug!("reconciliation complete");
break;
}
}
Expand Down Expand Up @@ -222,7 +219,7 @@ impl<S: Storage> Reconciler<S> {
their_handle: AreaOfInterestHandle,
covers: Option<u64>,
) -> anyhow::Result<()> {
self.session.mark_range_pending(our_handle);
self.session.mark_our_range_pending(our_handle);
let msg = ReconciliationSendFingerprint {
range,
fingerprint,
Expand Down Expand Up @@ -259,7 +256,7 @@ impl<S: Storage> Reconciler<S> {
covers,
};
if want_response {
self.session.mark_range_pending(our_handle);
self.session.mark_our_range_pending(our_handle);
}
self.send(msg).await?;
for authorised_entry in self
Expand Down
109 changes: 63 additions & 46 deletions iroh-willow/src/session/run.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use futures_lite::StreamExt;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error_span, trace};
use tracing::{debug, error_span, trace, warn};

use crate::{
proto::sync::{ControlIssueGuarantee, LogicalChannel, Message, SetupBindAreaOfInterest},
session::{channels::LogicalChannelReceivers, Error, Scope, Session, SessionInit},
store::{traits::Storage, Store},
util::channel::Receiver,
util::{channel::Receiver, stream::Cancelable},
};

use super::{
Expand All @@ -32,13 +32,21 @@ impl Session {
logical_recv:
LogicalChannelReceivers {
reconciliation_recv,
mut static_tokens_recv,
mut capability_recv,
mut aoi_recv,
static_tokens_recv,
capability_recv,
aoi_recv,
data_recv,
},
} = recv;

// Make all our receivers close once the cancel_token is triggered.
let control_recv = Cancelable::new(control_recv, cancel_token.clone());
let reconciliation_recv = Cancelable::new(reconciliation_recv, cancel_token.clone());
let mut static_tokens_recv = Cancelable::new(static_tokens_recv, cancel_token.clone());
let mut capability_recv = Cancelable::new(capability_recv, cancel_token.clone());
let mut aoi_recv = Cancelable::new(aoi_recv, cancel_token.clone());
let mut data_recv = Cancelable::new(data_recv, cancel_token.clone());

// Spawn a task to handle incoming static tokens.
self.spawn(error_span!("stt"), move |session| async move {
while let Some(message) = static_tokens_recv.try_next().await? {
Expand All @@ -52,7 +60,10 @@ impl Session {
self.spawn(error_span!("dat:r"), {
let store = store.clone();
move |session| async move {
DataReceiver::new(session, store, data_recv).run().await?;
let mut data_receiver = DataReceiver::new(session, store);
while let Some(message) = data_recv.try_next().await? {
data_receiver.on_message(message).await?;
}
Ok(())
}
});
Expand All @@ -74,83 +85,89 @@ impl Session {
});

// Spawn a task to handle incoming areas of interest.
self.spawn(error_span!("aoi"), {
move |session| async move {
while let Some(message) = aoi_recv.try_next().await? {
session.on_bind_area_of_interest(message).await?;
}
Ok(())
self.spawn(error_span!("aoi"), move |session| async move {
while let Some(message) = aoi_recv.try_next().await? {
session.on_bind_area_of_interest(message).await?;
}
Ok(())
});

// Spawn a task to handle reconciliation messages
self.spawn(error_span!("rec"), {
let cancel_token = cancel_token.clone();
let store = store.clone();
move |session| async move {
let res = Reconciler::new(session, store, reconciliation_recv)?
let res = Reconciler::new(session.clone(), store, reconciliation_recv)?
.run()
.await;
cancel_token.cancel();
if !session.mode().is_live() {
debug!("reconciliation complete and not in live mode: close session");
cancel_token.cancel();
}
res
}
});

// Spawn a task to handle control messages
self.spawn(error_span!("ctl"), {
let cancel_token = cancel_token.clone();
let store = store.clone();
let cancel_token = cancel_token.clone();
move |session| async move {
let res = control_loop(session, store, control_recv, init).await;
cancel_token.cancel();
res
}
});

// Spawn a task to handle session termination.
self.spawn(error_span!("fin"), {
let cancel_token = cancel_token.clone();
move |session| async move {
// Wait until the session is cancelled:
// * either because SessionMode is ReconcileOnce and reconciliation finished
// * or because the session was cancelled from the outside session handle
cancel_token.cancelled().await;
debug!("closing session");
// Then close all senders. This will make all other tasks terminate once the remote
// closed their senders as well.
session.close_senders();
// Unsubscribe from the store. This stops the data send task.
store.entries().unsubscribe(session.id());
Ok(())
// Wait until the session is cancelled, or until a task fails.
let result = loop {
tokio::select! {
_ = cancel_token.cancelled() => {
break Ok(());
},
Some((span, result)) = self.join_next_task() => {
let _guard = span.enter();
trace!(?result, remaining = self.remaining_tasks(), "task complete");
if let Err(err) = result {
warn!(?err, "session task failed: abort session");
break Err(err);
}
},
}
});
};

if result.is_err() {
self.abort_all_tasks();
} else {
debug!("closing session");
}

// Wait for all tasks to complete.
// We are not cancelling here so we have to make sure that all tasks terminate (structured
// concurrency basically).
let mut final_result = Ok(());
// Unsubscribe from the store. This stops the data send task.
store.entries().unsubscribe(self.id());

// Wait for remaining tasks to terminate to catch any panics.
// TODO: Add timeout?
while let Some((span, result)) = self.join_next_task().await {
let _guard = span.enter();
// trace!(?result, remaining = self.remaining_tasks(), "task complete");
debug!(?result, remaining = self.remaining_tasks(), "task complete");
trace!(?result, remaining = self.remaining_tasks(), "task complete");
if let Err(err) = result {
tracing::warn!(?err, "task failed: {err}");
cancel_token.cancel();
// self.abort_all_tasks();
if final_result.is_ok() {
final_result = Err(err);
}
warn!("task failed: {err:?}");
}
}
debug!(success = final_result.is_ok(), "session complete");
final_result

// Close our channel senders.
// This will stop the network send loop after all pending data has been sent.
self.close_senders();

debug!(success = result.is_ok(), "session complete");
result
}
}

async fn control_loop<S: Storage>(
session: Session,
store: Store<S>,
mut control_recv: Receiver<Message>,
mut control_recv: Cancelable<Receiver<Message>>,
init: SessionInit,
) -> Result<(), Error> {
debug!(role = ?session.our_role(), "start session");
Expand Down
Loading

0 comments on commit b38487f

Please sign in to comment.