Skip to content

Commit

Permalink
feat!: implement improved address sharing options
Browse files Browse the repository at this point in the history
This expands on the functionality previously only available to blob tickets, of configuring the information contained in a `NodeAddr`.

## Breaking Changes

- renamed: `iroh::client::blobs::ShareTicketOptions` -> `iroh_base::node_addr::AddrInfoOptions`
- renamed `iroh_cli`: `doc share --ticket-options` -> `doc share --addr-options`
- Default for `AddrInfoOptions` is now `Relay`, before it was `RelayandAddresses`
- Added `addr_options` to `iroh::client::docs::Docs.share`

Closes #2187
  • Loading branch information
dignifiedquire committed Apr 25, 2024
1 parent 265e284 commit 811442b
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 66 deletions.
43 changes: 43 additions & 0 deletions iroh-base/src/node_addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ impl NodeAddr {
self
}

/// Apply the options to `self`.
pub fn apply_options(&mut self, opts: AddrInfoOptions) {
self.info.apply_options(opts);
}

/// Get the direct addresses of this peer.
pub fn direct_addresses(&self) -> impl Iterator<Item = &SocketAddr> {
self.info.direct_addresses.iter()
Expand Down Expand Up @@ -83,6 +88,21 @@ impl AddrInfo {
pub fn is_empty(&self) -> bool {
self.relay_url.is_none() && self.direct_addresses.is_empty()
}

/// Apply the options to `self`.
pub fn apply_options(&mut self, opts: AddrInfoOptions) {
match opts {
AddrInfoOptions::RelayAndAddresses => {
// nothing to do
}
AddrInfoOptions::Relay => {
self.direct_addresses.clear();
}
AddrInfoOptions::Addresses => {
self.relay_url = None;
}
}
}
}

impl NodeAddr {
Expand All @@ -102,6 +122,29 @@ impl NodeAddr {
}
}

/// Options to configure what is included in a `NodeAddr`.
#[derive(
Copy,
Clone,
PartialEq,
Eq,
Default,
Debug,
derive_more::Display,
derive_more::FromStr,
Serialize,
Deserialize,
)]
pub enum AddrInfoOptions {
/// Include both the relay URL and the direct addresses.
RelayAndAddresses,
/// Only include the relay URL.
#[default]
Relay,
/// Only include the direct addresses.
Addresses,
}

