From dc7cd8d5bbf8deebf4f2640aaa5270bee56e43ec Mon Sep 17 00:00:00 2001 From: Michal Trybus Date: Sun, 1 Sep 2024 11:24:17 +0200 Subject: [PATCH] server-driven updates --- src/main.rs | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index 7a2bb26..e9f2f6c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,8 @@ use tokio::{ io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter}, net::{UnixListener, UnixStream as TokioUnixStream}, pin, - sync::RwLock, - time::sleep, + sync::broadcast::{self, Sender}, + time::{sleep, sleep_until, Instant}, }; #[macro_use] @@ -159,27 +159,30 @@ async fn run_daemon(other_fork: UnixStream) -> Result<(), String> { .await .map_err(|e| format!("error reading from fork parent: {e}"))?; - let loads: HashMap = HashMap::new(); - let loads = Arc::new(RwLock::new(loads)); - let loads2 = loads.clone(); + let (loads, _) = broadcast::channel(1); + let sender = loads.clone(); tokio::spawn(async move { loop { - measure_pid_loads(&loads2).await; - sleep(UPDATE_INTERVAL).await; + let next = Instant::now() + UPDATE_INTERVAL; + measure_pid_loads(&sender).await; + sleep_until(next).await; } }); loop { match listener.accept().await { Ok((socket, addr)) => { info!("accepted from {addr:?}"); - tokio::spawn(serve_client(socket, loads.clone())); + tokio::spawn(serve_client(socket, loads.subscribe())); } Err(e) => error!("accept function failed: {:?}", e), } } } -async fn serve_client(mut socket: TokioUnixStream, loads: Arc>>) { +async fn serve_client( + mut socket: TokioUnixStream, + mut loads: broadcast::Receiver>>, +) { let (reader, writer) = socket.split(); let reader = BufReader::new(reader); let mut writer = BufWriter::new(writer); @@ -188,9 +191,11 @@ async fn serve_client(mut socket: TokioUnixStream, loads: Arc = { - let loads = loads.read().await; + let Ok(loads) = loads.recv().await else { + continue 'serving; + }; pids.iter() .map(|pid| *loads.get(pid).unwrap_or(&f32::NAN)) .collect() @@ -198,22 +203,20 @@ async fn serve_client(mut socket: TokioUnixStream, loads: Arc>>) { +async fn measure_pid_loads(out_loads: &Sender>>) { let mut ticks = HashMap::new(); for _ in 0..2 { let mut pid_children: HashMap<_, Vec<_>> = Default::default(); @@ -235,7 +238,7 @@ async fn measure_pid_loads(out_loads: &Arc>>) { loads.insert(stat.pid, load); } if !loads.is_empty() { - *out_loads.write().await = get_cumulated(&pid_children, &loads); + let _ = out_loads.send(get_cumulated(&pid_children, &loads).into()); } sleep(MEASURE_PERIOD).await; }