From 865bc0d1d02b1c78e90721bccd1e8f01b4155466 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Fri, 7 Jun 2024 13:35:49 +0300 Subject: [PATCH] PR review - remove some error level loggings - more docs for subscribe - add subscribe opts - allow passing in raw topic --- iroh-cli/src/commands/gossip.rs | 35 +++++++++++++++++++++++++++++---- iroh-gossip/src/dispatcher.rs | 14 +++++-------- iroh-net/src/util.rs | 1 + iroh/src/client/gossip.rs | 34 ++++++++++++++++++++++++++++---- 4 files changed, 67 insertions(+), 17 deletions(-) diff --git a/iroh-cli/src/commands/gossip.rs b/iroh-cli/src/commands/gossip.rs index 26c7f82424..858ef5c9c5 100644 --- a/iroh-cli/src/commands/gossip.rs +++ b/iroh-cli/src/commands/gossip.rs @@ -1,8 +1,9 @@ use anyhow::{Context, Result}; use bao_tree::blake3; -use clap::Subcommand; +use clap::{ArgGroup, Subcommand}; use futures_lite::StreamExt; use futures_util::SinkExt; +use iroh::client::gossip::SubscribeOpts; use iroh::client::{Iroh, RpcService}; use iroh::net::NodeId; use quic_rpc::ServiceConnection; @@ -12,9 +13,20 @@ use tokio::io::AsyncBufReadExt; #[allow(clippy::large_enum_variant)] pub enum GossipCommands { /// Subscribe to a topic + #[command(group( + ArgGroup::new("input") + .required(true) + .args(&["topic", "raw_topic"]) + ))] Subscribe { + /// Topic string to subscribe to. + /// + /// This will be hashed with BLAKE3 to get the actual topic ID. #[clap(long)] - topic: String, + topic: Option, + /// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters. + #[clap(long)] + raw_topic: Option, bootstrap: Vec, #[clap(long, short)] verbose: bool, @@ -29,13 +41,28 @@ impl GossipCommands { match self { Self::Subscribe { topic, + raw_topic, bootstrap, verbose, } => { let bootstrap = bootstrap.into_iter().collect(); - let topic = blake3::hash(topic.as_ref()).into(); + let topic = match (topic, raw_topic) { + (Some(topic), None) => blake3::hash(topic.as_bytes()).into(), + (None, Some(raw_topic)) => { + let mut slice = [0; 32]; + hex::decode_to_slice(raw_topic, &mut slice) + .context("failed to decode raw topic")?; + slice.into() + } + _ => anyhow::bail!("either topic or raw_topic must be provided"), + }; + // blake3::hash(topic.as_ref()).into(); + let opts = SubscribeOpts { + bootstrap, + subscription_capacity: 1024, + }; - let (mut sink, mut stream) = iroh.gossip.subscribe(topic, bootstrap).await?; + let (mut sink, mut stream) = iroh.gossip.subscribe_with_opts(topic, opts).await?; let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines(); loop { tokio::select! { diff --git a/iroh-gossip/src/dispatcher.rs b/iroh-gossip/src/dispatcher.rs index 061b1ef78b..3da9452290 100644 --- a/iroh-gossip/src/dispatcher.rs +++ b/iroh-gossip/src/dispatcher.rs @@ -119,8 +119,7 @@ enum TopicState { /// Set of bootstrap nodes we are using. bootstrap: BTreeSet, /// The task that is driving the join future. - #[allow(dead_code)] - join_task: AbortingJoinHandle<()>, + _join_task: AbortingJoinHandle<()>, }, /// The topic is currently live. /// New subscriptions can be immediately added. @@ -190,13 +189,13 @@ impl GossipDispatcher { return; } let bootstrap = peers.clone(); - let join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); + let _join_task = spawn_owned(self.clone().join_task(topic, bootstrap)); inner.current_subscriptions.insert( topic, TopicState::Joining { waiting, bootstrap: peers, - join_task, + _join_task, }, ); } @@ -322,11 +321,8 @@ impl GossipDispatcher { /// /// Basically just flattens the two stages of joining into one. async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec) -> anyhow::Result<()> { - tracing::error!("Joining gossip topic {:?}", topic); let join = gossip.join(topic, bootstrap).await?; - tracing::error!("Waiting for joint to gossip topic {:?} to succeed", topic); join.await?; - tracing::error!("Joined gossip topic {:?}", topic); Ok(()) } @@ -377,12 +373,12 @@ impl GossipDispatcher { // There is no existing subscription, so we need to start a new one. let waiting = vec![(updates, send)]; let this = self.clone(); - let join_task = + let _join_task = spawn_owned(this.clone().join_task(topic, options.bootstrap.clone())); entry.insert(TopicState::Joining { waiting, bootstrap: options.bootstrap, - join_task, + _join_task, }); } Entry::Occupied(mut entry) => { diff --git a/iroh-net/src/util.rs b/iroh-net/src/util.rs index e94655b51f..a6aa824af0 100644 --- a/iroh-net/src/util.rs +++ b/iroh-net/src/util.rs @@ -14,6 +14,7 @@ pub mod chain; /// A join handle that owns the task it is running, and aborts it when dropped. #[derive(Debug, derive_more::Deref)] +#[must_use = "aborting join handle will abort the task when dropped"] pub struct AbortingJoinHandle { handle: tokio::task::JoinHandle, } diff --git a/iroh/src/client/gossip.rs b/iroh/src/client/gossip.rs index 2f0356c5c7..fd1c7614b1 100644 --- a/iroh/src/client/gossip.rs +++ b/iroh/src/client/gossip.rs @@ -18,15 +18,41 @@ pub struct Client { pub(super) rpc: RpcClient, } +/// Options for subscribing to a gossip topic. +#[derive(Debug, Clone)] +pub struct SubscribeOpts { + /// Bootstrap nodes to connect to. + pub bootstrap: BTreeSet, + /// Subscription capacity. + pub subscription_capacity: usize, +} + impl Client where C: ServiceConnection, { /// Subscribe to a gossip topic. - pub async fn subscribe( + /// + /// Returns a sink to send updates to the topic and a stream of responses. + /// + /// Updates are either [Broadcast](iroh_gossip::dispatcher::Command::Broadcast) + /// or [BroadcastNeighbors](iroh_gossip::dispatcher::Command::BroadcastNeighbors). + /// + /// Broadcasts are gossiped to the entire swarm, while BroadcastNeighbors are sent to + /// just the immediate neighbors of the node. + /// + /// Responses are either [Gossip](iroh_gossip::dispatcher::Event::Gossip) or + /// [Lagged](iroh_gossip::dispatcher::Event::Lagged). + /// + /// Gossip events contain the actual message content, as well as information about the + /// immediate neighbors of the node. + /// + /// A Lagged event indicates that the gossip stream has not been consumed quickly enough. + /// You can adjust the buffer size with the [] option. + pub async fn subscribe_with_opts( &self, topic: TopicId, - bootstrap: BTreeSet, + opts: SubscribeOpts, ) -> Result<( impl Sink, impl Stream>, @@ -35,8 +61,8 @@ where .rpc .bidi(GossipSubscribeRequest { topic, - bootstrap, - subscription_capacity: 1024, + bootstrap: opts.bootstrap, + subscription_capacity: opts.subscription_capacity, }) .await?; let stream = stream.map(|item| anyhow::Ok(item??));