Skip to content

Commit

Permalink
feat(iroh): Gossip client (#2258)
Browse files Browse the repository at this point in the history
## Description

This makes gossip available in iroh. Using iroh-gossip directly, while
not horrible, is a bit verbose.

## Breaking Changes

~~Not sure. It exports a few more things, and adds things. But in theory
it should not modify existing things.~~

There should be none, and the semver test seems to agree...

## Notes & open questions

- ~~There can be some scenarios where this can cause trouble. E.g. when
subscribing and then unsubscribing to a topic that is also used for doc
sync.~~

This ^ is taken care of, since docs no longer use subscribe_all!

How to deal with missed messages due to slow reader:

I think that relying on guaranteed delivery for gossip is not a good
idea in any case. so maybe just sending a Lagged but then continuing is
best? Forcing the client to resubscribe could be a bit annoying.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [x] Tests if relevant.
- [x] All breaking changes documented.
  • Loading branch information
rklaehn authored Jul 5, 2024
1 parent 5aa3fb6 commit b0d5413
Show file tree
Hide file tree
Showing 21 changed files with 828 additions and 11 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions iroh-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ dirs-next = "2.0.0"
flume = "0.11.0"
futures-buffered = "0.2.4"
futures-lite = "2.3"
futures-util = { version = "0.3.30", features = ["futures-sink"] }
hex = "0.4.3"
human-time = "0.1.6"
indicatif = { version = "0.17", features = ["tokio"] }
iroh = { version = "0.19.0", path = "../iroh", features = ["metrics"] }
iroh-gossip = { version = "0.19.0", path = "../iroh-gossip" }
iroh-metrics = { version = "0.19.0", path = "../iroh-metrics" }
parking_lot = "0.12.1"
pkarr = { version = "1.1.5", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions iroh-cli/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod blob;
pub(crate) mod console;
pub(crate) mod doc;
pub(crate) mod doctor;
pub(crate) mod gossip;
pub(crate) mod node;
pub(crate) mod rpc;
pub(crate) mod start;
Expand Down
94 changes: 94 additions & 0 deletions iroh-cli/src/commands/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use anyhow::{Context, Result};
use bao_tree::blake3;
use clap::{ArgGroup, Subcommand};
use futures_lite::StreamExt;
use futures_util::SinkExt;
use iroh::client::gossip::SubscribeOpts;
use iroh::client::Iroh;
use iroh::net::NodeId;
use tokio::io::AsyncBufReadExt;

#[derive(Subcommand, Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum GossipCommands {
/// Subscribe to a topic
#[command(group(
ArgGroup::new("input")
.required(true)
.args(&["topic", "raw_topic"])
))]
Subscribe {
/// Topic string to subscribe to.
///
/// This will be hashed with BLAKE3 to get the actual topic ID.
#[clap(long)]
topic: Option<String>,
/// The raw topic to subscribe to as hex. Needs to be 32 bytes, i.e. 64 hex characters.
#[clap(long)]
raw_topic: Option<String>,
bootstrap: Vec<NodeId>,
#[clap(long, short)]
verbose: bool,
},
}

impl GossipCommands {
pub async fn run(self, iroh: &Iroh) -> Result<()> {
match self {
Self::Subscribe {
topic,
raw_topic,
bootstrap,
verbose,
} => {
let bootstrap = bootstrap.into_iter().collect();
let topic = match (topic, raw_topic) {
(Some(topic), None) => blake3::hash(topic.as_bytes()).into(),
(None, Some(raw_topic)) => {
let mut slice = [0; 32];
hex::decode_to_slice(raw_topic, &mut slice)
.context("failed to decode raw topic")?;
slice.into()
}
_ => anyhow::bail!("either topic or raw_topic must be provided"),
};
// blake3::hash(topic.as_ref()).into();
let opts = SubscribeOpts {
bootstrap,
subscription_capacity: 1024,
};

let (mut sink, mut stream) = iroh.gossip().subscribe_with_opts(topic, opts).await?;
let mut input_lines = tokio::io::BufReader::new(tokio::io::stdin()).lines();
loop {
tokio::select! {
line = input_lines.next_line() => {
let line = line.context("failed to read from stdin")?;
if let Some(line) = line {
sink.send(iroh_gossip::dispatcher::Command::Broadcast(line.into())).await?;
} else {
break;
}
}
res = stream.next() => {
let res = res.context("gossip stream ended")?.context("failed to read gossip stream")?;
match res {
iroh_gossip::dispatcher::Event::Gossip(event) => {
if verbose {
println!("{:?}", event);
} else if let iroh_gossip::dispatcher::GossipEvent::Received(iroh_gossip::dispatcher::Message { content, .. }) = event {
println!("{:?}", content);
}
}
iroh_gossip::dispatcher::Event::Lagged => {
anyhow::bail!("gossip stream lagged");
}
};
}
}
}
}
}
Ok(())
}
}
12 changes: 10 additions & 2 deletions iroh-cli/src/commands/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use iroh::client::Iroh;
use crate::config::ConsoleEnv;

use super::{
author::AuthorCommands, blob::BlobCommands, doc::DocCommands, node::NodeCommands,
tag::TagCommands,
author::AuthorCommands, blob::BlobCommands, doc::DocCommands, gossip::GossipCommands,
node::NodeCommands, tag::TagCommands,
};

#[derive(Subcommand, Debug, Clone)]
Expand Down Expand Up @@ -41,6 +41,13 @@ pub enum RpcCommands {
#[clap(subcommand)]
command: NodeCommands,
},
/// Manage gossip
///
/// Gossip is a way to broadcast messages to a group of nodes.
Gossip {
#[clap(subcommand)]
command: GossipCommands,
},
/// Manage tags
///
/// Tags are local, human-readable names for things iroh should keep.
Expand All @@ -64,6 +71,7 @@ impl RpcCommands {
Self::Doc { command } => command.run(iroh, env).await,
Self::Author { command } => command.run(iroh, env).await,
Self::Tag { command } => command.run(iroh).await,
Self::Gossip { command } => command.run(iroh).await,
}
}
}
7 changes: 6 additions & 1 deletion iroh-gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
genawaiter = { version = "0.99.1", default-features = false, features = ["futures03"] }

# dispatcher dependencies (optional)
futures-util = { version = "0.3.30", optional = true }
flume = { version = "0.11", optional = true }

[dev-dependencies]
clap = { version = "4", features = ["derive"] }
iroh-test = { path = "../iroh-test" }
Expand All @@ -45,8 +49,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.4.0"

[features]
default = ["net"]
default = ["net", "dispatcher"]
net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
dispatcher = ["dep:flume", "dep:futures-util"]

[[example]]
name = "chat"
Expand Down
Loading

0 comments on commit b0d5413

Please sign in to comment.