Skip to content

Commit

Permalink
use net utils for binding sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Nov 19, 2024
1 parent 9d588f5 commit b0f3959
Show file tree
Hide file tree
Showing 40 changed files with 278 additions and 135 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
use {
clap::{crate_description, crate_name, Arg, Command},
crossbeam_channel::unbounded,
solana_net_utils::bind_to,
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
},
std::{
cmp::max,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Expand All @@ -20,7 +21,7 @@ use {
};

fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let send = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
let batch_size = 10;
let mut packet_batch = PacketBatch::with_capacity(batch_size);
packet_batch.resize(batch_size, Packet::default());
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ quinn = { workspace = true }
rayon = { workspace = true }
solana-connection-cache = { workspace = true }
solana-measure = { workspace = true }
solana-net-utils = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc-client = { workspace = true, features = ["default"] }
Expand Down
3 changes: 2 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_net_utils::bind_to,
solana_sdk::signature::Keypair,
solana_streamer::{
quic::{QuicServerParams, SpawnServerResult},
Expand All @@ -217,7 +218,7 @@ mod tests {

fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair) {
(
UdpSocket::bind("127.0.0.1:0").unwrap(),
bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
Arc::new(AtomicBool::new(false)),
Keypair::new(),
)
Expand Down
4 changes: 2 additions & 2 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use {
fmt::Display,
fs::File,
io::{self, BufRead, BufReader},
net::{Ipv4Addr, UdpSocket},
net::{IpAddr, Ipv4Addr},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -783,7 +783,7 @@ impl BankingSimulator {
// Broadcast stage is needed to save the simulated blocks for post-run analysis by
// inserting produced shreds into the blockstore.
let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage(
vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()],
vec![solana_net_utils::bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap()],
cluster_info.clone(),
entry_receiver,
retransmit_slots_receiver,
Expand Down
5 changes: 3 additions & 2 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_feature_set::FeatureSet,
solana_measure::measure_us,
solana_net_utils::bind_to,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
Expand All @@ -24,7 +25,7 @@ use {
solana_streamer::sendmmsg::batch_send,
std::{
iter::repeat,
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc, RwLock},
},
};
Expand All @@ -50,7 +51,7 @@ impl<T: LikeClusterInfo> Forwarder<T> {
Self {
poh_recorder,
bank_forks,
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
socket: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
cluster_info,
connection_cache,
data_budget,
Expand Down
9 changes: 7 additions & 2 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,13 +917,17 @@ mod test {
blockstore::make_many_slot_entries, get_tmp_ledger_path,
get_tmp_ledger_path_auto_delete, shred::Nonce,
},
solana_net_utils::bind_to,
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::collections::HashMap,
std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
},
trees::tr,
};

Expand Down Expand Up @@ -1345,7 +1349,8 @@ mod test {
impl ManageAncestorHashesState {
fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let ancestor_hashes_request_statuses = Arc::new(DashMap::new());
let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
let ancestor_hashes_request_socket =
Arc::new(bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap());
let epoch_schedule = bank_forks
.read()
.unwrap()
Expand Down
16 changes: 11 additions & 5 deletions core/src/repair/quic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,14 @@ mod tests {
super::*,
itertools::{izip, multiunzip},
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_net_utils::bind_to,
solana_runtime::bank::Bank,
solana_sdk::signature::Signer,
std::{iter::repeat_with, net::Ipv4Addr, time::Duration},
std::{
iter::repeat_with,
net::{IpAddr, Ipv4Addr},
time::Duration,
},
};

#[test]
Expand All @@ -1036,10 +1041,11 @@ mod tests {
.build()
.unwrap();
let keypairs: Vec<Keypair> = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect();
let sockets: Vec<UdpSocket> = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.take(NUM_ENDPOINTS)
.collect::<Result<_, _>>()
.unwrap();
let sockets: Vec<UdpSocket> =
repeat_with(|| bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false))
.take(NUM_ENDPOINTS)
.collect::<Result<_, _>>()
.unwrap();
let addresses: Vec<SocketAddr> = sockets
.iter()
.map(UdpSocket::local_addr)
Expand Down
23 changes: 15 additions & 8 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,13 +1075,17 @@ mod test {
get_tmp_ledger_path_auto_delete,
shred::max_ticks_per_n_shreds,
},
solana_net_utils::bind_to,
solana_runtime::bank::Bank,
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::collections::HashSet,
std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr},
},
};

fn new_test_cluster_info() -> ClusterInfo {
Expand All @@ -1097,9 +1101,9 @@ mod test {
let pubkey = cluster_info.id();
let slot = 100;
let shred_index = 50;
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind");
let address = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind");
let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));

// Send a repair request
Expand Down Expand Up @@ -1452,7 +1456,7 @@ mod test {
);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let receive_socket = &bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: u64::MAX,
Expand Down Expand Up @@ -1481,7 +1485,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1507,7 +1511,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1526,7 +1530,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1541,7 +1545,10 @@ mod test {
let bank_forks = BankForks::new_rw_arc(bank);
let dummy_addr = Some((
Pubkey::default(),
UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false)
.unwrap()
.local_addr()
.unwrap(),
));
let cluster_info = Arc::new(new_test_cluster_info());
let serve_repair = ServeRepair::new(
Expand Down
5 changes: 3 additions & 2 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use {
gossip_service::{discover, get_client},
},
solana_measure::measure::Measure,
solana_net_utils::bind_to,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
hash::Hash,
Expand All @@ -73,7 +74,7 @@ use {
solana_tps_client::TpsClient,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
std::{
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr},
process::exit,
sync::Arc,
thread,
Expand Down Expand Up @@ -725,7 +726,7 @@ fn run_dos<T: 'static + TpsClient + Send + Sync>(
_ => panic!("Unsupported data_type detected"),
};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
let mut last_log = Instant::now();
let mut total_count: usize = 0;
let mut count: usize = 0;
Expand Down
35 changes: 16 additions & 19 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use {
solana_measure::measure::Measure,
solana_net_utils::{
bind_common, bind_common_in_range, bind_in_range, bind_in_range_with_config,
bind_more_with_config, bind_two_in_range_with_offset_and_config,
bind_more_with_config, bind_to, bind_two_in_range_with_offset_and_config,
find_available_port_in_range, multi_bind_in_range, PortRange, SocketConfig,
VALIDATOR_PORT_RANGE,
},
Expand Down Expand Up @@ -224,7 +224,7 @@ impl ClusterInfo {
GOSSIP_PING_CACHE_CAPACITY,
)),
stats: GossipStats::default(),
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
socket: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
local_message_pending_push_queue: Mutex::default(),
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
Expand Down Expand Up @@ -2609,8 +2609,6 @@ impl Node {
num_quic_endpoints: usize,
) -> Self {
let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
let localhost_bind_addr = format!("{localhost_ip_addr:?}:0");
let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED));
let port_range = (1024, 65535);

let udp_config = SocketConfig { reuseport: false };
Expand All @@ -2629,8 +2627,8 @@ impl Node {
let (gossip_port, (gossip, ip_echo)) =
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
let tvu = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tvu_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tvu = bind_to(localhost_ip_addr, 0, false).unwrap();
let tvu_quic = bind_to(localhost_ip_addr, 0, false).unwrap();
let ((_tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
bind_two_in_range_with_offset_and_config(
localhost_ip_addr,
Expand All @@ -2643,24 +2641,23 @@ impl Node {
let tpu_forwards_quic =
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config.clone())
.unwrap();
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
let tpu_vote_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();

let tpu_vote = bind_to(localhost_ip_addr, 0, false).unwrap();
let tpu_vote_quic = bind_to(localhost_ip_addr, 0, false).unwrap();
let tpu_vote_quic =
bind_more_with_config(tpu_vote_quic, num_quic_endpoints, quic_config).unwrap();

let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let repair = bind_to(localhost_ip_addr, 0, false).unwrap();
let repair_quic = bind_to(localhost_ip_addr, 0, false).unwrap();
let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
let rpc_addr = SocketAddr::new(localhost_ip_addr, rpc_port);
let rpc_pubsub_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
let rpc_pubsub_addr = SocketAddr::new(localhost_ip_addr, rpc_pubsub_port);
let broadcast = vec![UdpSocket::bind(&unspecified_bind_addr).unwrap()];
let retransmit_socket = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let serve_repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
let serve_repair_quic = UdpSocket::bind(&localhost_bind_addr).unwrap();
let ancestor_hashes_requests = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let ancestor_hashes_requests_quic = UdpSocket::bind(&unspecified_bind_addr).unwrap();
let broadcast = vec![bind_to(localhost_ip_addr, 0, false).unwrap()];
let retransmit_socket = bind_to(localhost_ip_addr, 0, false).unwrap();
let serve_repair = bind_to(localhost_ip_addr, 0, false).unwrap();
let serve_repair_quic = bind_to(localhost_ip_addr, 0, false).unwrap();
let ancestor_hashes_requests = bind_to(localhost_ip_addr, 0, false).unwrap();
let ancestor_hashes_requests_quic = bind_to(localhost_ip_addr, 0, false).unwrap();

let mut info = ContactInfo::new(
*pubkey,
Expand Down Expand Up @@ -3002,7 +2999,7 @@ pub fn push_messages_to_peer(
"push_messages_to_peer",
&reqs,
);
let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
let sock = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
packet::send_to(&packet_batch, &sock, socket_addr_space)?;
Ok(())
}
Expand Down Expand Up @@ -4378,7 +4375,7 @@ mod tests {

let cluster_info44 = Arc::new({
let mut node = Node::new_localhost_with_pubkey(&keypair44.pubkey());
node.sockets.gossip = UdpSocket::bind("127.0.0.1:65534").unwrap();
node.sockets.gossip = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 65534, false).unwrap();
info!("{:?}", node);
ClusterInfo::new(node.info, keypair44.clone(), SocketAddrSpace::Unspecified)
});
Expand Down
Loading

0 comments on commit b0f3959

Please sign in to comment.