/// A URL identifying a relay server.
///
/// This is but a wrapper around [`Url`], with a few custom tweaks:
Expand Down
29 changes: 17 additions & 12 deletions iroh-cli/src/commands/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ use indicatif::{
HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressDrawTarget, ProgressState,
ProgressStyle,
};
use iroh::bytes::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
provider::AddProgress,
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress},
BlobFormat, Hash, HashAndFormat, Tag,
};
use iroh::net::{key::PublicKey, relay::RelayUrl, NodeAddr};
use iroh::{
client::{BlobStatus, Iroh, ShareTicketOptions},
base::node_addr::AddrInfoOptions,
bytes::{
get::{db::DownloadProgress, progress::BlobProgress, Stats},
provider::AddProgress,
store::{
ConsistencyCheckProgress, ExportFormat, ExportMode, ReportLevel, ValidateProgress,
},
BlobFormat, Hash, HashAndFormat, Tag,
},
};
use iroh::{
client::{BlobStatus, Iroh},
rpc_protocol::{
BlobDownloadRequest, BlobListCollectionsResponse, BlobListIncompleteResponse,
BlobListResponse, DownloadMode, ProviderService, SetTagOption, WrapOption,
Expand Down Expand Up @@ -141,9 +146,9 @@ pub enum BlobCommands {
Share {
/// Hash of the blob to share.
hash: Hash,
/// Options to configure the generated ticket.
#[clap(long, default_value_t = ShareTicketOptions::RelayAndAddresses)]
ticket_options: ShareTicketOptions,
/// Options to configure the address information in the generated ticket.
#[clap(long, default_value_t = AddrInfoOptions::Relay)]
addr_options: AddrInfoOptions,
/// If the blob is a collection, the requester will also fetch the listed blobs.
#[clap(long, default_value_t = false)]
recursive: bool,
Expand Down Expand Up @@ -350,7 +355,7 @@ impl BlobCommands {
} => add_with_opts(iroh, path, options).await,
Self::Share {
hash,
ticket_options,
addr_options,
recursive,
debug,
} => {
Expand All @@ -360,7 +365,7 @@ impl BlobCommands {
BlobFormat::Raw
};
let status = iroh.blobs.status(hash).await?;
let ticket = iroh.blobs.share(hash, format, ticket_options).await?;
let ticket = iroh.blobs.share(hash, format, addr_options).await?;

let (blob_status, size) = match (status, format) {
(BlobStatus::Complete { size }, BlobFormat::Raw) => ("blob", size),
Expand Down
13 changes: 10 additions & 3 deletions iroh-cli/src/commands/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use colored::Colorize;
use dialoguer::Confirm;
use futures::{Stream, StreamExt, TryStreamExt};
use indicatif::{HumanBytes, HumanDuration, MultiProgress, ProgressBar, ProgressStyle};
use iroh::base::base32::fmt_short;
use iroh::base::{base32::fmt_short, node_addr::AddrInfoOptions};
use quic_rpc::ServiceConnection;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
Expand Down Expand Up @@ -112,6 +112,9 @@ pub enum DocCommands {
#[clap(short, long)]
doc: Option<NamespaceId>,
mode: ShareMode,
/// Options to configure the address information in the generated ticket.
#[clap(long, default_value_t = AddrInfoOptions::Relay)]
addr_options: AddrInfoOptions,
},
/// Set an entry in a document.
Set {
Expand Down Expand Up @@ -354,9 +357,13 @@ impl DocCommands {
println!("{id} {kind}")
}
}
Self::Share { doc, mode } => {
Self::Share {
doc,
mode,
addr_options,
} => {
let doc = get_doc(iroh, env, doc).await?;
let ticket = doc.share(mode.into()).await?;
let ticket = doc.share(mode.into(), addr_options).await?;
println!("{}", ticket);
}
Self::Set {
Expand Down
2 changes: 1 addition & 1 deletion iroh/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ mod tags;
pub use self::authors::Client as AuthorsClient;
pub use self::blobs::{
BlobAddOutcome, BlobAddProgress, BlobDownloadOutcome, BlobDownloadProgress, BlobReader,
BlobStatus, Client as BlobsClient, ShareTicketOptions,
BlobStatus, Client as BlobsClient,
};
pub use self::docs::{Client as DocsClient, Doc, Entry, LiveEvent};
pub use self::node::Client as NodeClient;
Expand Down
42 changes: 5 additions & 37 deletions iroh/src/client/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt};
use iroh_base::ticket::BlobTicket;
use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket};
use iroh_bytes::{
export::ExportProgress,
format::collection::Collection,
Expand All @@ -18,7 +18,6 @@ use iroh_bytes::{
store::{ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress},
BlobFormat, Hash, Tag,
};
use iroh_net::NodeAddr;
use portable_atomic::{AtomicU64, Ordering};
use quic_rpc::{client::BoxStreamSync, RpcClient, ServiceConnection};
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
Expand Down Expand Up @@ -302,28 +301,11 @@ where
&self,
hash: Hash,
blob_format: BlobFormat,
ticket_options: ShareTicketOptions,
addr_options: AddrInfoOptions,
) -> Result<BlobTicket> {
let NodeStatusResponse { addr, .. } = self.rpc.rpc(NodeStatusRequest).await??;
let mut node_addr = NodeAddr::new(addr.node_id);
match ticket_options {
ShareTicketOptions::RelayAndAddresses => {
node_addr = node_addr.with_direct_addresses(addr.direct_addresses().copied());
if let Some(url) = addr.relay_url() {
node_addr = node_addr.with_relay_url(url.clone());
}
}
ShareTicketOptions::Relay => {
if let Some(url) = addr.relay_url() {
node_addr = node_addr.with_relay_url(url.clone());
}
}
ShareTicketOptions::Addresses => {
node_addr = node_addr.with_direct_addresses(addr.direct_addresses().copied());
}
}

let ticket = BlobTicket::new(node_addr, hash, blob_format).expect("correct ticket");
let NodeStatusResponse { mut addr, .. } = self.rpc.rpc(NodeStatusRequest).await??;
addr.apply_options(addr_options);
let ticket = BlobTicket::new(addr, hash, blob_format).expect("correct ticket");

Ok(ticket)
}
Expand All @@ -340,20 +322,6 @@ where
}
}

/// Options when creating a ticket
#[derive(
Copy, Clone, PartialEq, Eq, Default, Debug, derive_more::Display, derive_more::FromStr,
)]
pub enum ShareTicketOptions {
/// Include both the relay URL and the direct addresses.
#[default]
RelayAndAddresses,
/// Only include the relay URL.
Relay,
/// Only include the direct addresses.
Addresses,
}

/// Status information about a blob.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlobStatus {
Expand Down
9 changes: 7 additions & 2 deletions iroh/src/client/docs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
use anyhow::{anyhow, Context as _, Result};
use bytes::Bytes;
use futures::{Stream, StreamExt, TryStreamExt};
use iroh_base::key::PublicKey;
use iroh_base::{key::PublicKey, node_addr::AddrInfoOptions};
use iroh_bytes::{export::ExportProgress, store::ExportMode, Hash};
use iroh_net::NodeAddr;
use iroh_sync::{
Expand Down Expand Up @@ -302,12 +302,17 @@ where
}

/// Share this document with peers over a ticket.
pub async fn share(&self, mode: ShareMode) -> anyhow::Result<DocTicket> {
pub async fn share(
&self,
mode: ShareMode,
addr_options: AddrInfoOptions,
) -> anyhow::Result<DocTicket> {
self.ensure_open()?;
let res = self
.rpc(DocShareRequest {
doc_id: self.id(),
mode,
addr_options,
})
.await??;
Ok(res.0)
Expand Down
3 changes: 3 additions & 0 deletions iroh/src/rpc_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{collections::BTreeMap, net::SocketAddr, path::PathBuf};

use bytes::Bytes;
use derive_more::{From, TryInto};
use iroh_base::node_addr::AddrInfoOptions;
pub use iroh_bytes::{export::ExportProgress, get::db::DownloadProgress, BlobFormat, Hash};
use iroh_bytes::{
format::collection::Collection,
Expand Down Expand Up @@ -637,6 +638,8 @@ pub struct DocShareRequest {
pub doc_id: NamespaceId,
/// Whether to share read or write access to the document
pub mode: ShareMode,
/// Configuration of the addresses in the ticket.
pub addr_options: AddrInfoOptions,
}

impl RpcMsg<ProviderService> for DocShareRequest {
Expand Down
18 changes: 13 additions & 5 deletions iroh/src/sync_engine/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,23 @@ impl SyncEngine {
}

pub async fn doc_share(&self, req: DocShareRequest) -> RpcResult<DocShareResponse> {
let me = self.endpoint.my_addr().await?;
let capability = match req.mode {
ShareMode::Read => iroh_sync::Capability::Read(req.doc_id),
let DocShareRequest {
doc_id,
mode,
addr_options,
} = req;
let mut me = self.endpoint.my_addr().await?;
me.apply_options(addr_options);

let capability = match mode {
ShareMode::Read => iroh_sync::Capability::Read(doc_id),
ShareMode::Write => {
let secret = self.sync.export_secret_key(req.doc_id).await?;
let secret = self.sync.export_secret_key(doc_id).await?;
iroh_sync::Capability::Write(secret)
}
};
self.start_sync(req.doc_id, vec![]).await?;
self.start_sync(doc_id, vec![]).await?;

Ok(DocShareResponse(DocTicket {
capability,
nodes: vec![me],
Expand Down
25 changes: 19 additions & 6 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use iroh::{
node::{Builder, Node},
rpc_protocol::ShareMode,
};
use iroh_base::node_addr::AddrInfoOptions;
use iroh_net::key::{PublicKey, SecretKey};
use quic_rpc::transport::misc::DummyServerEndpoint;
use rand::{CryptoRng, Rng, SeedableRng};
Expand Down Expand Up @@ -81,7 +82,9 @@ async fn sync_simple() -> Result<()> {
.set_bytes(author0, b"k1".to_vec(), b"v1".to_vec())
.await?;
assert_latest(&doc0, b"k1", b"v1").await;
let ticket = doc0.share(ShareMode::Write).await?;
let ticket = doc0
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;

let mut events0 = doc0.subscribe().await?;

Expand Down Expand Up @@ -154,7 +157,9 @@ async fn sync_gossip_bulk() -> Result<()> {
let _peer0 = nodes[0].node_id();
let author0 = clients[0].authors.create().await?;
let doc0 = clients[0].docs.create().await?;
let mut ticket = doc0.share(ShareMode::Write).await?;
let mut ticket = doc0
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;
// unset peers to not yet start sync
let peers = ticket.nodes.clone();
ticket.nodes = vec![];
Expand Down Expand Up @@ -256,7 +261,9 @@ async fn sync_full_basic() -> Result<()> {
"expected LiveEvent::InsertLocal but got {e:?}",
);
assert_latest(&doc0, key0, value0).await;
let ticket = doc0.share(ShareMode::Write).await?;
let ticket = doc0
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;

info!("peer1: spawn");
let peer1 = nodes[1].node_id();
Expand Down Expand Up @@ -483,7 +490,9 @@ async fn test_sync_via_relay() -> Result<()> {
let inserted_hash = doc1
.set_bytes(author1, b"foo".to_vec(), b"bar".to_vec())
.await?;
let mut ticket = doc1.share(ShareMode::Write).await?;
let mut ticket = doc1
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;

// remove direct addrs to force connect via relay
ticket.nodes[0].info.direct_addresses = Default::default();
Expand Down Expand Up @@ -585,7 +594,9 @@ async fn test_download_policies() -> Result<()> {

let doc_a = clients[0].docs.create().await?;
let author_a = clients[0].authors.create().await?;
let ticket = doc_a.share(ShareMode::Write).await?;
let ticket = doc_a
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;

let doc_b = clients[1].docs.import(ticket).await?;
let author_b = clients[1].authors.create().await?;
Expand Down Expand Up @@ -706,7 +717,9 @@ async fn sync_big() -> Result<()> {
let authors = collect_futures(clients.iter().map(|c| c.authors.create())).await?;

let doc0 = clients[0].docs.create().await?;
let mut ticket = doc0.share(ShareMode::Write).await?;
let mut ticket = doc0
.share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
.await?;
// do not join for now, just import without any peer info
let peer0 = ticket.nodes[0].clone();
ticket.nodes = vec![];
Expand Down

0 comments on commit 811442b

Please sign in to comment.