Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: keep non async node_id and my_relay #2280

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions iroh-cli/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ where
let node = start_node(iroh_data_root, relay_map).await?;
drop(spinner);

eprintln!("{}", welcome_message(&node).await?);
eprintln!("{}", welcome_message(&node)?);

let client = node.client().clone();

Expand Down Expand Up @@ -141,11 +141,11 @@ pub(crate) async fn start_node(
.await
}

async fn welcome_message<B: iroh::blobs::store::Store>(node: &Node<B>) -> Result<String> {
fn welcome_message<B: iroh::blobs::store::Store>(node: &Node<B>) -> Result<String> {
let msg = format!(
"{}\nNode ID: {}\n",
"Iroh is running".green(),
node.node_id().await?,
node.node_id()
);

Ok(msg)
Expand Down
3 changes: 1 addition & 2 deletions iroh/examples/collection-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<()> {
let node = iroh::node::Node::memory().spawn().await?;

println!("fetching hash: {}", ticket.hash());
println!("node id: {}", node.node_id().await?);
println!("node id: {}", node.node_id());
println!("node listening addresses:");
let addrs = node.my_addr().await?;
for addr in addrs.direct_addresses() {
Expand All @@ -46,7 +46,6 @@ async fn main() -> Result<()> {
println!(
"node relay server url: {:?}",
node.my_relay()
.await?
.expect("a default relay url should be provided")
.to_string()
);
Expand Down
3 changes: 1 addition & 2 deletions iroh/examples/hello-world-fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn main() -> Result<()> {
let node = iroh::node::Node::memory().spawn().await?;

println!("fetching hash: {}", ticket.hash());
println!("node id: {}", node.node_id().await?);
println!("node id: {}", node.node_id());
println!("node listening addresses:");
let addrs = node.my_addr().await?;
for addr in addrs.direct_addresses() {
Expand All @@ -46,7 +46,6 @@ async fn main() -> Result<()> {
println!(
"node relay server url: {:?}",
node.my_relay()
.await?
.expect("a default relay url should be provided")
.to_string()
);
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ where
.await?;

// print some info about the node
let peer = node.node_id().await?;
let peer = node.node_id();
let addrs = node.local_endpoint_addresses().await?;
println!("node PeerID: {peer}");
println!("node listening addresses:");
Expand Down
15 changes: 13 additions & 2 deletions iroh/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;

use anyhow::{anyhow, Result};
use futures_lite::{future::Boxed as BoxFuture, FutureExt, StreamExt};
use iroh_base::key::PublicKey;
use iroh_blobs::downloader::Downloader;
use iroh_blobs::store::Store as BaoStore;
use iroh_net::util::AbortingJoinHandle;
Expand Down Expand Up @@ -171,6 +172,11 @@ impl<D: BaoStore> Node<D> {
self.inner.local_endpoint_addresses().await
}

/// Returns the [`PublicKey`] of the node.
pub fn node_id(&self) -> PublicKey {
self.inner.secret_key.public()
}

/// Subscribe to [`Event`]s emitted from the node, informing about connections and
/// progress.
///
Expand Down Expand Up @@ -198,6 +204,11 @@ impl<D: BaoStore> Node<D> {
&self.inner.rt
}

/// Get the relay server we are connected to.
pub fn my_relay(&self) -> Option<iroh_net::relay::RelayUrl> {
self.inner.endpoint.my_relay()
}

/// Aborts the node.
///
/// This does not gracefully terminate currently: all connections are closed and
Expand Down Expand Up @@ -398,7 +409,7 @@ mod tests {
let AddOutcome { hash, .. } = node1.blobs.add_bytes(b"foo".to_vec()).await?;

// create a node addr with only a relay URL, no direct addresses
let addr = NodeAddr::new(node1.node_id().await?).with_relay_url(relay_url);
let addr = NodeAddr::new(node1.node_id()).with_relay_url(relay_url);
node2.blobs.download(hash, addr).await?.await?;
assert_eq!(
node2
Expand Down Expand Up @@ -441,7 +452,7 @@ mod tests {
let hash = node1.blobs.add_bytes(b"foo".to_vec()).await?.hash;

// create a node addr with node id only
let addr = NodeAddr::new(node1.node_id().await?);
let addr = NodeAddr::new(node1.node_id());
node2.blobs.download(hash, addr).await?.await?;
assert_eq!(
node2
Expand Down
18 changes: 9 additions & 9 deletions iroh/tests/provide.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ async fn multiple_clients() -> Result<()> {
let file_hash: Hash = expect_hash;
let name = expect_name;
let addrs = node.local_address();
let peer_id = node.node_id().await?;
let peer_id = node.node_id();
let content = content.to_vec();

tasks.push(node.local_pool_handle().spawn_pinned(move || {
Expand Down Expand Up @@ -237,7 +237,7 @@ where
.await?;

let addrs = node.local_endpoint_addresses().await?;
let (secret_key, peer) = get_options(node.node_id().await?, addrs);
let (secret_key, peer) = get_options(node.node_id(), addrs);
let request = GetRequest::all(collection_hash);
let (collection, children, _stats) =
run_collection_get_request(secret_key, peer, request).await?;
Expand Down Expand Up @@ -321,7 +321,7 @@ async fn test_server_close() {
let hash = db.insert_many(collection.to_blobs()).unwrap();
let node = test_node(db).spawn().await.unwrap();
let node_addr = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();

let (events_sender, mut events_recv) = mpsc::unbounded_channel();
node.subscribe(move |event| {
Expand Down Expand Up @@ -393,7 +393,7 @@ async fn test_ipv6() {
}
};
let addrs = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();
tokio::time::timeout(Duration::from_secs(10), async move {
let (secret_key, peer) = get_options(peer_id, addrs);
let request = GetRequest::all(hash);
Expand Down Expand Up @@ -421,7 +421,7 @@ async fn test_not_found() {
}
};
let addrs = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();
tokio::time::timeout(Duration::from_secs(10), async move {
let (secret_key, peer) = get_options(peer_id, addrs);
let request = GetRequest::single(hash);
Expand Down Expand Up @@ -464,7 +464,7 @@ async fn test_chunk_not_found_1() {
}
};
let addrs = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();
tokio::time::timeout(Duration::from_secs(10), async move {
let (secret_key, peer) = get_options(peer_id, addrs);
let request = GetRequest::single(hash);
Expand Down Expand Up @@ -544,7 +544,7 @@ async fn test_run_fsm() {
let (db, hash) = create_test_db([("a", b"hello"), ("b", b"world")]);
let node = test_node(db).spawn().await.unwrap();
let addrs = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();
tokio::time::timeout(Duration::from_secs(10), async move {
let (secret_key, peer) = get_options(peer_id, addrs);
let request = GetRequest::all(hash);
Expand Down Expand Up @@ -593,7 +593,7 @@ async fn test_size_request_blob() {
let hash = Hash::from(*hashes.values().next().unwrap());
let node = test_node(db).spawn().await.unwrap();
let addrs = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();
tokio::time::timeout(Duration::from_secs(10), async move {
let request = GetRequest::last_chunk(hash);
let (secret_key, peer) = get_options(peer_id, addrs);
Expand Down Expand Up @@ -621,7 +621,7 @@ async fn test_collection_stat() {
let (db, hash) = create_test_db([("a", &child1), ("b", &child2)]);
let node = test_node(db.clone()).spawn().await.unwrap();
let addrs = node.local_endpoint_addresses().await.unwrap();
let peer_id = node.node_id().await.unwrap();
let peer_id = node.node_id();
tokio::time::timeout(Duration::from_secs(10), async move {
// first 1024 bytes
let header = ChunkRanges::from(..ChunkNum(1));
Expand Down
25 changes: 11 additions & 14 deletions iroh/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn spawn_node(
async move {
let node = test_node(secret_key);
let node = node.spawn().await?;
info!(?i, me = %node.node_id().await.unwrap().fmt_short(), "node spawned");
info!(?i, me = %node.node_id().fmt_short(), "node spawned");
Ok(node)
}
}
Expand Down Expand Up @@ -84,7 +84,7 @@ async fn sync_simple() -> Result<()> {
let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();

// create doc on node0
let peer0 = nodes[0].node_id().await?;
let peer0 = nodes[0].node_id();
let author0 = clients[0].authors.create().await?;
let doc0 = clients[0].docs.create().await?;
let hash0 = doc0
Expand All @@ -98,7 +98,7 @@ async fn sync_simple() -> Result<()> {
let mut events0 = doc0.subscribe().await?;

info!("node1: join");
let peer1 = nodes[1].node_id().await?;
let peer1 = nodes[1].node_id();
let doc1 = clients[1].docs.import(ticket.clone()).await?;
let mut events1 = doc1.subscribe().await?;
info!("node1: assert 4 events");
Expand Down Expand Up @@ -253,7 +253,7 @@ async fn sync_full_basic() -> Result<()> {
.collect::<Vec<_>>();

// peer0: create doc and ticket
let peer0 = nodes[0].node_id().await?;
let peer0 = nodes[0].node_id();
let author0 = clients[0].authors.create().await?;
let doc0 = clients[0].docs.create().await?;
let mut events0 = doc0.subscribe().await?;
Expand All @@ -275,7 +275,7 @@ async fn sync_full_basic() -> Result<()> {
.await?;

info!("peer1: spawn");
let peer1 = nodes[1].node_id().await?;
let peer1 = nodes[1].node_id();
let author1 = clients[1].authors.create().await?;
info!("peer1: join doc");
let doc1 = clients[1].docs.import(ticket.clone()).await?;
Expand Down Expand Up @@ -342,7 +342,7 @@ async fn sync_full_basic() -> Result<()> {
nodes.push(spawn_node(nodes.len(), &mut rng).await?);
clients.push(nodes.last().unwrap().client().clone());
let doc2 = clients[2].docs.import(ticket).await?;
let peer2 = nodes[2].node_id().await?;
let peer2 = nodes[2].node_id();
let mut events2 = doc2.subscribe().await?;

info!("peer2: wait for 8 events (from sync with peers)");
Expand Down Expand Up @@ -486,7 +486,7 @@ async fn test_sync_via_relay() -> Result<()> {
.insecure_skip_relay_cert_verify(true)
.spawn()
.await?;
let node1_id = node1.node_id().await?;
let node1_id = node1.node_id();
let node2 = Node::memory()
.bind_port(0)
.relay_mode(RelayMode::Custom(relay_map.clone()))
Expand Down Expand Up @@ -581,7 +581,7 @@ async fn sync_restart_node() -> Result<()> {
.node_discovery(discovery_server.discovery(secret_key_1.clone()).into())
.spawn()
.await?;
let id1 = node1.node_id().await?;
let id1 = node1.node_id();

// create doc & ticket on node1
let doc1 = node1.docs.create().await?;
Expand All @@ -600,7 +600,7 @@ async fn sync_restart_node() -> Result<()> {
.node_discovery(discovery_server.discovery(secret_key_2.clone()).into())
.spawn()
.await?;
let id2 = node2.node_id().await?;
let id2 = node2.node_id();
let author2 = node2.authors.create().await?;
let doc2 = node2.docs.import(ticket.clone()).await?;

Expand Down Expand Up @@ -644,7 +644,7 @@ async fn sync_restart_node() -> Result<()> {
.node_discovery(discovery_server.discovery(secret_key_1.clone()).into())
.spawn()
.await?;
assert_eq!(id1, node1.node_id().await?);
assert_eq!(id1, node1.node_id());

let doc1 = node1.docs.open(doc1.id()).await?.expect("doc to exist");
let mut events1 = doc1.subscribe().await?;
Expand Down Expand Up @@ -849,10 +849,7 @@ async fn sync_big() -> Result<()> {
});

let nodes = spawn_nodes(n_nodes, &mut rng).await?;
let mut node_ids = Vec::new();
for node in &nodes {
node_ids.push(node.node_id().await?);
}
let node_ids = nodes.iter().map(|node| node.node_id()).collect::<Vec<_>>();
let clients = nodes.iter().map(|node| node.client()).collect::<Vec<_>>();
let authors = collect_futures(clients.iter().map(|c| c.authors.create())).await?;

Expand Down
Loading