diff --git a/iroh/src/docs_engine/gossip.rs b/iroh/src/docs_engine/gossip.rs index 9c9050d856..b9b6cea476 100644 --- a/iroh/src/docs_engine/gossip.rs +++ b/iroh/src/docs_engine/gossip.rs @@ -1,6 +1,6 @@ use std::collections::HashSet; -use anyhow::{anyhow, Context, Result}; +use anyhow::{Context, Result}; use futures_lite::StreamExt; use iroh_docs::{actor::SyncHandle, ContentStatus, NamespaceId}; use iroh_gossip::{ @@ -9,9 +9,13 @@ use iroh_gossip::{ }; use iroh_net::key::PublicKey; use tokio::{ - sync::{broadcast::error::RecvError, mpsc}, + sync::{broadcast, mpsc}, task::JoinSet, }; +use tokio_stream::{ + wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}, + StreamMap, +}; use tracing::{debug, error, trace}; use super::live::{Op, ToLiveActor}; @@ -37,7 +41,8 @@ pub struct GossipActor { to_sync_actor: mpsc::Sender, joined: HashSet, want_join: HashSet, - pending_joins: JoinSet<(NamespaceId, Result)>, + pending_joins: JoinSet<(NamespaceId, Result>)>, + gossip_events: StreamMap>, } impl GossipActor { @@ -55,16 +60,17 @@ impl GossipActor { joined: Default::default(), want_join: Default::default(), pending_joins: Default::default(), + gossip_events: Default::default(), } } pub async fn run(&mut self) -> anyhow::Result<()> { - let mut gossip_events = self.gossip.clone().subscribe_all(); + // let mut gossip_events = self.gossip.clone().subscribe_all(); let mut i = 0; loop { i += 1; trace!(?i, "tick wait"); tokio::select! { - next = gossip_events.next() => { + next = self.gossip_events.next(), if !self.gossip_events.is_empty() => { trace!(?i, "tick: gossip_event"); if let Err(err) = self.on_gossip_event(next).await { error!("gossip actor died: {err:?}"); @@ -82,9 +88,11 @@ impl GossipActor { trace!(?i, "tick: pending_joins"); let (namespace, res) = res.context("pending_joins closed")?; match res { - Ok(_topic) => { + Ok(stream) => { debug!(namespace = %namespace.fmt_short(), "joined gossip"); self.joined.insert(namespace); + let stream = BroadcastStream::new(stream); + self.gossip_events.insert(namespace, stream); }, Err(err) => { if self.want_join.contains(&namespace) { @@ -112,13 +120,11 @@ impl GossipActor { let gossip = self.gossip.clone(); // join gossip for the topic to receive and send message let fut = async move { - let res = gossip.join(namespace.into(), peers).await; - let res = match res { - Ok(fut) => fut.await, - Err(err) => Err(err), - }; - (namespace, res) + let stream = gossip.subscribe(namespace.into()).await?; + let _topic = gossip.join(namespace.into(), peers).await?.await?; + Ok(stream) }; + let fut = fut.map(move |res| (namespace, res)); self.want_join.insert(namespace); self.pending_joins.spawn(fut); } @@ -132,22 +138,20 @@ impl GossipActor { } async fn on_gossip_event( &mut self, - event: Option>, + event: Option<(NamespaceId, Result)>, ) -> Result<()> { - let (topic, event) = match event { - Some(Ok(event)) => event, - None => return Err(anyhow!("Gossip event channel closed")), - Some(Err(err)) => match err { - RecvError::Lagged(n) => { - error!("GossipActor too slow (lagged by {n}) - dropping gossip event"); - return Ok(()); - } - RecvError::Closed => { - return Err(anyhow!("Gossip event channel closed")); - } - }, + let (namespace, event) = event.context("Gossip event channel closed")?; + let event = match event { + Ok(event) => event, + Err(BroadcastStreamRecvError::Lagged(n)) => { + error!("GossipActor too slow (lagged by {n}) - dropping gossip event"); + return Ok(()); + } }; - let namespace: NamespaceId = topic.as_bytes().into(); + if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) { + error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic"); + return Ok(()); + } if let Err(err) = self.on_gossip_event_inner(namespace, event).await { error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event"); }