Skip to content

Commit

Permalink
PR review
Browse files Browse the repository at this point in the history
- remove some error level loggings
- more docs for subscribe
- add subscribe opts
- allow passing in raw topic
  • Loading branch information
rklaehn committed Jun 7, 2024
1 parent 5d6b7d0 commit 865bc0d
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 17 deletions.
35 changes: 31 additions & 4 deletions iroh-cli/src/commands/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::{Context, Result};
use bao_tree::blake3;
use clap::Subcommand;
use clap::{ArgGroup, Subcommand};
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh::client::gossip::SubscribeOpts;
use iroh::client::{Iroh, RpcService};
use iroh::net::NodeId;
use quic_rpc::ServiceConnection;
Expand All @@ -12,9 +13,20 @@ use tokio::io::AsyncBufReadExt;
#[allow(clippy::large_enum_variant)]
pub enum GossipCommands {
/// Subscribe to a topic
#[command(group(
ArgGroup::new("input")
.required(true)
.args(&["topic", "raw_topic"])
))]
Subscribe {
/// Topic string to subscribe to.
///
/// This will be hashed with BLAKE3 to get the actual topic ID.
#[clap(long)]
topic: String,
topic: Option<String>,
/// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
#[clap(long)]
raw_topic: Option<String>,
bootstrap: Vec<NodeId>,
#[clap(long, short)]
verbose: bool,
Expand All @@ -29,13 +41,28 @@ impl GossipCommands {
match self {
Self::Subscribe {
topic,
raw_topic,
bootstrap,
verbose,
} => {
let bootstrap = bootstrap.into_iter().collect();
let topic = blake3::hash(topic.as_ref()).into();
let topic = match (topic, raw_topic) {
(Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
(None, Some(raw_topic)) => {
let mut slice = [0; 32];
hex::decode_to_slice(raw_topic, &mut slice)
.context("failed to decode raw topic")?;
slice.into()
}
_ => anyhow::bail!("either topic or raw_topic must be provided"),
};
// blake3::hash(topic.as_ref()).into();
let opts = SubscribeOpts {
bootstrap,
subscription_capacity: 1024,
};

let (mut sink, mut stream) = iroh.gossip.subscribe(topic, bootstrap).await?;
let (mut sink, mut stream) = iroh.gossip.subscribe_with_opts(topic, opts).await?;
let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
tokio::select! {
Expand Down
14 changes: 5 additions & 9 deletions iroh-gossip/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,7 @@ enum TopicState {
/// Set of bootstrap nodes we are using.
bootstrap: BTreeSet<NodeId>,
/// The task that is driving the join future.
#[allow(dead_code)]
join_task: AbortingJoinHandle<()>,
_join_task: AbortingJoinHandle<()>,
},
/// The topic is currently live.
/// New subscriptions can be immediately added.
Expand Down Expand Up @@ -190,13 +189,13 @@ impl GossipDispatcher {
return;
}
let bootstrap = peers.clone();
let join_task = spawn_owned(self.clone().join_task(topic, bootstrap));
let _join_task = spawn_owned(self.clone().join_task(topic, bootstrap));
inner.current_subscriptions.insert(
topic,
TopicState::Joining {
waiting,
bootstrap: peers,
join_task,
_join_task,
},
);
}
Expand Down Expand Up @@ -322,11 +321,8 @@ impl GossipDispatcher {
///
/// Basically just flattens the two stages of joining into one.
async fn join(gossip: Gossip, topic: TopicId, bootstrap: Vec<NodeId>) -> anyhow::Result<()> {
tracing::error!("Joining gossip topic {:?}", topic);
let join = gossip.join(topic, bootstrap).await?;
tracing::error!("Waiting for joint to gossip topic {:?} to succeed", topic);
join.await?;
tracing::error!("Joined gossip topic {:?}", topic);
Ok(())
}

Expand Down Expand Up @@ -377,12 +373,12 @@ impl GossipDispatcher {
// There is no existing subscription, so we need to start a new one.
let waiting = vec![(updates, send)];
let this = self.clone();
let join_task =
let _join_task =
spawn_owned(this.clone().join_task(topic, options.bootstrap.clone()));
entry.insert(TopicState::Joining {
waiting,
bootstrap: options.bootstrap,
join_task,
_join_task,
});
}
Entry::Occupied(mut entry) => {
Expand Down
1 change: 1 addition & 0 deletions iroh-net/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod chain;

/// A join handle that owns the task it is running, and aborts it when dropped.
#[derive(Debug, derive_more::Deref)]
#[must_use = "aborting join handle will abort the task when dropped"]
pub struct AbortingJoinHandle<T> {
handle: tokio::task::JoinHandle<T>,
}
Expand Down
34 changes: 30 additions & 4 deletions iroh/src/client/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,41 @@ pub struct Client<C> {
pub(super) rpc: RpcClient<RpcService, C>,
}

/// Options for subscribing to a gossip topic.
#[derive(Debug, Clone)]
pub struct SubscribeOpts {
/// Bootstrap nodes to connect to.
pub bootstrap: BTreeSet<NodeId>,
/// Subscription capacity.
pub subscription_capacity: usize,
}

impl<C> Client<C>
where
C: ServiceConnection<RpcService>,
{
/// Subscribe to a gossip topic.
pub async fn subscribe(
///
/// Returns a sink to send updates to the topic and a stream of responses.
///
/// Updates are either [Broadcast](iroh_gossip::dispatcher::Command::Broadcast)
/// or [BroadcastNeighbors](iroh_gossip::dispatcher::Command::BroadcastNeighbors).
///
/// Broadcasts are gossiped to the entire swarm, while BroadcastNeighbors are sent to
/// just the immediate neighbors of the node.
///
/// Responses are either [Gossip](iroh_gossip::dispatcher::Event::Gossip) or
/// [Lagged](iroh_gossip::dispatcher::Event::Lagged).
///
/// Gossip events contain the actual message content, as well as information about the
/// immediate neighbors of the node.
///
/// A Lagged event indicates that the gossip stream has not been consumed quickly enough.
/// You can adjust the buffer size with the [] option.
pub async fn subscribe_with_opts(
&self,
topic: TopicId,
bootstrap: BTreeSet<NodeId>,
opts: SubscribeOpts,
) -> Result<(
impl Sink<GossipSubscribeUpdate, Error = anyhow::Error>,
impl Stream<Item = Result<GossipSubscribeResponse>>,
Expand All @@ -35,8 +61,8 @@ where
.rpc
.bidi(GossipSubscribeRequest {
topic,
bootstrap,
subscription_capacity: 1024,
bootstrap: opts.bootstrap,
subscription_capacity: opts.subscription_capacity,
})
.await?;
let stream = stream.map(|item| anyhow::Ok(item??));
Expand Down

0 comments on commit 865bc0d

Please sign in to comment.