Skip to content

Commit

Permalink
fix: properly wait for docs engine shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed Jun 20, 2024
1 parent 55a0c0b commit f3c0057
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 32 deletions.
37 changes: 20 additions & 17 deletions iroh-docs/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,9 @@ impl SyncHandle {
pub async fn shutdown(&self) -> Result<Store> {
let (reply, rx) = oneshot::channel();
let action = Action::Shutdown { reply: Some(reply) };
self.send(action).await.ok();
Ok(rx.await?)
self.send(action).await?;
let store = rx.await?;
Ok(store)
}

pub async fn list_authors(&self, reply: flume::Sender<Result<AuthorId>>) -> Result<()> {
Expand Down Expand Up @@ -581,10 +582,12 @@ impl Actor {
.enable_time()
.build()?;
let local_set = tokio::task::LocalSet::new();
local_set.block_on(&rt, async move { self.run_async().await })
local_set.block_on(&rt, async move { self.run_async().await });
Ok(())
}
async fn run_async(mut self) -> Result<()> {
loop {

async fn run_async(mut self) {
let reply = loop {
let timeout = tokio::time::sleep(MAX_COMMIT_DELAY);
tokio::pin!(timeout);
let action = tokio::select! {
Expand All @@ -599,7 +602,7 @@ impl Actor {
Ok(action) => action,
Err(flume::RecvError::Disconnected) => {
debug!("action channel disconnected");
break;
break None;
}

}
Expand All @@ -608,31 +611,31 @@ impl Actor {
trace!(%action, "tick");
match action {
Action::Shutdown { reply } => {
if let Err(cause) = self.store.flush() {
warn!(?cause, "failed to flush store");
}
self.close_all();
if let Some(reply) = reply {
send_reply(reply, self.store).ok();
}
break;
break reply;
}
action => {
if self.on_action(action).is_err() {
warn!("failed to send reply: receiver dropped");
}
}
}
};

if let Err(cause) = self.store.flush() {
warn!(?cause, "failed to flush store");
}
self.close_all();
self.tasks.abort_all();
debug!("shutdown");
Ok(())
debug!("docs actor shutdown");
if let Some(reply) = reply {
reply.send(self.store).ok();
}
}

fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> {
match action {
Action::Shutdown { .. } => {
unreachable!("Shutdown action should be handled in run()")
unreachable!("Shutdown is handled in run()")
}
Action::ImportAuthor { author, reply } => {
let id = author.id();
Expand Down
8 changes: 6 additions & 2 deletions iroh-docs/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Engine {
Err(err) => {
// If loading the default author failed, make sure to shutdown the sync actor before
// returning.
sync.shutdown().await?;
let _store = sync.shutdown().await.ok();
return Err(err);
}
};
Expand Down Expand Up @@ -207,7 +207,11 @@ impl Engine {

/// Shutdown the engine.
pub async fn shutdown(&self) -> Result<()> {
self.to_live_actor.send(ToLiveActor::Shutdown).await?;
let (reply, reply_rx) = oneshot::channel();
self.to_live_actor
.send(ToLiveActor::Shutdown { reply })
.await?;
reply_rx.await?;
Ok(())
}
}
Expand Down
36 changes: 24 additions & 12 deletions iroh-docs/src/engine/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ pub enum ToLiveActor {
#[debug("onsehot::Sender")]
reply: sync::oneshot::Sender<anyhow::Result<()>>,
},
Shutdown,
Shutdown {
reply: sync::oneshot::Sender<()>,
},
Subscribe {
namespace: NamespaceId,
#[debug("sender")]
Expand Down Expand Up @@ -211,7 +213,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}

/// Run the actor loop.
pub async fn run(&mut self, mut gossip_actor: GossipActor) -> Result<()> {
pub async fn run(mut self, mut gossip_actor: GossipActor) -> Result<()> {
let me = self.endpoint.node_id().fmt_short();
let gossip_handle = tokio::task::spawn(
async move {
Expand All @@ -222,15 +224,22 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
.instrument(error_span!("sync", %me)),
);

let res = self.run_inner().await;
let shutdown_reply = self.run_inner().await;
if let Err(err) = self.shutdown().await {
error!(?err, "Error during shutdown");
}
gossip_handle.await?;
res
drop(self);
match shutdown_reply {
Ok(reply) => {
reply.send(()).ok();
Ok(())
}
Err(err) => Err(err),
}
}

async fn run_inner(&mut self) -> Result<()> {
async fn run_inner(&mut self) -> Result<oneshot::Sender<()>> {
let mut i = 0;
loop {
i += 1;
Expand All @@ -240,8 +249,13 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
msg = self.inbox.recv() => {
let msg = msg.context("to_actor closed")?;
trace!(?i, %msg, "tick: to_actor");
if !self.on_actor_message(msg).await.context("on_actor_message")? {
break;
match msg {
ToLiveActor::Shutdown { reply } => {
break Ok(reply);
}
msg => {
self.on_actor_message(msg).await.context("on_actor_message")?;
}
}
}
event = self.replica_events_rx.recv_async() => {
Expand Down Expand Up @@ -270,14 +284,12 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
}
}
}
debug!("close (shutdown)");
Ok(())
}

async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result<bool> {
match msg {
ToLiveActor::Shutdown => {
return Ok(false);
ToLiveActor::Shutdown { .. } => {
unreachable!("handled in run");
}
ToLiveActor::IncomingSyncReport { from, report } => {
self.on_sync_report(from, report).await
Expand Down Expand Up @@ -365,7 +377,7 @@ impl<B: iroh_blobs::store::Store> LiveActor<B> {
.await
.ok();
// shutdown sync thread
let _ = self.sync.shutdown().await;
let _store = self.sync.shutdown().await;
Ok(())
}

Expand Down
1 change: 0 additions & 1 deletion iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,6 @@ mod tests {
}

#[cfg(feature = "fs-store")]
#[ignore = "flaky"]
#[tokio::test]
async fn test_default_author_persist() -> Result<()> {
use crate::util::path::IrohPaths;
Expand Down

0 comments on commit f3c0057

Please sign in to comment.