Skip to content

Commit

Permalink
use net utils for binding sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Nov 19, 2024
1 parent 9d588f5 commit 767e5a1
Show file tree
Hide file tree
Showing 40 changed files with 280 additions and 151 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions bench-streamer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
use {
clap::{crate_description, crate_name, Arg, Command},
crossbeam_channel::unbounded,
solana_net_utils::bind_to,
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
},
std::{
cmp::max,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
Expand All @@ -20,7 +21,7 @@ use {
};

fn producer(addr: &SocketAddr, exit: Arc<AtomicBool>) -> JoinHandle<()> {
let send = UdpSocket::bind("0.0.0.0:0").unwrap();
let send = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
let batch_size = 10;
let mut packet_batch = PacketBatch::with_capacity(batch_size);
packet_batch.resize(batch_size, Packet::default());
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ quinn = { workspace = true }
rayon = { workspace = true }
solana-connection-cache = { workspace = true }
solana-measure = { workspace = true }
solana-net-utils = { workspace = true }
solana-pubsub-client = { workspace = true }
solana-quic-client = { workspace = true }
solana-rpc-client = { workspace = true, features = ["default"] }
Expand Down
3 changes: 2 additions & 1 deletion client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ mod tests {
super::*,
crate::connection_cache::ConnectionCache,
crossbeam_channel::unbounded,
solana_net_utils::bind_to,
solana_sdk::signature::Keypair,
solana_streamer::{
quic::{QuicServerParams, SpawnServerResult},
Expand All @@ -217,7 +218,7 @@ mod tests {

fn server_args() -> (UdpSocket, Arc<AtomicBool>, Keypair) {
(
UdpSocket::bind("127.0.0.1:0").unwrap(),
bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
Arc::new(AtomicBool::new(false)),
Keypair::new(),
)
Expand Down
5 changes: 3 additions & 2 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use {
blockstore::{Blockstore, PurgeType},
leader_schedule_cache::LeaderScheduleCache,
},
solana_net_utils::bind_to,
solana_poh::{
poh_recorder::{PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
poh_service::{PohService, DEFAULT_HASHES_PER_BATCH, DEFAULT_PINNED_CPU_CORE},
Expand All @@ -46,7 +47,7 @@ use {
fmt::Display,
fs::File,
io::{self, BufRead, BufReader},
net::{Ipv4Addr, UdpSocket},
net::{IpAddr, Ipv4Addr},
path::PathBuf,
sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -783,7 +784,7 @@ impl BankingSimulator {
// Broadcast stage is needed to save the simulated blocks for post-run analysis by
// inserting produced shreds into the blockstore.
let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage(
vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()],
vec![bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).unwrap()],
cluster_info.clone(),
entry_receiver,
retransmit_slots_receiver,
Expand Down
5 changes: 3 additions & 2 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use {
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_feature_set::FeatureSet,
solana_measure::measure_us,
solana_net_utils::bind_to,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
Expand All @@ -24,7 +25,7 @@ use {
solana_streamer::sendmmsg::batch_send,
std::{
iter::repeat,
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
sync::{atomic::Ordering, Arc, RwLock},
},
};
Expand All @@ -50,7 +51,7 @@ impl<T: LikeClusterInfo> Forwarder<T> {
Self {
poh_recorder,
bank_forks,
socket: UdpSocket::bind("0.0.0.0:0").unwrap(),
socket: bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
cluster_info,
connection_cache,
data_budget,
Expand Down
9 changes: 7 additions & 2 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -917,13 +917,17 @@ mod test {
blockstore::make_many_slot_entries, get_tmp_ledger_path,
get_tmp_ledger_path_auto_delete, shred::Nonce,
},
solana_net_utils::bind_to,
solana_runtime::{accounts_background_service::AbsRequestSender, bank_forks::BankForks},
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
},
solana_streamer::socket::SocketAddrSpace,
std::collections::HashMap,
std::{
collections::HashMap,
net::{IpAddr, Ipv4Addr},
},
trees::tr,
};

Expand Down Expand Up @@ -1345,7 +1349,8 @@ mod test {
impl ManageAncestorHashesState {
fn new(bank_forks: Arc<RwLock<BankForks>>) -> Self {
let ancestor_hashes_request_statuses = Arc::new(DashMap::new());
let ancestor_hashes_request_socket = Arc::new(UdpSocket::bind("0.0.0.0:0").unwrap());
let ancestor_hashes_request_socket =
Arc::new(bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap());
let epoch_schedule = bank_forks
.read()
.unwrap()
Expand Down
16 changes: 11 additions & 5 deletions core/src/repair/quic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,14 @@ mod tests {
super::*,
itertools::{izip, multiunzip},
solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_net_utils::bind_to,
solana_runtime::bank::Bank,
solana_sdk::signature::Signer,
std::{iter::repeat_with, net::Ipv4Addr, time::Duration},
std::{
iter::repeat_with,
net::{IpAddr, Ipv4Addr},
time::Duration,
},
};

#[test]
Expand All @@ -1036,10 +1041,11 @@ mod tests {
.build()
.unwrap();
let keypairs: Vec<Keypair> = repeat_with(Keypair::new).take(NUM_ENDPOINTS).collect();
let sockets: Vec<UdpSocket> = repeat_with(|| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.take(NUM_ENDPOINTS)
.collect::<Result<_, _>>()
.unwrap();
let sockets: Vec<UdpSocket> =
repeat_with(|| bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false))
.take(NUM_ENDPOINTS)
.collect::<Result<_, _>>()
.unwrap();
let addresses: Vec<SocketAddr> = sockets
.iter()
.map(UdpSocket::local_addr)
Expand Down
23 changes: 15 additions & 8 deletions core/src/repair/repair_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,13 +1075,17 @@ mod test {
get_tmp_ledger_path_auto_delete,
shred::max_ticks_per_n_shreds,
},
solana_net_utils::bind_to,
solana_runtime::bank::Bank,
solana_sdk::{
signature::{Keypair, Signer},
timing::timestamp,
},
solana_streamer::socket::SocketAddrSpace,
std::collections::HashSet,
std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr},
},
};

fn new_test_cluster_info() -> ClusterInfo {
Expand All @@ -1097,9 +1101,9 @@ mod test {
let pubkey = cluster_info.id();
let slot = 100;
let shred_index = 50;
let reader = UdpSocket::bind("127.0.0.1:0").expect("bind");
let reader = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind");
let address = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let sender = bind_to(IpAddr::V4(Ipv4Addr::LOCALHOST), 0, false).expect("bind");
let outstanding_repair_requests = Arc::new(RwLock::new(OutstandingShredRepairs::default()));

// Send a repair request
Expand Down Expand Up @@ -1452,7 +1456,7 @@ mod test {
);
let mut duplicate_slot_repair_statuses = HashMap::new();
let dead_slot = 9;
let receive_socket = &UdpSocket::bind("0.0.0.0:0").unwrap();
let receive_socket = &bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
let duplicate_status = DuplicateSlotRepairStatus {
correct_ancestor_to_repair: (dead_slot, Hash::default()),
start_ts: u64::MAX,
Expand Down Expand Up @@ -1481,7 +1485,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1507,7 +1511,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1526,7 +1530,7 @@ mod test {
&blockstore,
&serve_repair,
&mut RepairStats::default(),
&UdpSocket::bind("0.0.0.0:0").unwrap(),
&bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap(),
&None,
&RwLock::new(OutstandingRequests::default()),
&identity_keypair,
Expand All @@ -1541,7 +1545,10 @@ mod test {
let bank_forks = BankForks::new_rw_arc(bank);
let dummy_addr = Some((
Pubkey::default(),
UdpSocket::bind("0.0.0.0:0").unwrap().local_addr().unwrap(),
bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false)
.unwrap()
.local_addr()
.unwrap(),
));
let cluster_info = Arc::new(new_test_cluster_info());
let serve_repair = ServeRepair::new(
Expand Down
5 changes: 3 additions & 2 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use {
gossip_service::{discover, get_client},
},
solana_measure::measure::Measure,
solana_net_utils::bind_to,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
hash::Hash,
Expand All @@ -73,7 +74,7 @@ use {
solana_tps_client::TpsClient,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
std::{
net::{SocketAddr, UdpSocket},
net::{IpAddr, Ipv4Addr, SocketAddr},
process::exit,
sync::Arc,
thread,
Expand Down Expand Up @@ -725,7 +726,7 @@ fn run_dos<T: 'static + TpsClient + Send + Sync>(
_ => panic!("Unsupported data_type detected"),
};

let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
let socket = bind_to(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0, false).unwrap();
let mut last_log = Instant::now();
let mut total_count: usize = 0;
let mut count: usize = 0;
Expand Down
Loading

0 comments on commit 767e5a1

Please sign in to comment.