diff --git a/iroh-docs/src/actor.rs b/iroh-docs/src/actor.rs index a48e8f55b38..04ff497d5f3 100644 --- a/iroh-docs/src/actor.rs +++ b/iroh-docs/src/actor.rs @@ -477,8 +477,9 @@ impl SyncHandle { pub async fn shutdown(&self) -> Result { let (reply, rx) = oneshot::channel(); let action = Action::Shutdown { reply: Some(reply) }; - self.send(action).await.ok(); - Ok(rx.await?) + self.send(action).await?; + let store = rx.await?; + Ok(store) } pub async fn list_authors(&self, reply: flume::Sender>) -> Result<()> { @@ -581,10 +582,12 @@ impl Actor { .enable_time() .build()?; let local_set = tokio::task::LocalSet::new(); - local_set.block_on(&rt, async move { self.run_async().await }) + local_set.block_on(&rt, async move { self.run_async().await }); + Ok(()) } - async fn run_async(mut self) -> Result<()> { - loop { + + async fn run_async(mut self) { + let reply = loop { let timeout = tokio::time::sleep(MAX_COMMIT_DELAY); tokio::pin!(timeout); let action = tokio::select! { @@ -599,7 +602,7 @@ impl Actor { Ok(action) => action, Err(flume::RecvError::Disconnected) => { debug!("action channel disconnected"); - break; + break None; } } @@ -608,14 +611,7 @@ impl Actor { trace!(%action, "tick"); match action { Action::Shutdown { reply } => { - if let Err(cause) = self.store.flush() { - warn!(?cause, "failed to flush store"); - } - self.close_all(); - if let Some(reply) = reply { - send_reply(reply, self.store).ok(); - } - break; + break reply; } action => { if self.on_action(action).is_err() { @@ -623,16 +619,23 @@ impl Actor { } } } + }; + + if let Err(cause) = self.store.flush() { + warn!(?cause, "failed to flush store"); } + self.close_all(); self.tasks.abort_all(); - debug!("shutdown"); - Ok(()) + debug!("docs actor shutdown"); + if let Some(reply) = reply { + reply.send(self.store).ok(); + } } fn on_action(&mut self, action: Action) -> Result<(), SendReplyError> { match action { Action::Shutdown { .. } => { - unreachable!("Shutdown action should be handled in run()") + unreachable!("Shutdown is handled in run()") } Action::ImportAuthor { author, reply } => { let id = author.id(); diff --git a/iroh-docs/src/engine.rs b/iroh-docs/src/engine.rs index b5345b0beae..4b1c496e0d2 100644 --- a/iroh-docs/src/engine.rs +++ b/iroh-docs/src/engine.rs @@ -108,7 +108,7 @@ impl Engine { Err(err) => { // If loading the default author failed, make sure to shutdown the sync actor before // returning. - sync.shutdown().await?; + let _store = sync.shutdown().await.ok(); return Err(err); } }; @@ -207,7 +207,11 @@ impl Engine { /// Shutdown the engine. pub async fn shutdown(&self) -> Result<()> { - self.to_live_actor.send(ToLiveActor::Shutdown).await?; + let (reply, reply_rx) = oneshot::channel(); + self.to_live_actor + .send(ToLiveActor::Shutdown { reply }) + .await?; + reply_rx.await?; Ok(()) } } diff --git a/iroh-docs/src/engine/live.rs b/iroh-docs/src/engine/live.rs index e7f77549b01..99404d4aba3 100644 --- a/iroh-docs/src/engine/live.rs +++ b/iroh-docs/src/engine/live.rs @@ -70,7 +70,9 @@ pub enum ToLiveActor { #[debug("onsehot::Sender")] reply: sync::oneshot::Sender>, }, - Shutdown, + Shutdown { + reply: sync::oneshot::Sender<()>, + }, Subscribe { namespace: NamespaceId, #[debug("sender")] @@ -211,7 +213,7 @@ impl LiveActor { } /// Run the actor loop. - pub async fn run(&mut self, mut gossip_actor: GossipActor) -> Result<()> { + pub async fn run(mut self, mut gossip_actor: GossipActor) -> Result<()> { let me = self.endpoint.node_id().fmt_short(); let gossip_handle = tokio::task::spawn( async move { @@ -222,15 +224,22 @@ impl LiveActor { .instrument(error_span!("sync", %me)), ); - let res = self.run_inner().await; + let shutdown_reply = self.run_inner().await; if let Err(err) = self.shutdown().await { error!(?err, "Error during shutdown"); } gossip_handle.await?; - res + drop(self); + match shutdown_reply { + Ok(reply) => { + reply.send(()).ok(); + Ok(()) + } + Err(err) => Err(err), + } } - async fn run_inner(&mut self) -> Result<()> { + async fn run_inner(&mut self) -> Result> { let mut i = 0; loop { i += 1; @@ -240,8 +249,13 @@ impl LiveActor { msg = self.inbox.recv() => { let msg = msg.context("to_actor closed")?; trace!(?i, %msg, "tick: to_actor"); - if !self.on_actor_message(msg).await.context("on_actor_message")? { - break; + match msg { + ToLiveActor::Shutdown { reply } => { + break Ok(reply); + } + msg => { + self.on_actor_message(msg).await.context("on_actor_message")?; + } } } event = self.replica_events_rx.recv_async() => { @@ -270,14 +284,12 @@ impl LiveActor { } } } - debug!("close (shutdown)"); - Ok(()) } async fn on_actor_message(&mut self, msg: ToLiveActor) -> anyhow::Result { match msg { - ToLiveActor::Shutdown => { - return Ok(false); + ToLiveActor::Shutdown { .. } => { + unreachable!("handled in run"); } ToLiveActor::IncomingSyncReport { from, report } => { self.on_sync_report(from, report).await @@ -365,7 +377,7 @@ impl LiveActor { .await .ok(); // shutdown sync thread - let _ = self.sync.shutdown().await; + let _store = self.sync.shutdown().await; Ok(()) } diff --git a/iroh/src/node.rs b/iroh/src/node.rs index 85df39cc22e..1ad2457a062 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -655,7 +655,6 @@ mod tests { } #[cfg(feature = "fs-store")] - #[ignore = "flaky"] #[tokio::test] async fn test_default_author_persist() -> Result<()> { use crate::util::path::IrohPaths;