Skip to content

Commit

Permalink
some further cleanup refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 10, 2024
1 parent 6289d0d commit b730532
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 198 deletions.
104 changes: 66 additions & 38 deletions iroh-willow/src/store/actor.rs → iroh-willow/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use crate::{
proto::{
grouping::ThreeDRange,
keys::NamespaceId,
wgps::AreaOfInterestHandle,
willow::{AuthorisedEntry, Entry},
},
session::{
coroutine::{Channels, Coroutine, Yield},
Error, SessionInit, SessionState, SharedSessionState,
coroutine::{ControlRoutine, ReconcileRoutine},
Channels, Error, SessionInit, SessionState, SharedSessionState,
},
store::Store,
};
Expand Down Expand Up @@ -97,7 +98,7 @@ impl StoreHandle {
let join_handle = std::thread::Builder::new()
.name("sync-actor".to_string())
.spawn(move || {
let span = error_span!("store", me=%me.fmt_short());
let span = error_span!("willow_thread", me=%me.fmt_short());
let _guard = span.enter();

let mut actor = StorageThread {
Expand Down Expand Up @@ -164,7 +165,7 @@ pub enum ToActor {
},
IngestEntry {
entry: AuthorisedEntry,
reply: oneshot::Sender<anyhow::Result<()>>,
reply: oneshot::Sender<anyhow::Result<bool>>,
},
Shutdown {
#[debug(skip)]
Expand All @@ -177,6 +178,7 @@ struct Session {
state: SharedSessionState,
channels: Channels,
coroutines: HashSet<CoroId>,
span: Span,
on_done: oneshot::Sender<Result<(), Error>>,
}

Expand Down Expand Up @@ -255,25 +257,20 @@ impl<S: Store> StorageThread<S> {
peer,
state,
channels,
init: setup,
init,
on_done,
} => {
let span = error_span!("session", peer=%peer.fmt_short());
let session = Session {
state: Rc::new(RefCell::new(state)),
channels,
coroutines: Default::default(),
span,
on_done,
};
self.sessions.insert(peer, session);

debug!("start coroutine control");

if let Err(error) = self.start_coroutine(
peer,
|routine| routine.run_control(setup).boxed_local(),
error_span!("session", peer=%peer.fmt_short()),
true,
) {
if let Err(error) = self.start_control_routine(peer, init) {
warn!(?error, peer=%peer.fmt_short(), "abort session: starting failed");
self.remove_session(&peer, Err(error));
}
Expand Down Expand Up @@ -310,40 +307,69 @@ impl<S: Store> StorageThread<S> {
}
}

fn start_control_routine(
&mut self,
session_id: SessionId,
init: SessionInit,
) -> Result<(), Error> {
let create_fn = |co, session: &mut Session| {
let channels = session.channels.clone();
let state = session.state.clone();
ControlRoutine::new(co, channels, state)
.run(init)
.boxed_local()
};
let span_fn = || error_span!("control");
self.start_coroutine(session_id, create_fn, span_fn, true)
}

fn start_reconcile_routine(
&mut self,
session_id: SessionId,
start: Option<InitWithArea>,
) -> Result<(), Error> {
let store_snapshot = Rc::new(self.store.borrow_mut().snapshot()?);
let store_writer = Rc::clone(&self.store);
let create_fn = |co, session: &mut Session| {
let channels = session.channels.clone();
let state = session.state.clone();
ReconcileRoutine::new(co, channels, state, store_snapshot, store_writer)
.run(start)
.boxed_local()
};
let span_fn = || error_span!("reconcile");
self.start_coroutine(session_id, create_fn, span_fn, false)
}

fn start_coroutine(
&mut self,
session_id: SessionId,
producer: impl FnOnce(Coroutine<S::Snapshot, S>) -> CoroFut,
span: Span,
create_fn: impl FnOnce(WakeableCo, &mut Session) -> CoroFut,
span_fn: impl FnOnce() -> Span,
finalizes_session: bool,
) -> Result<(), Error> {
let session = self
.sessions
.get_mut(&session_id)
.ok_or(Error::SessionNotFound)?;
let store_snapshot = Rc::new(self.store.borrow_mut().snapshot()?);

let id = {
let next_id = self.next_coro_id;
let id = self.next_coro_id;
self.next_coro_id += 1;
next_id
id
};
let channels = session.channels.clone();
let state = session.state.clone();
let store_writer = Rc::clone(&self.store);

session.coroutines.insert(id);
let waker = self.notifier.create_waker(id);

let _guard = session.span.enter();
let span = span_fn();
drop(_guard);

let gen = Gen::new(move |co| {
let routine = Coroutine {
store_snapshot,
store_writer,
channels,
state,
co: WakeableCo::new(co, waker),
};
(producer)(routine)
let co = WakeableCo::new(co, waker);
create_fn(co, session)
});
session.coroutines.insert(id);
let state = CoroutineState {
id,
session_id,
Expand All @@ -368,18 +394,12 @@ impl<S: Store> StorageThread<S> {
break Ok(());
}
Yield::StartReconciliation(start) => {
debug!("start coroutine reconciliation");
self.start_coroutine(
coro.session_id,
|state| state.run_reconciliation(start).boxed_local(),
error_span!("reconcile"),
false,
)?;
self.start_reconcile_routine(coro.session_id, start)?;
}
}
}
GeneratorState::Complete(res) => {
debug!(?res, "complete");
debug!(?res, "routine completed");
if res.is_err() || coro.finalizes_session {
self.remove_session(&coro.session_id, res)
}
Expand All @@ -390,6 +410,14 @@ impl<S: Store> StorageThread<S> {
}
}

pub type InitWithArea = (AreaOfInterestHandle, AreaOfInterestHandle);

#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum Yield {
Pending,
StartReconciliation(Option<InitWithArea>),
}

#[derive(derive_more::Debug)]
pub struct WakeableCo {
pub waker: Waker,
Expand Down
1 change: 1 addition & 0 deletions iroh-willow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#![allow(missing_docs)]

pub mod actor;
pub mod net;
pub mod proto;
pub mod session;
Expand Down
60 changes: 22 additions & 38 deletions iroh-willow/src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,18 @@ use tokio::{
use tracing::{debug, error_span, instrument, trace, warn, Instrument};

use crate::{
actor::{StoreHandle, ToActor},
proto::wgps::{
AccessChallenge, ChallengeHash, LogicalChannel, Message, CHALLENGE_HASH_LENGTH,
MAX_PAYLOAD_SIZE_POWER,
},
session::{coroutine::Channels, Role, SessionInit, SessionState},
store::actor::{StoreHandle, ToActor},
session::{channels::Channels, Role, SessionInit, SessionState},
util::channel::{inbound_channel, outbound_channel, Reader, Receiver, Sender, Writer},
};

const CHANNEL_CAP: usize = 1024 * 64;

const ERROR_CODE_CLOSE_GRACEFUL: u16 = 1;

#[instrument(skip_all, fields(me=%me.fmt_short(), role=?our_role))]
#[instrument(skip_all, name = "willow_net", fields(me=%me.fmt_short(), peer=%peer.fmt_short()))]
pub async fn run(
me: NodeId,
store: StoreHandle,
Expand All @@ -31,6 +29,7 @@ pub async fn run(
our_role: Role,
init: SessionInit,
) -> anyhow::Result<()> {
debug!(?our_role, "connected");
let mut join_set = JoinSet::new();
let (mut control_send_stream, mut control_recv_stream) = match our_role {
Role::Alfie => conn.open_bi().await?,
Expand All @@ -39,18 +38,16 @@ pub async fn run(
control_send_stream.set_priority(i32::MAX)?;

let our_nonce: AccessChallenge = rand::random();
debug!("start");
let (received_commitment, max_payload_size) = exchange_commitments(
&mut control_send_stream,
&mut control_recv_stream,
&our_nonce,
)
.await?;
debug!("exchanged comittments");
debug!("commitments exchanged");

let (control_send, control_recv) = spawn_channel(
&mut join_set,
peer,
LogicalChannel::Control,
CHANNEL_CAP,
control_send_stream,
Expand All @@ -65,13 +62,12 @@ pub async fn run(
reconciliation_recv_stream.read_u8().await?;
let (reconciliation_send, reconciliation_recv) = spawn_channel(
&mut join_set,
peer,
LogicalChannel::Reconciliation,
CHANNEL_CAP,
reconciliation_send_stream,
reconciliation_recv_stream,
);
debug!("reconcile channel open");
debug!("channels opened");

let channels = Channels {
control_send,
Expand All @@ -97,7 +93,9 @@ pub async fn run(
Ok(())
});

join_all(join_set).await
join_all(join_set).await?;
debug!("all tasks finished");
Ok(())
}

async fn join_all(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<()> {
Expand All @@ -119,7 +117,6 @@ async fn join_all(mut join_set: JoinSet<anyhow::Result<()>>) -> anyhow::Result<(

fn spawn_channel(
join_set: &mut JoinSet<anyhow::Result<()>>,
peer: NodeId,
ch: LogicalChannel,
cap: usize,
send_stream: quinn::SendStream,
Expand All @@ -130,13 +127,13 @@ fn spawn_channel(

let recv_fut = recv_loop(recv_stream, inbound_writer)
.map_err(move |e| e.context(format!("receive loop for {ch:?} failed")))
.instrument(error_span!("recv", peer=%peer.fmt_short(), ch=%ch.fmt_short()));
.instrument(error_span!("recv", ch=%ch.fmt_short()));

join_set.spawn(recv_fut);

let send_fut = send_loop(send_stream, outbound_reader)
.map_err(move |e| e.context(format!("send loop for {ch:?} failed")))
.instrument(error_span!("send", peer=%peer.fmt_short(), ch=%ch.fmt_short()));
.instrument(error_span!("send", ch=%ch.fmt_short()));

join_set.spawn(send_fut);

Expand All @@ -151,7 +148,6 @@ async fn recv_loop(
channel_writer.write_all(&buf.bytes[..]).await?;
trace!(len = buf.bytes.len(), "recv");
}
recv_stream.stop(ERROR_CODE_CLOSE_GRACEFUL.into()).ok();
channel_writer.close();
Ok(())
}
Expand All @@ -165,13 +161,7 @@ async fn send_loop(
send_stream.write_chunk(data).await?;
trace!(len, "sent");
}
match send_stream.finish().await {
Ok(()) => {}
// If the other side closed gracefully, we are good.
Err(quinn::WriteError::Stopped(code))
if code.into_inner() == ERROR_CODE_CLOSE_GRACEFUL as u64 => {}
Err(err) => return Err(err.into()),
}
send_stream.finish().await?;
Ok(())
}

Expand Down Expand Up @@ -207,6 +197,7 @@ mod tests {
use tracing::{debug, info};

use crate::{
actor::{StoreHandle, ToActor},
net::run,
proto::{
grouping::{AreaOfInterest, ThreeDRange},
Expand All @@ -216,10 +207,7 @@ mod tests {
willow::{Entry, InvalidPath, Path, WriteCapability},
},
session::{Role, SessionInit},
store::{
actor::{StoreHandle, ToActor},
MemoryStore,
},
store::MemoryStore,
};

const ALPN: &[u8] = b"iroh-willow/0";
Expand Down Expand Up @@ -374,18 +362,15 @@ mod tests {
) -> anyhow::Result<SessionInit> {
let user_secret = UserSecretKey::generate(rng);
let (read_cap, write_cap) = create_capabilities(namespace_secret, user_secret.public_key());
let subspace_id = user_secret.id();
let namespace_id = namespace_secret.id();
for i in 0..count {
let path = path_fn(i);
let entry = Entry {
namespace_id,
subspace_id,
path: path.expect("invalid path"),
timestamp: 10,
payload_length: 2,
payload_digest: Hash::new("cool things"),
};
let path = path_fn(i).expect("invalid path");
let entry = Entry::new_current(
namespace_secret.id(),
user_secret.id(),
path,
Hash::new("hello"),
5,
);
track_entries.extend([entry.clone()]);
let entry = entry.attach_authorisation(write_cap.clone(), &user_secret)?;
store.ingest_entry(entry).await?;
Expand All @@ -409,7 +394,6 @@ mod tests {
AccessMode::Write,
));
(read_capability, write_capability)
// let init = SessionInit::with_interest(secret_key, read_capability, AreaOfInterest::full())
}

// async fn get_entries_debug(
Expand Down
Loading

0 comments on commit b730532

Please sign in to comment.