diff --git a/src/main.rs b/src/main.rs index 3c38c21..0da486f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -81,6 +81,14 @@ fn main() { .short("6") .required(false) ) + .arg( + Arg::with_name("client_limit") + .help("limit the number of concurrent clients that can be processed by a server; any over this count will be immediately disconnected") + .takes_value(true) + .long("client-limit") + .required(false) + .default_value("0") + ) .arg( Arg::with_name("client") diff --git a/src/server.rs b/src/server.rs index 86af869..b91b6aa 100644 --- a/src/server.rs +++ b/src/server.rs @@ -276,6 +276,11 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> { //config-parsing and pre-connection setup let cpu_affinity_manager = Arc::new(Mutex::new(crate::utils::cpu_affinity::CpuAffinityManager::new(args.value_of("affinity").unwrap())?)); + let client_limit:u16 = args.value_of("client_limit").unwrap().parse()?; + if client_limit > 0 { + log::debug!("limiting service to {} concurrent clients", client_limit); + } + //start listening for connections let port:u16 = args.value_of("port").unwrap().parse()?; let mut listener:TcpListener; @@ -308,25 +313,30 @@ pub fn serve(args:ArgMatches) -> BoxResult<()> { stream.set_nodelay(true).expect("cannot disable Nagle's algorithm"); stream.set_keepalive(Some(KEEPALIVE_DURATION)).expect("unable to set TCP keepalive"); - CLIENTS.fetch_add(1, Ordering::Relaxed); - - let c_cam = cpu_affinity_manager.clone(); - let thread_builder = thread::Builder::new() - .name(address.to_string().into()); - thread_builder.spawn(move || { - //ensure the client is accounted-for even if the handler panics - let _client_thread_monitor = ClientThreadMonitor{ - client_address: address.to_string(), - }; - - match handle_client(&mut stream, c_cam) { - Ok(_) => (), - Err(e) => log::error!("error in client-handler: {}", e), - } - - //in the event of panic, this will happen when the stream is dropped + let client_count = CLIENTS.fetch_add(1, Ordering::Relaxed) + 1; + if client_limit > 0 && client_count > client_limit { + log::warn!("client-limit {} reached; disconnecting {}...", client_limit, address.to_string()); stream.shutdown(Shutdown::Both).unwrap_or_default(); - })?; + CLIENTS.fetch_sub(1, Ordering::Relaxed); + } else { + let c_cam = cpu_affinity_manager.clone(); + let thread_builder = thread::Builder::new() + .name(address.to_string().into()); + thread_builder.spawn(move || { + //ensure the client is accounted-for even if the handler panics + let _client_thread_monitor = ClientThreadMonitor{ + client_address: address.to_string(), + }; + + match handle_client(&mut stream, c_cam) { + Ok(_) => (), + Err(e) => log::error!("error in client-handler: {}", e), + } + + //in the event of panic, this will happen when the stream is dropped + stream.shutdown(Shutdown::Both).unwrap_or_default(); + })?; + } }, Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { //nothing to do break;