From 767e5a17320edb2ac5cd81e65b0d09106011f6ee Mon Sep 17 00:00:00 2001 From: greg Date: Tue, 19 Nov 2024 17:16:14 +0000 Subject: [PATCH] use net utils for binding sockets --- Cargo.lock | 6 +++ bench-streamer/src/main.rs | 5 +- client/Cargo.toml | 1 + client/src/connection_cache.rs | 3 +- core/src/banking_simulation.rs | 5 +- core/src/banking_stage/forwarder.rs | 5 +- core/src/repair/ancestor_hashes_service.rs | 9 +++- core/src/repair/quic_endpoint.rs | 16 ++++-- core/src/repair/repair_service.rs | 23 ++++++--- dos/src/main.rs | 5 +- gossip/src/cluster_info.rs | 35 ++++++------- local-cluster/Cargo.toml | 1 + local-cluster/src/local_cluster.rs | 5 +- net-utils/src/ip_echo_server.rs | 6 +-- net-utils/src/lib.rs | 11 ++++ programs/sbf/Cargo.lock | 4 ++ quic-client/tests/quic_client.rs | 5 +- rpc-test/Cargo.toml | 1 + rpc-test/tests/rpc.rs | 5 +- streamer/Cargo.toml | 1 + streamer/src/nonblocking/quic.rs | 12 +++-- streamer/src/nonblocking/recvmmsg.rs | 33 +++++++++--- streamer/src/nonblocking/sendmmsg.rs | 50 ++++++++++++++----- streamer/src/nonblocking/testing_utilities.rs | 24 ++------- streamer/src/packet.rs | 11 ++-- streamer/src/quic.rs | 13 +++-- streamer/src/recvmmsg.rs | 20 +++++--- streamer/src/sendmmsg.rs | 27 +++++----- streamer/src/streamer.rs | 7 +-- streamer/tests/recvmmsg.rs | 10 ++-- svm/examples/Cargo.lock | 4 ++ tpu-client/Cargo.toml | 1 + tpu-client/src/tpu_client.rs | 7 +-- turbine/Cargo.toml | 1 + turbine/benches/cluster_info.rs | 10 +++- turbine/benches/retransmit_stage.rs | 7 +-- .../broadcast_stage/standard_broadcast_run.rs | 10 +++- turbine/src/quic_endpoint.rs | 16 ++++-- udp-client/src/nonblocking/udp_client.rs | 5 +- validator/src/admin_rpc_service.rs | 11 +++- 40 files changed, 280 insertions(+), 151 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0456dbe6ff2723..81f1df4ff4da94 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6484,6 +6484,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -7396,6 +7397,7 @@ dependencies = [ "solana-ledger", "solana-local-cluster", "solana-logger", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -8218,6 +8220,7 @@ dependencies = [ "solana-client", "solana-connection-cache", "solana-logger", + "solana-net-utils", "solana-pubsub-client", "solana-rpc", "solana-rpc-client", @@ -8763,6 +8766,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-packet", "solana-perf", "solana-pubkey", @@ -9046,6 +9050,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-rpc-client", "solana-rpc-client-api", @@ -9219,6 +9224,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-poh", "solana-quic-client", diff --git a/bench-streamer/src/main.rs b/bench-streamer/src/main.rs index 2d6998f298f3b8..ebfd4cbb559815 100644 --- a/bench-streamer/src/main.rs +++ b/bench-streamer/src/main.rs @@ -3,13 +3,14 @@ use { clap::{crate_description, crate_name, Arg, Command}, crossbeam_channel::unbounded, + solana_net_utils::bind_to, solana_streamer::{ packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE}, streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats}, }, std::{ cmp::max, - net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, @@ -20,7 +21,7 @@ use { }; fn producer(addr: &SocketAddr, exit: Arc) -> JoinHandle<()> { - let send = UdpSocket::bind("0.0.0.0:0").unwrap(); + let send = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let batch_size = 10; let mut packet_batch = PacketBatch::with_capacity(batch_size); packet_batch.resize(batch_size, Packet::default()); diff --git a/client/Cargo.toml b/client/Cargo.toml index f138b2e7a10cf2..ff7d381c32b1bc 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -22,6 +22,7 @@ quinn = { workspace = true } rayon = { workspace = true } solana-connection-cache = { workspace = true } solana-measure = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true, features = ["default"] } diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs index b295299eb3898a..b01f6958b64949 100644 --- a/client/src/connection_cache.rs +++ b/client/src/connection_cache.rs @@ -201,6 +201,7 @@ mod tests { super::*, crate::connection_cache::ConnectionCache, crossbeam_channel::unbounded, + solana_net_utils::bind_to, solana_sdk::signature::Keypair, solana_streamer::{ quic::{QuicServerParams, SpawnServerResult}, @@ -217,7 +218,7 @@ mod tests { fn server_args() -> (UdpSocket, Arc, Keypair) { ( - UdpSocket::bind("127.0.0.1:0").unwrap(), + bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), Arc::new(AtomicBool::new(false)), Keypair::new(), ) diff --git a/core/src/banking_simulation.rs b/core/src/banking_simulation.rs index a8a67b3e5653b2..6f336021bcffc6 100644 --- a/core/src/banking_simulation.rs +++ b/core/src/banking_simulation.rs @@ -21,6 +21,7 @@ use { blockstore::{Blockstore, PurgeType}, leader_schedule_cache::LeaderScheduleCache, }, + solana_net_utils::bind_to, solana_poh::{ poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS}, poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE}, @@ -46,7 +47,7 @@ use { fmt::Display, fs::File, io::{self, BufRead, BufReader}, - net::{Ipv4Addr, UdpSocket}, + net::{IpAddr, Ipv4Addr}, path::PathBuf, sync::{ atomic::{AtomicBool, Ordering}, @@ -783,7 +784,7 @@ impl BankingSimulator { // Broadcast stage is needed to save the simulated blocks for post-run analysis by // inserting produced shreds into the blockstore. let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage( - vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()], + vec![bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap()], cluster_info.clone(), entry_receiver, retransmit_slots_receiver, diff --git a/core/src/banking_stage/forwarder.rs b/core/src/banking_stage/forwarder.rs index 0f54f6eb5e62b2..8a5a2d5f81e037 100644 --- a/core/src/banking_stage/forwarder.rs +++ b/core/src/banking_stage/forwarder.rs @@ -16,6 +16,7 @@ use { solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_feature_set::FeatureSet, solana_measure::measure_us, + solana_net_utils::bind_to, solana_perf::{data_budget::DataBudget, packet::Packet}, solana_poh::poh_recorder::PohRecorder, solana_runtime::bank_forks::BankForks, @@ -24,7 +25,7 @@ use { solana_streamer::sendmmsg::batch_send, std::{ iter::repeat, - net::{SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::Ordering, Arc, RwLock}, }, }; @@ -50,7 +51,7 @@ impl Forwarder { Self { poh_recorder, bank_forks, - socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + socket: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), cluster_info, connection_cache, data_budget, diff --git a/core/src/repair/ancestor_hashes_service.rs b/core/src/repair/ancestor_hashes_service.rs index b36878976ef960..d467e262a8227e 100644 --- a/core/src/repair/ancestor_hashes_service.rs +++ b/core/src/repair/ancestor_hashes_service.rs @@ -917,13 +917,17 @@ mod test { blockstore::make_many_slot_entries, get_tmp_ledger_path, get_tmp_ledger_path_auto_delete, shred::Nonce, }, + solana_net_utils::bind_to, solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks}, solana_sdk::{ hash::Hash, signature::{Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - std::collections::HashMap, + std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + }, trees::tr, }; @@ -1345,7 +1349,8 @@ mod test { impl ManageAncestorHashesState { fn new(bank_forks: Arc>) -> Self { let ancestor_hashes_request_statuses = Arc::new(DashMap::new()); - let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap()); + let ancestor_hashes_request_socket = + Arc::new(bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap()); let epoch_schedule = bank_forks .read() .unwrap() diff --git a/core/src/repair/quic_endpoint.rs b/core/src/repair/quic_endpoint.rs index 87183dd84ae628..4b6fc4ca5dab98 100644 --- a/core/src/repair/quic_endpoint.rs +++ b/core/src/repair/quic_endpoint.rs @@ -1021,9 +1021,14 @@ mod tests { super::*, itertools::{izip, multiunzip}, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_net_utils::bind_to, solana_runtime::bank::Bank, solana_sdk::signature::Signer, - std::{iter::repeat_with, net::Ipv4Addr, time::Duration}, + std::{ + iter::repeat_with, + net::{IpAddr, Ipv4Addr}, + time::Duration, + }, }; #[test] @@ -1036,10 +1041,11 @@ mod tests { .build() .unwrap(); let keypairs: Vec = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect(); - let sockets: Vec = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) - .take(NUM_ENDPOINTS) - .collect::>() - .unwrap(); + let sockets: Vec = + repeat_with(|| bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false)) + .take(NUM_ENDPOINTS) + .collect::>() + .unwrap(); let addresses: Vec = sockets .iter() .map(UdpSocket::local_addr) diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index 24173b204a4797..03209a2ee3a261 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -1075,13 +1075,17 @@ mod test { get_tmp_ledger_path_auto_delete, shred::max_ticks_per_n_shreds, }, + solana_net_utils::bind_to, solana_runtime::bank::Bank, solana_sdk::{ signature::{Keypair, Signer}, timing::timestamp, }, solana_streamer::socket::SocketAddrSpace, - std::collections::HashSet, + std::{ + collections::HashSet, + net::{IpAddr, Ipv4Addr}, + }, }; fn new_test_cluster_info() -> ClusterInfo { @@ -1097,9 +1101,9 @@ mod test { let pubkey = cluster_info.id(); let slot = 100; let shred_index = 50; - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let address = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default())); // Send a repair request @@ -1452,7 +1456,7 @@ mod test { ); let mut duplicate_slot_repair_statuses = HashMap::new(); let dead_slot = 9; - let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap(); + let receive_socket = &bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let duplicate_status = DuplicateSlotRepairStatus { correct_ancestor_to_repair: (dead_slot, Hash::default()), start_ts: u64::MAX, @@ -1481,7 +1485,7 @@ mod test { &blockstore, &serve_repair, &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), + &bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), &None, &RwLock::new(OutstandingRequests::default()), &identity_keypair, @@ -1507,7 +1511,7 @@ mod test { &blockstore, &serve_repair, &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), + &bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), &None, &RwLock::new(OutstandingRequests::default()), &identity_keypair, @@ -1526,7 +1530,7 @@ mod test { &blockstore, &serve_repair, &mut RepairStats::default(), - &UdpSocket::bind("0.0.0.0:0").unwrap(), + &bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), &None, &RwLock::new(OutstandingRequests::default()), &identity_keypair, @@ -1541,7 +1545,10 @@ mod test { let bank_forks = BankForks::new_rw_arc(bank); let dummy_addr = Some(( Pubkey::default(), - UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(), + bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false) + .unwrap() + .local_addr() + .unwrap(), )); let cluster_info = Arc::new(new_test_cluster_info()); let serve_repair = ServeRepair::new( diff --git a/dos/src/main.rs b/dos/src/main.rs index 4480726febdc41..78e206065e49e1 100644 --- a/dos/src/main.rs +++ b/dos/src/main.rs @@ -56,6 +56,7 @@ use { gossip_service::{discover, get_client}, }, solana_measure::measure::Measure, + solana_net_utils::bind_to, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ hash::Hash, @@ -73,7 +74,7 @@ use { solana_tps_client::TpsClient, solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE, std::{ - net::{SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr}, process::exit, sync::Arc, thread, @@ -725,7 +726,7 @@ fn run_dos( _ => panic!("Unsupported data_type detected"), }; - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let mut last_log = Instant::now(); let mut total_count: usize = 0; let mut count: usize = 0; diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b6293da0487729..9419e9d4604ebb 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -56,7 +56,7 @@ use { solana_measure::measure::Measure, solana_net_utils::{ bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config, - bind_more_with_config, bind_two_in_range_with_offset_and_config, + bind_more_with_config, bind_to, bind_two_in_range_with_offset_and_config, find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig, VALIDATOR_PORT_RANGE, }, @@ -224,7 +224,7 @@ impl ClusterInfo { GOSSIP_PING_CACHE_CAPACITY, )), stats: GossipStats::default(), - socket: UdpSocket::bind("0.0.0.0:0").unwrap(), + socket: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), local_message_pending_push_queue: Mutex::default(), contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS, instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())), @@ -2609,8 +2609,6 @@ impl Node { num_quic_endpoints: usize, ) -> Self { let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST); - let localhost_bind_addr = format!("{localhost_ip_addr:?}:0"); - let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED)); let port_range = (1024, 65535); let udp_config = SocketConfig { reuseport: false }; @@ -2629,8 +2627,8 @@ impl Node { let (gossip_port, (gossip, ip_echo)) = bind_common_in_range(localhost_ip_addr, port_range).unwrap(); let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port); - let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let tvu = bind_to(localhost_ip_addr, 0, false).unwrap(); + let tvu_quic = bind_to(localhost_ip_addr, 0, false).unwrap(); let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) = bind_two_in_range_with_offset_and_config( localhost_ip_addr, @@ -2643,24 +2641,23 @@ impl Node { let tpu_forwards_quic = bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone()) .unwrap(); - let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let tpu_vote_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); - + let tpu_vote = bind_to(localhost_ip_addr, 0, false).unwrap(); + let tpu_vote_quic = bind_to(localhost_ip_addr, 0, false).unwrap(); let tpu_vote_quic = bind_more_with_config(tpu_vote_quic, num_quic_endpoints, quic_config).unwrap(); - let repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); + let repair = bind_to(localhost_ip_addr, 0, false).unwrap(); + let repair_quic = bind_to(localhost_ip_addr, 0, false).unwrap(); let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); let rpc_addr = SocketAddr::new(localhost_ip_addr, rpc_port); let rpc_pubsub_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap(); let rpc_pubsub_addr = SocketAddr::new(localhost_ip_addr, rpc_pubsub_port); - let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()]; - let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap(); - let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap(); - let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap(); - let ancestor_hashes_requests_quic = UdpSocket::bind(&unspecified_bind_addr).unwrap(); + let broadcast = vec![bind_to(localhost_ip_addr, 0, false).unwrap()]; + let retransmit_socket = bind_to(localhost_ip_addr, 0, false).unwrap(); + let serve_repair = bind_to(localhost_ip_addr, 0, false).unwrap(); + let serve_repair_quic = bind_to(localhost_ip_addr, 0, false).unwrap(); + let ancestor_hashes_requests = bind_to(localhost_ip_addr, 0, false).unwrap(); + let ancestor_hashes_requests_quic = bind_to(localhost_ip_addr, 0, false).unwrap(); let mut info = ContactInfo::new( *pubkey, @@ -3002,7 +2999,7 @@ pub fn push_messages_to_peer( "push_messages_to_peer", &reqs, ); - let sock = UdpSocket::bind("0.0.0.0:0").unwrap(); + let sock = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); packet::send_to(&packet_batch, &sock, socket_addr_space)?; Ok(()) } @@ -4378,7 +4375,7 @@ mod tests { let cluster_info44 = Arc::new({ let mut node = Node::new_localhost_with_pubkey(&keypair44.pubkey()); - node.sockets.gossip = UdpSocket::bind("127.0.0.1:65534").unwrap(); + node.sockets.gossip = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 65534, false).unwrap(); info!("{:?}", node); ClusterInfo::new(node.info, keypair44.clone(), SocketAddrSpace::Unspecified) }); diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index cd8e2bf6523152..82104121a2f80a 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -23,6 +23,7 @@ solana-entry = { workspace = true } solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-quic-client = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index d47adcea941313..c0688d340b30e0 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -19,6 +19,7 @@ use { gossip_service::discover_cluster, }, solana_ledger::{create_new_tmp_ledger_with_size, shred::Shred}, + solana_net_utils::bind_to, solana_rpc_client::rpc_client::RpcClient, solana_runtime::{ genesis_utils::{ @@ -60,7 +61,7 @@ use { collections::HashMap, io::{Error, ErrorKind, Result}, iter, - net::{IpAddr, Ipv4Addr, UdpSocket}, + net::{IpAddr, Ipv4Addr}, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::Instant, @@ -1105,7 +1106,7 @@ impl Cluster for LocalCluster { } fn send_shreds_to_validator(&self, dup_shreds: Vec<&Shred>, validator_key: &Pubkey) { - let send_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let send_socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let validator_tvu = self .get_contact_info(validator_key) .unwrap() diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index 2d5782dcae1cdc..4e5258d35ee8b8 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -1,11 +1,11 @@ use { - crate::{HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH}, + crate::{bind_to, HEADER_LENGTH, IP_ECHO_SERVER_RESPONSE_LENGTH}, log::*, serde_derive::{Deserialize, Serialize}, solana_sdk::deserialize_utils::default_on_eof, std::{ io, - net::{IpAddr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, num::NonZeroUsize, time::Duration, }, @@ -111,7 +111,7 @@ async fn process_connection( trace!("request: {:?}", msg); // Fire a datagram at each non-zero UDP port - match std::net::UdpSocket::bind("0.0.0.0:0") { + match bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false) { Ok(udp_socket) => { for udp_port in &msg.udp_ports { if *udp_port != 0 { diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index cc06946649d120..f99641066d137a 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -12,6 +12,7 @@ use { sync::{Arc, RwLock}, time::{Duration, Instant}, }, + tokio::net::UdpSocket as TokioUdpSocket, url::Url, }; @@ -545,6 +546,16 @@ pub fn bind_to(ip_addr: IpAddr, port: u16, reuseport: bool) -> io::Result io::Result { + let config = SocketConfig { reuseport }; + let socket = bind_to_with_config(ip_addr, port, config)?; + TokioUdpSocket::from_std(socket) +} + pub fn bind_to_with_config( ip_addr: IpAddr, port: u16, diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index ab2cc694fae7fa..1d6210a1cfa997 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5242,6 +5242,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -7415,6 +7416,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-packet", "solana-perf", "solana-pubkey", @@ -7600,6 +7602,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-rpc-client", "solana-rpc-client-api", @@ -7713,6 +7716,7 @@ dependencies = [ "solana-ledger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-poh", "solana-quic-client", diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs index 8d794c54770076..d9099db76b1a54 100644 --- a/quic-client/tests/quic_client.rs +++ b/quic-client/tests/quic_client.rs @@ -4,6 +4,7 @@ mod tests { crossbeam_channel::{unbounded, Receiver}, log::*, solana_connection_cache::connection_cache_stats::ConnectionCacheStats, + solana_net_utils::bind_to, solana_perf::packet::PacketBatch, solana_quic_client::nonblocking::quic_client::{ QuicClientCertificate, QuicLazyInitializedEndpoint, @@ -15,7 +16,7 @@ mod tests { tls_certificates::new_dummy_x509_certificate, }, std::{ - net::{SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{ atomic::{AtomicBool, Ordering}, Arc, RwLock, @@ -52,7 +53,7 @@ mod tests { fn server_args() -> (UdpSocket, Arc, Keypair) { ( - UdpSocket::bind("127.0.0.1:0").unwrap(), + bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(), Arc::new(AtomicBool::new(false)), Keypair::new(), ) diff --git a/rpc-test/Cargo.toml b/rpc-test/Cargo.toml index 435edafdc5f780..a423d31a1e6d49 100644 --- a/rpc-test/Cargo.toml +++ b/rpc-test/Cargo.toml @@ -21,6 +21,7 @@ serde = { workspace = true } serde_json = { workspace = true } solana-account-decoder = { workspace = true } solana-client = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-rpc = { workspace = true } solana-rpc-client = { workspace = true } diff --git a/rpc-test/tests/rpc.rs b/rpc-test/tests/rpc.rs index 67d229d43a1823..f9db9358eb6e10 100644 --- a/rpc-test/tests/rpc.rs +++ b/rpc-test/tests/rpc.rs @@ -7,6 +7,7 @@ use { serde_json::{json, Value}, solana_account_decoder::UiAccount, solana_client::connection_cache::ConnectionCache, + solana_net_utils::bind_to, solana_pubsub_client::nonblocking::pubsub_client::PubsubClient, solana_rpc_client::rpc_client::RpcClient, solana_rpc_client_api::{ @@ -30,7 +31,7 @@ use { solana_transaction_status::TransactionStatus, std::{ collections::HashSet, - net::UdpSocket, + net::{IpAddr, Ipv4Addr}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -290,7 +291,7 @@ fn test_rpc_subscriptions() { let test_validator = TestValidator::with_no_fees_udp(alice.pubkey(), None, SocketAddrSpace::Unspecified); - let transactions_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let transactions_socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); transactions_socket.connect(test_validator.tpu()).unwrap(); let rpc_client = RpcClient::new(test_validator.rpc_url()); diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index 3c2e62890d9b5a..9a635be3db52ad 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -34,6 +34,7 @@ socket2 = { workspace = true } solana-keypair = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-packet = { workspace = true } solana-perf = { workspace = true } solana-pubkey = { workspace = true } diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 2de8adc1babb08..6b2dd2ef5b7838 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1536,8 +1536,12 @@ pub mod test { crossbeam_channel::{unbounded, Receiver}, quinn::{ApplicationClose, ConnectionError}, solana_keypair::Keypair, + solana_net_utils::bind_to, solana_signer::Signer, - std::collections::HashMap, + std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + }, tokio::time::sleep, }; @@ -1827,7 +1831,7 @@ pub mod test { }, ); - let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let client_socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let mut endpoint = quinn::Endpoint::new( EndpointConfig::default(), None, @@ -1990,7 +1994,7 @@ pub mod test { #[tokio::test(flavor = "multi_thread")] async fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, _) = unbounded(); let keypair = Keypair::new(); @@ -2023,7 +2027,7 @@ pub mod test { #[tokio::test(flavor = "multi_thread")] async fn test_quic_server_multiple_streams() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); diff --git a/streamer/src/nonblocking/recvmmsg.rs b/streamer/src/nonblocking/recvmmsg.rs index 90eeca0ab4a9ea..a2f0488e135923 100644 --- a/streamer/src/nonblocking/recvmmsg.rs +++ b/streamer/src/nonblocking/recvmmsg.rs @@ -57,16 +57,23 @@ pub async fn recv_mmsg_exact( mod tests { use { crate::{nonblocking::recvmmsg::*, packet::PACKET_DATA_SIZE}, - std::{net::SocketAddr, time::Instant}, + solana_net_utils::bind_to_async, + std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Instant, + }, tokio::net::UdpSocket, }; type TestConfig = (UdpSocket, SocketAddr, UdpSocket, SocketAddr); async fn test_setup_reader_sender(ip_str: &str) -> io::Result { - let reader = UdpSocket::bind(ip_str).await?; + let sock_addr: SocketAddr = ip_str + .parse() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let reader = bind_to_async(sock_addr.ip(), sock_addr.port(), false).await?; let addr = reader.local_addr()?; - let sender = UdpSocket::bind(ip_str).await?; + let sender = bind_to_async(sock_addr.ip(), sock_addr.port(), false).await?; let saddr = sender.local_addr()?; Ok((reader, addr, sender, saddr)) } @@ -138,9 +145,13 @@ mod tests { #[tokio::test] async fn test_recv_mmsg_exact_multi_iter_timeout() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let saddr = sender.local_addr().unwrap(); let sent = TEST_NUM_MSGS; for _ in 0..sent { @@ -166,14 +177,20 @@ mod tests { #[tokio::test] async fn test_recv_mmsg_multi_addrs() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr = reader.local_addr().unwrap(); - let sender1 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender1 = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let saddr1 = sender1.local_addr().unwrap(); let sent1 = TEST_NUM_MSGS - 1; - let sender2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender2 = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let saddr2 = sender2.local_addr().unwrap(); let sent2 = TEST_NUM_MSGS + 1; diff --git a/streamer/src/nonblocking/sendmmsg.rs b/streamer/src/nonblocking/sendmmsg.rs index 352651d1d61c90..493034eb687785 100644 --- a/streamer/src/nonblocking/sendmmsg.rs +++ b/streamer/src/nonblocking/sendmmsg.rs @@ -61,19 +61,23 @@ mod tests { sendmmsg::SendPktsError, }, assert_matches::assert_matches, + solana_net_utils::bind_to_async, solana_packet::PACKET_DATA_SIZE, std::{ io::ErrorKind, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, }, - tokio::net::UdpSocket, }; #[tokio::test] async fn test_send_mmsg_one_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); @@ -88,13 +92,19 @@ mod tests { #[tokio::test] async fn test_send_mmsg_multi_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader2 = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets @@ -123,19 +133,29 @@ mod tests { #[tokio::test] async fn test_multicast_msg() { - let reader = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader2 = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let reader3 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader3 = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr3 = reader3.local_addr().unwrap(); - let reader4 = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let reader4 = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let addr4 = reader4.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").await.expect("bind"); + let sender = bind_to_async(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false) + .await + .expect("bind"); let packet = Packet::default(); @@ -177,7 +197,9 @@ mod tests { ]; let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; - let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind"); + let sender = bind_to_async(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false) + .await + .expect("bind"); let res = batch_send(&sender, &packet_refs[..]).await; assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); let res = multi_target_send(&sender, &packets[0], &dest_refs).await; @@ -189,7 +211,9 @@ mod tests { let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080); let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080); - let sender = UdpSocket::bind("0.0.0.0:0").await.expect("bind"); + let sender = bind_to_async(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false) + .await + .expect("bind"); // test intermediate failures for batch_send let packet_refs: Vec<_> = vec![ diff --git a/streamer/src/nonblocking/testing_utilities.rs b/streamer/src/nonblocking/testing_utilities.rs index aa484cbc511a5d..4c415ba4307ec7 100644 --- a/streamer/src/nonblocking/testing_utilities.rs +++ b/streamer/src/nonblocking/testing_utilities.rs @@ -19,10 +19,11 @@ use { TokioRuntime, TransportConfig, }, solana_keypair::Keypair, + solana_net_utils::bind_to, solana_perf::packet::PacketBatch, solana_quic_definitions::{QUIC_KEEP_ALIVE, QUIC_MAX_TIMEOUT}, std::{ - net::{SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, sync::{atomic::AtomicBool, Arc, RwLock}, }, tokio::task::JoinHandle, @@ -141,28 +142,13 @@ pub fn setup_quic_server( let sockets = { #[cfg(not(target_os = "windows"))] { - use std::{ - os::fd::{FromRawFd, IntoRawFd}, - str::FromStr as _, - }; (0..10) - .map(|_| { - let sock = socket2::Socket::new( - socket2::Domain::IPV4, - socket2::Type::DGRAM, - Some(socket2::Protocol::UDP), - ) - .unwrap(); - sock.set_reuse_port(true).unwrap(); - sock.bind(&SocketAddr::from_str("127.0.0.1:0").unwrap().into()) - .unwrap(); - unsafe { UdpSocket::from_raw_fd(sock.into_raw_fd()) } - }) + .map(|_| bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, true).unwrap()) .collect::>() } #[cfg(target_os = "windows")] { - vec![UdpSocket::bind("127.0.0.1:0").unwrap()] + vec![bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap()] } }; setup_quic_server_with_sockets(sockets, option_staked_nodes, config) @@ -221,7 +207,7 @@ pub async fn make_client_endpoint( addr: &SocketAddr, client_keypair: Option<&Keypair>, ) -> Connection { - let client_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); + let client_socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let mut endpoint = quinn::Endpoint::new( EndpointConfig::default(), None, diff --git a/streamer/src/packet.rs b/streamer/src/packet.rs index 5b89c939eaf2fa..0cb3c01fc885a9 100644 --- a/streamer/src/packet.rs +++ b/streamer/src/packet.rs @@ -81,10 +81,11 @@ pub fn send_to( mod tests { use { super::*, + solana_net_utils::bind_to, std::{ io, io::Write, - net::{SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, SocketAddr}, }, }; @@ -101,9 +102,9 @@ mod tests { #[test] pub fn packet_send_recv() { solana_logger::setup(); - let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let recv_socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = recv_socket.local_addr().unwrap(); - let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let send_socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let saddr = send_socket.local_addr().unwrap(); let packet_batch_size = 10; @@ -159,9 +160,9 @@ mod tests { #[test] fn test_packet_resize() { solana_logger::setup(); - let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let recv_socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = recv_socket.local_addr().unwrap(); - let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let send_socket = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let mut batch = PacketBatch::with_capacity(PACKETS_PER_BATCH); batch.resize(PACKETS_PER_BATCH, Packet::default()); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 1eb94926809f4d..7a1cc445e3ff59 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -701,8 +701,11 @@ pub fn spawn_server_multi( #[cfg(test)] mod test { use { - super::*, crate::nonblocking::quic::test::*, crossbeam_channel::unbounded, - std::net::SocketAddr, + super::*, + crate::nonblocking::quic::test::*, + crossbeam_channel::unbounded, + solana_net_utils::bind_to, + std::net::{IpAddr, Ipv4Addr, SocketAddr}, }; fn setup_quic_server() -> ( @@ -711,7 +714,7 @@ mod test { crossbeam_channel::Receiver, SocketAddr, ) { - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); @@ -766,7 +769,7 @@ mod test { #[test] fn test_quic_server_multiple_streams() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, receiver) = unbounded(); let keypair = Keypair::new(); @@ -811,7 +814,7 @@ mod test { #[test] fn test_quic_server_unstaked_node_connect_failure() { solana_logger::setup(); - let s = UdpSocket::bind("127.0.0.1:0").unwrap(); + let s = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap(); let exit = Arc::new(AtomicBool::new(false)); let (sender, _) = unbounded(); let keypair = Keypair::new(); diff --git a/streamer/src/recvmmsg.rs b/streamer/src/recvmmsg.rs index 2006e3ac4bd5a4..be4f32130908d6 100644 --- a/streamer/src/recvmmsg.rs +++ b/streamer/src/recvmmsg.rs @@ -178,8 +178,9 @@ pub fn recv_mmsg(sock: &UdpSocket, packets: &mut [Packet]) -> io::Result io::Result { - let reader = UdpSocket::bind(ip_str)?; + let sock_addr: SocketAddr = ip_str + .parse() + .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?; + let reader = bind_to(sock_addr.ip(), sock_addr.port(), false)?; let addr = reader.local_addr()?; - let sender = UdpSocket::bind(ip_str)?; + let sender = bind_to(sock_addr.ip(), sock_addr.port(), false)?; let saddr = sender.local_addr()?; Ok((reader, addr, sender, saddr)) } @@ -259,11 +263,11 @@ mod tests { #[test] pub fn test_recv_mmsg_multi_iter_timeout() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = reader.local_addr().unwrap(); reader.set_read_timeout(Some(Duration::new(5, 0))).unwrap(); reader.set_nonblocking(false).unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let saddr = sender.local_addr().unwrap(); let sent = TEST_NUM_MSGS; for _ in 0..sent { @@ -290,14 +294,14 @@ mod tests { #[test] pub fn test_recv_mmsg_multi_addrs() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = reader.local_addr().unwrap(); - let sender1 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender1 = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let saddr1 = sender1.local_addr().unwrap(); let sent1 = TEST_NUM_MSGS - 1; - let sender2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender2 = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let saddr2 = sender2.local_addr().unwrap(); let sent2 = TEST_NUM_MSGS + 1; diff --git a/streamer/src/sendmmsg.rs b/streamer/src/sendmmsg.rs index b1c8e58125a2a5..b47e4361f48d0c 100644 --- a/streamer/src/sendmmsg.rs +++ b/streamer/src/sendmmsg.rs @@ -189,18 +189,19 @@ mod tests { sendmmsg::{batch_send, multi_target_send, SendPktsError}, }, assert_matches::assert_matches, + solana_net_utils::bind_to, solana_packet::PACKET_DATA_SIZE, std::{ io::ErrorKind, - net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket}, + net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, }, }; #[test] pub fn test_send_mmsg_one_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets.iter().map(|p| (&p[..], &addr)).collect(); @@ -215,13 +216,13 @@ mod tests { #[test] pub fn test_send_mmsg_multi_dest() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader2 = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let packet_refs: Vec<_> = packets @@ -250,19 +251,19 @@ mod tests { #[test] pub fn test_multicast_msg() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = reader.local_addr().unwrap(); - let reader2 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader2 = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr2 = reader2.local_addr().unwrap(); - let reader3 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader3 = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr3 = reader3.local_addr().unwrap(); - let reader4 = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader4 = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr4 = reader4.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let packet = Packet::default(); @@ -303,7 +304,7 @@ mod tests { ]; let dest_refs: Vec<_> = vec![&ip4, &ip6, &ip4]; - let sender = UdpSocket::bind("0.0.0.0:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).expect("bind"); let res = batch_send(&sender, &packet_refs[..]); assert_matches!(res, Err(SendPktsError::IoError(_, /*num_failed*/ 1))); let res = multi_target_send(&sender, &packets[0], &dest_refs); @@ -315,7 +316,7 @@ mod tests { let packets: Vec<_> = (0..5).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect(); let ipv4local = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080); let ipv4broadcast = SocketAddr::new(IpAddr::V4(Ipv4Addr::BROADCAST), 8080); - let sender = UdpSocket::bind("0.0.0.0:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).expect("bind"); // test intermediate failures for batch_send let packet_refs: Vec<_> = vec![ diff --git a/streamer/src/streamer.rs b/streamer/src/streamer.rs index b4f1e54429fc07..2b551b735fa31f 100644 --- a/streamer/src/streamer.rs +++ b/streamer/src/streamer.rs @@ -445,11 +445,12 @@ mod test { streamer::{receiver, responder}, }, crossbeam_channel::unbounded, + solana_net_utils::bind_to, solana_perf::recycler::Recycler, std::{ io, io::Write, - net::UdpSocket, + net::{IpAddr, Ipv4Addr}, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -480,11 +481,11 @@ mod test { } #[test] fn streamer_send_test() { - let read = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let read = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); read.set_read_timeout(Some(Duration::new(1, 0))).unwrap(); let addr = read.local_addr().unwrap(); - let send = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let send = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let exit = Arc::new(AtomicBool::new(false)); let (s_reader, r_reader) = unbounded(); let stats = Arc::new(StreamerReceiveStats::new("test")); diff --git a/streamer/tests/recvmmsg.rs b/streamer/tests/recvmmsg.rs index fb3df661d540e9..64d0b202b73226 100644 --- a/streamer/tests/recvmmsg.rs +++ b/streamer/tests/recvmmsg.rs @@ -1,18 +1,22 @@ #![cfg(target_os = "linux")] use { + solana_net_utils::bind_to, solana_streamer::{ packet::{Meta, Packet, PACKET_DATA_SIZE}, recvmmsg::*, }, - std::{net::UdpSocket, time::Instant}, + std::{ + net::{IpAddr, Ipv4Addr}, + time::Instant, + }, }; #[test] pub fn test_recv_mmsg_batch_size() { - let reader = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); let addr = reader.local_addr().unwrap(); - let sender = UdpSocket::bind("127.0.0.1:0").expect("bind"); + let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind"); const TEST_BATCH_SIZE: usize = 64; let sent = TEST_BATCH_SIZE; diff --git a/svm/examples/Cargo.lock b/svm/examples/Cargo.lock index e71db80edcb762..80510d3bde5719 100644 --- a/svm/examples/Cargo.lock +++ b/svm/examples/Cargo.lock @@ -5094,6 +5094,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-quic-client", "solana-rpc-client", @@ -6751,6 +6752,7 @@ dependencies = [ "solana-keypair", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-packet", "solana-perf", "solana-pubkey", @@ -6953,6 +6955,7 @@ dependencies = [ "rayon", "solana-connection-cache", "solana-measure", + "solana-net-utils", "solana-pubsub-client", "solana-rpc-client", "solana-rpc-client-api", @@ -7066,6 +7069,7 @@ dependencies = [ "solana-ledger", "solana-measure", "solana-metrics", + "solana-net-utils", "solana-perf", "solana-poh", "solana-quic-client", diff --git a/tpu-client/Cargo.toml b/tpu-client/Cargo.toml index 77bb2674008036..b07ae3dcb2764c 100644 --- a/tpu-client/Cargo.toml +++ b/tpu-client/Cargo.toml @@ -19,6 +19,7 @@ log = { workspace = true } rayon = { workspace = true } solana-connection-cache = { workspace = true } solana-measure = { workspace = true } +solana-net-utils = { workspace = true } solana-pubsub-client = { workspace = true } solana-rpc-client = { workspace = true } solana-rpc-client-api = { workspace = true } diff --git a/tpu-client/src/tpu_client.rs b/tpu-client/src/tpu_client.rs index 3fecab0941771b..979adb73ed1500 100644 --- a/tpu-client/src/tpu_client.rs +++ b/tpu-client/src/tpu_client.rs @@ -8,6 +8,7 @@ use { ConnectionCache, ConnectionManager, ConnectionPool, NewConnectionConfig, }, }, + solana_net_utils::bind_to, solana_rpc_client::rpc_client::RpcClient, solana_sdk::{ client::AsyncClient, @@ -18,7 +19,7 @@ use { }, std::{ collections::VecDeque, - net::UdpSocket, + net::{IpAddr, Ipv4Addr, UdpSocket}, sync::{Arc, RwLock}, }, }; @@ -179,7 +180,7 @@ where tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + _deprecated: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), rpc_client, tpu_client: Arc::new(tpu_client), }) @@ -202,7 +203,7 @@ where tokio::task::block_in_place(|| rpc_client.runtime().block_on(create_tpu_client))?; Ok(Self { - _deprecated: UdpSocket::bind("0.0.0.0:0").unwrap(), + _deprecated: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), rpc_client, tpu_client: Arc::new(tpu_client), }) diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index ff035eaf11fdea..083b18e9c55e33 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -30,6 +30,7 @@ solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-net-utils = { workspace = true } solana-perf = { workspace = true } solana-poh = { workspace = true } solana-quic-client = { workspace = true } diff --git a/turbine/benches/cluster_info.rs b/turbine/benches/cluster_info.rs index 1f15137175acdb..7418dba3848457 100644 --- a/turbine/benches/cluster_info.rs +++ b/turbine/benches/cluster_info.rs @@ -12,6 +12,7 @@ use { genesis_utils::{create_genesis_config, GenesisConfigInfo}, shred::{Shred, ShredFlags}, }, + solana_net_utils::bind_to, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ pubkey, @@ -25,7 +26,12 @@ use { }, cluster_nodes::ClusterNodesCache, }, - std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Duration}, + std::{ + collections::HashMap, + net::{IpAddr, Ipv4Addr}, + sync::Arc, + time::Duration, + }, test::Bencher, }; @@ -41,7 +47,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { leader_keypair, SocketAddrSpace::Unspecified, ); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_benches(&genesis_config); let bank_forks = BankForks::new_rw_arc(bank); diff --git a/turbine/benches/retransmit_stage.rs b/turbine/benches/retransmit_stage.rs index 75c7ad06bdde34..a6a21cceafd4f8 100644 --- a/turbine/benches/retransmit_stage.rs +++ b/turbine/benches/retransmit_stage.rs @@ -18,6 +18,7 @@ use { shred::{ProcessShredsStats, ReedSolomonCache, Shredder}, }, solana_measure::measure::Measure, + solana_net_utils::bind_to, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ hash::Hash, @@ -30,7 +31,7 @@ use { solana_turbine::retransmit_stage::retransmitter, std::{ iter::repeat_with, - net::{Ipv4Addr, UdpSocket}, + net::{IpAddr, Ipv4Addr}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -59,7 +60,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { const NUM_PEERS: usize = 4; let peer_sockets: Vec<_> = repeat_with(|| { let id = Pubkey::new_unique(); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let mut contact_info = ContactInfo::new_localhost(&id, timestamp()); let port = socket.local_addr().unwrap().port(); contact_info.set_tvu((Ipv4Addr::LOCALHOST, port)).unwrap(); @@ -80,7 +81,7 @@ fn bench_retransmitter(bencher: &mut Bencher) { let (shreds_sender, shreds_receiver) = unbounded(); const NUM_THREADS: usize = 2; let sockets = (0..NUM_THREADS) - .map(|_| UdpSocket::bind("0.0.0.0:0").unwrap()) + .map(|_| bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap()) .collect(); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); diff --git a/turbine/src/broadcast_stage/standard_broadcast_run.rs b/turbine/src/broadcast_stage/standard_broadcast_run.rs index 09f3380bf9d8df..e28016d11e9e17 100644 --- a/turbine/src/broadcast_stage/standard_broadcast_run.rs +++ b/turbine/src/broadcast_stage/standard_broadcast_run.rs @@ -523,6 +523,7 @@ mod test { blockstore::Blockstore, genesis_utils::create_genesis_config, get_tmp_ledger_path, shred::max_ticks_per_n_shreds, }, + solana_net_utils::bind_to, solana_runtime::bank::Bank, solana_sdk::{ genesis_config::GenesisConfig, @@ -530,7 +531,12 @@ mod test { signature::{Keypair, Signer}, }, solana_streamer::socket::SocketAddrSpace, - std::{ops::Deref, sync::Arc, time::Duration}, + std::{ + net::{IpAddr, Ipv4Addr}, + ops::Deref, + sync::Arc, + time::Duration, + }, }; #[allow(clippy::type_complexity)] @@ -558,7 +564,7 @@ mod test { leader_keypair.clone(), SocketAddrSpace::Unspecified, )); - let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); + let socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(); let mut genesis_config = create_genesis_config(10_000).genesis_config; genesis_config.ticks_per_slot = max_ticks_per_n_shreds(num_shreds_per_slot, None) + 1; diff --git a/turbine/src/quic_endpoint.rs b/turbine/src/quic_endpoint.rs index 47fb173838a5f1..ea657a7cd2d441 100644 --- a/turbine/src/quic_endpoint.rs +++ b/turbine/src/quic_endpoint.rs @@ -826,9 +826,14 @@ mod tests { super::*, itertools::{izip, multiunzip}, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_net_utils::bind_to, solana_runtime::bank::Bank, solana_sdk::signature::Signer, - std::{iter::repeat_with, net::Ipv4Addr, time::Duration}, + std::{ + iter::repeat_with, + net::{IpAddr, Ipv4Addr}, + time::Duration, + }, }; #[test] @@ -841,10 +846,11 @@ mod tests { .build() .unwrap(); let keypairs: Vec = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect(); - let sockets: Vec = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) - .take(NUM_ENDPOINTS) - .collect::>() - .unwrap(); + let sockets: Vec = + repeat_with(|| bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false)) + .take(NUM_ENDPOINTS) + .collect::>() + .unwrap(); let addresses: Vec = sockets .iter() .map(UdpSocket::local_addr) diff --git a/udp-client/src/nonblocking/udp_client.rs b/udp-client/src/nonblocking/udp_client.rs index 56f6cc4f414ad3..d4cc1e8478d508 100644 --- a/udp-client/src/nonblocking/udp_client.rs +++ b/udp-client/src/nonblocking/udp_client.rs @@ -46,6 +46,7 @@ impl ClientConnection for UdpClientConnection { mod tests { use { super::*, + solana_net_utils::bind_to_async, solana_packet::{Packet, PACKET_DATA_SIZE}, solana_streamer::nonblocking::recvmmsg::recv_mmsg, std::net::{IpAddr, Ipv4Addr}, @@ -75,7 +76,9 @@ mod tests { let socket = solana_net_utils::bind_with_any_port(IpAddr::V4(Ipv4Addr::UNSPECIFIED)).unwrap(); let connection = UdpClientConnection::new_from_addr(socket, addr); - let reader = UdpSocket::bind(addr_str).await.expect("bind"); + let reader = bind_to_async(addr.ip(), addr.port(), false) + .await + .expect("bind"); check_send_one(&connection, &reader).await; check_send_batch(&connection, &reader).await; } diff --git a/validator/src/admin_rpc_service.rs b/validator/src/admin_rpc_service.rs index 059404c907f67f..d9f2d3ea2edf6b 100644 --- a/validator/src/admin_rpc_service.rs +++ b/validator/src/admin_rpc_service.rs @@ -873,6 +873,7 @@ mod tests { solana_gossip::cluster_info::ClusterInfo, solana_inline_spl::token, solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_net_utils::bind_to, solana_rpc::rpc::create_validator_exit, solana_runtime::{ bank::{Bank, BankTestConfig}, @@ -888,7 +889,11 @@ mod tests { solana_program::{program_option::COption, program_pack::Pack}, state::{Account as TokenAccount, AccountState as TokenAccountState, Mint}, }, - std::{collections::HashSet, sync::atomic::AtomicBool}, + std::{ + collections::HashSet, + net::{IpAddr, Ipv4Addr}, + sync::atomic::AtomicBool, + }, }; #[derive(Default)] @@ -942,7 +947,9 @@ mod tests { vote_account, repair_whitelist, notifies: Vec::new(), - repair_socket: Arc::new(std::net::UdpSocket::bind("0.0.0.0:0").unwrap()), + repair_socket: Arc::new( + bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(), + ), outstanding_repair_requests: Arc::< RwLock, >::default(),