From eebf6d127fa565c21ec696e9c10bca59a96b7b54 Mon Sep 17 00:00:00 2001 From: Franz Heinzmann Date: Fri, 17 May 2024 09:38:12 +0200 Subject: [PATCH] refactor: do not use gossip subscribe_all in iroh sync engine (#2265) ## Description Changes gossip usage in `iroh::docs_engine` to not use `Gossip::subscribe_all` but subscribe to individual topics instead. ## Breaking Changes ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- iroh/src/docs_engine/gossip.rs | 63 +++++++++++++++++----------------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/iroh/src/docs_engine/gossip.rs b/iroh/src/docs_engine/gossip.rs index 9c9050d856..373bd20ec6 100644 --- a/iroh/src/docs_engine/gossip.rs +++ b/iroh/src/docs_engine/gossip.rs @@ -1,18 +1,20 @@ use std::collections::HashSet; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use futures_lite::StreamExt; +use futures_util::FutureExt; use iroh_docs::{actor::SyncHandle, ContentStatus, NamespaceId}; -use iroh_gossip::{ - net::{Event, Gossip}, - proto::TopicId, -}; +use iroh_gossip::net::{Event, Gossip}; use iroh_net::key::PublicKey; use tokio::{ - sync::{broadcast::error::RecvError, mpsc}, + sync::{broadcast, mpsc}, task::JoinSet, }; -use tracing::{debug, error, trace}; +use tokio_stream::{ + wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, + StreamMap, +}; +use tracing::{debug, error, trace, warn}; use super::live::{Op, ToLiveActor}; @@ -37,7 +39,8 @@ pub struct GossipActor { to_sync_actor: mpsc::Sender, joined: HashSet, want_join: HashSet, - pending_joins: JoinSet<(NamespaceId, Result)>, + pending_joins: JoinSet<(NamespaceId, Result>)>, + gossip_events: StreamMap>, } impl GossipActor { @@ -55,16 +58,16 @@ impl GossipActor { joined: Default::default(), want_join: Default::default(), pending_joins: Default::default(), + gossip_events: Default::default(), } } pub async fn run(&mut self) -> anyhow::Result<()> { - let mut gossip_events = self.gossip.clone().subscribe_all(); let mut i = 0; loop { i += 1; trace!(?i, "tick wait"); tokio::select! { - next = gossip_events.next() => { + next = self.gossip_events.next(), if !self.gossip_events.is_empty() => { trace!(?i, "tick: gossip_event"); if let Err(err) = self.on_gossip_event(next).await { error!("gossip actor died: {err:?}"); @@ -82,9 +85,11 @@ impl GossipActor { trace!(?i, "tick: pending_joins"); let (namespace, res) = res.context("pending_joins closed")?; match res { - Ok(_topic) => { + Ok(stream) => { debug!(namespace = %namespace.fmt_short(), "joined gossip"); self.joined.insert(namespace); + let stream = BroadcastStream::new(stream); + self.gossip_events.insert(namespace, stream); }, Err(err) => { if self.want_join.contains(&namespace) { @@ -112,13 +117,11 @@ impl GossipActor { let gossip = self.gossip.clone(); // join gossip for the topic to receive and send message let fut = async move { - let res = gossip.join(namespace.into(), peers).await; - let res = match res { - Ok(fut) => fut.await, - Err(err) => Err(err), - }; - (namespace, res) + let stream = gossip.subscribe(namespace.into()).await?; + let _topic = gossip.join(namespace.into(), peers).await?.await?; + Ok(stream) }; + let fut = fut.map(move |res| (namespace, res)); self.want_join.insert(namespace); self.pending_joins.spawn(fut); } @@ -132,22 +135,20 @@ impl GossipActor { } async fn on_gossip_event( &mut self, - event: Option>, + event: Option<(NamespaceId, Result)>, ) -> Result<()> { - let (topic, event) = match event { - Some(Ok(event)) => event, - None => return Err(anyhow!("Gossip event channel closed")), - Some(Err(err)) => match err { - RecvError::Lagged(n) => { - error!("GossipActor too slow (lagged by {n}) - dropping gossip event"); - return Ok(()); - } - RecvError::Closed => { - return Err(anyhow!("Gossip event channel closed")); - } - }, + let (namespace, event) = event.context("Gossip event channel closed")?; + let event = match event { + Ok(event) => event, + Err(BroadcastStreamRecvError::Lagged(n)) => { + warn!("GossipActor too slow (lagged by {n}) - dropping gossip event"); + return Ok(()); + } }; - let namespace: NamespaceId = topic.as_bytes().into(); + if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) { + error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic"); + return Ok(()); + } if let Err(err) = self.on_gossip_event_inner(namespace, event).await { error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event"); }