Skip to content

Commit

Permalink
refactor: do not use subscribe_all in iroh sync
Browse files Browse the repository at this point in the history
  • Loading branch information
Frando committed May 16, 2024
1 parent 491012c commit 1aceaba
Showing 1 changed file with 30 additions and 26 deletions.
56 changes: 30 additions & 26 deletions iroh/src/docs_engine/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashSet;

use anyhow::{anyhow, Context, Result};
use anyhow::{Context, Result};
use futures_lite::StreamExt;
use iroh_docs::{actor::SyncHandle, ContentStatus, NamespaceId};
use iroh_gossip::{
Expand All @@ -9,9 +9,13 @@ use iroh_gossip::{
};
use iroh_net::key::PublicKey;
use tokio::{
sync::{broadcast::error::RecvError, mpsc},
sync::{broadcast, mpsc},
task::JoinSet,
};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
StreamMap,
};
use tracing::{debug, error, trace};

use super::live::{Op, ToLiveActor};
Expand All @@ -37,7 +41,8 @@ pub struct GossipActor {
to_sync_actor: mpsc::Sender<ToLiveActor>,
joined: HashSet<NamespaceId>,
want_join: HashSet<NamespaceId>,
pending_joins: JoinSet<(NamespaceId, Result<TopicId>)>,
pending_joins: JoinSet<(NamespaceId, Result<broadcast::Receiver<Event>>)>,
gossip_events: StreamMap<NamespaceId, BroadcastStream<Event>>,
}

impl GossipActor {
Expand All @@ -55,16 +60,17 @@ impl GossipActor {
joined: Default::default(),
want_join: Default::default(),
pending_joins: Default::default(),
gossip_events: Default::default(),
}
}
pub async fn run(&mut self) -> anyhow::Result<()> {
let mut gossip_events = self.gossip.clone().subscribe_all();
// let mut gossip_events = self.gossip.clone().subscribe_all();
let mut i = 0;
loop {
i += 1;
trace!(?i, "tick wait");
tokio::select! {
next = gossip_events.next() => {
next = self.gossip_events.next(), if !self.gossip_events.is_empty() => {
trace!(?i, "tick: gossip_event");
if let Err(err) = self.on_gossip_event(next).await {
error!("gossip actor died: {err:?}");
Expand All @@ -82,9 +88,11 @@ impl GossipActor {
trace!(?i, "tick: pending_joins");
let (namespace, res) = res.context("pending_joins closed")?;
match res {
Ok(_topic) => {
Ok(stream) => {
debug!(namespace = %namespace.fmt_short(), "joined gossip");
self.joined.insert(namespace);
let stream = BroadcastStream::new(stream);
self.gossip_events.insert(namespace, stream);
},
Err(err) => {
if self.want_join.contains(&namespace) {
Expand Down Expand Up @@ -112,13 +120,11 @@ impl GossipActor {
let gossip = self.gossip.clone();
// join gossip for the topic to receive and send message
let fut = async move {
let res = gossip.join(namespace.into(), peers).await;
let res = match res {
Ok(fut) => fut.await,
Err(err) => Err(err),
};
(namespace, res)
let stream = gossip.subscribe(namespace.into()).await?;
let _topic = gossip.join(namespace.into(), peers).await?.await?;
Ok(stream)
};
let fut = fut.map(move |res| (namespace, res));
self.want_join.insert(namespace);
self.pending_joins.spawn(fut);
}
Expand All @@ -132,22 +138,20 @@ impl GossipActor {
}
async fn on_gossip_event(
&mut self,
event: Option<Result<(TopicId, Event), RecvError>>,
event: Option<(NamespaceId, Result<Event, BroadcastStreamRecvError>)>,
) -> Result<()> {
let (topic, event) = match event {
Some(Ok(event)) => event,
None => return Err(anyhow!("Gossip event channel closed")),
Some(Err(err)) => match err {
RecvError::Lagged(n) => {
error!("GossipActor too slow (lagged by {n}) - dropping gossip event");
return Ok(());
}
RecvError::Closed => {
return Err(anyhow!("Gossip event channel closed"));
}
},
let (namespace, event) = event.context("Gossip event channel closed")?;
let event = match event {
Ok(event) => event,
Err(BroadcastStreamRecvError::Lagged(n)) => {
error!("GossipActor too slow (lagged by {n}) - dropping gossip event");
return Ok(());
}
};
let namespace: NamespaceId = topic.as_bytes().into();
if !self.joined.contains(&namespace) && !self.want_join.contains(&namespace) {
error!(namespace = %namespace.fmt_short(), "received gossip event for unknown topic");
return Ok(());
}
if let Err(err) = self.on_gossip_event_inner(namespace, event).await {
error!(namespace = %namespace.fmt_short(), ?err, "Failed to process gossip event");
}
Expand Down

0 comments on commit 1aceaba

Please sign in to comment.