From 08671bb4dadc8ac4d0996a7ca2393b5efb7dc7bb Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Thu, 12 Dec 2024 12:58:12 +0100 Subject: [PATCH] refactor: remove parking-lot dependency (#3034) ## Description Currently there was an inconsistent usage of `std::sync::Mutex` and `parking_lot::Mutex`. This normalizes the usage to always use `std::sync::Mutex` and remove the external dependency. ## Breaking Changes ## Notes & open questions ## Change checklist - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. --- Cargo.lock | 3 - iroh-dns-server/Cargo.toml | 1 - iroh-net-report/src/lib.rs | 4 ++ iroh-net-report/src/ping.rs | 4 +- iroh-relay/Cargo.toml | 1 - iroh-relay/src/lib.rs | 1 + iroh-relay/src/quic.rs | 3 +- iroh/Cargo.toml | 1 - iroh/src/discovery.rs | 6 +- iroh/src/discovery/pkarr/dht.rs | 22 ++++--- iroh/src/discovery/static_provider.rs | 10 +-- iroh/src/endpoint.rs | 18 +++--- iroh/src/lib.rs | 1 + iroh/src/magicsock.rs | 60 ++++++++++-------- iroh/src/magicsock/node_map.rs | 87 ++++++++++++++++++++------- iroh/src/magicsock/relay_actor.rs | 2 +- iroh/src/test_utils.rs | 51 ++++++++-------- iroh/src/tls/certificate.rs | 3 +- 18 files changed, 168 insertions(+), 110 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60dcfbd665..31e5c5b7c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2186,7 +2186,6 @@ dependencies = [ "netwatch", "num_enum", "once_cell", - "parking_lot", "parse-size", "pin-project", "pkarr", @@ -2318,7 +2317,6 @@ dependencies = [ "iroh-metrics", "iroh-test 0.29.0", "lru", - "parking_lot", "pkarr", "rcgen", "redb", @@ -2476,7 +2474,6 @@ dependencies = [ "libc", "num_enum", "once_cell", - "parking_lot", "pin-project", "postcard", "proptest", diff --git a/iroh-dns-server/Cargo.toml b/iroh-dns-server/Cargo.toml index 78c194ebd7..b0801dcf97 100644 --- a/iroh-dns-server/Cargo.toml +++ b/iroh-dns-server/Cargo.toml @@ -31,7 +31,6 @@ http = "1.0.0" humantime-serde = "1.1.1" iroh-metrics = { version = "0.29.0" } lru = "0.12.3" -parking_lot = "0.12.1" pkarr = { version = "2.2.0", features = [ "async", "relay", "dht"], default-features = false } rcgen = "0.13" redb = "2.0.0" diff --git a/iroh-net-report/src/lib.rs b/iroh-net-report/src/lib.rs index d20e09dfc1..4379590ac5 100644 --- a/iroh-net-report/src/lib.rs +++ b/iroh-net-report/src/lib.rs @@ -5,6 +5,10 @@ //! etc and reachability to the configured relays. // Based on +#![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))] +#![deny(missing_docs, rustdoc::broken_intra_doc_links)] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] + use std::{ collections::{BTreeMap, HashMap}, fmt::{self, Debug}, diff --git a/iroh-net-report/src/ping.rs b/iroh-net-report/src/ping.rs index 159e7103a6..2dcebe403e 100644 --- a/iroh-net-report/src/ping.rs +++ b/iroh-net-report/src/ping.rs @@ -54,7 +54,7 @@ impl Pinger { fn get_client(&self, kind: ICMP) -> Result { let client = match kind { ICMP::V4 => { - let mut opt_client = self.0.client_v4.lock().unwrap(); + let mut opt_client = self.0.client_v4.lock().expect("poisoned"); match *opt_client { Some(ref client) => client.clone(), None => { @@ -66,7 +66,7 @@ impl Pinger { } } ICMP::V6 => { - let mut opt_client = self.0.client_v6.lock().unwrap(); + let mut opt_client = self.0.client_v6.lock().expect("poisoned"); match *opt_client { Some(ref client) => client.clone(), None => { diff --git a/iroh-relay/Cargo.toml b/iroh-relay/Cargo.toml index 50b6cb002d..1cd06d53ef 100644 --- a/iroh-relay/Cargo.toml +++ b/iroh-relay/Cargo.toml @@ -43,7 +43,6 @@ iroh-metrics = { version = "0.29.0", default-features = false } libc = "0.2.139" num_enum = "0.7" once_cell = "1.18.0" -parking_lot = "0.12.1" pin-project = "1" postcard = { version = "1", default-features = false, features = [ "alloc", diff --git a/iroh-relay/src/lib.rs b/iroh-relay/src/lib.rs index b13d1d963b..926131b47b 100644 --- a/iroh-relay/src/lib.rs +++ b/iroh-relay/src/lib.rs @@ -28,6 +28,7 @@ #![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))] #![deny(missing_docs, rustdoc::broken_intra_doc_links)] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] pub mod client; pub mod defaults; diff --git a/iroh-relay/src/quic.rs b/iroh-relay/src/quic.rs index 924cf48703..760bbda5ca 100644 --- a/iroh-relay/src/quic.rs +++ b/iroh-relay/src/quic.rs @@ -70,7 +70,8 @@ pub(crate) mod server { vec![crate::quic::ALPN_QUIC_ADDR_DISC.to_vec()]; let server_config = QuicServerConfig::try_from(quic_config.server_config)?; let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_config)); - let transport_config = Arc::get_mut(&mut server_config.transport).unwrap(); + let transport_config = + Arc::get_mut(&mut server_config.transport).expect("not used yet"); transport_config .max_concurrent_uni_streams(0_u8.into()) .max_concurrent_bidi_streams(0_u8.into()) diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index d4bc71d5e7..c1f5c76f4d 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -51,7 +51,6 @@ netdev = "0.31.0" netwatch = { version = "0.2.0" } num_enum = "0.7" once_cell = "1.18.0" -parking_lot = "0.12.1" pin-project = "1" pkarr = { version = "2", default-features = false, features = [ "async", diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 9a87cd4578..d9c31af319 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -441,12 +441,11 @@ mod tests { use std::{ collections::{BTreeSet, HashMap}, net::SocketAddr, - sync::Arc, + sync::{Arc, Mutex}, time::SystemTime, }; use iroh_base::SecretKey; - use parking_lot::Mutex; use rand::Rng; use tokio_util::task::AbortOnDropHandle; @@ -499,6 +498,7 @@ mod tests { self.shared .nodes .lock() + .unwrap() .insert(self.node_id, (url.cloned(), addrs.clone(), now)); } @@ -508,7 +508,7 @@ mod tests { node_id: NodeId, ) -> Option>> { let addr_info = match self.resolve_wrong { - false => self.shared.nodes.lock().get(&node_id).cloned(), + false => self.shared.nodes.lock().unwrap().get(&node_id).cloned(), true => { let ts = system_time_now() - 100_000; let port: u16 = rand::thread_rng().gen_range(10_000..20_000); diff --git a/iroh/src/discovery/pkarr/dht.rs b/iroh/src/discovery/pkarr/dht.rs index 91dc42a682..63ca19a0cb 100644 --- a/iroh/src/discovery/pkarr/dht.rs +++ b/iroh/src/discovery/pkarr/dht.rs @@ -180,10 +180,11 @@ impl Builder { /// Builds the discovery mechanism. pub fn build(self) -> anyhow::Result { - let pkarr = self - .client - .unwrap_or_else(|| PkarrClient::new(Default::default()).unwrap()) - .as_async(); + let pkarr = match self.client { + Some(client) => client, + None => PkarrClient::new(Default::default())?, + }; + let pkarr = pkarr.as_async(); let ttl = self.ttl.unwrap_or(DEFAULT_PKARR_TTL); let relay_url = self.pkarr_relay; let dht = self.dht; @@ -259,8 +260,8 @@ impl DhtDiscovery { let relay_publish = async { if let Some(relay) = this.0.pkarr_relay.as_ref() { tracing::info!( - "publishing to relay: {}", - this.0.relay_url.as_ref().unwrap().to_string() + "publishing to relay: {:?}", + this.0.relay_url.as_ref().map(|r| r.to_string()) ); match relay.publish(&signed_packet).await { Ok(_) => { @@ -286,8 +287,11 @@ impl DhtDiscovery { let Some(relay) = &self.0.pkarr_relay else { return; }; - let url = self.0.relay_url.as_ref().unwrap(); - tracing::info!("resolving {} from relay {}", pkarr_public_key.to_z32(), url); + tracing::info!( + "resolving {} from relay {:?}", + pkarr_public_key.to_z32(), + self.0.relay_url + ); let response = relay.resolve(&pkarr_public_key).await; match response { Ok(Some(signed_packet)) => { @@ -382,7 +386,7 @@ impl Discovery for DhtDiscovery { }; let this = self.clone(); let curr = tokio::spawn(this.publish_loop(keypair.clone(), signed_packet)); - let mut task = self.0.task.lock().unwrap(); + let mut task = self.0.task.lock().expect("poisoned"); *task = Some(AbortOnDropHandle::new(curr)); } diff --git a/iroh/src/discovery/static_provider.rs b/iroh/src/discovery/static_provider.rs index aa7585bc6e..34cb76f480 100644 --- a/iroh/src/discovery/static_provider.rs +++ b/iroh/src/discovery/static_provider.rs @@ -74,7 +74,7 @@ impl StaticProvider { pub fn set_node_addr(&self, info: impl Into) -> Option { let last_updated = SystemTime::now(); let info: NodeAddr = info.into(); - let mut guard = self.nodes.write().unwrap(); + let mut guard = self.nodes.write().expect("poisoned"); let previous = guard.insert( info.node_id, NodeInfo { @@ -96,7 +96,7 @@ impl StaticProvider { pub fn add_node_addr(&self, info: impl Into) { let info: NodeAddr = info.into(); let last_updated = SystemTime::now(); - let mut guard = self.nodes.write().unwrap(); + let mut guard = self.nodes.write().expect("poisoned"); match guard.entry(info.node_id) { Entry::Occupied(mut entry) => { let existing = entry.get_mut(); @@ -116,7 +116,7 @@ impl StaticProvider { /// Get node info for the given node id. pub fn get_node_addr(&self, node_id: NodeId) -> Option { - let guard = self.nodes.read().unwrap(); + let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id)?; Some(NodeAddr { node_id, @@ -127,7 +127,7 @@ impl StaticProvider { /// Remove node info for the given node id. pub fn remove_node_addr(&self, node_id: NodeId) -> Option { - let mut guard = self.nodes.write().unwrap(); + let mut guard = self.nodes.write().expect("poisoned"); let info = guard.remove(&node_id)?; Some(NodeAddr { node_id, @@ -145,7 +145,7 @@ impl Discovery for StaticProvider { _endpoint: crate::Endpoint, node_id: NodeId, ) -> Option>> { - let guard = self.nodes.read().unwrap(); + let guard = self.nodes.read().expect("poisoned"); let info = guard.get(&node_id); match info { Some(addr_info) => { diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 3d795cb04f..4572bbbcf6 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -144,7 +144,7 @@ impl Builder { .collect::>(); let discovery: Option> = match discovery.len() { 0 => None, - 1 => Some(discovery.into_iter().next().unwrap()), + 1 => Some(discovery.into_iter().next().expect("checked length")), _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))), }; let msock_opts = magicsock::Options { @@ -313,12 +313,16 @@ impl Builder { pub fn discovery_dht(mut self) -> Self { use crate::discovery::pkarr::dht::DhtDiscovery; self.discovery.push(Box::new(|secret_key| { - Some(Box::new( - DhtDiscovery::builder() - .secret_key(secret_key.clone()) - .build() - .unwrap(), - )) + match DhtDiscovery::builder() + .secret_key(secret_key.clone()) + .build() + { + Ok(discovery) => Some(Box::new(discovery)), + Err(err) => { + tracing::error!("failed to build discovery: {:?}", err); + None + } + } })); self } diff --git a/iroh/src/lib.rs b/iroh/src/lib.rs index 50106e40f7..64ea916e1c 100644 --- a/iroh/src/lib.rs +++ b/iroh/src/lib.rs @@ -230,6 +230,7 @@ #![recursion_limit = "256"] #![deny(missing_docs, rustdoc::broken_intra_doc_links)] +#![cfg_attr(not(test), deny(clippy::unwrap_used))] #![cfg_attr(iroh_docsrs, feature(doc_auto_cfg))] mod disco; diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 6e3cf6d9c2..7b33c2632f 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -187,7 +187,7 @@ pub(crate) struct MagicSock { /// /// This waker is used by [`IoPoller`] and the [`RelayActor`] to signal when more /// datagrams can be sent to the relays. - relay_send_waker: Arc>>, + relay_send_waker: Arc>>, /// Counter for ordering of [`MagicSock::poll_recv`] polling order. poll_recv_counter: AtomicUsize, @@ -236,7 +236,7 @@ pub(crate) struct MagicSock { /// List of CallMeMaybe disco messages that should be sent out after the next endpoint update /// completes - pending_call_me_maybes: parking_lot::Mutex>, + pending_call_me_maybes: std::sync::Mutex>, /// Indicates the direct addr update state. direct_addr_update_state: DirectAddrUpdateState, @@ -1342,7 +1342,12 @@ impl MagicSock { fn send_queued_call_me_maybes(&self) { let msg = self.direct_addrs.to_call_me_maybe_message(); let msg = disco::Message::CallMeMaybe(msg); - for (public_key, url) in self.pending_call_me_maybes.lock().drain() { + for (public_key, url) in self + .pending_call_me_maybes + .lock() + .expect("poisoned") + .drain() + { if !self.send_disco_message_relay(&url, public_key, msg.clone()) { warn!(node = %public_key.fmt_short(), "relay channel full, dropping call-me-maybe"); } @@ -1369,6 +1374,7 @@ impl MagicSock { Err(last_refresh_ago) => { self.pending_call_me_maybes .lock() + .expect("poisoned") .insert(dst_node, url.clone()); debug!( ?last_refresh_ago, @@ -1447,7 +1453,7 @@ struct DirectAddrUpdateState { /// If running, set to the reason for the currently the update. running: sync::watch::Sender>, /// If set, start a new update as soon as the current one is finished. - want_update: parking_lot::Mutex>, + want_update: std::sync::Mutex>, } impl DirectAddrUpdateState { @@ -1463,7 +1469,7 @@ impl DirectAddrUpdateState { /// scheduling it for later. fn schedule_run(&self, why: &'static str) { if self.is_running() { - let _ = self.want_update.lock().insert(why); + let _ = self.want_update.lock().expect("poisoned").insert(why); } else { self.run(why); } @@ -1486,7 +1492,7 @@ impl DirectAddrUpdateState { /// Returns the next update, if one is set. fn next_update(&self) -> Option<&'static str> { - self.want_update.lock().take() + self.want_update.lock().expect("poisoned").take() } } @@ -1559,7 +1565,7 @@ impl Handle { closing: AtomicBool::new(false), closed: AtomicBool::new(false), relay_datagrams_queue: relay_datagrams_queue.clone(), - relay_send_waker: Arc::new(parking_lot::Mutex::new(None)), + relay_send_waker: Arc::new(std::sync::Mutex::new(None)), poll_recv_counter: AtomicUsize::new(0), actor_sender: actor_sender.clone(), ipv6_reported: Arc::new(AtomicBool::new(false)), @@ -1676,19 +1682,18 @@ impl Handle { } #[derive(Debug, Default)] -struct DiscoSecrets(parking_lot::Mutex>); +struct DiscoSecrets(std::sync::Mutex>); impl DiscoSecrets { - fn get( - &self, - secret: &SecretKey, - node_id: PublicKey, - ) -> parking_lot::MappedMutexGuard { - parking_lot::MutexGuard::map(self.0.lock(), |inner| { - inner - .entry(node_id) - .or_insert_with(|| secret.shared(&node_id)) - }) + fn get(&self, secret: &SecretKey, node_id: PublicKey, cb: F) -> T + where + F: FnOnce(&mut SharedSecret) -> T, + { + let mut inner = self.0.lock().expect("poisoned"); + let x = inner + .entry(node_id) + .or_insert_with(|| secret.shared(&node_id)); + cb(x) } pub fn encode_and_seal( @@ -1698,7 +1703,7 @@ impl DiscoSecrets { msg: &disco::Message, ) -> Bytes { let mut seal = msg.as_bytes(); - self.get(secret_key, node_id).seal(&mut seal); + self.get(secret_key, node_id, |secret| secret.seal(&mut seal)); disco::encode_message(&secret_key.public(), seal).into() } @@ -1708,9 +1713,9 @@ impl DiscoSecrets { node_id: PublicKey, mut sealed_box: Vec, ) -> Result { - self.get(secret, node_id) - .open(&mut sealed_box) - .map_err(DiscoBoxError::Open)?; + self.get(secret, node_id, |secret| { + secret.open(&mut sealed_box).map_err(DiscoBoxError::Open) + })?; disco::Message::from_bytes(&sealed_box).map_err(DiscoBoxError::Parse) } } @@ -1871,7 +1876,7 @@ struct IoPoller { ipv4_poller: Pin>, ipv6_poller: Option>>, relay_sender: mpsc::Sender, - relay_send_waker: Arc>>, + relay_send_waker: Arc>>, } impl quinn::UdpPoller for IoPoller { @@ -1890,7 +1895,10 @@ impl quinn::UdpPoller for IoPoller { } match this.relay_sender.capacity() { 0 => { - self.relay_send_waker.lock().replace(cx.waker().clone()); + self.relay_send_waker + .lock() + .expect("poisoned") + .replace(cx.waker().clone()); Poll::Pending } _ => Poll::Ready(Ok(())), @@ -2235,7 +2243,7 @@ impl Actor { loopback, } = tokio::task::spawn_blocking(LocalAddresses::new) .await - .unwrap(); + .expect("spawn panicked"); if ips.is_empty() && addrs.is_empty() { // Include loopback addresses only if there are no other interfaces // or public addresses, this allows testing offline. @@ -2575,7 +2583,7 @@ struct DiscoveredDirectAddrs { impl DiscoveredDirectAddrs { /// Updates the direct addresses, returns `true` if they changed, `false` if not. fn update(&self, addrs: BTreeSet) -> bool { - *self.updated_at.write().unwrap() = Some(Instant::now()); + *self.updated_at.write().expect("poisoned") = Some(Instant::now()); let updated = self.addrs.update(addrs).is_ok(); if updated { event!( diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 4cc835203c..aecf0923b8 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -3,6 +3,7 @@ use std::{ hash::Hash, net::{IpAddr, SocketAddr}, pin::Pin, + sync::Mutex, task::{Context, Poll}, time::Instant, }; @@ -10,7 +11,6 @@ use std::{ use futures_lite::stream::Stream; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; use iroh_metrics::inc; -use parking_lot::Mutex; use serde::{Deserialize, Serialize}; use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; @@ -136,20 +136,26 @@ impl NodeMap { /// Add the contact information for a node. pub(super) fn add_node_addr(&self, node_addr: NodeAddr, source: Source) { - self.inner.lock().add_node_addr(node_addr, source) + self.inner + .lock() + .expect("poisoned") + .add_node_addr(node_addr, source) } /// Number of nodes currently listed. pub(super) fn node_count(&self) -> usize { - self.inner.lock().node_count() + self.inner.lock().expect("poisoned").node_count() } pub(super) fn receive_udp(&self, udp_addr: SocketAddr) -> Option<(PublicKey, QuicMappedAddr)> { - self.inner.lock().receive_udp(udp_addr) + self.inner.lock().expect("poisoned").receive_udp(udp_addr) } pub(super) fn receive_relay(&self, relay_url: &RelayUrl, src: NodeId) -> QuicMappedAddr { - self.inner.lock().receive_relay(relay_url, src) + self.inner + .lock() + .expect("poisoned") + .receive_relay(relay_url, src) } pub(super) fn notify_ping_sent( @@ -160,13 +166,23 @@ impl NodeMap { purpose: DiscoPingPurpose, msg_sender: tokio::sync::mpsc::Sender, ) { - if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) { + if let Some(ep) = self + .inner + .lock() + .expect("poisoned") + .get_mut(NodeStateKey::Idx(id)) + { ep.ping_sent(dst, tx_id, purpose, msg_sender); } } pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun_rs::TransactionId) { - if let Some(ep) = self.inner.lock().get_mut(NodeStateKey::Idx(id)) { + if let Some(ep) = self + .inner + .lock() + .expect("poisoned") + .get_mut(NodeStateKey::Idx(id)) + { ep.ping_timeout(tx_id); } } @@ -177,6 +193,7 @@ impl NodeMap { ) -> Option { self.inner .lock() + .expect("poisoned") .get(NodeStateKey::NodeId(node_key)) .map(|ep| *ep.quic_mapped_addr()) } @@ -189,11 +206,17 @@ impl NodeMap { src: SendAddr, tx_id: TransactionId, ) -> PingHandled { - self.inner.lock().handle_ping(sender, src, tx_id) + self.inner + .lock() + .expect("poisoned") + .handle_ping(sender, src, tx_id) } pub(super) fn handle_pong(&self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) { - self.inner.lock().handle_pong(sender, src, pong) + self.inner + .lock() + .expect("poisoned") + .handle_pong(sender, src, pong) } #[must_use = "actions must be handled"] @@ -202,7 +225,10 @@ impl NodeMap { sender: PublicKey, cm: CallMeMaybe, ) -> Vec { - self.inner.lock().handle_call_me_maybe(sender, cm) + self.inner + .lock() + .expect("poisoned") + .handle_call_me_maybe(sender, cm) } #[allow(clippy::type_complexity)] @@ -216,7 +242,7 @@ impl NodeMap { Option, Vec, )> { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().expect("poisoned"); let ep = inner.get_mut(NodeStateKey::QuicMappedAddr(addr))?; let public_key = *ep.public_key(); trace!(dest = %addr, node_id = %public_key.fmt_short(), "dst mapped to NodeId"); @@ -225,21 +251,21 @@ impl NodeMap { } pub(super) fn notify_shutdown(&self) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().expect("poisoned"); for (_, ep) in inner.node_states_mut() { ep.reset(); } } pub(super) fn reset_node_states(&self) { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().expect("poisoned"); for (_, ep) in inner.node_states_mut() { ep.note_connectivity_change(); } } pub(super) fn nodes_stayin_alive(&self) -> Vec { - let mut inner = self.inner.lock(); + let mut inner = self.inner.lock().expect("poisoned"); inner .node_states_mut() .flat_map(|(_idx, node_state)| node_state.stayin_alive()) @@ -252,7 +278,11 @@ impl NodeMap { // we can't avoid `collect` here since it would hold a lock for an indefinite time. Even if // we were to find this acceptable, dealing with the lifetimes of the mutex's guard and the // internal iterator will be a hassle, if possible at all. - self.inner.lock().remote_infos_iter(now).collect() + self.inner + .lock() + .expect("poisoned") + .remote_infos_iter(now) + .collect() } /// Returns a stream of [`ConnectionType`]. @@ -265,21 +295,27 @@ impl NodeMap { /// Will return an error if there is not an entry in the [`NodeMap`] for /// the `public_key` pub(super) fn conn_type_stream(&self, node_id: NodeId) -> anyhow::Result { - self.inner.lock().conn_type_stream(node_id) + self.inner + .lock() + .expect("poisoned") + .conn_type_stream(node_id) } /// Get the [`RemoteInfo`]s for the node identified by [`NodeId`]. pub(super) fn remote_info(&self, node_id: NodeId) -> Option { - self.inner.lock().remote_info(node_id) + self.inner.lock().expect("poisoned").remote_info(node_id) } /// Prunes nodes without recent activity so that at most [`MAX_INACTIVE_NODES`] are kept. pub(super) fn prune_inactive(&self) { - self.inner.lock().prune_inactive(); + self.inner.lock().expect("poisoned").prune_inactive(); } pub(crate) fn on_direct_addr_discovered(&self, discovered: BTreeSet) { - self.inner.lock().on_direct_addr_discovered(discovered); + self.inner + .lock() + .expect("poisoned") + .on_direct_addr_discovered(discovered); } } @@ -740,6 +776,7 @@ mod tests { let id = node_map .inner .lock() + .unwrap() .insert_node(Options { node_id: public_key, relay_url: None, @@ -762,7 +799,7 @@ mod tests { // add address node_map.add_test_addr(node_addr); // make it active - node_map.inner.lock().receive_udp(addr); + node_map.inner.lock().unwrap().receive_udp(addr); } info!("Adding offline/inactive addresses"); @@ -772,7 +809,7 @@ mod tests { node_map.add_test_addr(node_addr); } - let mut node_map_inner = node_map.inner.lock(); + let mut node_map_inner = node_map.inner.lock().unwrap(); let endpoint = node_map_inner.by_id.get_mut(&id).unwrap(); info!("Adding alive addresses"); @@ -811,7 +848,12 @@ mod tests { let active_node = SecretKey::generate().public(); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 167); node_map.add_test_addr(NodeAddr::new(active_node).with_direct_addresses([addr])); - node_map.inner.lock().receive_udp(addr).expect("registered"); + node_map + .inner + .lock() + .unwrap() + .receive_udp(addr) + .expect("registered"); for _ in 0..MAX_INACTIVE_NODES + 1 { let node = SecretKey::generate().public(); @@ -824,6 +866,7 @@ mod tests { node_map .inner .lock() + .unwrap() .get(NodeStateKey::NodeId(active_node)) .expect("should not be pruned"); } diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index a239f8296e..8011d909ef 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -427,7 +427,7 @@ impl RelayActor { } // Wake up the send waker if one is waiting for space in the channel - let mut wakers = self.msock.relay_send_waker.lock(); + let mut wakers = self.msock.relay_send_waker.lock().expect("poisoned"); if let Some(waker) = wakers.take() { waker.wake(); } diff --git a/iroh/src/test_utils.rs b/iroh/src/test_utils.rs index ce31859a90..17e44236a1 100644 --- a/iroh/src/test_utils.rs +++ b/iroh/src/test_utils.rs @@ -90,10 +90,9 @@ pub async fn run_relay_server_with( #[cfg(feature = "metrics")] metrics_addr: None, }; - let server = Server::spawn(config).await.unwrap(); - let url: RelayUrl = format!("https://{}", server.https_addr().expect("configured")) - .parse() - .unwrap(); + let server = Server::spawn(config).await?; + let url: RelayUrl = format!("https://{}", server.https_addr().expect("configured")).parse()?; + let quic = server .quic_addr() .map(|addr| RelayQuicConfig { port: addr.port() }); @@ -102,8 +101,7 @@ pub async fn run_relay_server_with( stun_only: false, stun_port: server.stun_addr().map_or(DEFAULT_STUN_PORT, |s| s.port()), quic, - }]) - .unwrap(); + }])?; Ok((m, url, server)) } @@ -376,14 +374,12 @@ pub(crate) mod pkarr_dns_state { use std::{ collections::{hash_map, HashMap}, future::Future, - ops::Deref, - sync::Arc, + sync::{Arc, Mutex}, time::Duration, }; use anyhow::{bail, Result}; use iroh_base::NodeId; - use parking_lot::{Mutex, MutexGuard}; use pkarr::SignedPacket; use crate::{ @@ -414,7 +410,7 @@ pub(crate) mod pkarr_dns_state { pub async fn on_node(&self, node: &NodeId, timeout: Duration) -> Result<()> { let timeout = tokio::time::sleep(timeout); tokio::pin!(timeout); - while self.get(node).is_none() { + while self.get(node, |p| p.is_none()) { tokio::select! { _ = &mut timeout => bail!("timeout"), _ = self.on_update() => {} @@ -425,7 +421,7 @@ pub(crate) mod pkarr_dns_state { pub fn upsert(&self, signed_packet: SignedPacket) -> anyhow::Result { let node_id = NodeId::from_bytes(&signed_packet.public_key().to_bytes())?; - let mut map = self.packets.lock(); + let mut map = self.packets.lock().expect("poisoned"); let updated = match map.entry(node_id) { hash_map::Entry::Vacant(e) => { e.insert(signed_packet); @@ -447,14 +443,13 @@ pub(crate) mod pkarr_dns_state { } /// Returns a mutex guard, do not hold over await points - pub fn get(&self, node_id: &NodeId) -> Option + '_> { - let map = self.packets.lock(); - if map.contains_key(node_id) { - let guard = MutexGuard::map(map, |state| state.get_mut(node_id).unwrap()); - Some(guard) - } else { - None - } + pub fn get(&self, node_id: &NodeId, cb: F) -> T + where + F: FnOnce(Option<&mut SignedPacket>) -> T, + { + let mut map = self.packets.lock().expect("poisoned"); + let packet = map.get_mut(node_id); + cb(packet) } pub fn resolve_dns( @@ -467,14 +462,16 @@ pub(crate) mod pkarr_dns_state { let Some(node_id) = node_id_from_hickory_name(query.name()) else { continue; }; - let packet = self.get(&node_id); - let Some(packet) = packet.as_ref() else { - continue; - }; - let node_info = NodeInfo::from_pkarr_signed_packet(packet)?; - for record in node_info.to_hickory_records(&self.origin, ttl)? { - reply.add_answer(record); - } + + self.get(&node_id, |packet| { + if let Some(packet) = packet { + let node_info = NodeInfo::from_pkarr_signed_packet(packet)?; + for record in node_info.to_hickory_records(&self.origin, ttl)? { + reply.add_answer(record); + } + } + anyhow::Ok(()) + })?; } Ok(()) } diff --git a/iroh/src/tls/certificate.rs b/iroh/src/tls/certificate.rs index 9903cd5253..c839aada28 100644 --- a/iroh/src/tls/certificate.rs +++ b/iroh/src/tls/certificate.rs @@ -92,7 +92,8 @@ pub fn generate( // and certificate for multiple connections. let certificate_keypair = rcgen::KeyPair::generate_for(P2P_SIGNATURE_ALGORITHM)?; let rustls_key = - rustls::pki_types::PrivateKeyDer::try_from(certificate_keypair.serialize_der()).unwrap(); + rustls::pki_types::PrivateKeyDer::try_from(certificate_keypair.serialize_der()) + .expect("checked"); let certificate = { let mut params = rcgen::CertificateParams::default(); params.distinguished_name = rcgen::DistinguishedName::new();